Lines Matching +full:0 +full:- +full:9 +full:a +full:- +full:f
1 # Common utilities and Python wrappers for qemu-iotests
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # You should have received a copy of the GNU General Public License
56 qemu_img_args = [os.environ.get('QEMU_IMG_PROG', 'qemu-img')]
60 qemu_io_args = [os.environ.get('QEMU_IO_PROG', 'qemu-io')]
64 qemu_io_args_no_fmt = [os.environ.get('QEMU_IO_PROG', 'qemu-io')]
69 qemu_nbd_prog = os.environ.get('QEMU_NBD_PROG', 'qemu-nbd')
77 qsd_prog = os.environ.get('QSD_PROG', 'qemu-storage-daemon')
105 valgrind_logfile = "--log-file=" + test_dir
107 # we don't know it a priori (subprocess.Popen is
111 qemu_valgrind = ['valgrind', valgrind_logfile, '--error-exitcode=99']
115 luks_default_key_secret_opt = 'key-secret=keysec0'
122 logger_name: str, level: int = logging.CRITICAL) -> Iterator[None]:
124 Utility function for temporarily changing the log level of a logger.
145 connect_stderr: bool = True) -> 'subprocess.Popen[str]':
147 # pylint: disable=consider-using-with
157 -> Tuple[str, int]:
159 Run a tool and return both its output and its exit code
162 output = subp.communicate()[0]
163 if subp.returncode < 0:
165 sys.stderr.write(f'{tool} received signal \
166 {-subp.returncode}: {cmd}\n')
167 if drop_successful_output and subp.returncode == 0:
171 def qemu_img_create_prepare_args(args: List[str]) -> List[str]:
172 if not args or args[0] != 'create':
177 # -o option may be specified several times
178 p.add_argument('-o', action='append', default=[])
179 p.add_argument('-f')
185 if parsed.f is not None:
186 result += ['-f', parsed.f]
193 if imgopts and parsed.f == imgfmt:
194 opts_list.insert(0, imgopts)
197 if parsed.f == 'luks' and \
198 all('key-secret' not in opts for opts in opts_list):
199 result += ['--object', luks_default_secret_object]
203 result += ['-o', opts]
211 ) -> 'subprocess.CompletedProcess[str]':
213 Run a qemu tool and return its status code and console output.
216 :param check: Enforce a return code of zero.
220 When the return code is negative, or on any non-zero exit code
224 handled, the command-line, return code, and all console output
228 a CompletedProcess. This object has args, returncode, and stdout
229 properties. If streams are not combined, it will also have a
240 if check and subp.returncode or (subp.returncode < 0):
251 ) -> 'subprocess.CompletedProcess[str]':
272 k = k.replace('_', '-')
277 def qemu_img_create(*args: str) -> 'subprocess.CompletedProcess[str]':
280 def qemu_img_json(*args: str) -> Any:
282 Run qemu-img and return its output as deserialized JSON.
285 When qemu-img crashes, or returns a non-zero exit code without
286 producing a valid JSON document to stdout.
288 When qemu-img returns 0, but failed to produce a valid JSON document.
290 :return: A deserialized JSON object; probably a dict[str, Any].
296 if exc.returncode < 0:
301 # multi-command flexibility, ignore the exact error codes and
312 def qemu_img_measure(*args: str) -> Any:
313 return qemu_img_json("measure", "--output", "json", *args)
315 def qemu_img_check(*args: str) -> Any:
316 return qemu_img_json("check", "--output", "json", *args)
318 def qemu_img_info(*args: str) -> Any:
319 return qemu_img_json('info', "--output", "json", *args)
321 def qemu_img_map(*args: str) -> Any:
322 return qemu_img_json('map', "--output", "json", *args)
325 ) -> 'subprocess.CompletedProcess[str]':
333 ) -> None:
336 args.append('--image-opts')
338 args += ['-f', imgfmt]
347 def qemu_io_wrap_args(args: Sequence[str]) -> List[str]:
348 if '-f' in args or '--image-opts' in args:
357 ) -> 'subprocess.CompletedProcess[str]':
368 ) -> 'subprocess.CompletedProcess[str]':
378 # pylint: disable=consider-using-with
383 out = self._p.stdout.read(9)
384 if out != 'qemu-io> ':
385 # Most probably qemu-io just failed to start.
395 pattern = 'qemu-io> '
397 pos = 0
407 pos = 0
409 return ''.join(s[:-n])
424 # Python < 3.8 would complain if this type were not a string literal
428 def __init__(self, *args: str, instance_id: str = 'a', qmp: bool = False):
429 assert '--pidfile' not in args
430 self.pidfile = os.path.join(test_dir, f'qsd-{instance_id}-pid')
431 all_args = [qsd_prog] + list(args) + ['--pidfile', self.pidfile]
434 self._qmpsock = os.path.join(sock_dir, f'qsd-{instance_id}.sock')
435 all_args += ['--chardev',
436 f'socket,id=qmp-sock,path={self._qmpsock}',
437 '--monitor', 'qmp-sock']
442 # pylint: disable=consider-using-with
450 'qemu-storage-daemon terminated with exit code ' +
451 f'{self._p.returncode}: {cmd}')
455 with open(self.pidfile, encoding='utf-8') as f:
456 self._pid = int(f.read().strip())
461 -> QMPMessage:
465 def get_qmp(self) -> QEMUMonitorProtocol:
470 -> QMPReturnValue:
494 self.stop(kill_signal=9)
498 '''Run qemu-nbd in daemon mode and return the parent's exit code'''
499 return subprocess.call(qemu_nbd_args + ['--fork'] + list(args))
501 def qemu_nbd_early_pipe(*args: str) -> Tuple[int, str]:
502 '''Run qemu-nbd in daemon mode and return both the parent's exit code
504 full_args = qemu_nbd_args + ['--fork'] + list(args)
505 output, returncode = qemu_tool_pipe_and_status('qemu-nbd', full_args,
509 def qemu_nbd_list_log(*args: str) -> str:
510 '''Run qemu-nbd to list remote exports'''
511 full_args = [qemu_nbd_prog, '-L'] + list(args)
512 output, _ = qemu_tool_pipe_and_status('qemu-nbd', full_args)
518 '''Context manager running qemu-nbd within the context'''
519 pid_file = file_path("qemu_nbd_popen-nbd-pid-file")
524 cmd.extend(('--persistent', '--pid-file', pid_file))
533 "qemu-nbd terminated with exit code {}: {}"
546 fmt1: str = imgfmt, fmt2: str = imgfmt) -> bool:
551 when qemu-img crashes or returns a status code of anything other
552 than 0 (identical) or 1 (different).
555 qemu_img('compare', '-f', fmt1, '-F', fmt2, img1, img2)
563 '''Create a fully-allocated raw image with sector markers'''
565 i = 0
571 def image_size(img: str) -> int:
573 value = qemu_img_info('-f', imgfmt, img)['virtual-size']
576 raise TypeError("Expected 'int' for 'virtual-size', "
577 f"got '{value}' of type '{type_name}'")
591 qemu_io_re = re.compile(r"[0-9]* ops; [0-9\/:. sec]* "
592 r"\([0-9\/.inf]* [EPTGMKiBbytes]*\/sec "
593 r"and [0-9\/.inf]* ops\/sec\)")
599 chown_re = re.compile(r"chown [0-9]+:[0-9]+")
604 '''Filter the timestamp of a QMP event dict'''
612 '''Filter the offset and length of a QMP block job event dict'''
622 '''Given a string filter, filter a QMP object's values.
623 filter_fn takes a (key, value) pair.'''
640 pref1 = os.path.join(test_dir, "%s-" % (os.getpid()))
641 pref2 = os.path.join(sock_dir, "%s-" % (os.getpid()))
642 return msg.replace(pref1, 'TEST_DIR/PID-').replace(pref2, 'SOCK_DIR/PID-')
651 def filter_virtio_scsi(output: str) -> str:
652 return re.sub(r'(virtio-scsi)-(ccw|pci)', r'\1', output)
662 return re.sub("#block[0-9]+", "NODE_NAME", msg)
672 drop_child_info: bool = True) -> str:
676 if 'disk size' in line or 'actual-size' in line:
691 line = re.sub('iters: [0-9]+', 'iters: XXX', line)
692 line = re.sub('uuid: [-a-f0-9]+',
693 'uuid: XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX',
695 line = re.sub('cid: [0-9]+', 'cid: XXXXXXXXXX', line)
711 def filter_nbd_exports(output: str) -> str:
712 return re.sub(r'((min|opt|max) block): [0-9]+', r'\1: XXX', output)
714 def filter_qtest(output: str) -> str:
723 indent: Optional[int] = None) -> None:
725 Logs either a string message or a JSON serializable message (like QMP).
726 If indent is provided, JSON serializable messages are pretty-printed.
750 signal.setitimer(signal.ITIMER_REAL, 0)
756 return "{0}-{1}".format(os.getpid(), name)
765 with FilePath('a.img', 'b.img') as (img_a, img_b):
768 # a.img and b.img are automatically removed here.
773 with FilePath('a.sock', base_dir=iotests.sock_dir) as sock:
775 For convenience, calling with one argument yields a single file instead of
776 a tuple with one item.
785 return self.paths[0]
810 ''' Another way to get auto-generated filename that cleans itself up.
814 img_a, img_b = file_path('a.img', 'b.img')
829 return paths[0] if len(paths) == 1 else paths
840 '''A QEMU VM'''
843 name = "qemu%s-%d" % (path_suffix, os.getpid())
853 self._num_drives = 0
855 def _post_shutdown(self) -> None:
859 valgrind_filename = f"{test_dir}/{self._popen.pid}.valgrind"
861 with open(valgrind_filename, encoding='utf-8') as f:
862 print(f.read())
866 def _pre_launch(self) -> None:
873 self._args.append('-object')
878 self._args.append('-device')
883 self._args.append('-drive')
888 '''Add a virtio-blk drive to the VM'''
901 if img_format == 'luks' and 'key-secret' not in opts:
908 self._args.append('-drive')
914 self._args.append('-blockdev')
922 self._args.append('-incoming')
927 self._args.append('-S')
930 def hmp(self, command_line: str, use_log: bool = False) -> QMPMessage:
931 cmd = 'human-monitor-command'
932 kwargs: Dict[str, Any] = {'command-line': command_line}
938 def pause_drive(self, drive: str, event: Optional[str] = None) -> None:
944 self.hmp(f'qemu-io {drive} "break {event} bp_{drive}"')
946 def resume_drive(self, drive: str) -> None:
948 self.hmp(f'qemu-io {drive} "remove_break bp_{drive}"')
951 use_log: bool = False, qdev: bool = False) -> QMPMessage:
952 """Write to a given drive using an HMP command"""
953 d = '-d ' if qdev else ''
954 return self.hmp(f'qemu-io {d}{drive} "{cmd}"', use_log=use_log)
966 output[basestr[:-1]] = obj # Strip trailing '.'
998 ) -> Optional[str]:
1000 run_job moves a job from creation through to dismissal.
1002 :param job: String. ID of recently-launched job
1007 :param pre_finalize: Callback. A callable that takes no arguments to be
1008 invoked prior to issuing job-finalize, if any.
1032 result = self.qmp('query-jobs')
1038 self.qmp_log('job-complete', id=job, filters=filters)
1043 self.qmp_log('job-cancel', id=job, filters=filters)
1045 self.qmp_log('job-finalize', id=job, filters=filters)
1047 self.qmp_log('job-dismiss', id=job, filters=filters)
1055 result = self.qmp_log('blockdev-create', filters=filters,
1069 log(self.qmp('migrate-set-capabilities', capabilities=[
1076 def wait_migration(self, expect_runstate: Optional[str]) -> bool:
1079 # We use the default timeout, and with a timeout, event_wait()
1088 # The event may occur in finish-migrate, so wait for the expected
1089 # post-migration runstate
1092 runstate = self.qmp('query-status')['return']['status']
1098 nodes = self.qmp('query-named-block-nodes')
1100 if x['node-name'] == node_name:
1105 res = self.qmp("query-named-block-nodes")
1106 return {device['node-name']: device['dirty-bitmaps']
1107 for device in res['return'] if 'dirty-bitmaps' in device}
1111 get a specific bitmap from the object returned by query_bitmaps.
1136 @path is a string that consists of child names separated by
1137 slashes. It must begin with a slash.
1140 - root="qcow2-node", path="/backing/file"
1141 - root="quorum-node", path="/children.2/file"
1150 @graph may be None or the result of an x-debug-query-block-graph
1154 graph = self.qmp('x-debug-query-block-graph')['return']
1158 # Must start with a /
1197 # Many users of this class set a VM property we rely on heavily
1202 '''Traverse a path in a nested dict'''
1210 self.fail(f'failed path traversal for "{path}" in "{d}"')
1215 self.fail(f'path component "{component}" in "{path}" '
1216 f'is not a list in "{d}"')
1220 self.fail(f'invalid index "{idx}" in path "{path}" '
1221 f'in "{d}"')
1232 '''Assert that the value for a specific path in a QMP dict
1233 matches. When given a list of values, assert that any of
1238 # [] makes no sense as a list of valid values, so treat it as
1251 result = self.vm.qmp('query-block-jobs')
1255 """Issue a query-named-block-nodes and assert node_name and/or
1257 def check_equal_or_none(a, b): argument
1258 return a is None or b is None or a == b
1260 result = self.vm.qmp('query-named-block-nodes')
1262 if check_equal_or_none(x.get("node-name"), node_name) and \
1269 '''Asserts that the given filename is a json: filename and that its
1279 '''Cancel a block job and wait for it to finish, returning the event'''
1280 self.vm.cmd('block-job-cancel', device=drive, force=force)
1303 '''Wait for a block job to finish, returning the event'''
1321 """Wait until a BLOCK_JOB_READY event, and return the event."""
1338 '''Complete a block job and wait for it to finish'''
1342 self.vm.cmd('block-job-complete', device=drive)
1350 result = self.vm.qmp('query-block-jobs')
1361 self.vm.cmd('block-job-pause', device=job_id)
1373 # Each test in qemu-iotests has a number ("seq")
1374 seq = os.path.basename(sys.argv[0])
1376 with open('%s/%s.notrun' % (test_dir, seq), 'w', encoding='utf-8') \
1380 sys.exit(0)
1385 QMPTestCase.case_skip() for a variant that actually skips the
1388 # Each test in qemu-iotests has a number ("seq")
1389 seq = os.path.basename(sys.argv[0])
1391 with open('%s/%s.casenotrun' % (test_dir, seq), 'a', encoding='utf-8') \
1396 unsupported_fmts: Sequence[str] = ()) -> None:
1412 unsupported: Sequence[str] = ()) -> None:
1423 unsupported: Sequence[str] = ()) -> None:
1431 def _verify_cache_mode(supported_cache_modes: Sequence[str] = ()) -> None:
1435 def _verify_aio_mode(supported_aio_modes: Sequence[str] = ()) -> None:
1439 def _verify_formats(required_formats: Sequence[str] = ()) -> None:
1440 usf_list = list(set(required_formats) - set(supported_formats()))
1442 notrun(f'formats {usf_list} are not whitelisted')
1445 def _verify_virtio_blk() -> None:
1446 out = qemu_pipe('-M', 'none', '-device', 'help')
1447 if 'virtio-blk' not in out:
1448 notrun('Missing virtio-blk in QEMU binary')
1450 def verify_virtio_scsi_pci_or_ccw() -> None:
1451 out = qemu_pipe('-M', 'none', '-device', 'help')
1452 if 'virtio-scsi-pci' not in out and 'virtio-scsi-ccw' not in out:
1453 notrun('Missing virtio-scsi-pci or virtio-scsi-ccw in QEMU binary')
1456 def _verify_imgopts(unsupported: Sequence[str] = ()) -> None:
1459 # but it supported only for bash tests. We don't have a concept of global
1464 notrun(f'not suitable for this imgopts: {imgopts}')
1467 def supports_quorum() -> bool:
1468 return 'quorum' in qemu_img('--help').stdout
1475 def has_working_luks() -> Tuple[bool, str]:
1483 img_file = f'{test_dir}/luks-test.luks'
1484 res = qemu_img('create', '-f', 'luks',
1485 '--object', luks_default_secret_object,
1486 '-o', luks_default_key_secret_opt,
1487 '-o', 'iter-time=10',
1514 def supports_qcow2_zstd_compression() -> bool:
1515 img_file = f'{test_dir}/qcow2-zstd-test.qcow2'
1516 res = qemu_img('create', '-f', 'qcow2', '-o', 'compression_type=zstd',
1517 img_file, '0',
1525 "'compression-type' does not accept value 'zstd'" in res.stdout:
1534 def qemu_pipe(*args: str) -> str:
1536 Run qemu with an option to print something and exit (e.g. a help option).
1545 '''Set 'read_only' to True to check ro-whitelist
1546 Otherwise, rw-whitelist is checked'''
1552 format_message = qemu_pipe("-drive", "format=help")
1553 line = 1 if read_only else 0
1564 **kwargs: Dict[str, Any]) -> None:
1570 usf_list = list(set(fmts) - set(supported_formats(read_only)))
1572 msg = f'{test_case}: formats {usf_list} are not whitelisted'
1580 -> Callable[[Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]],
1586 **kwargs: Dict[str, Any]) -> None:
1588 msg = f'{test_case}: Skipped for format {imgfmt}'
1599 if os.getuid() == 0:
1600 case_notrun('{}: cannot be run as root'.format(args[0]))
1607 # qemu-iotest can reliably diff the results against master output,
1615 self.stream.writeln("skipped {0!r}".format(reason))
1641 ) -> None:
1648 def execute_unittest(argv: List[str], debug: bool = False) -> None:
1667 unsupported_imgopts: Sequence[str] = ()) -> bool:
1669 Perform necessary setup for either script-style or unittest-style tests.
1675 debug = '-d' in sys.argv
1677 sys.argv.remove('-d')
1692 """Run either unittest or script-style tests."""
1701 """Activate iotests.log() output to stdout for script-style tests."""
1709 # This is called from script-style iotests without a single point of entry
1711 """Initialize script-style tests without running any tests."""
1715 # This is called from script-style iotests with a single point of entry
1717 """Run script-style tests outside of the unittest framework"""