xref: /qemu/tests/qemu-iotests/iotests.py (revision d128c341a744ba3e92fa67d9f1b02dd9a7bd68b9)
1# Common utilities and Python wrappers for qemu-iotests
2#
3# Copyright (C) 2012 IBM Corp.
4#
5# This program is free software; you can redistribute it and/or modify
6# it under the terms of the GNU General Public License as published by
7# the Free Software Foundation; either version 2 of the License, or
8# (at your option) any later version.
9#
10# This program is distributed in the hope that it will be useful,
11# but WITHOUT ANY WARRANTY; without even the implied warranty of
12# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13# GNU General Public License for more details.
14#
15# You should have received a copy of the GNU General Public License
16# along with this program.  If not, see <http://www.gnu.org/licenses/>.
17#
18
19import argparse
20import atexit
21import bz2
22from collections import OrderedDict
23import faulthandler
24import json
25import logging
26import os
27import re
28import shutil
29import signal
30import struct
31import subprocess
32import sys
33import time
34from typing import (Any, Callable, Dict, Iterable, Iterator,
35                    List, Optional, Sequence, TextIO, Tuple, Type, TypeVar)
36import unittest
37
38from contextlib import contextmanager
39
40from qemu.machine import qtest
41from qemu.qmp.legacy import QMPMessage, QMPReturnValue, QEMUMonitorProtocol
42from qemu.utils import VerboseProcessError
43
# Use this logger for logging messages directly from the iotests module
logger = logging.getLogger('qemu.iotests')
logger.addHandler(logging.NullHandler())

# Use this logger for messages that ought to be used for diff output.
test_logger = logging.getLogger('qemu.iotests.diff_io')


# Have Python dump tracebacks if the interpreter receives a fatal signal.
faulthandler.enable()

# This will not work if arguments contain spaces but is necessary if we
# want to support the override options that ./check supports.
# Each *_args list below is the tool's program name followed by the
# space-separated words of the corresponding *_OPTIONS variable (if set).
qemu_img_args = [os.environ.get('QEMU_IMG_PROG', 'qemu-img')]
if os.environ.get('QEMU_IMG_OPTIONS'):
    qemu_img_args += os.environ['QEMU_IMG_OPTIONS'].strip().split(' ')

qemu_io_args = [os.environ.get('QEMU_IO_PROG', 'qemu-io')]
if os.environ.get('QEMU_IO_OPTIONS'):
    qemu_io_args += os.environ['QEMU_IO_OPTIONS'].strip().split(' ')

# Variant of qemu_io_args built from QEMU_IO_OPTIONS_NO_FMT; used by
# qemu_io_wrap_args() when the caller passes -f or --image-opts itself.
qemu_io_args_no_fmt = [os.environ.get('QEMU_IO_PROG', 'qemu-io')]
if os.environ.get('QEMU_IO_OPTIONS_NO_FMT'):
    qemu_io_args_no_fmt += \
        os.environ['QEMU_IO_OPTIONS_NO_FMT'].strip().split(' ')

qemu_nbd_prog = os.environ.get('QEMU_NBD_PROG', 'qemu-nbd')
qemu_nbd_args = [qemu_nbd_prog]
if os.environ.get('QEMU_NBD_OPTIONS'):
    qemu_nbd_args += os.environ['QEMU_NBD_OPTIONS'].strip().split(' ')

qemu_prog = os.environ.get('QEMU_PROG', 'qemu')
# NOTE: if QEMU_OPTIONS is unset or empty, this evaluates to [''] (a list
# holding one empty string), not to an empty list.
qemu_opts = os.environ.get('QEMU_OPTIONS', '').strip().split(' ')

qsd_prog = os.environ.get('QSD_PROG', 'qemu-storage-daemon')

# Command prefix for running QEMU under gdbserver; empty unless GDB_OPTIONS
# is set to a non-empty value.
gdb_qemu_env = os.environ.get('GDB_OPTIONS')
qemu_gdb = []
if gdb_qemu_env:
    qemu_gdb = ['gdbserver'] + gdb_qemu_env.strip().split(' ')

# Either the (string) value of PRINT_QEMU or False; only its truthiness
# is checked (see VM._pre_launch()).
qemu_print = os.environ.get('PRINT_QEMU', False)

imgfmt = os.environ.get('IMGFMT', 'raw')
imgproto = os.environ.get('IMGPROTO', 'file')

try:
    test_dir = os.environ['TEST_DIR']
    sock_dir = os.environ['SOCK_DIR']
    cachemode = os.environ['CACHEMODE']
    aiomode = os.environ['AIOMODE']
    qemu_default_machine = os.environ['QEMU_DEFAULT_MACHINE']
except KeyError:
    # We are using these variables as proxies to indicate that we're
    # not being run via "check". There may be other things set up by
    # "check" that individual test cases rely on.
    sys.stderr.write('Please run this test via the "check" script\n')
    sys.exit(os.EX_USAGE)

# Command prefix for running QEMU under valgrind; empty unless
# VALGRIND_QEMU is "y" and NO_VALGRIND is not "y".
qemu_valgrind = []
if os.environ.get('VALGRIND_QEMU') == "y" and \
    os.environ.get('NO_VALGRIND') != "y":
    valgrind_logfile = "--log-file=" + test_dir
    # %p allows to put the valgrind process PID, since
    # we don't know it a priori (subprocess.Popen is
    # not yet invoked)
    valgrind_logfile += "/%p.valgrind"

    # Exit code 99 is what VM._post_shutdown() checks for to decide
    # whether to print the valgrind log.
    qemu_valgrind = ['valgrind', valgrind_logfile, '--error-exitcode=99']

# Default secret object / option added for luks images when the caller
# did not specify a key-secret itself.
luks_default_secret_object = 'secret,id=keysec0,data=' + \
                             os.environ.get('IMGKEYSECRET', '')
luks_default_key_secret_opt = 'key-secret=keysec0'

# Unlike the variables in the try block above, a missing SAMPLE_IMG_DIR
# raises a plain KeyError here.
sample_img_dir = os.environ['SAMPLE_IMG_DIR']
118
119
@contextmanager
def change_log_level(
        logger_name: str, level: int = logging.CRITICAL) -> Iterator[None]:
    """
    Context manager that temporarily overrides a logger's level.

    The logger named `logger_name` is switched to `level` (CRITICAL by
    default) for the duration of the context, which is handy for muting
    errors that are expected or uninteresting. The level the logger had
    on entry is restored on exit, even if the body raises.
    """
    target = logging.getLogger(logger_name)
    saved_level = target.level

    target.setLevel(level)
    try:
        yield
    finally:
        target.setLevel(saved_level)
136
137
def unarchive_sample_image(sample, fname):
    """
    Decompress the sample image `sample` into the file `fname`.

    The compressed input is `<sample>.bz2` inside sample_img_dir.
    """
    archive = os.path.join(sample_img_dir, sample + '.bz2')
    # Open the archive first so that a missing sample does not leave an
    # empty output file behind.
    with bz2.open(archive) as compressed:
        with open(fname, 'wb') as target:
            shutil.copyfileobj(compressed, target)
142
143
def qemu_tool_popen(args: Sequence[str],
                    connect_stderr: bool = True) -> 'subprocess.Popen[str]':
    """
    Start `args` as a child process with its stdout captured as text.

    :param args: full command line to run.
    :param connect_stderr: if True, stderr is merged into the captured
                           stdout; otherwise stderr is left untouched.

    :return: the running Popen object; the caller is responsible for
             waiting on it.
    """
    stderr_dest = None
    if connect_stderr:
        stderr_dest = subprocess.STDOUT

    # The caller owns the process, so no "with" here.
    # pylint: disable=consider-using-with
    process = subprocess.Popen(args,
                               stdout=subprocess.PIPE,
                               stderr=stderr_dest,
                               universal_newlines=True)
    return process
152
153
def qemu_tool_pipe_and_status(tool: str, args: Sequence[str],
                              connect_stderr: bool = True,
                              drop_successful_output: bool = False) \
        -> Tuple[str, int]:
    """
    Run a tool and return both its output and its exit code

    :param tool: human-readable tool name, only used in the diagnostic
                 written to stderr when the process dies from a signal.
    :param args: full command line to run.
    :param connect_stderr: passed on to `qemu_tool_popen()`.
    :param drop_successful_output: if True, return '' as the output when
                                   the exit code is 0.

    :return: an (output, returncode) tuple.
    """
    with qemu_tool_popen(args, connect_stderr) as subp:
        output = subp.communicate()[0]
        if subp.returncode < 0:
            cmd = ' '.join(args)
            # Build the message from adjacent literals: a backslash
            # continuation inside a single literal would embed the next
            # line's indentation in the message.
            sys.stderr.write(f'{tool} received signal '
                             f'{-subp.returncode}: {cmd}\n')
        if drop_successful_output and subp.returncode == 0:
            output = ''
        return (output, subp.returncode)
170
def qemu_img_create_prepare_args(args: List[str]) -> List[str]:
    """
    Rewrite a qemu-img 'create' command line with the iotests defaults.

    Command lines whose first word is not 'create' are returned as an
    unchanged copy. For 'create', the -f and -o options are collected and
    re-emitted first (in that order), followed by all remaining arguments.
    IMGOPTS is prepended to the -o options when the format equals imgfmt,
    and the default luks secret is added for luks images that carry no
    'key-secret' option.
    """
    is_create = bool(args) and args[0] == 'create'
    if not is_create:
        return list(args)

    parser = argparse.ArgumentParser(allow_abbrev=False)
    # -o option may be specified several times
    parser.add_argument('-o', action='append', default=[])
    parser.add_argument('-f')
    known, leftover = parser.parse_known_args(args[1:])

    fmt = known.f
    option_groups = known.o

    # IMGOPTS most probably contains options specific to the selected
    # format, like extended_l2 or compression_type for qcow2. A test may
    # want to create additional images in other formats that do not
    # support these options. So, use IMGOPTS only for images created in
    # imgfmt format.
    imgopts = os.environ.get('IMGOPTS')
    if imgopts and fmt == imgfmt:
        option_groups.insert(0, imgopts)

    cmdline = ['create']
    if fmt is not None:
        cmdline.extend(['-f', fmt])

    # default luks support
    if fmt == 'luks' and \
            not any('key-secret' in group for group in option_groups):
        cmdline.extend(['--object', luks_default_secret_object])
        option_groups.append(luks_default_key_secret_opt)

    for group in option_groups:
        cmdline.extend(['-o', group])

    cmdline.extend(leftover)
    return cmdline
208
209
def qemu_tool(*args: str, check: bool = True, combine_stdio: bool = True
              ) -> 'subprocess.CompletedProcess[str]':
    """
    Run a qemu tool to completion and return the finished process.

    :param args: full command line to run.
    :param check: Enforce a return code of zero.
    :param combine_stdio: set to False to keep stdout/stderr separated.

    :raise VerboseProcessError:
        When the return code is negative, or on any non-zero exit code
        when 'check=True' was provided (the default). The exception is
        constructed with the return code, the command line and the
        captured stdout/stderr.

    :return:
        a CompletedProcess. This object has args, returncode, and stdout
        properties. If streams are not combined, it will also have a
        stderr property.
    """
    if combine_stdio:
        stderr_dest = subprocess.STDOUT
    else:
        stderr_dest = subprocess.PIPE

    # We evaluate the exit status ourselves, hence check=False.
    subp = subprocess.run(
        args,
        stdout=subprocess.PIPE,
        stderr=stderr_dest,
        universal_newlines=True,
        check=False
    )

    status = subp.returncode
    killed_by_signal = status < 0
    failed_check = check and status != 0
    if failed_check or killed_by_signal:
        raise VerboseProcessError(
            status, args,
            output=subp.stdout,
            stderr=subp.stderr,
        )

    return subp
248
249
def qemu_img(*args: str, check: bool = True, combine_stdio: bool = True
             ) -> 'subprocess.CompletedProcess[str]':
    """
    Run QEMU_IMG_PROG and return the completed process.

    The command line is qemu_img_args (program plus QEMU_IMG_OPTIONS)
    followed by `args`, where 'create' commands are first passed through
    `qemu_img_create_prepare_args()`.

    See `qemu_tool()` for greater detail.
    """
    cmdline = list(qemu_img_args)
    cmdline.extend(qemu_img_create_prepare_args(list(args)))
    return qemu_tool(*cmdline, check=check, combine_stdio=combine_stdio)
262
263
def ordered_qmp(qmsg, conv_keys=True):
    """
    Return a copy of `qmsg` in which every dict is a key-sorted OrderedDict.

    If `conv_keys` is true, underscores in the keys of the dict passed in
    are replaced by dashes; dicts nested inside it as values keep their
    keys. Lists are processed element by element with the default
    `conv_keys`. Anything else is returned unchanged.
    """
    # Dictionaries are not ordered prior to 3.6, therefore:
    if isinstance(qmsg, dict):
        ordered = OrderedDict()
        for key, value in sorted(qmsg.items()):
            out_key = key.replace('_', '-') if conv_keys else key
            ordered[out_key] = ordered_qmp(value, conv_keys=False)
        return ordered
    if isinstance(qmsg, list):
        return list(map(ordered_qmp, qmsg))
    return qmsg
276
def qemu_img_create(*args: str) -> 'subprocess.CompletedProcess[str]':
    """Run the qemu-img 'create' subcommand with `args` via `qemu_img()`."""
    create_cmd = ('create',) + args
    return qemu_img(*create_cmd)
279
def qemu_img_json(*args: str) -> Any:
    """
    Run qemu-img and return its stdout parsed as JSON.

    :raise CalledProcessError:
        When qemu-img is terminated by a signal, or exits non-zero
        without having written a valid JSON document to stdout.
    :raise JSONDecodeError:
        When qemu-img exits with 0 but its stdout is not valid JSON.

    :return: A deserialized JSON object; probably a dict[str, Any].
    """
    try:
        stdout = qemu_img(*args, combine_stdio=False).stdout
    except subprocess.CalledProcessError as exc:
        # A negative status means termination by signal: don't bother
        # parsing. Otherwise, commands like 'check' can return failure
        # (exit codes 2 and 3) to indicate command completion, but with
        # errors found. For multi-command flexibility, ignore the exact
        # error codes and *try* to load JSON.
        if exc.returncode >= 0:
            try:
                return json.loads(exc.stdout)
            except json.JSONDecodeError:
                # Nope. This thing is toast. Raise the /process/ error.
                pass
        raise

    return json.loads(stdout)
311
def qemu_img_measure(*args: str) -> Any:
    """Run 'qemu-img measure --output json' and return the parsed JSON."""
    cmd = ("measure", "--output", "json") + args
    return qemu_img_json(*cmd)
314
def qemu_img_check(*args: str) -> Any:
    """Run 'qemu-img check --output json' and return the parsed JSON."""
    cmd = ("check", "--output", "json") + args
    return qemu_img_json(*cmd)
317
def qemu_img_info(*args: str) -> Any:
    """Run 'qemu-img info --output json' and return the parsed JSON."""
    cmd = ('info', "--output", "json") + args
    return qemu_img_json(*cmd)
320
def qemu_img_map(*args: str) -> Any:
    """Run 'qemu-img map --output json' and return the parsed JSON."""
    cmd = ('map', "--output", "json") + args
    return qemu_img_json(*cmd)
323
def qemu_img_log(*args: str, check: bool = True
                 ) -> 'subprocess.CompletedProcess[str]':
    """
    Run qemu-img via `qemu_img()` and log its output.

    The output is passed through filter_testfiles before being logged;
    the completed process is returned to the caller.
    """
    completed = qemu_img(*args, check=check)
    log(completed.stdout, filters=[filter_testfiles])
    return completed
329
def img_info_log(filename: str, filter_path: Optional[str] = None,
                 use_image_opts: bool = False, extra_args: Sequence[str] = (),
                 check: bool = True, drop_child_info: bool = True,
                 ) -> None:
    """
    Run 'qemu-img info' on `filename` and log the filtered output.

    :param filename: image to inspect (last command-line argument).
    :param filter_path: path to replace in the output; defaults to
                        `filename` when empty or None.
    :param use_image_opts: pass --image-opts instead of '-f imgfmt'.
    :param extra_args: additional arguments inserted before `filename`.
    :param check: passed on to `qemu_img()`.
    :param drop_child_info: passed on to `filter_img_info()`.
    """
    format_args = ['--image-opts'] if use_image_opts else ['-f', imgfmt]
    cmdline = ['info', *format_args, *extra_args, filename]

    output = qemu_img(*cmdline, check=check).stdout
    log(filter_img_info(output, filter_path or filename, drop_child_info))
346
def qemu_io_wrap_args(args: Sequence[str]) -> List[str]:
    """
    Prefix `args` with the qemu-io program and its default options.

    If the caller already selects a format ('-f') or uses '--image-opts',
    the qemu_io_args_no_fmt prefix is used; otherwise qemu_io_args.
    """
    caller_picks_format = '-f' in args or '--image-opts' in args
    prefix = qemu_io_args_no_fmt if caller_picks_format else qemu_io_args
    return prefix + list(args)
352
def qemu_io_popen(*args):
    """Start qemu-io (with the default options prepended) and return the
    Popen object from `qemu_tool_popen()`."""
    cmdline = qemu_io_wrap_args(args)
    return qemu_tool_popen(cmdline)
355
def qemu_io(*args: str, check: bool = True, combine_stdio: bool = True
            ) -> 'subprocess.CompletedProcess[str]':
    """
    Run QEMU_IO_PROG and return the completed process.

    The command line is built by `qemu_io_wrap_args()`, i.e. it always
    starts with either QEMU_IO_OPTIONS or QEMU_IO_OPTIONS_NO_FMT.
    `check` and `combine_stdio` are passed on to `qemu_tool()`.
    """
    cmdline = qemu_io_wrap_args(args)
    return qemu_tool(*cmdline, check=check, combine_stdio=combine_stdio)
366
def qemu_io_log(*args: str, check: bool = True
                ) -> 'subprocess.CompletedProcess[str]':
    """
    Run qemu-io via `qemu_io()` and log its output.

    The output is passed through filter_testfiles and filter_qemu_io
    before being logged; the completed process is returned.
    """
    completed = qemu_io(*args, check=check)
    log(completed.stdout, filters=[filter_testfiles, filter_qemu_io])
    return completed
372
class QemuIoInteractive:
    """
    Wrapper around an interactive qemu-io child process.

    The process is started with stdin/stdout connected to pipes (stderr
    is merged into stdout). Commands are sent with `cmd()`, which returns
    everything the process printed up to the next 'qemu-io> ' prompt.
    """
    # Prompt printed by qemu-io whenever it is ready for a command
    _PROMPT = 'qemu-io> '

    def __init__(self, *args):
        """
        Start qemu-io with `args` and wait for its first prompt.

        :raise ValueError: if the process does not print the prompt first;
                           the message is everything it printed instead.
        """
        self.args = qemu_io_wrap_args(args)
        # We need to keep the Popen object around, and not
        # close it immediately. Therefore, disable the pylint check:
        # pylint: disable=consider-using-with
        self._p = subprocess.Popen(self.args, stdin=subprocess.PIPE,
                                   stdout=subprocess.PIPE,
                                   stderr=subprocess.STDOUT,
                                   universal_newlines=True)
        out = self._p.stdout.read(len(self._PROMPT))
        if out != self._PROMPT:
            # Most probably qemu-io just failed to start.
            # Let's collect the whole output and exit.
            out += self._p.stdout.read()
            self._p.wait(timeout=1)
            raise ValueError(out)

    def close(self):
        """Send the quit command and wait for the process to finish."""
        self._p.communicate('q\n')

    def _read_output(self):
        """
        Read stdout up to and including the next prompt.

        :return: the text that preceded the prompt (prompt stripped).
        """
        prompt = self._PROMPT
        n = len(prompt)
        chars = []
        tail = ''
        # Compare the last n characters read against the prompt. Simply
        # restarting the match from scratch on a mismatch would miss a
        # prompt that directly follows a partial one (e.g. "qqemu-io> ").
        while tail != prompt:
            c = self._p.stdout.read(1)
            # check unexpected EOF
            assert c != ''
            chars.append(c)
            tail = (tail + c)[-n:]

        return ''.join(chars[:-n])

    def cmd(self, cmd):
        """
        Execute one qemu-io command and return its output.

        `cmd` must be a single line and must not be the quit command
        (use `close()` for that).
        """
        # quit command is in close(), '\n' is added automatically
        assert '\n' not in cmd
        cmd = cmd.strip()
        assert cmd not in ('q', 'quit')
        self._p.stdin.write(cmd + '\n')
        self._p.stdin.flush()
        return self._read_output()
419
420
class QemuStorageDaemon:
    """
    Launch and control a qemu-storage-daemon (QSD) child process.

    The process is started in the constructor, which waits until the
    daemon has written its pidfile. Optionally a QMP monitor connection
    to the daemon is set up. The process is terminated with `stop()`.
    """
    # QMP connection to the daemon; stays None unless qmp=True was given
    _qmp: Optional[QEMUMonitorProtocol] = None
    # Path of the QMP socket; stays None unless qmp=True was given
    _qmpsock: Optional[str] = None
    # Python < 3.8 would complain if this type were not a string literal
    # (importing `annotations` from `__future__` would work; but not on <= 3.6)
    _p: 'Optional[subprocess.Popen[bytes]]' = None

    def __init__(self, *args: str, instance_id: str = 'a', qmp: bool = False):
        """
        Start the storage daemon.

        :param args: command-line arguments for the daemon; must not
                     contain '--pidfile', which is added here.
        :param instance_id: used to derive the pidfile and socket names.
        :param qmp: if True, add a QMP monitor on a socket in sock_dir
                    and connect to it.

        :raise RuntimeError: if the process exits before the pidfile
                             appears.
        """
        assert '--pidfile' not in args
        self.pidfile = os.path.join(test_dir, f'qsd-{instance_id}-pid')
        all_args = [qsd_prog] + list(args) + ['--pidfile', self.pidfile]

        if qmp:
            self._qmpsock = os.path.join(sock_dir, f'qsd-{instance_id}.sock')
            all_args += ['--chardev',
                         f'socket,id=qmp-sock,path={self._qmpsock}',
                         '--monitor', 'qmp-sock']

            # Created with server=True before the daemon is launched;
            # the connection is accepted below once the process runs.
            self._qmp = QEMUMonitorProtocol(self._qmpsock, server=True)

        # Cannot use with here, we want the subprocess to stay around
        # pylint: disable=consider-using-with
        self._p = subprocess.Popen(all_args)
        if self._qmp is not None:
            self._qmp.accept()
        # Poll for the pidfile, bailing out if the process has exited
        while not os.path.exists(self.pidfile):
            if self._p.poll() is not None:
                cmd = ' '.join(all_args)
                raise RuntimeError(
                    'qemu-storage-daemon terminated with exit code ' +
                    f'{self._p.returncode}: {cmd}')

            time.sleep(0.01)

        with open(self.pidfile, encoding='utf-8') as f:
            self._pid = int(f.read().strip())

        # Sanity check: the pidfile must name the process we spawned
        assert self._pid == self._p.pid

    def qmp(self, cmd: str, args: Optional[Dict[str, object]] = None) \
            -> QMPMessage:
        """Forward `cmd`/`args` to the QMP connection's cmd_raw().

        Requires the daemon to have been started with qmp=True.
        """
        assert self._qmp is not None
        return self._qmp.cmd_raw(cmd, args)

    def get_qmp(self) -> QEMUMonitorProtocol:
        """Return the QMP connection (requires qmp=True at construction)."""
        assert self._qmp is not None
        return self._qmp

    def cmd(self, cmd: str, args: Optional[Dict[str, object]] = None) \
            -> QMPReturnValue:
        """Forward `cmd` to the QMP connection's cmd(), passing the
        entries of `args` as keyword arguments.

        Requires the daemon to have been started with qmp=True.
        """
        assert self._qmp is not None
        return self._qmp.cmd(cmd, **(args or {}))

    def stop(self, kill_signal=15):
        """
        Terminate the daemon and clean up its files.

        Sends `kill_signal` (15 by default) to the process, waits for it
        to exit, closes the QMP connection (if any) and removes the QMP
        socket and the pidfile, ignoring OSError from the removals.
        """
        self._p.send_signal(kill_signal)
        self._p.wait()
        # Marks the daemon as stopped; __del__ checks for this
        self._p = None

        if self._qmp:
            self._qmp.close()

        if self._qmpsock is not None:
            try:
                os.remove(self._qmpsock)
            except OSError:
                pass
        try:
            os.remove(self.pidfile)
        except OSError:
            pass

    def __del__(self):
        """Kill the daemon with signal 9 if it was never stopped."""
        if self._p is not None:
            self.stop(kill_signal=9)
495
496
def qemu_nbd(*args):
    '''Run qemu-nbd with --fork (daemon mode) followed by args and return
       the exit code of the parent process'''
    cmdline = qemu_nbd_args + ['--fork']
    cmdline.extend(args)
    return subprocess.call(cmdline)
500
def qemu_nbd_early_pipe(*args: str) -> Tuple[int, str]:
    '''Run qemu-nbd in daemon mode and return a (returncode, output)
       tuple; output is the parent's stdout on a non-zero exit code and
       the empty string otherwise'''
    cmdline = [*qemu_nbd_args, '--fork', *args]
    output, status = qemu_tool_pipe_and_status('qemu-nbd', cmdline,
                                               connect_stderr=False)
    if status:
        return status, output
    return status, ''
508
def qemu_nbd_list_log(*args: str) -> str:
    '''Run "qemu-nbd -L" to list remote exports, log the filtered output
       and return the unfiltered output'''
    cmdline = [qemu_nbd_prog, '-L', *args]
    listing = qemu_tool_pipe_and_status('qemu-nbd', cmdline)[0]
    log(listing, filters=[filter_testfiles, filter_nbd_exports])
    return listing
515
@contextmanager
def qemu_nbd_popen(*args):
    '''Context manager running qemu-nbd within the context

    The server is started with --persistent and a pid file; the context
    body runs once the pid file exists. On exit the pid file is removed
    and the server is killed.
    '''
    pid_file = file_path("qemu_nbd_popen-nbd-pid-file")

    assert not os.path.exists(pid_file)

    cmd = [*qemu_nbd_args, '--persistent', '--pid-file', pid_file, *args]

    log('Start NBD server')
    with subprocess.Popen(cmd) as server:
        try:
            # Wait for the pid file, bailing out if the server has exited
            while True:
                if os.path.exists(pid_file):
                    break
                if server.poll() is not None:
                    raise RuntimeError(
                        "qemu-nbd terminated with exit code {}: {}"
                        .format(server.returncode, ' '.join(cmd)))
                time.sleep(0.01)
            yield
        finally:
            if os.path.exists(pid_file):
                os.remove(pid_file)
            log('Kill NBD server')
            server.kill()
            server.wait()
544
def compare_images(img1: str, img2: str,
                   fmt1: str = imgfmt, fmt2: str = imgfmt) -> bool:
    """
    Run 'qemu-img compare' on two images; return True if they are identical.

    :param fmt1: format of `img1` (passed as -f).
    :param fmt2: format of `img2` (passed as -F).

    :raise CalledProcessError:
        when qemu-img crashes or returns a status code of anything other
        than 0 (identical) or 1 (different).
    """
    try:
        qemu_img('compare', '-f', fmt1, '-F', fmt2, img1, img2)
    except subprocess.CalledProcessError as exc:
        if exc.returncode != 1:
            raise
        return False
    return True
561
def create_image(name, size):
    '''Create a fully-allocated raw image with sector markers

    The file is written in 512-byte sectors until at least size bytes
    have been written. Each sector holds its own index as a big-endian
    32-bit integer in its first and last four bytes, with zero padding
    in between.
    '''
    sector_size = 512
    with open(name, 'wb') as image:
        offset = 0
        while offset < size:
            index = offset // sector_size
            image.write(struct.pack('>l504xl', index, index))
            offset += sector_size
570
def image_size(img: str) -> int:
    """
    Return image's virtual size, as reported by 'qemu-img info'.

    :raise TypeError: if the reported 'virtual-size' is not an int.
    """
    info = qemu_img_info('-f', imgfmt, img)
    virtual_size = info['virtual-size']
    if isinstance(virtual_size, int):
        return virtual_size

    type_name = type(virtual_size).__name__
    raise TypeError("Expected 'int' for 'virtual-size', "
                    f"got '{virtual_size}' of type '{type_name}'")
579
def is_str(val):
    """Return True if `val` is an instance of str, False otherwise."""
    result = isinstance(val, str)
    return result
582
# Escape the path so that regex metacharacters in it (e.g. '.' or '+')
# are matched literally instead of being interpreted as a pattern.
test_dir_re = re.compile(re.escape(test_dir))
def filter_test_dir(msg):
    """Replace every occurrence of the test_dir path in `msg` with
    'TEST_DIR' and return the result."""
    return test_dir_re.sub("TEST_DIR", msg)
586
# Matches a single carriage return
win32_re = re.compile(r"\r")
def filter_win32(msg):
    """Return `msg` with all carriage-return characters removed."""
    without_cr = win32_re.sub("", msg)
    return without_cr
590
# Matches the statistics line that qemu-io prints after an operation
qemu_io_re = re.compile(r"[0-9]* ops; [0-9\/:. sec]* "
                        r"\([0-9\/.inf]* [EPTGMKiBbytes]*\/sec "
                        r"and [0-9\/.inf]* ops\/sec\)")
def filter_qemu_io(msg):
    """Strip carriage returns from `msg` and replace qemu-io statistics
    with fixed placeholder text."""
    placeholder = ("X ops; XX:XX:XX.X "
                   "(XXX YYY/sec and XXX ops/sec)")
    cleaned = filter_win32(msg)
    return qemu_io_re.sub(placeholder, cleaned)
598
# Matches "chown" followed by a numeric UID:GID pair
chown_re = re.compile(r"chown [0-9]+:[0-9]+")
def filter_chown(msg):
    """Replace numeric 'chown UID:GID' pairs in `msg` with a placeholder."""
    placeholder = "chown UID:GID"
    return chown_re.sub(placeholder, msg)
602
def filter_qmp_event(event):
    '''Filter a QMP event dict

    Returns a copy of event in which the 'seconds' and 'microseconds'
    fields of the 'timestamp' entry (if there is one) are replaced by
    the placeholders 'SECS' and 'USECS'. The dict passed in is not
    modified.
    '''
    event = dict(event)
    if 'timestamp' in event:
        # dict(event) is only a shallow copy: copy the nested timestamp
        # dict as well, so the caller's event keeps its real timestamp.
        timestamp = dict(event['timestamp'])
        timestamp['seconds'] = 'SECS'
        timestamp['microseconds'] = 'USECS'
        event['timestamp'] = timestamp
    return event
610
def filter_qmp(qmsg, filter_fn):
    '''Apply filter_fn to every non-container value of a QMP object.

    filter_fn takes a (key, value) pair and returns the new value; for
    list elements the key is the index, and for a bare value passed in
    directly it is None. Lists and dicts are modified in place and
    returned.'''
    # Iterate through either lists or dicts;
    if isinstance(qmsg, list):
        entries = enumerate(qmsg)
    elif isinstance(qmsg, dict):
        entries = qmsg.items()
    else:
        return filter_fn(None, qmsg)

    for key, value in entries:
        is_container = isinstance(value, (dict, list))
        if is_container:
            replacement = filter_qmp(value, filter_fn)
        else:
            replacement = filter_fn(key, value)
        qmsg[key] = replacement
    return qmsg
628
def filter_testfiles(msg):
    """Replace the '<test_dir>/<pid>-' and '<sock_dir>/<pid>-' path
    prefixes of this process in `msg` with fixed placeholders."""
    pid = os.getpid()
    test_prefix = os.path.join(test_dir, "%s-" % pid)
    sock_prefix = os.path.join(sock_dir, "%s-" % pid)
    filtered = msg.replace(test_prefix, 'TEST_DIR/PID-')
    return filtered.replace(sock_prefix, 'SOCK_DIR/PID-')
633
def filter_qmp_testfiles(qmsg):
    """Apply filter_testfiles() to every string value in the QMP object
    `qmsg` (via filter_qmp())."""
    def _scrub(_key, value):
        return filter_testfiles(value) if is_str(value) else value
    return filter_qmp(qmsg, _scrub)
640
def filter_virtio_scsi(output: str) -> str:
    """Strip the '-ccw' / '-pci' suffix from 'virtio-scsi' in `output`."""
    suffixed = r'(virtio-scsi)-(ccw|pci)'
    return re.sub(suffixed, r'\1', output)
643
def filter_qmp_virtio_scsi(qmsg):
    """Apply filter_virtio_scsi() to every string value in the QMP object
    `qmsg` (via filter_qmp())."""
    def _scrub(_key, value):
        return filter_virtio_scsi(value) if is_str(value) else value
    return filter_qmp(qmsg, _scrub)
650
def filter_generated_node_ids(msg):
    """Replace '#block<digits>' node names in `msg` with 'NODE_NAME'."""
    generated_id = "#block[0-9]+"
    return re.sub(generated_id, "NODE_NAME", msg)
653
def filter_qmp_generated_node_ids(qmsg):
    """Apply filter_generated_node_ids() to every string value in the QMP
    object `qmsg` (via filter_qmp())."""
    def _scrub(_key, value):
        return filter_generated_node_ids(value) if is_str(value) else value
    return filter_qmp(qmsg, _scrub)
660
def filter_img_info(output: str, filename: str,
                    drop_child_info: bool = True) -> str:
    """
    Filter 'qemu-img info' output for use in reference output.

    Lines mentioning 'disk size' or 'actual-size' are dropped. If
    `drop_child_info` is set, each "Child node '/..." line is dropped
    together with the indented lines following it. In the remaining
    lines, `filename`, the per-process test file prefixes, imgfmt and a
    few variable fields (iters, uuid, cid, compression type) are
    replaced with placeholders.
    """
    # (pattern, replacement) pairs applied in order to every kept line
    substitutions = (
        ('iters: [0-9]+', 'iters: XXX'),
        ('uuid: [-a-f0-9]+', 'uuid: XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX'),
        ('cid: [0-9]+', 'cid: XXXXXXXXXX'),
        ('(compression type: )(zlib|zstd)', r'\1COMPRESSION_TYPE'),
    )

    kept = []
    in_child_section = False
    for line in output.split('\n'):
        if 'disk size' in line or 'actual-size' in line:
            continue

        # Drop child node info
        if in_child_section:
            if line.startswith(' '):
                continue
            in_child_section = False
        if drop_child_info and "Child node '/" in line:
            in_child_section = True
            continue

        line = line.replace(filename, 'TEST_IMG')
        line = filter_testfiles(line)
        line = line.replace(imgfmt, 'IMGFMT')
        for pattern, replacement in substitutions:
            line = re.sub(pattern, replacement, line)
        kept.append(line)
    return '\n'.join(kept)
690
def filter_imgfmt(msg):
    """Replace every occurrence of imgfmt in `msg` with 'IMGFMT'."""
    placeholder = 'IMGFMT'
    return msg.replace(imgfmt, placeholder)
693
def filter_qmp_imgfmt(qmsg):
    """Apply filter_imgfmt() to every string value in the QMP object
    `qmsg` (via filter_qmp())."""
    def _scrub(_key, value):
        return filter_imgfmt(value) if is_str(value) else value
    return filter_qmp(qmsg, _scrub)
700
def filter_nbd_exports(output: str) -> str:
    """Replace the numbers after 'min block:', 'opt block:' and
    'max block:' in `output` with 'XXX'."""
    block_size = r'((min|opt|max) block): [0-9]+'
    return re.sub(block_size, r'\1: XXX', output)
703
def filter_qtest(output: str) -> str:
    """Remove a leading '[I <time>] OPENED' line and a trailing
    '[I +<time>] CLOSED' line from `output`."""
    opened_line = r'^\[I \d+\.\d+\] OPENED\n'
    closed_line = r'\n?\[I \+\d+\.\d+\] CLOSED\n?$'
    without_opened = re.sub(opened_line, '', output)
    return re.sub(closed_line, '', without_opened)
708
Msg = TypeVar('Msg', Dict[str, Any], List[Any], str)

def log(msg: Msg,
        filters: Iterable[Callable[[Msg], Msg]] = (),
        indent: Optional[int] = None) -> None:
    """
    Write a message to the test (diff output) logger.

    `msg` is first passed through each of `filters` in order. Dicts and
    lists are then serialized as JSON (pretty-printed if `indent` is
    given); anything else is logged as is.
    """
    for flt in filters:
        msg = flt(msg)

    if not isinstance(msg, (dict, list)):
        test_logger.info(msg)
        return

    # Don't sort if it's already sorted
    already_ordered = isinstance(msg, OrderedDict)
    serialized = json.dumps(msg, sort_keys=not already_ordered, indent=indent)
    test_logger.info(serialized)
726
class Timeout:
    """
    Context manager that raises TimeoutError after `seconds`.

    On entry a SIGALRM handler is installed and a real-time interval
    timer is armed; on exit the timer is disarmed. Nothing is done when
    running under gdb or valgrind (qemu_gdb / qemu_valgrind set).
    """
    def __init__(self, seconds, errmsg="Timeout"):
        self.seconds = seconds
        self.errmsg = errmsg

    def __enter__(self):
        if not (qemu_gdb or qemu_valgrind):
            signal.signal(signal.SIGALRM, self.timeout)
            signal.setitimer(signal.ITIMER_REAL, self.seconds)
        return self

    def __exit__(self, exc_type, value, traceback):
        if not (qemu_gdb or qemu_valgrind):
            signal.setitimer(signal.ITIMER_REAL, 0)
        # Never suppress exceptions raised in the body
        return False

    def timeout(self, signum, frame):
        """Signal handler: raise TimeoutError with the configured message."""
        raise TimeoutError(self.errmsg)
744
def file_pattern(name):
    """Return `name` prefixed with the PID of this process and a dash."""
    pid = os.getpid()
    return "{0}-{1}".format(pid, name)
747
class FilePath:
    """
    Context manager that hands out per-process file names and removes
    the corresponding files again when the context is left.

    Example usage:

        with FilePath('a.img', 'b.img') as (img_a, img_b):
            # Use img_a and img_b here...

        # a.img and b.img are automatically removed here.

    Names are placed in iotests.test_dir unless base_dir says otherwise;
    for sockets, use iotests.sock_dir:

       with FilePath('a.sock', base_dir=iotests.sock_dir) as sock:

    As a convenience, a single name yields a plain path rather than a
    one-element list.

    """
    def __init__(self, *names, base_dir=test_dir):
        self.paths = []
        for name in names:
            self.paths.append(os.path.join(base_dir, file_pattern(name)))

    def __enter__(self):
        return self.paths[0] if len(self.paths) == 1 else self.paths

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Best effort: files that were never created are skipped
        for path in self.paths:
            try:
                os.remove(path)
            except OSError:
                continue
        return False
786
787
def try_remove(img):
    """Remove the file `img`, ignoring any OSError (for example if the
    file does not exist)."""
    try:
        os.unlink(img)
    except OSError:
        return
793
def file_path_remover():
    """Remove all paths recorded in file_path_remover.paths, most
    recently added first."""
    recorded = file_path_remover.paths
    for path in recorded[::-1]:
        try_remove(path)
797
798
def file_path(*names, base_dir=test_dir):
    ''' Generate per-process file names that are cleaned up at exit.

    Use is as simple as:

    img_a, img_b = file_path('a.img', 'b.img')
    sock = file_path('socket')

    A single name returns one path; several names return a list. On the
    first call, file_path_remover is registered with atexit to remove
    every path handed out.
    '''

    if not hasattr(file_path_remover, 'paths'):
        file_path_remover.paths = []
        atexit.register(file_path_remover)

    paths = [os.path.join(base_dir, file_pattern(name)) for name in names]
    file_path_remover.paths.extend(paths)

    if len(paths) == 1:
        return paths[0]
    return paths
820
def remote_filename(path):
    """
    Return `path` as it should be addressed for the current imgproto.

    For 'file' the path is returned unchanged; for 'ssh' an ssh:// URL
    for $USER at 127.0.0.1 port 22 is built.

    :raise ValueError: for any other protocol.
    """
    if imgproto == 'file':
        return path
    if imgproto == 'ssh':
        user = os.environ.get('USER')
        return "ssh://%s@127.0.0.1:22%s" % (user, path)
    raise ValueError("Protocol %s not supported" % (imgproto))
828
class VM(qtest.QEMUQtestMachine):
    '''A QEMU VM'''

    def __init__(self, path_suffix=''):
        """
        Prepare (but do not launch) a QEMU machine for the current test.

        :param path_suffix: String inserted into the machine name between
                            "qemu" and the PID of the test process.
        """
        name = "qemu%s-%d" % (path_suffix, os.getpid())
        # No QMP timer is passed on when a gdb or valgrind wrapper is set
        timer = 15.0 if not (qemu_gdb or qemu_valgrind) else None
        if qemu_gdb and qemu_valgrind:
            sys.stderr.write('gdb and valgrind are mutually exclusive\n')
            sys.exit(1)
        wrapper = qemu_gdb if qemu_gdb else qemu_valgrind
        super().__init__(qemu_prog, qemu_opts, wrapper=wrapper,
                         name=name,
                         base_temp_dir=test_dir,
                         qmp_timer=timer)
        # Counter used to generate the "drive<N>" IDs in add_drive()
        self._num_drives = 0

    def _post_shutdown(self) -> None:
        """
        Run the base-class shutdown handling, then deal with the valgrind
        log file of the process (only when running under valgrind).

        The log is printed when QEMU exited with code 99 and deleted
        otherwise.
        """
        super()._post_shutdown()
        if not qemu_valgrind or not self._popen:
            return
        valgrind_filename = f"{test_dir}/{self._popen.pid}.valgrind"
        # NOTE(review): 99 is presumably the exit code valgrind is told to
        # use when it finds errors -- confirm against the wrapper setup.
        if self.exitcode() == 99:
            with open(valgrind_filename, encoding='utf-8') as f:
                print(f.read())
        else:
            os.remove(valgrind_filename)

    def _pre_launch(self) -> None:
        """Run the base-class pre-launch step; honour qemu_print."""
        super()._pre_launch()
        if qemu_print:
            # set QEMU binary output to stdout
            self._close_qemu_log_file()

    def add_object(self, opts):
        """Append '-object <opts>' to the command line; returns self."""
        self._args.append('-object')
        self._args.append(opts)
        return self

    def add_device(self, opts):
        """Append '-device <opts>' to the command line; returns self."""
        self._args.append('-device')
        self._args.append(opts)
        return self

    def add_drive_raw(self, opts):
        """Append '-drive <opts>' verbatim to the command line; returns self."""
        self._args.append('-drive')
        self._args.append(opts)
        return self

    def add_drive(self, path, opts='', interface='virtio', img_format=imgfmt):
        """
        Add a drive to the VM (a virtio one unless @interface says otherwise).

        :param path: Image file name.  If None, no file/format/cache/aio
                     options are generated.
        :param opts: Extra, already comma-joined -drive options.
        :param interface: Value for the 'if=' option.
        :param img_format: Value for the 'format=' option.  For 'luks', the
                           default secret object and key-secret option are
                           added unless @opts already mentions 'key-secret'.
        :return: self, so that calls can be chained.
        """
        options = ['if=%s' % interface,
                   'id=drive%d' % self._num_drives]

        if path is not None:
            options.append('file=%s' % path)
            options.append('format=%s' % img_format)
            options.append('cache=%s' % cachemode)
            options.append('aio=%s' % aiomode)

        if opts:
            options.append(opts)

        if img_format == 'luks' and 'key-secret' not in opts:
            # default luks support
            if luks_default_secret_object not in self._args:
                self.add_object(luks_default_secret_object)

            options.append(luks_default_key_secret_opt)

        self._args.append('-drive')
        self._args.append(','.join(options))
        self._num_drives += 1
        return self

    def add_blockdev(self, opts):
        """
        Append '-blockdev <opts>' to the command line; returns self.

        @opts is either a ready-made option string or an iterable of
        strings that is joined with commas.
        """
        self._args.append('-blockdev')
        if isinstance(opts, str):
            self._args.append(opts)
        else:
            self._args.append(','.join(opts))
        return self

    def add_incoming(self, addr):
        """Append '-incoming <addr>' to the command line; returns self."""
        self._args.append('-incoming')
        self._args.append(addr)
        return self

    def add_paused(self):
        """Append '-S' to the command line; returns self."""
        self._args.append('-S')
        return self

    def hmp(self, command_line: str, use_log: bool = False) -> QMPMessage:
        """
        Run an HMP command through QMP's human-monitor-command.

        :param command_line: The HMP command line to execute.
        :param use_log: If True, go through qmp_log() so that command and
                        response are logged; otherwise use plain qmp().
        """
        cmd = 'human-monitor-command'
        kwargs: Dict[str, Any] = {'command-line': command_line}
        if use_log:
            return self.qmp_log(cmd, **kwargs)
        else:
            return self.qmp(cmd, **kwargs)

    def pause_drive(self, drive: str, event: Optional[str] = None) -> None:
        """
        Pause drive r/w operations

        Sets a qemu-io breakpoint named bp_<drive> on @event.  Without
        @event, both "read_aio" and "write_aio" are used.
        """
        if not event:
            self.pause_drive(drive, "read_aio")
            self.pause_drive(drive, "write_aio")
            return
        self.hmp(f'qemu-io {drive} "break {event} bp_{drive}"')

    def resume_drive(self, drive: str) -> None:
        """Resume drive r/w operations (removes breakpoint bp_<drive>)"""
        self.hmp(f'qemu-io {drive} "remove_break bp_{drive}"')

    def hmp_qemu_io(self, drive: str, cmd: str,
                    use_log: bool = False, qdev: bool = False) -> QMPMessage:
        """
        Run a qemu-io command on a given drive using an HMP command

        :param qdev: If True, '-d ' is put in front of the drive name.
                     NOTE(review): presumably makes HMP treat @drive as a
                     qdev ID -- confirm against the HMP documentation.
        """
        d = '-d ' if qdev else ''
        return self.hmp(f'qemu-io {d}{drive} "{cmd}"', use_log=use_log)

    def flatten_qmp_object(self, obj, output=None, basestr=''):
        """
        Flatten nested lists/dicts into one dict with dotted keys.

        List indices and dict keys become path components joined by '.',
        e.g. {'a': [{'b': 1}]} yields {'a.0.b': 1}.

        :param output: Dict to fill; a new one is created when None.
        :param basestr: Key prefix accumulated so far (ends in '.').
        :return: The filled @output dict.
        """
        if output is None:
            output = {}
        if isinstance(obj, list):
            for i, item in enumerate(obj):
                self.flatten_qmp_object(item, output, basestr + str(i) + '.')
        elif isinstance(obj, dict):
            for key in obj:
                self.flatten_qmp_object(obj[key], output, basestr + key + '.')
        else:
            output[basestr[:-1]] = obj # Strip trailing '.'
        return output

    def qmp_to_opts(self, obj):
        """
        Turn a QMP object into a 'key=value,key=value' option string.

        Leaf values are concatenated as-is, so they must be strings.
        """
        obj = self.flatten_qmp_object(obj)
        return ','.join([key + '=' + obj[key] for key in obj])

    def get_qmp_events_filtered(self, wait=60.0):
        """Return the pending QMP events, each run through filter_qmp_event"""
        return [filter_qmp_event(ev) for ev in self.get_qmp_events(wait=wait)]

    def qmp_log(self, cmd, filters=(), indent=None, **kwargs):
        """
        Execute a QMP command, logging both the command and its response.

        :param filters: Filters applied by log() to command and response.
        :param indent: Passed through to log().
        :return: The QMP response.
        """
        full_cmd = OrderedDict((
            ("execute", cmd),
            ("arguments", ordered_qmp(kwargs))
        ))
        log(full_cmd, filters, indent=indent)
        result = self.qmp(cmd, **kwargs)
        log(result, filters, indent=indent)
        return result

    # Returns None on success, and an error string on failure
    def run_job(self, job: str, auto_finalize: bool = True,
                auto_dismiss: bool = False,
                pre_finalize: Optional[Callable[[], None]] = None,
                cancel: bool = False, wait: float = 60.0,
                filters: Iterable[Callable[[Any], Any]] = (),
                ) -> Optional[str]:
        """
        run_job moves a job from creation through to dismissal.

        :param job: String. ID of recently-launched job
        :param auto_finalize: Bool. True if the job was launched with
                              auto_finalize. Defaults to True.
        :param auto_dismiss: Bool. True if the job was launched with
                             auto_dismiss=True. Defaults to False.
        :param pre_finalize: Callback. A callable that takes no arguments to be
                             invoked prior to issuing job-finalize, if any.
        :param cancel: Bool. When true, cancels the job after the pre_finalize
                       callback.
        :param wait: Float. Timeout value specifying how long to wait for any
                     event, in seconds. Defaults to 60.0.
        :param filters: Iterable of filter callables applied to everything
                        this method logs.
        :return: None on success; otherwise the error that query-jobs
                 reported for the job when it entered the 'aborting' state.
        """
        match_device = {'data': {'device': job}}
        match_id = {'data': {'id': job}}
        events = [
            ('BLOCK_JOB_COMPLETED', match_device),
            ('BLOCK_JOB_CANCELLED', match_device),
            ('BLOCK_JOB_ERROR', match_device),
            ('BLOCK_JOB_READY', match_device),
            ('BLOCK_JOB_PENDING', match_id),
            ('JOB_STATUS_CHANGE', match_id)
        ]
        error = None
        while True:
            ev = filter_qmp_event(self.events_wait(events, timeout=wait))
            # Everything but JOB_STATUS_CHANGE is only logged
            if ev['event'] != 'JOB_STATUS_CHANGE':
                log(ev, filters=filters)
                continue
            status = ev['data']['status']
            if status == 'aborting':
                result = self.qmp('query-jobs')
                for j in result['return']:
                    if j['id'] == job:
                        error = j['error']
                        log('Job failed: %s' % (j['error']), filters=filters)
            elif status == 'ready':
                self.qmp_log('job-complete', id=job, filters=filters)
            elif status == 'pending' and not auto_finalize:
                if pre_finalize:
                    pre_finalize()
                if cancel:
                    self.qmp_log('job-cancel', id=job, filters=filters)
                else:
                    self.qmp_log('job-finalize', id=job, filters=filters)
            elif status == 'concluded' and not auto_dismiss:
                self.qmp_log('job-dismiss', id=job, filters=filters)
            elif status == 'null':
                return error

    # Returns None on success, and an error string on failure
    def blockdev_create(self, options, job_id='job0', filters=None):
        """
        Issue blockdev-create and drive the resulting job via run_job().

        :param options: The 'options' argument for blockdev-create.
        :param job_id: ID to give the job.
        :param filters: Log filters; defaults to [filter_qmp_testfiles].
        :return: The run_job() result if the command was accepted,
                 otherwise the 'error' member of the QMP response.
        """
        if filters is None:
            filters = [filter_qmp_testfiles]
        result = self.qmp_log('blockdev-create', filters=filters,
                              job_id=job_id, options=options)

        if 'return' in result:
            assert result['return'] == {}
            job_result = self.run_job(job_id, filters=filters)
        else:
            job_result = result['error']

        log("")
        return job_result

    def enable_migration_events(self, name):
        """Enable the 'events' migration capability and log the response.

        :param name: Name of this VM, used only in the log message.
        """
        log('Enabling migration QMP events on %s...' % name)
        log(self.qmp('migrate-set-capabilities', capabilities=[
            {
                'capability': 'events',
                'state': True
            }
        ]))

    def wait_migration(self, expect_runstate: Optional[str]) -> bool:
        """
        Log MIGRATION events until one reports 'completed' or 'failed'.

        :param expect_runstate: After a completed migration, query-status
                                is polled until it reports this runstate.
                                With None, no polling takes place.
        :return: True if the migration completed, False if it failed.
        """
        while True:
            event = self.event_wait('MIGRATION')
            # We use the default timeout, and with a timeout, event_wait()
            # never returns None
            assert event

            log(event, filters=[filter_qmp_event])
            if event['data']['status'] in ('completed', 'failed'):
                break

        if event['data']['status'] == 'completed':
            # The event may occur in finish-migrate, so wait for the expected
            # post-migration runstate
            runstate = None
            while runstate != expect_runstate:
                runstate = self.qmp('query-status')['return']['status']
            return True
        else:
            return False

    def node_info(self, node_name):
        """Return the query-named-block-nodes entry for @node_name, or None"""
        nodes = self.qmp('query-named-block-nodes')
        for x in nodes['return']:
            if x['node-name'] == node_name:
                return x
        return None

    def query_bitmaps(self):
        """
        Return a dict mapping node names to their 'dirty-bitmaps' lists.

        Nodes whose query-named-block-nodes entry has no 'dirty-bitmaps'
        member are left out.
        """
        res = self.qmp("query-named-block-nodes")
        return {device['node-name']: device['dirty-bitmaps']
                for device in res['return'] if 'dirty-bitmaps' in device}

    def get_bitmap(self, node_name, bitmap_name, recording=None, bitmaps=None):
        """
        get a specific bitmap from the object returned by query_bitmaps.
        :param recording: If specified, filter results by the specified value.
        :param bitmaps: If specified, use it instead of call query_bitmaps()
        :return: The matching bitmap dict, or None if the node has no such
                 bitmap.  A @node_name that is not a key of the bitmaps
                 object raises KeyError.
        """
        if bitmaps is None:
            bitmaps = self.query_bitmaps()

        for bitmap in bitmaps[node_name]:
            if bitmap.get('name', '') == bitmap_name:
                if recording is None or bitmap.get('recording') == recording:
                    return bitmap
        return None

    def check_bitmap_status(self, node_name, bitmap_name, fields):
        """
        Check whether a bitmap has all of the given field values.

        :param fields: Dict of expected key/value pairs.
        :return: True if every item of @fields is present in the bitmap;
                 False if an item differs or the bitmap does not exist.
        """
        ret = self.get_bitmap(node_name, bitmap_name)
        if ret is None:
            # get_bitmap() found no bitmap of that name on the node
            return False

        return fields.items() <= ret.items()

    def assert_block_path(self, root, path, expected_node, graph=None):
        """
        Check whether the node under the given path in the block graph
        is @expected_node.

        @root is the node name of the node where the @path is rooted.

        @path is a string that consists of child names separated by
        slashes.  It must begin with a slash.

        Examples for @root + @path:
          - root="qcow2-node", path="/backing/file"
          - root="quorum-node", path="/children.2/file"

        Hypothetically, @path could be empty, in which case it would
        point to @root.  However, in practice this case is not useful
        and hence not allowed.

        @expected_node may be None.  (All elements of the path but the
        leaf must still exist.)

        @graph may be None or the result of an x-debug-query-block-graph
        call that has already been performed.
        """
        if graph is None:
            graph = self.qmp('x-debug-query-block-graph')['return']

        iter_path = iter(path.split('/'))

        # Must start with a /
        assert next(iter_path) == ''

        node = next((node for node in graph['nodes'] if node['name'] == root),
                    None)

        # An empty @path is not allowed, so the root node must be present
        assert node is not None, 'Root node %s not found' % root

        for child_name in iter_path:
            assert node is not None, 'Cannot follow path %s%s' % (root, path)

            try:
                node_id = next(edge['child'] for edge in graph['edges']
                               if (edge['parent'] == node['id'] and
                                   edge['name'] == child_name))

                node = next(node for node in graph['nodes']
                            if node['id'] == node_id)

            except StopIteration:
                # No such edge (or child node): the path ends here
                node = None

        if node is None:
            assert expected_node is None, \
                   'No node found under %s (but expected %s)' % \
                   (path, expected_node)
        else:
            assert node['name'] == expected_node, \
                   'Found node %s under %s (but expected %s)' % \
                   (node['name'], path, expected_node)
1179
# Matches a path component with a list index, e.g. "return[0]"
index_re = re.compile(r'([^\[]+)\[([^\]]+)\]')

class QMPTestCase(unittest.TestCase):
    '''Abstract base class for QMP test cases'''

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Many users of this class set a VM property we rely on heavily
        # in the methods below.
        self.vm = None

    def dictpath(self, d, path):
        '''Traverse a path in a nested dict

        @path consists of dict keys separated by slashes; a key may carry
        a list index in brackets ("key[2]").  Fails the test if the path
        cannot be followed.'''
        for component in path.split('/'):
            m = index_re.match(component)
            if m:
                component, idx = m.groups()
                idx = int(idx)

            if not isinstance(d, dict) or component not in d:
                self.fail(f'failed path traversal for "{path}" in "{d}"')
            d = d[component]

            if m:
                if not isinstance(d, list):
                    self.fail(f'path component "{component}" in "{path}" '
                              f'is not a list in "{d}"')
                try:
                    d = d[idx]
                except IndexError:
                    self.fail(f'invalid index "{idx}" in path "{path}" '
                              f'in "{d}"')
        return d

    def assert_qmp_absent(self, d, path):
        '''Assert that @path cannot be traversed in the QMP dict @d'''
        try:
            result = self.dictpath(d, path)
        except AssertionError:
            # dictpath() failed the traversal, i.e. the path is absent
            return
        self.fail('path "%s" has value "%s"' % (path, str(result)))

    def assert_qmp(self, d, path, value):
        '''Assert that the value for a specific path in a QMP dict
           matches.  When given a list of values, assert that any of
           them matches.'''

        result = self.dictpath(d, path)

        # [] makes no sense as a list of valid values, so treat it as
        # an actual single value.
        if isinstance(value, list) and value != []:
            for v in value:
                if result == v:
                    return
            self.fail('no match for "%s" in %s' % (str(result), str(value)))
        else:
            self.assertEqual(result, value,
                             '"%s" is "%s", expected "%s"'
                             % (path, str(result), str(value)))

    def assert_no_active_block_jobs(self):
        '''Assert that query-block-jobs returns an empty list'''
        result = self.vm.qmp('query-block-jobs')
        self.assert_qmp(result, 'return', [])

    def assert_has_block_node(self, node_name=None, file_name=None):
        """Issue a query-named-block-nodes and assert node_name and/or
        file_name is present in the result"""
        def check_equal_or_none(a, b):
            return a is None or b is None or a == b
        self.assertTrue(node_name or file_name,
                        'node_name or file_name must be given')
        result = self.vm.qmp('query-named-block-nodes')
        for x in result["return"]:
            if check_equal_or_none(x.get("node-name"), node_name) and \
                    check_equal_or_none(x.get("file"), file_name):
                return
        self.fail("Cannot find %s %s in result:\n%s" %
                  (node_name, file_name, result))

    def assert_json_filename_equal(self, json_filename, reference):
        '''Asserts that the given filename is a json: filename and that its
           content is equal to the given reference object'''
        self.assertEqual(json_filename[:5], 'json:')
        self.assertEqual(
            self.vm.flatten_qmp_object(json.loads(json_filename[5:])),
            self.vm.flatten_qmp_object(reference)
        )

    def cancel_and_wait(self, drive='drive0', force=False,
                        resume=False, wait=60.0):
        '''Cancel a block job and wait for it to finish, returning the event

        With @resume, the drive is resumed right after the cancel request.
        @wait is the timeout passed to get_qmp_events().'''
        self.vm.cmd('block-job-cancel', device=drive, force=force)

        if resume:
            self.vm.resume_drive(drive)

        cancelled = False
        result = None
        while not cancelled:
            for event in self.vm.get_qmp_events(wait=wait):
                if event['event'] == 'BLOCK_JOB_COMPLETED' or \
                   event['event'] == 'BLOCK_JOB_CANCELLED':
                    self.assert_qmp(event, 'data/device', drive)
                    result = event
                    cancelled = True
                elif event['event'] == 'JOB_STATUS_CHANGE':
                    self.assert_qmp(event, 'data/id', drive)

        self.assert_no_active_block_jobs()
        return result

    def wait_until_completed(self, drive='drive0', check_offset=True,
                             wait=60.0, error=None):
        '''Wait for a block job to finish, returning the event

        Without @error, the event must carry no error and (if
        @check_offset) its offset must equal its len.  With @error, the
        event's error must match it.'''
        while True:
            for event in self.vm.get_qmp_events(wait=wait):
                if event['event'] == 'BLOCK_JOB_COMPLETED':
                    self.assert_qmp(event, 'data/device', drive)
                    if error is None:
                        self.assert_qmp_absent(event, 'data/error')
                        if check_offset:
                            self.assert_qmp(event, 'data/offset',
                                            event['data']['len'])
                    else:
                        self.assert_qmp(event, 'data/error', error)
                    self.assert_no_active_block_jobs()
                    return event
                if event['event'] == 'JOB_STATUS_CHANGE':
                    self.assert_qmp(event, 'data/id', drive)

    def wait_ready(self, drive='drive0'):
        """Wait until a BLOCK_JOB_READY event, and return the event."""
        return self.vm.events_wait([
            ('BLOCK_JOB_READY',
             {'data': {'type': 'mirror', 'device': drive}}),
            ('BLOCK_JOB_READY',
             {'data': {'type': 'commit', 'device': drive}})
        ])

    def wait_ready_and_cancel(self, drive='drive0'):
        '''Wait for the job to become ready, cancel it, and assert that it
        ended as a fully completed mirror job'''
        self.wait_ready(drive=drive)
        event = self.cancel_and_wait(drive=drive)
        self.assertEqual(event['event'], 'BLOCK_JOB_COMPLETED')
        self.assert_qmp(event, 'data/type', 'mirror')
        self.assert_qmp(event, 'data/offset', event['data']['len'])

    def complete_and_wait(self, drive='drive0', wait_ready=True,
                          completion_error=None):
        '''Complete a block job and wait for it to finish'''
        if wait_ready:
            self.wait_ready(drive=drive)

        self.vm.cmd('block-job-complete', device=drive)

        event = self.wait_until_completed(drive=drive, error=completion_error)
        self.assertIn(event['data']['type'], ['mirror', 'commit'])

    def pause_wait(self, job_id='job0'):
        '''Poll query-block-jobs until @job_id is paused and not busy, and
        return its entry.  Fails if the job is not listed.'''
        with Timeout(3, "Timeout waiting for job to pause"):
            while True:
                result = self.vm.qmp('query-block-jobs')
                found = False
                for job in result['return']:
                    if job['device'] == job_id:
                        found = True
                        if job['paused'] and not job['busy']:
                            return job
                        break
                self.assertTrue(found, 'job "%s" not found' % job_id)

    def pause_job(self, job_id='job0', wait=True):
        '''Issue block-job-pause; with @wait, block until it took effect'''
        self.vm.cmd('block-job-pause', device=job_id)
        if wait:
            self.pause_wait(job_id)

    def case_skip(self, reason):
        '''Skip this test case'''
        case_notrun(reason)
        self.skipTest(reason)
1359
1360
def notrun(reason):
    '''Skip this test suite

    Records @reason in <test_dir>/<seq>.notrun, logs a warning and exits
    the process with status 0.'''
    # The test's number ("seq") is the basename of the running script
    seq = os.path.basename(sys.argv[0])
    marker_path = '%s/%s.notrun' % (test_dir, seq)

    with open(marker_path, 'w', encoding='utf-8') as outfile:
        outfile.write(reason + '\n')
    logger.warning("%s not run: %s", seq, reason)
    sys.exit(0)
1371
def case_notrun(reason):
    '''Record a single test case as not run.

    Only appends @reason to <test_dir>/<seq>.casenotrun; actually
    skipping the case is up to the caller.  QMPTestCase.case_skip() is
    the variant that skips the current test case as well.'''

    # The test's number ("seq") is the basename of the running script
    seq = os.path.basename(sys.argv[0])
    marker_path = '%s/%s.casenotrun' % (test_dir, seq)

    with open(marker_path, 'a', encoding='utf-8') as outfile:
        outfile.write('    [case not run] ' + reason + '\n')
1384
def _verify_image_format(supported_fmts: Sequence[str] = (),
                         unsupported_fmts: Sequence[str] = ()) -> None:
    """
    Skip the test suite if the current image format is not suitable.

    :param supported_fmts: Allowed formats; empty means no restriction.
                           'generic' lifts the restriction as long as
                           $IMGFMT_GENERIC is 'true' (the default).
    :param unsupported_fmts: Formats the test must not be run with.
    """
    generic_allowed = os.environ.get('IMGFMT_GENERIC', 'true') == 'true'
    if generic_allowed and 'generic' in supported_fmts:
        # similar to
        #   _supported_fmt generic
        # for bash tests
        supported_fmts = ()

    outside_allow_list = supported_fmts and imgfmt not in supported_fmts
    if outside_allow_list or imgfmt in unsupported_fmts:
        notrun('not suitable for this image format: %s' % imgfmt)

    # LUKS additionally needs a driver that can actually create images
    if imgfmt == 'luks':
        verify_working_luks()
1400
def _verify_protocol(supported: Sequence[str] = (),
                     unsupported: Sequence[str] = ()) -> None:
    """
    Skip the test suite if the current protocol is not suitable.

    At most one of @supported and @unsupported may be non-empty.
    'generic' in @supported accepts every protocol.
    """
    assert not supported or not unsupported

    if 'generic' in supported:
        return

    outside_allow_list = supported and imgproto not in supported
    if outside_allow_list or imgproto in unsupported:
        notrun('not suitable for this protocol: %s' % imgproto)
1411
def _verify_platform(supported: Sequence[str] = (),
                     unsupported: Sequence[str] = ()) -> None:
    """
    Skip the test suite if sys.platform is not suitable.

    Entries are matched as prefixes of sys.platform.  An empty
    @supported means no restriction.
    """
    # str.startswith() takes a tuple of prefixes and is False for ()
    if sys.platform.startswith(tuple(unsupported)):
        notrun('not suitable for this OS: %s' % sys.platform)

    if supported and not sys.platform.startswith(tuple(supported)):
        notrun('not suitable for this OS: %s' % sys.platform)
1420
def _verify_cache_mode(supported_cache_modes: Sequence[str] = ()) -> None:
    """Skip the test suite unless the cache mode is in the given list
    (an empty list means no restriction)"""
    if not supported_cache_modes:
        return
    if cachemode not in supported_cache_modes:
        notrun('not suitable for this cache mode: %s' % cachemode)
1424
def _verify_aio_mode(supported_aio_modes: Sequence[str] = ()) -> None:
    """Skip the test suite unless the aio mode is in the given list
    (an empty list means no restriction)"""
    if not supported_aio_modes:
        return
    if aiomode not in supported_aio_modes:
        notrun('not suitable for this aio mode: %s' % aiomode)
1428
def _verify_formats(required_formats: Sequence[str] = ()) -> None:
    """Skip the test suite if any required format is missing from
    supported_formats()"""
    missing = set(required_formats).difference(set(supported_formats()))
    usf_list = list(missing)
    if not usf_list:
        return
    notrun(f'formats {usf_list} are not whitelisted')
1433
1434
def _verify_virtio_blk() -> None:
    """Skip the test suite if '-device help' does not list virtio-blk"""
    device_help = qemu_pipe('-M', 'none', '-device', 'help')
    if 'virtio-blk' in device_help:
        return
    notrun('Missing virtio-blk in QEMU binary')
1439
def verify_virtio_scsi_pci_or_ccw() -> None:
    """Skip the test suite if '-device help' lists neither
    virtio-scsi-pci nor virtio-scsi-ccw"""
    device_help = qemu_pipe('-M', 'none', '-device', 'help')
    candidates = ('virtio-scsi-pci', 'virtio-scsi-ccw')
    if not any(dev in device_help for dev in candidates):
        notrun('Missing virtio-scsi-pci or virtio-scsi-ccw in QEMU binary')
1444
1445
def _verify_imgopts(unsupported: Sequence[str] = ()) -> None:
    """Skip the test suite if $IMGOPTS contains an unsupported substring"""
    imgopts = os.environ.get('IMGOPTS')
    if not imgopts:
        return

    # One of usage examples for IMGOPTS is "data_file=$TEST_IMG.ext_data_file"
    # but it supported only for bash tests. We don't have a concept of global
    # TEST_IMG in iotests.py, not saying about somehow parsing $variables.
    # So, for simplicity let's just not support any IMGOPTS with '$' inside.
    rejected = [*unsupported, '$']
    if any(token in imgopts for token in rejected):
        notrun(f'not suitable for this imgopts: {imgopts}')
1455
1456
def supports_quorum() -> bool:
    """Return whether 'quorum' appears in the output of qemu-img --help"""
    help_text = qemu_img('--help').stdout
    return 'quorum' in help_text
1459
def verify_quorum():
    '''Skip test suite if quorum support is not available'''
    if supports_quorum():
        return
    notrun('quorum support missing')
1464
def has_working_luks() -> Tuple[bool, str]:
    """
    Check whether our LUKS driver can actually create images
    (this extends to LUKS encryption for qcow2).

    :return: (True, '') if a test image could be created, otherwise
             (False, reason).
    """

    img_file = f'{test_dir}/luks-test.luks'
    res = qemu_img('create', '-f', 'luks',
                   '--object', luks_default_secret_object,
                   '-o', luks_default_key_secret_opt,
                   '-o', 'iter-time=10',
                   img_file, '1G',
                   check=False)
    # Best-effort cleanup; the image may not have been created at all
    try:
        os.remove(img_file)
    except OSError:
        pass

    if not res.returncode:
        return (True, '')

    # Prefer the text after "<img_file>:" on the first line that has it;
    # fall back to the complete output
    prefix = img_file + ':'
    reason = next((line.partition(prefix)[2].strip()
                   for line in res.stdout.splitlines()
                   if prefix in line),
                  res.stdout)
    return (False, reason)
1495
def verify_working_luks():
    """
    Skip test suite if LUKS does not work
    """
    working, reason = has_working_luks()
    if working:
        return
    notrun(reason)
1503
def supports_qcow2_zstd_compression() -> bool:
    """
    Probe zstd support by creating a qcow2 image with
    compression_type=zstd.

    :return: False only if qemu-img exits with 1 and reports that
             'compression-type' does not accept 'zstd'; True otherwise.
    """
    img_file = f'{test_dir}/qcow2-zstd-test.qcow2'
    res = qemu_img('create', '-f', 'qcow2', '-o', 'compression_type=zstd',
                   img_file, '0',
                   check=False)
    # Best-effort cleanup; the image may not have been created at all
    try:
        os.remove(img_file)
    except OSError:
        pass

    rejected_msg = "'compression-type' does not accept value 'zstd'"
    return not (res.returncode == 1 and rejected_msg in res.stdout)
1519
def verify_qcow2_zstd_compression():
    """Skip test suite if qcow2 zstd compression is not supported"""
    if supports_qcow2_zstd_compression():
        return
    notrun('zstd compression not supported')
1523
def qemu_pipe(*args: str) -> str:
    """
    Run qemu with an option to print something and exit (e.g. a help option).

    The given arguments are appended to the QEMU binary and its default
    options.

    :return: QEMU's stdout output.
    """
    command = [qemu_prog] + qemu_opts
    command.extend(args)
    output, _status = qemu_tool_pipe_and_status('qemu', command)
    return output
1533
def supported_formats(read_only=False):
    '''Return the list of whitelisted formats.

       Set 'read_only' to True to check ro-whitelist
       Otherwise, rw-whitelist is checked

       Results are cached per 'read_only' value in the function attribute
       supported_formats.formats.'''

    try:
        cache = supported_formats.formats
    except AttributeError:
        cache = supported_formats.formats = {}

    if read_only not in cache:
        help_lines = qemu_pipe("-drive", "format=help").splitlines()
        # First line is used for the rw list, second line for the ro list
        whitelist_line = help_lines[1 if read_only else 0]
        cache[read_only] = whitelist_line.split(":")[1].split()

    return cache[read_only]
1548
def skip_if_unsupported(required_formats=(), read_only=False):
    '''Skip Test Decorator
       Runs the test if all the required formats are whitelisted

       required_formats is either a collection of format names or a
       callable that takes the test case and returns such a collection.
       read_only selects the ro-whitelist instead of the rw-whitelist.'''
    import functools  # pylint: disable=import-outside-toplevel

    def skip_test_decorator(func):
        # Keep the name and docstring of the decorated test method
        @functools.wraps(func)
        def func_wrapper(test_case: QMPTestCase, *args: List[Any],
                         **kwargs: Dict[str, Any]) -> None:
            if callable(required_formats):
                fmts = required_formats(test_case)
            else:
                fmts = required_formats

            usf_list = list(set(fmts) - set(supported_formats(read_only)))
            if usf_list:
                msg = f'{test_case}: formats {usf_list} are not whitelisted'
                test_case.case_skip(msg)
                return None
            return func(test_case, *args, **kwargs)
        return func_wrapper
    return skip_test_decorator
1568
def skip_for_formats(formats: Sequence[str] = ()) \
    -> Callable[[Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]],
                Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]]:
    '''Skip Test Decorator
       Skips the test for the given formats

       The test case is skipped via case_skip() when the current image
       format is one of @formats; otherwise the test runs normally.'''
    import functools  # pylint: disable=import-outside-toplevel

    def skip_test_decorator(func):
        # Keep the name and docstring of the decorated test method
        @functools.wraps(func)
        def func_wrapper(test_case: QMPTestCase, *args: List[Any],
                         **kwargs: Dict[str, Any]) -> None:
            if imgfmt in formats:
                msg = f'{test_case}: Skipped for format {imgfmt}'
                test_case.case_skip(msg)
                return None
            return func(test_case, *args, **kwargs)
        return func_wrapper
    return skip_test_decorator
1584
def skip_if_user_is_root(func):
    '''Skip Test Decorator
       Runs the test only without root permissions

       As root, the case is only recorded through case_notrun() and the
       wrapper returns None without calling the test.'''
    import functools  # pylint: disable=import-outside-toplevel

    # Keep the name and docstring of the decorated test method
    @functools.wraps(func)
    def func_wrapper(*args, **kwargs):
        if os.getuid() == 0:
            case_notrun('{}: cannot be run as root'.format(args[0]))
            return None
        return func(*args, **kwargs)
    return func_wrapper
1595
1596# We need to filter out the time taken from the output so that
1597# qemu-iotest can reliably diff the results against master output,
1598# and hide skipped tests from the reference output.
1599
class ReproducibleTestResult(unittest.TextTestResult):
    """Text test result that does not give away skipped tests in dot mode"""

    def addSkip(self, test, reason):
        # Same as TextTestResult, but print dot instead of "s".
        # TestResult.addSkip is called directly so that TextTestResult's
        # own output is bypassed.
        unittest.TestResult.addSkip(self, test, reason)
        if self.showAll:
            self.stream.writeln(f"skipped {reason!r}")
            return
        if self.dots:
            self.stream.write(".")
            self.stream.flush()
1609
class ReproducibleStreamWrapper:
    """Stream proxy that strips run times and skip counts from what is
    written; every other attribute is forwarded to the wrapped stream"""

    def __init__(self, stream: TextIO):
        self.stream = stream

    def __getattr__(self, attr):
        # Only reached when normal lookup fails.  'stream' itself and
        # '__getstate__' are never forwarded: looking up self.stream
        # while it is missing would re-enter this method.
        if attr == 'stream' or attr == '__getstate__':
            raise AttributeError(attr)
        return getattr(self.stream, attr)

    def write(self, arg=None):
        # "Ran N test(s) in 0.123s" -> "Ran N tests"
        untimed = re.sub(r'Ran (\d+) tests? in [\d.]+s', r'Ran \1 tests', arg)
        # Drop " (skipped=N)" altogether
        self.stream.write(re.sub(r' \(skipped=\d+\)', r'', untimed))
1623
class ReproducibleTestRunner(unittest.TextTestRunner):
    """TextTestRunner that writes through a ReproducibleStreamWrapper and
    uses ReproducibleTestResult by default"""

    def __init__(
        self,
        stream: Optional[TextIO] = None,
        resultclass: Type[unittest.TextTestResult] =
        ReproducibleTestResult,
        **kwargs: Any
    ) -> None:
        # Fall back to stdout when no stream was given
        target = stream or sys.stdout
        super().__init__(                          # type: ignore
            stream=ReproducibleStreamWrapper(target),
            descriptions=True,
            resultclass=resultclass,
            **kwargs)
1637
def execute_unittest(argv: List[str], debug: bool = False) -> None:
    """Executes unittests within the calling module.

    :param argv: Argument vector handed to unittest.main().
    :param debug: Run with verbosity 2 instead of 1.
    """
    verbosity = 2 if debug else 1

    # Some tests have warnings, especially ResourceWarnings for unclosed
    # files and sockets.  Ignore them for now to ensure reproducibility of
    # the test output.
    warnings_mode = None if sys.warnoptions else 'ignore'

    unittest.main(argv=argv,
                  testRunner=ReproducibleTestRunner,
                  verbosity=verbosity,
                  warnings=warnings_mode)
1648
def execute_setup_common(supported_fmts: Sequence[str] = (),
                         supported_platforms: Sequence[str] = (),
                         supported_cache_modes: Sequence[str] = (),
                         supported_aio_modes: Sequence[str] = (),
                         unsupported_fmts: Sequence[str] = (),
                         supported_protocols: Sequence[str] = (),
                         unsupported_protocols: Sequence[str] = (),
                         required_fmts: Sequence[str] = (),
                         unsupported_imgopts: Sequence[str] = ()) -> bool:
    """
    Perform necessary setup for either script-style or unittest-style tests.

    Strips a '-d' flag from sys.argv, configures logging accordingly and
    runs all environment checks.

    :return: Bool; Whether or not debug mode has been requested via the CLI.
    """
    # Note: Python 3.6 and pylint do not like 'Collection' so use 'Sequence'.

    try:
        sys.argv.remove('-d')
    except ValueError:
        debug = False
    else:
        debug = True
    log_level = logging.DEBUG if debug else logging.WARN
    logging.basicConfig(level=log_level)

    _verify_image_format(supported_fmts, unsupported_fmts)
    _verify_protocol(supported_protocols, unsupported_protocols)
    _verify_platform(supported=supported_platforms)
    _verify_cache_mode(supported_cache_modes)
    _verify_aio_mode(supported_aio_modes)
    _verify_formats(required_fmts)
    _verify_virtio_blk()
    _verify_imgopts(unsupported_imgopts)

    return debug
1680
def execute_test(*args, test_function=None, **kwargs):
    """
    Run either unittest or script-style tests.

    Positional and remaining keyword arguments go to
    execute_setup_common().  Afterwards test_function is called if one
    was given; otherwise sys.argv is handed to execute_unittest().
    """
    debug = execute_setup_common(*args, **kwargs)

    # Script-style test: just call it (the debug flag is not passed on).
    if test_function:
        test_function()
        return

    execute_unittest(sys.argv, debug)
1689
def activate_logging():
    """
    Activate iotests.log() output to stdout for script-style tests.

    Attaches a stdout handler that prints only the bare message to
    test_logger, sets that logger to INFO and turns off propagation to
    its parent loggers.
    """
    stdout_handler = logging.StreamHandler(stream=sys.stdout)
    stdout_handler.setFormatter(logging.Formatter('%(message)s'))

    test_logger.setLevel(logging.INFO)
    test_logger.propagate = False
    test_logger.addHandler(stdout_handler)
1698
# Entry point for script-style iotests that have no single test function
def script_initialize(*args, **kwargs):
    """
    Initialize script-style tests without running any tests.

    Turns on test log output via activate_logging() and then passes all
    arguments on to execute_setup_common(); its return value is
    discarded.
    """
    activate_logging()
    execute_setup_common(*args, **kwargs)
1704
# Entry point for script-style iotests that do have a single test function
def script_main(test_function, *args, **kwargs):
    """
    Run script-style tests outside of the unittest framework

    Turns on test log output via activate_logging(), then lets
    execute_test() perform the common setup with the remaining
    arguments and run test_function.
    """
    activate_logging()
    execute_test(*args, test_function=test_function, **kwargs)
1710
# Entry point for unittest-style iotests
def main(*args, **kwargs):
    """
    Run tests using the unittest framework

    All arguments are handed to execute_test() unchanged.
    """
    execute_test(*args, **kwargs)
1715