xref: /qemu/scripts/qmp/qmp-shell (revision e359c5a8b8e6184c15806d1408de085aab9c268b)
1#!/usr/bin/env python3
2#
3# Copyright (C) 2009, 2010 Red Hat Inc.
4#
5# Authors:
6#  Luiz Capitulino <lcapitulino@redhat.com>
7#
8# This work is licensed under the terms of the GNU GPL, version 2.  See
9# the COPYING file in the top-level directory.
10#
11
12"""
13Low-level QEMU shell on top of QMP.
14
15usage: qmp-shell [-h] [-H] [-N] [-v] [-p] qmp_server
16
17positional arguments:
18  qmp_server            < UNIX socket path | TCP address:port >
19
20optional arguments:
21  -h, --help            show this help message and exit
22  -H, --hmp             Use HMP interface
23  -N, --skip-negotiation
24                        Skip negotiate (for qemu-ga)
25  -v, --verbose         Verbose (echo commands sent and received)
26  -p, --pretty          Pretty-print JSON
27
28
29Start QEMU with:
30
31# qemu [...] -qmp unix:./qmp-sock,server
32
33Run the shell:
34
35$ qmp-shell ./qmp-sock
36
37Commands have the following format:
38
39   < command-name > [ arg-name1=arg1 ] ... [ arg-nameN=argN ]
40
41For example:
42
43(QEMU) device_add driver=e1000 id=net1
44{'return': {}}
45(QEMU)
46
47key=value pairs also support Python or JSON object literal subset notations,
48without spaces. Dictionaries/objects {} are supported as are arrays [].
49
50   example-command arg-name1={'key':'value','obj'={'prop':"value"}}
51
52Both JSON and Python formatting should work, including both styles of
53string literal quotes. Both paradigms of literal values should work,
54including null/true/false for JSON and None/True/False for Python.
55
56
57Transactions have the following multi-line format:
58
59   transaction(
60   action-name1 [ arg-name1=arg1 ] ... [arg-nameN=argN ]
61   ...
62   action-nameN [ arg-name1=arg1 ] ... [arg-nameN=argN ]
63   )
64
65One line transactions are also supported:
66
67   transaction( action-name1 ... )
68
69For example:
70
71    (QEMU) transaction(
72    TRANS> block-dirty-bitmap-add node=drive0 name=bitmap1
73    TRANS> block-dirty-bitmap-clear node=drive0 name=bitmap0
74    TRANS> )
75    {"return": {}}
76    (QEMU)
77
78Use the -v and -p options to activate the verbose and pretty-print options,
79which will echo back the properly formatted JSON-compliant QMP that is being
80sent to QEMU, which is useful for debugging and documentation generation.
81"""
82
83import argparse
84import ast
85import json
86import logging
87import os
88import re
89import readline
90import sys
91from typing import (
92    Iterator,
93    List,
94    NoReturn,
95    Optional,
96    Sequence,
97)
98
99
100sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..', 'python'))
101from qemu import qmp
102from qemu.qmp import QMPMessage
103
104
105LOG = logging.getLogger(__name__)
106
107
108class QMPCompleter:
109    """
110    QMPCompleter provides a readline library tab-complete behavior.
111    """
112    # NB: Python 3.9+ will probably allow us to subclass list[str] directly,
113    # but pylint as of today does not know that List[str] is simply 'list'.
114    def __init__(self) -> None:
115        self._matches: List[str] = []
116
117    def append(self, value: str) -> None:
118        """Append a new valid completion to the list of possibilities."""
119        return self._matches.append(value)
120
121    def complete(self, text: str, state: int) -> Optional[str]:
122        """readline.set_completer() callback implementation."""
123        for cmd in self._matches:
124            if cmd.startswith(text):
125                if state == 0:
126                    return cmd
127                state -= 1
128        return None
129
130
131class QMPShellError(qmp.QMPError):
132    """
133    QMP Shell Base error class.
134    """
135
136
137class FuzzyJSON(ast.NodeTransformer):
138    """
139    This extension of ast.NodeTransformer filters literal "true/false/null"
140    values in a Python AST and replaces them by proper "True/False/None" values
141    that Python can properly evaluate.
142    """
143
144    @classmethod
145    def visit_Name(cls,  # pylint: disable=invalid-name
146                   node: ast.Name) -> ast.AST:
147        """
148        Transform Name nodes with certain values into Constant (keyword) nodes.
149        """
150        if node.id == 'true':
151            return ast.Constant(value=True)
152        if node.id == 'false':
153            return ast.Constant(value=False)
154        if node.id == 'null':
155            return ast.Constant(value=None)
156        return node
157
158
159class QMPShell(qmp.QEMUMonitorProtocol):
160    """
161    QMPShell provides a basic readline-based QMP shell.
162
163    :param address: Address of the QMP server.
164    :param pretty: Pretty-print QMP messages.
165    :param verbose: Echo outgoing QMP messages to console.
166    """
167    def __init__(self, address: qmp.SocketAddrT,
168                 pretty: bool = False, verbose: bool = False):
169        super().__init__(address)
170        self._greeting: Optional[QMPMessage] = None
171        self._completer = QMPCompleter()
172        self._transmode = False
173        self._actions: List[QMPMessage] = []
174        self._histfile = os.path.join(os.path.expanduser('~'),
175                                      '.qmp-shell_history')
176        self.pretty = pretty
177        self.verbose = verbose
178
179    def close(self) -> None:
180        # Hook into context manager of parent to save shell history.
181        self._save_history()
182        super().close()
183
184    def _fill_completion(self) -> None:
185        cmds = self.cmd('query-commands')
186        if 'error' in cmds:
187            return
188        for cmd in cmds['return']:
189            self._completer.append(cmd['name'])
190
191    def _completer_setup(self) -> None:
192        self._completer = QMPCompleter()
193        self._fill_completion()
194        readline.set_history_length(1024)
195        readline.set_completer(self._completer.complete)
196        readline.parse_and_bind("tab: complete")
197        # NB: default delimiters conflict with some command names
198        # (eg. query-), clearing everything as it doesn't seem to matter
199        readline.set_completer_delims('')
200        try:
201            readline.read_history_file(self._histfile)
202        except FileNotFoundError:
203            pass
204        except IOError as err:
205            msg = f"Failed to read history '{self._histfile}': {err!s}"
206            LOG.warning(msg)
207
208    def _save_history(self) -> None:
209        try:
210            readline.write_history_file(self._histfile)
211        except IOError as err:
212            msg = f"Failed to save history file '{self._histfile}': {err!s}"
213            LOG.warning(msg)
214
215    @classmethod
216    def _parse_value(cls, val: str) -> object:
217        try:
218            return int(val)
219        except ValueError:
220            pass
221
222        if val.lower() == 'true':
223            return True
224        if val.lower() == 'false':
225            return False
226        if val.startswith(('{', '[')):
227            # Try first as pure JSON:
228            try:
229                return json.loads(val)
230            except ValueError:
231                pass
232            # Try once again as FuzzyJSON:
233            try:
234                tree = ast.parse(val, mode='eval')
235                transformed = FuzzyJSON().visit(tree)
236                return ast.literal_eval(transformed)
237            except (SyntaxError, ValueError):
238                pass
239        return val
240
241    def _cli_expr(self,
242                  tokens: Sequence[str],
243                  parent: qmp.QMPObject) -> None:
244        for arg in tokens:
245            (key, sep, val) = arg.partition('=')
246            if sep != '=':
247                raise QMPShellError(
248                    f"Expected a key=value pair, got '{arg!s}'"
249                )
250
251            value = self._parse_value(val)
252            optpath = key.split('.')
253            curpath = []
254            for path in optpath[:-1]:
255                curpath.append(path)
256                obj = parent.get(path, {})
257                if not isinstance(obj, dict):
258                    msg = 'Cannot use "{:s}" as both leaf and non-leaf key'
259                    raise QMPShellError(msg.format('.'.join(curpath)))
260                parent[path] = obj
261                parent = obj
262            if optpath[-1] in parent:
263                if isinstance(parent[optpath[-1]], dict):
264                    msg = 'Cannot use "{:s}" as both leaf and non-leaf key'
265                    raise QMPShellError(msg.format('.'.join(curpath)))
266                raise QMPShellError(f'Cannot set "{key}" multiple times')
267            parent[optpath[-1]] = value
268
269    def _build_cmd(self, cmdline: str) -> Optional[QMPMessage]:
270        """
271        Build a QMP input object from a user provided command-line in the
272        following format:
273
274            < command-name > [ arg-name1=arg1 ] ... [ arg-nameN=argN ]
275        """
276        argument_regex = r'''(?:[^\s"']|"(?:\\.|[^"])*"|'(?:\\.|[^'])*')+'''
277        cmdargs = re.findall(argument_regex, cmdline)
278        qmpcmd: QMPMessage
279
280        # Transactional CLI entry:
281        if cmdargs and cmdargs[0] == 'transaction(':
282            self._transmode = True
283            self._actions = []
284            cmdargs.pop(0)
285
286        # Transactional CLI exit:
287        if cmdargs and cmdargs[0] == ')' and self._transmode:
288            self._transmode = False
289            if len(cmdargs) > 1:
290                msg = 'Unexpected input after close of Transaction sub-shell'
291                raise QMPShellError(msg)
292            qmpcmd = {
293                'execute': 'transaction',
294                'arguments': {'actions': self._actions}
295            }
296            return qmpcmd
297
298        # No args, or no args remaining
299        if not cmdargs:
300            return None
301
302        if self._transmode:
303            # Parse and cache this Transactional Action
304            finalize = False
305            action = {'type': cmdargs[0], 'data': {}}
306            if cmdargs[-1] == ')':
307                cmdargs.pop(-1)
308                finalize = True
309            self._cli_expr(cmdargs[1:], action['data'])
310            self._actions.append(action)
311            return self._build_cmd(')') if finalize else None
312
313        # Standard command: parse and return it to be executed.
314        qmpcmd = {'execute': cmdargs[0], 'arguments': {}}
315        self._cli_expr(cmdargs[1:], qmpcmd['arguments'])
316        return qmpcmd
317
318    def _print(self, qmp_message: object) -> None:
319        jsobj = json.dumps(qmp_message,
320                           indent=4 if self.pretty else None,
321                           sort_keys=self.pretty)
322        print(str(jsobj))
323
324    def _execute_cmd(self, cmdline: str) -> bool:
325        try:
326            qmpcmd = self._build_cmd(cmdline)
327        except QMPShellError as err:
328            print(
329                f"Error while parsing command line: {err!s}\n"
330                "command format: <command-name> "
331                "[arg-name1=arg1] ... [arg-nameN=argN",
332                file=sys.stderr
333            )
334            return True
335        # For transaction mode, we may have just cached the action:
336        if qmpcmd is None:
337            return True
338        if self.verbose:
339            self._print(qmpcmd)
340        resp = self.cmd_obj(qmpcmd)
341        if resp is None:
342            print('Disconnected')
343            return False
344        self._print(resp)
345        return True
346
347    def connect(self, negotiate: bool = True) -> None:
348        self._greeting = super().connect(negotiate)
349        self._completer_setup()
350
351    def show_banner(self,
352                    msg: str = 'Welcome to the QMP low-level shell!') -> None:
353        """
354        Print to stdio a greeting, and the QEMU version if available.
355        """
356        print(msg)
357        if not self._greeting:
358            print('Connected')
359            return
360        version = self._greeting['QMP']['version']['qemu']
361        print("Connected to QEMU {major}.{minor}.{micro}\n".format(**version))
362
363    @property
364    def prompt(self) -> str:
365        """
366        Return the current shell prompt, including a trailing space.
367        """
368        if self._transmode:
369            return 'TRANS> '
370        return '(QEMU) '
371
372    def read_exec_command(self) -> bool:
373        """
374        Read and execute a command.
375
376        @return True if execution was ok, return False if disconnected.
377        """
378        try:
379            cmdline = input(self.prompt)
380        except EOFError:
381            print()
382            return False
383
384        if cmdline == '':
385            for event in self.get_events():
386                print(event)
387            self.clear_events()
388            return True
389
390        return self._execute_cmd(cmdline)
391
392    def repl(self) -> Iterator[None]:
393        """
394        Return an iterator that implements the REPL.
395        """
396        self.show_banner()
397        while self.read_exec_command():
398            yield
399        self.close()
400
401
402class HMPShell(QMPShell):
403    """
404    HMPShell provides a basic readline-based HMP shell, tunnelled via QMP.
405
406    :param address: Address of the QMP server.
407    :param pretty: Pretty-print QMP messages.
408    :param verbose: Echo outgoing QMP messages to console.
409    """
410    def __init__(self, address: qmp.SocketAddrT,
411                 pretty: bool = False, verbose: bool = False):
412        super().__init__(address, pretty, verbose)
413        self._cpu_index = 0
414
415    def _cmd_completion(self) -> None:
416        for cmd in self._cmd_passthrough('help')['return'].split('\r\n'):
417            if cmd and cmd[0] != '[' and cmd[0] != '\t':
418                name = cmd.split()[0]  # drop help text
419                if name == 'info':
420                    continue
421                if name.find('|') != -1:
422                    # Command in the form 'foobar|f' or 'f|foobar', take the
423                    # full name
424                    opt = name.split('|')
425                    if len(opt[0]) == 1:
426                        name = opt[1]
427                    else:
428                        name = opt[0]
429                self._completer.append(name)
430                self._completer.append('help ' + name)  # help completion
431
432    def _info_completion(self) -> None:
433        for cmd in self._cmd_passthrough('info')['return'].split('\r\n'):
434            if cmd:
435                self._completer.append('info ' + cmd.split()[1])
436
437    def _other_completion(self) -> None:
438        # special cases
439        self._completer.append('help info')
440
441    def _fill_completion(self) -> None:
442        self._cmd_completion()
443        self._info_completion()
444        self._other_completion()
445
446    def _cmd_passthrough(self, cmdline: str,
447                         cpu_index: int = 0) -> QMPMessage:
448        return self.cmd_obj({
449            'execute': 'human-monitor-command',
450            'arguments': {
451                'command-line': cmdline,
452                'cpu-index': cpu_index
453            }
454        })
455
456    def _execute_cmd(self, cmdline: str) -> bool:
457        if cmdline.split()[0] == "cpu":
458            # trap the cpu command, it requires special setting
459            try:
460                idx = int(cmdline.split()[1])
461                if 'return' not in self._cmd_passthrough('info version', idx):
462                    print('bad CPU index')
463                    return True
464                self._cpu_index = idx
465            except ValueError:
466                print('cpu command takes an integer argument')
467                return True
468        resp = self._cmd_passthrough(cmdline, self._cpu_index)
469        if resp is None:
470            print('Disconnected')
471            return False
472        assert 'return' in resp or 'error' in resp
473        if 'return' in resp:
474            # Success
475            if len(resp['return']) > 0:
476                print(resp['return'], end=' ')
477        else:
478            # Error
479            print('%s: %s' % (resp['error']['class'], resp['error']['desc']))
480        return True
481
482    def show_banner(self, msg: str = 'Welcome to the HMP shell!') -> None:
483        QMPShell.show_banner(self, msg)
484
485
486def die(msg: str) -> NoReturn:
487    """Write an error to stderr, then exit with a return code of 1."""
488    sys.stderr.write('ERROR: %s\n' % msg)
489    sys.exit(1)
490
491
492def main() -> None:
493    """
494    qmp-shell entry point: parse command line arguments and start the REPL.
495    """
496    parser = argparse.ArgumentParser()
497    parser.add_argument('-H', '--hmp', action='store_true',
498                        help='Use HMP interface')
499    parser.add_argument('-N', '--skip-negotiation', action='store_true',
500                        help='Skip negotiate (for qemu-ga)')
501    parser.add_argument('-v', '--verbose', action='store_true',
502                        help='Verbose (echo commands sent and received)')
503    parser.add_argument('-p', '--pretty', action='store_true',
504                        help='Pretty-print JSON')
505
506    default_server = os.environ.get('QMP_SOCKET')
507    parser.add_argument('qmp_server', action='store',
508                        default=default_server,
509                        help='< UNIX socket path | TCP address:port >')
510
511    args = parser.parse_args()
512    if args.qmp_server is None:
513        parser.error("QMP socket or TCP address must be specified")
514
515    shell_class = HMPShell if args.hmp else QMPShell
516
517    try:
518        address = shell_class.parse_address(args.qmp_server)
519    except qmp.QMPBadPortError:
520        parser.error(f"Bad port number: {args.qmp_server}")
521        return  # pycharm doesn't know error() is noreturn
522
523    with shell_class(address, args.pretty, args.verbose) as qemu:
524        try:
525            qemu.connect(negotiate=not args.skip_negotiation)
526        except qmp.QMPConnectError:
527            die("Didn't get QMP greeting message")
528        except qmp.QMPCapabilitiesError:
529            die("Couldn't negotiate capabilities")
530        except OSError as err:
531            die(f"Couldn't connect to {args.qmp_server}: {err!s}")
532
533        for _ in qemu.repl():
534            pass
535
536
537if __name__ == '__main__':
538    main()
539