xref: /qemu/tests/qemu-iotests/iotests.py (revision 599f2762ed8c86a6eea03b9f91d49d14a874a95c)
1 # Common utilities and Python wrappers for qemu-iotests
2 #
3 # Copyright (C) 2012 IBM Corp.
4 #
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 2 of the License, or
8 # (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 # GNU General Public License for more details.
14 #
15 # You should have received a copy of the GNU General Public License
16 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 
19 import argparse
20 import atexit
21 import bz2
22 from collections import OrderedDict
23 import faulthandler
24 import json
25 import logging
26 import os
27 import re
28 import shutil
29 import signal
30 import struct
31 import subprocess
32 import sys
33 import time
34 from typing import (Any, Callable, Dict, Iterable, Iterator,
35                     List, Optional, Sequence, TextIO, Tuple, Type, TypeVar)
36 import unittest
37 
38 from contextlib import contextmanager
39 
40 from qemu.machine import qtest
41 from qemu.qmp.legacy import QMPMessage, QMPReturnValue, QEMUMonitorProtocol
42 from qemu.utils import VerboseProcessError
43 
# Use this logger for logging messages directly from the iotests module.
# Only a NullHandler is attached here, so this module installs no output
# handler of its own for it.
logger = logging.getLogger('qemu.iotests')
logger.addHandler(logging.NullHandler())

# Use this logger for messages that ought to be used for diff output.
test_logger = logging.getLogger('qemu.iotests.diff_io')


# Have Python dump tracebacks if the interpreter receives a fatal signal.
faulthandler.enable()

# Command lines for the external tools.  Each list starts with the program
# named by the *_PROG environment variable (falling back to the plain tool
# name) and is extended with the matching *_OPTIONS variable when it is set.
#
# This will not work if arguments contain spaces but is necessary if we
# want to support the override options that ./check supports.
# NOTE(review): split(' ') produces empty-string items for consecutive
# spaces inside an options variable -- confirm "check" never emits those.
qemu_img_args = [os.environ.get('QEMU_IMG_PROG', 'qemu-img')]
if os.environ.get('QEMU_IMG_OPTIONS'):
    qemu_img_args += os.environ['QEMU_IMG_OPTIONS'].strip().split(' ')

qemu_io_args = [os.environ.get('QEMU_IO_PROG', 'qemu-io')]
if os.environ.get('QEMU_IO_OPTIONS'):
    qemu_io_args += os.environ['QEMU_IO_OPTIONS'].strip().split(' ')

# Same program as qemu_io_args, but built from QEMU_IO_OPTIONS_NO_FMT.
qemu_io_args_no_fmt = [os.environ.get('QEMU_IO_PROG', 'qemu-io')]
if os.environ.get('QEMU_IO_OPTIONS_NO_FMT'):
    qemu_io_args_no_fmt += \
        os.environ['QEMU_IO_OPTIONS_NO_FMT'].strip().split(' ')

qemu_nbd_prog = os.environ.get('QEMU_NBD_PROG', 'qemu-nbd')
qemu_nbd_args = [qemu_nbd_prog]
if os.environ.get('QEMU_NBD_OPTIONS'):
    qemu_nbd_args += os.environ['QEMU_NBD_OPTIONS'].strip().split(' ')

qemu_prog = os.environ.get('QEMU_PROG', 'qemu')
# NOTE(review): when QEMU_OPTIONS is unset or empty this evaluates to ['']
# (a single empty argument), not [].
qemu_opts = os.environ.get('QEMU_OPTIONS', '').strip().split(' ')

qsd_prog = os.environ.get('QSD_PROG', 'qemu-storage-daemon')

# Optional gdbserver wrapper: empty unless GDB_OPTIONS is set.
gdb_qemu_env = os.environ.get('GDB_OPTIONS')
qemu_gdb = []
if gdb_qemu_env:
    qemu_gdb = ['gdbserver'] + gdb_qemu_env.strip().split(' ')

# Truthy for any non-empty PRINT_QEMU value, False when it is unset.
qemu_print = os.environ.get('PRINT_QEMU', False)

imgfmt = os.environ.get('IMGFMT', 'raw')
imgproto = os.environ.get('IMGPROTO', 'file')

try:
    test_dir = os.environ['TEST_DIR']
    sock_dir = os.environ['SOCK_DIR']
    cachemode = os.environ['CACHEMODE']
    aiomode = os.environ['AIOMODE']
    qemu_default_machine = os.environ['QEMU_DEFAULT_MACHINE']
except KeyError:
    # We are using these variables as proxies to indicate that we're
    # not being run via "check". There may be other things set up by
    # "check" that individual test cases rely on.
    sys.stderr.write('Please run this test via the "check" script\n')
    sys.exit(os.EX_USAGE)

# Optional valgrind wrapper: only filled in when VALGRIND_QEMU is "y" and
# NO_VALGRIND is not "y".
qemu_valgrind = []
if os.environ.get('VALGRIND_QEMU') == "y" and \
    os.environ.get('NO_VALGRIND') != "y":
    valgrind_logfile = "--log-file=" + test_dir
    # %p allows to put the valgrind process PID, since
    # we don't know it a priori (subprocess.Popen is
    # not yet invoked)
    valgrind_logfile += "/%p.valgrind"

    qemu_valgrind = ['valgrind', valgrind_logfile, '--error-exitcode=99']

# Defaults used when a luks image is handled without an explicit key-secret.
luks_default_secret_object = 'secret,id=keysec0,data=' + \
                             os.environ.get('IMGKEYSECRET', '')
luks_default_key_secret_opt = 'key-secret=keysec0'

# NOTE(review): unlike the variables above, a missing SAMPLE_IMG_DIR raises
# a bare KeyError here instead of printing the "check" hint.
sample_img_dir = os.environ['SAMPLE_IMG_DIR']
118 
119 
@contextmanager
def change_log_level(
        logger_name: str, level: int = logging.CRITICAL) -> Iterator[None]:
    """
    Temporarily override the level of the named logger.

    The logger's level is switched to `level` for the duration of the
    ``with`` body and put back afterwards, even if the body raises.
    Handy for muting errors that are expected or uninteresting.

    :param logger_name: name handed to `logging.getLogger()`.
    :param level: level to apply inside the context.
    """
    target = logging.getLogger(logger_name)
    saved_level = target.level

    target.setLevel(level)
    try:
        yield
    finally:
        # Always restore the level that was in effect on entry
        target.setLevel(saved_level)
136 
137 
def unarchive_sample_image(sample, fname):
    """
    Decompress a bzip2-compressed sample image into `fname`.

    The archive is looked up as ``<sample>.bz2`` inside `sample_img_dir`.
    """
    archive = os.path.join(sample_img_dir, sample + '.bz2')
    # Open the archive first so a missing sample never creates `fname`
    with bz2.open(archive) as compressed:
        with open(fname, 'wb') as target:
            shutil.copyfileobj(compressed, target)
142 
143 
def qemu_tool_popen(args: Sequence[str],
                    connect_stderr: bool = True) -> 'subprocess.Popen[str]':
    """
    Start a tool in text mode with its stdout connected to a pipe.

    :param args: full command line to run.
    :param connect_stderr: when True, stderr is merged into the stdout
                           pipe; when False, stderr is left as inherited.

    :return: the running `subprocess.Popen` object.
    """
    if connect_stderr:
        stderr_target = subprocess.STDOUT
    else:
        stderr_target = None

    # The caller owns the process object, so it cannot be wrapped here:
    # pylint: disable=consider-using-with
    process = subprocess.Popen(args,
                               stdout=subprocess.PIPE,
                               stderr=stderr_target,
                               universal_newlines=True)
    return process
152 
153 
def qemu_tool_pipe_and_status(tool: str, args: Sequence[str],
                              connect_stderr: bool = True,
                              drop_successful_output: bool = False) \
        -> Tuple[str, int]:
    """
    Run a tool and return both its output and its exit code

    :param tool: tool name, used only in the diagnostic written to stderr
                 when the process is terminated by a signal.
    :param args: full command line to run.
    :param connect_stderr: passed on to `qemu_tool_popen()`.
    :param drop_successful_output: when True, the output is replaced by an
                                   empty string if the exit code is 0.

    :return: a ``(output, returncode)`` tuple.
    """
    with qemu_tool_popen(args, connect_stderr) as subp:
        output = subp.communicate()[0]
        if subp.returncode < 0:
            cmd = ' '.join(args)
            # Keep the message on one logical string: a backslash
            # continuation inside the literal would embed the next line's
            # indentation in the message.
            sys.stderr.write(
                f'{tool} received signal {-subp.returncode}: {cmd}\n')
        if drop_successful_output and subp.returncode == 0:
            output = ''
        return (output, subp.returncode)
170 
def qemu_img_create_prepare_args(args: List[str]) -> List[str]:
    """
    Rewrite a qemu-img argument list so 'create' picks up test defaults.

    Anything that is not a 'create' command is returned as a plain copy.
    For 'create', the ``-f`` and ``-o`` options are pulled out, IMGOPTS and
    the default luks secret are added where applicable, and the command is
    reassembled as ``create [-f FMT] [--object ...] [-o ...]... <rest>``.

    :param args: qemu-img arguments, starting with the subcommand.
    :return: a new argument list; `args` itself is not modified.
    """
    if not args or args[0] != 'create':
        return list(args)

    parser = argparse.ArgumentParser(allow_abbrev=False)
    parser.add_argument('-f')
    # -o may be given several times, so collect every occurrence
    parser.add_argument('-o', action='append', default=[])
    known, passthrough = parser.parse_known_args(args[1:])

    fmt = known.f
    option_strings = known.o

    # IMGOPTS most probably contain options specific for the selected format,
    # like extended_l2 or compression_type for qcow2. Tests may want to
    # create additional images in other formats that don't support these
    # options. So, use IMGOPTS only for images created in imgfmt format.
    env_opts = os.environ.get('IMGOPTS')
    if env_opts and fmt == imgfmt:
        option_strings.insert(0, env_opts)

    # default luks support: supply a secret unless the caller named one
    needs_default_secret = fmt == 'luks' and \
        not any('key-secret' in opt for opt in option_strings)

    cmdline = ['create']
    if fmt is not None:
        cmdline.extend(['-f', fmt])
    if needs_default_secret:
        cmdline.extend(['--object', luks_default_secret_object])
        option_strings.append(luks_default_key_secret_opt)
    for opt in option_strings:
        cmdline.extend(['-o', opt])
    cmdline.extend(passthrough)

    return cmdline
208 
209 
def qemu_tool(*args: str, check: bool = True, combine_stdio: bool = True
              ) -> 'subprocess.CompletedProcess[str]':
    """
    Run a qemu tool to completion and hand back the finished process.

    :param args: full command line to run.
    :param check: Enforce a return code of zero.
    :param combine_stdio: set to False to keep stdout/stderr separated.

    :raise VerboseProcessError:
        When the return code is negative, or on any non-zero exit code
        when 'check=True' was provided (the default). The exception is
        constructed with the return code, the command line, stdout and
        stderr of the process.

    :return:
        a CompletedProcess. This object has args, returncode, and stdout
        properties. If streams are not combined, it will also have a
        stderr property.
    """
    stderr_target = subprocess.STDOUT if combine_stdio else subprocess.PIPE
    completed = subprocess.run(
        args,
        stdout=subprocess.PIPE,
        stderr=stderr_target,
        universal_newlines=True,
        check=False
    )

    status = completed.returncode
    killed_by_signal = status < 0
    failed_check = check and status != 0
    if failed_check or killed_by_signal:
        raise VerboseProcessError(
            status, args,
            output=completed.stdout,
            stderr=completed.stderr,
        )

    return completed
248 
249 
def qemu_img(*args: str, check: bool = True, combine_stdio: bool = True
             ) -> 'subprocess.CompletedProcess[str]':
    """
    Run QEMU_IMG_PROG and return the completed process.

    The command line is `qemu_img_args` (program plus QEMU_IMG_OPTIONS)
    followed by `args` after they went through
    `qemu_img_create_prepare_args()`, which may alter 'create' commands.

    See `qemu_tool()` for the meaning of `check` and `combine_stdio`, the
    return value and the exceptions raised.
    """
    prepared = qemu_img_create_prepare_args(list(args))
    return qemu_tool(*qemu_img_args, *prepared,
                     check=check, combine_stdio=combine_stdio)
262 
263 
def ordered_qmp(qmsg, conv_keys=True):
    """
    Return a copy of a QMP message with dictionaries sorted by key.

    Dicts become OrderedDicts sorted by their original keys; lists are
    processed element by element; anything else is returned as is.

    :param conv_keys: when True, '_' is replaced by '-' in the keys of the
                      dict being processed. Dicts nested directly inside a
                      dict are processed with conv_keys=False, while list
                      elements are processed with the default again.
    """
    # Dictionaries are not ordered prior to 3.6, therefore:
    if isinstance(qmsg, dict):
        ordered = OrderedDict()
        for key in sorted(qmsg):
            new_key = key.replace('_', '-') if conv_keys else key
            ordered[new_key] = ordered_qmp(qmsg[key], conv_keys=False)
        return ordered
    if isinstance(qmsg, list):
        return [ordered_qmp(item) for item in qmsg]
    return qmsg
276 
def qemu_img_create(*args: str) -> 'subprocess.CompletedProcess[str]':
    """
    Run ``qemu-img create`` with the given arguments.

    Thin wrapper that calls `qemu_img()` with 'create' prepended to `args`
    and returns its result unchanged.
    """
    return qemu_img('create', *args)
279 
def qemu_img_json(*args: str) -> Any:
    """
    Run qemu-img and return its stdout as deserialized JSON.

    stdout and stderr are kept separate so that only stdout is parsed.

    :raise CalledProcessError:
        When qemu-img is terminated by a signal, or returns a non-zero
        exit code without producing a valid JSON document on stdout.
    :raise JSONDecodeError:
        When qemu-img returns 0, but failed to produce a valid JSON document.

    :return: A deserialized JSON object; probably a dict[str, Any].
    """
    try:
        completed = qemu_img(*args, combine_stdio=False)
    except subprocess.CalledProcessError as exc:
        # A negative return code means termination by signal: don't bother
        # looking at the output in that case.
        if exc.returncode >= 0:
            # Commands like 'check' can return failure (exit codes 2 and 3)
            # to indicate command completion, but with errors found. For
            # multi-command flexibility, ignore the exact error codes and
            # *try* to load JSON.
            try:
                return json.loads(exc.stdout)
            except json.JSONDecodeError:
                # Nope. This thing is toast. Raise the /process/ error.
                pass
        raise

    return json.loads(completed.stdout)
311 
def qemu_img_measure(*args: str) -> Any:
    """
    Run ``qemu-img measure --output json`` followed by `args`.

    :return: whatever `qemu_img_json()` returns for that command.
    """
    return qemu_img_json("measure", "--output", "json", *args)
314 
def qemu_img_check(*args: str) -> Any:
    """
    Run ``qemu-img check --output json`` followed by `args`.

    :return: whatever `qemu_img_json()` returns for that command.
    """
    return qemu_img_json("check", "--output", "json", *args)
317 
def qemu_img_info(*args: str) -> Any:
    """
    Run ``qemu-img info --output json`` followed by `args`.

    :return: whatever `qemu_img_json()` returns for that command.
    """
    return qemu_img_json('info', "--output", "json", *args)
320 
def qemu_img_map(*args: str) -> Any:
    """
    Run ``qemu-img map --output json`` followed by `args`.

    :return: whatever `qemu_img_json()` returns for that command.
    """
    return qemu_img_json('map', "--output", "json", *args)
323 
def qemu_img_log(*args: str, check: bool = True
                 ) -> 'subprocess.CompletedProcess[str]':
    """
    Run qemu-img through `qemu_img()` and log its output.

    The output is passed through `filter_testfiles` before being logged.

    :return: the completed process returned by `qemu_img()`.
    """
    completed = qemu_img(*args, check=check)
    log(completed.stdout, filters=[filter_testfiles])
    return completed
329 
def img_info_log(filename: str, filter_path: Optional[str] = None,
                 use_image_opts: bool = False, extra_args: Sequence[str] = (),
                 check: bool = True, drop_child_info: bool = True,
                 ) -> None:
    """
    Log the filtered output of ``qemu-img info`` for `filename`.

    :param filename: image to inspect; always the last argument.
    :param filter_path: path handed to `filter_img_info()`; falls back to
                        `filename` when empty or None.
    :param use_image_opts: pass ``--image-opts`` instead of ``-f imgfmt``.
    :param extra_args: additional arguments placed before `filename`.
    :param check: forwarded to `qemu_img()`.
    :param drop_child_info: forwarded to `filter_img_info()`.
    """
    format_args = ['--image-opts'] if use_image_opts else ['-f', imgfmt]
    cmdline = ['info', *format_args, *extra_args, filename]

    info_text = qemu_img(*cmdline, check=check).stdout
    log(filter_img_info(info_text, filter_path or filename, drop_child_info))
346 
def qemu_io_wrap_args(args: Sequence[str]) -> List[str]:
    """
    Prefix `args` with the qemu-io program and its default options.

    When `args` already contains '-f' or '--image-opts', the
    `qemu_io_args_no_fmt` prefix is used; otherwise `qemu_io_args`.

    :return: a new list; neither `args` nor the prefix is modified.
    """
    format_given = '-f' in args or '--image-opts' in args
    prefix = qemu_io_args_no_fmt if format_given else qemu_io_args
    return prefix + list(args)
352 
def qemu_io_popen(*args):
    """
    Start qemu-io without waiting for it to finish.

    `args` go through `qemu_io_wrap_args()` and the resulting command line
    is started with `qemu_tool_popen()`, whose Popen object is returned.
    """
    return qemu_tool_popen(qemu_io_wrap_args(args))
355 
def qemu_io(*args: str, check: bool = True, combine_stdio: bool = True
            ) -> 'subprocess.CompletedProcess[str]':
    """
    Run QEMU_IO_PROG and return the status code and console output.

    This function always prepends either QEMU_IO_OPTIONS or
    QEMU_IO_OPTIONS_NO_FMT (see `qemu_io_wrap_args()`).

    :param args: qemu-io arguments, without the program name.
    :param check: forwarded to `qemu_tool()`.
    :param combine_stdio: forwarded to `qemu_tool()`.

    :return: the CompletedProcess returned by `qemu_tool()`.
    """
    return qemu_tool(*qemu_io_wrap_args(args),
                     check=check, combine_stdio=combine_stdio)
366 
def qemu_io_log(*args: str, check: bool = True
                ) -> 'subprocess.CompletedProcess[str]':
    """
    Run qemu-io through `qemu_io()` and log its output.

    The output is passed through `filter_testfiles` and `filter_qemu_io`
    (in that order) before being logged.

    :return: the completed process returned by `qemu_io()`.
    """
    completed = qemu_io(*args, check=check)
    log(completed.stdout, filters=[filter_testfiles, filter_qemu_io])
    return completed
372 
class QemuIoInteractive:
    """
    Drive an interactive qemu-io process through its stdin/stdout.

    The process is started in the constructor; `cmd()` sends one command
    and returns everything printed up to the next prompt; `close()` sends
    the quit command and waits for the process.
    """

    # Prompt printed by qemu-io when it is ready for the next command
    _PROMPT = 'qemu-io> '

    def __init__(self, *args):
        """
        Start qemu-io with `args` and wait for its first prompt.

        :raise ValueError: when the first output is not the prompt; the
                           exception carries all output collected.
        """
        self.args = qemu_io_wrap_args(args)
        # We need to keep the Popen object around, and not
        # close it immediately. Therefore, disable the pylint check:
        # pylint: disable=consider-using-with
        self._p = subprocess.Popen(self.args, stdin=subprocess.PIPE,
                                   stdout=subprocess.PIPE,
                                   stderr=subprocess.STDOUT,
                                   universal_newlines=True)
        out = self._p.stdout.read(len(self._PROMPT))
        if out != self._PROMPT:
            # Most probably qemu-io just failed to start.
            # Let's collect the whole output and exit.
            out += self._p.stdout.read()
            self._p.wait(timeout=1)
            raise ValueError(out)

    def close(self):
        """Send the quit command and wait for qemu-io to terminate."""
        self._p.communicate('q\n')

    def _read_output(self):
        """
        Read from qemu-io's stdout up to and including the next prompt.

        :return: the text that preceded the prompt.
        """
        prompt = list(self._PROMPT)
        n = len(prompt)
        chars = []
        # Compare the tail of what has been read against the prompt after
        # every character.  A position counter that is reset to 0 on a
        # mismatch would miss a prompt that directly follows a 'q'.
        while chars[-n:] != prompt:
            c = self._p.stdout.read(1)
            # check unexpected EOF
            assert c != ''
            chars.append(c)

        return ''.join(chars[:-n])

    def cmd(self, cmd):
        """
        Run a single qemu-io command and return its output.

        :param cmd: command line without newline; must not be the quit
                    command, which is reserved for `close()`.
        """
        # quit command is in close(), '\n' is added automatically
        assert '\n' not in cmd
        cmd = cmd.strip()
        assert cmd not in ('q', 'quit')
        self._p.stdin.write(cmd + '\n')
        self._p.stdin.flush()
        return self._read_output()
419 
420 
class QemuStorageDaemon:
    """
    Run a qemu-storage-daemon process, optionally with a QMP monitor.

    The daemon is started by the constructor, which returns once the
    daemon's pid file exists. `stop()` terminates the daemon and removes
    the QMP socket and pid file.
    """
    _qmp: Optional[QEMUMonitorProtocol] = None
    _qmpsock: Optional[str] = None
    # Python < 3.8 would complain if this type were not a string literal
    # (importing `annotations` from `__future__` would work; but not on <= 3.6)
    _p: 'Optional[subprocess.Popen[bytes]]' = None

    def __init__(self, *args: str, instance_id: str = 'a', qmp: bool = False):
        """
        Start the daemon and wait until it has written its pid file.

        :param args: daemon arguments; must not contain '--pidfile'.
        :param instance_id: used in the pid file and QMP socket names.
        :param qmp: when True, add a QMP monitor on a socket in `sock_dir`.

        :raise RuntimeError: when the daemon exits before the pid file
                             appears.
        """
        assert '--pidfile' not in args
        self.pidfile = os.path.join(test_dir, f'qsd-{instance_id}-pid')
        cmdline = [qsd_prog, *args, '--pidfile', self.pidfile]

        if qmp:
            self._qmpsock = os.path.join(sock_dir, f'qsd-{instance_id}.sock')
            cmdline.extend(('--chardev',
                            f'socket,id=qmp-sock,path={self._qmpsock}',
                            '--monitor', 'qmp-sock'))

            self._qmp = QEMUMonitorProtocol(self._qmpsock, server=True)

        # Cannot use with here, we want the subprocess to stay around
        # pylint: disable=consider-using-with
        self._p = subprocess.Popen(cmdline)
        if self._qmp is not None:
            self._qmp.accept()

        # Poll for the pid file, bailing out if the daemon exits first
        while not os.path.exists(self.pidfile):
            exit_code = self._p.poll()
            if exit_code is not None:
                cmd = ' '.join(cmdline)
                raise RuntimeError(
                    'qemu-storage-daemon terminated with exit code ' +
                    f'{exit_code}: {cmd}')

            time.sleep(0.01)

        with open(self.pidfile, encoding='utf-8') as pid_fh:
            self._pid = int(pid_fh.read().strip())

        assert self._pid == self._p.pid

    def qmp(self, cmd: str, args: Optional[Dict[str, object]] = None) \
            -> QMPMessage:
        """Send `cmd` via the monitor's cmd_raw() and return its result."""
        assert self._qmp is not None
        return self._qmp.cmd_raw(cmd, args)

    def get_qmp(self) -> QEMUMonitorProtocol:
        """Return the QMP monitor object; requires qmp=True at creation."""
        assert self._qmp is not None
        return self._qmp

    def cmd(self, cmd: str, args: Optional[Dict[str, object]] = None) \
            -> QMPReturnValue:
        """Send `cmd` via the monitor's cmd(), passing `args` as keywords."""
        assert self._qmp is not None
        keyword_args = args if args else {}
        return self._qmp.cmd(cmd, **keyword_args)

    def stop(self, kill_signal=15):
        """
        Signal the daemon, wait for it and clean up its files.

        :param kill_signal: signal number sent to the daemon process.
        """
        daemon = self._p
        daemon.send_signal(kill_signal)
        daemon.wait()
        self._p = None

        if self._qmp:
            self._qmp.close()

        # Best-effort removal of the socket (if any) and then the pid file
        for leftover in (self._qmpsock, self.pidfile):
            if leftover is None:
                continue
            try:
                os.remove(leftover)
            except OSError:
                pass

    def __del__(self):
        # Make sure a still-running daemon does not outlive this object
        if self._p is not None:
            self.stop(kill_signal=9)
495 
496 
def qemu_nbd(*args):
    '''Run qemu-nbd in daemon mode and return the parent's exit code

    The command line is `qemu_nbd_args`, then '--fork', then `args`.
    The output of the command is not captured.'''
    return subprocess.call(qemu_nbd_args + ['--fork'] + list(args))
500 
def qemu_nbd_early_pipe(*args: str) -> Tuple[int, str]:
    '''Run qemu-nbd in daemon mode and return both the parent's exit code
       and its output in case of an error

       On a zero exit code the returned output is always the empty string.
       stderr is not merged into the captured output.'''
    cmdline = [*qemu_nbd_args, '--fork', *args]
    output, returncode = qemu_tool_pipe_and_status('qemu-nbd', cmdline,
                                                   connect_stderr=False)
    if not returncode:
        output = ''
    return returncode, output
508 
def qemu_nbd_list_log(*args: str) -> str:
    '''Run qemu-nbd to list remote exports

    Runs `qemu_nbd_prog -L` followed by `args`, logs the output after
    `filter_testfiles` and `filter_nbd_exports`, and returns the
    unfiltered output.'''
    cmdline = [qemu_nbd_prog, '-L', *args]
    output = qemu_tool_pipe_and_status('qemu-nbd', cmdline)[0]
    log(output, filters=[filter_testfiles, filter_nbd_exports])
    return output
515 
@contextmanager
def qemu_nbd_popen(*args):
    '''Context manager running qemu-nbd within the context

    The server is started with --persistent and a pid file; the context
    body runs once that pid file exists. On exit the pid file is removed
    and the server process is killed.

    Raises RuntimeError if qemu-nbd exits before the pid file appears.'''
    pid_file = file_path("qemu_nbd_popen-nbd-pid-file")

    assert not os.path.exists(pid_file)

    cmd = [*qemu_nbd_args, '--persistent', '--pid-file', pid_file, *args]

    log('Start NBD server')
    with subprocess.Popen(cmd) as server:
        try:
            # Poll until the pid file shows up, or the server dies first
            while not os.path.exists(pid_file):
                exit_code = server.poll()
                if exit_code is not None:
                    cmdline = ' '.join(cmd)
                    raise RuntimeError(
                        f"qemu-nbd terminated with exit code {exit_code}: "
                        f"{cmdline}")

                time.sleep(0.01)
            yield
        finally:
            if os.path.exists(pid_file):
                os.remove(pid_file)
            log('Kill NBD server')
            server.kill()
            server.wait()
544 
def compare_images(img1: str, img2: str,
                   fmt1: str = imgfmt, fmt2: str = imgfmt) -> bool:
    """
    Run ``qemu-img compare`` on two images.

    :param fmt1: format of `img1`, passed as ``-f``.
    :param fmt2: format of `img2`, passed as ``-F``.

    :return: True when the images are identical (exit status 0), False
             when they differ (exit status 1).

    :raise CalledProcessError:
        when qemu-img crashes or returns a status code of anything other
        than 0 (identical) or 1 (different).
    """
    try:
        qemu_img('compare', '-f', fmt1, '-F', fmt2, img1, img2)
    except subprocess.CalledProcessError as exc:
        if exc.returncode != 1:
            raise
        return False
    return True
560         raise
561 
def create_image(name, size):
    '''Create a fully-allocated raw image with sector markers

    The file is written in 512-byte sectors until at least `size` bytes
    are covered. Each sector starts and ends with its own sector number
    as a big-endian 32-bit integer, with 504 zero bytes in between.'''
    with open(name, 'wb') as image:
        sector_no = 0
        while sector_no * 512 < size:
            image.write(struct.pack('>l504xl', sector_no, sector_no))
            sector_no += 1
570 
def image_size(img: str) -> int:
    """
    Return image's virtual size

    The value is the 'virtual-size' field reported by `qemu_img_info()`
    for `img` opened with format `imgfmt`.

    :raise TypeError: when that field is not an int.
    """
    virtual_size = qemu_img_info('-f', imgfmt, img)['virtual-size']
    if isinstance(virtual_size, int):
        return virtual_size

    type_name = type(virtual_size).__name__
    raise TypeError("Expected 'int' for 'virtual-size', "
                    f"got '{virtual_size}' of type '{type_name}'")
579 
def is_str(val):
    """Return True when `val` is an instance of `str`."""
    return isinstance(val, str)
582 
# The directory is matched as literal text: escape it so that regex
# metacharacters in the path (such as '.' or '+') are not interpreted.
test_dir_re = re.compile(re.escape(test_dir))
def filter_test_dir(msg):
    """Replace every occurrence of the test directory path with TEST_DIR."""
    return test_dir_re.sub("TEST_DIR", msg)
586 
# Matches the carriage returns of Windows-style line endings
win32_re = re.compile(r"\r")
def filter_win32(msg):
    """Return `msg` with every carriage return character removed."""
    return re.sub(win32_re, "", msg)
590 
# Matches the operation count, elapsed time and throughput summary
qemu_io_re = re.compile(r"[0-9]* ops; [0-9\/:. sec]* "
                        r"\([0-9\/.inf]* [EPTGMKiBbytes]*\/sec "
                        r"and [0-9\/.inf]* ops\/sec\)")
def filter_qemu_io(msg):
    """
    Replace the statistics summary in `msg` with fixed placeholders.

    Carriage returns are stripped first via `filter_win32()`.
    """
    placeholder = "X ops; XX:XX:XX.X (XXX YYY/sec and XXX ops/sec)"
    return qemu_io_re.sub(placeholder, filter_win32(msg))
598 
# Matches "chown" followed by a numeric uid:gid pair
chown_re = re.compile(r"chown [0-9]+:[0-9]+")
def filter_chown(msg):
    """Replace numeric ``chown UID:GID`` pairs in `msg` by a placeholder."""
    return re.sub(chown_re, "chown UID:GID", msg)
602 
def filter_qmp_event(event):
    '''Filter the timestamp of a QMP event dict

    Returns a copy of `event` whose timestamp seconds and microseconds are
    replaced by the placeholders 'SECS' and 'USECS'. The event passed in,
    including its nested timestamp dict, is left unmodified.'''
    event = dict(event)
    if 'timestamp' in event:
        timestamp = event['timestamp']
        if isinstance(timestamp, dict):
            # dict(event) is only a shallow copy: copy the nested dict as
            # well, so the caller's event is not changed behind its back.
            timestamp = timestamp.copy()
            event['timestamp'] = timestamp
        timestamp['seconds'] = 'SECS'
        timestamp['microseconds'] = 'USECS'
    return event
610 
def filter_block_job(event):
    '''Filter the offset and length of a QMP block job event dict

    Returns a copy of `event` in which the 'offset' and 'len' entries of
    its 'data' dict, when present, are replaced by 'OFFSET' and 'LEN'.
    The event passed in, including its nested data dict, is left
    unmodified.'''
    event = dict(event)
    if 'data' in event:
        data = event['data']
        if isinstance(data, dict):
            # dict(event) is only a shallow copy: copy the nested dict as
            # well, so the caller's event is not changed behind its back.
            data = data.copy()
            event['data'] = data
        if 'offset' in data:
            data['offset'] = 'OFFSET'
        if 'len' in data:
            data['len'] = 'LEN'
    return event
620 
def filter_qmp(qmsg, filter_fn):
    '''Apply filter_fn to every leaf value of a QMP object.

    filter_fn takes a (key, value) pair, where key is the dict key or list
    index of the value, and returns the replacement value. Lists and dicts
    are descended into recursively and updated in place; the (modified)
    qmsg is returned. For a qmsg that is neither list nor dict, the result
    of filter_fn(None, qmsg) is returned.'''
    if isinstance(qmsg, dict):
        entries = qmsg.items()
    elif isinstance(qmsg, list):
        entries = enumerate(qmsg)
    else:
        return filter_fn(None, qmsg)

    for key, value in entries:
        is_container = isinstance(value, (dict, list))
        qmsg[key] = (filter_qmp(value, filter_fn) if is_container
                     else filter_fn(key, value))
    return qmsg
638 
def filter_testfiles(msg):
    """
    Hide per-process test file prefixes in `msg`.

    ``<test_dir>/<pid>-`` becomes ``TEST_DIR/PID-`` and ``<sock_dir>/<pid>-``
    becomes ``SOCK_DIR/PID-``, where <pid> is the current process id.
    """
    pid_prefix = f"{os.getpid()}-"
    substitutions = (
        (test_dir, 'TEST_DIR/PID-'),
        (sock_dir, 'SOCK_DIR/PID-'),
    )
    for directory, placeholder in substitutions:
        msg = msg.replace(os.path.join(directory, pid_prefix), placeholder)
    return msg
643 
def filter_qmp_testfiles(qmsg):
    """Apply `filter_testfiles()` to every string value of a QMP object."""
    def _apply(_key, value):
        return filter_testfiles(value) if is_str(value) else value
    return filter_qmp(qmsg, _apply)
650 
def filter_virtio_scsi(output: str) -> str:
    """Strip the '-ccw' / '-pci' suffix from 'virtio-scsi' device names."""
    suffixed_name = re.compile(r'(virtio-scsi)-(ccw|pci)')
    return suffixed_name.sub(r'\1', output)
653 
def filter_qmp_virtio_scsi(qmsg):
    """Apply `filter_virtio_scsi()` to every string value of a QMP object."""
    def _apply(_key, value):
        return filter_virtio_scsi(value) if is_str(value) else value
    return filter_qmp(qmsg, _apply)
660 
def filter_generated_node_ids(msg):
    """Replace node names of the form ``#block<digits>`` by NODE_NAME."""
    generated_name = re.compile("#block[0-9]+")
    return generated_name.sub("NODE_NAME", msg)
663 
def filter_qmp_generated_node_ids(qmsg):
    """Apply `filter_generated_node_ids()` to every string value of a QMP
    object."""
    def _apply(_key, value):
        return filter_generated_node_ids(value) if is_str(value) else value
    return filter_qmp(qmsg, _apply)
670 
def filter_img_info(output: str, filename: str,
                    drop_child_info: bool = True) -> str:
    """
    Normalize the output of ``qemu-img info`` for comparison.

    Lines mentioning 'disk size' or 'actual-size' are dropped. With
    `drop_child_info`, each "Child node '/..." line is dropped together
    with the space-indented lines that follow it. In the remaining lines
    `filename`, test file prefixes, the image format and a few variable
    values (iters, uuid, cid, compression type) are replaced by
    placeholders.

    :param output: text to filter.
    :param filename: image path to replace by TEST_IMG.
    :param drop_child_info: whether to drop child node sections.
    """
    # Applied in this order, after the literal replacements below
    regex_substitutions = (
        ('iters: [0-9]+', 'iters: XXX'),
        ('uuid: [-a-f0-9]+', 'uuid: XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX'),
        ('cid: [0-9]+', 'cid: XXXXXXXXXX'),
        ('(compression type: )(zlib|zstd)', r'\1COMPRESSION_TYPE'),
    )

    kept = []
    in_child_section = False
    for line in output.split('\n'):
        if 'disk size' in line or 'actual-size' in line:
            continue

        # Drop child node info: indented lines belong to the section
        if in_child_section and line.startswith(' '):
            continue
        in_child_section = False
        if drop_child_info and "Child node '/" in line:
            in_child_section = True
            continue

        line = line.replace(filename, 'TEST_IMG')
        line = filter_testfiles(line)
        line = line.replace(imgfmt, 'IMGFMT')
        for pattern, replacement in regex_substitutions:
            line = re.sub(pattern, replacement, line)
        kept.append(line)
    return '\n'.join(kept)
700 
def filter_imgfmt(msg):
    """Replace every occurrence of the `imgfmt` string in `msg` by IMGFMT."""
    return msg.replace(imgfmt, 'IMGFMT')
703 
def filter_qmp_imgfmt(qmsg):
    """Apply `filter_imgfmt()` to every string value of a QMP object."""
    def _apply(_key, value):
        return filter_imgfmt(value) if is_str(value) else value
    return filter_qmp(qmsg, _apply)
710 
def filter_nbd_exports(output: str) -> str:
    """Replace the numbers after 'min block', 'opt block' and 'max block'
    by XXX."""
    block_size = re.compile(r'((min|opt|max) block): [0-9]+')
    return block_size.sub(r'\1: XXX', output)
713 
def filter_qtest(output: str) -> str:
    """
    Strip the qtest OPENED line from the start of `output` and the CLOSED
    line (with its surrounding newlines) from the end.
    """
    boilerplate = (
        r'^\[I \d+\.\d+\] OPENED\n',
        r'\n?\[I \+\d+\.\d+\] CLOSED\n?$',
    )
    for pattern in boilerplate:
        output = re.sub(pattern, '', output)
    return output
718 
Msg = TypeVar('Msg', Dict[str, Any], List[Any], str)

def log(msg: Msg,
        filters: Iterable[Callable[[Msg], Msg]] = (),
        indent: Optional[int] = None) -> None:
    """
    Log a string message or a JSON serializable message (like QMP).

    :param msg: message to log on `test_logger` at INFO level.
    :param filters: callables applied to `msg` in order before logging.
    :param indent: if provided, JSON serializable messages (dicts and
                   lists) are pretty-printed with this indentation.
    """
    for apply_filter in filters:
        msg = apply_filter(msg)

    if not isinstance(msg, (dict, list)):
        test_logger.info(msg)
        return

    # Don't sort if it's already sorted
    already_ordered = isinstance(msg, OrderedDict)
    test_logger.info(
        json.dumps(msg, sort_keys=not already_ordered, indent=indent))
736 
class Timeout:
    """
    Context manager raising TimeoutError when its body runs too long.

    On entry a SIGALRM handler is installed and a real-time interval timer
    is armed for `seconds`; on exit the timer is disarmed. Nothing is done
    when running under gdb or valgrind (`qemu_gdb` / `qemu_valgrind` set).
    """

    def __init__(self, seconds, errmsg="Timeout"):
        self.seconds = seconds
        self.errmsg = errmsg

    def __enter__(self):
        if not (qemu_gdb or qemu_valgrind):
            signal.signal(signal.SIGALRM, self.timeout)
            signal.setitimer(signal.ITIMER_REAL, self.seconds)
        return self

    def __exit__(self, exc_type, value, traceback):
        if not (qemu_gdb or qemu_valgrind):
            # A zero interval disarms the timer
            signal.setitimer(signal.ITIMER_REAL, 0)
        return False

    def timeout(self, signum, frame):
        """Signal handler: raise TimeoutError with the configured message."""
        raise TimeoutError(self.errmsg)
754 
def file_pattern(name):
    """Return `name` prefixed with the current process id and a dash."""
    return f"{os.getpid()}-{name}"
757 
class FilePath:
    """
    Context manager generating multiple file names. The generated files are
    removed when exiting the context.

    Example usage:

        with FilePath('a.img', 'b.img') as (img_a, img_b):
            # Use img_a and img_b here...

        # a.img and b.img are automatically removed here.

    By default the names are placed in iotests.test_dir. To create sockets
    use iotests.sock_dir:

       with FilePath('a.sock', base_dir=iotests.sock_dir) as sock:

    For convenience, calling with one argument yields a single path instead
    of a list with one item.

    """
    def __init__(self, *names, base_dir=test_dir):
        def _full_path(name):
            return os.path.join(base_dir, file_pattern(name))

        self.paths = [_full_path(name) for name in names]

    def __enter__(self):
        return self.paths[0] if len(self.paths) == 1 else self.paths

    def __exit__(self, exc_type, exc_val, exc_tb):
        for leftover in self.paths:
            try:
                os.remove(leftover)
            except OSError:
                # Removal is best effort, e.g. the file may not exist
                pass
        return False
796 
797 
def try_remove(img):
    """Remove the file `img`, silently ignoring any OSError."""
    try:
        os.remove(img)
    except OSError:
        # Best effort only, e.g. the file may not exist
        return
803 
def file_path_remover():
    """
    Remove every path recorded in `file_path_remover.paths`.

    Registered with atexit by `file_path()`, which also creates and fills
    the `paths` attribute. Paths are removed in reverse order of
    registration, ignoring errors (see `try_remove()`).
    """
    for path in reversed(file_path_remover.paths):
        try_remove(path)
807 
808 
def file_path(*names, base_dir=test_dir):
    ''' Another way to get auto-generated filename that cleans itself up.

    Use is as simple as:

    img_a, img_b = file_path('a.img', 'b.img')
    sock = file_path('socket')

    Every generated path is recorded for removal at interpreter exit.
    With a single name the path itself is returned, otherwise a list.
    '''

    # The first call sets up the registry and the atexit cleanup hook
    if not hasattr(file_path_remover, 'paths'):
        file_path_remover.paths = []
        atexit.register(file_path_remover)

    paths = [os.path.join(base_dir, file_pattern(name)) for name in names]
    file_path_remover.paths.extend(paths)

    if len(paths) == 1:
        return paths[0]
    return paths
830 
def remote_filename(path):
    """
    Return the name under which `path` is accessed via `imgproto`.

    For 'file' the path is returned unchanged; for 'ssh' an ssh:// URL to
    127.0.0.1 port 22 is built with the user taken from $USER.

    :raise ValueError: for any other protocol.
    """
    if imgproto == 'file':
        return path
    if imgproto == 'ssh':
        user = os.environ.get('USER')
        return f"ssh://{user}@127.0.0.1:22{path}"
    raise ValueError(f"Protocol {imgproto} not supported")
838 
839 class VM(qtest.QEMUQtestMachine):
840     '''A QEMU VM'''
841 
842     def __init__(self, path_suffix=''):
843         name = "qemu%s-%d" % (path_suffix, os.getpid())
844         timer = 15.0 if not (qemu_gdb or qemu_valgrind) else None
845         if qemu_gdb and qemu_valgrind:
846             sys.stderr.write('gdb and valgrind are mutually exclusive\n')
847             sys.exit(1)
848         wrapper = qemu_gdb if qemu_gdb else qemu_valgrind
849         super().__init__(qemu_prog, qemu_opts, wrapper=wrapper,
850                          name=name,
851                          base_temp_dir=test_dir,
852                          qmp_timer=timer)
853         self._num_drives = 0
854 
855     def _post_shutdown(self) -> None:
856         super()._post_shutdown()
857         if not qemu_valgrind or not self._popen:
858             return
859         valgrind_filename = f"{test_dir}/{self._popen.pid}.valgrind"
860         if self.exitcode() == 99:
861             with open(valgrind_filename, encoding='utf-8') as f:
862                 print(f.read())
863         else:
864             os.remove(valgrind_filename)
865 
866     def _pre_launch(self) -> None:
867         super()._pre_launch()
868         if qemu_print:
869             # set QEMU binary output to stdout
870             self._close_qemu_log_file()
871 
872     def add_object(self, opts):
873         self._args.append('-object')
874         self._args.append(opts)
875         return self
876 
877     def add_device(self, opts):
878         self._args.append('-device')
879         self._args.append(opts)
880         return self
881 
882     def add_drive_raw(self, opts):
883         self._args.append('-drive')
884         self._args.append(opts)
885         return self
886 
887     def add_drive(self, path, opts='', interface='virtio', img_format=imgfmt):
888         '''Add a virtio-blk drive to the VM'''
889         options = ['if=%s' % interface,
890                    'id=drive%d' % self._num_drives]
891 
892         if path is not None:
893             options.append('file=%s' % path)
894             options.append('format=%s' % img_format)
895             options.append('cache=%s' % cachemode)
896             options.append('aio=%s' % aiomode)
897 
898         if opts:
899             options.append(opts)
900 
901         if img_format == 'luks' and 'key-secret' not in opts:
902             # default luks support
903             if luks_default_secret_object not in self._args:
904                 self.add_object(luks_default_secret_object)
905 
906             options.append(luks_default_key_secret_opt)
907 
908         self._args.append('-drive')
909         self._args.append(','.join(options))
910         self._num_drives += 1
911         return self
912 
913     def add_blockdev(self, opts):
914         self._args.append('-blockdev')
915         if isinstance(opts, str):
916             self._args.append(opts)
917         else:
918             self._args.append(','.join(opts))
919         return self
920 
921     def add_incoming(self, addr):
922         self._args.append('-incoming')
923         self._args.append(addr)
924         return self
925 
926     def add_paused(self):
927         self._args.append('-S')
928         return self
929 
930     def hmp(self, command_line: str, use_log: bool = False) -> QMPMessage:
931         cmd = 'human-monitor-command'
932         kwargs: Dict[str, Any] = {'command-line': command_line}
933         if use_log:
934             return self.qmp_log(cmd, **kwargs)
935         else:
936             return self.qmp(cmd, **kwargs)
937 
938     def pause_drive(self, drive: str, event: Optional[str] = None) -> None:
939         """Pause drive r/w operations"""
940         if not event:
941             self.pause_drive(drive, "read_aio")
942             self.pause_drive(drive, "write_aio")
943             return
944         self.hmp(f'qemu-io {drive} "break {event} bp_{drive}"')
945 
946     def resume_drive(self, drive: str) -> None:
947         """Resume drive r/w operations"""
948         self.hmp(f'qemu-io {drive} "remove_break bp_{drive}"')
949 
950     def hmp_qemu_io(self, drive: str, cmd: str,
951                     use_log: bool = False, qdev: bool = False) -> QMPMessage:
952         """Write to a given drive using an HMP command"""
953         d = '-d ' if qdev else ''
954         return self.hmp(f'qemu-io {d}{drive} "{cmd}"', use_log=use_log)
955 
956     def flatten_qmp_object(self, obj, output=None, basestr=''):
957         if output is None:
958             output = {}
959         if isinstance(obj, list):
960             for i, item in enumerate(obj):
961                 self.flatten_qmp_object(item, output, basestr + str(i) + '.')
962         elif isinstance(obj, dict):
963             for key in obj:
964                 self.flatten_qmp_object(obj[key], output, basestr + key + '.')
965         else:
966             output[basestr[:-1]] = obj # Strip trailing '.'
967         return output
968 
969     def qmp_to_opts(self, obj):
970         obj = self.flatten_qmp_object(obj)
971         output_list = []
972         for key in obj:
973             output_list += [key + '=' + obj[key]]
974         return ','.join(output_list)
975 
976     def get_qmp_events_filtered(self, wait=60.0):
977         result = []
978         for ev in self.get_qmp_events(wait=wait):
979             result.append(filter_qmp_event(ev))
980         return result
981 
982     def qmp_log(self, cmd, filters=(), indent=None, **kwargs):
983         full_cmd = OrderedDict((
984             ("execute", cmd),
985             ("arguments", ordered_qmp(kwargs))
986         ))
987         log(full_cmd, filters, indent=indent)
988         result = self.qmp(cmd, **kwargs)
989         log(result, filters, indent=indent)
990         return result
991 
992     # Returns None on success, and an error string on failure
    def run_job(self, job: str, auto_finalize: bool = True,
                auto_dismiss: bool = False,
                pre_finalize: Optional[Callable[[], None]] = None,
                cancel: bool = False, wait: float = 60.0,
                filters: Iterable[Callable[[Any], Any]] = (),
                ) -> Optional[str]:
        """
        run_job moves a job from creation through to dismissal.

        :param job: String. ID of recently-launched job
        :param auto_finalize: Bool. True if the job was launched with
                              auto_finalize. Defaults to True.
        :param auto_dismiss: Bool. True if the job was launched with
                             auto_dismiss=True. Defaults to False.
        :param pre_finalize: Callback. A callable that takes no arguments to be
                             invoked prior to issuing job-finalize, if any.
        :param cancel: Bool. When true, cancels the job after the pre_finalize
                       callback.
        :param wait: Float. Timeout value specifying how long to wait for any
                     event, in seconds. Defaults to 60.0.
        :param filters: Iterable of callables. Used as the log filters for
                        everything this method logs: the received block job
                        events, the failure message and the QMP commands it
                        issues.
        :return: The 'error' value that query-jobs reported for this job
                 while it was in the 'aborting' state, or None if no such
                 error was recorded.
        """
        # Legacy BLOCK_JOB_* events identify the job by 'device', the
        # JOB_STATUS_CHANGE/BLOCK_JOB_PENDING events by 'id'.
        match_device = {'data': {'device': job}}
        match_id = {'data': {'id': job}}
        events = [
            ('BLOCK_JOB_COMPLETED', match_device),
            ('BLOCK_JOB_CANCELLED', match_device),
            ('BLOCK_JOB_ERROR', match_device),
            ('BLOCK_JOB_READY', match_device),
            ('BLOCK_JOB_PENDING', match_id),
            ('JOB_STATUS_CHANGE', match_id)
        ]
        error = None
        while True:
            ev = filter_qmp_event(self.events_wait(events, timeout=wait))
            # Everything but JOB_STATUS_CHANGE is only logged; the state
            # machine below is driven by JOB_STATUS_CHANGE alone.
            if ev['event'] != 'JOB_STATUS_CHANGE':
                log(ev, filters=filters)
                continue
            status = ev['data']['status']
            if status == 'aborting':
                # Look up the error message while the job still exists
                result = self.qmp('query-jobs')
                for j in result['return']:
                    if j['id'] == job:
                        error = j['error']
                        log('Job failed: %s' % (j['error']), filters=filters)
            elif status == 'ready':
                self.qmp_log('job-complete', id=job, filters=filters)
            elif status == 'pending' and not auto_finalize:
                if pre_finalize:
                    pre_finalize()
                if cancel:
                    self.qmp_log('job-cancel', id=job, filters=filters)
                else:
                    self.qmp_log('job-finalize', id=job, filters=filters)
            elif status == 'concluded' and not auto_dismiss:
                self.qmp_log('job-dismiss', id=job, filters=filters)
            elif status == 'null':
                # The job is gone; report what was recorded, if anything
                return error
1050 
1051     # Returns None on success, and an error string on failure
1052     def blockdev_create(self, options, job_id='job0', filters=None):
1053         if filters is None:
1054             filters = [filter_qmp_testfiles]
1055         result = self.qmp_log('blockdev-create', filters=filters,
1056                               job_id=job_id, options=options)
1057 
1058         if 'return' in result:
1059             assert result['return'] == {}
1060             job_result = self.run_job(job_id, filters=filters)
1061         else:
1062             job_result = result['error']
1063 
1064         log("")
1065         return job_result
1066 
1067     def enable_migration_events(self, name):
1068         log('Enabling migration QMP events on %s...' % name)
1069         log(self.qmp('migrate-set-capabilities', capabilities=[
1070             {
1071                 'capability': 'events',
1072                 'state': True
1073             }
1074         ]))
1075 
1076     def wait_migration(self, expect_runstate: Optional[str]) -> bool:
1077         while True:
1078             event = self.event_wait('MIGRATION')
1079             # We use the default timeout, and with a timeout, event_wait()
1080             # never returns None
1081             assert event
1082 
1083             log(event, filters=[filter_qmp_event])
1084             if event['data']['status'] in ('completed', 'failed'):
1085                 break
1086 
1087         if event['data']['status'] == 'completed':
1088             # The event may occur in finish-migrate, so wait for the expected
1089             # post-migration runstate
1090             runstate = None
1091             while runstate != expect_runstate:
1092                 runstate = self.qmp('query-status')['return']['status']
1093             return True
1094         else:
1095             return False
1096 
1097     def node_info(self, node_name):
1098         nodes = self.qmp('query-named-block-nodes')
1099         for x in nodes['return']:
1100             if x['node-name'] == node_name:
1101                 return x
1102         return None
1103 
1104     def query_bitmaps(self):
1105         res = self.qmp("query-named-block-nodes")
1106         return {device['node-name']: device['dirty-bitmaps']
1107                 for device in res['return'] if 'dirty-bitmaps' in device}
1108 
1109     def get_bitmap(self, node_name, bitmap_name, recording=None, bitmaps=None):
1110         """
1111         get a specific bitmap from the object returned by query_bitmaps.
1112         :param recording: If specified, filter results by the specified value.
1113         :param bitmaps: If specified, use it instead of call query_bitmaps()
1114         """
1115         if bitmaps is None:
1116             bitmaps = self.query_bitmaps()
1117 
1118         for bitmap in bitmaps[node_name]:
1119             if bitmap.get('name', '') == bitmap_name:
1120                 if recording is None or bitmap.get('recording') == recording:
1121                     return bitmap
1122         return None
1123 
1124     def check_bitmap_status(self, node_name, bitmap_name, fields):
1125         ret = self.get_bitmap(node_name, bitmap_name)
1126 
1127         return fields.items() <= ret.items()
1128 
1129     def assert_block_path(self, root, path, expected_node, graph=None):
1130         """
1131         Check whether the node under the given path in the block graph
1132         is @expected_node.
1133 
1134         @root is the node name of the node where the @path is rooted.
1135 
1136         @path is a string that consists of child names separated by
1137         slashes.  It must begin with a slash.
1138 
1139         Examples for @root + @path:
1140           - root="qcow2-node", path="/backing/file"
1141           - root="quorum-node", path="/children.2/file"
1142 
1143         Hypothetically, @path could be empty, in which case it would
1144         point to @root.  However, in practice this case is not useful
1145         and hence not allowed.
1146 
1147         @expected_node may be None.  (All elements of the path but the
1148         leaf must still exist.)
1149 
1150         @graph may be None or the result of an x-debug-query-block-graph
1151         call that has already been performed.
1152         """
1153         if graph is None:
1154             graph = self.qmp('x-debug-query-block-graph')['return']
1155 
1156         iter_path = iter(path.split('/'))
1157 
1158         # Must start with a /
1159         assert next(iter_path) == ''
1160 
1161         node = next((node for node in graph['nodes'] if node['name'] == root),
1162                     None)
1163 
1164         # An empty @path is not allowed, so the root node must be present
1165         assert node is not None, 'Root node %s not found' % root
1166 
1167         for child_name in iter_path:
1168             assert node is not None, 'Cannot follow path %s%s' % (root, path)
1169 
1170             try:
1171                 node_id = next(edge['child'] for edge in graph['edges']
1172                                if (edge['parent'] == node['id'] and
1173                                    edge['name'] == child_name))
1174 
1175                 node = next(node for node in graph['nodes']
1176                             if node['id'] == node_id)
1177 
1178             except StopIteration:
1179                 node = None
1180 
1181         if node is None:
1182             assert expected_node is None, \
1183                    'No node found under %s (but expected %s)' % \
1184                    (path, expected_node)
1185         else:
1186             assert node['name'] == expected_node, \
1187                    'Found node %s under %s (but expected %s)' % \
1188                    (node['name'], path, expected_node)
1189 
# Matches a path component of the form "name[index]" as used by
# QMPTestCase.dictpath(): group 1 is the text before the '[', group 2 the
# text between the brackets.
index_re = re.compile(r'([^\[]+)\[([^\]]+)\]')
1191 
1192 class QMPTestCase(unittest.TestCase):
1193     '''Abstract base class for QMP test cases'''
1194 
1195     def __init__(self, *args, **kwargs):
1196         super().__init__(*args, **kwargs)
1197         # Many users of this class set a VM property we rely on heavily
1198         # in the methods below.
1199         self.vm = None
1200 
1201     def dictpath(self, d, path):
1202         '''Traverse a path in a nested dict'''
1203         for component in path.split('/'):
1204             m = index_re.match(component)
1205             if m:
1206                 component, idx = m.groups()
1207                 idx = int(idx)
1208 
1209             if not isinstance(d, dict) or component not in d:
1210                 self.fail(f'failed path traversal for "{path}" in "{d}"')
1211             d = d[component]
1212 
1213             if m:
1214                 if not isinstance(d, list):
1215                     self.fail(f'path component "{component}" in "{path}" '
1216                               f'is not a list in "{d}"')
1217                 try:
1218                     d = d[idx]
1219                 except IndexError:
1220                     self.fail(f'invalid index "{idx}" in path "{path}" '
1221                               f'in "{d}"')
1222         return d
1223 
1224     def assert_qmp_absent(self, d, path):
1225         try:
1226             result = self.dictpath(d, path)
1227         except AssertionError:
1228             return
1229         self.fail('path "%s" has value "%s"' % (path, str(result)))
1230 
1231     def assert_qmp(self, d, path, value):
1232         '''Assert that the value for a specific path in a QMP dict
1233            matches.  When given a list of values, assert that any of
1234            them matches.'''
1235 
1236         result = self.dictpath(d, path)
1237 
1238         # [] makes no sense as a list of valid values, so treat it as
1239         # an actual single value.
1240         if isinstance(value, list) and value != []:
1241             for v in value:
1242                 if result == v:
1243                     return
1244             self.fail('no match for "%s" in %s' % (str(result), str(value)))
1245         else:
1246             self.assertEqual(result, value,
1247                              '"%s" is "%s", expected "%s"'
1248                              % (path, str(result), str(value)))
1249 
1250     def assert_no_active_block_jobs(self):
1251         result = self.vm.qmp('query-block-jobs')
1252         self.assert_qmp(result, 'return', [])
1253 
1254     def assert_has_block_node(self, node_name=None, file_name=None):
1255         """Issue a query-named-block-nodes and assert node_name and/or
1256         file_name is present in the result"""
1257         def check_equal_or_none(a, b):
1258             return a is None or b is None or a == b
1259         assert node_name or file_name
1260         result = self.vm.qmp('query-named-block-nodes')
1261         for x in result["return"]:
1262             if check_equal_or_none(x.get("node-name"), node_name) and \
1263                     check_equal_or_none(x.get("file"), file_name):
1264                 return
1265         self.fail("Cannot find %s %s in result:\n%s" %
1266                   (node_name, file_name, result))
1267 
1268     def assert_json_filename_equal(self, json_filename, reference):
1269         '''Asserts that the given filename is a json: filename and that its
1270            content is equal to the given reference object'''
1271         self.assertEqual(json_filename[:5], 'json:')
1272         self.assertEqual(
1273             self.vm.flatten_qmp_object(json.loads(json_filename[5:])),
1274             self.vm.flatten_qmp_object(reference)
1275         )
1276 
1277     def cancel_and_wait(self, drive='drive0', force=False,
1278                         resume=False, wait=60.0):
1279         '''Cancel a block job and wait for it to finish, returning the event'''
1280         self.vm.cmd('block-job-cancel', device=drive, force=force)
1281 
1282         if resume:
1283             self.vm.resume_drive(drive)
1284 
1285         cancelled = False
1286         result = None
1287         while not cancelled:
1288             for event in self.vm.get_qmp_events(wait=wait):
1289                 if event['event'] == 'BLOCK_JOB_COMPLETED' or \
1290                    event['event'] == 'BLOCK_JOB_CANCELLED':
1291                     self.assert_qmp(event, 'data/device', drive)
1292                     result = event
1293                     cancelled = True
1294                 elif event['event'] == 'JOB_STATUS_CHANGE':
1295                     self.assert_qmp(event, 'data/id', drive)
1296 
1297 
1298         self.assert_no_active_block_jobs()
1299         return result
1300 
1301     def wait_until_completed(self, drive='drive0', check_offset=True,
1302                              wait=60.0, error=None):
1303         '''Wait for a block job to finish, returning the event'''
1304         while True:
1305             for event in self.vm.get_qmp_events(wait=wait):
1306                 if event['event'] == 'BLOCK_JOB_COMPLETED':
1307                     self.assert_qmp(event, 'data/device', drive)
1308                     if error is None:
1309                         self.assert_qmp_absent(event, 'data/error')
1310                         if check_offset:
1311                             self.assert_qmp(event, 'data/offset',
1312                                             event['data']['len'])
1313                     else:
1314                         self.assert_qmp(event, 'data/error', error)
1315                     self.assert_no_active_block_jobs()
1316                     return event
1317                 if event['event'] == 'JOB_STATUS_CHANGE':
1318                     self.assert_qmp(event, 'data/id', drive)
1319 
1320     def wait_ready(self, drive='drive0'):
1321         """Wait until a BLOCK_JOB_READY event, and return the event."""
1322         return self.vm.events_wait([
1323             ('BLOCK_JOB_READY',
1324              {'data': {'type': 'mirror', 'device': drive}}),
1325             ('BLOCK_JOB_READY',
1326              {'data': {'type': 'commit', 'device': drive}})
1327         ])
1328 
1329     def wait_ready_and_cancel(self, drive='drive0'):
1330         self.wait_ready(drive=drive)
1331         event = self.cancel_and_wait(drive=drive)
1332         self.assertEqual(event['event'], 'BLOCK_JOB_COMPLETED')
1333         self.assert_qmp(event, 'data/type', 'mirror')
1334         self.assert_qmp(event, 'data/offset', event['data']['len'])
1335 
1336     def complete_and_wait(self, drive='drive0', wait_ready=True,
1337                           completion_error=None):
1338         '''Complete a block job and wait for it to finish'''
1339         if wait_ready:
1340             self.wait_ready(drive=drive)
1341 
1342         self.vm.cmd('block-job-complete', device=drive)
1343 
1344         event = self.wait_until_completed(drive=drive, error=completion_error)
1345         self.assertTrue(event['data']['type'] in ['mirror', 'commit'])
1346 
1347     def pause_wait(self, job_id='job0'):
1348         with Timeout(3, "Timeout waiting for job to pause"):
1349             while True:
1350                 result = self.vm.qmp('query-block-jobs')
1351                 found = False
1352                 for job in result['return']:
1353                     if job['device'] == job_id:
1354                         found = True
1355                         if job['paused'] and not job['busy']:
1356                             return job
1357                         break
1358                 assert found
1359 
1360     def pause_job(self, job_id='job0', wait=True):
1361         self.vm.cmd('block-job-pause', device=job_id)
1362         if wait:
1363             self.pause_wait(job_id)
1364 
1365     def case_skip(self, reason):
1366         '''Skip this test case'''
1367         case_notrun(reason)
1368         self.skipTest(reason)
1369 
1370 
def notrun(reason):
    '''Skip this test suite

    Writes reason to <test_dir>/<seq>.notrun, logs a warning and exits the
    process with status 0.
    '''
    # Each test in qemu-iotests has a number ("seq")
    seq = os.path.basename(sys.argv[0])
    notrun_file = '%s/%s.notrun' % (test_dir, seq)

    with open(notrun_file, 'w', encoding='utf-8') as outfile:
        outfile.write(reason + '\n')
    logger.warning("%s not run: %s", seq, reason)
    sys.exit(0)
1381 
def case_notrun(reason):
    '''Record that a test case was not run, without skipping it.

    The reason is appended to <test_dir>/<seq>.casenotrun; actually
    skipping is up to the caller.  QMPTestCase.case_skip() is the variant
    that also skips the current test case.'''

    # Each test in qemu-iotests has a number ("seq")
    seq = os.path.basename(sys.argv[0])
    casenotrun_file = '%s/%s.casenotrun' % (test_dir, seq)

    with open(casenotrun_file, 'a', encoding='utf-8') as outfile:
        outfile.write('    [case not run] ' + reason + '\n')
1394 
def _verify_image_format(supported_fmts: Sequence[str] = (),
                         unsupported_fmts: Sequence[str] = ()) -> None:
    """Call notrun() if the current image format (imgfmt) is unsuitable.

    The format is unsuitable if supported_fmts is non-empty and does not
    contain it, or if unsupported_fmts contains it.  'generic' in
    supported_fmts lifts the supported_fmts restriction as long as
    $IMGFMT_GENERIC is 'true' (the default).  For luks, a working LUKS
    driver is verified as well.
    """
    generic_enabled = os.environ.get('IMGFMT_GENERIC', 'true') == 'true'
    if 'generic' in supported_fmts and generic_enabled:
        # similar to
        #   _supported_fmt generic
        # for bash tests
        supported_fmts = ()

    if supported_fmts and imgfmt not in supported_fmts:
        unsuitable = True
    else:
        unsuitable = imgfmt in unsupported_fmts
    if unsuitable:
        notrun('not suitable for this image format: %s' % imgfmt)

    if imgfmt == 'luks':
        verify_working_luks()
1410 
1411 def _verify_protocol(supported: Sequence[str] = (),
1412                      unsupported: Sequence[str] = ()) -> None:
1413     assert not (supported and unsupported)
1414 
1415     if 'generic' in supported:
1416         return
1417 
1418     not_sup = supported and (imgproto not in supported)
1419     if not_sup or (imgproto in unsupported):
1420         notrun('not suitable for this protocol: %s' % imgproto)
1421 
1422 def _verify_platform(supported: Sequence[str] = (),
1423                      unsupported: Sequence[str] = ()) -> None:
1424     if any((sys.platform.startswith(x) for x in unsupported)):
1425         notrun('not suitable for this OS: %s' % sys.platform)
1426 
1427     if supported:
1428         if not any((sys.platform.startswith(x) for x in supported)):
1429             notrun('not suitable for this OS: %s' % sys.platform)
1430 
1431 def _verify_cache_mode(supported_cache_modes: Sequence[str] = ()) -> None:
1432     if supported_cache_modes and (cachemode not in supported_cache_modes):
1433         notrun('not suitable for this cache mode: %s' % cachemode)
1434 
1435 def _verify_aio_mode(supported_aio_modes: Sequence[str] = ()) -> None:
1436     if supported_aio_modes and (aiomode not in supported_aio_modes):
1437         notrun('not suitable for this aio mode: %s' % aiomode)
1438 
def _verify_formats(required_formats: Sequence[str] = ()) -> None:
    """Call notrun() if any of required_formats is missing from
    supported_formats()."""
    required = set(required_formats)
    available = set(supported_formats())
    usf_list = list(required - available)
    if usf_list:
        notrun(f'formats {usf_list} are not whitelisted')
1443 
1444 
def _verify_virtio_blk() -> None:
    """Call notrun() unless 'virtio-blk' appears in the output of
    "-M none -device help"."""
    device_help = qemu_pipe('-M', 'none', '-device', 'help')
    if 'virtio-blk' in device_help:
        return
    notrun('Missing virtio-blk in QEMU binary')
1449 
def verify_virtio_scsi_pci_or_ccw() -> None:
    """Call notrun() unless 'virtio-scsi-pci' or 'virtio-scsi-ccw' appears
    in the output of "-M none -device help"."""
    device_help = qemu_pipe('-M', 'none', '-device', 'help')
    has_pci = 'virtio-scsi-pci' in device_help
    has_ccw = 'virtio-scsi-ccw' in device_help
    if not (has_pci or has_ccw):
        notrun('Missing virtio-scsi-pci or virtio-scsi-ccw in QEMU binary')
1454 
1455 
1456 def _verify_imgopts(unsupported: Sequence[str] = ()) -> None:
1457     imgopts = os.environ.get('IMGOPTS')
1458     # One of usage examples for IMGOPTS is "data_file=$TEST_IMG.ext_data_file"
1459     # but it supported only for bash tests. We don't have a concept of global
1460     # TEST_IMG in iotests.py, not saying about somehow parsing $variables.
1461     # So, for simplicity let's just not support any IMGOPTS with '$' inside.
1462     unsup = list(unsupported) + ['$']
1463     if imgopts and any(x in imgopts for x in unsup):
1464         notrun(f'not suitable for this imgopts: {imgopts}')
1465 
1466 
def supports_quorum() -> bool:
    """Return True if 'quorum' occurs in the output of "qemu-img --help"."""
    help_text = qemu_img('--help').stdout
    return 'quorum' in help_text
1469 
def verify_quorum():
    '''Skip the test suite (via notrun()) if supports_quorum() is false'''
    if supports_quorum():
        return
    notrun('quorum support missing')
1474 
def has_working_luks() -> Tuple[bool, str]:
    """
    Check whether our LUKS driver can actually create images
    (this extends to LUKS encryption for qcow2).

    A small test image is created with qemu-img and removed again.

    :return: (True, '') on success.  Otherwise (False, reason), where
             reason is the text after "<image file>:" on the first output
             line containing it, or qemu-img's whole output if no line
             does.
    """

    img_file = f'{test_dir}/luks-test.luks'
    create_args = ['create', '-f', 'luks',
                   '--object', luks_default_secret_object,
                   '-o', luks_default_key_secret_opt,
                   '-o', 'iter-time=10',
                   img_file, '1G']
    res = qemu_img(*create_args, check=False)

    # Best effort: the image may not exist if creation failed
    try:
        os.remove(img_file)
    except OSError:
        pass

    if not res.returncode:
        return (True, '')

    marker = img_file + ':'
    reason = res.stdout
    for line in res.stdout.splitlines():
        if marker in line:
            reason = line.split(marker, 1)[1].strip()
            break

    return (False, reason)
1505 
def verify_working_luks():
    """
    Skip test suite if LUKS does not work; the reason reported by
    has_working_luks() is passed to notrun().
    """
    status = has_working_luks()
    working, reason = status
    if working:
        return
    notrun(reason)
1513 
def supports_qcow2_zstd_compression() -> bool:
    """Probe for zstd support by creating a qcow2 test image with
    compression_type=zstd (the image is removed again).

    Returns False only if qemu-img exits with status 1 and reports that
    'compression-type' does not accept 'zstd'; any other outcome is
    reported as True.
    """
    img_file = f'{test_dir}/qcow2-zstd-test.qcow2'
    res = qemu_img('create', '-f', 'qcow2', '-o', 'compression_type=zstd',
                   img_file, '0',
                   check=False)

    # Best effort: the image may not exist if creation failed
    try:
        os.remove(img_file)
    except OSError:
        pass

    rejected = (
        res.returncode == 1 and
        "'compression-type' does not accept value 'zstd'" in res.stdout
    )
    return not rejected
1529 
def verify_qcow2_zstd_compression():
    """Skip the test suite (via notrun()) if
    supports_qcow2_zstd_compression() is false."""
    if supports_qcow2_zstd_compression():
        return
    notrun('zstd compression not supported')
1533 
def qemu_pipe(*args: str) -> str:
    """
    Run qemu with an option to print something and exit (e.g. a help option).

    The command line is qemu_prog, followed by qemu_opts and args.

    :return: QEMU's stdout output.
    """
    command = [qemu_prog] + qemu_opts
    command += list(args)
    stdout_text, _status = qemu_tool_pipe_and_status('qemu', command)
    return stdout_text
1543 
def supported_formats(read_only=False):
    '''Return the list of whitelisted block driver formats.

       Set 'read_only' to True to check ro-whitelist
       Otherwise, rw-whitelist is checked

       The list is parsed from "-drive format=help" (first line for rw,
       second line for ro) and cached on the function object per
       read_only value.'''

    try:
        cache = supported_formats.formats
    except AttributeError:
        cache = supported_formats.formats = {}

    if read_only not in cache:
        help_lines = qemu_pipe("-drive", "format=help").splitlines()
        wanted = help_lines[1 if read_only else 0]
        # The format names follow the first ':' of the line
        cache[read_only] = wanted.split(":")[1].split()

    return cache[read_only]
1558 
def skip_if_unsupported(required_formats=(), read_only=False):
    '''Skip Test Decorator
       Runs the test if all the required formats are whitelisted

       required_formats is either a sequence of format names or a callable
       that receives the test case and returns such a sequence.  read_only
       selects the whitelist, see supported_formats().  If a format is
       missing, the case is skipped via test_case.case_skip().

       The wrapper keeps the decorated function's metadata (__name__,
       __doc__, ...) thanks to functools.wraps.'''
    # Local import so that this block does not depend on a file-level import
    import functools

    def skip_test_decorator(func):
        @functools.wraps(func)
        def func_wrapper(test_case: QMPTestCase, *args: List[Any],
                         **kwargs: Dict[str, Any]) -> None:
            if callable(required_formats):
                fmts = required_formats(test_case)
            else:
                fmts = required_formats

            usf_list = list(set(fmts) - set(supported_formats(read_only)))
            if usf_list:
                msg = f'{test_case}: formats {usf_list} are not whitelisted'
                test_case.case_skip(msg)
            else:
                func(test_case, *args, **kwargs)
        return func_wrapper
    return skip_test_decorator
1578 
def skip_for_formats(formats: Sequence[str] = ()) \
    -> Callable[[Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]],
                Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]]:
    '''Skip Test Decorator
       Skips the test for the given formats

       If the current image format (imgfmt) is in formats, the case is
       skipped via test_case.case_skip(); otherwise the test runs.

       The wrapper keeps the decorated function's metadata (__name__,
       __doc__, ...) thanks to functools.wraps.'''
    # Local import so that this block does not depend on a file-level import
    import functools

    def skip_test_decorator(func):
        @functools.wraps(func)
        def func_wrapper(test_case: QMPTestCase, *args: List[Any],
                         **kwargs: Dict[str, Any]) -> None:
            if imgfmt in formats:
                msg = f'{test_case}: Skipped for format {imgfmt}'
                test_case.case_skip(msg)
            else:
                func(test_case, *args, **kwargs)
        return func_wrapper
    return skip_test_decorator
1594 
def skip_if_user_is_root(func):
    '''Skip Test Decorator
       Runs the test only without root permissions

       When running as root (uid 0), the case is recorded with
       case_notrun() and the wrapper returns None without calling the
       test; otherwise the test's return value is passed through.

       The wrapper keeps the decorated function's metadata (__name__,
       __doc__, ...) thanks to functools.wraps.'''
    # Local import so that this block does not depend on a file-level import
    import functools

    @functools.wraps(func)
    def func_wrapper(*args, **kwargs):
        if os.getuid() == 0:
            case_notrun('{}: cannot be run as root'.format(args[0]))
            return None
        else:
            return func(*args, **kwargs)
    return func_wrapper
1605 
1606 # We need to filter out the time taken from the output so that
1607 # qemu-iotest can reliably diff the results against master output,
1608 # and hide skipped tests from the reference output.
1609 
class ReproducibleTestResult(unittest.TextTestResult):
    """TextTestResult that reports a skipped test as "." instead of "s"
    in dots mode."""

    def addSkip(self, test, reason):
        # Same as TextTestResult, but print dot instead of "s".  The
        # TestResult implementation is called directly so that
        # TextTestResult's own output is bypassed.
        unittest.TestResult.addSkip(self, test, reason)
        if self.showAll:
            self.stream.writeln(f"skipped {reason!r}")
            return
        if self.dots:
            self.stream.write(".")
            self.stream.flush()
1619 
class ReproducibleStreamWrapper:
    """
    Text stream proxy that filters run-dependent details out of writes.

    write() drops the elapsed time from "Ran N tests in X.XXXs" lines and
    removes " (skipped=N)" annotations.  Every other attribute is looked
    up on the wrapped stream.
    """
    def __init__(self, stream: TextIO):
        self.stream = stream

    def __getattr__(self, attr):
        # Only reached when normal attribute lookup fails.
        if attr not in ('stream', '__getstate__'):
            return getattr(self.stream, attr)
        # Looking up a missing 'stream' via self.stream would recurse
        # forever; '__getstate__' is presumably excluded so copy/pickle
        # probing is not forwarded to the wrapped stream.
        raise AttributeError(attr)

    def write(self, arg=None):
        # The summary line is always rewritten with a plural "tests".
        without_timing = re.sub(r'Ran (\d+) tests? in [\d.]+s',
                                r'Ran \1 tests', arg)
        without_skips = re.sub(r' \(skipped=\d+\)', r'', without_timing)
        self.stream.write(without_skips)
1633 
class ReproducibleTestRunner(unittest.TextTestRunner):
    """
    TextTestRunner whose output goes through ReproducibleStreamWrapper.

    Descriptions are always enabled and ReproducibleTestResult is the
    default result class; any further keyword arguments are handed to
    unittest.TextTestRunner unchanged.
    """
    def __init__(self, stream: Optional[TextIO] = None,
                 resultclass: Type[unittest.TextTestResult] =
                 ReproducibleTestResult,
                 **kwargs: Any) -> None:
        # Any falsy stream (not just None) falls back to sys.stdout.
        target = stream if stream else sys.stdout
        filtered = ReproducibleStreamWrapper(target)
        super().__init__(stream=filtered,          # type: ignore
                         descriptions=True,
                         resultclass=resultclass,
                         **kwargs)
1647 
def execute_unittest(argv: List[str], debug: bool = False) -> None:
    """
    Run the unittest-style tests via unittest.main().

    :param argv: Argument vector handed to unittest.main().
    :param debug: Selects verbosity 2 when true, verbosity 1 otherwise.
    """
    verbosity = 2 if debug else 1

    # Some tests have warnings, especially ResourceWarnings for unclosed
    # files and sockets.  Ignore them for now to ensure reproducibility of
    # the test output -- unless warning options were given to the
    # interpreter, in which case unittest's default handling is kept.
    warning_filter = None if sys.warnoptions else 'ignore'

    unittest.main(argv=argv,
                  testRunner=ReproducibleTestRunner,
                  verbosity=verbosity,
                  warnings=warning_filter)
1658 
def execute_setup_common(supported_fmts: Sequence[str] = (),
                         supported_platforms: Sequence[str] = (),
                         supported_cache_modes: Sequence[str] = (),
                         supported_aio_modes: Sequence[str] = (),
                         unsupported_fmts: Sequence[str] = (),
                         supported_protocols: Sequence[str] = (),
                         unsupported_protocols: Sequence[str] = (),
                         required_fmts: Sequence[str] = (),
                         unsupported_imgopts: Sequence[str] = ()) -> bool:
    """
    Shared setup for both script-style and unittest-style tests.

    Consumes the first '-d' found in sys.argv, configures the logging
    level accordingly (DEBUG with '-d', WARN without), and then runs the
    environment checks against the given constraints.

    :return: Bool; Whether or not debug mode has been requested via the CLI.
    """
    # Note: Python 3.6 and pylint do not like 'Collection' so use 'Sequence'.

    # list.remove() raises ValueError when '-d' is absent, which doubles
    # as the "was it requested?" test.
    try:
        sys.argv.remove('-d')
    except ValueError:
        debug = False
    else:
        debug = True

    log_level = logging.DEBUG if debug else logging.WARN
    logging.basicConfig(level=log_level)

    # Keep the checks in this order.
    _verify_image_format(supported_fmts, unsupported_fmts)
    _verify_protocol(supported_protocols, unsupported_protocols)
    _verify_platform(supported=supported_platforms)
    _verify_cache_mode(supported_cache_modes)
    _verify_aio_mode(supported_aio_modes)
    _verify_formats(required_fmts)
    _verify_virtio_blk()
    _verify_imgopts(unsupported_imgopts)

    return debug
1690 
def execute_test(*args, test_function=None, **kwargs):
    """
    Perform the common setup, then run the tests.

    All positional and remaining keyword arguments go to
    execute_setup_common().  A truthy test_function is called with no
    arguments (script-style); otherwise the unittest-style tests are run
    with sys.argv and the debug flag obtained from the setup.
    """
    debug = execute_setup_common(*args, **kwargs)
    if test_function:
        test_function()
    else:
        execute_unittest(sys.argv, debug)
1699 
def activate_logging():
    """
    Send iotests.log() output to stdout for script-style tests.

    Attaches a stdout handler that prints only the bare message to
    test_logger, sets that logger to INFO, and stops its records from
    propagating to ancestor loggers.
    """
    stdout_handler = logging.StreamHandler(stream=sys.stdout)
    stdout_handler.setFormatter(logging.Formatter('%(message)s'))

    test_logger.propagate = False
    test_logger.setLevel(logging.INFO)
    test_logger.addHandler(stdout_handler)
1708 
1709 # This is called from script-style iotests without a single point of entry
1710 def script_initialize(*args, **kwargs):
1711     """Initialize script-style tests without running any tests."""
1712     activate_logging()
1713     execute_setup_common(*args, **kwargs)
1714 
1715 # This is called from script-style iotests with a single point of entry
1716 def script_main(test_function, *args, **kwargs):
1717     """Run script-style tests outside of the unittest framework"""
1718     activate_logging()
1719     execute_test(*args, test_function=test_function, **kwargs)
1720 
1721 # This is called from unittest style iotests
1722 def main(*args, **kwargs):
1723     """Run tests using the unittest framework"""
1724     execute_test(*args, **kwargs)
1725