xref: /qemu/scripts/qmp/qmp-shell (revision 1acde76328de10beff07c7f1c8146af72813ecd7)
1#!/usr/bin/env python3
2#
3# Low-level QEMU shell on top of QMP.
4#
5# Copyright (C) 2009, 2010 Red Hat Inc.
6#
7# Authors:
8#  Luiz Capitulino <lcapitulino@redhat.com>
9#
10# This work is licensed under the terms of the GNU GPL, version 2.  See
11# the COPYING file in the top-level directory.
12#
13# Usage:
14#
15# Start QEMU with:
16#
17# # qemu [...] -qmp unix:./qmp-sock,server
18#
19# Run the shell:
20#
21# $ qmp-shell ./qmp-sock
22#
23# Commands have the following format:
24#
25#    < command-name > [ arg-name1=arg1 ] ... [ arg-nameN=argN ]
26#
27# For example:
28#
29# (QEMU) device_add driver=e1000 id=net1
30# {"return": {}}
31# (QEMU)
32#
33# key=value pairs also support Python or JSON object literal subset notations,
34# without spaces. Dictionaries/objects {} are supported as are arrays [].
35#
36#    example-command arg-name1={'key':'value','obj':{'prop':"value"}}
37#
38# Both JSON and Python formatting should work, including both styles of
39# string literal quotes. Both paradigms of literal values should work,
40# including null/true/false for JSON and None/True/False for Python.
41#
42#
43# Transactions have the following multi-line format:
44#
45#    transaction(
46#    action-name1 [ arg-name1=arg1 ] ... [arg-nameN=argN ]
47#    ...
48#    action-nameN [ arg-name1=arg1 ] ... [arg-nameN=argN ]
49#    )
50#
51# One line transactions are also supported:
52#
53#    transaction( action-name1 ... )
54#
55# For example:
56#
57#     (QEMU) transaction(
58#     TRANS> block-dirty-bitmap-add node=drive0 name=bitmap1
59#     TRANS> block-dirty-bitmap-clear node=drive0 name=bitmap0
60#     TRANS> )
61#     {"return": {}}
62#     (QEMU)
63#
64# Use the -v and -p options to activate the verbose and pretty-print options,
65# which will echo back the properly formatted JSON-compliant QMP that is being
66# sent to QEMU, which is useful for debugging and documentation generation.
import argparse
import ast
import atexit
import json
import os
import re
import readline
import sys
from typing import List, Optional
75
76
77sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..', 'python'))
78from qemu import qmp
79
80
class QMPCompleter:
    """
    Prefix-matching completion source for readline.

    Candidates are registered with append(); complete() has the
    (text, state) signature that readline.set_completer() expects.
    """
    # NB: Python 3.9+ will probably allow us to subclass list[str] directly,
    # but pylint as of today does not know that List[str] is simply 'list'.
    def __init__(self) -> None:
        # Completion candidates, in the order they were appended.
        self._matches: List[str] = []

    def append(self, value: str) -> None:
        """Register 'value' as a completion candidate."""
        self._matches.append(value)

    def complete(self, text: str, state: int) -> Optional[str]:
        """
        Return the state-th (0-based) candidate that starts with 'text'.

        Candidates are considered in insertion order. None is returned
        once 'state' runs past the last matching candidate.
        """
        for cmd in self._matches:
            if cmd.startswith(text):
                if state == 0:
                    return cmd
                state -= 1
        return None
97
98
class QMPShellError(Exception):
    """Raised when a shell command line cannot be turned into a command."""
101
102
class FuzzyJSON(ast.NodeTransformer):
    """
    AST transformer that maps the bare names "true", "false" and "null"
    onto constant nodes holding True, False and None, so that an
    expression written with JSON literals can be evaluated as Python.
    Every other name is passed through untouched.
    """

    @classmethod
    def visit_Name(cls,  # pylint: disable=invalid-name
                   node: ast.Name) -> ast.AST:
        json_literals = {'true': True, 'false': False, 'null': None}
        if node.id in json_literals:
            return ast.Constant(value=json_literals[node.id])
        return node
120
121
122# TODO: QMPShell's interface is a bit ugly (eg. _fill_completion() and
123#       _execute_cmd()). Let's design a better one.
class QMPShell(qmp.QEMUMonitorProtocol):
    """
    Interactive, readline-based shell that turns 'command key=value ...'
    lines into QMP command objects, sends them and prints the replies.

    :param address: Server address; handed to parse_address().
    :param pretty: Pretty-print (indent and sort) printed JSON.
    :param verbose: Also echo each command object before sending it.
    """

    def __init__(self, address, pretty=False, verbose=False):
        super().__init__(self.parse_address(address))
        self._greeting = None
        self._completer = QMPCompleter()
        self._pretty = pretty
        # True while inside a multi-line 'transaction(' ... ')' sub-shell.
        self._transmode = False
        # Actions collected so far for the transaction being entered.
        self._actions = []
        self._histfile = os.path.join(os.path.expanduser('~'),
                                      '.qmp-shell_history')
        self.verbose = verbose

    def _fill_completion(self):
        """Populate the completer from the 'query-commands' reply."""
        cmds = self.cmd('query-commands')
        # No usable reply (missing or an error): leave the completer empty
        # rather than failing the connection setup.
        if not cmds or 'error' in cmds:
            return
        for cmd in cmds['return']:
            self._completer.append(cmd['name'])

    def __completer_setup(self):
        """
        Install a fresh completer into readline, load the history file and
        arrange for the history to be written back at interpreter exit.
        """
        self._completer = QMPCompleter()
        self._fill_completion()
        readline.set_history_length(1024)
        readline.set_completer(self._completer.complete)
        readline.parse_and_bind("tab: complete")
        # NB: default delimiters conflict with some command names
        # (eg. query-), clearing everything as it doesn't seem to matter
        readline.set_completer_delims('')
        try:
            readline.read_history_file(self._histfile)
        except FileNotFoundError:
            # No history yet; one is created when the shell exits.
            pass
        except IOError as err:
            print(f"Failed to read history '{self._histfile}': {err!s}")
        atexit.register(self.__save_history)

    def __save_history(self):
        """Write the readline history file, reporting (not raising) errors."""
        try:
            readline.write_history_file(self._histfile)
        except IOError as err:
            print(f"Failed to save history file '{self._histfile}': {err!s}")

    @classmethod
    def __parse_value(cls, val):
        """
        Convert the text right of '=' into a Python value.

        Tried in order: integer, true/false (any case), and - for text
        starting with '{' or '[' - strict JSON, then a Python literal in
        which true/false/null are also accepted. Anything else is returned
        unchanged as a string.
        """
        try:
            return int(val)
        except ValueError:
            pass

        if val.lower() == 'true':
            return True
        if val.lower() == 'false':
            return False
        if val.startswith(('{', '[')):
            # Try first as pure JSON:
            try:
                return json.loads(val)
            except ValueError:
                pass
            # Try once again as FuzzyJSON:
            try:
                tree = ast.parse(val, mode='eval')
                transformed = FuzzyJSON().visit(tree)
                return ast.literal_eval(transformed)
            except (SyntaxError, ValueError):
                pass
        return val

    def __cli_expr(self, tokens, parent):
        """
        Parse 'key=value' tokens into the dict 'parent' (modified in place).

        Dotted keys ('a.b.c=1') create nested dicts; every key is resolved
        starting from 'parent' itself.

        :raise QMPShellError: For a token without '=', a key set twice, or
                              a key used as both a value and a container.
        """
        for arg in tokens:
            (key, sep, val) = arg.partition('=')
            if sep != '=':
                raise QMPShellError(
                    f"Expected a key=value pair, got '{arg!s}'"
                )

            value = self.__parse_value(val)
            optpath = key.split('.')
            # Descend from the root for every token. Walking on from the
            # container of the previous dotted key would nest all later
            # arguments underneath it.
            target = parent
            curpath = []
            for path in optpath[:-1]:
                curpath.append(path)
                obj = target.get(path, {})
                if not isinstance(obj, dict):
                    msg = 'Cannot use "{:s}" as both leaf and non-leaf key'
                    raise QMPShellError(msg.format('.'.join(curpath)))
                target[path] = obj
                target = obj
            leaf = optpath[-1]
            if leaf in target:
                if isinstance(target[leaf], dict):
                    # Report the complete key; curpath stops short of the
                    # leaf and may even be empty here.
                    msg = 'Cannot use "{:s}" as both leaf and non-leaf key'
                    raise QMPShellError(msg.format(key))
                raise QMPShellError(f'Cannot set "{key}" multiple times')
            target[leaf] = value

    def __build_cmd(self, cmdline):
        """
        Build a QMP input object from a user provided command-line in the
        following format:

            < command-name > [ arg-name1=arg1 ] ... [ arg-nameN=argN ]

        Returns None when there is nothing to send yet: blank input, or a
        line that only opened a transaction or added an action to it.

        :raise QMPShellError: If the line cannot be parsed.
        """
        argument_regex = r'''(?:[^\s"']|"(?:\\.|[^"])*"|'(?:\\.|[^'])*')+'''
        cmdargs = re.findall(argument_regex, cmdline)

        # Nothing to process? Must be checked before looking at cmdargs[0].
        if not cmdargs:
            return None

        # Transactional CLI entry/exit:
        if cmdargs[0] == 'transaction(':
            self._transmode = True
            cmdargs.pop(0)
        elif cmdargs[0] == ')' and self._transmode:
            self._transmode = False
            if len(cmdargs) > 1:
                msg = 'Unexpected input after close of Transaction sub-shell'
                raise QMPShellError(msg)
            qmpcmd = {
                'execute': 'transaction',
                'arguments': {'actions': self._actions}
            }
            self._actions = []
            return qmpcmd

        # A bare 'transaction(' leaves nothing else on this line.
        if not cmdargs:
            return None

        # Parse and then cache this Transactional Action
        if self._transmode:
            finalize = False
            if cmdargs[-1] == ')':
                cmdargs.pop(-1)
                finalize = True
            # 'transaction( )' carries no action at all; do not mistake the
            # closing parenthesis for an action name.
            if cmdargs:
                action = {'type': cmdargs[0], 'data': {}}
                self.__cli_expr(cmdargs[1:], action['data'])
                self._actions.append(action)
            return self.__build_cmd(')') if finalize else None

        # Standard command: parse and return it to be executed.
        qmpcmd = {'execute': cmdargs[0], 'arguments': {}}
        self.__cli_expr(cmdargs[1:], qmpcmd['arguments'])
        return qmpcmd

    def _print(self, qmp_message):
        """Print a message as JSON, indented and key-sorted if 'pretty'."""
        indent = None
        if self._pretty:
            indent = 4
        jsobj = json.dumps(qmp_message, indent=indent, sort_keys=self._pretty)
        print(str(jsobj))

    def _execute_cmd(self, cmdline):
        """
        Parse one command line, send the resulting command and print the
        reply.

        @return False if the peer disconnected, True otherwise (including
                when the line could not be parsed).
        """
        try:
            qmpcmd = self.__build_cmd(cmdline)
        except Exception as err:
            # Deliberately broad: a malformed line must never end the shell.
            print('Error while parsing command line: %s' % err)
            print('command format: <command-name> ', end=' ')
            print('[arg-name1=arg1] ... [arg-nameN=argN]')
            return True
        # For transaction mode, we may have just cached the action:
        if qmpcmd is None:
            return True
        if self.verbose:
            self._print(qmpcmd)
        resp = self.cmd_obj(qmpcmd)
        if resp is None:
            print('Disconnected')
            return False
        self._print(resp)
        return True

    def connect(self, negotiate: bool = True):
        """Connect, remember the greeting and set up readline completion."""
        self._greeting = super().connect(negotiate)
        self.__completer_setup()

    def show_banner(self, msg='Welcome to the QMP low-level shell!'):
        """Print 'msg' followed by the QEMU version from the greeting."""
        print(msg)
        if not self._greeting:
            # No greeting was stored, so no version can be shown.
            print('Connected')
            return
        version = self._greeting['QMP']['version']['qemu']
        print("Connected to QEMU {major}.{minor}.{micro}\n".format(**version))

    @property
    def prompt(self):
        """Prompt string; differs while a transaction is being entered."""
        if self._transmode:
            return 'TRANS> '
        return '(QEMU) '

    def read_exec_command(self):
        """
        Read and execute a command.

        An empty line prints and clears the pending events instead.

        @return True if execution was ok, return False if disconnected.
        """
        try:
            cmdline = input(self.prompt)
        except EOFError:
            print()
            return False

        if cmdline == '':
            for event in self.get_events():
                print(event)
            self.clear_events()
            return True

        return self._execute_cmd(cmdline)

    def repl(self):
        """
        Generator driving the shell: shows the banner, then yields once per
        executed command until EOF or disconnect. The connection is closed
        when the loop ends, also if it is left through an exception.
        """
        self.show_banner()
        try:
            while self.read_exec_command():
                yield
        finally:
            self.close()
334
335
class HMPShell(QMPShell):
    """
    Shell variant that forwards every input line as a human monitor
    command through QMP's 'human-monitor-command' and prints the textual
    result.
    """

    def __init__(self, address, pretty=False, verbose=False):
        super().__init__(address, pretty, verbose)
        # CPU index sent along with every forwarded command; changed by
        # the trapped 'cpu' command.
        self.__cpu_index = 0

    def __hmp_lines(self, cmdline):
        """Return the output lines of an HMP command, [] if there is none."""
        resp = self.__cmd_passthrough(cmdline)
        if not resp or 'return' not in resp:
            return []
        return resp['return'].split('\r\n')

    def __cmd_completion(self):
        """Register command names (and 'help <name>') from 'help' output."""
        for cmd in self.__hmp_lines('help'):
            words = cmd.split()
            # Skip blank lines and lines starting with '[' or a tab.
            if not words or cmd.startswith(('[', '\t')):
                continue
            name = words[0]  # drop help text
            if name == 'info':
                continue
            if '|' in name:
                # Command in the form 'foobar|f' or 'f|foobar', take the
                # full name
                opt = name.split('|')
                name = opt[1] if len(opt[0]) == 1 else opt[0]
            self._completer.append(name)
            self._completer.append('help ' + name)  # help completion

    def __info_completion(self):
        """Register 'info <topic>' completions from the 'info' output."""
        for cmd in self.__hmp_lines('info'):
            words = cmd.split()
            # The topic is the second word; ignore lines that lack one.
            if len(words) > 1:
                self._completer.append('info ' + words[1])

    def __other_completion(self):
        # special cases
        self._completer.append('help info')

    def _fill_completion(self):
        """Populate the completer with HMP commands instead of QMP ones."""
        self.__cmd_completion()
        self.__info_completion()
        self.__other_completion()

    def __cmd_passthrough(self, cmdline, cpu_index=0):
        """Send 'cmdline' via 'human-monitor-command'; return the reply."""
        return self.cmd_obj({
            'execute': 'human-monitor-command',
            'arguments': {
                'command-line': cmdline,
                'cpu-index': cpu_index
            }
        })

    def _execute_cmd(self, cmdline):
        """
        Forward one line to the human monitor and print its output.

        'cpu <index>' is intercepted so that the chosen index is remembered
        and sent with all following commands.

        @return False if the peer disconnected, True otherwise.
        """
        words = cmdline.split()
        if not words:
            # Whitespace-only input: nothing to forward.
            return True
        if words[0] == "cpu":
            # trap the cpu command, it requires special setting
            try:
                idx = int(words[1])
            except (IndexError, ValueError):
                # Missing or non-numeric argument.
                print('cpu command takes an integer argument')
                return True
            probe = self.__cmd_passthrough('info version', idx)
            if probe is None:
                print('Disconnected')
                return False
            if 'return' not in probe:
                print('bad CPU index')
                return True
            self.__cpu_index = idx
        resp = self.__cmd_passthrough(cmdline, self.__cpu_index)
        if resp is None:
            print('Disconnected')
            return False
        assert 'return' in resp or 'error' in resp
        if 'return' in resp:
            # Success
            if len(resp['return']) > 0:
                print(resp['return'], end=' ')
        else:
            # Error
            print('%s: %s' % (resp['error']['class'], resp['error']['desc']))
        return True

    def show_banner(self, msg='Welcome to the HMP shell!'):
        """Same as the parent's banner, with an HMP-specific default."""
        super().show_banner(msg)
409
410
def die(msg):
    """Report 'msg' on stderr, prefixed with 'ERROR: ', and exit with 1."""
    print('ERROR: %s' % msg, file=sys.stderr)
    raise SystemExit(1)
414
415
def main():
    """
    Command-line entry point: parse the options, connect to the server and
    run the interactive shell until it ends.

    The server address is optional on the command line when the QMP_SOCKET
    environment variable provides it.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('-H', '--hmp', action='store_true',
                        help='Use HMP interface')
    parser.add_argument('-N', '--skip-negotiation', action='store_true',
                        help='Skip negotiate (for qemu-ga)')
    parser.add_argument('-v', '--verbose', action='store_true',
                        help='Verbose (echo commands sent and received)')
    parser.add_argument('-p', '--pretty', action='store_true',
                        help='Pretty-print JSON')

    default_server = os.environ.get('QMP_SOCKET')
    # nargs='?' makes the positional optional; without it argparse always
    # demands the argument and the QMP_SOCKET default is never consulted.
    parser.add_argument('qmp_server', action='store', nargs='?',
                        default=default_server,
                        help='< UNIX socket path | TCP address:port >')

    args = parser.parse_args()
    if args.qmp_server is None:
        parser.error("QMP socket or TCP address must be specified")

    shell_class = HMPShell if args.hmp else QMPShell
    try:
        qemu = shell_class(args.qmp_server, args.pretty, args.verbose)
    except qmp.QMPBadPortError:
        parser.error(f"Bad port number: {args.qmp_server}")
        return  # pycharm doesn't know error() is noreturn

    try:
        qemu.connect(negotiate=not args.skip_negotiation)
    except qmp.QMPConnectError:
        die("Didn't get QMP greeting message")
    except qmp.QMPCapabilitiesError:
        die("Couldn't negotiate capabilities")
    except OSError as err:
        die(f"Couldn't connect to {args.qmp_server}: {err!s}")

    # repl() is a generator; drain it to run the shell to completion.
    for _ in qemu.repl():
        pass
454
455
456if __name__ == '__main__':
457    main()
458