xref: /qemu/scripts/qmp/qmp-shell (revision 7fc29896d237b6cb2db49e65f00882f554fc48c0)
1#!/usr/bin/env python3
2#
3# Copyright (C) 2009, 2010 Red Hat Inc.
4#
5# Authors:
6#  Luiz Capitulino <lcapitulino@redhat.com>
7#
8# This work is licensed under the terms of the GNU GPL, version 2.  See
9# the COPYING file in the top-level directory.
10#
11
12"""
13Low-level QEMU shell on top of QMP.
14
15usage: qmp-shell [-h] [-H] [-N] [-v] [-p] qmp_server
16
17positional arguments:
18  qmp_server            < UNIX socket path | TCP address:port >
19
20optional arguments:
21  -h, --help            show this help message and exit
22  -H, --hmp             Use HMP interface
23  -N, --skip-negotiation
24                        Skip negotiate (for qemu-ga)
25  -v, --verbose         Verbose (echo commands sent and received)
26  -p, --pretty          Pretty-print JSON
27
28
29Start QEMU with:
30
31# qemu [...] -qmp unix:./qmp-sock,server
32
33Run the shell:
34
35$ qmp-shell ./qmp-sock
36
37Commands have the following format:
38
39   < command-name > [ arg-name1=arg1 ] ... [ arg-nameN=argN ]
40
41For example:
42
43(QEMU) device_add driver=e1000 id=net1
44{'return': {}}
45(QEMU)
46
47key=value pairs also support Python or JSON object literal subset notations,
48without spaces. Dictionaries/objects {} are supported as are arrays [].
49
50   example-command arg-name1={'key':'value','obj'={'prop':"value"}}
51
52Both JSON and Python formatting should work, including both styles of
53string literal quotes. Both paradigms of literal values should work,
54including null/true/false for JSON and None/True/False for Python.
55
56
57Transactions have the following multi-line format:
58
59   transaction(
60   action-name1 [ arg-name1=arg1 ] ... [arg-nameN=argN ]
61   ...
62   action-nameN [ arg-name1=arg1 ] ... [arg-nameN=argN ]
63   )
64
65One line transactions are also supported:
66
67   transaction( action-name1 ... )
68
69For example:
70
71    (QEMU) transaction(
72    TRANS> block-dirty-bitmap-add node=drive0 name=bitmap1
73    TRANS> block-dirty-bitmap-clear node=drive0 name=bitmap0
74    TRANS> )
75    {"return": {}}
76    (QEMU)
77
78Use the -v and -p options to activate the verbose and pretty-print options,
79which will echo back the properly formatted JSON-compliant QMP that is being
80sent to QEMU, which is useful for debugging and documentation generation.
81"""
82
83import argparse
84import ast
85import json
86import logging
87import os
88import re
89import readline
90import sys
91from typing import (
92    Iterator,
93    List,
94    NoReturn,
95    Optional,
96    Sequence,
97)
98
99
100sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..', 'python'))
101from qemu import qmp
102from qemu.qmp import QMPMessage
103
104
105LOG = logging.getLogger(__name__)
106
107
108class QMPCompleter:
109    # NB: Python 3.9+ will probably allow us to subclass list[str] directly,
110    # but pylint as of today does not know that List[str] is simply 'list'.
111    def __init__(self) -> None:
112        self._matches: List[str] = []
113
114    def append(self, value: str) -> None:
115        return self._matches.append(value)
116
117    def complete(self, text: str, state: int) -> Optional[str]:
118        for cmd in self._matches:
119            if cmd.startswith(text):
120                if state == 0:
121                    return cmd
122                state -= 1
123        return None
124
125
126class QMPShellError(Exception):
127    pass
128
129
130class FuzzyJSON(ast.NodeTransformer):
131    """
132    This extension of ast.NodeTransformer filters literal "true/false/null"
133    values in a Python AST and replaces them by proper "True/False/None" values
134    that Python can properly evaluate.
135    """
136
137    @classmethod
138    def visit_Name(cls,  # pylint: disable=invalid-name
139                   node: ast.Name) -> ast.AST:
140        if node.id == 'true':
141            return ast.Constant(value=True)
142        if node.id == 'false':
143            return ast.Constant(value=False)
144        if node.id == 'null':
145            return ast.Constant(value=None)
146        return node
147
148
149class QMPShell(qmp.QEMUMonitorProtocol):
150    def __init__(self, address: qmp.SocketAddrT,
151                 pretty: bool = False, verbose: bool = False):
152        super().__init__(address)
153        self._greeting: Optional[QMPMessage] = None
154        self._completer = QMPCompleter()
155        self._transmode = False
156        self._actions: List[QMPMessage] = []
157        self._histfile = os.path.join(os.path.expanduser('~'),
158                                      '.qmp-shell_history')
159        self.pretty = pretty
160        self.verbose = verbose
161
162    def close(self) -> None:
163        # Hook into context manager of parent to save shell history.
164        self._save_history()
165        super().close()
166
167    def _fill_completion(self) -> None:
168        cmds = self.cmd('query-commands')
169        if 'error' in cmds:
170            return
171        for cmd in cmds['return']:
172            self._completer.append(cmd['name'])
173
174    def __completer_setup(self) -> None:
175        self._completer = QMPCompleter()
176        self._fill_completion()
177        readline.set_history_length(1024)
178        readline.set_completer(self._completer.complete)
179        readline.parse_and_bind("tab: complete")
180        # NB: default delimiters conflict with some command names
181        # (eg. query-), clearing everything as it doesn't seem to matter
182        readline.set_completer_delims('')
183        try:
184            readline.read_history_file(self._histfile)
185        except FileNotFoundError:
186            pass
187        except IOError as err:
188            msg = f"Failed to read history '{self._histfile}': {err!s}"
189            LOG.warning(msg)
190
191    def _save_history(self) -> None:
192        try:
193            readline.write_history_file(self._histfile)
194        except IOError as err:
195            msg = f"Failed to save history file '{self._histfile}': {err!s}"
196            LOG.warning(msg)
197
198    @classmethod
199    def __parse_value(cls, val: str) -> object:
200        try:
201            return int(val)
202        except ValueError:
203            pass
204
205        if val.lower() == 'true':
206            return True
207        if val.lower() == 'false':
208            return False
209        if val.startswith(('{', '[')):
210            # Try first as pure JSON:
211            try:
212                return json.loads(val)
213            except ValueError:
214                pass
215            # Try once again as FuzzyJSON:
216            try:
217                tree = ast.parse(val, mode='eval')
218                transformed = FuzzyJSON().visit(tree)
219                return ast.literal_eval(transformed)
220            except (SyntaxError, ValueError):
221                pass
222        return val
223
224    def __cli_expr(self,
225                   tokens: Sequence[str],
226                   parent: qmp.QMPObject) -> None:
227        for arg in tokens:
228            (key, sep, val) = arg.partition('=')
229            if sep != '=':
230                raise QMPShellError(
231                    f"Expected a key=value pair, got '{arg!s}'"
232                )
233
234            value = self.__parse_value(val)
235            optpath = key.split('.')
236            curpath = []
237            for path in optpath[:-1]:
238                curpath.append(path)
239                obj = parent.get(path, {})
240                if not isinstance(obj, dict):
241                    msg = 'Cannot use "{:s}" as both leaf and non-leaf key'
242                    raise QMPShellError(msg.format('.'.join(curpath)))
243                parent[path] = obj
244                parent = obj
245            if optpath[-1] in parent:
246                if isinstance(parent[optpath[-1]], dict):
247                    msg = 'Cannot use "{:s}" as both leaf and non-leaf key'
248                    raise QMPShellError(msg.format('.'.join(curpath)))
249                raise QMPShellError(f'Cannot set "{key}" multiple times')
250            parent[optpath[-1]] = value
251
252    def __build_cmd(self, cmdline: str) -> Optional[QMPMessage]:
253        """
254        Build a QMP input object from a user provided command-line in the
255        following format:
256
257            < command-name > [ arg-name1=arg1 ] ... [ arg-nameN=argN ]
258        """
259        argument_regex = r'''(?:[^\s"']|"(?:\\.|[^"])*"|'(?:\\.|[^'])*')+'''
260        cmdargs = re.findall(argument_regex, cmdline)
261        qmpcmd: QMPMessage
262
263        # Transactional CLI entry:
264        if cmdargs and cmdargs[0] == 'transaction(':
265            self._transmode = True
266            self._actions = []
267            cmdargs.pop(0)
268
269        # Transactional CLI exit:
270        if cmdargs and cmdargs[0] == ')' and self._transmode:
271            self._transmode = False
272            if len(cmdargs) > 1:
273                msg = 'Unexpected input after close of Transaction sub-shell'
274                raise QMPShellError(msg)
275            qmpcmd = {
276                'execute': 'transaction',
277                'arguments': {'actions': self._actions}
278            }
279            return qmpcmd
280
281        # No args, or no args remaining
282        if not cmdargs:
283            return None
284
285        if self._transmode:
286            # Parse and cache this Transactional Action
287            finalize = False
288            action = {'type': cmdargs[0], 'data': {}}
289            if cmdargs[-1] == ')':
290                cmdargs.pop(-1)
291                finalize = True
292            self.__cli_expr(cmdargs[1:], action['data'])
293            self._actions.append(action)
294            return self.__build_cmd(')') if finalize else None
295
296        # Standard command: parse and return it to be executed.
297        qmpcmd = {'execute': cmdargs[0], 'arguments': {}}
298        self.__cli_expr(cmdargs[1:], qmpcmd['arguments'])
299        return qmpcmd
300
301    def _print(self, qmp_message: object) -> None:
302        jsobj = json.dumps(qmp_message,
303                           indent=4 if self.pretty else None,
304                           sort_keys=self.pretty)
305        print(str(jsobj))
306
307    def _execute_cmd(self, cmdline: str) -> bool:
308        try:
309            qmpcmd = self.__build_cmd(cmdline)
310        except QMPShellError as err:
311            print(
312                f"Error while parsing command line: {err!s}\n"
313                "command format: <command-name> "
314                "[arg-name1=arg1] ... [arg-nameN=argN",
315                file=sys.stderr
316            )
317            return True
318        # For transaction mode, we may have just cached the action:
319        if qmpcmd is None:
320            return True
321        if self.verbose:
322            self._print(qmpcmd)
323        resp = self.cmd_obj(qmpcmd)
324        if resp is None:
325            print('Disconnected')
326            return False
327        self._print(resp)
328        return True
329
330    def connect(self, negotiate: bool = True) -> None:
331        self._greeting = super().connect(negotiate)
332        self.__completer_setup()
333
334    def show_banner(self,
335                    msg: str = 'Welcome to the QMP low-level shell!') -> None:
336        print(msg)
337        if not self._greeting:
338            print('Connected')
339            return
340        version = self._greeting['QMP']['version']['qemu']
341        print("Connected to QEMU {major}.{minor}.{micro}\n".format(**version))
342
343    @property
344    def prompt(self) -> str:
345        if self._transmode:
346            return 'TRANS> '
347        return '(QEMU) '
348
349    def read_exec_command(self) -> bool:
350        """
351        Read and execute a command.
352
353        @return True if execution was ok, return False if disconnected.
354        """
355        try:
356            cmdline = input(self.prompt)
357        except EOFError:
358            print()
359            return False
360
361        if cmdline == '':
362            for event in self.get_events():
363                print(event)
364            self.clear_events()
365            return True
366
367        return self._execute_cmd(cmdline)
368
369    def repl(self) -> Iterator[None]:
370        self.show_banner()
371        while self.read_exec_command():
372            yield
373        self.close()
374
375
376class HMPShell(QMPShell):
377    def __init__(self, address: qmp.SocketAddrT,
378                 pretty: bool = False, verbose: bool = False):
379        super().__init__(address, pretty, verbose)
380        self.__cpu_index = 0
381
382    def __cmd_completion(self) -> None:
383        for cmd in self.__cmd_passthrough('help')['return'].split('\r\n'):
384            if cmd and cmd[0] != '[' and cmd[0] != '\t':
385                name = cmd.split()[0]  # drop help text
386                if name == 'info':
387                    continue
388                if name.find('|') != -1:
389                    # Command in the form 'foobar|f' or 'f|foobar', take the
390                    # full name
391                    opt = name.split('|')
392                    if len(opt[0]) == 1:
393                        name = opt[1]
394                    else:
395                        name = opt[0]
396                self._completer.append(name)
397                self._completer.append('help ' + name)  # help completion
398
399    def __info_completion(self) -> None:
400        for cmd in self.__cmd_passthrough('info')['return'].split('\r\n'):
401            if cmd:
402                self._completer.append('info ' + cmd.split()[1])
403
404    def __other_completion(self) -> None:
405        # special cases
406        self._completer.append('help info')
407
408    def _fill_completion(self) -> None:
409        self.__cmd_completion()
410        self.__info_completion()
411        self.__other_completion()
412
413    def __cmd_passthrough(self, cmdline: str,
414                          cpu_index: int = 0) -> QMPMessage:
415        return self.cmd_obj({
416            'execute': 'human-monitor-command',
417            'arguments': {
418                'command-line': cmdline,
419                'cpu-index': cpu_index
420            }
421        })
422
423    def _execute_cmd(self, cmdline: str) -> bool:
424        if cmdline.split()[0] == "cpu":
425            # trap the cpu command, it requires special setting
426            try:
427                idx = int(cmdline.split()[1])
428                if 'return' not in self.__cmd_passthrough('info version', idx):
429                    print('bad CPU index')
430                    return True
431                self.__cpu_index = idx
432            except ValueError:
433                print('cpu command takes an integer argument')
434                return True
435        resp = self.__cmd_passthrough(cmdline, self.__cpu_index)
436        if resp is None:
437            print('Disconnected')
438            return False
439        assert 'return' in resp or 'error' in resp
440        if 'return' in resp:
441            # Success
442            if len(resp['return']) > 0:
443                print(resp['return'], end=' ')
444        else:
445            # Error
446            print('%s: %s' % (resp['error']['class'], resp['error']['desc']))
447        return True
448
449    def show_banner(self, msg: str = 'Welcome to the HMP shell!') -> None:
450        QMPShell.show_banner(self, msg)
451
452
453def die(msg: str) -> NoReturn:
454    sys.stderr.write('ERROR: %s\n' % msg)
455    sys.exit(1)
456
457
458def main() -> None:
459    parser = argparse.ArgumentParser()
460    parser.add_argument('-H', '--hmp', action='store_true',
461                        help='Use HMP interface')
462    parser.add_argument('-N', '--skip-negotiation', action='store_true',
463                        help='Skip negotiate (for qemu-ga)')
464    parser.add_argument('-v', '--verbose', action='store_true',
465                        help='Verbose (echo commands sent and received)')
466    parser.add_argument('-p', '--pretty', action='store_true',
467                        help='Pretty-print JSON')
468
469    default_server = os.environ.get('QMP_SOCKET')
470    parser.add_argument('qmp_server', action='store',
471                        default=default_server,
472                        help='< UNIX socket path | TCP address:port >')
473
474    args = parser.parse_args()
475    if args.qmp_server is None:
476        parser.error("QMP socket or TCP address must be specified")
477
478    shell_class = HMPShell if args.hmp else QMPShell
479
480    try:
481        address = shell_class.parse_address(args.qmp_server)
482    except qmp.QMPBadPortError:
483        parser.error(f"Bad port number: {args.qmp_server}")
484        return  # pycharm doesn't know error() is noreturn
485
486    with shell_class(address, args.pretty, args.verbose) as qemu:
487        try:
488            qemu.connect(negotiate=not args.skip_negotiation)
489        except qmp.QMPConnectError:
490            die("Didn't get QMP greeting message")
491        except qmp.QMPCapabilitiesError:
492            die("Couldn't negotiate capabilities")
493        except OSError as err:
494            die(f"Couldn't connect to {args.qmp_server}: {err!s}")
495
496        for _ in qemu.repl():
497            pass
498
499
500if __name__ == '__main__':
501    main()
502