#!/usr/bin/env python3
#
# Low-level QEMU shell on top of QMP.
#
# Copyright (C) 2009, 2010 Red Hat Inc.
#
# Authors:
#  Luiz Capitulino <lcapitulino@redhat.com>
#
# This work is licensed under the terms of the GNU GPL, version 2.  See
# the COPYING file in the top-level directory.
#
# Usage:
#
# Start QEMU with:
#
# # qemu [...] -qmp unix:./qmp-sock,server
#
# Run the shell:
#
# $ qmp-shell ./qmp-sock
#
# Commands have the following format:
#
#    < command-name > [ arg-name1=arg1 ] ... [ arg-nameN=argN ]
#
# For example:
#
# (QEMU) device_add driver=e1000 id=net1
# {"return": {}}
# (QEMU)
#
# key=value pairs also support Python or JSON object literal subset notations,
# without spaces. Dictionaries/objects {} are supported as are arrays [].
#
#    example-command arg-name1={'key':'value','obj':{'prop':"value"}}
#
# Both JSON and Python formatting should work, including both styles of
# string literal quotes. Both paradigms of literal values should work,
# including null/true/false for JSON and None/True/False for Python.
#
#
# Transactions have the following multi-line format:
#
#    transaction(
#    action-name1 [ arg-name1=arg1 ] ... [arg-nameN=argN ]
#    ...
#    action-nameN [ arg-name1=arg1 ] ... [arg-nameN=argN ]
#    )
#
# One line transactions are also supported:
#
#    transaction( action-name1 ... )
#
# For example:
#
#     (QEMU) transaction(
#     TRANS> block-dirty-bitmap-add node=drive0 name=bitmap1
#     TRANS> block-dirty-bitmap-clear node=drive0 name=bitmap0
#     TRANS> )
#     {"return": {}}
#     (QEMU)
#
# Use the -v and -p options to activate the verbose and pretty-print options,
# which will echo back the properly formatted JSON-compliant QMP that is being
# sent to QEMU, which is useful for debugging and documentation generation.
import argparse
import ast
import json
import os
import re
import readline
import sys
from typing import (
    Iterator,
    List,
    NoReturn,
    Optional,
    Sequence,
)


sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..', 'python'))
from qemu import qmp
from qemu.qmp import QMPMessage


class QMPCompleter:
    """
    Readline completion source backed by a plain list of candidate strings.
    """

    # NB: Python 3.9+ will probably let this class subclass list[str]
    # directly; pylint does not yet treat List[str] as plain 'list', so the
    # candidates are kept in a wrapped list instead.
    def __init__(self) -> None:
        self._matches: List[str] = []

    def append(self, value: str) -> None:
        """Register *value* as one more completion candidate."""
        self._matches.append(value)

    def complete(self, text: str, state: int) -> Optional[str]:
        """
        Return the candidate number *state* (counting from zero, in
        insertion order) among those starting with *text*, or None when
        there is no such candidate.
        """
        candidates = [name for name in self._matches
                      if name.startswith(text)]
        if 0 <= state < len(candidates):
            return candidates[state]
        return None


class QMPShellError(Exception):
    """Raised by the shell when a command line cannot be parsed."""


class FuzzyJSON(ast.NodeTransformer):
    """
    AST transformer that lets JSON-style literals be evaluated as Python.

    Bare ``true``, ``false`` and ``null`` names are swapped for constant
    nodes holding ``True``, ``False`` and ``None``; every other name node
    is returned untouched.
    """

    @classmethod
    def visit_Name(cls,  # pylint: disable=invalid-name
                   node: ast.Name) -> ast.AST:
        json_literals = {'true': True, 'false': False, 'null': None}
        if node.id in json_literals:
            return ast.Constant(value=json_literals[node.id])
        return node


# TODO: QMPShell's interface is a bit ugly (eg. _fill_completion() and
#       _execute_cmd()). Let's design a better one.
class QMPShell(qmp.QEMUMonitorProtocol):
    """
    Interactive low-level shell on top of a QMP connection.

    Command lines of the form ``command-name [key=value ...]`` are turned
    into QMP messages, sent, and the replies are printed as JSON.  A
    ``transaction(`` ... ``)`` sub-shell collects several actions and sends
    them as a single ``transaction`` command.

    :param address: Server address, handed to the parent class.
    :param pretty: Indent and key-sort the JSON that gets printed.
    :param verbose: Echo each outgoing QMP message before sending it.
    """

    def __init__(self, address: qmp.SocketAddrT,
                 pretty: bool = False, verbose: bool = False):
        super().__init__(address)
        self._greeting: Optional[QMPMessage] = None
        self._completer = QMPCompleter()
        self._transmode = False
        self._actions: List[QMPMessage] = []
        self._histfile = os.path.join(os.path.expanduser('~'),
                                      '.qmp-shell_history')
        # Becomes True once connect() has configured readline and tried to
        # load the history file; close() only saves history after that.
        self._history_ready = False
        self.pretty = pretty
        self.verbose = verbose

    def close(self) -> None:
        """Save the shell history (if it was loaded) and close the parent."""
        # Hook into context manager of parent to save shell history.
        # When connect() failed before the history file was read, readline's
        # in-memory history does not contain the saved entries; writing it
        # out would overwrite the existing history file, so skip the save.
        if self._history_ready:
            self._save_history()
        super().close()

    def _fill_completion(self) -> None:
        """
        Feed the completer with the names returned by 'query-commands'.

        Nothing is added when the reply contains an error.
        """
        cmds = self.cmd('query-commands')
        if 'error' in cmds:
            return
        for cmd in cmds['return']:
            self._completer.append(cmd['name'])

    def __completer_setup(self) -> None:
        """Build the completer, configure readline and load the history."""
        self._completer = QMPCompleter()
        self._fill_completion()
        readline.set_history_length(1024)
        readline.set_completer(self._completer.complete)
        readline.parse_and_bind("tab: complete")
        # NB: default delimiters conflict with some command names
        # (eg. query-), clearing everything as it doesn't seem to matter
        readline.set_completer_delims('')
        try:
            readline.read_history_file(self._histfile)
        except FileNotFoundError:
            # No history yet; it is created when the shell closes.
            pass
        except OSError as err:
            print(f"Failed to read history '{self._histfile}': {err!s}")
        self._history_ready = True

    def _save_history(self) -> None:
        """Write readline's history to the history file, reporting errors."""
        try:
            readline.write_history_file(self._histfile)
        except OSError as err:
            print(f"Failed to save history file '{self._histfile}': {err!s}")

    @classmethod
    def __parse_value(cls, val: str) -> object:
        """
        Convert the textual right-hand side of a ``key=value`` argument.

        Tried in order: integer; ``true``/``false`` in any letter case;
        for text starting with ``{`` or ``[``, strict JSON and then a Python
        literal in which ``true``/``false``/``null`` are also accepted.
        Anything else, including input that fails those parsers, is
        returned unchanged as a string.
        """
        try:
            return int(val)
        except ValueError:
            pass

        if val.lower() == 'true':
            return True
        if val.lower() == 'false':
            return False
        if val.startswith(('{', '[')):
            # Try first as pure JSON:
            try:
                return json.loads(val)
            except ValueError:
                pass
            # Try once again as FuzzyJSON:
            try:
                tree = ast.parse(val, mode='eval')
                transformed = FuzzyJSON().visit(tree)
                return ast.literal_eval(transformed)
            except (SyntaxError, ValueError):
                pass
        return val

    def __cli_expr(self,
                   tokens: Sequence[str],
                   parent: qmp.QMPObject) -> None:
        """
        Parse ``key=value`` tokens and store them into *parent* in place.

        A dotted key such as ``a.b.c`` addresses nested dictionaries, which
        are created on demand.

        :param tokens: The ``key=value`` arguments of one command line.
        :param parent: Dictionary that receives the parsed arguments.
        :raise QMPShellError: A token has no ``=``, a key is set twice, or
                              a key is used both as a value and as a
                              container of other keys.
        """
        for arg in tokens:
            (key, sep, val) = arg.partition('=')
            if sep != '=':
                raise QMPShellError(
                    f"Expected a key=value pair, got '{arg!s}'"
                )

            value = self.__parse_value(val)
            optpath = key.split('.')
            # Every argument is resolved from the root dictionary; descend
            # with a separate cursor so that a dotted key does not change
            # where the following arguments are stored.
            target = parent
            curpath = []
            for path in optpath[:-1]:
                curpath.append(path)
                obj = target.get(path, {})
                if not isinstance(obj, dict):
                    msg = 'Cannot use "{:s}" as both leaf and non-leaf key'
                    raise QMPShellError(msg.format('.'.join(curpath)))
                target[path] = obj
                target = obj
            leaf = optpath[-1]
            if leaf in target:
                if isinstance(target[leaf], dict):
                    # Report the full key: it is the one being reused as a
                    # leaf after already serving as a container.
                    msg = 'Cannot use "{:s}" as both leaf and non-leaf key'
                    raise QMPShellError(msg.format(key))
                raise QMPShellError(f'Cannot set "{key}" multiple times')
            target[leaf] = value

    def __build_cmd(self, cmdline: str) -> Optional[QMPMessage]:
        """
        Build a QMP input object from a user provided command-line in the
        following format:

            < command-name > [ arg-name1=arg1 ] ... [ arg-nameN=argN ]

        :return: The message to send, or None when there is nothing to send
                 (blank input, or a transaction action that was only
                 queued).
        :raise QMPShellError: The command line is malformed.
        """
        argument_regex = r'''(?:[^\s"']|"(?:\\.|[^"])*"|'(?:\\.|[^'])*')+'''
        cmdargs = re.findall(argument_regex, cmdline)
        qmpcmd: QMPMessage

        # Transactional CLI entry.  cmdargs is empty for whitespace-only
        # input, so it must be checked before being indexed.
        if cmdargs and cmdargs[0] == 'transaction(':
            self._transmode = True
            cmdargs.pop(0)

        # Transactional CLI exit.  Tested separately from the entry so that
        # a one-line "transaction( )" closes right away instead of queueing
        # ")" as if it were an action name.
        if cmdargs and cmdargs[0] == ')' and self._transmode:
            self._transmode = False
            if len(cmdargs) > 1:
                msg = 'Unexpected input after close of Transaction sub-shell'
                raise QMPShellError(msg)
            qmpcmd = {
                'execute': 'transaction',
                'arguments': {'actions': self._actions}
            }
            self._actions = []
            return qmpcmd

        # Nothing to process?
        if not cmdargs:
            return None

        # Parse and then cache this Transactional Action
        if self._transmode:
            finalize = False
            action = {'type': cmdargs[0], 'data': {}}
            if cmdargs[-1] == ')':
                cmdargs.pop(-1)
                finalize = True
            self.__cli_expr(cmdargs[1:], action['data'])
            self._actions.append(action)
            return self.__build_cmd(')') if finalize else None

        # Standard command: parse and return it to be executed.
        qmpcmd = {'execute': cmdargs[0], 'arguments': {}}
        self.__cli_expr(cmdargs[1:], qmpcmd['arguments'])
        return qmpcmd

    def _print(self, qmp_message: object) -> None:
        """Print *qmp_message* as JSON, indented and sorted if pretty."""
        jsobj = json.dumps(qmp_message,
                           indent=4 if self.pretty else None,
                           sort_keys=self.pretty)
        print(str(jsobj))

    def _execute_cmd(self, cmdline: str) -> bool:
        """
        Parse *cmdline*, send the resulting command and print the reply.

        :return: False when the reply is None (reported as 'Disconnected'),
                 True otherwise, including after a parse error.
        """
        try:
            qmpcmd = self.__build_cmd(cmdline)
        except Exception as err:
            # REPL boundary: report any parsing failure and keep running.
            print(f'Error while parsing command line: {err}')
            print('command format: <command-name> ', end=' ')
            print('[arg-name1=arg1] ... [arg-nameN=argN]')
            return True
        # For transaction mode, we may have just cached the action:
        if qmpcmd is None:
            return True
        if self.verbose:
            self._print(qmpcmd)
        resp = self.cmd_obj(qmpcmd)
        if resp is None:
            print('Disconnected')
            return False
        self._print(resp)
        return True

    def connect(self, negotiate: bool = True) -> None:
        """Connect via the parent class, keep the greeting, set up readline."""
        self._greeting = super().connect(negotiate)
        self.__completer_setup()

    def show_banner(self,
                    msg: str = 'Welcome to the QMP low-level shell!') -> None:
        """Print *msg* followed by the QEMU version taken from the greeting."""
        print(msg)
        if not self._greeting:
            print('Connected')
            return
        version = self._greeting['QMP']['version']['qemu']
        print("Connected to QEMU {major}.{minor}.{micro}\n".format(**version))

    @property
    def prompt(self) -> str:
        """Prompt text: 'TRANS> ' inside a transaction, else '(QEMU) '."""
        if self._transmode:
            return 'TRANS> '
        return '(QEMU) '

    def read_exec_command(self) -> bool:
        """
        Read and execute a command.

        An empty line prints the events returned by get_events() and then
        clears them.

        @return True if execution was ok, return False if disconnected.
        """
        try:
            cmdline = input(self.prompt)
        except EOFError:
            print()
            return False

        if cmdline == '':
            for event in self.get_events():
                print(event)
            self.clear_events()
            return True

        return self._execute_cmd(cmdline)

    def repl(self) -> Iterator[None]:
        """
        Show the banner, then yield once after every command that was read
        and executed; close the shell when read_exec_command() returns
        False.
        """
        self.show_banner()
        while self.read_exec_command():
            yield
        self.close()


class HMPShell(QMPShell):
    """
    Shell variant that forwards each input line as a human monitor command
    using the 'human-monitor-command' QMP command.
    """

    def __init__(self, address: qmp.SocketAddrT,
                 pretty: bool = False, verbose: bool = False):
        super().__init__(address, pretty, verbose)
        # CPU index sent with every command; changed by the 'cpu' command.
        self.__cpu_index = 0

    def __cmd_completion(self) -> None:
        """Add the command names listed by 'help', plus 'help <name>'."""
        for cmd in self.__cmd_passthrough('help')['return'].split('\r\n'):
            if cmd and cmd[0] != '[' and cmd[0] != '\t':
                words = cmd.split()
                if not words:
                    # Line made only of whitespace: no command name on it.
                    continue
                name = words[0]  # drop help text
                if name == 'info':
                    continue
                if name.find('|') != -1:
                    # Command in the form 'foobar|f' or 'f|foobar', take the
                    # full name
                    opt = name.split('|')
                    if len(opt[0]) == 1:
                        name = opt[1]
                    else:
                        name = opt[0]
                self._completer.append(name)
                self._completer.append('help ' + name)  # help completion

    def __info_completion(self) -> None:
        """Add 'info <sub-command>' entries from the output of 'info'."""
        for cmd in self.__cmd_passthrough('info')['return'].split('\r\n'):
            words = cmd.split()
            # The sub-command is the second word; skip lines without one.
            if len(words) > 1:
                self._completer.append('info ' + words[1])

    def __other_completion(self) -> None:
        # special cases
        self._completer.append('help info')

    def _fill_completion(self) -> None:
        """Fill the completer from the 'help' and 'info' command output."""
        self.__cmd_completion()
        self.__info_completion()
        self.__other_completion()

    def __cmd_passthrough(self, cmdline: str,
                          cpu_index: int = 0) -> QMPMessage:
        """Send *cmdline* through 'human-monitor-command'; return the reply."""
        return self.cmd_obj({
            'execute': 'human-monitor-command',
            'arguments': {
                'command-line': cmdline,
                'cpu-index': cpu_index
            }
        })

    def _execute_cmd(self, cmdline: str) -> bool:
        """
        Forward *cmdline* to the human monitor and print its output.

        The 'cpu' command is intercepted first: its integer argument is
        validated and remembered as the CPU index for later commands.

        :return: False when a reply is None (reported as 'Disconnected'),
                 True otherwise.
        """
        words = cmdline.split()
        if not words:
            # Whitespace-only input: nothing to forward.
            return True
        if words[0] == "cpu":
            # trap the cpu command, it requires special setting
            try:
                idx = int(words[1])
            except (IndexError, ValueError):
                # Missing or non-numeric argument.
                print('cpu command takes an integer argument')
                return True
            probe = self.__cmd_passthrough('info version', idx)
            if probe is None:
                print('Disconnected')
                return False
            if 'return' not in probe:
                print('bad CPU index')
                return True
            self.__cpu_index = idx
        resp = self.__cmd_passthrough(cmdline, self.__cpu_index)
        if resp is None:
            print('Disconnected')
            return False
        assert 'return' in resp or 'error' in resp
        if 'return' in resp:
            # Success
            if len(resp['return']) > 0:
                print(resp['return'], end=' ')
        else:
            # Error
            error = resp['error']
            print(f"{error['class']}: {error['desc']}")
        return True

    def show_banner(self, msg: str = 'Welcome to the HMP shell!') -> None:
        """Same banner as QMPShell, with an HMP-specific default message."""
        QMPShell.show_banner(self, msg)


def die(msg: str) -> NoReturn:
    """Write 'ERROR: <msg>' to stderr and exit with status 1."""
    sys.stderr.write(f'ERROR: {msg}\n')
    sys.exit(1)


def main() -> None:
    """
    Parse the command line, connect to the server and run the shell.

    The server address comes from the positional argument or, when that is
    omitted, from the QMP_SOCKET environment variable.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('-H', '--hmp', action='store_true',
                        help='Use HMP interface')
    parser.add_argument('-N', '--skip-negotiation', action='store_true',
                        help='Skip negotiate (for qemu-ga)')
    parser.add_argument('-v', '--verbose', action='store_true',
                        help='Verbose (echo commands sent and received)')
    parser.add_argument('-p', '--pretty', action='store_true',
                        help='Pretty-print JSON')

    default_server = os.environ.get('QMP_SOCKET')
    # nargs='?' makes the positional optional; without it argparse always
    # requires the argument and the QMP_SOCKET default is never used.
    parser.add_argument('qmp_server', action='store', nargs='?',
                        default=default_server,
                        help='< UNIX socket path | TCP address:port >')

    args = parser.parse_args()
    if args.qmp_server is None:
        parser.error("QMP socket or TCP address must be specified")

    shell_class = HMPShell if args.hmp else QMPShell

    try:
        address = shell_class.parse_address(args.qmp_server)
    except qmp.QMPBadPortError:
        parser.error(f"Bad port number: {args.qmp_server}")
        return  # pycharm doesn't know error() is noreturn

    with shell_class(address, args.pretty, args.verbose) as qemu:
        try:
            qemu.connect(negotiate=not args.skip_negotiation)
        except qmp.QMPConnectError:
            die("Didn't get QMP greeting message")
        except qmp.QMPCapabilitiesError:
            die("Couldn't negotiate capabilities")
        except OSError as err:
            die(f"Couldn't connect to {args.qmp_server}: {err!s}")

        for _ in qemu.repl():
            pass


if __name__ == '__main__':
    main()