1#!/usr/bin/env python3 2# 3# Copyright (C) 2009, 2010 Red Hat Inc. 4# 5# Authors: 6# Luiz Capitulino <lcapitulino@redhat.com> 7# 8# This work is licensed under the terms of the GNU GPL, version 2. See 9# the COPYING file in the top-level directory. 10# 11 12""" 13Low-level QEMU shell on top of QMP. 14 15usage: qmp-shell [-h] [-H] [-N] [-v] [-p] qmp_server 16 17positional arguments: 18 qmp_server < UNIX socket path | TCP address:port > 19 20optional arguments: 21 -h, --help show this help message and exit 22 -H, --hmp Use HMP interface 23 -N, --skip-negotiation 24 Skip negotiate (for qemu-ga) 25 -v, --verbose Verbose (echo commands sent and received) 26 -p, --pretty Pretty-print JSON 27 28 29Start QEMU with: 30 31# qemu [...] -qmp unix:./qmp-sock,server 32 33Run the shell: 34 35$ qmp-shell ./qmp-sock 36 37Commands have the following format: 38 39 < command-name > [ arg-name1=arg1 ] ... [ arg-nameN=argN ] 40 41For example: 42 43(QEMU) device_add driver=e1000 id=net1 44{'return': {}} 45(QEMU) 46 47key=value pairs also support Python or JSON object literal subset notations, 48without spaces. Dictionaries/objects {} are supported as are arrays []. 49 50 example-command arg-name1={'key':'value','obj'={'prop':"value"}} 51 52Both JSON and Python formatting should work, including both styles of 53string literal quotes. Both paradigms of literal values should work, 54including null/true/false for JSON and None/True/False for Python. 55 56 57Transactions have the following multi-line format: 58 59 transaction( 60 action-name1 [ arg-name1=arg1 ] ... [arg-nameN=argN ] 61 ... 62 action-nameN [ arg-name1=arg1 ] ... [arg-nameN=argN ] 63 ) 64 65One line transactions are also supported: 66 67 transaction( action-name1 ... ) 68 69For example: 70 71 (QEMU) transaction( 72 TRANS> block-dirty-bitmap-add node=drive0 name=bitmap1 73 TRANS> block-dirty-bitmap-clear node=drive0 name=bitmap0 74 TRANS> ) 75 {"return": {}} 76 (QEMU) 77 78Use the -v and -p options to activate the verbose and pretty-print options, 79which will echo back the properly formatted JSON-compliant QMP that is being 80sent to QEMU, which is useful for debugging and documentation generation. 81""" 82 83import argparse 84import ast 85import json 86import logging 87import os 88import re 89import readline 90import sys 91from typing import ( 92 Iterator, 93 List, 94 NoReturn, 95 Optional, 96 Sequence, 97) 98 99 100sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..', 'python')) 101from qemu import qmp 102from qemu.qmp import QMPMessage 103 104 105LOG = logging.getLogger(__name__) 106 107 108class QMPCompleter: 109 # NB: Python 3.9+ will probably allow us to subclass list[str] directly, 110 # but pylint as of today does not know that List[str] is simply 'list'. 111 def __init__(self) -> None: 112 self._matches: List[str] = [] 113 114 def append(self, value: str) -> None: 115 return self._matches.append(value) 116 117 def complete(self, text: str, state: int) -> Optional[str]: 118 for cmd in self._matches: 119 if cmd.startswith(text): 120 if state == 0: 121 return cmd 122 state -= 1 123 return None 124 125 126class QMPShellError(Exception): 127 pass 128 129 130class FuzzyJSON(ast.NodeTransformer): 131 """ 132 This extension of ast.NodeTransformer filters literal "true/false/null" 133 values in a Python AST and replaces them by proper "True/False/None" values 134 that Python can properly evaluate. 135 """ 136 137 @classmethod 138 def visit_Name(cls, # pylint: disable=invalid-name 139 node: ast.Name) -> ast.AST: 140 if node.id == 'true': 141 return ast.Constant(value=True) 142 if node.id == 'false': 143 return ast.Constant(value=False) 144 if node.id == 'null': 145 return ast.Constant(value=None) 146 return node 147 148 149class QMPShell(qmp.QEMUMonitorProtocol): 150 def __init__(self, address: qmp.SocketAddrT, 151 pretty: bool = False, verbose: bool = False): 152 super().__init__(address) 153 self._greeting: Optional[QMPMessage] = None 154 self._completer = QMPCompleter() 155 self._transmode = False 156 self._actions: List[QMPMessage] = [] 157 self._histfile = os.path.join(os.path.expanduser('~'), 158 '.qmp-shell_history') 159 self.pretty = pretty 160 self.verbose = verbose 161 162 def close(self) -> None: 163 # Hook into context manager of parent to save shell history. 164 self._save_history() 165 super().close() 166 167 def _fill_completion(self) -> None: 168 cmds = self.cmd('query-commands') 169 if 'error' in cmds: 170 return 171 for cmd in cmds['return']: 172 self._completer.append(cmd['name']) 173 174 def __completer_setup(self) -> None: 175 self._completer = QMPCompleter() 176 self._fill_completion() 177 readline.set_history_length(1024) 178 readline.set_completer(self._completer.complete) 179 readline.parse_and_bind("tab: complete") 180 # NB: default delimiters conflict with some command names 181 # (eg. query-), clearing everything as it doesn't seem to matter 182 readline.set_completer_delims('') 183 try: 184 readline.read_history_file(self._histfile) 185 except FileNotFoundError: 186 pass 187 except IOError as err: 188 msg = f"Failed to read history '{self._histfile}': {err!s}" 189 LOG.warning(msg) 190 191 def _save_history(self) -> None: 192 try: 193 readline.write_history_file(self._histfile) 194 except IOError as err: 195 msg = f"Failed to save history file '{self._histfile}': {err!s}" 196 LOG.warning(msg) 197 198 @classmethod 199 def __parse_value(cls, val: str) -> object: 200 try: 201 return int(val) 202 except ValueError: 203 pass 204 205 if val.lower() == 'true': 206 return True 207 if val.lower() == 'false': 208 return False 209 if val.startswith(('{', '[')): 210 # Try first as pure JSON: 211 try: 212 return json.loads(val) 213 except ValueError: 214 pass 215 # Try once again as FuzzyJSON: 216 try: 217 tree = ast.parse(val, mode='eval') 218 transformed = FuzzyJSON().visit(tree) 219 return ast.literal_eval(transformed) 220 except (SyntaxError, ValueError): 221 pass 222 return val 223 224 def __cli_expr(self, 225 tokens: Sequence[str], 226 parent: qmp.QMPObject) -> None: 227 for arg in tokens: 228 (key, sep, val) = arg.partition('=') 229 if sep != '=': 230 raise QMPShellError( 231 f"Expected a key=value pair, got '{arg!s}'" 232 ) 233 234 value = self.__parse_value(val) 235 optpath = key.split('.') 236 curpath = [] 237 for path in optpath[:-1]: 238 curpath.append(path) 239 obj = parent.get(path, {}) 240 if not isinstance(obj, dict): 241 msg = 'Cannot use "{:s}" as both leaf and non-leaf key' 242 raise QMPShellError(msg.format('.'.join(curpath))) 243 parent[path] = obj 244 parent = obj 245 if optpath[-1] in parent: 246 if isinstance(parent[optpath[-1]], dict): 247 msg = 'Cannot use "{:s}" as both leaf and non-leaf key' 248 raise QMPShellError(msg.format('.'.join(curpath))) 249 raise QMPShellError(f'Cannot set "{key}" multiple times') 250 parent[optpath[-1]] = value 251 252 def __build_cmd(self, cmdline: str) -> Optional[QMPMessage]: 253 """ 254 Build a QMP input object from a user provided command-line in the 255 following format: 256 257 < command-name > [ arg-name1=arg1 ] ... [ arg-nameN=argN ] 258 """ 259 argument_regex = r'''(?:[^\s"']|"(?:\\.|[^"])*"|'(?:\\.|[^'])*')+''' 260 cmdargs = re.findall(argument_regex, cmdline) 261 qmpcmd: QMPMessage 262 263 # Transactional CLI entry: 264 if cmdargs and cmdargs[0] == 'transaction(': 265 self._transmode = True 266 self._actions = [] 267 cmdargs.pop(0) 268 269 # Transactional CLI exit: 270 if cmdargs and cmdargs[0] == ')' and self._transmode: 271 self._transmode = False 272 if len(cmdargs) > 1: 273 msg = 'Unexpected input after close of Transaction sub-shell' 274 raise QMPShellError(msg) 275 qmpcmd = { 276 'execute': 'transaction', 277 'arguments': {'actions': self._actions} 278 } 279 return qmpcmd 280 281 # No args, or no args remaining 282 if not cmdargs: 283 return None 284 285 if self._transmode: 286 # Parse and cache this Transactional Action 287 finalize = False 288 action = {'type': cmdargs[0], 'data': {}} 289 if cmdargs[-1] == ')': 290 cmdargs.pop(-1) 291 finalize = True 292 self.__cli_expr(cmdargs[1:], action['data']) 293 self._actions.append(action) 294 return self.__build_cmd(')') if finalize else None 295 296 # Standard command: parse and return it to be executed. 297 qmpcmd = {'execute': cmdargs[0], 'arguments': {}} 298 self.__cli_expr(cmdargs[1:], qmpcmd['arguments']) 299 return qmpcmd 300 301 def _print(self, qmp_message: object) -> None: 302 jsobj = json.dumps(qmp_message, 303 indent=4 if self.pretty else None, 304 sort_keys=self.pretty) 305 print(str(jsobj)) 306 307 def _execute_cmd(self, cmdline: str) -> bool: 308 try: 309 qmpcmd = self.__build_cmd(cmdline) 310 except QMPShellError as err: 311 print( 312 f"Error while parsing command line: {err!s}\n" 313 "command format: <command-name> " 314 "[arg-name1=arg1] ... [arg-nameN=argN", 315 file=sys.stderr 316 ) 317 return True 318 # For transaction mode, we may have just cached the action: 319 if qmpcmd is None: 320 return True 321 if self.verbose: 322 self._print(qmpcmd) 323 resp = self.cmd_obj(qmpcmd) 324 if resp is None: 325 print('Disconnected') 326 return False 327 self._print(resp) 328 return True 329 330 def connect(self, negotiate: bool = True) -> None: 331 self._greeting = super().connect(negotiate) 332 self.__completer_setup() 333 334 def show_banner(self, 335 msg: str = 'Welcome to the QMP low-level shell!') -> None: 336 print(msg) 337 if not self._greeting: 338 print('Connected') 339 return 340 version = self._greeting['QMP']['version']['qemu'] 341 print("Connected to QEMU {major}.{minor}.{micro}\n".format(**version)) 342 343 @property 344 def prompt(self) -> str: 345 if self._transmode: 346 return 'TRANS> ' 347 return '(QEMU) ' 348 349 def read_exec_command(self) -> bool: 350 """ 351 Read and execute a command. 352 353 @return True if execution was ok, return False if disconnected. 354 """ 355 try: 356 cmdline = input(self.prompt) 357 except EOFError: 358 print() 359 return False 360 361 if cmdline == '': 362 for event in self.get_events(): 363 print(event) 364 self.clear_events() 365 return True 366 367 return self._execute_cmd(cmdline) 368 369 def repl(self) -> Iterator[None]: 370 self.show_banner() 371 while self.read_exec_command(): 372 yield 373 self.close() 374 375 376class HMPShell(QMPShell): 377 def __init__(self, address: qmp.SocketAddrT, 378 pretty: bool = False, verbose: bool = False): 379 super().__init__(address, pretty, verbose) 380 self.__cpu_index = 0 381 382 def __cmd_completion(self) -> None: 383 for cmd in self.__cmd_passthrough('help')['return'].split('\r\n'): 384 if cmd and cmd[0] != '[' and cmd[0] != '\t': 385 name = cmd.split()[0] # drop help text 386 if name == 'info': 387 continue 388 if name.find('|') != -1: 389 # Command in the form 'foobar|f' or 'f|foobar', take the 390 # full name 391 opt = name.split('|') 392 if len(opt[0]) == 1: 393 name = opt[1] 394 else: 395 name = opt[0] 396 self._completer.append(name) 397 self._completer.append('help ' + name) # help completion 398 399 def __info_completion(self) -> None: 400 for cmd in self.__cmd_passthrough('info')['return'].split('\r\n'): 401 if cmd: 402 self._completer.append('info ' + cmd.split()[1]) 403 404 def __other_completion(self) -> None: 405 # special cases 406 self._completer.append('help info') 407 408 def _fill_completion(self) -> None: 409 self.__cmd_completion() 410 self.__info_completion() 411 self.__other_completion() 412 413 def __cmd_passthrough(self, cmdline: str, 414 cpu_index: int = 0) -> QMPMessage: 415 return self.cmd_obj({ 416 'execute': 'human-monitor-command', 417 'arguments': { 418 'command-line': cmdline, 419 'cpu-index': cpu_index 420 } 421 }) 422 423 def _execute_cmd(self, cmdline: str) -> bool: 424 if cmdline.split()[0] == "cpu": 425 # trap the cpu command, it requires special setting 426 try: 427 idx = int(cmdline.split()[1]) 428 if 'return' not in self.__cmd_passthrough('info version', idx): 429 print('bad CPU index') 430 return True 431 self.__cpu_index = idx 432 except ValueError: 433 print('cpu command takes an integer argument') 434 return True 435 resp = self.__cmd_passthrough(cmdline, self.__cpu_index) 436 if resp is None: 437 print('Disconnected') 438 return False 439 assert 'return' in resp or 'error' in resp 440 if 'return' in resp: 441 # Success 442 if len(resp['return']) > 0: 443 print(resp['return'], end=' ') 444 else: 445 # Error 446 print('%s: %s' % (resp['error']['class'], resp['error']['desc'])) 447 return True 448 449 def show_banner(self, msg: str = 'Welcome to the HMP shell!') -> None: 450 QMPShell.show_banner(self, msg) 451 452 453def die(msg: str) -> NoReturn: 454 sys.stderr.write('ERROR: %s\n' % msg) 455 sys.exit(1) 456 457 458def main() -> None: 459 parser = argparse.ArgumentParser() 460 parser.add_argument('-H', '--hmp', action='store_true', 461 help='Use HMP interface') 462 parser.add_argument('-N', '--skip-negotiation', action='store_true', 463 help='Skip negotiate (for qemu-ga)') 464 parser.add_argument('-v', '--verbose', action='store_true', 465 help='Verbose (echo commands sent and received)') 466 parser.add_argument('-p', '--pretty', action='store_true', 467 help='Pretty-print JSON') 468 469 default_server = os.environ.get('QMP_SOCKET') 470 parser.add_argument('qmp_server', action='store', 471 default=default_server, 472 help='< UNIX socket path | TCP address:port >') 473 474 args = parser.parse_args() 475 if args.qmp_server is None: 476 parser.error("QMP socket or TCP address must be specified") 477 478 shell_class = HMPShell if args.hmp else QMPShell 479 480 try: 481 address = shell_class.parse_address(args.qmp_server) 482 except qmp.QMPBadPortError: 483 parser.error(f"Bad port number: {args.qmp_server}") 484 return # pycharm doesn't know error() is noreturn 485 486 with shell_class(address, args.pretty, args.verbose) as qemu: 487 try: 488 qemu.connect(negotiate=not args.skip_negotiation) 489 except qmp.QMPConnectError: 490 die("Didn't get QMP greeting message") 491 except qmp.QMPCapabilitiesError: 492 die("Couldn't negotiate capabilities") 493 except OSError as err: 494 die(f"Couldn't connect to {args.qmp_server}: {err!s}") 495 496 for _ in qemu.repl(): 497 pass 498 499 500if __name__ == '__main__': 501 main() 502