xref: /qemu/scripts/qmp/qmp-shell (revision d1d14e59895b2ebd2953e9442225ffba56f80e9b)
1#!/usr/bin/env python3
2#
3# Low-level QEMU shell on top of QMP.
4#
5# Copyright (C) 2009, 2010 Red Hat Inc.
6#
7# Authors:
8#  Luiz Capitulino <lcapitulino@redhat.com>
9#
10# This work is licensed under the terms of the GNU GPL, version 2.  See
11# the COPYING file in the top-level directory.
12#
13# Usage:
14#
15# Start QEMU with:
16#
17# # qemu [...] -qmp unix:./qmp-sock,server
18#
19# Run the shell:
20#
21# $ qmp-shell ./qmp-sock
22#
23# Commands have the following format:
24#
25#    < command-name > [ arg-name1=arg1 ] ... [ arg-nameN=argN ]
26#
27# For example:
28#
29# (QEMU) device_add driver=e1000 id=net1
30# {"return": {}}
31# (QEMU)
32#
33# key=value pairs also support Python or JSON object literal subset notations,
34# without spaces. Dictionaries/objects {} are supported as are arrays [].
35#
36#    example-command arg-name1={'key':'value','obj':{'prop':"value"}}
37#
38# Both JSON and Python formatting should work, including both styles of
39# string literal quotes. Both paradigms of literal values should work,
40# including null/true/false for JSON and None/True/False for Python.
41#
42#
43# Transactions have the following multi-line format:
44#
45#    transaction(
46#    action-name1 [ arg-name1=arg1 ] ... [arg-nameN=argN ]
47#    ...
48#    action-nameN [ arg-name1=arg1 ] ... [arg-nameN=argN ]
49#    )
50#
51# One line transactions are also supported:
52#
53#    transaction( action-name1 ... )
54#
55# For example:
56#
57#     (QEMU) transaction(
58#     TRANS> block-dirty-bitmap-add node=drive0 name=bitmap1
59#     TRANS> block-dirty-bitmap-clear node=drive0 name=bitmap0
60#     TRANS> )
61#     {"return": {}}
62#     (QEMU)
63#
64# Use the -v and -p options to activate the verbose and pretty-print options,
65# which will echo back the properly formatted JSON-compliant QMP that is being
66# sent to QEMU, which is useful for debugging and documentation generation.
67import argparse
68import ast
69import json
70import os
71import re
72import readline
73import sys
74from typing import (
75    Iterator,
76    List,
77    NoReturn,
78    Optional,
79    Sequence,
80)
81
82
83sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..', 'python'))
84from qemu import qmp
85from qemu.qmp import QMPMessage
86
87
class QMPCompleter:
    """
    Prefix-matching completion source for readline.

    Candidates are collected with append() and handed back one at a time by
    complete(), in the order they were added.
    """
    # NB: Python 3.9+ will probably allow us to subclass list[str] directly,
    # but pylint as of today does not know that List[str] is simply 'list'.
    def __init__(self) -> None:
        self._matches: List[str] = []

    def append(self, value: str) -> None:
        """Register one more completion candidate."""
        self._matches.append(value)

    def complete(self, text: str, state: int) -> Optional[str]:
        """
        Return the candidate number `state` (zero-based) among those that
        start with `text`, or None when there are not that many.
        """
        remaining = state
        for candidate in self._matches:
            if not candidate.startswith(text):
                continue
            if remaining == 0:
                return candidate
            remaining -= 1
        return None
104
105
class QMPShellError(Exception):
    """Signals a command line that cannot be turned into a QMP message."""
108
109
class FuzzyJSON(ast.NodeTransformer):
    """
    AST transformer that lets JSON spellings of literals appear in Python
    expressions: the bare names "true", "false" and "null" are rewritten to
    the constants True, False and None so the tree can be evaluated as a
    Python literal. All other names are left untouched.
    """

    @classmethod
    def visit_Name(cls,  # pylint: disable=invalid-name
                   node: ast.Name) -> ast.AST:
        # JSON keyword -> equivalent Python constant
        json_literals = {'true': True, 'false': False, 'null': None}
        if node.id in json_literals:
            return ast.Constant(value=json_literals[node.id])
        return node
127
128
# TODO: QMPShell's interface is a bit ugly (eg. _fill_completion() and
#       _execute_cmd()). Let's design a better one.
class QMPShell(qmp.QEMUMonitorProtocol):
    """
    Interactive shell that turns "command key=value ..." lines into QMP
    messages, sends them through the parent protocol class and prints the
    replies as JSON.

    Besides single commands it supports a "transaction( ... )" sub-shell
    that collects actions and submits them as one 'transaction' command.
    """

    def __init__(self, address: qmp.SocketAddrT,
                 pretty: bool = False, verbose: bool = False):
        """
        :param address: Server address, passed unchanged to the parent class.
        :param pretty: Indent and key-sort the JSON that gets printed.
        :param verbose: Echo each outgoing message before sending it.
        """
        super().__init__(address)
        self._greeting: Optional[QMPMessage] = None
        self._completer = QMPCompleter()
        # True while inside a "transaction(" ... ")" sub-shell.
        self._transmode = False
        # Actions collected so far for the transaction being built.
        self._actions: List[QMPMessage] = []
        self._histfile = os.path.join(os.path.expanduser('~'),
                                      '.qmp-shell_history')
        # Set once __completer_setup() has attempted to load the history
        # file; _save_history() refuses to write before that.
        self._history_loaded = False
        self.pretty = pretty
        self.verbose = verbose

    def close(self) -> None:
        # Hook into context manager of parent to save shell history.
        self._save_history()
        super().close()

    def _fill_completion(self) -> None:
        """Feed the names reported by 'query-commands' to the completer."""
        cmds = self.cmd('query-commands')
        if 'error' in cmds:
            return
        for cmd in cmds['return']:
            self._completer.append(cmd['name'])

    def __completer_setup(self) -> None:
        """Configure readline: completion candidates, key binding, history."""
        self._completer = QMPCompleter()
        self._fill_completion()
        readline.set_history_length(1024)
        readline.set_completer(self._completer.complete)
        readline.parse_and_bind("tab: complete")
        # NB: default delimiters conflict with some command names
        # (eg. query-), clearing everything as it doesn't seem to matter
        readline.set_completer_delims('')
        try:
            readline.read_history_file(self._histfile)
        except FileNotFoundError:
            pass
        except IOError as err:
            print(f"Failed to read history '{self._histfile}': {err!s}")
        self._history_loaded = True

    def _save_history(self) -> None:
        """
        Write the readline history to the history file.

        Nothing is written unless __completer_setup() ran first:
        write_history_file() overwrites the file, so saving the still-empty
        in-memory history (e.g. after a failed connect) would wipe the
        history kept from earlier sessions.
        """
        if not self._history_loaded:
            return
        try:
            readline.write_history_file(self._histfile)
        except IOError as err:
            print(f"Failed to save history file '{self._histfile}': {err!s}")

    @classmethod
    def __parse_value(cls, val: str) -> object:
        """
        Convert the text right of '=' into a Python value.

        Tried in order: integer, true/false (any letter case), and - for
        text starting with '{' or '[' - strict JSON followed by a Python
        literal that may also use true/false/null. Anything else is
        returned unchanged as a string.
        """
        try:
            return int(val)
        except ValueError:
            pass

        if val.lower() == 'true':
            return True
        if val.lower() == 'false':
            return False
        if val.startswith(('{', '[')):
            # Try first as pure JSON:
            try:
                return json.loads(val)
            except ValueError:
                pass
            # Try once again as FuzzyJSON:
            try:
                tree = ast.parse(val, mode='eval')
                transformed = FuzzyJSON().visit(tree)
                return ast.literal_eval(transformed)
            except (SyntaxError, ValueError):
                pass
        return val

    def __cli_expr(self,
                   tokens: Sequence[str],
                   parent: qmp.QMPObject) -> None:
        """
        Store every key=value token in `parent`.

        Dotted keys address nested objects: "a.b=1" sets parent['a']['b'],
        creating the intermediate dictionaries as needed.

        :raise QMPShellError: A token has no '=', a key is given twice, or
                              a key is used both as a value and as a
                              container of other keys.
        """
        for arg in tokens:
            (key, sep, val) = arg.partition('=')
            if sep != '=':
                raise QMPShellError(
                    f"Expected a key=value pair, got '{arg!s}'"
                )

            value = self.__parse_value(val)
            optpath = key.split('.')
            curpath = []
            for path in optpath[:-1]:
                curpath.append(path)
                obj = parent.get(path, {})
                if not isinstance(obj, dict):
                    msg = 'Cannot use "{:s}" as both leaf and non-leaf key'
                    raise QMPShellError(msg.format('.'.join(curpath)))
                parent[path] = obj
                parent = obj
            leaf = optpath[-1]
            if leaf in parent:
                if isinstance(parent[leaf], dict):
                    # The conflicting key is the one being assigned now,
                    # not the prefix leading up to it.
                    msg = 'Cannot use "{:s}" as both leaf and non-leaf key'
                    raise QMPShellError(msg.format(key))
                raise QMPShellError(f'Cannot set "{key}" multiple times')
            parent[leaf] = value

    def __build_cmd(self, cmdline: str) -> Optional[QMPMessage]:
        """
        Build a QMP input object from a user provided command-line in the
        following format:

            < command-name > [ arg-name1=arg1 ] ... [ arg-nameN=argN ]

        :return: The message to send, or None when there is nothing to
                 send yet (blank input, or an action that was only cached
                 for the transaction being built).
        :raise QMPShellError: The command line is malformed.
        """
        argument_regex = r'''(?:[^\s"']|"(?:\\.|[^"])*"|'(?:\\.|[^'])*')+'''
        cmdargs = re.findall(argument_regex, cmdline)
        qmpcmd: QMPMessage

        # Transactional CLI entry:
        if cmdargs and cmdargs[0] == 'transaction(':
            if not self._transmode:
                # Drop leftovers of a transaction that was aborted by a
                # parse error instead of being submitted.
                self._actions = []
            self._transmode = True
            cmdargs.pop(0)

        # Transactional CLI exit; also reached for "transaction( )":
        if cmdargs and cmdargs[0] == ')' and self._transmode:
            self._transmode = False
            if len(cmdargs) > 1:
                msg = 'Unexpected input after close of Transaction sub-shell'
                raise QMPShellError(msg)
            qmpcmd = {
                'execute': 'transaction',
                'arguments': {'actions': self._actions}
            }
            self._actions = []
            return qmpcmd

        # Nothing to process?
        if not cmdargs:
            return None

        # Parse and then cache this Transactional Action
        if self._transmode:
            finalize = False
            action = {'type': cmdargs[0], 'data': {}}
            if cmdargs[-1] == ')':
                cmdargs.pop(-1)
                finalize = True
            self.__cli_expr(cmdargs[1:], action['data'])
            self._actions.append(action)
            return self.__build_cmd(')') if finalize else None

        # Standard command: parse and return it to be executed.
        qmpcmd = {'execute': cmdargs[0], 'arguments': {}}
        self.__cli_expr(cmdargs[1:], qmpcmd['arguments'])
        return qmpcmd

    def _print(self, qmp_message: object) -> None:
        """Print a message as JSON, indented and key-sorted if `pretty`."""
        jsobj = json.dumps(qmp_message,
                           indent=4 if self.pretty else None,
                           sort_keys=self.pretty)
        print(jsobj)

    def _execute_cmd(self, cmdline: str) -> bool:
        """
        Parse one input line, send the resulting message and print the reply.

        :return: False when sending yields no reply (reported as
                 'Disconnected'), True otherwise - including when the line
                 could not be parsed or was only cached.
        """
        try:
            qmpcmd = self.__build_cmd(cmdline)
        except Exception as err:
            # Deliberately broad: a bad line must never end the session.
            print(f'Error while parsing command line: {err}')
            print('command format: <command-name> ', end=' ')
            print('[arg-name1=arg1] ... [arg-nameN=argN]')
            return True
        # For transaction mode, we may have just cached the action:
        if qmpcmd is None:
            return True
        if self.verbose:
            self._print(qmpcmd)
        resp = self.cmd_obj(qmpcmd)
        if resp is None:
            print('Disconnected')
            return False
        self._print(resp)
        return True

    def connect(self, negotiate: bool = True) -> None:
        """Connect via the parent class, then set up readline."""
        self._greeting = super().connect(negotiate)
        self.__completer_setup()

    def show_banner(self,
                    msg: str = 'Welcome to the QMP low-level shell!') -> None:
        """Print `msg` and, if a greeting was stored, the QEMU version."""
        print(msg)
        if not self._greeting:
            print('Connected')
            return
        version = self._greeting['QMP']['version']['qemu']
        print("Connected to QEMU {major}.{minor}.{micro}\n".format(**version))

    @property
    def prompt(self) -> str:
        """Prompt text; differs while a transaction is being entered."""
        if self._transmode:
            return 'TRANS> '
        return '(QEMU) '

    def read_exec_command(self) -> bool:
        """
        Read and execute a command.

        An empty line prints the pending events and clears them instead of
        sending anything.

        @return True if execution was ok, return False if disconnected.
        """
        try:
            cmdline = input(self.prompt)
        except EOFError:
            print()
            return False

        if cmdline == '':
            for event in self.get_events():
                print(event)
            self.clear_events()
            return True

        return self._execute_cmd(cmdline)

    def repl(self) -> Iterator[None]:
        """
        Show the banner, then yield once after every command that was read
        and executed; close the shell when the loop ends.
        """
        self.show_banner()
        while self.read_exec_command():
            yield
        self.close()
349
350
class HMPShell(QMPShell):
    """
    Shell variant that forwards every input line to the human monitor
    through the QMP 'human-monitor-command' command and prints the text
    that comes back.
    """

    def __init__(self, address: qmp.SocketAddrT,
                 pretty: bool = False, verbose: bool = False):
        super().__init__(address, pretty, verbose)
        # CPU index sent with every forwarded command; updated by the
        # trapped "cpu" command in _execute_cmd().
        self.__cpu_index = 0

    def __reply_lines(self, cmdline: str) -> List[str]:
        """
        Run an HMP command and return its output split into CRLF-separated
        lines. A reply without a 'return' member yields an empty list so
        that building completions cannot abort the connection setup.
        """
        resp = self.__cmd_passthrough(cmdline)
        if not resp or 'return' not in resp:
            return []
        return resp['return'].split('\r\n')

    def __cmd_completion(self) -> None:
        """Add the commands listed by 'help', plus 'help <command>'."""
        for line in self.__reply_lines('help'):
            if not line or line[0] in ('[', '\t'):
                continue
            words = line.split()
            if not words:
                continue
            name = words[0]  # drop help text
            if name == 'info':
                continue
            if '|' in name:
                # Command in the form 'foobar|f' or 'f|foobar', take the
                # full name
                opt = name.split('|')
                name = opt[1] if len(opt[0]) == 1 else opt[0]
            self._completer.append(name)
            self._completer.append('help ' + name)  # help completion

    def __info_completion(self) -> None:
        """Add 'info <topic>' for every line printed by 'info'."""
        for line in self.__reply_lines('info'):
            words = line.split()
            # The topic is the second word; skip lines that have none.
            if len(words) > 1:
                self._completer.append('info ' + words[1])

    def __other_completion(self) -> None:
        # special cases
        self._completer.append('help info')

    def _fill_completion(self) -> None:
        self.__cmd_completion()
        self.__info_completion()
        self.__other_completion()

    def __cmd_passthrough(self, cmdline: str,
                          cpu_index: int = 0) -> QMPMessage:
        """Send `cmdline` as 'human-monitor-command' and return the reply."""
        return self.cmd_obj({
            'execute': 'human-monitor-command',
            'arguments': {
                'command-line': cmdline,
                'cpu-index': cpu_index
            }
        })

    def _execute_cmd(self, cmdline: str) -> bool:
        """
        Forward one input line to the human monitor and print the result.

        "cpu <index>" is intercepted: the index is validated with a probe
        command and then remembered for all following commands.

        :return: False when no reply is received (reported as
                 'Disconnected'), True otherwise.
        """
        words = cmdline.split()
        if not words:
            # Whitespace-only input: nothing to forward.
            return True
        if words[0] == "cpu":
            # trap the cpu command, it requires special setting
            try:
                idx = int(words[1])
            except (IndexError, ValueError):
                # Missing or non-numeric argument.
                print('cpu command takes an integer argument')
                return True
            probe = self.__cmd_passthrough('info version', idx)
            if probe is None:
                print('Disconnected')
                return False
            if 'return' not in probe:
                print('bad CPU index')
                return True
            self.__cpu_index = idx
        resp = self.__cmd_passthrough(cmdline, self.__cpu_index)
        if resp is None:
            print('Disconnected')
            return False
        assert 'return' in resp or 'error' in resp
        if 'return' in resp:
            # Success
            if len(resp['return']) > 0:
                print(resp['return'], end=' ')
        else:
            # Error
            error = resp['error']
            print(f"{error['class']}: {error['desc']}")
        return True

    def show_banner(self, msg: str = 'Welcome to the HMP shell!') -> None:
        super().show_banner(msg)
426
427
def die(msg: str) -> NoReturn:
    """Report `msg` on stderr with an 'ERROR: ' prefix and exit with 1."""
    sys.stderr.write(f'ERROR: {msg}\n')
    raise SystemExit(1)
431
432
def main() -> None:
    """
    Command-line entry point: parse the options, connect to the server and
    run the interactive shell until the session ends.

    The server address comes from the positional argument or, when that is
    omitted, from the QMP_SOCKET environment variable.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('-H', '--hmp', action='store_true',
                        help='Use HMP interface')
    parser.add_argument('-N', '--skip-negotiation', action='store_true',
                        help='Skip negotiate (for qemu-ga)')
    parser.add_argument('-v', '--verbose', action='store_true',
                        help='Verbose (echo commands sent and received)')
    parser.add_argument('-p', '--pretty', action='store_true',
                        help='Pretty-print JSON')

    default_server = os.environ.get('QMP_SOCKET')
    # nargs='?' makes the positional optional; without it argparse always
    # demands the argument and the QMP_SOCKET default is never consulted.
    parser.add_argument('qmp_server', action='store', nargs='?',
                        default=default_server,
                        help='< UNIX socket path | TCP address:port >')

    args = parser.parse_args()
    if args.qmp_server is None:
        parser.error("QMP socket or TCP address must be specified")

    shell_class = HMPShell if args.hmp else QMPShell

    try:
        address = shell_class.parse_address(args.qmp_server)
    except qmp.QMPBadPortError:
        parser.error(f"Bad port number: {args.qmp_server}")
        return  # pycharm doesn't know error() is noreturn

    with shell_class(address, args.pretty, args.verbose) as qemu:
        try:
            qemu.connect(negotiate=not args.skip_negotiation)
        except qmp.QMPConnectError:
            die("Didn't get QMP greeting message")
        except qmp.QMPCapabilitiesError:
            die("Couldn't negotiate capabilities")
        except OSError as err:
            die(f"Couldn't connect to {args.qmp_server}: {err!s}")

        # repl() is a generator; draining it runs the whole session.
        for _ in qemu.repl():
            pass
473
474
# Script entry point: start the shell only when run directly, not on import.
if __name__ == '__main__':
    main()
477