xref: /qemu/tests/qemu-iotests/iotests.py (revision 599f2762ed8c86a6eea03b9f91d49d14a874a95c)
1# Common utilities and Python wrappers for qemu-iotests
2#
3# Copyright (C) 2012 IBM Corp.
4#
5# This program is free software; you can redistribute it and/or modify
6# it under the terms of the GNU General Public License as published by
7# the Free Software Foundation; either version 2 of the License, or
8# (at your option) any later version.
9#
10# This program is distributed in the hope that it will be useful,
11# but WITHOUT ANY WARRANTY; without even the implied warranty of
12# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13# GNU General Public License for more details.
14#
15# You should have received a copy of the GNU General Public License
16# along with this program.  If not, see <http://www.gnu.org/licenses/>.
17#
18
19import argparse
20import atexit
21import bz2
22from collections import OrderedDict
23import faulthandler
24import json
25import logging
26import os
27import re
28import shutil
29import signal
30import struct
31import subprocess
32import sys
33import time
34from typing import (Any, Callable, Dict, Iterable, Iterator,
35                    List, Optional, Sequence, TextIO, Tuple, Type, TypeVar)
36import unittest
37
38from contextlib import contextmanager
39
40from qemu.machine import qtest
41from qemu.qmp.legacy import QMPMessage, QMPReturnValue, QEMUMonitorProtocol
42from qemu.utils import VerboseProcessError
43
# Use this logger for logging messages directly from the iotests module
logger = logging.getLogger('qemu.iotests')
logger.addHandler(logging.NullHandler())

# Use this logger for messages that ought to be used for diff output.
test_logger = logging.getLogger('qemu.iotests.diff_io')


# Have Python dump a traceback if the interpreter itself crashes.
faulthandler.enable()

# Command lines of the QEMU tools. Each one starts with the program taken
# from the environment (with a built-in fallback name) followed by the
# optional extra options, which are split on single spaces.
# This will not work if arguments contain spaces but is necessary if we
# want to support the override options that ./check supports.
qemu_img_args = [os.environ.get('QEMU_IMG_PROG', 'qemu-img')]
if os.environ.get('QEMU_IMG_OPTIONS'):
    qemu_img_args += os.environ['QEMU_IMG_OPTIONS'].strip().split(' ')

qemu_io_args = [os.environ.get('QEMU_IO_PROG', 'qemu-io')]
if os.environ.get('QEMU_IO_OPTIONS'):
    qemu_io_args += os.environ['QEMU_IO_OPTIONS'].strip().split(' ')

# Variant of qemu_io_args built from QEMU_IO_OPTIONS_NO_FMT instead.
qemu_io_args_no_fmt = [os.environ.get('QEMU_IO_PROG', 'qemu-io')]
if os.environ.get('QEMU_IO_OPTIONS_NO_FMT'):
    qemu_io_args_no_fmt += \
        os.environ['QEMU_IO_OPTIONS_NO_FMT'].strip().split(' ')

qemu_nbd_prog = os.environ.get('QEMU_NBD_PROG', 'qemu-nbd')
qemu_nbd_args = [qemu_nbd_prog]
if os.environ.get('QEMU_NBD_OPTIONS'):
    qemu_nbd_args += os.environ['QEMU_NBD_OPTIONS'].strip().split(' ')

qemu_prog = os.environ.get('QEMU_PROG', 'qemu')
# NOTE: when QEMU_OPTIONS is unset or empty this evaluates to [''].
qemu_opts = os.environ.get('QEMU_OPTIONS', '').strip().split(' ')

qsd_prog = os.environ.get('QSD_PROG', 'qemu-storage-daemon')

# Optional gdbserver wrapper: empty unless GDB_OPTIONS is set.
gdb_qemu_env = os.environ.get('GDB_OPTIONS')
qemu_gdb = []
if gdb_qemu_env:
    qemu_gdb = ['gdbserver'] + gdb_qemu_env.strip().split(' ')

# Raw value of PRINT_QEMU (a string when set, False otherwise).
qemu_print = os.environ.get('PRINT_QEMU', False)

imgfmt = os.environ.get('IMGFMT', 'raw')
imgproto = os.environ.get('IMGPROTO', 'file')

try:
    test_dir = os.environ['TEST_DIR']
    sock_dir = os.environ['SOCK_DIR']
    cachemode = os.environ['CACHEMODE']
    aiomode = os.environ['AIOMODE']
    qemu_default_machine = os.environ['QEMU_DEFAULT_MACHINE']
except KeyError:
    # We are using these variables as proxies to indicate that we're
    # not being run via "check". There may be other things set up by
    # "check" that individual test cases rely on.
    sys.stderr.write('Please run this test via the "check" script\n')
    sys.exit(os.EX_USAGE)

# Optional valgrind wrapper: empty unless VALGRIND_QEMU is "y" and
# NO_VALGRIND is not "y".
qemu_valgrind = []
if os.environ.get('VALGRIND_QEMU') == "y" and \
    os.environ.get('NO_VALGRIND') != "y":
    valgrind_logfile = "--log-file=" + test_dir
    # %p lets valgrind insert its own process ID, since we do not
    # know it a priori (subprocess.Popen has not been invoked yet)
    valgrind_logfile += "/%p.valgrind"

    qemu_valgrind = ['valgrind', valgrind_logfile, '--error-exitcode=99']

# Defaults used for LUKS images when the caller gives no key-secret option.
luks_default_secret_object = 'secret,id=keysec0,data=' + \
                             os.environ.get('IMGKEYSECRET', '')
luks_default_key_secret_opt = 'key-secret=keysec0'

# Unlike the variables above, a missing SAMPLE_IMG_DIR raises KeyError here.
sample_img_dir = os.environ['SAMPLE_IMG_DIR']
118
119
@contextmanager
def change_log_level(
        logger_name: str, level: int = logging.CRITICAL) -> Iterator[None]:
    """
    Context manager that runs its body with a logger set to another level.

    The logger named ``logger_name`` is switched to ``level`` on entry and
    gets its previous level back on exit, even if the body raises. Handy
    for hiding errors that are expected or not interesting.
    """
    target = logging.getLogger(logger_name)
    previous_level = target.level
    target.setLevel(level)

    try:
        yield
    finally:
        # Always restore, regardless of how the body terminated
        target.setLevel(previous_level)
136
137
def unarchive_sample_image(sample, fname):
    """Decompress ``<sample>.bz2`` from sample_img_dir into the file fname."""
    archive_path = os.path.join(sample_img_dir, sample + '.bz2')
    with bz2.open(archive_path) as compressed:
        with open(fname, 'wb') as target:
            shutil.copyfileobj(compressed, target)
142
143
def qemu_tool_popen(args: Sequence[str],
                    connect_stderr: bool = True) -> 'subprocess.Popen[str]':
    """
    Start ``args`` as a child process with its stdout captured as text.

    When ``connect_stderr`` is true the child's stderr is merged into the
    captured stdout; otherwise stderr is not redirected.
    """
    if connect_stderr:
        stderr_target = subprocess.STDOUT
    else:
        stderr_target = None
    # The caller owns the returned process object.
    # pylint: disable=consider-using-with
    child = subprocess.Popen(args,
                             stdout=subprocess.PIPE,
                             stderr=stderr_target,
                             universal_newlines=True)
    return child
152
153
def qemu_tool_pipe_and_status(tool: str, args: Sequence[str],
                              connect_stderr: bool = True,
                              drop_successful_output: bool = False) \
        -> Tuple[str, int]:
    """
    Run a tool and return both its output and its exit code

    :param tool: display name of the tool; only used in the diagnostic
                 written to stderr when the process was killed by a signal.
    :param args: full command line to run.
    :param connect_stderr: merge the tool's stderr into the captured output.
    :param drop_successful_output: return '' as output when the tool
                                   exits with status 0.

    :return: a tuple ``(output, returncode)``.
    """
    with qemu_tool_popen(args, connect_stderr) as subp:
        output = subp.communicate()[0]
        if subp.returncode < 0:
            # A negative return code means "terminated by signal -N".
            # Build the message on one logical line: a backslash
            # continuation inside the string literal would embed the
            # next line's indentation in the message.
            cmd = ' '.join(args)
            sys.stderr.write(
                f'{tool} received signal {-subp.returncode}: {cmd}\n')
        if drop_successful_output and subp.returncode == 0:
            output = ''
        return (output, subp.returncode)
170
def qemu_img_create_prepare_args(args: List[str]) -> List[str]:
    """
    Rewrite a qemu-img 'create' command line with the test defaults.

    Anything that is not a 'create' command is returned as a plain copy.
    For 'create', the result is: 'create', the -f option (if given), the
    --object for the default LUKS secret (if needed), all -o options and
    finally every argument that was not recognised.
    """
    is_create = bool(args) and args[0] == 'create'
    if not is_create:
        return list(args)

    parser = argparse.ArgumentParser(allow_abbrev=False)
    # -o option may be specified several times
    parser.add_argument('-o', action='append', default=[])
    parser.add_argument('-f')
    known, passthrough = parser.parse_known_args(args[1:])

    fmt = known.f
    option_groups = known.o

    # IMGOPTS most probably contain options specific for the selected format,
    # like extended_l2 or compression_type for qcow2. Test may want to create
    # additional images in other formats that don't support these options.
    # So, use IMGOPTS only for images created in imgfmt format.
    imgopts = os.environ.get('IMGOPTS')
    if imgopts and fmt == imgfmt:
        option_groups.insert(0, imgopts)

    prepared = ['create']
    if fmt is not None:
        prepared.extend(['-f', fmt])

    # default luks support
    if fmt == 'luks' and \
            not any('key-secret' in opts for opts in option_groups):
        prepared.extend(['--object', luks_default_secret_object])
        option_groups.append(luks_default_key_secret_opt)

    for opts in option_groups:
        prepared.extend(['-o', opts])

    prepared.extend(passthrough)
    return prepared
208
209
def qemu_tool(*args: str, check: bool = True, combine_stdio: bool = True
              ) -> 'subprocess.CompletedProcess[str]':
    """
    Execute a qemu tool, capturing its console output as text.

    :param args: full command line to run.
    :param check: Enforce a return code of zero.
    :param combine_stdio: set to False to keep stdout/stderr separated.

    :raise VerboseProcessError:
        Raised whenever the return code is negative, and additionally
        for every non-zero return code if 'check=True' (the default).
        The exception is constructed with the return code, the command
        line and the captured stdout/stderr.

    :return:
        a CompletedProcess. This object has args, returncode, and stdout
        properties. If streams are not combined, it will also have a
        stderr property.
    """
    if combine_stdio:
        stderr_target = subprocess.STDOUT
    else:
        stderr_target = subprocess.PIPE

    completed = subprocess.run(
        args,
        stdout=subprocess.PIPE,
        stderr=stderr_target,
        universal_newlines=True,
        check=False
    )

    status = completed.returncode
    killed_by_signal = status < 0
    unwanted_failure = check and status != 0
    if unwanted_failure or killed_by_signal:
        raise VerboseProcessError(
            status, args,
            output=completed.stdout,
            stderr=completed.stderr,
        )

    return completed
248
249
def qemu_img(*args: str, check: bool = True, combine_stdio: bool = True
             ) -> 'subprocess.CompletedProcess[str]':
    """
    Invoke QEMU_IMG_PROG and hand back the completed process.

    The command line always starts with qemu_img_args (program plus
    QEMU_IMG_OPTIONS); 'create' commands are additionally passed through
    `qemu_img_create_prepare_args()`.

    See `qemu_tool()` for the meaning of the keyword arguments, the
    return value and the exceptions raised.
    """
    command = list(qemu_img_args)
    command.extend(qemu_img_create_prepare_args(list(args)))
    return qemu_tool(*command, check=check, combine_stdio=combine_stdio)
262
263
def ordered_qmp(qmsg, conv_keys=True):
    """
    Return qmsg with every dict replaced by a key-sorted OrderedDict.

    With conv_keys set, '_' in the keys of the dict handled by this call
    is replaced by '-'; values nested directly in a dict are processed
    with conv_keys=False, list elements with the default.
    """
    # Dictionaries are not ordered prior to 3.6, therefore:
    if isinstance(qmsg, list):
        return list(map(ordered_qmp, qmsg))
    if not isinstance(qmsg, dict):
        return qmsg

    ordered = OrderedDict()
    for key in sorted(qmsg):
        new_key = key.replace('_', '-') if conv_keys else key
        ordered[new_key] = ordered_qmp(qmsg[key], conv_keys=False)
    return ordered
276
def qemu_img_create(*args: str) -> 'subprocess.CompletedProcess[str]':
    """Run the qemu-img 'create' subcommand with args; see `qemu_img()`."""
    create_cmdline = ('create',) + args
    return qemu_img(*create_cmdline)
279
def qemu_img_json(*args: str) -> Any:
    """
    Invoke qemu-img and deserialize what it printed to stdout as JSON.

    :raise CalledProcessError:
        When qemu-img is terminated by a signal, or exits with a non-zero
        code and its stdout is not a valid JSON document.
    :raise JSONDecodeError:
        When qemu-img exits with 0 but its stdout is not valid JSON.

    :return: A deserialized JSON object; probably a dict[str, Any].
    """
    try:
        completed = qemu_img(*args, combine_stdio=False)
    except subprocess.CalledProcessError as exc:
        # A negative code means death by signal: nothing to salvage.
        #
        # Otherwise, commands like 'check' can return failure (exit
        # codes 2 and 3) to indicate command completion, but with errors
        # found. For multi-command flexibility, ignore the exact error
        # codes and *try* to load JSON.
        if exc.returncode >= 0:
            try:
                return json.loads(exc.stdout)
            except json.JSONDecodeError:
                # Not JSON either; fall through and re-raise the
                # /process/ error below.
                pass
        raise

    return json.loads(completed.stdout)
311
def qemu_img_measure(*args: str) -> Any:
    """Return the parsed JSON output of 'qemu-img measure' run with args."""
    subcommand = ("measure", "--output", "json")
    return qemu_img_json(*subcommand, *args)
314
def qemu_img_check(*args: str) -> Any:
    """Return the parsed JSON output of 'qemu-img check' run with args."""
    subcommand = ("check", "--output", "json")
    return qemu_img_json(*subcommand, *args)
317
def qemu_img_info(*args: str) -> Any:
    """Return the parsed JSON output of 'qemu-img info' run with args."""
    subcommand = ('info', "--output", "json")
    return qemu_img_json(*subcommand, *args)
320
def qemu_img_map(*args: str) -> Any:
    """Return the parsed JSON output of 'qemu-img map' run with args."""
    subcommand = ('map', "--output", "json")
    return qemu_img_json(*subcommand, *args)
323
def qemu_img_log(*args: str, check: bool = True
                 ) -> 'subprocess.CompletedProcess[str]':
    """
    Run qemu-img, log its output through filter_testfiles, and return
    the completed process.
    """
    completed = qemu_img(*args, check=check)
    log(completed.stdout, filters=[filter_testfiles])
    return completed
329
def img_info_log(filename: str, filter_path: Optional[str] = None,
                 use_image_opts: bool = False, extra_args: Sequence[str] = (),
                 check: bool = True, drop_child_info: bool = True,
                 ) -> None:
    """
    Log the filtered output of 'qemu-img info' for filename.

    The image is opened with --image-opts when use_image_opts is set and
    with '-f imgfmt' otherwise. filter_path is the path replaced in the
    output; it falls back to filename when empty or None.
    """
    if use_image_opts:
        format_args = ['--image-opts']
    else:
        format_args = ['-f', imgfmt]
    cmdline = ['info', *format_args, *extra_args, filename]

    info_text = qemu_img(*cmdline, check=check).stdout
    log(filter_img_info(info_text, filter_path or filename, drop_child_info))
346
def qemu_io_wrap_args(args: Sequence[str]) -> List[str]:
    """
    Prepend the qemu-io program and default options to args.

    qemu_io_args_no_fmt is used when args already contain '-f' or
    '--image-opts'; qemu_io_args is used otherwise.
    """
    explicit_format = '-f' in args or '--image-opts' in args
    prefix = qemu_io_args_no_fmt if explicit_format else qemu_io_args
    return prefix + list(args)
352
def qemu_io_popen(*args):
    """Start qemu-io with the wrapped args and return the Popen object."""
    cmdline = qemu_io_wrap_args(args)
    return qemu_tool_popen(cmdline)
355
def qemu_io(*args: str, check: bool = True, combine_stdio: bool = True
            ) -> 'subprocess.CompletedProcess[str]':
    """
    Invoke QEMU_IO_PROG and hand back the completed process.

    The arguments are first passed through `qemu_io_wrap_args()`, which
    prepends either QEMU_IO_OPTIONS or QEMU_IO_OPTIONS_NO_FMT.
    See `qemu_tool()` for the keyword arguments.
    """
    cmdline = qemu_io_wrap_args(args)
    return qemu_tool(*cmdline, check=check, combine_stdio=combine_stdio)
366
def qemu_io_log(*args: str, check: bool = True
                ) -> 'subprocess.CompletedProcess[str]':
    """
    Run qemu-io, log its output through filter_testfiles and
    filter_qemu_io, and return the completed process.
    """
    completed = qemu_io(*args, check=check)
    log(completed.stdout, filters=[filter_testfiles, filter_qemu_io])
    return completed
372
class QemuIoInteractive:
    """
    Drive an interactive qemu-io process through its stdin/stdout.

    The constructor starts qemu-io (stderr merged into stdout) and waits
    for the first prompt; `cmd()` sends one command line and returns
    everything printed before the next prompt; `close()` sends 'q'.
    """
    # Prompt printed by qemu-io whenever it is ready for a command
    _PROMPT = 'qemu-io> '

    def __init__(self, *args):
        """
        Start qemu-io with args (wrapped by `qemu_io_wrap_args()`).

        :raise ValueError: with the collected output when the first thing
                           qemu-io prints is not the prompt.
        """
        self.args = qemu_io_wrap_args(args)
        # We need to keep the Popen object around, and not
        # close it immediately. Therefore, disable the pylint check:
        # pylint: disable=consider-using-with
        self._p = subprocess.Popen(self.args, stdin=subprocess.PIPE,
                                   stdout=subprocess.PIPE,
                                   stderr=subprocess.STDOUT,
                                   universal_newlines=True)
        out = self._p.stdout.read(len(self._PROMPT))
        if out != self._PROMPT:
            # Most probably qemu-io just failed to start.
            # Let's collect the whole output and exit.
            out += self._p.stdout.read()
            self._p.wait(timeout=1)
            raise ValueError(out)

    def close(self):
        """Send the quit command and wait for qemu-io to terminate."""
        self._p.communicate('q\n')

    def _read_output(self):
        """
        Read stdout up to the next prompt and return the text before it.

        The prompt is detected by comparing the tail of what has been
        read so far, so it is also found when the preceding output ends
        in a prefix of the prompt (e.g. 'q' directly before 'qemu-io> ').
        """
        prompt = self._PROMPT
        n = len(prompt)
        chars = []
        while True:
            c = self._p.stdout.read(1)
            # check unexpected EOF
            assert c != ''
            chars.append(c)
            # Cheap last-character test first, full tail compare second
            if c == prompt[-1] and ''.join(chars[-n:]) == prompt:
                break

        return ''.join(chars[:-n])

    def cmd(self, cmd):
        """Run one qemu-io command and return its output as a string."""
        # quit command is in close(), '\n' is added automatically
        assert '\n' not in cmd
        cmd = cmd.strip()
        assert cmd not in ('q', 'quit')
        self._p.stdin.write(cmd + '\n')
        self._p.stdin.flush()
        return self._read_output()
419
420
class QemuStorageDaemon:
    """
    A qemu-storage-daemon child process, optionally with a QMP monitor.

    The daemon is started by the constructor, which returns once the
    pidfile exists; `stop()` signals the process, waits for it and
    removes the QMP socket and the pidfile.
    """
    _qmp: Optional[QEMUMonitorProtocol] = None
    _qmpsock: Optional[str] = None
    # Python < 3.8 would complain if this type were not a string literal
    # (importing `annotations` from `__future__` would work; but not on <= 3.6)
    _p: 'Optional[subprocess.Popen[bytes]]' = None

    def __init__(self, *args: str, instance_id: str = 'a', qmp: bool = False):
        """
        Start the daemon with args plus a generated --pidfile option.

        :param instance_id: used in the pidfile and QMP socket names.
        :param qmp: also create a QMP monitor on a UNIX socket.
        :raise RuntimeError: if the process exits before the pidfile
                             shows up.
        """
        assert '--pidfile' not in args
        self.pidfile = os.path.join(test_dir, f'qsd-{instance_id}-pid')
        command = [qsd_prog, *args, '--pidfile', self.pidfile]

        if qmp:
            self._qmpsock = os.path.join(sock_dir, f'qsd-{instance_id}.sock')
            command.extend(('--chardev',
                            f'socket,id=qmp-sock,path={self._qmpsock}',
                            '--monitor', 'qmp-sock'))

            self._qmp = QEMUMonitorProtocol(self._qmpsock, server=True)

        # Cannot use with here, we want the subprocess to stay around
        # pylint: disable=consider-using-with
        daemon = subprocess.Popen(command)
        self._p = daemon
        if self._qmp is not None:
            self._qmp.accept()

        # Poll for the pidfile, bailing out if the daemon dies first
        while not os.path.exists(self.pidfile):
            if daemon.poll() is not None:
                cmd = ' '.join(command)
                raise RuntimeError(
                    'qemu-storage-daemon terminated with exit code '
                    f'{daemon.returncode}: {cmd}')

            time.sleep(0.01)

        with open(self.pidfile, encoding='utf-8') as pid_fh:
            self._pid = int(pid_fh.read().strip())

        assert self._pid == daemon.pid

    def qmp(self, cmd: str, args: Optional[Dict[str, object]] = None) \
            -> QMPMessage:
        """Send cmd with args through cmd_raw() of the QMP connection."""
        assert self._qmp is not None
        monitor = self._qmp
        return monitor.cmd_raw(cmd, args)

    def get_qmp(self) -> QEMUMonitorProtocol:
        """Return the QMP connection object (asserts that one exists)."""
        assert self._qmp is not None
        return self._qmp

    def cmd(self, cmd: str, args: Optional[Dict[str, object]] = None) \
            -> QMPReturnValue:
        """Send cmd through cmd() of the QMP connection, args as keywords."""
        assert self._qmp is not None
        keyword_args = args or {}
        return self._qmp.cmd(cmd, **keyword_args)

    def stop(self, kill_signal=15):
        """Signal the daemon, wait for it to exit and clean up its files."""
        self._p.send_signal(kill_signal)
        self._p.wait()
        self._p = None

        if self._qmp:
            self._qmp.close()

        # Best-effort removal: socket first (if any), then the pidfile
        if self._qmpsock is None:
            leftovers = [self.pidfile]
        else:
            leftovers = [self._qmpsock, self.pidfile]
        for leftover in leftovers:
            try:
                os.remove(leftover)
            except OSError:
                pass

    def __del__(self):
        # A daemon that is still running at this point is killed with
        # signal 9
        if self._p is None:
            return
        self.stop(kill_signal=9)
495
496
def qemu_nbd(*args):
    '''Start qemu-nbd with --fork (daemon mode) and return the exit code
       of the parent process'''
    cmdline = [*qemu_nbd_args, '--fork', *args]
    return subprocess.call(cmdline)
500
def qemu_nbd_early_pipe(*args: str) -> Tuple[int, str]:
    '''Start qemu-nbd with --fork (daemon mode); return the parent's exit
       code together with its output, which is '' when the exit code is 0'''
    cmdline = [*qemu_nbd_args, '--fork', *args]
    output, returncode = qemu_tool_pipe_and_status('qemu-nbd', cmdline,
                                                   connect_stderr=False)
    if not returncode:
        output = ''
    return returncode, output
508
def qemu_nbd_list_log(*args: str) -> str:
    '''Run "qemu-nbd -L" to list remote exports; the output is logged
       (filtered) and returned unfiltered'''
    cmdline = [qemu_nbd_prog, '-L', *args]
    listing, _status = qemu_tool_pipe_and_status('qemu-nbd', cmdline)
    log(listing, filters=[filter_testfiles, filter_nbd_exports])
    return listing
515
@contextmanager
def qemu_nbd_popen(*args):
    '''Context manager that keeps a persistent qemu-nbd server running
       while the body executes and kills it afterwards'''
    pid_file = file_path("qemu_nbd_popen-nbd-pid-file")

    assert not os.path.exists(pid_file)

    cmd = [*qemu_nbd_args, '--persistent', '--pid-file', pid_file, *args]

    log('Start NBD server')
    with subprocess.Popen(cmd) as server:
        try:
            # The pid file appearing is the signal that the server is up
            while not os.path.exists(pid_file):
                if server.poll() is not None:
                    cmdline = ' '.join(cmd)
                    raise RuntimeError(
                        f"qemu-nbd terminated with exit code "
                        f"{server.returncode}: {cmdline}")

                time.sleep(0.01)
            yield
        finally:
            if os.path.exists(pid_file):
                os.remove(pid_file)
            log('Kill NBD server')
            server.kill()
            server.wait()
544
def compare_images(img1: str, img2: str,
                   fmt1: str = imgfmt, fmt2: str = imgfmt) -> bool:
    """
    Run 'qemu-img compare' on two images; True means they are identical.

    :raise CalledProcessError:
        when qemu-img crashes or returns a status code of anything other
        than 0 (identical) or 1 (different).
    """
    try:
        qemu_img('compare', '-f', fmt1, '-F', fmt2, img1, img2)
    except subprocess.CalledProcessError as exc:
        if exc.returncode != 1:
            raise
        return False
    return True
561
def create_image(name, size):
    '''Write a raw image file made of 512-byte sectors, each of which
    carries its own sector number at its start and at its end'''
    sector_format = struct.Struct('>l504xl')
    with open(name, 'wb') as image:
        sector_no = 0
        # One whole sector is written for every started 512 bytes of size
        while sector_no * 512 < size:
            image.write(sector_format.pack(sector_no, sector_no))
            sector_no += 1
570
def image_size(img: str) -> int:
    """Return the 'virtual-size' that qemu-img info reports for img.

    :raise TypeError: when the reported value is not an int.
    """
    info = qemu_img_info('-f', imgfmt, img)
    value = info['virtual-size']
    if isinstance(value, int):
        return value
    type_name = type(value).__name__
    raise TypeError("Expected 'int' for 'virtual-size', "
                    f"got '{value}' of type '{type_name}'")
579
def is_str(val):
    """Tell whether val is a str instance."""
    if isinstance(val, str):
        return True
    return False
582
# test_dir is a literal path, not a pattern: escape it so that regex
# metacharacters it may contain (such as '.' or '+') only match themselves.
test_dir_re = re.compile(re.escape(test_dir))
def filter_test_dir(msg):
    """Replace every occurrence of the test directory in msg by TEST_DIR."""
    return test_dir_re.sub("TEST_DIR", msg)
586
win32_re = re.compile(r"\r")
def filter_win32(msg):
    """Strip all carriage returns from msg."""
    return win32_re.sub(repl="", string=msg)
590
qemu_io_re = re.compile(r"[0-9]* ops; [0-9\/:. sec]* "
                        r"\([0-9\/.inf]* [EPTGMKiBbytes]*\/sec "
                        r"and [0-9\/.inf]* ops\/sec\)")
def filter_qemu_io(msg):
    """Strip carriage returns and replace qemu-io's ops/timing/throughput
    statistics by a fixed placeholder."""
    placeholder = "X ops; XX:XX:XX.X (XXX YYY/sec and XXX ops/sec)"
    return qemu_io_re.sub(placeholder, filter_win32(msg))
598
chown_re = re.compile(r"chown [0-9]+:[0-9]+")
def filter_chown(msg):
    """Replace the numeric IDs in 'chown <uid>:<gid>' by 'UID:GID'."""
    return chown_re.sub(repl="chown UID:GID", string=msg)
602
def filter_qmp_event(event):
    '''Filter the timestamp of a QMP event dict

    Returns a copy of event in which the 'seconds' and 'microseconds'
    fields of 'timestamp' (if present) are replaced by placeholders.
    The event passed in, including its nested timestamp dict, is left
    unmodified.'''
    event = dict(event)
    if 'timestamp' in event:
        # dict(event) is only a shallow copy: duplicate the nested dict
        # as well, so the caller's event keeps its real timestamp.
        timestamp = dict(event['timestamp'])
        timestamp['seconds'] = 'SECS'
        timestamp['microseconds'] = 'USECS'
        event['timestamp'] = timestamp
    return event
610
def filter_block_job(event):
    '''Filter the offset and length of a QMP block job event dict

    Returns a copy of event in which the 'offset' and 'len' fields of
    'data' (where present) are replaced by placeholders. The event passed
    in, including its nested data dict, is left unmodified.'''
    event = dict(event)
    if 'data' in event:
        # dict(event) is only a shallow copy: duplicate the nested dict
        # as well, so the caller's event keeps its real values.
        data = dict(event['data'])
        if 'offset' in data:
            data['offset'] = 'OFFSET'
        if 'len' in data:
            data['len'] = 'LEN'
        event['data'] = data
    return event
620
def filter_qmp(qmsg, filter_fn):
    '''Apply filter_fn to every leaf value of a QMP object, in place.

    filter_fn is called as filter_fn(key, value), where key is the dict
    key or list index of the value (None if qmsg itself is a leaf); its
    result replaces the value. Nested dicts and lists are processed
    recursively. Returns qmsg (or the filtered leaf).'''
    if isinstance(qmsg, list):
        entries = enumerate(qmsg)
    elif isinstance(qmsg, dict):
        entries = qmsg.items()
    else:
        # Neither list nor dict: qmsg is a leaf itself
        return filter_fn(None, qmsg)

    for key, value in entries:
        is_container = isinstance(value, (dict, list))
        qmsg[key] = (filter_qmp(value, filter_fn) if is_container
                     else filter_fn(key, value))
    return qmsg
638
def filter_testfiles(msg):
    """Replace the '<test_dir>/<pid>-' and '<sock_dir>/<pid>-' file name
    prefixes in msg by 'TEST_DIR/PID-' and 'SOCK_DIR/PID-'."""
    test_prefix = os.path.join(test_dir, "%s-" % (os.getpid()))
    sock_prefix = os.path.join(sock_dir, "%s-" % (os.getpid()))
    filtered = msg.replace(test_prefix, 'TEST_DIR/PID-')
    return filtered.replace(sock_prefix, 'SOCK_DIR/PID-')
643
def filter_qmp_testfiles(qmsg):
    """Run filter_testfiles over every string value of a QMP object."""
    def _scrub(_key, value):
        return filter_testfiles(value) if is_str(value) else value
    return filter_qmp(qmsg, _scrub)
650
def filter_virtio_scsi(output: str) -> str:
    """Drop the '-ccw' / '-pci' suffix from 'virtio-scsi' device names."""
    suffixed_name = re.compile(r'(virtio-scsi)-(ccw|pci)')
    return suffixed_name.sub(r'\1', output)
653
def filter_qmp_virtio_scsi(qmsg):
    """Run filter_virtio_scsi over every string value of a QMP object."""
    def _scrub(_key, value):
        return filter_virtio_scsi(value) if is_str(value) else value
    return filter_qmp(qmsg, _scrub)
660
def filter_generated_node_ids(msg):
    """Replace node names of the form '#block<digits>' by 'NODE_NAME'."""
    generated_name = re.compile("#block[0-9]+")
    return generated_name.sub("NODE_NAME", msg)
663
def filter_qmp_generated_node_ids(qmsg):
    """Run filter_generated_node_ids over every string value of a QMP
    object."""
    def _scrub(_key, value):
        return filter_generated_node_ids(value) if is_str(value) else value
    return filter_qmp(qmsg, _scrub)
670
def filter_img_info(output: str, filename: str,
                    drop_child_info: bool = True) -> str:
    """
    Filter the varying parts out of qemu-img info output.

    Lines containing 'disk size' or 'actual-size' are dropped, as are
    (with drop_child_info) "Child node" headers together with the
    indented lines that follow them. In the remaining lines the file
    name, test file prefixes, image format, iters, uuid, cid and
    compression type values are replaced by placeholders.
    """
    substitutions = (
        ('iters: [0-9]+', 'iters: XXX'),
        ('uuid: [-a-f0-9]+', 'uuid: XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX'),
        ('cid: [0-9]+', 'cid: XXXXXXXXXX'),
        ('(compression type: )(zlib|zstd)', r'\1COMPRESSION_TYPE'),
    )

    kept = []
    skipping_child = False
    for raw in output.split('\n'):
        if 'disk size' in raw or 'actual-size' in raw:
            continue

        # Drop child node info: skip indented lines after a child header;
        # the first non-indented line ends the skipped section
        if skipping_child and raw.startswith(' '):
            continue
        skipping_child = False
        if drop_child_info and "Child node '/" in raw:
            skipping_child = True
            continue

        text = raw.replace(filename, 'TEST_IMG')
        text = filter_testfiles(text)
        text = text.replace(imgfmt, 'IMGFMT')
        for pattern, replacement in substitutions:
            text = re.sub(pattern, replacement, text)
        kept.append(text)
    return '\n'.join(kept)
700
def filter_imgfmt(msg):
    """Replace every occurrence of the image format name by 'IMGFMT'."""
    placeholder = 'IMGFMT'
    return msg.replace(imgfmt, placeholder)
703
def filter_qmp_imgfmt(qmsg):
    """Run filter_imgfmt over every string value of a QMP object."""
    def _scrub(_key, value):
        return filter_imgfmt(value) if is_str(value) else value
    return filter_qmp(qmsg, _scrub)
710
def filter_nbd_exports(output: str) -> str:
    """Replace the values of 'min block', 'opt block' and 'max block'
    by XXX."""
    block_size_line = re.compile(r'((min|opt|max) block): [0-9]+')
    return block_size_line.sub(r'\1: XXX', output)
713
def filter_qtest(output: str) -> str:
    """Strip the leading '[I <time>] OPENED' line and the trailing
    '[I +<time>] CLOSED' line from output."""
    without_opened = re.sub(r'^\[I \d+\.\d+\] OPENED\n', '', output)
    return re.sub(r'\n?\[I \+\d+\.\d+\] CLOSED\n?$', '', without_opened)
718
Msg = TypeVar('Msg', Dict[str, Any], List[Any], str)

def log(msg: Msg,
        filters: Iterable[Callable[[Msg], Msg]] = (),
        indent: Optional[int] = None) -> None:
    """
    Emit msg on the test (diff output) logger after applying filters.

    The filters are applied in order. Dicts and lists are serialized as
    JSON, pretty-printed if indent is provided; anything else is logged
    as it is.
    """
    for apply_filter in filters:
        msg = apply_filter(msg)

    if not isinstance(msg, (dict, list)):
        test_logger.info(msg)
        return

    # Don't sort if it's already sorted
    needs_sorting = not isinstance(msg, OrderedDict)
    serialized = json.dumps(msg, sort_keys=needs_sorting, indent=indent)
    test_logger.info(serialized)
736
class Timeout:
    """
    Context manager that raises TimeoutError(errmsg) via SIGALRM when its
    body runs longer than the given number of seconds.

    No timer is armed while qemu_gdb or qemu_valgrind is set.
    """
    def __init__(self, seconds, errmsg="Timeout"):
        self.seconds = seconds
        self.errmsg = errmsg

    def __enter__(self):
        if not (qemu_gdb or qemu_valgrind):
            signal.signal(signal.SIGALRM, self.timeout)
            signal.setitimer(signal.ITIMER_REAL, self.seconds)
        return self

    def __exit__(self, exc_type, value, traceback):
        if not (qemu_gdb or qemu_valgrind):
            # Disarm the timer
            signal.setitimer(signal.ITIMER_REAL, 0)
        # Never suppress an exception raised by the body
        return False

    def timeout(self, signum, frame):
        """Signal handler installed for SIGALRM."""
        raise TimeoutError(self.errmsg)
754
def file_pattern(name):
    """Return name prefixed with '<pid>-', the pid being that of this
    process."""
    pid = os.getpid()
    return f"{pid}-{name}"
757
class FilePath:
    """
    Context manager handing out file names that are deleted on exit.

    Every name is turned into '<base_dir>/<pid>-<name>'. The files
    themselves are not created here; whatever exists under these paths
    when the context is left is removed.

    Example usage:

        with FilePath('a.img', 'b.img') as (img_a, img_b):
            # Use img_a and img_b here...

        # a.img and b.img are automatically removed here.

    Paths live in iotests.test_dir unless base_dir says otherwise, e.g.
    for sockets:

       with FilePath('a.sock', base_dir=iotests.sock_dir) as sock:

    As a convenience, a single name yields the path itself rather than a
    one-element list.

    """
    def __init__(self, *names, base_dir=test_dir):
        generated = []
        for name in names:
            generated.append(os.path.join(base_dir, file_pattern(name)))
        self.paths = generated

    def __enter__(self):
        return self.paths[0] if len(self.paths) == 1 else self.paths

    def __exit__(self, exc_type, exc_val, exc_tb):
        for path in self.paths:
            # Best effort: the file may never have been created
            try:
                os.remove(path)
            except OSError:
                continue
        return False
796
797
def try_remove(img):
    """Delete the file img; an OSError (e.g. no such file) is ignored."""
    try:
        os.remove(img)
    except OSError:
        return
803
def file_path_remover():
    """Remove all paths recorded in file_path_remover.paths, the most
    recently added one first."""
    newest_first = file_path_remover.paths[::-1]
    for path in newest_first:
        try_remove(path)
807
808
def file_path(*names, base_dir=test_dir):
    ''' Generate '<base_dir>/<pid>-<name>' file names that are removed
    automatically when the interpreter exits.

    Usage:

    img_a, img_b = file_path('a.img', 'b.img')
    sock = file_path('socket')

    A single name returns the path itself, several names return a list.
    '''

    # First use: create the registry and hook the remover into atexit
    if not hasattr(file_path_remover, 'paths'):
        file_path_remover.paths = []
        atexit.register(file_path_remover)

    paths = [os.path.join(base_dir, file_pattern(name)) for name in names]
    file_path_remover.paths.extend(paths)

    if len(paths) == 1:
        return paths[0]
    return paths
830
def remote_filename(path):
    """Return the name under which path is reached through imgproto.

    'file' returns path unchanged, 'ssh' an ssh:// URL for the current
    USER on 127.0.0.1:22; any other protocol raises ValueError.
    """
    if imgproto == 'file':
        return path
    if imgproto != 'ssh':
        raise ValueError("Protocol %s not supported" % (imgproto))
    return "ssh://%s@127.0.0.1:22%s" % (os.environ.get('USER'), path)
838
class VM(qtest.QEMUQtestMachine):
    '''
    A QEMU VM

    Thin convenience layer on top of QEMUQtestMachine: command-line
    builders (add_*), HMP/QMP helpers with optional logging, job and
    migration helpers, and block-graph/bitmap queries.
    '''

    def __init__(self, path_suffix=''):
        """
        Prepare a QEMU instance named "qemu<path_suffix>-<pid>".

        :param path_suffix: String inserted into the machine name.

        The process exits with status 1 if both a gdb and a valgrind
        wrapper are configured.  With either wrapper the QMP timer is
        disabled (None); otherwise it is 15 seconds.
        """
        name = "qemu%s-%d" % (path_suffix, os.getpid())
        timer = 15.0 if not (qemu_gdb or qemu_valgrind) else None
        if qemu_gdb and qemu_valgrind:
            sys.stderr.write('gdb and valgrind are mutually exclusive\n')
            sys.exit(1)
        wrapper = qemu_gdb if qemu_gdb else qemu_valgrind
        super().__init__(qemu_prog, qemu_opts, wrapper=wrapper,
                         name=name,
                         base_temp_dir=test_dir,
                         qmp_timer=timer)
        # Counter used to generate unique "driveN" IDs in add_drive()
        self._num_drives = 0

    def _post_shutdown(self) -> None:
        """
        Handle the per-process valgrind log after the VM has shut down.

        Only acts when valgrind is in use and a process was started.  The
        log "<test_dir>/<pid>.valgrind" is printed if QEMU exited with
        status 99, and removed otherwise.
        """
        super()._post_shutdown()
        if not qemu_valgrind or not self._popen:
            return
        valgrind_filename = f"{test_dir}/{self._popen.pid}.valgrind"
        # NOTE(review): 99 is presumably the error exit code that the
        # valgrind wrapper is configured with elsewhere -- confirm.
        if self.exitcode() == 99:
            with open(valgrind_filename, encoding='utf-8') as f:
                print(f.read())
        else:
            os.remove(valgrind_filename)

    def _pre_launch(self) -> None:
        """Run the base pre-launch steps; honour the qemu_print setting."""
        super()._pre_launch()
        if qemu_print:
            # set QEMU binary output to stdout
            self._close_qemu_log_file()

    def add_object(self, opts):
        """Append "-object <opts>" to the command line; returns self."""
        self._args.append('-object')
        self._args.append(opts)
        return self

    def add_device(self, opts):
        """Append "-device <opts>" to the command line; returns self."""
        self._args.append('-device')
        self._args.append(opts)
        return self

    def add_drive_raw(self, opts):
        """Append "-drive <opts>" verbatim to the command line; returns self."""
        self._args.append('-drive')
        self._args.append(opts)
        return self

    def add_drive(self, path, opts='', interface='virtio', img_format=imgfmt):
        """
        Add a drive (virtio by default) to the VM.

        :param path: Image path, or None to add a drive without a medium
                     (no file/format/cache/aio options are emitted then).
        :param opts: Extra comma-separated -drive options.
        :param interface: Value for the "if=" option.
        :param img_format: Value for the "format=" option.
        :return: self, so that calls can be chained.

        The drive gets the ID "driveN", where N counts the drives added
        through this method.  For luks images without an explicit
        key-secret, the default secret object and key-secret option are
        added automatically.
        """
        options = ['if=%s' % interface,
                   'id=drive%d' % self._num_drives]

        if path is not None:
            options.append('file=%s' % path)
            options.append('format=%s' % img_format)
            options.append('cache=%s' % cachemode)
            options.append('aio=%s' % aiomode)

        if opts:
            options.append(opts)

        if img_format == 'luks' and 'key-secret' not in opts:
            # default luks support
            if luks_default_secret_object not in self._args:
                self.add_object(luks_default_secret_object)

            options.append(luks_default_key_secret_opt)

        self._args.append('-drive')
        self._args.append(','.join(options))
        self._num_drives += 1
        return self

    def add_blockdev(self, opts):
        """
        Append "-blockdev <opts>" to the command line; returns self.

        :param opts: Either a ready-made option string, or an iterable of
                     option strings that is joined with commas.
        """
        self._args.append('-blockdev')
        if isinstance(opts, str):
            self._args.append(opts)
        else:
            self._args.append(','.join(opts))
        return self

    def add_incoming(self, addr):
        """Append "-incoming <addr>" to the command line; returns self."""
        self._args.append('-incoming')
        self._args.append(addr)
        return self

    def add_paused(self):
        """Append "-S" to the command line; returns self."""
        self._args.append('-S')
        return self

    def hmp(self, command_line: str, use_log: bool = False) -> QMPMessage:
        """
        Run an HMP command through QMP's human-monitor-command.

        :param command_line: The HMP command line to execute.
        :param use_log: If true, go through qmp_log() so that command and
                        response are logged.
        :return: The QMP response.
        """
        cmd = 'human-monitor-command'
        kwargs: Dict[str, Any] = {'command-line': command_line}
        if use_log:
            return self.qmp_log(cmd, **kwargs)
        else:
            return self.qmp(cmd, **kwargs)

    def pause_drive(self, drive: str, event: Optional[str] = None) -> None:
        """
        Pause drive r/w operations

        :param drive: Drive to set the breakpoint on.
        :param event: Event to break on.  If omitted, breakpoints are set
                      for both "read_aio" and "write_aio".
        """
        if not event:
            self.pause_drive(drive, "read_aio")
            self.pause_drive(drive, "write_aio")
            return
        self.hmp(f'qemu-io {drive} "break {event} bp_{drive}"')

    def resume_drive(self, drive: str) -> None:
        """Resume drive r/w operations"""
        self.hmp(f'qemu-io {drive} "remove_break bp_{drive}"')

    def hmp_qemu_io(self, drive: str, cmd: str,
                    use_log: bool = False, qdev: bool = False) -> QMPMessage:
        """
        Run a qemu-io command on a given drive using an HMP command

        :param drive: Drive name passed to qemu-io.
        :param cmd: The qemu-io command.
        :param use_log: Forwarded to hmp().
        :param qdev: If true, pass "-d" to the HMP qemu-io command.
        """
        d = '-d ' if qdev else ''
        return self.hmp(f'qemu-io {d}{drive} "{cmd}"', use_log=use_log)

    def flatten_qmp_object(self, obj, output=None, basestr=''):
        """
        Flatten nested lists/dicts into a single dict with dotted keys.

        List elements contribute their index and dict entries their key as
        a path component, e.g. {'a': [{'b': 1}]} becomes {'a.0.b': 1}.

        :param obj: The object to flatten.
        :param output: Dict to fill in; a new one is created if None.
        :param basestr: Key prefix (with trailing '.') used for recursion.
        :return: The (possibly newly created) output dict.
        """
        if output is None:
            output = {}
        if isinstance(obj, list):
            for i, item in enumerate(obj):
                self.flatten_qmp_object(item, output, basestr + str(i) + '.')
        elif isinstance(obj, dict):
            for key in obj:
                self.flatten_qmp_object(obj[key], output, basestr + key + '.')
        else:
            output[basestr[:-1]] = obj # Strip trailing '.'
        return output

    def qmp_to_opts(self, obj):
        """
        Convert a QMP-style object into a "key=value,..." option string.

        The object is flattened with flatten_qmp_object().  String values
        are used as they are; booleans are written as "on"/"off" and all
        other leaf values are converted with str(), so that numeric and
        boolean leaves no longer raise a TypeError.
        """
        def opt_value(value):
            # bool is checked first: str(True) would yield "True"
            if isinstance(value, bool):
                return 'on' if value else 'off'
            return str(value)

        flat = self.flatten_qmp_object(obj)
        return ','.join(f'{key}={opt_value(value)}'
                        for key, value in flat.items())

    def get_qmp_events_filtered(self, wait=60.0):
        """Return get_qmp_events(wait=wait) run through filter_qmp_event()."""
        result = []
        for ev in self.get_qmp_events(wait=wait):
            result.append(filter_qmp_event(ev))
        return result

    def qmp_log(self, cmd, filters=(), indent=None, **kwargs):
        """
        Execute a QMP command, logging both the command and its response.

        :param cmd: QMP command name.
        :param filters: Filters passed to log() for command and response.
        :param indent: Indentation passed to log().
        :param kwargs: Arguments of the QMP command.
        :return: The QMP response.
        """
        full_cmd = OrderedDict((
            ("execute", cmd),
            ("arguments", ordered_qmp(kwargs))
        ))
        log(full_cmd, filters, indent=indent)
        result = self.qmp(cmd, **kwargs)
        log(result, filters, indent=indent)
        return result

    def run_job(self, job: str, auto_finalize: bool = True,
                auto_dismiss: bool = False,
                pre_finalize: Optional[Callable[[], None]] = None,
                cancel: bool = False, wait: float = 60.0,
                filters: Iterable[Callable[[Any], Any]] = (),
                ) -> Optional[str]:
        """
        run_job moves a job from creation through to dismissal.

        :param job: String. ID of recently-launched job
        :param auto_finalize: Bool. True if the job was launched with
                              auto_finalize. Defaults to True.
        :param auto_dismiss: Bool. True if the job was launched with
                             auto_dismiss=True. Defaults to False.
        :param pre_finalize: Callback. A callable that takes no arguments to be
                             invoked prior to issuing job-finalize, if any.
        :param cancel: Bool. When true, cancels the job after the pre_finalize
                       callback.
        :param wait: Float. Timeout value specifying how long to wait for any
                     event, in seconds. Defaults to 60.0.
        :param filters: Iterable of filter callables applied to everything
                        that is logged while the job runs.
        :return: None on success, and an error string on failure
        """
        match_device = {'data': {'device': job}}
        match_id = {'data': {'id': job}}
        events = [
            ('BLOCK_JOB_COMPLETED', match_device),
            ('BLOCK_JOB_CANCELLED', match_device),
            ('BLOCK_JOB_ERROR', match_device),
            ('BLOCK_JOB_READY', match_device),
            ('BLOCK_JOB_PENDING', match_id),
            ('JOB_STATUS_CHANGE', match_id)
        ]
        error = None
        while True:
            ev = filter_qmp_event(self.events_wait(events, timeout=wait))
            # Only JOB_STATUS_CHANGE drives the state machine below; all
            # other matching events are merely logged.
            if ev['event'] != 'JOB_STATUS_CHANGE':
                log(ev, filters=filters)
                continue
            status = ev['data']['status']
            if status == 'aborting':
                result = self.qmp('query-jobs')
                for j in result['return']:
                    if j['id'] == job:
                        error = j['error']
                        log('Job failed: %s' % (j['error']), filters=filters)
            elif status == 'ready':
                self.qmp_log('job-complete', id=job, filters=filters)
            elif status == 'pending' and not auto_finalize:
                if pre_finalize:
                    pre_finalize()
                if cancel:
                    self.qmp_log('job-cancel', id=job, filters=filters)
                else:
                    self.qmp_log('job-finalize', id=job, filters=filters)
            elif status == 'concluded' and not auto_dismiss:
                self.qmp_log('job-dismiss', id=job, filters=filters)
            elif status == 'null':
                return error

    def blockdev_create(self, options, job_id='job0', filters=None):
        """
        Issue a logged blockdev-create and run the resulting job.

        :param options: The blockdev-create options.
        :param job_id: ID for the creation job.
        :param filters: Log filters; defaults to [filter_qmp_testfiles].
        :return: None on success, and an error string on failure (either
                 from the job, or the QMP error if the command itself
                 was rejected)
        """
        if filters is None:
            filters = [filter_qmp_testfiles]
        result = self.qmp_log('blockdev-create', filters=filters,
                              job_id=job_id, options=options)

        if 'return' in result:
            assert result['return'] == {}
            job_result = self.run_job(job_id, filters=filters)
        else:
            job_result = result['error']

        log("")
        return job_result

    def enable_migration_events(self, name):
        """
        Enable the "events" migration capability and log the response.

        :param name: Name of this VM as it should appear in the log.
        """
        log('Enabling migration QMP events on %s...' % name)
        log(self.qmp('migrate-set-capabilities', capabilities=[
            {
                'capability': 'events',
                'state': True
            }
        ]))

    def wait_migration(self, expect_runstate: Optional[str]) -> bool:
        """
        Wait for migration to complete or fail, logging MIGRATION events.

        :param expect_runstate: Runstate to wait for after a successful
                                migration; None skips that wait.
        :return: True if migration completed, False if it failed.
        """
        while True:
            event = self.event_wait('MIGRATION')
            # We use the default timeout, and with a timeout, event_wait()
            # never returns None
            assert event

            log(event, filters=[filter_qmp_event])
            if event['data']['status'] in ('completed', 'failed'):
                break

        if event['data']['status'] == 'completed':
            # The event may occur in finish-migrate, so wait for the expected
            # post-migration runstate
            runstate = None
            while runstate != expect_runstate:
                runstate = self.qmp('query-status')['return']['status']
            return True
        else:
            return False

    def node_info(self, node_name):
        """
        Return the query-named-block-nodes entry whose node-name is
        @node_name, or None if there is no such node.
        """
        nodes = self.qmp('query-named-block-nodes')
        for x in nodes['return']:
            if x['node-name'] == node_name:
                return x
        return None

    def query_bitmaps(self):
        """
        Return a dict mapping node names to their list of dirty bitmaps.
        Nodes without a 'dirty-bitmaps' field are not included.
        """
        res = self.qmp("query-named-block-nodes")
        return {device['node-name']: device['dirty-bitmaps']
                for device in res['return'] if 'dirty-bitmaps' in device}

    def get_bitmap(self, node_name, bitmap_name, recording=None, bitmaps=None):
        """
        get a specific bitmap from the object returned by query_bitmaps.
        :param recording: If specified, filter results by the specified value.
        :param bitmaps: If specified, use it instead of call query_bitmaps()
        :return: The matching bitmap, or None if there is none (including
                 the case that @node_name has no bitmaps at all).
        """
        if bitmaps is None:
            bitmaps = self.query_bitmaps()

        # query_bitmaps() omits nodes without bitmaps; treat such a node
        # like one with an empty bitmap list instead of raising KeyError.
        for bitmap in bitmaps.get(node_name, []):
            if bitmap.get('name', '') == bitmap_name:
                if recording is None or bitmap.get('recording') == recording:
                    return bitmap
        return None

    def check_bitmap_status(self, node_name, bitmap_name, fields):
        """
        Check whether a bitmap has all of the given field values.

        :param fields: Dict of expected key/value pairs.
        :return: True if every item of @fields is present in the bitmap;
                 False otherwise, or if the bitmap does not exist.
        """
        ret = self.get_bitmap(node_name, bitmap_name)
        if ret is None:
            return False

        return fields.items() <= ret.items()

    def assert_block_path(self, root, path, expected_node, graph=None):
        """
        Check whether the node under the given path in the block graph
        is @expected_node.

        @root is the node name of the node where the @path is rooted.

        @path is a string that consists of child names separated by
        slashes.  It must begin with a slash.

        Examples for @root + @path:
          - root="qcow2-node", path="/backing/file"
          - root="quorum-node", path="/children.2/file"

        Hypothetically, @path could be empty, in which case it would
        point to @root.  However, in practice this case is not useful
        and hence not allowed.

        @expected_node may be None.  (All elements of the path but the
        leaf must still exist.)

        @graph may be None or the result of an x-debug-query-block-graph
        call that has already been performed.
        """
        if graph is None:
            graph = self.qmp('x-debug-query-block-graph')['return']

        iter_path = iter(path.split('/'))

        # Must start with a /
        assert next(iter_path) == ''

        node = next((node for node in graph['nodes'] if node['name'] == root),
                    None)

        # An empty @path is not allowed, so the root node must be present
        assert node is not None, 'Root node %s not found' % root

        for child_name in iter_path:
            assert node is not None, 'Cannot follow path %s%s' % (root, path)

            try:
                node_id = next(edge['child'] for edge in graph['edges']
                               if (edge['parent'] == node['id'] and
                                   edge['name'] == child_name))

                node = next(node for node in graph['nodes']
                            if node['id'] == node_id)

            except StopIteration:
                # No such edge (or no node for it): the path ends here
                node = None

        if node is None:
            assert expected_node is None, \
                   'No node found under %s (but expected %s)' % \
                   (path, expected_node)
        else:
            assert node['name'] == expected_node, \
                   'Found node %s under %s (but expected %s)' % \
                   (node['name'], path, expected_node)
1189
index_re = re.compile(r'([^\[]+)\[([^\]]+)\]')

class QMPTestCase(unittest.TestCase):
    '''Abstract base class for QMP test cases'''

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # The helpers below operate on self.vm; users of this class are
        # expected to assign their VM to it.
        self.vm = None

    def dictpath(self, d, path):
        '''
        Traverse a path in a nested dict

        @path consists of dict keys separated by slashes; a component of
        the form "key[N]" additionally selects element N of the list
        stored under "key".  The test fails if the path cannot be
        followed.
        '''
        for component in path.split('/'):
            indexed = index_re.match(component)
            idx = None
            if indexed:
                component = indexed.group(1)
                idx = int(indexed.group(2))

            if not (isinstance(d, dict) and component in d):
                self.fail(f'failed path traversal for "{path}" in "{d}"')
            d = d[component]

            if not indexed:
                continue

            # "key[N]" component: descend into the list element
            if not isinstance(d, list):
                self.fail(f'path component "{component}" in "{path}" '
                          f'is not a list in "{d}"')
            try:
                d = d[idx]
            except IndexError:
                self.fail(f'invalid index "{idx}" in path "{path}" '
                          f'in "{d}"')
        return d

    def assert_qmp_absent(self, d, path):
        '''Assert that @path cannot be resolved in the QMP dict @d'''
        try:
            found = self.dictpath(d, path)
        except AssertionError:
            # dictpath() failed the traversal, which is what we want
            pass
        else:
            self.fail('path "%s" has value "%s"' % (path, found))

    def assert_qmp(self, d, path, value):
        '''Assert that the value found under @path in the QMP dict @d
           equals @value.  If @value is a non-empty list, it is taken
           as a set of alternatives, any of which may match.'''

        actual = self.dictpath(d, path)

        # An empty list cannot be a list of alternatives, so it is
        # compared like any other single value.
        if not isinstance(value, list) or value == []:
            self.assertEqual(actual, value,
                             '"%s" is "%s", expected "%s"'
                             % (path, actual, value))
            return

        if not any(actual == candidate for candidate in value):
            self.fail('no match for "%s" in %s' % (actual, value))

    def assert_no_active_block_jobs(self):
        '''Assert that query-block-jobs returns an empty list'''
        jobs = self.vm.qmp('query-block-jobs')
        self.assert_qmp(jobs, 'return', [])

    def assert_has_block_node(self, node_name=None, file_name=None):
        """Query the named block nodes and assert that one of them has
        the given node_name and/or file_name (None matches anything)"""
        def matches(actual, wanted):
            return actual is None or wanted is None or actual == wanted

        assert node_name or file_name
        result = self.vm.qmp('query-named-block-nodes')
        found = any(matches(entry.get("node-name"), node_name) and
                    matches(entry.get("file"), file_name)
                    for entry in result["return"])
        if not found:
            self.fail("Cannot find %s %s in result:\n%s" %
                      (node_name, file_name, result))

    def assert_json_filename_equal(self, json_filename, reference):
        '''Assert that @json_filename starts with "json:" and that the
           JSON object following the prefix is equal to @reference
           (both are compared in flattened form)'''
        self.assertEqual(json_filename[:5], 'json:')
        parsed = json.loads(json_filename[5:])
        self.assertEqual(self.vm.flatten_qmp_object(parsed),
                         self.vm.flatten_qmp_object(reference))

    def cancel_and_wait(self, drive='drive0', force=False,
                        resume=False, wait=60.0):
        '''Cancel a block job, wait until it is gone, and return the
           BLOCK_JOB_COMPLETED/BLOCK_JOB_CANCELLED event'''
        self.vm.cmd('block-job-cancel', device=drive, force=force)

        if resume:
            self.vm.resume_drive(drive)

        final_events = ('BLOCK_JOB_COMPLETED', 'BLOCK_JOB_CANCELLED')
        result = None
        done = False
        while not done:
            # Every event of a batch is inspected, even after the final
            # one has been seen.
            for event in self.vm.get_qmp_events(wait=wait):
                name = event['event']
                if name in final_events:
                    self.assert_qmp(event, 'data/device', drive)
                    result = event
                    done = True
                elif name == 'JOB_STATUS_CHANGE':
                    self.assert_qmp(event, 'data/id', drive)

        self.assert_no_active_block_jobs()
        return result

    def wait_until_completed(self, drive='drive0', check_offset=True,
                             wait=60.0, error=None):
        '''Wait for the BLOCK_JOB_COMPLETED event of a block job, check
           it against the expectations, and return it'''
        while True:
            for event in self.vm.get_qmp_events(wait=wait):
                name = event['event']
                if name == 'JOB_STATUS_CHANGE':
                    self.assert_qmp(event, 'data/id', drive)
                    continue
                if name != 'BLOCK_JOB_COMPLETED':
                    continue

                self.assert_qmp(event, 'data/device', drive)
                if error is not None:
                    self.assert_qmp(event, 'data/error', error)
                else:
                    self.assert_qmp_absent(event, 'data/error')
                    if check_offset:
                        self.assert_qmp(event, 'data/offset',
                                        event['data']['len'])
                self.assert_no_active_block_jobs()
                return event

    def wait_ready(self, drive='drive0'):
        """Wait for a BLOCK_JOB_READY event of a mirror or commit job on
        @drive, and return that event."""
        wanted = [('BLOCK_JOB_READY',
                   {'data': {'type': job_type, 'device': drive}})
                  for job_type in ('mirror', 'commit')]
        return self.vm.events_wait(wanted)

    def wait_ready_and_cancel(self, drive='drive0'):
        '''Wait until the job is ready, cancel it, and check that it
           completed as a fully-synced mirror job'''
        self.wait_ready(drive=drive)
        event = self.cancel_and_wait(drive=drive)
        self.assertEqual(event['event'], 'BLOCK_JOB_COMPLETED')
        self.assert_qmp(event, 'data/type', 'mirror')
        self.assert_qmp(event, 'data/offset', event['data']['len'])

    def complete_and_wait(self, drive='drive0', wait_ready=True,
                          completion_error=None):
        '''Issue block-job-complete and wait for the job to finish'''
        if wait_ready:
            self.wait_ready(drive=drive)

        self.vm.cmd('block-job-complete', device=drive)

        event = self.wait_until_completed(drive=drive, error=completion_error)
        self.assertTrue(event['data']['type'] in ['mirror', 'commit'])

    def pause_wait(self, job_id='job0'):
        '''Poll query-block-jobs until the job is paused and not busy,
           and return its entry.  Guarded by a 3 second Timeout.'''
        with Timeout(3, "Timeout waiting for job to pause"):
            while True:
                result = self.vm.qmp('query-block-jobs')
                job = next((entry for entry in result['return']
                            if entry['device'] == job_id), None)
                # The job must still exist while we are waiting for it
                assert job is not None
                if job['paused'] and not job['busy']:
                    return job

    def pause_job(self, job_id='job0', wait=True):
        '''Issue block-job-pause, optionally waiting for it to take
           effect'''
        self.vm.cmd('block-job-pause', device=job_id)
        if wait:
            self.pause_wait(job_id)

    def case_skip(self, reason):
        '''Skip this test case'''
        case_notrun(reason)
        self.skipTest(reason)
1369
1370
def notrun(reason):
    '''Skip this test suite

    Records @reason in "<test_dir>/<seq>.notrun", logs a warning and
    exits the process with status 0.'''
    # "seq" is the number identifying each qemu-iotests test; it is the
    # basename of the running script.
    seq = os.path.basename(sys.argv[0])
    notrun_path = '%s/%s.notrun' % (test_dir, seq)

    with open(notrun_path, 'w', encoding='utf-8') as outfile:
        outfile.write(reason + '\n')

    logger.warning("%s not run: %s", seq, reason)
    sys.exit(0)
1381
def case_notrun(reason):
    '''Record that a test case has not been run by appending @reason
    to "<test_dir>/<seq>.casenotrun".  This does not skip anything by
    itself; that is up to the caller.  QMPTestCase.case_skip() is the
    variant that also skips the current test case.'''

    # "seq" is the number identifying each qemu-iotests test; it is the
    # basename of the running script.
    seq = os.path.basename(sys.argv[0])
    casenotrun_path = '%s/%s.casenotrun' % (test_dir, seq)

    with open(casenotrun_path, 'a', encoding='utf-8') as outfile:
        outfile.write('    [case not run] ' + reason + '\n')
1394
def _verify_image_format(supported_fmts: Sequence[str] = (),
                         unsupported_fmts: Sequence[str] = ()) -> None:
    """
    Skip the test suite if the current image format is not suitable.

    :param supported_fmts: If non-empty, the only formats that may run.
                           'generic' lifts this restriction unless the
                           environment sets IMGFMT_GENERIC to something
                           other than 'true'.
    :param unsupported_fmts: Formats that must not run.
    """
    generic_allowed = os.environ.get('IMGFMT_GENERIC', 'true') == 'true'
    if generic_allowed and 'generic' in supported_fmts:
        # Same as "_supported_fmt generic" in the bash tests
        supported_fmts = ()

    not_listed = bool(supported_fmts) and imgfmt not in supported_fmts
    excluded = imgfmt in unsupported_fmts
    if not_listed or excluded:
        notrun('not suitable for this image format: %s' % imgfmt)

    if imgfmt == 'luks':
        verify_working_luks()
1410
1411def _verify_protocol(supported: Sequence[str] = (),
1412                     unsupported: Sequence[str] = ()) -> None:
1413    assert not (supported and unsupported)
1414
1415    if 'generic' in supported:
1416        return
1417
1418    not_sup = supported and (imgproto not in supported)
1419    if not_sup or (imgproto in unsupported):
1420        notrun('not suitable for this protocol: %s' % imgproto)
1421
1422def _verify_platform(supported: Sequence[str] = (),
1423                     unsupported: Sequence[str] = ()) -> None:
1424    if any((sys.platform.startswith(x) for x in unsupported)):
1425        notrun('not suitable for this OS: %s' % sys.platform)
1426
1427    if supported:
1428        if not any((sys.platform.startswith(x) for x in supported)):
1429            notrun('not suitable for this OS: %s' % sys.platform)
1430
1431def _verify_cache_mode(supported_cache_modes: Sequence[str] = ()) -> None:
1432    if supported_cache_modes and (cachemode not in supported_cache_modes):
1433        notrun('not suitable for this cache mode: %s' % cachemode)
1434
1435def _verify_aio_mode(supported_aio_modes: Sequence[str] = ()) -> None:
1436    if supported_aio_modes and (aiomode not in supported_aio_modes):
1437        notrun('not suitable for this aio mode: %s' % aiomode)
1438
def _verify_formats(required_formats: Sequence[str] = ()) -> None:
    """
    Skip the test suite unless every required format is reported by
    supported_formats().
    """
    missing = set(required_formats) - set(supported_formats())
    usf_list = list(missing)
    if usf_list:
        notrun(f'formats {usf_list} are not whitelisted')
1443
1444
def _verify_virtio_blk() -> None:
    """
    Skip the test suite if QEMU's "-device help" output does not mention
    virtio-blk.
    """
    device_help = qemu_pipe('-M', 'none', '-device', 'help')
    if 'virtio-blk' in device_help:
        return
    notrun('Missing virtio-blk in QEMU binary')
1449
def verify_virtio_scsi_pci_or_ccw() -> None:
    """
    Skip the test suite if QEMU's "-device help" output mentions neither
    virtio-scsi-pci nor virtio-scsi-ccw.
    """
    device_help = qemu_pipe('-M', 'none', '-device', 'help')
    candidates = ('virtio-scsi-pci', 'virtio-scsi-ccw')
    if not any(device in device_help for device in candidates):
        notrun('Missing virtio-scsi-pci or virtio-scsi-ccw in QEMU binary')
1454
1455
1456def _verify_imgopts(unsupported: Sequence[str] = ()) -> None:
1457    imgopts = os.environ.get('IMGOPTS')
1458    # One of usage examples for IMGOPTS is "data_file=$TEST_IMG.ext_data_file"
1459    # but it supported only for bash tests. We don't have a concept of global
1460    # TEST_IMG in iotests.py, not saying about somehow parsing $variables.
1461    # So, for simplicity let's just not support any IMGOPTS with '$' inside.
1462    unsup = list(unsupported) + ['$']
1463    if imgopts and any(x in imgopts for x in unsup):
1464        notrun(f'not suitable for this imgopts: {imgopts}')
1465
1466
def supports_quorum() -> bool:
    """Return whether 'quorum' appears in the output of qemu-img --help."""
    help_text = qemu_img('--help').stdout
    return 'quorum' in help_text
1469
def verify_quorum():
    '''Skip test suite if quorum support is not available'''
    if supports_quorum():
        return
    notrun('quorum support missing')
1474
def has_working_luks() -> Tuple[bool, str]:
    """
    Check whether our LUKS driver can actually create images
    (this extends to LUKS encryption for qcow2).

    A 1G test image is created with qemu-img and removed again.

    :return: (True, '') if creation worked; otherwise (False, reason),
             where reason is the text following "<image file>:" in
             qemu-img's output, or the whole output if no line contains
             that marker.
    """

    img_file = f'{test_dir}/luks-test.luks'
    create_args = ['create', '-f', 'luks',
                   '--object', luks_default_secret_object,
                   '-o', luks_default_key_secret_opt,
                   '-o', 'iter-time=10',
                   img_file, '1G']
    res = qemu_img(*create_args, check=False)

    # Best-effort cleanup; the file does not exist if creation failed
    try:
        os.remove(img_file)
    except OSError:
        pass

    if not res.returncode:
        return (True, '')

    marker = img_file + ':'
    reason = next((line.split(marker, 1)[1].strip()
                   for line in res.stdout.splitlines()
                   if marker in line),
                  res.stdout)
    return (False, reason)
1505
def verify_working_luks():
    """
    Skip test suite if LUKS does not work, giving the reason reported by
    has_working_luks().
    """
    working, reason = has_working_luks()
    if working:
        return
    notrun(reason)
1513
def supports_qcow2_zstd_compression() -> bool:
    """
    Probe for zstd support by creating (and removing) an empty qcow2
    image with compression_type=zstd.

    :return: False only if qemu-img exits with status 1 and reports that
             'compression-type' does not accept 'zstd'; True otherwise.
    """
    img_file = f'{test_dir}/qcow2-zstd-test.qcow2'
    create_args = ['create', '-f', 'qcow2', '-o', 'compression_type=zstd',
                   img_file, '0']
    res = qemu_img(*create_args, check=False)

    # Best-effort cleanup; the file does not exist if creation failed
    try:
        os.remove(img_file)
    except OSError:
        pass

    rejected = (
        res.returncode == 1 and
        "'compression-type' does not accept value 'zstd'" in res.stdout
    )
    return not rejected
1529
def verify_qcow2_zstd_compression():
    """Skip test suite if qcow2 zstd compression is not supported."""
    if supports_qcow2_zstd_compression():
        return
    notrun('zstd compression not supported')
1533
def qemu_pipe(*args: str) -> str:
    """
    Run qemu with an option to print something and exit (e.g. a help option).

    The given arguments are appended to the QEMU binary and the common
    QEMU options.

    :return: QEMU's stdout output.
    """
    command = [qemu_prog] + qemu_opts + list(args)
    output, _status = qemu_tool_pipe_and_status('qemu', command)
    return output
1543
def supported_formats(read_only=False):
    '''Return the list of whitelisted formats, parsed from the output of
       "-drive format=help".

       Set 'read_only' to True to check ro-whitelist
       Otherwise, rw-whitelist is checked

       Results are cached on the function object, per 'read_only' value.'''

    try:
        cache = supported_formats.formats
    except AttributeError:
        cache = supported_formats.formats = {}

    if read_only not in cache:
        help_lines = qemu_pipe("-drive", "format=help").splitlines()
        # The second line is parsed for read_only, the first one otherwise;
        # the format names follow the first colon.
        wanted_line = help_lines[1 if read_only else 0]
        cache[read_only] = wanted_line.split(":")[1].split()

    return cache[read_only]
1558
def skip_if_unsupported(required_formats=(), read_only=False):
    '''Skip Test Decorator
       Runs the test if all the required formats are whitelisted

       'required_formats' is either a collection of format names, or a
       callable that takes the test case and returns such a collection.
       'read_only' is passed on to supported_formats().'''
    def skip_test_decorator(func):
        def func_wrapper(test_case: QMPTestCase, *args: List[Any],
                         **kwargs: Dict[str, Any]) -> None:
            fmts = (required_formats(test_case)
                    if callable(required_formats) else required_formats)

            usf_list = list(set(fmts) - set(supported_formats(read_only)))
            if usf_list:
                msg = f'{test_case}: formats {usf_list} are not whitelisted'
                test_case.case_skip(msg)
                return

            func(test_case, *args, **kwargs)
        return func_wrapper
    return skip_test_decorator
1578
def skip_for_formats(formats: Sequence[str] = ()) \
    -> Callable[[Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]],
                Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]]:
    '''Skip Test Decorator
       Skips the test for the given formats, i.e. if the current image
       format is one of them'''
    def skip_test_decorator(func):
        def func_wrapper(test_case: QMPTestCase, *args: List[Any],
                         **kwargs: Dict[str, Any]) -> None:
            if imgfmt not in formats:
                func(test_case, *args, **kwargs)
                return

            msg = f'{test_case}: Skipped for format {imgfmt}'
            test_case.case_skip(msg)
        return func_wrapper
    return skip_test_decorator
1594
def skip_if_user_is_root(func):
    '''Skip Test Decorator
       Runs the test only without root permissions

       When running as root (uid 0), the case is recorded through
       case_notrun() and the wrapper returns None without calling the
       test.'''
    def func_wrapper(*args, **kwargs):
        if os.getuid() != 0:
            return func(*args, **kwargs)

        # args[0] is the first positional argument of the decorated
        # function; it only serves to label the message
        case_notrun('{}: cannot be run as root'.format(args[0]))
        return None
    return func_wrapper
1605
1606# We need to filter out the time taken from the output so that
1607# qemu-iotest can reliably diff the results against master output,
1608# and hide skipped tests from the reference output.
1609
class ReproducibleTestResult(unittest.TextTestResult):
    """TextTestResult that reports skipped tests with a dot."""

    def addSkip(self, test, reason):
        """
        Record a skipped test.

        Same as TextTestResult, but print dot instead of "s" in dots
        mode; in showAll mode, "skipped <reason>" is printed.
        """
        # Deliberately bypass TextTestResult.addSkip(): only the
        # bookkeeping of the base TestResult is wanted.
        unittest.TestResult.addSkip(self, test, reason)

        if self.showAll:
            self.stream.writeln("skipped {0!r}".format(reason))
            return

        if self.dots:
            self.stream.write(".")
            self.stream.flush()
1619
class ReproducibleStreamWrapper:
    """
    Stream proxy that strips non-reproducible parts from unittest output.

    write() removes the elapsed time from the "Ran N tests in X.XXXs"
    summary and drops " (skipped=N)"; every other attribute is forwarded
    to the wrapped stream.
    """

    def __init__(self, stream: TextIO):
        """
        :param stream: The text stream that receives the filtered output.
        """
        self.stream = stream

    def __getattr__(self, attr):
        """Forward lookups of unknown attributes to the wrapped stream."""
        # Never forward these two: 'stream' is only looked up here when
        # it has not been set on the instance, and forwarding it would
        # recurse endlessly.
        if attr in ('stream', '__getstate__'):
            raise AttributeError(attr)
        return getattr(self.stream, attr)

    def write(self, arg=None):
        """
        Write @arg to the wrapped stream after filtering it.

        :param arg: Text to write.  None (the default) writes nothing;
                    it used to raise a TypeError from re.sub().
        """
        if arg is None:
            return
        # Note that the plural "tests" is emitted even for a single test
        arg = re.sub(r'Ran (\d+) tests? in [\d.]+s', r'Ran \1 tests', arg)
        arg = re.sub(r' \(skipped=\d+\)', r'', arg)
        self.stream.write(arg)
1633
class ReproducibleTestRunner(unittest.TextTestRunner):
    """
    TextTestRunner whose output stream is wrapped in a
    ReproducibleStreamWrapper and whose default result class is
    ReproducibleTestResult.
    """

    def __init__(
        self,
        stream: Optional[TextIO] = None,
        resultclass: Type[unittest.TextTestResult] =
        ReproducibleTestResult,
        **kwargs: Any
    ) -> None:
        """
        :param stream: Output stream; sys.stdout is used if it is None
                       (or otherwise false).
        :param resultclass: Result class handed to TextTestRunner.
        :param kwargs: Further TextTestRunner arguments.
        """
        target = stream if stream else sys.stdout
        filtered = ReproducibleStreamWrapper(target)
        super().__init__(stream=filtered,          # type: ignore
                         descriptions=True,
                         resultclass=resultclass,
                         **kwargs)
1647
def execute_unittest(argv: List[str], debug: bool = False) -> None:
    """
    Executes unittests within the calling module.

    :param argv: Argument vector handed to unittest.main().
    :param debug: Selects verbosity 2 instead of 1.
    """
    verbosity = 2 if debug else 1

    # Some tests have warnings, especially ResourceWarnings for unclosed
    # files and sockets.  Unless warning options were given to the
    # interpreter, ignore them for now to ensure reproducibility of the
    # test output.
    warning_filter = None if sys.warnoptions else 'ignore'

    unittest.main(argv=argv,
                  testRunner=ReproducibleTestRunner,
                  verbosity=verbosity,
                  warnings=warning_filter)
1658
def execute_setup_common(supported_fmts: Sequence[str] = (),
                         supported_platforms: Sequence[str] = (),
                         supported_cache_modes: Sequence[str] = (),
                         supported_aio_modes: Sequence[str] = (),
                         unsupported_fmts: Sequence[str] = (),
                         supported_protocols: Sequence[str] = (),
                         unsupported_protocols: Sequence[str] = (),
                         required_fmts: Sequence[str] = (),
                         unsupported_imgopts: Sequence[str] = ()) -> bool:
    """
    Common setup shared by script-style and unittest-style tests.

    Consumes a '-d' switch from sys.argv, configures the root logger
    accordingly, and then runs the environment checks for the given
    requirements.

    :return: Bool; Whether or not debug mode has been requested via the CLI.
    """
    # Note: Python 3.6 and pylint do not like 'Collection' so use 'Sequence'.

    debug = False
    if '-d' in sys.argv:
        # Strip the switch so later argv consumers do not see it.
        sys.argv.remove('-d')
        debug = True

    log_level = logging.DEBUG if debug else logging.WARN
    logging.basicConfig(level=log_level)

    # The checks run in this fixed order.
    _verify_image_format(supported_fmts, unsupported_fmts)
    _verify_protocol(supported_protocols, unsupported_protocols)
    _verify_platform(supported=supported_platforms)
    _verify_cache_mode(supported_cache_modes)
    _verify_aio_mode(supported_aio_modes)
    _verify_formats(required_fmts)
    _verify_virtio_blk()
    _verify_imgopts(unsupported_imgopts)

    return debug
1690
def execute_test(*args, test_function=None, **kwargs):
    """
    Perform the common setup, then run the test.

    With a test_function, that function is called (script-style test);
    without one, the unittest runner is started on sys.argv.  All other
    arguments are passed to execute_setup_common().
    """
    debug = execute_setup_common(*args, **kwargs)

    if test_function:
        test_function()
        return

    execute_unittest(sys.argv, debug)
1699
def activate_logging():
    """
    Send test_logger messages of level INFO and above to stdout.

    Messages are printed bare (message text only) and are not
    propagated to ancestor loggers.  Used by script-style tests.
    """
    stdout_handler = logging.StreamHandler(stream=sys.stdout)
    stdout_handler.setFormatter(logging.Formatter('%(message)s'))

    test_logger.propagate = False
    test_logger.setLevel(logging.INFO)
    test_logger.addHandler(stdout_handler)
1708
1709# This is called from script-style iotests without a single point of entry
def script_initialize(*args, **kwargs):
    """
    Set up a script-style test without running anything.

    Enables test log output on stdout and performs the common setup;
    all arguments are passed to execute_setup_common().
    """
    activate_logging()
    execute_setup_common(*args, **kwargs)
1714
1715# This is called from script-style iotests with a single point of entry
def script_main(test_function, *args, **kwargs):
    """
    Run a script-style test outside of the unittest framework.

    Enables test log output on stdout, then hands test_function and the
    remaining arguments to execute_test().
    """
    activate_logging()
    execute_test(*args, test_function=test_function, **kwargs)
1720
1721# This is called from unittest style iotests
def main(*args, **kwargs):
    """
    Run tests using the unittest framework.

    All arguments are passed to execute_test(), which performs the
    common setup first.
    """
    execute_test(*args, **kwargs)
1725