1#!/usr/bin/env python3 2# 3# Copyright (C) 2009, 2010 Red Hat Inc. 4# 5# Authors: 6# Luiz Capitulino <lcapitulino@redhat.com> 7# 8# This work is licensed under the terms of the GNU GPL, version 2. See 9# the COPYING file in the top-level directory. 10# 11 12""" 13Low-level QEMU shell on top of QMP. 14 15usage: qmp-shell [-h] [-H] [-N] [-v] [-p] qmp_server 16 17positional arguments: 18 qmp_server < UNIX socket path | TCP address:port > 19 20optional arguments: 21 -h, --help show this help message and exit 22 -H, --hmp Use HMP interface 23 -N, --skip-negotiation 24 Skip negotiate (for qemu-ga) 25 -v, --verbose Verbose (echo commands sent and received) 26 -p, --pretty Pretty-print JSON 27 28 29Start QEMU with: 30 31# qemu [...] -qmp unix:./qmp-sock,server 32 33Run the shell: 34 35$ qmp-shell ./qmp-sock 36 37Commands have the following format: 38 39 < command-name > [ arg-name1=arg1 ] ... [ arg-nameN=argN ] 40 41For example: 42 43(QEMU) device_add driver=e1000 id=net1 44{'return': {}} 45(QEMU) 46 47key=value pairs also support Python or JSON object literal subset notations, 48without spaces. Dictionaries/objects {} are supported as are arrays []. 49 50 example-command arg-name1={'key':'value','obj'={'prop':"value"}} 51 52Both JSON and Python formatting should work, including both styles of 53string literal quotes. Both paradigms of literal values should work, 54including null/true/false for JSON and None/True/False for Python. 55 56 57Transactions have the following multi-line format: 58 59 transaction( 60 action-name1 [ arg-name1=arg1 ] ... [arg-nameN=argN ] 61 ... 62 action-nameN [ arg-name1=arg1 ] ... [arg-nameN=argN ] 63 ) 64 65One line transactions are also supported: 66 67 transaction( action-name1 ... ) 68 69For example: 70 71 (QEMU) transaction( 72 TRANS> block-dirty-bitmap-add node=drive0 name=bitmap1 73 TRANS> block-dirty-bitmap-clear node=drive0 name=bitmap0 74 TRANS> ) 75 {"return": {}} 76 (QEMU) 77 78Use the -v and -p options to activate the verbose and pretty-print options, 79which will echo back the properly formatted JSON-compliant QMP that is being 80sent to QEMU, which is useful for debugging and documentation generation. 81""" 82 83import argparse 84import ast 85import json 86import logging 87import os 88import re 89import readline 90import sys 91from typing import ( 92 Iterator, 93 List, 94 NoReturn, 95 Optional, 96 Sequence, 97) 98 99 100sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..', 'python')) 101from qemu import qmp 102from qemu.qmp import QMPMessage 103 104 105LOG = logging.getLogger(__name__) 106 107 108class QMPCompleter: 109 """ 110 QMPCompleter provides a readline library tab-complete behavior. 111 """ 112 # NB: Python 3.9+ will probably allow us to subclass list[str] directly, 113 # but pylint as of today does not know that List[str] is simply 'list'. 114 def __init__(self) -> None: 115 self._matches: List[str] = [] 116 117 def append(self, value: str) -> None: 118 """Append a new valid completion to the list of possibilities.""" 119 return self._matches.append(value) 120 121 def complete(self, text: str, state: int) -> Optional[str]: 122 """readline.set_completer() callback implementation.""" 123 for cmd in self._matches: 124 if cmd.startswith(text): 125 if state == 0: 126 return cmd 127 state -= 1 128 return None 129 130 131class QMPShellError(qmp.QMPError): 132 """ 133 QMP Shell Base error class. 134 """ 135 136 137class FuzzyJSON(ast.NodeTransformer): 138 """ 139 This extension of ast.NodeTransformer filters literal "true/false/null" 140 values in a Python AST and replaces them by proper "True/False/None" values 141 that Python can properly evaluate. 142 """ 143 144 @classmethod 145 def visit_Name(cls, # pylint: disable=invalid-name 146 node: ast.Name) -> ast.AST: 147 """ 148 Transform Name nodes with certain values into Constant (keyword) nodes. 149 """ 150 if node.id == 'true': 151 return ast.Constant(value=True) 152 if node.id == 'false': 153 return ast.Constant(value=False) 154 if node.id == 'null': 155 return ast.Constant(value=None) 156 return node 157 158 159class QMPShell(qmp.QEMUMonitorProtocol): 160 """ 161 QMPShell provides a basic readline-based QMP shell. 162 163 :param address: Address of the QMP server. 164 :param pretty: Pretty-print QMP messages. 165 :param verbose: Echo outgoing QMP messages to console. 166 """ 167 def __init__(self, address: qmp.SocketAddrT, 168 pretty: bool = False, verbose: bool = False): 169 super().__init__(address) 170 self._greeting: Optional[QMPMessage] = None 171 self._completer = QMPCompleter() 172 self._transmode = False 173 self._actions: List[QMPMessage] = [] 174 self._histfile = os.path.join(os.path.expanduser('~'), 175 '.qmp-shell_history') 176 self.pretty = pretty 177 self.verbose = verbose 178 179 def close(self) -> None: 180 # Hook into context manager of parent to save shell history. 181 self._save_history() 182 super().close() 183 184 def _fill_completion(self) -> None: 185 cmds = self.cmd('query-commands') 186 if 'error' in cmds: 187 return 188 for cmd in cmds['return']: 189 self._completer.append(cmd['name']) 190 191 def _completer_setup(self) -> None: 192 self._completer = QMPCompleter() 193 self._fill_completion() 194 readline.set_history_length(1024) 195 readline.set_completer(self._completer.complete) 196 readline.parse_and_bind("tab: complete") 197 # NB: default delimiters conflict with some command names 198 # (eg. query-), clearing everything as it doesn't seem to matter 199 readline.set_completer_delims('') 200 try: 201 readline.read_history_file(self._histfile) 202 except FileNotFoundError: 203 pass 204 except IOError as err: 205 msg = f"Failed to read history '{self._histfile}': {err!s}" 206 LOG.warning(msg) 207 208 def _save_history(self) -> None: 209 try: 210 readline.write_history_file(self._histfile) 211 except IOError as err: 212 msg = f"Failed to save history file '{self._histfile}': {err!s}" 213 LOG.warning(msg) 214 215 @classmethod 216 def _parse_value(cls, val: str) -> object: 217 try: 218 return int(val) 219 except ValueError: 220 pass 221 222 if val.lower() == 'true': 223 return True 224 if val.lower() == 'false': 225 return False 226 if val.startswith(('{', '[')): 227 # Try first as pure JSON: 228 try: 229 return json.loads(val) 230 except ValueError: 231 pass 232 # Try once again as FuzzyJSON: 233 try: 234 tree = ast.parse(val, mode='eval') 235 transformed = FuzzyJSON().visit(tree) 236 return ast.literal_eval(transformed) 237 except (SyntaxError, ValueError): 238 pass 239 return val 240 241 def _cli_expr(self, 242 tokens: Sequence[str], 243 parent: qmp.QMPObject) -> None: 244 for arg in tokens: 245 (key, sep, val) = arg.partition('=') 246 if sep != '=': 247 raise QMPShellError( 248 f"Expected a key=value pair, got '{arg!s}'" 249 ) 250 251 value = self._parse_value(val) 252 optpath = key.split('.') 253 curpath = [] 254 for path in optpath[:-1]: 255 curpath.append(path) 256 obj = parent.get(path, {}) 257 if not isinstance(obj, dict): 258 msg = 'Cannot use "{:s}" as both leaf and non-leaf key' 259 raise QMPShellError(msg.format('.'.join(curpath))) 260 parent[path] = obj 261 parent = obj 262 if optpath[-1] in parent: 263 if isinstance(parent[optpath[-1]], dict): 264 msg = 'Cannot use "{:s}" as both leaf and non-leaf key' 265 raise QMPShellError(msg.format('.'.join(curpath))) 266 raise QMPShellError(f'Cannot set "{key}" multiple times') 267 parent[optpath[-1]] = value 268 269 def _build_cmd(self, cmdline: str) -> Optional[QMPMessage]: 270 """ 271 Build a QMP input object from a user provided command-line in the 272 following format: 273 274 < command-name > [ arg-name1=arg1 ] ... [ arg-nameN=argN ] 275 """ 276 argument_regex = r'''(?:[^\s"']|"(?:\\.|[^"])*"|'(?:\\.|[^'])*')+''' 277 cmdargs = re.findall(argument_regex, cmdline) 278 qmpcmd: QMPMessage 279 280 # Transactional CLI entry: 281 if cmdargs and cmdargs[0] == 'transaction(': 282 self._transmode = True 283 self._actions = [] 284 cmdargs.pop(0) 285 286 # Transactional CLI exit: 287 if cmdargs and cmdargs[0] == ')' and self._transmode: 288 self._transmode = False 289 if len(cmdargs) > 1: 290 msg = 'Unexpected input after close of Transaction sub-shell' 291 raise QMPShellError(msg) 292 qmpcmd = { 293 'execute': 'transaction', 294 'arguments': {'actions': self._actions} 295 } 296 return qmpcmd 297 298 # No args, or no args remaining 299 if not cmdargs: 300 return None 301 302 if self._transmode: 303 # Parse and cache this Transactional Action 304 finalize = False 305 action = {'type': cmdargs[0], 'data': {}} 306 if cmdargs[-1] == ')': 307 cmdargs.pop(-1) 308 finalize = True 309 self._cli_expr(cmdargs[1:], action['data']) 310 self._actions.append(action) 311 return self._build_cmd(')') if finalize else None 312 313 # Standard command: parse and return it to be executed. 314 qmpcmd = {'execute': cmdargs[0], 'arguments': {}} 315 self._cli_expr(cmdargs[1:], qmpcmd['arguments']) 316 return qmpcmd 317 318 def _print(self, qmp_message: object) -> None: 319 jsobj = json.dumps(qmp_message, 320 indent=4 if self.pretty else None, 321 sort_keys=self.pretty) 322 print(str(jsobj)) 323 324 def _execute_cmd(self, cmdline: str) -> bool: 325 try: 326 qmpcmd = self._build_cmd(cmdline) 327 except QMPShellError as err: 328 print( 329 f"Error while parsing command line: {err!s}\n" 330 "command format: <command-name> " 331 "[arg-name1=arg1] ... [arg-nameN=argN", 332 file=sys.stderr 333 ) 334 return True 335 # For transaction mode, we may have just cached the action: 336 if qmpcmd is None: 337 return True 338 if self.verbose: 339 self._print(qmpcmd) 340 resp = self.cmd_obj(qmpcmd) 341 if resp is None: 342 print('Disconnected') 343 return False 344 self._print(resp) 345 return True 346 347 def connect(self, negotiate: bool = True) -> None: 348 self._greeting = super().connect(negotiate) 349 self._completer_setup() 350 351 def show_banner(self, 352 msg: str = 'Welcome to the QMP low-level shell!') -> None: 353 """ 354 Print to stdio a greeting, and the QEMU version if available. 355 """ 356 print(msg) 357 if not self._greeting: 358 print('Connected') 359 return 360 version = self._greeting['QMP']['version']['qemu'] 361 print("Connected to QEMU {major}.{minor}.{micro}\n".format(**version)) 362 363 @property 364 def prompt(self) -> str: 365 """ 366 Return the current shell prompt, including a trailing space. 367 """ 368 if self._transmode: 369 return 'TRANS> ' 370 return '(QEMU) ' 371 372 def read_exec_command(self) -> bool: 373 """ 374 Read and execute a command. 375 376 @return True if execution was ok, return False if disconnected. 377 """ 378 try: 379 cmdline = input(self.prompt) 380 except EOFError: 381 print() 382 return False 383 384 if cmdline == '': 385 for event in self.get_events(): 386 print(event) 387 self.clear_events() 388 return True 389 390 return self._execute_cmd(cmdline) 391 392 def repl(self) -> Iterator[None]: 393 """ 394 Return an iterator that implements the REPL. 395 """ 396 self.show_banner() 397 while self.read_exec_command(): 398 yield 399 self.close() 400 401 402class HMPShell(QMPShell): 403 """ 404 HMPShell provides a basic readline-based HMP shell, tunnelled via QMP. 405 406 :param address: Address of the QMP server. 407 :param pretty: Pretty-print QMP messages. 408 :param verbose: Echo outgoing QMP messages to console. 409 """ 410 def __init__(self, address: qmp.SocketAddrT, 411 pretty: bool = False, verbose: bool = False): 412 super().__init__(address, pretty, verbose) 413 self._cpu_index = 0 414 415 def _cmd_completion(self) -> None: 416 for cmd in self._cmd_passthrough('help')['return'].split('\r\n'): 417 if cmd and cmd[0] != '[' and cmd[0] != '\t': 418 name = cmd.split()[0] # drop help text 419 if name == 'info': 420 continue 421 if name.find('|') != -1: 422 # Command in the form 'foobar|f' or 'f|foobar', take the 423 # full name 424 opt = name.split('|') 425 if len(opt[0]) == 1: 426 name = opt[1] 427 else: 428 name = opt[0] 429 self._completer.append(name) 430 self._completer.append('help ' + name) # help completion 431 432 def _info_completion(self) -> None: 433 for cmd in self._cmd_passthrough('info')['return'].split('\r\n'): 434 if cmd: 435 self._completer.append('info ' + cmd.split()[1]) 436 437 def _other_completion(self) -> None: 438 # special cases 439 self._completer.append('help info') 440 441 def _fill_completion(self) -> None: 442 self._cmd_completion() 443 self._info_completion() 444 self._other_completion() 445 446 def _cmd_passthrough(self, cmdline: str, 447 cpu_index: int = 0) -> QMPMessage: 448 return self.cmd_obj({ 449 'execute': 'human-monitor-command', 450 'arguments': { 451 'command-line': cmdline, 452 'cpu-index': cpu_index 453 } 454 }) 455 456 def _execute_cmd(self, cmdline: str) -> bool: 457 if cmdline.split()[0] == "cpu": 458 # trap the cpu command, it requires special setting 459 try: 460 idx = int(cmdline.split()[1]) 461 if 'return' not in self._cmd_passthrough('info version', idx): 462 print('bad CPU index') 463 return True 464 self._cpu_index = idx 465 except ValueError: 466 print('cpu command takes an integer argument') 467 return True 468 resp = self._cmd_passthrough(cmdline, self._cpu_index) 469 if resp is None: 470 print('Disconnected') 471 return False 472 assert 'return' in resp or 'error' in resp 473 if 'return' in resp: 474 # Success 475 if len(resp['return']) > 0: 476 print(resp['return'], end=' ') 477 else: 478 # Error 479 print('%s: %s' % (resp['error']['class'], resp['error']['desc'])) 480 return True 481 482 def show_banner(self, msg: str = 'Welcome to the HMP shell!') -> None: 483 QMPShell.show_banner(self, msg) 484 485 486def die(msg: str) -> NoReturn: 487 """Write an error to stderr, then exit with a return code of 1.""" 488 sys.stderr.write('ERROR: %s\n' % msg) 489 sys.exit(1) 490 491 492def main() -> None: 493 """ 494 qmp-shell entry point: parse command line arguments and start the REPL. 495 """ 496 parser = argparse.ArgumentParser() 497 parser.add_argument('-H', '--hmp', action='store_true', 498 help='Use HMP interface') 499 parser.add_argument('-N', '--skip-negotiation', action='store_true', 500 help='Skip negotiate (for qemu-ga)') 501 parser.add_argument('-v', '--verbose', action='store_true', 502 help='Verbose (echo commands sent and received)') 503 parser.add_argument('-p', '--pretty', action='store_true', 504 help='Pretty-print JSON') 505 506 default_server = os.environ.get('QMP_SOCKET') 507 parser.add_argument('qmp_server', action='store', 508 default=default_server, 509 help='< UNIX socket path | TCP address:port >') 510 511 args = parser.parse_args() 512 if args.qmp_server is None: 513 parser.error("QMP socket or TCP address must be specified") 514 515 shell_class = HMPShell if args.hmp else QMPShell 516 517 try: 518 address = shell_class.parse_address(args.qmp_server) 519 except qmp.QMPBadPortError: 520 parser.error(f"Bad port number: {args.qmp_server}") 521 return # pycharm doesn't know error() is noreturn 522 523 with shell_class(address, args.pretty, args.verbose) as qemu: 524 try: 525 qemu.connect(negotiate=not args.skip_negotiation) 526 except qmp.QMPConnectError: 527 die("Didn't get QMP greeting message") 528 except qmp.QMPCapabilitiesError: 529 die("Couldn't negotiate capabilities") 530 except OSError as err: 531 die(f"Couldn't connect to {args.qmp_server}: {err!s}") 532 533 for _ in qemu.repl(): 534 pass 535 536 537if __name__ == '__main__': 538 main() 539