1#!/usr/bin/env python3 2# 3# Low-level QEMU shell on top of QMP. 4# 5# Copyright (C) 2009, 2010 Red Hat Inc. 6# 7# Authors: 8# Luiz Capitulino <lcapitulino@redhat.com> 9# 10# This work is licensed under the terms of the GNU GPL, version 2. See 11# the COPYING file in the top-level directory. 12# 13# Usage: 14# 15# Start QEMU with: 16# 17# # qemu [...] -qmp unix:./qmp-sock,server 18# 19# Run the shell: 20# 21# $ qmp-shell ./qmp-sock 22# 23# Commands have the following format: 24# 25# < command-name > [ arg-name1=arg1 ] ... [ arg-nameN=argN ] 26# 27# For example: 28# 29# (QEMU) device_add driver=e1000 id=net1 30# {u'return': {}} 31# (QEMU) 32# 33# key=value pairs also support Python or JSON object literal subset notations, 34# without spaces. Dictionaries/objects {} are supported as are arrays []. 35# 36# example-command arg-name1={'key':'value','obj'={'prop':"value"}} 37# 38# Both JSON and Python formatting should work, including both styles of 39# string literal quotes. Both paradigms of literal values should work, 40# including null/true/false for JSON and None/True/False for Python. 41# 42# 43# Transactions have the following multi-line format: 44# 45# transaction( 46# action-name1 [ arg-name1=arg1 ] ... [arg-nameN=argN ] 47# ... 48# action-nameN [ arg-name1=arg1 ] ... [arg-nameN=argN ] 49# ) 50# 51# One line transactions are also supported: 52# 53# transaction( action-name1 ... ) 54# 55# For example: 56# 57# (QEMU) transaction( 58# TRANS> block-dirty-bitmap-add node=drive0 name=bitmap1 59# TRANS> block-dirty-bitmap-clear node=drive0 name=bitmap0 60# TRANS> ) 61# {"return": {}} 62# (QEMU) 63# 64# Use the -v and -p options to activate the verbose and pretty-print options, 65# which will echo back the properly formatted JSON-compliant QMP that is being 66# sent to QEMU, which is useful for debugging and documentation generation. 67import argparse 68import ast 69import json 70import logging 71import os 72import re 73import readline 74import sys 75from typing import ( 76 Iterator, 77 List, 78 NoReturn, 79 Optional, 80 Sequence, 81) 82 83 84sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..', 'python')) 85from qemu import qmp 86from qemu.qmp import QMPMessage 87 88 89LOG = logging.getLogger(__name__) 90 91 92class QMPCompleter: 93 # NB: Python 3.9+ will probably allow us to subclass list[str] directly, 94 # but pylint as of today does not know that List[str] is simply 'list'. 95 def __init__(self) -> None: 96 self._matches: List[str] = [] 97 98 def append(self, value: str) -> None: 99 return self._matches.append(value) 100 101 def complete(self, text: str, state: int) -> Optional[str]: 102 for cmd in self._matches: 103 if cmd.startswith(text): 104 if state == 0: 105 return cmd 106 state -= 1 107 return None 108 109 110class QMPShellError(Exception): 111 pass 112 113 114class FuzzyJSON(ast.NodeTransformer): 115 """ 116 This extension of ast.NodeTransformer filters literal "true/false/null" 117 values in a Python AST and replaces them by proper "True/False/None" values 118 that Python can properly evaluate. 119 """ 120 121 @classmethod 122 def visit_Name(cls, # pylint: disable=invalid-name 123 node: ast.Name) -> ast.AST: 124 if node.id == 'true': 125 return ast.Constant(value=True) 126 if node.id == 'false': 127 return ast.Constant(value=False) 128 if node.id == 'null': 129 return ast.Constant(value=None) 130 return node 131 132 133class QMPShell(qmp.QEMUMonitorProtocol): 134 def __init__(self, address: qmp.SocketAddrT, 135 pretty: bool = False, verbose: bool = False): 136 super().__init__(address) 137 self._greeting: Optional[QMPMessage] = None 138 self._completer = QMPCompleter() 139 self._transmode = False 140 self._actions: List[QMPMessage] = [] 141 self._histfile = os.path.join(os.path.expanduser('~'), 142 '.qmp-shell_history') 143 self.pretty = pretty 144 self.verbose = verbose 145 146 def close(self) -> None: 147 # Hook into context manager of parent to save shell history. 148 self._save_history() 149 super().close() 150 151 def _fill_completion(self) -> None: 152 cmds = self.cmd('query-commands') 153 if 'error' in cmds: 154 return 155 for cmd in cmds['return']: 156 self._completer.append(cmd['name']) 157 158 def __completer_setup(self) -> None: 159 self._completer = QMPCompleter() 160 self._fill_completion() 161 readline.set_history_length(1024) 162 readline.set_completer(self._completer.complete) 163 readline.parse_and_bind("tab: complete") 164 # NB: default delimiters conflict with some command names 165 # (eg. query-), clearing everything as it doesn't seem to matter 166 readline.set_completer_delims('') 167 try: 168 readline.read_history_file(self._histfile) 169 except FileNotFoundError: 170 pass 171 except IOError as err: 172 msg = f"Failed to read history '{self._histfile}': {err!s}" 173 LOG.warning(msg) 174 175 def _save_history(self) -> None: 176 try: 177 readline.write_history_file(self._histfile) 178 except IOError as err: 179 msg = f"Failed to save history file '{self._histfile}': {err!s}" 180 LOG.warning(msg) 181 182 @classmethod 183 def __parse_value(cls, val: str) -> object: 184 try: 185 return int(val) 186 except ValueError: 187 pass 188 189 if val.lower() == 'true': 190 return True 191 if val.lower() == 'false': 192 return False 193 if val.startswith(('{', '[')): 194 # Try first as pure JSON: 195 try: 196 return json.loads(val) 197 except ValueError: 198 pass 199 # Try once again as FuzzyJSON: 200 try: 201 tree = ast.parse(val, mode='eval') 202 transformed = FuzzyJSON().visit(tree) 203 return ast.literal_eval(transformed) 204 except (SyntaxError, ValueError): 205 pass 206 return val 207 208 def __cli_expr(self, 209 tokens: Sequence[str], 210 parent: qmp.QMPObject) -> None: 211 for arg in tokens: 212 (key, sep, val) = arg.partition('=') 213 if sep != '=': 214 raise QMPShellError( 215 f"Expected a key=value pair, got '{arg!s}'" 216 ) 217 218 value = self.__parse_value(val) 219 optpath = key.split('.') 220 curpath = [] 221 for path in optpath[:-1]: 222 curpath.append(path) 223 obj = parent.get(path, {}) 224 if not isinstance(obj, dict): 225 msg = 'Cannot use "{:s}" as both leaf and non-leaf key' 226 raise QMPShellError(msg.format('.'.join(curpath))) 227 parent[path] = obj 228 parent = obj 229 if optpath[-1] in parent: 230 if isinstance(parent[optpath[-1]], dict): 231 msg = 'Cannot use "{:s}" as both leaf and non-leaf key' 232 raise QMPShellError(msg.format('.'.join(curpath))) 233 raise QMPShellError(f'Cannot set "{key}" multiple times') 234 parent[optpath[-1]] = value 235 236 def __build_cmd(self, cmdline: str) -> Optional[QMPMessage]: 237 """ 238 Build a QMP input object from a user provided command-line in the 239 following format: 240 241 < command-name > [ arg-name1=arg1 ] ... [ arg-nameN=argN ] 242 """ 243 argument_regex = r'''(?:[^\s"']|"(?:\\.|[^"])*"|'(?:\\.|[^'])*')+''' 244 cmdargs = re.findall(argument_regex, cmdline) 245 qmpcmd: QMPMessage 246 247 # Transactional CLI entry: 248 if cmdargs and cmdargs[0] == 'transaction(': 249 self._transmode = True 250 self._actions = [] 251 cmdargs.pop(0) 252 253 # Transactional CLI exit: 254 if cmdargs and cmdargs[0] == ')' and self._transmode: 255 self._transmode = False 256 if len(cmdargs) > 1: 257 msg = 'Unexpected input after close of Transaction sub-shell' 258 raise QMPShellError(msg) 259 qmpcmd = { 260 'execute': 'transaction', 261 'arguments': {'actions': self._actions} 262 } 263 return qmpcmd 264 265 # No args, or no args remaining 266 if not cmdargs: 267 return None 268 269 if self._transmode: 270 # Parse and cache this Transactional Action 271 finalize = False 272 action = {'type': cmdargs[0], 'data': {}} 273 if cmdargs[-1] == ')': 274 cmdargs.pop(-1) 275 finalize = True 276 self.__cli_expr(cmdargs[1:], action['data']) 277 self._actions.append(action) 278 return self.__build_cmd(')') if finalize else None 279 280 # Standard command: parse and return it to be executed. 281 qmpcmd = {'execute': cmdargs[0], 'arguments': {}} 282 self.__cli_expr(cmdargs[1:], qmpcmd['arguments']) 283 return qmpcmd 284 285 def _print(self, qmp_message: object) -> None: 286 jsobj = json.dumps(qmp_message, 287 indent=4 if self.pretty else None, 288 sort_keys=self.pretty) 289 print(str(jsobj)) 290 291 def _execute_cmd(self, cmdline: str) -> bool: 292 try: 293 qmpcmd = self.__build_cmd(cmdline) 294 except Exception as err: 295 print('Error while parsing command line: %s' % err) 296 print('command format: <command-name> ', end=' ') 297 print('[arg-name1=arg1] ... [arg-nameN=argN]') 298 return True 299 # For transaction mode, we may have just cached the action: 300 if qmpcmd is None: 301 return True 302 if self.verbose: 303 self._print(qmpcmd) 304 resp = self.cmd_obj(qmpcmd) 305 if resp is None: 306 print('Disconnected') 307 return False 308 self._print(resp) 309 return True 310 311 def connect(self, negotiate: bool = True) -> None: 312 self._greeting = super().connect(negotiate) 313 self.__completer_setup() 314 315 def show_banner(self, 316 msg: str = 'Welcome to the QMP low-level shell!') -> None: 317 print(msg) 318 if not self._greeting: 319 print('Connected') 320 return 321 version = self._greeting['QMP']['version']['qemu'] 322 print("Connected to QEMU {major}.{minor}.{micro}\n".format(**version)) 323 324 @property 325 def prompt(self) -> str: 326 if self._transmode: 327 return 'TRANS> ' 328 return '(QEMU) ' 329 330 def read_exec_command(self) -> bool: 331 """ 332 Read and execute a command. 333 334 @return True if execution was ok, return False if disconnected. 335 """ 336 try: 337 cmdline = input(self.prompt) 338 except EOFError: 339 print() 340 return False 341 342 if cmdline == '': 343 for event in self.get_events(): 344 print(event) 345 self.clear_events() 346 return True 347 348 return self._execute_cmd(cmdline) 349 350 def repl(self) -> Iterator[None]: 351 self.show_banner() 352 while self.read_exec_command(): 353 yield 354 self.close() 355 356 357class HMPShell(QMPShell): 358 def __init__(self, address: qmp.SocketAddrT, 359 pretty: bool = False, verbose: bool = False): 360 super().__init__(address, pretty, verbose) 361 self.__cpu_index = 0 362 363 def __cmd_completion(self) -> None: 364 for cmd in self.__cmd_passthrough('help')['return'].split('\r\n'): 365 if cmd and cmd[0] != '[' and cmd[0] != '\t': 366 name = cmd.split()[0] # drop help text 367 if name == 'info': 368 continue 369 if name.find('|') != -1: 370 # Command in the form 'foobar|f' or 'f|foobar', take the 371 # full name 372 opt = name.split('|') 373 if len(opt[0]) == 1: 374 name = opt[1] 375 else: 376 name = opt[0] 377 self._completer.append(name) 378 self._completer.append('help ' + name) # help completion 379 380 def __info_completion(self) -> None: 381 for cmd in self.__cmd_passthrough('info')['return'].split('\r\n'): 382 if cmd: 383 self._completer.append('info ' + cmd.split()[1]) 384 385 def __other_completion(self) -> None: 386 # special cases 387 self._completer.append('help info') 388 389 def _fill_completion(self) -> None: 390 self.__cmd_completion() 391 self.__info_completion() 392 self.__other_completion() 393 394 def __cmd_passthrough(self, cmdline: str, 395 cpu_index: int = 0) -> QMPMessage: 396 return self.cmd_obj({ 397 'execute': 'human-monitor-command', 398 'arguments': { 399 'command-line': cmdline, 400 'cpu-index': cpu_index 401 } 402 }) 403 404 def _execute_cmd(self, cmdline: str) -> bool: 405 if cmdline.split()[0] == "cpu": 406 # trap the cpu command, it requires special setting 407 try: 408 idx = int(cmdline.split()[1]) 409 if 'return' not in self.__cmd_passthrough('info version', idx): 410 print('bad CPU index') 411 return True 412 self.__cpu_index = idx 413 except ValueError: 414 print('cpu command takes an integer argument') 415 return True 416 resp = self.__cmd_passthrough(cmdline, self.__cpu_index) 417 if resp is None: 418 print('Disconnected') 419 return False 420 assert 'return' in resp or 'error' in resp 421 if 'return' in resp: 422 # Success 423 if len(resp['return']) > 0: 424 print(resp['return'], end=' ') 425 else: 426 # Error 427 print('%s: %s' % (resp['error']['class'], resp['error']['desc'])) 428 return True 429 430 def show_banner(self, msg: str = 'Welcome to the HMP shell!') -> None: 431 QMPShell.show_banner(self, msg) 432 433 434def die(msg: str) -> NoReturn: 435 sys.stderr.write('ERROR: %s\n' % msg) 436 sys.exit(1) 437 438 439def main() -> None: 440 parser = argparse.ArgumentParser() 441 parser.add_argument('-H', '--hmp', action='store_true', 442 help='Use HMP interface') 443 parser.add_argument('-N', '--skip-negotiation', action='store_true', 444 help='Skip negotiate (for qemu-ga)') 445 parser.add_argument('-v', '--verbose', action='store_true', 446 help='Verbose (echo commands sent and received)') 447 parser.add_argument('-p', '--pretty', action='store_true', 448 help='Pretty-print JSON') 449 450 default_server = os.environ.get('QMP_SOCKET') 451 parser.add_argument('qmp_server', action='store', 452 default=default_server, 453 help='< UNIX socket path | TCP address:port >') 454 455 args = parser.parse_args() 456 if args.qmp_server is None: 457 parser.error("QMP socket or TCP address must be specified") 458 459 shell_class = HMPShell if args.hmp else QMPShell 460 461 try: 462 address = shell_class.parse_address(args.qmp_server) 463 except qmp.QMPBadPortError: 464 parser.error(f"Bad port number: {args.qmp_server}") 465 return # pycharm doesn't know error() is noreturn 466 467 with shell_class(address, args.pretty, args.verbose) as qemu: 468 try: 469 qemu.connect(negotiate=not args.skip_negotiation) 470 except qmp.QMPConnectError: 471 die("Didn't get QMP greeting message") 472 except qmp.QMPCapabilitiesError: 473 die("Couldn't negotiate capabilities") 474 except OSError as err: 475 die(f"Couldn't connect to {args.qmp_server}: {err!s}") 476 477 for _ in qemu.repl(): 478 pass 479 480 481if __name__ == '__main__': 482 main() 483