xref: /qemu/scripts/qmp/qmp-shell (revision 26d3ce9e5e42920a6f9c1f481d900e63a636b07d)
1#!/usr/bin/env python3
2#
3# Low-level QEMU shell on top of QMP.
4#
5# Copyright (C) 2009, 2010 Red Hat Inc.
6#
7# Authors:
8#  Luiz Capitulino <lcapitulino@redhat.com>
9#
10# This work is licensed under the terms of the GNU GPL, version 2.  See
11# the COPYING file in the top-level directory.
12#
13# Usage:
14#
15# Start QEMU with:
16#
17# # qemu [...] -qmp unix:./qmp-sock,server
18#
19# Run the shell:
20#
21# $ qmp-shell ./qmp-sock
22#
23# Commands have the following format:
24#
25#    < command-name > [ arg-name1=arg1 ] ... [ arg-nameN=argN ]
26#
27# For example:
28#
29# (QEMU) device_add driver=e1000 id=net1
# {"return": {}}
31# (QEMU)
32#
33# key=value pairs also support Python or JSON object literal subset notations,
34# without spaces. Dictionaries/objects {} are supported as are arrays [].
35#
#    example-command arg-name1={'key':'value','obj':{'prop':"value"}}
37#
38# Both JSON and Python formatting should work, including both styles of
39# string literal quotes. Both paradigms of literal values should work,
40# including null/true/false for JSON and None/True/False for Python.
41#
42#
43# Transactions have the following multi-line format:
44#
45#    transaction(
46#    action-name1 [ arg-name1=arg1 ] ... [arg-nameN=argN ]
47#    ...
48#    action-nameN [ arg-name1=arg1 ] ... [arg-nameN=argN ]
49#    )
50#
51# One line transactions are also supported:
52#
53#    transaction( action-name1 ... )
54#
55# For example:
56#
57#     (QEMU) transaction(
58#     TRANS> block-dirty-bitmap-add node=drive0 name=bitmap1
59#     TRANS> block-dirty-bitmap-clear node=drive0 name=bitmap0
60#     TRANS> )
61#     {"return": {}}
62#     (QEMU)
63#
64# Use the -v and -p options to activate the verbose and pretty-print options,
65# which will echo back the properly formatted JSON-compliant QMP that is being
66# sent to QEMU, which is useful for debugging and documentation generation.
67import argparse
68import ast
69import json
70import logging
71import os
72import re
73import readline
74import sys
75from typing import (
76    Iterator,
77    List,
78    NoReturn,
79    Optional,
80    Sequence,
81)
82
83
84sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..', 'python'))
85from qemu import qmp
86from qemu.qmp import QMPMessage
87
88
# Module-level logger; used below to warn when the readline history file
# cannot be read or written.
LOG = logging.getLogger(__name__)
90
91
class QMPCompleter:
    """
    Completion source for readline: an ordered list of candidate strings.

    Candidates are offered in the order in which they were appended.
    """
    # NB: Python 3.9+ will probably allow us to subclass list[str] directly,
    # but pylint as of today does not know that List[str] is simply 'list'.
    def __init__(self) -> None:
        self._matches: List[str] = []

    def append(self, value: str) -> None:
        """Register one more completion candidate."""
        self._matches.append(value)

    def complete(self, text: str, state: int) -> Optional[str]:
        """
        Return the candidate number `state` (counting from zero) among
        those starting with `text`, or None when there is no such entry.
        """
        candidates = [name for name in self._matches
                      if name.startswith(text)]
        if 0 <= state < len(candidates):
            return candidates[state]
        return None
108
109
class QMPShellError(Exception):
    """Raised when a command line typed into the shell cannot be parsed."""
112
113
class FuzzyJSON(ast.NodeTransformer):
    """
    AST transformer that lets JSON-style literals pass as Python literals.

    Bare names "true", "false" and "null" found in the tree are swapped
    for constant nodes holding True, False and None respectively; every
    other name node is returned untouched.
    """

    # JSON literal spelling -> equivalent Python value.
    _JSON_LITERALS = {'true': True, 'false': False, 'null': None}

    @classmethod
    def visit_Name(cls,  # pylint: disable=invalid-name
                   node: ast.Name) -> ast.AST:
        if node.id in cls._JSON_LITERALS:
            return ast.Constant(value=cls._JSON_LITERALS[node.id])
        return node
131
132
class QMPShell(qmp.QEMUMonitorProtocol):
    """
    Interactive, line-oriented shell that speaks QMP.

    Command lines typed by the user are converted into QMP messages, sent
    through the parent class and the replies are printed as JSON.  A
    ``transaction(`` ... ``)`` sub-shell collects several actions into a
    single 'transaction' command.

    :param address: QMP server address, handed to the parent class.
    :param pretty: Indent and key-sort the printed JSON.
    :param verbose: Also print every command before it is sent.
    """
    def __init__(self, address: qmp.SocketAddrT,
                 pretty: bool = False, verbose: bool = False):
        super().__init__(address)
        self._greeting: Optional[QMPMessage] = None
        self._completer = QMPCompleter()
        self._transmode = False
        self._actions: List[QMPMessage] = []
        self._histfile = os.path.join(os.path.expanduser('~'),
                                      '.qmp-shell_history')
        # Set once readline has been configured and the history file has
        # been consulted; close() only writes the history back after that.
        self._history_ready = False
        self.pretty = pretty
        self.verbose = verbose

    def close(self) -> None:
        """Save the shell history (if it was set up) and close the parent."""
        # Hook into context manager of parent to save shell history.
        # If the connection never came up, the in-memory history is empty
        # and writing it out would wipe the user's existing history file.
        if self._history_ready:
            self._save_history()
        super().close()

    def _fill_completion(self) -> None:
        """
        Feed the completer with the names returned by 'query-commands'.

        Nothing is added when the reply carries an error.
        """
        cmds = self.cmd('query-commands')
        if 'error' in cmds:
            return
        for cmd in cmds['return']:
            self._completer.append(cmd['name'])

    def __completer_setup(self) -> None:
        """Configure readline: completion, history size and history file."""
        self._completer = QMPCompleter()
        self._fill_completion()
        readline.set_history_length(1024)
        readline.set_completer(self._completer.complete)
        readline.parse_and_bind("tab: complete")
        # NB: default delimiters conflict with some command names
        # (eg. query-), clearing everything as it doesn't seem to matter
        readline.set_completer_delims('')
        try:
            readline.read_history_file(self._histfile)
        except FileNotFoundError:
            # No history yet; it gets created when the shell closes.
            pass
        except IOError as err:
            LOG.warning("Failed to read history '%s': %s",
                        self._histfile, err)
        self._history_ready = True

    def _save_history(self) -> None:
        """Write the readline history to the history file (best effort)."""
        try:
            readline.write_history_file(self._histfile)
        except IOError as err:
            LOG.warning("Failed to save history file '%s': %s",
                        self._histfile, err)

    @classmethod
    def __parse_value(cls, val: str) -> object:
        """
        Convert the textual value of a key=value pair into a Python value.

        Integers and true/false (any case) are recognised directly.  Text
        starting with '{' or '[' is parsed as JSON first, then as a Python
        literal in which true/false/null are also accepted.  Anything that
        cannot be parsed is returned unchanged as a string.
        """
        try:
            return int(val)
        except ValueError:
            pass

        if val.lower() == 'true':
            return True
        if val.lower() == 'false':
            return False
        if val.startswith(('{', '[')):
            # Try first as pure JSON:
            try:
                return json.loads(val)
            except ValueError:
                pass
            # Try once again as FuzzyJSON:
            try:
                tree = ast.parse(val, mode='eval')
                transformed = FuzzyJSON().visit(tree)
                parsed = ast.literal_eval(transformed)
                # literal_eval also accepts sets, bytes, tuple keys, ...
                # which cannot be sent as JSON; reject them here instead
                # of failing later when the command is serialized.
                json.dumps(parsed)
                return parsed
            except (SyntaxError, ValueError, TypeError):
                # TypeError: unhashable dict key or non-JSON value.
                pass
        return val

    def __cli_expr(self,
                   tokens: Sequence[str],
                   parent: qmp.QMPObject) -> None:
        """
        Store a list of key=value tokens into the object `parent`.

        Dotted keys ("a.b.c=1") create nested objects below `parent`.

        :raise QMPShellError: When a token is not a key=value pair, when a
                              key is used both as a leaf and as a container,
                              or when a key is set more than once.
        """
        for arg in tokens:
            (key, sep, val) = arg.partition('=')
            if sep != '=':
                raise QMPShellError(
                    f"Expected a key=value pair, got '{arg!s}'"
                )

            value = self.__parse_value(val)
            optpath = key.split('.')
            # Every token is resolved starting from the root object; the
            # cursor must not leak into the next token, otherwise
            # "a.b=1 c=2" would store c inside a.
            node = parent
            curpath = []
            for path in optpath[:-1]:
                curpath.append(path)
                obj = node.get(path, {})
                if not isinstance(obj, dict):
                    msg = 'Cannot use "{:s}" as both leaf and non-leaf key'
                    raise QMPShellError(msg.format('.'.join(curpath)))
                node[path] = obj
                node = obj
            leaf = optpath[-1]
            if leaf in node:
                if isinstance(node[leaf], dict):
                    # Report the full key: it is the final component that
                    # clashes with an existing container.
                    msg = 'Cannot use "{:s}" as both leaf and non-leaf key'
                    raise QMPShellError(msg.format(key))
                raise QMPShellError(f'Cannot set "{key}" multiple times')
            node[leaf] = value

    def __build_cmd(self, cmdline: str) -> Optional[QMPMessage]:
        """
        Build a QMP input object from a user provided command-line in the
        following format:

            < command-name > [ arg-name1=arg1 ] ... [ arg-nameN=argN ]

        :return: The message to send, or None when there is nothing to send
                 yet (empty input, or an action cached for a transaction).
        :raise QMPShellError: When the command line cannot be parsed.
        """
        argument_regex = r'''(?:[^\s"']|"(?:\\.|[^"])*"|'(?:\\.|[^'])*')+'''
        cmdargs = re.findall(argument_regex, cmdline)
        qmpcmd: QMPMessage

        # Transactional CLI entry:
        if cmdargs and cmdargs[0] == 'transaction(':
            self._transmode = True
            self._actions = []
            cmdargs.pop(0)

        # Transactional CLI exit:
        if cmdargs and cmdargs[0] == ')' and self._transmode:
            self._transmode = False
            if len(cmdargs) > 1:
                msg = 'Unexpected input after close of Transaction sub-shell'
                raise QMPShellError(msg)
            qmpcmd = {
                'execute': 'transaction',
                'arguments': {'actions': self._actions}
            }
            return qmpcmd

        # No args, or no args remaining
        if not cmdargs:
            return None

        if self._transmode:
            # Parse and cache this Transactional Action
            finalize = False
            action = {'type': cmdargs[0], 'data': {}}
            if cmdargs[-1] == ')':
                # One-line form: the closing paren ends the transaction.
                cmdargs.pop(-1)
                finalize = True
            self.__cli_expr(cmdargs[1:], action['data'])
            self._actions.append(action)
            return self.__build_cmd(')') if finalize else None

        # Standard command: parse and return it to be executed.
        qmpcmd = {'execute': cmdargs[0], 'arguments': {}}
        self.__cli_expr(cmdargs[1:], qmpcmd['arguments'])
        return qmpcmd

    def _print(self, qmp_message: object) -> None:
        """Print a message as JSON, indented and sorted in pretty mode."""
        jsobj = json.dumps(qmp_message,
                           indent=4 if self.pretty else None,
                           sort_keys=self.pretty)
        print(str(jsobj))

    def _execute_cmd(self, cmdline: str) -> bool:
        """
        Parse one command line, send the resulting command and print the
        reply.

        :return: False when the peer has disconnected, True otherwise
                 (including parse errors and cached transaction actions).
        """
        try:
            qmpcmd = self.__build_cmd(cmdline)
        except QMPShellError as err:
            print(
                f"Error while parsing command line: {err!s}\n"
                "command format: <command-name> "
                "[arg-name1=arg1] ... [arg-nameN=argN]",
                file=sys.stderr
            )
            return True
        # For transaction mode, we may have just cached the action:
        if qmpcmd is None:
            return True
        if self.verbose:
            self._print(qmpcmd)
        resp = self.cmd_obj(qmpcmd)
        if resp is None:
            print('Disconnected')
            return False
        self._print(resp)
        return True

    def connect(self, negotiate: bool = True) -> None:
        """Connect via the parent class, keep the greeting, set up readline."""
        self._greeting = super().connect(negotiate)
        self.__completer_setup()

    def show_banner(self,
                    msg: str = 'Welcome to the QMP low-level shell!') -> None:
        """Print `msg` followed by the QEMU version from the greeting."""
        print(msg)
        if not self._greeting:
            # No greeting stored, so there is no version to report.
            print('Connected')
            return
        version = self._greeting['QMP']['version']['qemu']
        print("Connected to QEMU {major}.{minor}.{micro}\n".format(**version))

    @property
    def prompt(self) -> str:
        """Prompt string; differs while inside a transaction sub-shell."""
        if self._transmode:
            return 'TRANS> '
        return '(QEMU) '

    def read_exec_command(self) -> bool:
        """
        Read and execute a command.

        An empty line prints and clears the pending events instead.

        @return True if execution was ok, return False if disconnected
                or if end-of-file was read from the input.
        """
        try:
            cmdline = input(self.prompt)
        except EOFError:
            print()
            return False

        if cmdline == '':
            for event in self.get_events():
                print(event)
            self.clear_events()
            return True

        return self._execute_cmd(cmdline)

    def repl(self) -> Iterator[None]:
        """
        Show the banner, then run commands until the session ends.

        Yields once after every command; closes the shell when done.
        """
        self.show_banner()
        while self.read_exec_command():
            yield
        self.close()
358
359
class HMPShell(QMPShell):
    """
    Shell variant that forwards every line to the human monitor (HMP).

    Each command line is wrapped in a QMP 'human-monitor-command'.  The
    'cpu' command is intercepted so that the selected CPU index can be
    sent along with the commands that follow.
    """
    def __init__(self, address: qmp.SocketAddrT,
                 pretty: bool = False, verbose: bool = False):
        super().__init__(address, pretty, verbose)
        self.__cpu_index = 0

    def __hmp_lines(self, cmdline: str) -> List[str]:
        """
        Run an HMP command and return the non-empty lines of its output.

        An empty list is returned when the reply carries no 'return'
        member, so that completion setup stays best-effort.
        """
        resp = self.__cmd_passthrough(cmdline)
        if not resp or 'return' not in resp:
            return []
        return [line for line in resp['return'].split('\r\n') if line]

    def __cmd_completion(self) -> None:
        """Add the commands listed by HMP 'help' to the completer."""
        for line in self.__hmp_lines('help'):
            if line[0] in ('[', '\t'):
                continue
            fields = line.split()
            if not fields:
                continue
            name = fields[0]  # drop help text
            if name == 'info':
                continue
            if '|' in name:
                # Command in the form 'foobar|f' or 'f|foobar', take the
                # full name
                opt = name.split('|')
                name = opt[1] if len(opt[0]) == 1 else opt[0]
            self._completer.append(name)
            self._completer.append('help ' + name)  # help completion

    def __info_completion(self) -> None:
        """Add 'info <topic>' entries taken from the HMP 'info' output."""
        for line in self.__hmp_lines('info'):
            fields = line.split()
            # The topic is the second word; skip lines that have none.
            if len(fields) > 1:
                self._completer.append('info ' + fields[1])

    def __other_completion(self) -> None:
        # special cases
        self._completer.append('help info')

    def _fill_completion(self) -> None:
        """Populate the completer from HMP instead of 'query-commands'."""
        self.__cmd_completion()
        self.__info_completion()
        self.__other_completion()

    def __cmd_passthrough(self, cmdline: str,
                          cpu_index: int = 0) -> QMPMessage:
        """Send `cmdline` as a 'human-monitor-command' for `cpu_index`."""
        return self.cmd_obj({
            'execute': 'human-monitor-command',
            'arguments': {
                'command-line': cmdline,
                'cpu-index': cpu_index
            }
        })

    def _execute_cmd(self, cmdline: str) -> bool:
        """
        Execute one HMP command line and print its output or error.

        :return: False when the peer has disconnected, True otherwise.
        """
        words = cmdline.split()
        if not words:
            # Whitespace-only input: there is no command to send.
            return True
        if words[0] == "cpu":
            # trap the cpu command, it requires special setting
            try:
                idx = int(words[1])
            except (IndexError, ValueError):
                # Missing or non-numeric argument.
                print('cpu command takes an integer argument')
                return True
            probe = self.__cmd_passthrough('info version', idx)
            if probe is None:
                print('Disconnected')
                return False
            if 'return' not in probe:
                print('bad CPU index')
                return True
            self.__cpu_index = idx
        resp = self.__cmd_passthrough(cmdline, self.__cpu_index)
        if resp is None:
            print('Disconnected')
            return False
        assert 'return' in resp or 'error' in resp
        if 'return' in resp:
            # Success
            if len(resp['return']) > 0:
                print(resp['return'], end=' ')
        else:
            # Error
            error = resp['error']
            print(f"{error['class']}: {error['desc']}")
        return True

    def show_banner(self, msg: str = 'Welcome to the HMP shell!') -> None:
        """Print the banner with an HMP-specific default message."""
        QMPShell.show_banner(self, msg)
435
436
def die(msg: str) -> NoReturn:
    """Report `msg` on stderr with an 'ERROR: ' prefix, then exit with 1."""
    print(f'ERROR: {msg}', file=sys.stderr)
    sys.exit(1)
440
441
def main() -> None:
    """
    Command-line entry point.

    Parses the options, picks the QMP or HMP shell, connects to the given
    server and runs the read/execute loop until the session ends.  The
    server may be given as an argument or through the QMP_SOCKET
    environment variable.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('-H', '--hmp', action='store_true',
                        help='Use HMP interface')
    parser.add_argument('-N', '--skip-negotiation', action='store_true',
                        help='Skip negotiate (for qemu-ga)')
    parser.add_argument('-v', '--verbose', action='store_true',
                        help='Verbose (echo commands sent and received)')
    parser.add_argument('-p', '--pretty', action='store_true',
                        help='Pretty-print JSON')

    default_server = os.environ.get('QMP_SOCKET')
    # nargs='?' makes the positional optional.  Without it argparse treats
    # the argument as required, so the QMP_SOCKET default is never used
    # and the "is None" check below can never trigger.
    parser.add_argument('qmp_server', action='store', nargs='?',
                        default=default_server,
                        help='< UNIX socket path | TCP address:port >')

    args = parser.parse_args()
    if args.qmp_server is None:
        parser.error("QMP socket or TCP address must be specified")

    shell_class = HMPShell if args.hmp else QMPShell

    try:
        address = shell_class.parse_address(args.qmp_server)
    except qmp.QMPBadPortError:
        parser.error(f"Bad port number: {args.qmp_server}")
        return  # pycharm doesn't know error() is noreturn

    with shell_class(address, args.pretty, args.verbose) as qemu:
        try:
            qemu.connect(negotiate=not args.skip_negotiation)
        except qmp.QMPConnectError:
            die("Didn't get QMP greeting message")
        except qmp.QMPCapabilitiesError:
            die("Couldn't negotiate capabilities")
        except OSError as err:
            die(f"Couldn't connect to {args.qmp_server}: {err!s}")

        # repl() is a generator; drain it to run the interactive loop.
        for _ in qemu.repl():
            pass
482
483
# Run the shell only when executed as a script, not when imported.
if __name__ == '__main__':
    main()
486