#!/usr/bin/env python3
#
# Low-level QEMU shell on top of QMP.
#
# Copyright (C) 2009, 2010 Red Hat Inc.
#
# Authors:
#  Luiz Capitulino <lcapitulino@redhat.com>
#
# This work is licensed under the terms of the GNU GPL, version 2.  See
# the COPYING file in the top-level directory.
#
# Usage:
#
# Start QEMU with:
#
# # qemu [...] -qmp unix:./qmp-sock,server
#
# Run the shell:
#
# $ qmp-shell ./qmp-sock
#
# Commands have the following format:
#
#    < command-name > [ arg-name1=arg1 ] ... [ arg-nameN=argN ]
#
# For example:
#
# (QEMU) device_add driver=e1000 id=net1
# {"return": {}}
# (QEMU)
#
# key=value pairs also support Python or JSON object literal subset notations,
# without spaces. Dictionaries/objects {} are supported as are arrays [].
#
#    example-command arg-name1={'key':'value','obj':{'prop':"value"}}
#
# Both JSON and Python formatting should work, including both styles of
# string literal quotes. Both paradigms of literal values should work,
# including null/true/false for JSON and None/True/False for Python.
#
#
# Transactions have the following multi-line format:
#
#    transaction(
#    action-name1 [ arg-name1=arg1 ] ... [arg-nameN=argN ]
#    ...
#    action-nameN [ arg-name1=arg1 ] ... [arg-nameN=argN ]
#    )
#
# One line transactions are also supported:
#
#    transaction( action-name1 ... )
#
# For example:
#
#     (QEMU) transaction(
#     TRANS> block-dirty-bitmap-add node=drive0 name=bitmap1
#     TRANS> block-dirty-bitmap-clear node=drive0 name=bitmap0
#     TRANS> )
#     {"return": {}}
#     (QEMU)
#
# Use the -v and -p options to activate the verbose and pretty-print options,
# which will echo back the properly formatted JSON-compliant QMP that is being
# sent to QEMU, which is useful for debugging and documentation generation.
import argparse
import ast
import json
import logging
import os
import re
import readline
import sys
from typing import (
    Iterator,
    List,
    NoReturn,
    Optional,
    Sequence,
)


sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..', 'python'))
from qemu import qmp
from qemu.qmp import QMPMessage


LOG = logging.getLogger(__name__)


class QMPCompleter:
    """Prefix-matching readline completer over a list of known strings."""

    # NB: Python 3.9+ will probably allow us to subclass list[str] directly,
    # but pylint as of today does not know that List[str] is simply 'list'.
    def __init__(self) -> None:
        self._matches: List[str] = []

    def append(self, value: str) -> None:
        """Register ``value`` as a completion candidate."""
        self._matches.append(value)

    def complete(self, text: str, state: int) -> Optional[str]:
        """
        Return the ``state``-th candidate that starts with ``text``.

        This follows the readline completer protocol: it is called with
        state 0, 1, 2, ... and returns None once candidates are exhausted.
        """
        for cmd in self._matches:
            if cmd.startswith(text):
                if state == 0:
                    return cmd
                state -= 1
        return None


class QMPShellError(Exception):
    """Raised when a command line cannot be turned into a QMP message."""


class FuzzyJSON(ast.NodeTransformer):
    """
    This extension of ast.NodeTransformer filters literal "true/false/null"
    values in a Python AST and replaces them by proper "True/False/None" values
    that Python can properly evaluate.
    """

    @classmethod
    def visit_Name(cls,  # pylint: disable=invalid-name
                   node: ast.Name) -> ast.AST:
        """Replace the bare names true/false/null by Python constants."""
        if node.id == 'true':
            return ast.Constant(value=True)
        if node.id == 'false':
            return ast.Constant(value=False)
        if node.id == 'null':
            return ast.Constant(value=None)
        return node


class QMPShell(qmp.QEMUMonitorProtocol):
    """
    Interactive shell that turns ``command key=value ...`` lines into QMP
    messages, sends them, and prints the responses as JSON.

    :param address: Address handed to the parent class constructor.
    :param pretty: Indent and sort keys when printing JSON.
    :param verbose: Echo the outgoing QMP message before sending it.
    """

    def __init__(self, address: qmp.SocketAddrT,
                 pretty: bool = False, verbose: bool = False):
        super().__init__(address)
        self._greeting: Optional[QMPMessage] = None
        self._completer = QMPCompleter()
        # True while inside a multi-line "transaction(" ... ")" sub-shell.
        self._transmode = False
        # Actions collected so far for the pending transaction.
        self._actions: List[QMPMessage] = []
        self._histfile = os.path.join(os.path.expanduser('~'),
                                      '.qmp-shell_history')
        self.pretty = pretty
        self.verbose = verbose

    def close(self) -> None:
        """Save the readline history, then close via the parent class."""
        # Hook into context manager of parent to save shell history.
        self._save_history()
        super().close()

    def _fill_completion(self) -> None:
        """Populate the completer with the names from 'query-commands'."""
        cmds = self.cmd('query-commands')
        if 'error' in cmds:
            return
        for cmd in cmds['return']:
            self._completer.append(cmd['name'])

    def __completer_setup(self) -> None:
        """Install a fresh completer into readline and load the history."""
        self._completer = QMPCompleter()
        self._fill_completion()
        readline.set_history_length(1024)
        readline.set_completer(self._completer.complete)
        readline.parse_and_bind("tab: complete")
        # NB: default delimiters conflict with some command names
        # (eg. query-), clearing everything as it doesn't seem to matter
        readline.set_completer_delims('')
        try:
            readline.read_history_file(self._histfile)
        except FileNotFoundError:
            # No history yet (e.g. first run); nothing to load.
            pass
        except IOError as err:
            msg = f"Failed to read history '{self._histfile}': {err!s}"
            LOG.warning(msg)

    def _save_history(self) -> None:
        """Write the readline history file; failures are only logged."""
        try:
            readline.write_history_file(self._histfile)
        except IOError as err:
            msg = f"Failed to save history file '{self._histfile}': {err!s}"
            LOG.warning(msg)

    @classmethod
    def __parse_value(cls, val: str) -> object:
        """
        Convert the textual right-hand side of ``key=value`` to a value.

        Tried in order: integer, true/false (case-insensitive), and for
        text starting with '{' or '[' strict JSON followed by a Python
        literal with true/false/null accepted. Anything else is returned
        unchanged as a string.
        """
        try:
            return int(val)
        except ValueError:
            pass

        if val.lower() == 'true':
            return True
        if val.lower() == 'false':
            return False
        if val.startswith(('{', '[')):
            # Try first as pure JSON:
            try:
                return json.loads(val)
            except ValueError:
                pass
            # Try once again as FuzzyJSON:
            try:
                tree = ast.parse(val, mode='eval')
                transformed = FuzzyJSON().visit(tree)
                return ast.literal_eval(transformed)
            except (SyntaxError, ValueError):
                pass
        return val

    def __cli_expr(self,
                   tokens: Sequence[str],
                   parent: qmp.QMPObject) -> None:
        """
        Parse ``key=value`` tokens and store them into ``parent`` in place.

        A dotted key such as ``a.b.c=1`` creates nested dictionaries.

        :param tokens: The ``key=value`` words of a command line.
        :param parent: Dictionary receiving the parsed arguments.
        :raise QMPShellError: On a token without '=', on a key set twice,
            or on a key used both as a leaf and as a nested object.
        """
        for arg in tokens:
            (key, sep, val) = arg.partition('=')
            if sep != '=':
                raise QMPShellError(
                    f"Expected a key=value pair, got '{arg!s}'"
                )

            value = self.__parse_value(val)
            optpath = key.split('.')
            curpath = []
            # Every token is resolved from the root. Descending through a
            # separate cursor keeps a dotted key from redirecting the
            # tokens that follow it into its nested dictionary.
            node = parent
            for path in optpath[:-1]:
                curpath.append(path)
                obj = node.get(path, {})
                if not isinstance(obj, dict):
                    msg = 'Cannot use "{:s}" as both leaf and non-leaf key'
                    raise QMPShellError(msg.format('.'.join(curpath)))
                node[path] = obj
                node = obj
            leaf = optpath[-1]
            if leaf in node:
                if isinstance(node[leaf], dict):
                    # Report the full dotted key, which is the one that is
                    # being used as both a leaf and a container.
                    msg = 'Cannot use "{:s}" as both leaf and non-leaf key'
                    raise QMPShellError(msg.format(key))
                raise QMPShellError(f'Cannot set "{key}" multiple times')
            node[leaf] = value

    def __build_cmd(self, cmdline: str) -> Optional[QMPMessage]:
        """
        Build a QMP input object from a user provided command-line in the
        following format:

            < command-name > [ arg-name1=arg1 ] ... [ arg-nameN=argN ]

        :return: The message to send, or None when there is nothing to
            send yet (blank input, or an action cached for a transaction).
        :raise QMPShellError: If the command line is malformed.
        """
        argument_regex = r'''(?:[^\s"']|"(?:\\.|[^"])*"|'(?:\\.|[^'])*')+'''
        cmdargs = re.findall(argument_regex, cmdline)
        qmpcmd: QMPMessage

        # Transactional CLI entry/exit. Whitespace-only input produces no
        # words at all, so check for that before looking at the first one.
        if cmdargs and cmdargs[0] == 'transaction(':
            self._transmode = True
            cmdargs.pop(0)
        elif cmdargs and cmdargs[0] == ')' and self._transmode:
            self._transmode = False
            if len(cmdargs) > 1:
                msg = 'Unexpected input after close of Transaction sub-shell'
                raise QMPShellError(msg)
            qmpcmd = {
                'execute': 'transaction',
                'arguments': {'actions': self._actions}
            }
            self._actions = []
            return qmpcmd

        # Nothing to process?
        if not cmdargs:
            return None

        # Parse and then cache this Transactional Action
        if self._transmode:
            finalize = False
            if cmdargs[-1] == ')':
                cmdargs.pop(-1)
                finalize = True
            # "transaction( )" leaves no action words; do not record the
            # closing parenthesis itself as an action.
            if cmdargs:
                action = {'type': cmdargs[0], 'data': {}}
                self.__cli_expr(cmdargs[1:], action['data'])
                self._actions.append(action)
            return self.__build_cmd(')') if finalize else None

        # Standard command: parse and return it to be executed.
        qmpcmd = {'execute': cmdargs[0], 'arguments': {}}
        self.__cli_expr(cmdargs[1:], qmpcmd['arguments'])
        return qmpcmd

    def _print(self, qmp_message: object) -> None:
        """Print ``qmp_message`` as JSON, indented if ``pretty`` is set."""
        jsobj = json.dumps(qmp_message,
                           indent=4 if self.pretty else None,
                           sort_keys=self.pretty)
        print(str(jsobj))

    def _execute_cmd(self, cmdline: str) -> bool:
        """
        Parse ``cmdline``, send the resulting message and print the reply.

        :return: False if the peer disconnected, True otherwise (including
            parse errors, which are reported to the user).
        """
        try:
            qmpcmd = self.__build_cmd(cmdline)
        except Exception as err:
            # Any parsing failure is reported and the shell keeps running.
            print('Error while parsing command line: %s' % err)
            print('command format: <command-name> ', end=' ')
            print('[arg-name1=arg1] ... [arg-nameN=argN]')
            return True
        # For transaction mode, we may have just cached the action:
        if qmpcmd is None:
            return True
        if self.verbose:
            self._print(qmpcmd)
        resp = self.cmd_obj(qmpcmd)
        if resp is None:
            print('Disconnected')
            return False
        self._print(resp)
        return True

    def connect(self, negotiate: bool = True) -> None:
        """Connect via the parent class, keep the greeting, set up readline."""
        self._greeting = super().connect(negotiate)
        self.__completer_setup()

    def show_banner(self,
                    msg: str = 'Welcome to the QMP low-level shell!') -> None:
        """Print ``msg`` and, if a greeting was received, the QEMU version."""
        print(msg)
        if not self._greeting:
            print('Connected')
            return
        version = self._greeting['QMP']['version']['qemu']
        print("Connected to QEMU {major}.{minor}.{micro}\n".format(**version))

    @property
    def prompt(self) -> str:
        """The prompt string; differs while collecting a transaction."""
        if self._transmode:
            return 'TRANS> '
        return '(QEMU) '

    def read_exec_command(self) -> bool:
        """
        Read and execute a command.

        An empty line prints and clears the pending events instead.

        @return True if execution was ok, return False if disconnected.
        """
        try:
            cmdline = input(self.prompt)
        except EOFError:
            print()
            return False

        if cmdline == '':
            for event in self.get_events():
                print(event)
            self.clear_events()
            return True

        return self._execute_cmd(cmdline)

    def repl(self) -> Iterator[None]:
        """
        Show the banner and run commands until EOF or disconnect.

        Yields once after every processed line and closes the shell when
        the loop ends.
        """
        self.show_banner()
        while self.read_exec_command():
            yield
        self.close()


class HMPShell(QMPShell):
    """
    Shell that forwards each line to QEMU through the QMP
    'human-monitor-command' command and prints the textual result.
    """

    def __init__(self, address: qmp.SocketAddrT,
                 pretty: bool = False, verbose: bool = False):
        super().__init__(address, pretty, verbose)
        # CPU index sent along with every command; changed by "cpu N".
        self.__cpu_index = 0

    def __cmd_completion(self) -> None:
        """Add the commands listed by 'help' (and 'help <cmd>') to the
        completer."""
        for cmd in self.__cmd_passthrough('help')['return'].split('\r\n'):
            if cmd and cmd[0] != '[' and cmd[0] != '\t':
                name = cmd.split()[0]  # drop help text
                if name == 'info':
                    continue
                if name.find('|') != -1:
                    # Command in the form 'foobar|f' or 'f|foobar', take the
                    # full name
                    opt = name.split('|')
                    if len(opt[0]) == 1:
                        name = opt[1]
                    else:
                        name = opt[0]
                self._completer.append(name)
                self._completer.append('help ' + name)  # help completion

    def __info_completion(self) -> None:
        """Add 'info <topic>' entries taken from the output of 'info'."""
        for cmd in self.__cmd_passthrough('info')['return'].split('\r\n'):
            words = cmd.split()
            # Skip blank lines and lines without a second word.
            if len(words) > 1:
                self._completer.append('info ' + words[1])

    def __other_completion(self) -> None:
        """Add completion entries not covered by the listings above."""
        # special cases
        self._completer.append('help info')

    def _fill_completion(self) -> None:
        """Populate the completer from the HMP 'help' and 'info' output."""
        self.__cmd_completion()
        self.__info_completion()
        self.__other_completion()

    def __cmd_passthrough(self, cmdline: str,
                          cpu_index: int = 0) -> QMPMessage:
        """Send ``cmdline`` through 'human-monitor-command' and return the
        response."""
        return self.cmd_obj({
            'execute': 'human-monitor-command',
            'arguments': {
                'command-line': cmdline,
                'cpu-index': cpu_index
            }
        })

    def _execute_cmd(self, cmdline: str) -> bool:
        """
        Forward ``cmdline`` to the human monitor and print the outcome.

        The "cpu N" command is intercepted to remember N as the CPU index
        for subsequent commands.

        :return: False if the peer disconnected, True otherwise.
        """
        words = cmdline.split()
        if not words:
            # Whitespace-only input: there is no command to forward.
            return True

        if words[0] == "cpu":
            # trap the cpu command, it requires special setting
            try:
                idx = int(words[1])
            except (IndexError, ValueError):
                # Missing or non-numeric argument.
                print('cpu command takes an integer argument')
                return True
            probe = self.__cmd_passthrough('info version', idx)
            if probe is None:
                print('Disconnected')
                return False
            if 'return' not in probe:
                print('bad CPU index')
                return True
            self.__cpu_index = idx
        resp = self.__cmd_passthrough(cmdline, self.__cpu_index)
        if resp is None:
            print('Disconnected')
            return False
        assert 'return' in resp or 'error' in resp
        if 'return' in resp:
            # Success
            if len(resp['return']) > 0:
                print(resp['return'], end=' ')
        else:
            # Error
            print('%s: %s' % (resp['error']['class'], resp['error']['desc']))
        return True

    def show_banner(self, msg: str = 'Welcome to the HMP shell!') -> None:
        """Print the banner with an HMP-specific default message."""
        QMPShell.show_banner(self, msg)


def die(msg: str) -> NoReturn:
    """Write ``msg`` to stderr as an error and exit with status 1."""
    sys.stderr.write('ERROR: %s\n' % msg)
    sys.exit(1)


def main() -> None:
    """Parse the command line, connect to the server and run the shell."""
    parser = argparse.ArgumentParser()
    parser.add_argument('-H', '--hmp', action='store_true',
                        help='Use HMP interface')
    parser.add_argument('-N', '--skip-negotiation', action='store_true',
                        help='Skip negotiate (for qemu-ga)')
    parser.add_argument('-v', '--verbose', action='store_true',
                        help='Verbose (echo commands sent and received)')
    parser.add_argument('-p', '--pretty', action='store_true',
                        help='Pretty-print JSON')

    default_server = os.environ.get('QMP_SOCKET')
    # nargs='?' makes the positional optional; without it argparse demands
    # the argument and the QMP_SOCKET default would never be used.
    parser.add_argument('qmp_server', action='store', nargs='?',
                        default=default_server,
                        help='< UNIX socket path | TCP address:port >')

    args = parser.parse_args()
    if args.qmp_server is None:
        parser.error("QMP socket or TCP address must be specified")

    shell_class = HMPShell if args.hmp else QMPShell

    try:
        address = shell_class.parse_address(args.qmp_server)
    except qmp.QMPBadPortError:
        parser.error(f"Bad port number: {args.qmp_server}")
        return  # pycharm doesn't know error() is noreturn

    with shell_class(address, args.pretty, args.verbose) as qemu:
        try:
            qemu.connect(negotiate=not args.skip_negotiation)
        except qmp.QMPConnectError:
            die("Didn't get QMP greeting message")
        except qmp.QMPCapabilitiesError:
            die("Couldn't negotiate capabilities")
        except OSError as err:
            die(f"Couldn't connect to {args.qmp_server}: {err!s}")

        for _ in qemu.repl():
            pass


if __name__ == '__main__':
    main()