xref: /qemu/scripts/qmp/qmp-shell (revision 90bd8eb8dcdc94da964786ddedd90c30eb54ada7)
1#!/usr/bin/env python3
2#
3# Low-level QEMU shell on top of QMP.
4#
5# Copyright (C) 2009, 2010 Red Hat Inc.
6#
7# Authors:
8#  Luiz Capitulino <lcapitulino@redhat.com>
9#
10# This work is licensed under the terms of the GNU GPL, version 2.  See
11# the COPYING file in the top-level directory.
12#
13# Usage:
14#
15# Start QEMU with:
16#
17# # qemu [...] -qmp unix:./qmp-sock,server
18#
19# Run the shell:
20#
21# $ qmp-shell ./qmp-sock
22#
23# Commands have the following format:
24#
25#    < command-name > [ arg-name1=arg1 ] ... [ arg-nameN=argN ]
26#
27# For example:
28#
29# (QEMU) device_add driver=e1000 id=net1
30# {u'return': {}}
31# (QEMU)
32#
33# key=value pairs also support Python or JSON object literal subset notations,
34# without spaces. Dictionaries/objects {} are supported as are arrays [].
35#
36#    example-command arg-name1={'key':'value','obj'={'prop':"value"}}
37#
38# Both JSON and Python formatting should work, including both styles of
39# string literal quotes. Both paradigms of literal values should work,
40# including null/true/false for JSON and None/True/False for Python.
41#
42#
43# Transactions have the following multi-line format:
44#
45#    transaction(
46#    action-name1 [ arg-name1=arg1 ] ... [arg-nameN=argN ]
47#    ...
48#    action-nameN [ arg-name1=arg1 ] ... [arg-nameN=argN ]
49#    )
50#
51# One line transactions are also supported:
52#
53#    transaction( action-name1 ... )
54#
55# For example:
56#
57#     (QEMU) transaction(
58#     TRANS> block-dirty-bitmap-add node=drive0 name=bitmap1
59#     TRANS> block-dirty-bitmap-clear node=drive0 name=bitmap0
60#     TRANS> )
61#     {"return": {}}
62#     (QEMU)
63#
64# Use the -v and -p options to activate the verbose and pretty-print options,
65# which will echo back the properly formatted JSON-compliant QMP that is being
66# sent to QEMU, which is useful for debugging and documentation generation.
67
68import ast
69import atexit
70import json
71import os
72import re
73import readline
74import sys
75
76
77sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..', 'python'))
78from qemu import qmp
79
80
81class QMPCompleter(list):
82    def complete(self, text, state):
83        for cmd in self:
84            if cmd.startswith(text):
85                if state == 0:
86                    return cmd
87                state -= 1
88        return None
89
90
91class QMPShellError(Exception):
92    pass
93
94
95class FuzzyJSON(ast.NodeTransformer):
96    """
97    This extension of ast.NodeTransformer filters literal "true/false/null"
98    values in an AST and replaces them by proper "True/False/None" values that
99    Python can properly evaluate.
100    """
101
102    @classmethod
103    def visit_Name(cls, node):  # pylint: disable=invalid-name
104        if node.id == 'true':
105            node.id = 'True'
106        if node.id == 'false':
107            node.id = 'False'
108        if node.id == 'null':
109            node.id = 'None'
110        return node
111
112
113# TODO: QMPShell's interface is a bit ugly (eg. _fill_completion() and
114#       _execute_cmd()). Let's design a better one.
115class QMPShell(qmp.QEMUMonitorProtocol):
116    def __init__(self, address, pretty=False):
117        super().__init__(self.parse_address(address))
118        self._greeting = None
119        self._completer = None
120        self._pretty = pretty
121        self._transmode = False
122        self._actions = list()
123        self._histfile = os.path.join(os.path.expanduser('~'),
124                                      '.qmp-shell_history')
125        self._verbose = False
126
127    def _fill_completion(self):
128        cmds = self.cmd('query-commands')
129        if 'error' in cmds:
130            return
131        for cmd in cmds['return']:
132            self._completer.append(cmd['name'])
133
134    def __completer_setup(self):
135        self._completer = QMPCompleter()
136        self._fill_completion()
137        readline.set_history_length(1024)
138        readline.set_completer(self._completer.complete)
139        readline.parse_and_bind("tab: complete")
140        # NB: default delimiters conflict with some command names
141        # (eg. query-), clearing everything as it doesn't seem to matter
142        readline.set_completer_delims('')
143        try:
144            readline.read_history_file(self._histfile)
145        except FileNotFoundError:
146            pass
147        except IOError as err:
148            print(f"Failed to read history '{self._histfile}': {err!s}")
149        atexit.register(self.__save_history)
150
151    def __save_history(self):
152        try:
153            readline.write_history_file(self._histfile)
154        except IOError as err:
155            print(f"Failed to save history file '{self._histfile}': {err!s}")
156
157    @classmethod
158    def __parse_value(cls, val):
159        try:
160            return int(val)
161        except ValueError:
162            pass
163
164        if val.lower() == 'true':
165            return True
166        if val.lower() == 'false':
167            return False
168        if val.startswith(('{', '[')):
169            # Try first as pure JSON:
170            try:
171                return json.loads(val)
172            except ValueError:
173                pass
174            # Try once again as FuzzyJSON:
175            try:
176                tree = ast.parse(val, mode='eval')
177                return ast.literal_eval(FuzzyJSON().visit(tree))
178            except SyntaxError:
179                pass
180            except ValueError:
181                pass
182        return val
183
184    def __cli_expr(self, tokens, parent):
185        for arg in tokens:
186            (key, sep, val) = arg.partition('=')
187            if sep != '=':
188                raise QMPShellError(
189                    f"Expected a key=value pair, got '{arg!s}'"
190                )
191
192            value = self.__parse_value(val)
193            optpath = key.split('.')
194            curpath = []
195            for path in optpath[:-1]:
196                curpath.append(path)
197                obj = parent.get(path, {})
198                if not isinstance(obj, dict):
199                    msg = 'Cannot use "{:s}" as both leaf and non-leaf key'
200                    raise QMPShellError(msg.format('.'.join(curpath)))
201                parent[path] = obj
202                parent = obj
203            if optpath[-1] in parent:
204                if isinstance(parent[optpath[-1]], dict):
205                    msg = 'Cannot use "{:s}" as both leaf and non-leaf key'
206                    raise QMPShellError(msg.format('.'.join(curpath)))
207                raise QMPShellError(f'Cannot set "{key}" multiple times')
208            parent[optpath[-1]] = value
209
210    def __build_cmd(self, cmdline):
211        """
212        Build a QMP input object from a user provided command-line in the
213        following format:
214
215            < command-name > [ arg-name1=arg1 ] ... [ arg-nameN=argN ]
216        """
217        argument_regex = r'''(?:[^\s"']|"(?:\\.|[^"])*"|'(?:\\.|[^'])*')+'''
218        cmdargs = re.findall(argument_regex, cmdline)
219
220        # Transactional CLI entry/exit:
221        if cmdargs[0] == 'transaction(':
222            self._transmode = True
223            cmdargs.pop(0)
224        elif cmdargs[0] == ')' and self._transmode:
225            self._transmode = False
226            if len(cmdargs) > 1:
227                msg = 'Unexpected input after close of Transaction sub-shell'
228                raise QMPShellError(msg)
229            qmpcmd = {
230                'execute': 'transaction',
231                'arguments': {'actions': self._actions}
232            }
233            self._actions = list()
234            return qmpcmd
235
236        # Nothing to process?
237        if not cmdargs:
238            return None
239
240        # Parse and then cache this Transactional Action
241        if self._transmode:
242            finalize = False
243            action = {'type': cmdargs[0], 'data': {}}
244            if cmdargs[-1] == ')':
245                cmdargs.pop(-1)
246                finalize = True
247            self.__cli_expr(cmdargs[1:], action['data'])
248            self._actions.append(action)
249            return self.__build_cmd(')') if finalize else None
250
251        # Standard command: parse and return it to be executed.
252        qmpcmd = {'execute': cmdargs[0], 'arguments': {}}
253        self.__cli_expr(cmdargs[1:], qmpcmd['arguments'])
254        return qmpcmd
255
256    def _print(self, qmp_message):
257        indent = None
258        if self._pretty:
259            indent = 4
260        jsobj = json.dumps(qmp_message, indent=indent, sort_keys=self._pretty)
261        print(str(jsobj))
262
263    def _execute_cmd(self, cmdline):
264        try:
265            qmpcmd = self.__build_cmd(cmdline)
266        except Exception as err:
267            print('Error while parsing command line: %s' % err)
268            print('command format: <command-name> ', end=' ')
269            print('[arg-name1=arg1] ... [arg-nameN=argN]')
270            return True
271        # For transaction mode, we may have just cached the action:
272        if qmpcmd is None:
273            return True
274        if self._verbose:
275            self._print(qmpcmd)
276        resp = self.cmd_obj(qmpcmd)
277        if resp is None:
278            print('Disconnected')
279            return False
280        self._print(resp)
281        return True
282
283    def connect(self, negotiate: bool = True):
284        self._greeting = super().connect(negotiate)
285        self.__completer_setup()
286
287    def show_banner(self, msg='Welcome to the QMP low-level shell!'):
288        print(msg)
289        if not self._greeting:
290            print('Connected')
291            return
292        version = self._greeting['QMP']['version']['qemu']
293        print("Connected to QEMU {major}.{minor}.{micro}\n".format(**version))
294
295    def get_prompt(self):
296        if self._transmode:
297            return "TRANS> "
298        return "(QEMU) "
299
300    def read_exec_command(self, prompt):
301        """
302        Read and execute a command.
303
304        @return True if execution was ok, return False if disconnected.
305        """
306        try:
307            cmdline = input(prompt)
308        except EOFError:
309            print()
310            return False
311
312        if cmdline == '':
313            for event in self.get_events():
314                print(event)
315            self.clear_events()
316            return True
317
318        return self._execute_cmd(cmdline)
319
320    def set_verbosity(self, verbose):
321        self._verbose = verbose
322
323
324class HMPShell(QMPShell):
325    def __init__(self, address):
326        super().__init__(address)
327        self.__cpu_index = 0
328
329    def __cmd_completion(self):
330        for cmd in self.__cmd_passthrough('help')['return'].split('\r\n'):
331            if cmd and cmd[0] != '[' and cmd[0] != '\t':
332                name = cmd.split()[0]  # drop help text
333                if name == 'info':
334                    continue
335                if name.find('|') != -1:
336                    # Command in the form 'foobar|f' or 'f|foobar', take the
337                    # full name
338                    opt = name.split('|')
339                    if len(opt[0]) == 1:
340                        name = opt[1]
341                    else:
342                        name = opt[0]
343                self._completer.append(name)
344                self._completer.append('help ' + name)  # help completion
345
346    def __info_completion(self):
347        for cmd in self.__cmd_passthrough('info')['return'].split('\r\n'):
348            if cmd:
349                self._completer.append('info ' + cmd.split()[1])
350
351    def __other_completion(self):
352        # special cases
353        self._completer.append('help info')
354
355    def _fill_completion(self):
356        self.__cmd_completion()
357        self.__info_completion()
358        self.__other_completion()
359
360    def __cmd_passthrough(self, cmdline, cpu_index=0):
361        return self.cmd_obj({
362            'execute': 'human-monitor-command',
363            'arguments': {
364                'command-line': cmdline,
365                'cpu-index': cpu_index
366            }
367        })
368
369    def _execute_cmd(self, cmdline):
370        if cmdline.split()[0] == "cpu":
371            # trap the cpu command, it requires special setting
372            try:
373                idx = int(cmdline.split()[1])
374                if 'return' not in self.__cmd_passthrough('info version', idx):
375                    print('bad CPU index')
376                    return True
377                self.__cpu_index = idx
378            except ValueError:
379                print('cpu command takes an integer argument')
380                return True
381        resp = self.__cmd_passthrough(cmdline, self.__cpu_index)
382        if resp is None:
383            print('Disconnected')
384            return False
385        assert 'return' in resp or 'error' in resp
386        if 'return' in resp:
387            # Success
388            if len(resp['return']) > 0:
389                print(resp['return'], end=' ')
390        else:
391            # Error
392            print('%s: %s' % (resp['error']['class'], resp['error']['desc']))
393        return True
394
395    def show_banner(self, msg='Welcome to the HMP shell!'):
396        QMPShell.show_banner(self, msg)
397
398
399def die(msg):
400    sys.stderr.write('ERROR: %s\n' % msg)
401    sys.exit(1)
402
403
404def fail_cmdline(option=None):
405    if option:
406        sys.stderr.write('ERROR: bad command-line option \'%s\'\n' % option)
407    sys.stderr.write(
408        'qmp-shell [ -v ] [ -p ] [ -H ] [ -N ] '
409        '< UNIX socket path> | < TCP address:port >\n'
410    )
411    sys.stderr.write('    -v     Verbose (echo command sent and received)\n')
412    sys.stderr.write('    -p     Pretty-print JSON\n')
413    sys.stderr.write('    -H     Use HMP interface\n')
414    sys.stderr.write('    -N     Skip negotiate (for qemu-ga)\n')
415    sys.exit(1)
416
417
418def main():
419    addr = ''
420    qemu = None
421    hmp = False
422    pretty = False
423    verbose = False
424    negotiate = True
425
426    try:
427        for arg in sys.argv[1:]:
428            if arg == "-H":
429                if qemu is not None:
430                    fail_cmdline(arg)
431                hmp = True
432            elif arg == "-p":
433                pretty = True
434            elif arg == "-N":
435                negotiate = False
436            elif arg == "-v":
437                verbose = True
438            else:
439                if qemu is not None:
440                    fail_cmdline(arg)
441                if hmp:
442                    qemu = HMPShell(arg)
443                else:
444                    qemu = QMPShell(arg, pretty)
445                addr = arg
446
447        if qemu is None:
448            fail_cmdline()
449    except qmp.QMPBadPortError:
450        die('bad port number in command-line')
451
452    try:
453        qemu.connect(negotiate)
454    except qmp.QMPConnectError:
455        die('Didn\'t get QMP greeting message')
456    except qmp.QMPCapabilitiesError:
457        die('Could not negotiate capabilities')
458    except OSError:
459        die('Could not connect to %s' % addr)
460
461    qemu.show_banner()
462    qemu.set_verbosity(verbose)
463    while qemu.read_exec_command(qemu.get_prompt()):
464        pass
465    qemu.close()
466
467
468if __name__ == '__main__':
469    main()
470