#!/usr/bin/env python3
#
# Low-level QEMU shell on top of QMP.
#
# Copyright (C) 2009, 2010 Red Hat Inc.
#
# Authors:
#  Luiz Capitulino <lcapitulino@redhat.com>
#
# This work is licensed under the terms of the GNU GPL, version 2.  See
# the COPYING file in the top-level directory.
#
# Usage:
#
# Start QEMU with:
#
# # qemu [...] -qmp unix:./qmp-sock,server
#
# Run the shell:
#
# $ qmp-shell ./qmp-sock
#
# Commands have the following format:
#
#    < command-name > [ arg-name1=arg1 ] ... [ arg-nameN=argN ]
#
# For example:
#
# (QEMU) device_add driver=e1000 id=net1
# {"return": {}}
# (QEMU)
#
# key=value pairs also support Python or JSON object literal subset notations,
# without spaces. Dictionaries/objects {} are supported as are arrays [].
#
#    example-command arg-name1={'key':'value','obj':{'prop':"value"}}
#
# Both JSON and Python formatting should work, including both styles of
# string literal quotes. Both paradigms of literal values should work,
# including null/true/false for JSON and None/True/False for Python.
#
#
# Transactions have the following multi-line format:
#
#    transaction(
#    action-name1 [ arg-name1=arg1 ] ... [arg-nameN=argN ]
#    ...
#    action-nameN [ arg-name1=arg1 ] ... [arg-nameN=argN ]
#    )
#
# One-line transactions are also supported:
#
#    transaction( action-name1 ... )
#
# For example:
#
#     (QEMU) transaction(
#     TRANS> block-dirty-bitmap-add node=drive0 name=bitmap1
#     TRANS> block-dirty-bitmap-clear node=drive0 name=bitmap0
#     TRANS> )
#     {"return": {}}
#     (QEMU)
#
# Use the -v and -p options to activate the verbose and pretty-print options,
# which will echo back the properly formatted JSON-compliant QMP that is being
# sent to QEMU, which is useful for debugging and documentation generation.
import argparse
import ast
import atexit
import json
import os
import re
import readline
import sys
from typing import List, Optional


# The 'qemu' package lives in <tree>/python, two directories above this
# script; extend sys.path before importing it.
sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..', 'python'))
from qemu import qmp


class QMPCompleter:
    """
    Readline completion helper holding a flat list of candidate strings.

    Candidates are registered with append(); complete() implements the
    (text, state) callback protocol expected by readline.set_completer().
    """

    # NB: Python 3.9+ will probably allow us to subclass list[str] directly,
    # but pylint as of today does not know that List[str] is simply 'list'.
    def __init__(self) -> None:
        self._matches: List[str] = []

    def append(self, value: str) -> None:
        """Register @value as a completion candidate."""
        self._matches.append(value)

    def complete(self, text: str, state: int) -> Optional[str]:
        """
        Return the @state-th (0-based) candidate that starts with @text,
        or None once there are no more matching candidates.
        """
        for cmd in self._matches:
            if cmd.startswith(text):
                if state == 0:
                    return cmd
                # Skip this match; the caller wants a later one.
                state -= 1
        return None


class QMPShellError(Exception):
    """Raised when a command line cannot be turned into a QMP command."""


class FuzzyJSON(ast.NodeTransformer):
    """
    This extension of ast.NodeTransformer filters literal "true/false/null"
    values in a Python AST and replaces them by proper "True/False/None" values
    that Python can properly evaluate.
    """

    @classmethod
    def visit_Name(cls,  # pylint: disable=invalid-name
                   node: ast.Name) -> ast.AST:
        """Map the JSON names true/false/null to Python constants."""
        if node.id == 'true':
            return ast.Constant(value=True)
        if node.id == 'false':
            return ast.Constant(value=False)
        if node.id == 'null':
            return ast.Constant(value=None)
        # Any other name is left untouched.
        return node


# TODO: QMPShell's interface is a bit ugly (eg. _fill_completion() and
#       _execute_cmd()). Let's design a better one.
class QMPShell(qmp.QEMUMonitorProtocol):
    """
    Interactive shell that converts 'command key=value ...' lines into QMP
    command objects and sends them through the inherited monitor protocol.

    Also implements the multi-line 'transaction( ... )' sub-shell, readline
    tab completion and persistent history.
    """

    def __init__(self, address, pretty=False, verbose=False):
        """
        @param address: socket path or address:port string; it is handed to
                        the inherited parse_address() before being passed on
        @param pretty: pretty-print (indent and sort) JSON output
        @param verbose: echo each outgoing command object before sending it
        """
        super().__init__(self.parse_address(address))
        self._greeting = None
        self._completer = QMPCompleter()
        self._pretty = pretty
        # True while inside a 'transaction(' ... ')' sub-shell.
        self._transmode = False
        # Actions collected so far for the transaction being built.
        self._actions = []
        self._histfile = os.path.join(os.path.expanduser('~'),
                                      '.qmp-shell_history')
        self.verbose = verbose

    def _fill_completion(self):
        """Populate the completer with names reported by query-commands."""
        cmds = self.cmd('query-commands')
        if 'error' in cmds:
            return
        for cmd in cmds['return']:
            self._completer.append(cmd['name'])

    def __completer_setup(self):
        """Configure readline completion and load the history file."""
        self._completer = QMPCompleter()
        self._fill_completion()
        readline.set_history_length(1024)
        readline.set_completer(self._completer.complete)
        readline.parse_and_bind("tab: complete")
        # NB: default delimiters conflict with some command names
        # (eg. query-), clearing everything as it doesn't seem to matter
        readline.set_completer_delims('')
        try:
            readline.read_history_file(self._histfile)
        except FileNotFoundError:
            # No history yet; it is written at exit.
            pass
        except IOError as err:
            print(f"Failed to read history '{self._histfile}': {err!s}")
        atexit.register(self.__save_history)

    def __save_history(self):
        """Write the readline history file, reporting (not raising) errors."""
        try:
            readline.write_history_file(self._histfile)
        except IOError as err:
            print(f"Failed to save history file '{self._histfile}': {err!s}")

    @classmethod
    def __parse_value(cls, val):
        """
        Convert the textual right-hand side of key=value to a Python value.

        Tried in order: integer, true/false (case-insensitive), and, for
        text starting with '{' or '[', strict JSON followed by a Python
        literal in which true/false/null are also accepted.  Anything else
        is returned unchanged as a string.
        """
        try:
            return int(val)
        except ValueError:
            pass

        if val.lower() == 'true':
            return True
        if val.lower() == 'false':
            return False
        if val.startswith(('{', '[')):
            # Try first as pure JSON:
            try:
                return json.loads(val)
            except ValueError:
                pass
            # Try once again as FuzzyJSON:
            try:
                tree = ast.parse(val, mode='eval')
                transformed = FuzzyJSON().visit(tree)
                return ast.literal_eval(transformed)
            except (SyntaxError, ValueError):
                pass
        return val

    def __cli_expr(self, tokens, parent):
        """
        Store every key=value token of @tokens into the dict @parent.

        Dotted keys ('a.b.c=1') create nested dicts.  Every token is
        resolved starting from @parent itself.

        @raise QMPShellError: a token is not key=value, a key is used both
                              as a leaf and as a container, or a key is set
                              more than once.
        """
        for arg in tokens:
            (key, sep, val) = arg.partition('=')
            if sep != '=':
                raise QMPShellError(
                    f"Expected a key=value pair, got '{arg!s}'"
                )

            value = self.__parse_value(val)
            optpath = key.split('.')
            leaf = optpath[-1]
            curpath = []
            # Walk down from the root for each token; re-using the node
            # reached by the previous token would misplace later keys.
            node = parent
            for path in optpath[:-1]:
                curpath.append(path)
                obj = node.get(path, {})
                if not isinstance(obj, dict):
                    msg = 'Cannot use "{:s}" as both leaf and non-leaf key'
                    raise QMPShellError(msg.format('.'.join(curpath)))
                node[path] = obj
                node = obj
            if leaf in node:
                if isinstance(node[leaf], dict):
                    # The clash is on the full key, not on its parent path.
                    msg = 'Cannot use "{:s}" as both leaf and non-leaf key'
                    raise QMPShellError(msg.format(key))
                raise QMPShellError(f'Cannot set "{key}" multiple times')
            node[leaf] = value

    def __build_cmd(self, cmdline):
        """
        Build a QMP input object from a user provided command-line in the
        following format:

            < command-name > [ arg-name1=arg1 ] ... [ arg-nameN=argN ]

        @return the command dict to send, or None when there is nothing to
                send (blank input, or a transaction action that was only
                cached).
        @raise QMPShellError: the command line is malformed.
        """
        argument_regex = r'''(?:[^\s"']|"(?:\\.|[^"])*"|'(?:\\.|[^'])*')+'''
        cmdargs = re.findall(argument_regex, cmdline)

        # Transactional CLI entry:
        if cmdargs and cmdargs[0] == 'transaction(':
            self._transmode = True
            cmdargs.pop(0)

        # Transactional CLI exit.  Checked independently of the entry test
        # so that 'transaction( )' yields an empty transaction instead of
        # an action named ')'.
        if cmdargs and cmdargs[0] == ')' and self._transmode:
            self._transmode = False
            if len(cmdargs) > 1:
                msg = 'Unexpected input after close of Transaction sub-shell'
                raise QMPShellError(msg)
            qmpcmd = {
                'execute': 'transaction',
                'arguments': {'actions': self._actions}
            }
            self._actions = []
            return qmpcmd

        # Nothing to process?
        if not cmdargs:
            return None

        # Parse and then cache this Transactional Action
        if self._transmode:
            finalize = False
            action = {'type': cmdargs[0], 'data': {}}
            if cmdargs[-1] == ')':
                cmdargs.pop(-1)
                finalize = True
            self.__cli_expr(cmdargs[1:], action['data'])
            self._actions.append(action)
            return self.__build_cmd(')') if finalize else None

        # Standard command: parse and return it to be executed.
        qmpcmd = {'execute': cmdargs[0], 'arguments': {}}
        self.__cli_expr(cmdargs[1:], qmpcmd['arguments'])
        return qmpcmd

    def _print(self, qmp_message):
        """Print @qmp_message as JSON, indented and sorted if pretty is set."""
        indent = 4 if self._pretty else None
        print(json.dumps(qmp_message, indent=indent, sort_keys=self._pretty))

    def _execute_cmd(self, cmdline):
        """
        Parse @cmdline, send the resulting command and print the reply.

        @return False if no reply was received (treated as a disconnect),
                True otherwise, including when parsing failed.
        """
        try:
            qmpcmd = self.__build_cmd(cmdline)
        except Exception as err:
            # Interactive boundary: report the problem and keep the shell
            # alive whatever went wrong while parsing.
            print(f'Error while parsing command line: {err}')
            print('command format: <command-name> ', end=' ')
            print('[arg-name1=arg1] ... [arg-nameN=argN]')
            return True
        # For transaction mode, we may have just cached the action:
        if qmpcmd is None:
            return True
        if self.verbose:
            self._print(qmpcmd)
        resp = self.cmd_obj(qmpcmd)
        if resp is None:
            print('Disconnected')
            return False
        self._print(resp)
        return True

    def connect(self, negotiate: bool = True):
        """Connect, remember the greeting and set up readline completion."""
        self._greeting = super().connect(negotiate)
        self.__completer_setup()

    def show_banner(self, msg='Welcome to the QMP low-level shell!'):
        """Print @msg followed by the QEMU version from the greeting."""
        print(msg)
        if not self._greeting:
            print('Connected')
            return
        version = self._greeting['QMP']['version']['qemu']
        print("Connected to QEMU {major}.{minor}.{micro}\n".format(**version))

    @property
    def prompt(self):
        """Prompt string; differs while a transaction is being entered."""
        if self._transmode:
            return 'TRANS> '
        return '(QEMU) '

    def read_exec_command(self):
        """
        Read and execute a command.

        An empty line prints the pending events and clears them instead of
        sending anything.

        @return True if execution was ok, return False if disconnected.
        """
        try:
            cmdline = input(self.prompt)
        except EOFError:
            print()
            return False

        if cmdline == '':
            for event in self.get_events():
                print(event)
            self.clear_events()
            return True

        return self._execute_cmd(cmdline)

    def repl(self):
        """
        Generator running the read/execute loop; yields after each command
        and closes the connection once the loop ends.
        """
        self.show_banner()
        while self.read_exec_command():
            yield
        self.close()


class HMPShell(QMPShell):
    """
    Shell variant that forwards each line verbatim to the human monitor
    through the 'human-monitor-command' QMP command.
    """

    def __init__(self, address, pretty=False, verbose=False):
        super().__init__(address, pretty, verbose)
        # CPU index sent along with every forwarded command; changed by the
        # locally trapped 'cpu' command.
        self.__cpu_index = 0

    def __cmd_completion(self):
        """Add completions for every command listed by HMP 'help'."""
        for cmd in self.__cmd_passthrough('help')['return'].split('\r\n'):
            if cmd and cmd[0] != '[' and cmd[0] != '\t':
                words = cmd.split()
                if not words:
                    # Whitespace-only line: nothing to complete.
                    continue
                name = words[0]  # drop help text
                if name == 'info':
                    continue
                if name.find('|') != -1:
                    # Command in the form 'foobar|f' or 'f|foobar', take the
                    # full name
                    opt = name.split('|')
                    if len(opt[0]) == 1:
                        name = opt[1]
                    else:
                        name = opt[0]
                self._completer.append(name)
                self._completer.append('help ' + name)  # help completion

    def __info_completion(self):
        """Add 'info <topic>' completions from the HMP 'info' output."""
        for cmd in self.__cmd_passthrough('info')['return'].split('\r\n'):
            words = cmd.split()
            # The topic is the second word; skip lines that have none.
            if len(words) > 1:
                self._completer.append('info ' + words[1])

    def __other_completion(self):
        # special cases
        self._completer.append('help info')

    def _fill_completion(self):
        """Populate the completer from HMP 'help' and 'info' output."""
        self.__cmd_completion()
        self.__info_completion()
        self.__other_completion()

    def __cmd_passthrough(self, cmdline, cpu_index=0):
        """Send @cmdline via human-monitor-command and return the reply."""
        return self.cmd_obj({
            'execute': 'human-monitor-command',
            'arguments': {
                'command-line': cmdline,
                'cpu-index': cpu_index
            }
        })

    def _execute_cmd(self, cmdline):
        """
        Forward @cmdline to the human monitor and print its output.

        The 'cpu <index>' command is trapped: the index is validated with a
        probe command and, if accepted, used for all later commands.

        @return False if no reply was received (treated as a disconnect),
                True otherwise.
        """
        words = cmdline.split()
        if not words:
            # Whitespace-only input: nothing to send.
            return True

        if words[0] == "cpu":
            # trap the cpu command, it requires special setting
            try:
                idx = int(words[1])
            except (IndexError, ValueError):
                # Missing or non-numeric argument.
                print('cpu command takes an integer argument')
                return True
            probe = self.__cmd_passthrough('info version', idx)
            if probe is None:
                print('Disconnected')
                return False
            if 'return' not in probe:
                print('bad CPU index')
                return True
            self.__cpu_index = idx

        resp = self.__cmd_passthrough(cmdline, self.__cpu_index)
        if resp is None:
            print('Disconnected')
            return False
        assert 'return' in resp or 'error' in resp
        if 'return' in resp:
            # Success
            if len(resp['return']) > 0:
                print(resp['return'], end=' ')
        else:
            # Error
            error = resp['error']
            print(f"{error['class']}: {error['desc']}")
        return True

    def show_banner(self, msg='Welcome to the HMP shell!'):
        """Print the banner with an HMP-specific default message."""
        super().show_banner(msg)


def die(msg):
    """Write 'ERROR: <msg>' to stderr and exit with status 1."""
    sys.stderr.write(f'ERROR: {msg}\n')
    sys.exit(1)


def main():
    """Parse the command line, connect and run the interactive shell."""
    parser = argparse.ArgumentParser()
    parser.add_argument('-H', '--hmp', action='store_true',
                        help='Use HMP interface')
    parser.add_argument('-N', '--skip-negotiation', action='store_true',
                        help='Skip negotiate (for qemu-ga)')
    parser.add_argument('-v', '--verbose', action='store_true',
                        help='Verbose (echo commands sent and received)')
    parser.add_argument('-p', '--pretty', action='store_true',
                        help='Pretty-print JSON')

    default_server = os.environ.get('QMP_SOCKET')
    # nargs='?' makes the positional optional; without it argparse always
    # demands the argument and the QMP_SOCKET default is never used.
    parser.add_argument('qmp_server', action='store', nargs='?',
                        default=default_server,
                        help='< UNIX socket path | TCP address:port >')

    args = parser.parse_args()
    if args.qmp_server is None:
        parser.error("QMP socket or TCP address must be specified")

    shell_class = HMPShell if args.hmp else QMPShell
    try:
        qemu = shell_class(args.qmp_server, args.pretty, args.verbose)
    except qmp.QMPBadPortError:
        parser.error(f"Bad port number: {args.qmp_server}")
        return  # pycharm doesn't know error() is noreturn

    try:
        qemu.connect(negotiate=not args.skip_negotiation)
    except qmp.QMPConnectError:
        die("Didn't get QMP greeting message")
    except qmp.QMPCapabilitiesError:
        die("Couldn't negotiate capabilities")
    except OSError as err:
        die(f"Couldn't connect to {args.qmp_server}: {err!s}")

    for _ in qemu.repl():
        pass


if __name__ == '__main__':
    main()