#!/usr/bin/env python3
#
# Low-level QEMU shell on top of QMP.
#
# Copyright (C) 2009, 2010 Red Hat Inc.
#
# Authors:
#  Luiz Capitulino <lcapitulino@redhat.com>
#
# This work is licensed under the terms of the GNU GPL, version 2.  See
# the COPYING file in the top-level directory.
#
# Usage:
#
# Start QEMU with:
#
#     # qemu [...] -qmp unix:./qmp-sock,server
#
# Run the shell:
#
#     $ qmp-shell ./qmp-sock
#
# Commands have the following format:
#
#     < command-name > [ arg-name1=arg1 ] ... [ arg-nameN=argN ]
#
# For example:
#
#     (QEMU) device_add driver=e1000 id=net1
#     {"return": {}}
#     (QEMU)
#
# key=value pairs also support Python or JSON object literal subset notations,
# without spaces. Dictionaries/objects {} are supported as are arrays [].
#
#     example-command arg-name1={'key':'value','obj':{'prop':"value"}}
#
# Both JSON and Python formatting should work, including both styles of
# string literal quotes. Both paradigms of literal values should work,
# including null/true/false for JSON and None/True/False for Python.
#
#
# Transactions have the following multi-line format:
#
#     transaction(
#     action-name1 [ arg-name1=arg1 ] ... [arg-nameN=argN ]
#     ...
#     action-nameN [ arg-name1=arg1 ] ... [arg-nameN=argN ]
#     )
#
# One line transactions are also supported:
#
#     transaction( action-name1 ... )
#
# For example:
#
#     (QEMU) transaction(
#     TRANS> block-dirty-bitmap-add node=drive0 name=bitmap1
#     TRANS> block-dirty-bitmap-clear node=drive0 name=bitmap0
#     TRANS> )
#     {"return": {}}
#     (QEMU)
#
# Use the -v and -p options to activate the verbose and pretty-print options,
# which will echo back the properly formatted JSON-compliant QMP that is being
# sent to QEMU, which is useful for debugging and documentation generation.

import argparse
import ast
import atexit
import json
import os
import re
import readline
import sys
from typing import (
    Iterator,
    List,
    NoReturn,
    Optional,
    Sequence,
)


sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..', 'python'))
from qemu import qmp
from qemu.qmp import QMPMessage


class QMPCompleter:
    """
    Holds the list of known command strings and serves them to readline
    through the ``complete(text, state)`` callback protocol.
    """

    # NB: Python 3.9+ will probably allow us to subclass list[str] directly,
    # but pylint as of today does not know that List[str] is simply 'list'.
    def __init__(self) -> None:
        self._matches: List[str] = []

    def append(self, value: str) -> None:
        """Register one more completion candidate."""
        self._matches.append(value)

    def complete(self, text: str, state: int) -> Optional[str]:
        """
        Return the ``state``-th registered candidate (counting from zero,
        in insertion order) that starts with ``text``, or None when there
        is no such candidate.
        """
        candidates = [name for name in self._matches
                      if name.startswith(text)]
        if 0 <= state < len(candidates):
            return candidates[state]
        return None


class QMPShellError(Exception):
    """Raised when a command line typed into the shell cannot be parsed."""


class FuzzyJSON(ast.NodeTransformer):
    """
    AST transformer that rewrites the bare names ``true``, ``false`` and
    ``null`` into the constants True, False and None, so that JSON-flavoured
    literals can be evaluated as Python literals.
    """

    @classmethod
    def visit_Name(cls,  # pylint: disable=invalid-name
                   node: ast.Name) -> ast.AST:
        """Swap a JSON keyword name for its constant; keep other names."""
        json_keywords = {'true': True, 'false': False, 'null': None}
        if node.id not in json_keywords:
            return node
        return ast.Constant(value=json_keywords[node.id])


# TODO: QMPShell's interface is a bit ugly (eg. _fill_completion() and
#       _execute_cmd()). Let's design a better one.
class QMPShell(qmp.QEMUMonitorProtocol):
    """
    Interactive shell that turns ``command key=value ...`` lines into QMP
    command objects, sends them, and prints the replies as JSON.

    Besides plain commands it supports a "transaction(" ... ")" sub-shell
    that collects several actions and sends them as one ``transaction``
    command.
    """

    def __init__(self, address: qmp.SocketAddrT,
                 pretty: bool = False, verbose: bool = False):
        """
        :param address: Socket address handed to the base class.
        :param pretty: Indent and key-sort the JSON that gets printed.
        :param verbose: Echo each outgoing command before sending it.
        """
        super().__init__(address)
        self._greeting: Optional[QMPMessage] = None
        self._completer = QMPCompleter()
        # True while the user is inside a "transaction(" ... ")" block.
        self._transmode = False
        # Actions collected so far for the pending transaction.
        self._actions: List[QMPMessage] = []
        self._histfile = os.path.join(os.path.expanduser('~'),
                                      '.qmp-shell_history')
        self.pretty = pretty
        self.verbose = verbose

    def _fill_completion(self) -> None:
        """Seed the completer with the names from 'query-commands'."""
        cmds = self.cmd('query-commands')
        if 'error' in cmds:
            return
        for cmd in cmds['return']:
            self._completer.append(cmd['name'])

    def __completer_setup(self) -> None:
        """Configure readline: completion, history length and history file."""
        self._completer = QMPCompleter()
        self._fill_completion()
        readline.set_history_length(1024)
        readline.set_completer(self._completer.complete)
        readline.parse_and_bind("tab: complete")
        # NB: default delimiters conflict with some command names
        # (eg. query-), clearing everything as it doesn't seem to matter
        readline.set_completer_delims('')
        try:
            readline.read_history_file(self._histfile)
        except FileNotFoundError:
            # No history yet; it gets created when the shell exits.
            pass
        except IOError as err:
            print(f"Failed to read history '{self._histfile}': {err!s}")
        atexit.register(self.__save_history)

    def __save_history(self) -> None:
        """Write the readline history file; report (not raise) failures."""
        try:
            readline.write_history_file(self._histfile)
        except IOError as err:
            print(f"Failed to save history file '{self._histfile}': {err!s}")

    @classmethod
    def __parse_value(cls, val: str) -> object:
        """
        Convert the text right of '=' into a Python value.

        Tried in order: integer, true/false (case-insensitive), and, for
        text starting with '{' or '[', strict JSON followed by a Python
        literal with JSON keywords allowed. Anything that cannot be
        converted is returned unchanged as a string.
        """
        try:
            return int(val)
        except ValueError:
            pass

        lowered = val.lower()
        if lowered == 'true':
            return True
        if lowered == 'false':
            return False
        if val.startswith(('{', '[')):
            # Try first as pure JSON:
            try:
                return json.loads(val)
            except ValueError:
                pass
            # Try once again as FuzzyJSON:
            try:
                tree = ast.parse(val, mode='eval')
                transformed = FuzzyJSON().visit(tree)
                return ast.literal_eval(transformed)
            except (SyntaxError, ValueError, TypeError):
                # TypeError covers literals such as {[1]:2} (unhashable
                # key); treat them like any other unparsable text.
                pass
        return val

    def __cli_expr(self,
                   tokens: Sequence[str],
                   parent: qmp.QMPObject) -> None:
        """
        Store every ``key=value`` token into ``parent``.

        Dotted keys (``a.b.c=1``) create or reuse nested dictionaries.

        :param tokens: The key=value words of one command line.
        :param parent: Dictionary that receives the parsed arguments; it is
                       modified in place.
        :raise QMPShellError: A token has no '=', a key is used both as a
                              leaf and as a container, or a key is set twice.
        """
        conflict_msg = 'Cannot use "{:s}" as both leaf and non-leaf key'
        for arg in tokens:
            (key, sep, val) = arg.partition('=')
            if sep != '=':
                raise QMPShellError(
                    f"Expected a key=value pair, got '{arg!s}'"
                )

            value = self.__parse_value(val)
            optpath = key.split('.')
            # Walk down from the root for every token; reusing the cursor
            # of the previous token would nest unrelated keys under it.
            target = parent
            curpath = []
            for path in optpath[:-1]:
                curpath.append(path)
                obj = target.get(path, {})
                if not isinstance(obj, dict):
                    raise QMPShellError(conflict_msg.format('.'.join(curpath)))
                target[path] = obj
                target = obj
            leaf = optpath[-1]
            if leaf in target:
                if isinstance(target[leaf], dict):
                    # Report the full key, not just its parent path.
                    raise QMPShellError(conflict_msg.format(key))
                raise QMPShellError(f'Cannot set "{key}" multiple times')
            target[leaf] = value

    def __build_cmd(self, cmdline: str) -> Optional[QMPMessage]:
        """
        Build a QMP input object from a user provided command-line in the
        following format:

            < command-name > [ arg-name1=arg1 ] ... [ arg-nameN=argN ]

        :return: The command to send, or None when there is nothing to
                 send yet (blank input, or an action that was only queued
                 for the transaction in progress).
        :raise QMPShellError: The line could not be parsed.
        """
        argument_regex = r'''(?:[^\s"']|"(?:\\.|[^"])*"|'(?:\\.|[^'])*')+'''
        cmdargs = re.findall(argument_regex, cmdline)
        qmpcmd: QMPMessage

        # Whitespace-only input produces no tokens at all.
        if not cmdargs:
            return None

        # Transactional CLI entry/exit:
        if cmdargs[0] == 'transaction(':
            self._transmode = True
            cmdargs.pop(0)
        elif cmdargs[0] == ')' and self._transmode:
            self._transmode = False
            if len(cmdargs) > 1:
                msg = 'Unexpected input after close of Transaction sub-shell'
                raise QMPShellError(msg)
            qmpcmd = {
                'execute': 'transaction',
                'arguments': {'actions': self._actions}
            }
            self._actions = []
            return qmpcmd

        # Nothing to process? (a lone "transaction(" opener)
        if not cmdargs:
            return None

        # Parse and then cache this Transactional Action
        if self._transmode:
            finalize = cmdargs[-1] == ')'
            if finalize:
                cmdargs.pop(-1)
            # "transaction( )" leaves no action words behind; do not queue
            # a bogus action named after the closing parenthesis.
            if cmdargs:
                action = {'type': cmdargs[0], 'data': {}}
                self.__cli_expr(cmdargs[1:], action['data'])
                self._actions.append(action)
            return self.__build_cmd(')') if finalize else None

        # Standard command: parse and return it to be executed.
        qmpcmd = {'execute': cmdargs[0], 'arguments': {}}
        self.__cli_expr(cmdargs[1:], qmpcmd['arguments'])
        return qmpcmd

    def _print(self, qmp_message: object) -> None:
        """Print a message as JSON, indented and sorted when 'pretty'."""
        jsobj = json.dumps(qmp_message,
                           indent=4 if self.pretty else None,
                           sort_keys=self.pretty)
        print(str(jsobj))

    def _execute_cmd(self, cmdline: str) -> bool:
        """
        Parse one input line, send the resulting command and print the reply.

        :return: False when the peer disconnected, True otherwise (including
                 after a parse error, which is only reported).
        """
        try:
            qmpcmd = self.__build_cmd(cmdline)
        except Exception as err:
            # Top-level boundary of the interactive loop: report any parse
            # failure and keep the shell alive.
            print(f'Error while parsing command line: {err}')
            print('command format: <command-name> ', end=' ')
            print('[arg-name1=arg1] ... [arg-nameN=argN]')
            return True
        # For transaction mode, we may have just cached the action:
        if qmpcmd is None:
            return True
        if self.verbose:
            self._print(qmpcmd)
        resp = self.cmd_obj(qmpcmd)
        if resp is None:
            print('Disconnected')
            return False
        self._print(resp)
        return True

    def connect(self, negotiate: bool = True) -> None:
        """Connect, remember the greeting, then set up readline."""
        self._greeting = super().connect(negotiate)
        self.__completer_setup()

    def show_banner(self,
                    msg: str = 'Welcome to the QMP low-level shell!') -> None:
        """Print ``msg`` and, if a greeting was received, the QEMU version."""
        print(msg)
        if not self._greeting:
            print('Connected')
            return
        version = self._greeting['QMP']['version']['qemu']
        print("Connected to QEMU {major}.{minor}.{micro}\n".format(**version))

    @property
    def prompt(self) -> str:
        """Prompt text: 'TRANS> ' inside a transaction, else '(QEMU) '."""
        if self._transmode:
            return 'TRANS> '
        return '(QEMU) '

    def read_exec_command(self) -> bool:
        """
        Read and execute a command.

        An empty line prints and clears the pending events instead.

        @return True if execution was ok, return False if disconnected.
        """
        try:
            cmdline = input(self.prompt)
        except EOFError:
            print()
            return False

        if cmdline == '':
            for event in self.get_events():
                print(event)
            self.clear_events()
            return True

        return self._execute_cmd(cmdline)

    def repl(self) -> Iterator[None]:
        """
        Show the banner, then run commands until EOF or disconnect,
        yielding once after each processed line.

        The connection is closed when the loop ends for any reason,
        including an exception or the generator being discarded early.
        """
        try:
            self.show_banner()
            while self.read_exec_command():
                yield
        finally:
            self.close()


class HMPShell(QMPShell):
    """
    Shell variant that forwards each input line as a
    'human-monitor-command' and prints the textual reply.
    """

    def __init__(self, address: qmp.SocketAddrT,
                 pretty: bool = False, verbose: bool = False):
        super().__init__(address, pretty, verbose)
        # CPU index sent along with every forwarded command line.
        self.__cpu_index = 0

    def __cmd_completion(self) -> None:
        """Register command names (and 'help <name>') from 'help' output."""
        for cmd in self.__cmd_passthrough('help')['return'].split('\r\n'):
            if cmd and cmd[0] != '[' and cmd[0] != '\t':
                name = cmd.split()[0]  # drop help text
                if name == 'info':
                    continue
                if '|' in name:
                    # Command in the form 'foobar|f' or 'f|foobar', take the
                    # full name
                    opt = name.split('|')
                    name = opt[1] if len(opt[0]) == 1 else opt[0]
                self._completer.append(name)
                self._completer.append('help ' + name)  # help completion

    def __info_completion(self) -> None:
        """Register 'info <topic>' entries from the 'info' output."""
        for cmd in self.__cmd_passthrough('info')['return'].split('\r\n'):
            words = cmd.split()
            # The topic is the second word; skip lines that have none.
            if len(words) > 1:
                self._completer.append('info ' + words[1])

    def __other_completion(self) -> None:
        """Register completions not derivable from the help output."""
        # special cases
        self._completer.append('help info')

    def _fill_completion(self) -> None:
        """Seed the completer from the monitor's own help texts."""
        self.__cmd_completion()
        self.__info_completion()
        self.__other_completion()

    def __cmd_passthrough(self, cmdline: str,
                          cpu_index: int = 0) -> QMPMessage:
        """Send ``cmdline`` via 'human-monitor-command'; return the reply."""
        return self.cmd_obj({
            'execute': 'human-monitor-command',
            'arguments': {
                'command-line': cmdline,
                'cpu-index': cpu_index
            }
        })

    def _execute_cmd(self, cmdline: str) -> bool:
        """
        Forward one input line to the human monitor and print its output.

        The 'cpu' command is intercepted so the chosen index can be
        remembered and passed along with later commands.

        :return: False when the peer disconnected, True otherwise.
        """
        words = cmdline.split()
        if not words:
            # Whitespace-only input: nothing to send.
            return True

        if words[0] == "cpu":
            # trap the cpu command, it requires special setting
            try:
                idx = int(words[1])
            except (IndexError, ValueError):
                print('cpu command takes an integer argument')
                return True
            probe = self.__cmd_passthrough('info version', idx)
            if probe is None:
                print('Disconnected')
                return False
            if 'return' not in probe:
                print('bad CPU index')
                return True
            self.__cpu_index = idx

        resp = self.__cmd_passthrough(cmdline, self.__cpu_index)
        if resp is None:
            print('Disconnected')
            return False
        if 'return' in resp:
            # Success
            if len(resp['return']) > 0:
                print(resp['return'], end=' ')
        elif 'error' in resp:
            # Error
            print('%s: %s' % (resp['error']['class'], resp['error']['desc']))
        else:
            # Neither key present: show the raw reply rather than abort.
            print(f'Unexpected response: {resp}')
        return True

    def show_banner(self, msg: str = 'Welcome to the HMP shell!') -> None:
        """Same as the base banner, with an HMP-specific default message."""
        super().show_banner(msg)


def die(msg: str) -> NoReturn:
    """Write 'ERROR: <msg>' to stderr and exit with status 1."""
    sys.stderr.write(f'ERROR: {msg}\n')
    sys.exit(1)


def main() -> None:
    """
    Command-line entry point: parse options, connect and run the shell.

    The server address is taken from the positional argument or, when that
    is omitted, from the QMP_SOCKET environment variable.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('-H', '--hmp', action='store_true',
                        help='Use HMP interface')
    parser.add_argument('-N', '--skip-negotiation', action='store_true',
                        help='Skip negotiate (for qemu-ga)')
    parser.add_argument('-v', '--verbose', action='store_true',
                        help='Verbose (echo commands sent and received)')
    parser.add_argument('-p', '--pretty', action='store_true',
                        help='Pretty-print JSON')

    default_server = os.environ.get('QMP_SOCKET')
    # nargs='?' makes the positional optional; without it argparse demands
    # the argument and the QMP_SOCKET default could never take effect.
    parser.add_argument('qmp_server', action='store', nargs='?',
                        default=default_server,
                        help='< UNIX socket path | TCP address:port >')

    args = parser.parse_args()
    if args.qmp_server is None:
        parser.error("QMP socket or TCP address must be specified")

    shell_class = HMPShell if args.hmp else QMPShell
    try:
        address = shell_class.parse_address(args.qmp_server)
    except qmp.QMPBadPortError:
        parser.error(f"Bad port number: {args.qmp_server}")
        return  # pycharm doesn't know error() is noreturn

    qemu = shell_class(address, args.pretty, args.verbose)

    try:
        qemu.connect(negotiate=not args.skip_negotiation)
    except qmp.QMPConnectError:
        die("Didn't get QMP greeting message")
    except qmp.QMPCapabilitiesError:
        die("Couldn't negotiate capabilities")
    except OSError as err:
        die(f"Couldn't connect to {args.qmp_server}: {err!s}")

    for _ in qemu.repl():
        pass


if __name__ == '__main__':
    main()