1# Common utilities and Python wrappers for qemu-iotests 2# 3# Copyright (C) 2012 IBM Corp. 4# 5# This program is free software; you can redistribute it and/or modify 6# it under the terms of the GNU General Public License as published by 7# the Free Software Foundation; either version 2 of the License, or 8# (at your option) any later version. 9# 10# This program is distributed in the hope that it will be useful, 11# but WITHOUT ANY WARRANTY; without even the implied warranty of 12# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 13# GNU General Public License for more details. 14# 15# You should have received a copy of the GNU General Public License 16# along with this program. If not, see <http://www.gnu.org/licenses/>. 17# 18 19import argparse 20import atexit 21import bz2 22from collections import OrderedDict 23import faulthandler 24import json 25import logging 26import os 27import re 28import shutil 29import signal 30import struct 31import subprocess 32import sys 33import time 34from typing import (Any, Callable, Dict, Iterable, Iterator, 35 List, Optional, Sequence, TextIO, Tuple, Type, TypeVar) 36import unittest 37 38from contextlib import contextmanager 39 40from qemu.machine import qtest 41from qemu.qmp.legacy import QMPMessage, QMPReturnValue, QEMUMonitorProtocol 42from qemu.utils import VerboseProcessError 43 44# Use this logger for logging messages directly from the iotests module 45logger = logging.getLogger('qemu.iotests') 46logger.addHandler(logging.NullHandler()) 47 48# Use this logger for messages that ought to be used for diff output. 49test_logger = logging.getLogger('qemu.iotests.diff_io') 50 51 52faulthandler.enable() 53 54# This will not work if arguments contain spaces but is necessary if we 55# want to support the override options that ./check supports. 
def _split_env_args(name: str) -> List[str]:
    """
    Return the whitespace-separated words of environment variable `name`.

    An unset or blank variable yields an empty list, and runs of
    whitespace never produce empty-string arguments (which the tools
    would otherwise receive as bogus positional parameters).
    """
    return os.environ.get(name, '').split()


qemu_img_args = [os.environ.get('QEMU_IMG_PROG', 'qemu-img')]
qemu_img_args += _split_env_args('QEMU_IMG_OPTIONS')

qemu_io_args = [os.environ.get('QEMU_IO_PROG', 'qemu-io')]
qemu_io_args += _split_env_args('QEMU_IO_OPTIONS')

qemu_io_args_no_fmt = [os.environ.get('QEMU_IO_PROG', 'qemu-io')]
qemu_io_args_no_fmt += _split_env_args('QEMU_IO_OPTIONS_NO_FMT')

qemu_nbd_prog = os.environ.get('QEMU_NBD_PROG', 'qemu-nbd')
qemu_nbd_args = [qemu_nbd_prog]
qemu_nbd_args += _split_env_args('QEMU_NBD_OPTIONS')

qemu_prog = os.environ.get('QEMU_PROG', 'qemu')
# An unset or blank QEMU_OPTIONS gives [], not [''].
qemu_opts = _split_env_args('QEMU_OPTIONS')

qsd_prog = os.environ.get('QSD_PROG', 'qemu-storage-daemon')

gdb_qemu_env = os.environ.get('GDB_OPTIONS')
qemu_gdb = []
if gdb_qemu_env:
    qemu_gdb = ['gdbserver'] + gdb_qemu_env.split()

qemu_print = os.environ.get('PRINT_QEMU', False)

imgfmt = os.environ.get('IMGFMT', 'raw')
imgproto = os.environ.get('IMGPROTO', 'file')

try:
    test_dir = os.environ['TEST_DIR']
    sock_dir = os.environ['SOCK_DIR']
    cachemode = os.environ['CACHEMODE']
    aiomode = os.environ['AIOMODE']
    qemu_default_machine = os.environ['QEMU_DEFAULT_MACHINE']
except KeyError:
    # We are using these variables as proxies to indicate that we're
    # not being run via "check". There may be other things set up by
    # "check" that individual test cases rely on.
    sys.stderr.write('Please run this test via the "check" script\n')
    sys.exit(os.EX_USAGE)

qemu_valgrind = []
if os.environ.get('VALGRIND_QEMU') == "y" and \
        os.environ.get('NO_VALGRIND') != "y":
    valgrind_logfile = "--log-file=" + test_dir
    # %p allows to put the valgrind process PID, since
    # we don't know it a priori (subprocess.Popen is
    # not yet invoked)
    valgrind_logfile += "/%p.valgrind"

    qemu_valgrind = ['valgrind', valgrind_logfile, '--error-exitcode=99']

luks_default_secret_object = 'secret,id=keysec0,data=' + \
    os.environ.get('IMGKEYSECRET', '')
luks_default_key_secret_opt = 'key-secret=keysec0'

sample_img_dir = os.environ['SAMPLE_IMG_DIR']


@contextmanager
def change_log_level(
        logger_name: str, level: int = logging.CRITICAL) -> Iterator[None]:
    """
    Utility function for temporarily changing the log level of a logger.

    This can be used to silence errors that are expected or uninteresting.

    :param logger_name: name passed to `logging.getLogger()`.
    :param level: level to apply inside the context.

    The previous level is restored on exit, even if the body raises.
    """
    _logger = logging.getLogger(logger_name)
    current_level = _logger.level
    _logger.setLevel(level)

    try:
        yield
    finally:
        _logger.setLevel(current_level)


def unarchive_sample_image(sample: str, fname: str) -> None:
    """
    Decompress '<sample>.bz2' from sample_img_dir into the file `fname`.

    :param sample: sample image name, without the '.bz2' suffix.
    :param fname: destination path; opened with mode 'wb'.
    """
    sample_fname = os.path.join(sample_img_dir, sample + '.bz2')
    with bz2.open(sample_fname) as f_in, open(fname, 'wb') as f_out:
        shutil.copyfileobj(f_in, f_out)


def qemu_tool_popen(args: Sequence[str],
                    connect_stderr: bool = True) -> 'subprocess.Popen[str]':
    """
    Start `args` as a subprocess with stdout captured as text.

    :param args: full command line to run.
    :param connect_stderr: when True, stderr is merged into the captured
                           stdout; otherwise stderr is not redirected.

    :return: the running Popen object; the caller must reap it.
    """
    stderr = subprocess.STDOUT if connect_stderr else None
    # pylint: disable=consider-using-with
    return subprocess.Popen(args,
                            stdout=subprocess.PIPE,
                            stderr=stderr,
                            universal_newlines=True)


def qemu_tool_pipe_and_status(tool: str, args: Sequence[str],
                              connect_stderr: bool = True,
                              drop_successful_output: bool = False) \
                              -> Tuple[str, int]:
    """
    Run a tool and return both its output and its exit code

    :param tool: human-readable tool name, used only in the diagnostic
                 written to stderr when the process dies from a signal.
    :param args: full command line to run.
    :param connect_stderr: merge stderr into the returned output.
    :param drop_successful_output: return '' as output when the tool
                                   exits with status 0.

    :return: an (output, returncode) tuple.
    """
    with qemu_tool_popen(args, connect_stderr) as subp:
        output = subp.communicate()[0]
        if subp.returncode < 0:
            cmd = ' '.join(args)
            # Keep the message on one logical string: a backslash-newline
            # inside the literal would embed the source indentation in
            # the text written to stderr.
            sys.stderr.write(
                f'{tool} received signal {-subp.returncode}: {cmd}\n')
        if drop_successful_output and subp.returncode == 0:
            output = ''
        return (output, subp.returncode)


def qemu_img_create_prepare_args(args: List[str]) -> List[str]:
    """
    Rewrite the arguments of a 'qemu-img create' invocation.

    Anything that is not a 'create' command is returned as a plain copy.
    For 'create', the result is 'create', then '-f FMT' (if given), then
    the '--object' for the default LUKS secret (if added), then every
    '-o' option, then all remaining arguments in their original order.

    :param args: qemu-img arguments, without the program name.
    :return: a new argument list; `args` itself is not modified.
    """
    if not args or args[0] != 'create':
        return list(args)
    args = args[1:]

    p = argparse.ArgumentParser(allow_abbrev=False)
    # -o option may be specified several times
    p.add_argument('-o', action='append', default=[])
    p.add_argument('-f')
    parsed, remaining = p.parse_known_args(args)

    opts_list = parsed.o

    result = ['create']
    if parsed.f is not None:
        result += ['-f', parsed.f]

    # IMGOPTS most probably contains options specific to the selected
    # format, like extended_l2 or compression_type for qcow2. A test may
    # want to create additional images in other formats that don't
    # support these options. So, use IMGOPTS only for images created in
    # imgfmt format.
    imgopts = os.environ.get('IMGOPTS')
    if imgopts and parsed.f == imgfmt:
        opts_list.insert(0, imgopts)

    # default luks support
    if parsed.f == 'luks' and \
            all('key-secret' not in opts for opts in opts_list):
        result += ['--object', luks_default_secret_object]
        opts_list.append(luks_default_key_secret_opt)

    for opts in opts_list:
        result += ['-o', opts]

    result += remaining

    return result


def qemu_tool(*args: str, check: bool = True, combine_stdio: bool = True
              ) -> 'subprocess.CompletedProcess[str]':
    """
    Run a qemu tool and return its status code and console output.

    :param args: full command line to run.
    :param check: Enforce a return code of zero.
    :param combine_stdio: set to False to keep stdout/stderr separated.

    :raise VerboseProcessError:
        When the return code is negative, or on any non-zero exit code
        when 'check=True' was provided (the default). This exception has
        'stdout', 'stderr', and 'returncode' properties that may be
        inspected to show greater detail. If this exception is not
        handled, the command-line, return code, and all console output
        will be included at the bottom of the stack trace.

    :return:
        a CompletedProcess. This object has args, returncode, and stdout
        properties. If streams are not combined, it will also have a
        stderr property.
    """
    subp = subprocess.run(
        args,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT if combine_stdio else subprocess.PIPE,
        universal_newlines=True,
        check=False
    )

    # A negative return code means the tool was killed by a signal; that
    # is always an error, regardless of 'check'.
    if subp.returncode < 0 or (check and subp.returncode != 0):
        raise VerboseProcessError(
            subp.returncode, args,
            output=subp.stdout,
            stderr=subp.stderr,
        )

    return subp


def qemu_img(*args: str, check: bool = True, combine_stdio: bool = True
             ) -> 'subprocess.CompletedProcess[str]':
    """
    Run QEMU_IMG_PROG and return its status code and console output.

    This function always prepends QEMU_IMG_OPTIONS and may further alter
    the args for 'create' commands.

    See `qemu_tool()` for greater detail.
    """
    full_args = qemu_img_args + qemu_img_create_prepare_args(list(args))
    return qemu_tool(*full_args, check=check, combine_stdio=combine_stdio)


def ordered_qmp(qmsg, conv_keys=True):
    """
    Return a copy of a QMP object with every dict sorted by key.

    :param qmsg: a list, a dict, or a scalar (returned unchanged).
    :param conv_keys: when True, '_' is replaced by '-' in the keys of
                      `qmsg` if it is a dict. Dict values are processed
                      with conv_keys=False; list elements are processed
                      with the default (True).
    """
    # Dictionaries are not ordered prior to 3.6, therefore:
    if isinstance(qmsg, list):
        return [ordered_qmp(atom) for atom in qmsg]
    if isinstance(qmsg, dict):
        od = OrderedDict()
        for k, v in sorted(qmsg.items()):
            if conv_keys:
                k = k.replace('_', '-')
            od[k] = ordered_qmp(v, conv_keys=False)
        return od
    return qmsg


def qemu_img_create(*args: str) -> 'subprocess.CompletedProcess[str]':
    """Run 'qemu-img create' with `args`; see `qemu_img()`."""
    return qemu_img('create', *args)


def qemu_img_json(*args: str) -> Any:
    """
    Run qemu-img and return its output as deserialized JSON.

    :raise CalledProcessError:
        When qemu-img crashes, or returns a non-zero exit code without
        producing a valid JSON document to stdout.
    :raise JSONDecodeError:
        When qemu-img returns 0, but failed to produce a valid JSON document.

    :return: A deserialized JSON object; probably a dict[str, Any].
    """
    try:
        res = qemu_img(*args, combine_stdio=False)
    except subprocess.CalledProcessError as exc:
        # Terminated due to signal. Don't bother.
        if exc.returncode < 0:
            raise

        # Commands like 'check' can return failure (exit codes 2 and 3)
        # to indicate command completion, but with errors found. For
        # multi-command flexibility, ignore the exact error codes and
        # *try* to load JSON.
        try:
            return json.loads(exc.stdout)
        except json.JSONDecodeError:
            # Nope. This thing is toast. Raise the /process/ error.
            pass
        raise

    return json.loads(res.stdout)


def qemu_img_measure(*args: str) -> Any:
    """Run 'qemu-img measure --output json' and return the parsed JSON."""
    return qemu_img_json("measure", "--output", "json", *args)


def qemu_img_check(*args: str) -> Any:
    """Run 'qemu-img check --output json' and return the parsed JSON."""
    return qemu_img_json("check", "--output", "json", *args)


def qemu_img_info(*args: str) -> Any:
    """Run 'qemu-img info --output json' and return the parsed JSON."""
    return qemu_img_json('info', "--output", "json", *args)


def qemu_img_map(*args: str) -> Any:
    """Run 'qemu-img map --output json' and return the parsed JSON."""
    return qemu_img_json('map', "--output", "json", *args)


def qemu_img_log(*args: str, check: bool = True
                 ) -> 'subprocess.CompletedProcess[str]':
    """
    Run qemu-img and log its output, filtered through filter_testfiles.

    :param args: qemu-img arguments; see `qemu_img()`.
    :param check: Enforce a return code of zero.
    :return: the CompletedProcess returned by `qemu_img()`.
    """
    result = qemu_img(*args, check=check)
    log(result.stdout, filters=[filter_testfiles])
    return result


def img_info_log(filename: str, filter_path: Optional[str] = None,
                 use_image_opts: bool = False, extra_args: Sequence[str] = (),
                 check: bool = True, drop_child_info: bool = True,
                 ) -> None:
    """
    Run 'qemu-img info' on `filename` and log the filtered output.

    :param filename: image file name (or image options string).
    :param filter_path: path handed to filter_img_info(); defaults to
                        `filename` when empty or None.
    :param use_image_opts: pass '--image-opts' instead of '-f imgfmt'.
    :param extra_args: additional arguments placed before `filename`.
    :param check: Enforce a return code of zero.
    :param drop_child_info: forwarded to filter_img_info().
    """
    args = ['info']
    if use_image_opts:
        args.append('--image-opts')
    else:
        args += ['-f', imgfmt]
    args += extra_args
    args.append(filename)

    output = qemu_img(*args, check=check).stdout
    if not filter_path:
        filter_path = filename
    log(filter_img_info(output, filter_path, drop_child_info))


def qemu_io_wrap_args(args: Sequence[str]) -> List[str]:
    """
    Prepend the qemu-io program and its default options to `args`.

    When `args` already selects a format ('-f') or uses '--image-opts',
    the option set without a format (qemu_io_args_no_fmt) is used.
    """
    if '-f' in args or '--image-opts' in args:
        return qemu_io_args_no_fmt + list(args)
    return qemu_io_args + list(args)

353def qemu_io_popen(*args): 354 return qemu_tool_popen(qemu_io_wrap_args(args)) 355 356def qemu_io(*args: str, check: bool = True, combine_stdio: bool = True 357 ) -> 'subprocess.CompletedProcess[str]': 358 """ 359 Run QEMU_IO_PROG and return the status code and console output. 360 361 This function always prepends either QEMU_IO_OPTIONS or 362 QEMU_IO_OPTIONS_NO_FMT. 363 """ 364 return qemu_tool(*qemu_io_wrap_args(args), 365 check=check, combine_stdio=combine_stdio) 366 367def qemu_io_log(*args: str, check: bool = True 368 ) -> 'subprocess.CompletedProcess[str]': 369 result = qemu_io(*args, check=check) 370 log(result.stdout, filters=[filter_testfiles, filter_qemu_io]) 371 return result 372 373class QemuIoInteractive: 374 def __init__(self, *args): 375 self.args = qemu_io_wrap_args(args) 376 # We need to keep the Popen objext around, and not 377 # close it immediately. Therefore, disable the pylint check: 378 # pylint: disable=consider-using-with 379 self._p = subprocess.Popen(self.args, stdin=subprocess.PIPE, 380 stdout=subprocess.PIPE, 381 stderr=subprocess.STDOUT, 382 universal_newlines=True) 383 out = self._p.stdout.read(9) 384 if out != 'qemu-io> ': 385 # Most probably qemu-io just failed to start. 386 # Let's collect the whole output and exit. 
387 out += self._p.stdout.read() 388 self._p.wait(timeout=1) 389 raise ValueError(out) 390 391 def close(self): 392 self._p.communicate('q\n') 393 394 def _read_output(self): 395 pattern = 'qemu-io> ' 396 n = len(pattern) 397 pos = 0 398 s = [] 399 while pos != n: 400 c = self._p.stdout.read(1) 401 # check unexpected EOF 402 assert c != '' 403 s.append(c) 404 if c == pattern[pos]: 405 pos += 1 406 else: 407 pos = 0 408 409 return ''.join(s[:-n]) 410 411 def cmd(self, cmd): 412 # quit command is in close(), '\n' is added automatically 413 assert '\n' not in cmd 414 cmd = cmd.strip() 415 assert cmd not in ('q', 'quit') 416 self._p.stdin.write(cmd + '\n') 417 self._p.stdin.flush() 418 return self._read_output() 419 420 421class QemuStorageDaemon: 422 _qmp: Optional[QEMUMonitorProtocol] = None 423 _qmpsock: Optional[str] = None 424 # Python < 3.8 would complain if this type were not a string literal 425 # (importing `annotations` from `__future__` would work; but not on <= 3.6) 426 _p: 'Optional[subprocess.Popen[bytes]]' = None 427 428 def __init__(self, *args: str, instance_id: str = 'a', qmp: bool = False): 429 assert '--pidfile' not in args 430 self.pidfile = os.path.join(test_dir, f'qsd-{instance_id}-pid') 431 all_args = [qsd_prog] + list(args) + ['--pidfile', self.pidfile] 432 433 if qmp: 434 self._qmpsock = os.path.join(sock_dir, f'qsd-{instance_id}.sock') 435 all_args += ['--chardev', 436 f'socket,id=qmp-sock,path={self._qmpsock}', 437 '--monitor', 'qmp-sock'] 438 439 self._qmp = QEMUMonitorProtocol(self._qmpsock, server=True) 440 441 # Cannot use with here, we want the subprocess to stay around 442 # pylint: disable=consider-using-with 443 self._p = subprocess.Popen(all_args) 444 if self._qmp is not None: 445 self._qmp.accept() 446 while not os.path.exists(self.pidfile): 447 if self._p.poll() is not None: 448 cmd = ' '.join(all_args) 449 raise RuntimeError( 450 'qemu-storage-daemon terminated with exit code ' + 451 f'{self._p.returncode}: {cmd}') 452 453 
time.sleep(0.01) 454 455 with open(self.pidfile, encoding='utf-8') as f: 456 self._pid = int(f.read().strip()) 457 458 assert self._pid == self._p.pid 459 460 def qmp(self, cmd: str, args: Optional[Dict[str, object]] = None) \ 461 -> QMPMessage: 462 assert self._qmp is not None 463 return self._qmp.cmd_raw(cmd, args) 464 465 def get_qmp(self) -> QEMUMonitorProtocol: 466 assert self._qmp is not None 467 return self._qmp 468 469 def cmd(self, cmd: str, args: Optional[Dict[str, object]] = None) \ 470 -> QMPReturnValue: 471 assert self._qmp is not None 472 return self._qmp.cmd(cmd, **(args or {})) 473 474 def stop(self, kill_signal=15): 475 self._p.send_signal(kill_signal) 476 self._p.wait() 477 self._p = None 478 479 if self._qmp: 480 self._qmp.close() 481 482 if self._qmpsock is not None: 483 try: 484 os.remove(self._qmpsock) 485 except OSError: 486 pass 487 try: 488 os.remove(self.pidfile) 489 except OSError: 490 pass 491 492 def __del__(self): 493 if self._p is not None: 494 self.stop(kill_signal=9) 495 496 497def qemu_nbd(*args): 498 '''Run qemu-nbd in daemon mode and return the parent's exit code''' 499 return subprocess.call(qemu_nbd_args + ['--fork'] + list(args)) 500 501def qemu_nbd_early_pipe(*args: str) -> Tuple[int, str]: 502 '''Run qemu-nbd in daemon mode and return both the parent's exit code 503 and its output in case of an error''' 504 full_args = qemu_nbd_args + ['--fork'] + list(args) 505 output, returncode = qemu_tool_pipe_and_status('qemu-nbd', full_args, 506 connect_stderr=False) 507 return returncode, output if returncode else '' 508 509def qemu_nbd_list_log(*args: str) -> str: 510 '''Run qemu-nbd to list remote exports''' 511 full_args = [qemu_nbd_prog, '-L'] + list(args) 512 output, _ = qemu_tool_pipe_and_status('qemu-nbd', full_args) 513 log(output, filters=[filter_testfiles, filter_nbd_exports]) 514 return output 515 516@contextmanager 517def qemu_nbd_popen(*args): 518 '''Context manager running qemu-nbd within the context''' 519 pid_file = 
file_path("qemu_nbd_popen-nbd-pid-file") 520 521 assert not os.path.exists(pid_file) 522 523 cmd = list(qemu_nbd_args) 524 cmd.extend(('--persistent', '--pid-file', pid_file)) 525 cmd.extend(args) 526 527 log('Start NBD server') 528 with subprocess.Popen(cmd) as p: 529 try: 530 while not os.path.exists(pid_file): 531 if p.poll() is not None: 532 raise RuntimeError( 533 "qemu-nbd terminated with exit code {}: {}" 534 .format(p.returncode, ' '.join(cmd))) 535 536 time.sleep(0.01) 537 yield 538 finally: 539 if os.path.exists(pid_file): 540 os.remove(pid_file) 541 log('Kill NBD server') 542 p.kill() 543 p.wait() 544 545def compare_images(img1: str, img2: str, 546 fmt1: str = imgfmt, fmt2: str = imgfmt) -> bool: 547 """ 548 Compare two images with QEMU_IMG; return True if they are identical. 549 550 :raise CalledProcessError: 551 when qemu-img crashes or returns a status code of anything other 552 than 0 (identical) or 1 (different). 553 """ 554 try: 555 qemu_img('compare', '-f', fmt1, '-F', fmt2, img1, img2) 556 return True 557 except subprocess.CalledProcessError as exc: 558 if exc.returncode == 1: 559 return False 560 raise 561 562def create_image(name, size): 563 '''Create a fully-allocated raw image with sector markers''' 564 with open(name, 'wb') as file: 565 i = 0 566 while i < size: 567 sector = struct.pack('>l504xl', i // 512, i // 512) 568 file.write(sector) 569 i = i + 512 570 571def image_size(img: str) -> int: 572 """Return image's virtual size""" 573 value = qemu_img_info('-f', imgfmt, img)['virtual-size'] 574 if not isinstance(value, int): 575 type_name = type(value).__name__ 576 raise TypeError("Expected 'int' for 'virtual-size', " 577 f"got '{value}' of type '{type_name}'") 578 return value 579 580def is_str(val): 581 return isinstance(val, str) 582 583test_dir_re = re.compile(r"%s" % test_dir) 584def filter_test_dir(msg): 585 return test_dir_re.sub("TEST_DIR", msg) 586 587win32_re = re.compile(r"\r") 588def filter_win32(msg): 589 return 
win32_re.sub("", msg) 590 591qemu_io_re = re.compile(r"[0-9]* ops; [0-9\/:. sec]* " 592 r"\([0-9\/.inf]* [EPTGMKiBbytes]*\/sec " 593 r"and [0-9\/.inf]* ops\/sec\)") 594def filter_qemu_io(msg): 595 msg = filter_win32(msg) 596 return qemu_io_re.sub("X ops; XX:XX:XX.X " 597 "(XXX YYY/sec and XXX ops/sec)", msg) 598 599chown_re = re.compile(r"chown [0-9]+:[0-9]+") 600def filter_chown(msg): 601 return chown_re.sub("chown UID:GID", msg) 602 603def filter_qmp_event(event): 604 '''Filter a QMP event dict''' 605 event = dict(event) 606 if 'timestamp' in event: 607 event['timestamp']['seconds'] = 'SECS' 608 event['timestamp']['microseconds'] = 'USECS' 609 return event 610 611def filter_qmp(qmsg, filter_fn): 612 '''Given a string filter, filter a QMP object's values. 613 filter_fn takes a (key, value) pair.''' 614 # Iterate through either lists or dicts; 615 if isinstance(qmsg, list): 616 items = enumerate(qmsg) 617 elif isinstance(qmsg, dict): 618 items = qmsg.items() 619 else: 620 return filter_fn(None, qmsg) 621 622 for k, v in items: 623 if isinstance(v, (dict, list)): 624 qmsg[k] = filter_qmp(v, filter_fn) 625 else: 626 qmsg[k] = filter_fn(k, v) 627 return qmsg 628 629def filter_testfiles(msg): 630 pref1 = os.path.join(test_dir, "%s-" % (os.getpid())) 631 pref2 = os.path.join(sock_dir, "%s-" % (os.getpid())) 632 return msg.replace(pref1, 'TEST_DIR/PID-').replace(pref2, 'SOCK_DIR/PID-') 633 634def filter_qmp_testfiles(qmsg): 635 def _filter(_key, value): 636 if is_str(value): 637 return filter_testfiles(value) 638 return value 639 return filter_qmp(qmsg, _filter) 640 641def filter_virtio_scsi(output: str) -> str: 642 return re.sub(r'(virtio-scsi)-(ccw|pci)', r'\1', output) 643 644def filter_qmp_virtio_scsi(qmsg): 645 def _filter(_key, value): 646 if is_str(value): 647 return filter_virtio_scsi(value) 648 return value 649 return filter_qmp(qmsg, _filter) 650 651def filter_generated_node_ids(msg): 652 return re.sub("#block[0-9]+", "NODE_NAME", msg) 653 654def 
filter_qmp_generated_node_ids(qmsg): 655 def _filter(_key, value): 656 if is_str(value): 657 return filter_generated_node_ids(value) 658 return value 659 return filter_qmp(qmsg, _filter) 660 661def filter_img_info(output: str, filename: str, 662 drop_child_info: bool = True) -> str: 663 lines = [] 664 drop_indented = False 665 for line in output.split('\n'): 666 if 'disk size' in line or 'actual-size' in line: 667 continue 668 669 # Drop child node info 670 if drop_indented: 671 if line.startswith(' '): 672 continue 673 drop_indented = False 674 if drop_child_info and "Child node '/" in line: 675 drop_indented = True 676 continue 677 678 line = line.replace(filename, 'TEST_IMG') 679 line = filter_testfiles(line) 680 line = line.replace(imgfmt, 'IMGFMT') 681 line = re.sub('iters: [0-9]+', 'iters: XXX', line) 682 line = re.sub('uuid: [-a-f0-9]+', 683 'uuid: XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX', 684 line) 685 line = re.sub('cid: [0-9]+', 'cid: XXXXXXXXXX', line) 686 line = re.sub('(compression type: )(zlib|zstd)', r'\1COMPRESSION_TYPE', 687 line) 688 lines.append(line) 689 return '\n'.join(lines) 690 691def filter_imgfmt(msg): 692 return msg.replace(imgfmt, 'IMGFMT') 693 694def filter_qmp_imgfmt(qmsg): 695 def _filter(_key, value): 696 if is_str(value): 697 return filter_imgfmt(value) 698 return value 699 return filter_qmp(qmsg, _filter) 700 701def filter_nbd_exports(output: str) -> str: 702 return re.sub(r'((min|opt|max) block): [0-9]+', r'\1: XXX', output) 703 704def filter_qtest(output: str) -> str: 705 output = re.sub(r'^\[I \d+\.\d+\] OPENED\n', '', output) 706 output = re.sub(r'\n?\[I \+\d+\.\d+\] CLOSED\n?$', '', output) 707 return output 708 709Msg = TypeVar('Msg', Dict[str, Any], List[Any], str) 710 711def log(msg: Msg, 712 filters: Iterable[Callable[[Msg], Msg]] = (), 713 indent: Optional[int] = None) -> None: 714 """ 715 Logs either a string message or a JSON serializable message (like QMP). 
716 If indent is provided, JSON serializable messages are pretty-printed. 717 """ 718 for flt in filters: 719 msg = flt(msg) 720 if isinstance(msg, (dict, list)): 721 # Don't sort if it's already sorted 722 do_sort = not isinstance(msg, OrderedDict) 723 test_logger.info(json.dumps(msg, sort_keys=do_sort, indent=indent)) 724 else: 725 test_logger.info(msg) 726 727class Timeout: 728 def __init__(self, seconds, errmsg="Timeout"): 729 self.seconds = seconds 730 self.errmsg = errmsg 731 def __enter__(self): 732 if qemu_gdb or qemu_valgrind: 733 return self 734 signal.signal(signal.SIGALRM, self.timeout) 735 signal.setitimer(signal.ITIMER_REAL, self.seconds) 736 return self 737 def __exit__(self, exc_type, value, traceback): 738 if qemu_gdb or qemu_valgrind: 739 return False 740 signal.setitimer(signal.ITIMER_REAL, 0) 741 return False 742 def timeout(self, signum, frame): 743 raise TimeoutError(self.errmsg) 744 745def file_pattern(name): 746 return "{0}-{1}".format(os.getpid(), name) 747 748class FilePath: 749 """ 750 Context manager generating multiple file names. The generated files are 751 removed when exiting the context. 752 753 Example usage: 754 755 with FilePath('a.img', 'b.img') as (img_a, img_b): 756 # Use img_a and img_b here... 757 758 # a.img and b.img are automatically removed here. 759 760 By default images are created in iotests.test_dir. To create sockets use 761 iotests.sock_dir: 762 763 with FilePath('a.sock', base_dir=iotests.sock_dir) as sock: 764 765 For convenience, calling with one argument yields a single file instead of 766 a tuple with one item. 
767 768 """ 769 def __init__(self, *names, base_dir=test_dir): 770 self.paths = [os.path.join(base_dir, file_pattern(name)) 771 for name in names] 772 773 def __enter__(self): 774 if len(self.paths) == 1: 775 return self.paths[0] 776 else: 777 return self.paths 778 779 def __exit__(self, exc_type, exc_val, exc_tb): 780 for path in self.paths: 781 try: 782 os.remove(path) 783 except OSError: 784 pass 785 return False 786 787 788def try_remove(img): 789 try: 790 os.remove(img) 791 except OSError: 792 pass 793 794def file_path_remover(): 795 for path in reversed(file_path_remover.paths): 796 try_remove(path) 797 798 799def file_path(*names, base_dir=test_dir): 800 ''' Another way to get auto-generated filename that cleans itself up. 801 802 Use is as simple as: 803 804 img_a, img_b = file_path('a.img', 'b.img') 805 sock = file_path('socket') 806 ''' 807 808 if not hasattr(file_path_remover, 'paths'): 809 file_path_remover.paths = [] 810 atexit.register(file_path_remover) 811 812 paths = [] 813 for name in names: 814 filename = file_pattern(name) 815 path = os.path.join(base_dir, filename) 816 file_path_remover.paths.append(path) 817 paths.append(path) 818 819 return paths[0] if len(paths) == 1 else paths 820 821def remote_filename(path): 822 if imgproto == 'file': 823 return path 824 elif imgproto == 'ssh': 825 return "ssh://%s@127.0.0.1:22%s" % (os.environ.get('USER'), path) 826 else: 827 raise ValueError("Protocol %s not supported" % (imgproto)) 828 829class VM(qtest.QEMUQtestMachine): 830 '''A QEMU VM''' 831 832 def __init__(self, path_suffix=''): 833 name = "qemu%s-%d" % (path_suffix, os.getpid()) 834 timer = 15.0 if not (qemu_gdb or qemu_valgrind) else None 835 if qemu_gdb and qemu_valgrind: 836 sys.stderr.write('gdb and valgrind are mutually exclusive\n') 837 sys.exit(1) 838 wrapper = qemu_gdb if qemu_gdb else qemu_valgrind 839 super().__init__(qemu_prog, qemu_opts, wrapper=wrapper, 840 name=name, 841 base_temp_dir=test_dir, 842 qmp_timer=timer) 843 
self._num_drives = 0 844 845 def _post_shutdown(self) -> None: 846 super()._post_shutdown() 847 if not qemu_valgrind or not self._popen: 848 return 849 valgrind_filename = f"{test_dir}/{self._popen.pid}.valgrind" 850 if self.exitcode() == 99: 851 with open(valgrind_filename, encoding='utf-8') as f: 852 print(f.read()) 853 else: 854 os.remove(valgrind_filename) 855 856 def _pre_launch(self) -> None: 857 super()._pre_launch() 858 if qemu_print: 859 # set QEMU binary output to stdout 860 self._close_qemu_log_file() 861 862 def add_object(self, opts): 863 self._args.append('-object') 864 self._args.append(opts) 865 return self 866 867 def add_device(self, opts): 868 self._args.append('-device') 869 self._args.append(opts) 870 return self 871 872 def add_drive_raw(self, opts): 873 self._args.append('-drive') 874 self._args.append(opts) 875 return self 876 877 def add_drive(self, path, opts='', interface='virtio', img_format=imgfmt): 878 '''Add a virtio-blk drive to the VM''' 879 options = ['if=%s' % interface, 880 'id=drive%d' % self._num_drives] 881 882 if path is not None: 883 options.append('file=%s' % path) 884 options.append('format=%s' % img_format) 885 options.append('cache=%s' % cachemode) 886 options.append('aio=%s' % aiomode) 887 888 if opts: 889 options.append(opts) 890 891 if img_format == 'luks' and 'key-secret' not in opts: 892 # default luks support 893 if luks_default_secret_object not in self._args: 894 self.add_object(luks_default_secret_object) 895 896 options.append(luks_default_key_secret_opt) 897 898 self._args.append('-drive') 899 self._args.append(','.join(options)) 900 self._num_drives += 1 901 return self 902 903 def add_blockdev(self, opts): 904 self._args.append('-blockdev') 905 if isinstance(opts, str): 906 self._args.append(opts) 907 else: 908 self._args.append(','.join(opts)) 909 return self 910 911 def add_incoming(self, addr): 912 self._args.append('-incoming') 913 self._args.append(addr) 914 return self 915 916 def add_paused(self): 917 
self._args.append('-S') 918 return self 919 920 def hmp(self, command_line: str, use_log: bool = False) -> QMPMessage: 921 cmd = 'human-monitor-command' 922 kwargs: Dict[str, Any] = {'command-line': command_line} 923 if use_log: 924 return self.qmp_log(cmd, **kwargs) 925 else: 926 return self.qmp(cmd, **kwargs) 927 928 def pause_drive(self, drive: str, event: Optional[str] = None) -> None: 929 """Pause drive r/w operations""" 930 if not event: 931 self.pause_drive(drive, "read_aio") 932 self.pause_drive(drive, "write_aio") 933 return 934 self.hmp(f'qemu-io {drive} "break {event} bp_{drive}"') 935 936 def resume_drive(self, drive: str) -> None: 937 """Resume drive r/w operations""" 938 self.hmp(f'qemu-io {drive} "remove_break bp_{drive}"') 939 940 def hmp_qemu_io(self, drive: str, cmd: str, 941 use_log: bool = False, qdev: bool = False) -> QMPMessage: 942 """Write to a given drive using an HMP command""" 943 d = '-d ' if qdev else '' 944 return self.hmp(f'qemu-io {d}{drive} "{cmd}"', use_log=use_log) 945 946 def flatten_qmp_object(self, obj, output=None, basestr=''): 947 if output is None: 948 output = {} 949 if isinstance(obj, list): 950 for i, item in enumerate(obj): 951 self.flatten_qmp_object(item, output, basestr + str(i) + '.') 952 elif isinstance(obj, dict): 953 for key in obj: 954 self.flatten_qmp_object(obj[key], output, basestr + key + '.') 955 else: 956 output[basestr[:-1]] = obj # Strip trailing '.' 
957 return output 958 959 def qmp_to_opts(self, obj): 960 obj = self.flatten_qmp_object(obj) 961 output_list = [] 962 for key in obj: 963 output_list += [key + '=' + obj[key]] 964 return ','.join(output_list) 965 966 def get_qmp_events_filtered(self, wait=60.0): 967 result = [] 968 for ev in self.get_qmp_events(wait=wait): 969 result.append(filter_qmp_event(ev)) 970 return result 971 972 def qmp_log(self, cmd, filters=(), indent=None, **kwargs): 973 full_cmd = OrderedDict(( 974 ("execute", cmd), 975 ("arguments", ordered_qmp(kwargs)) 976 )) 977 log(full_cmd, filters, indent=indent) 978 result = self.qmp(cmd, **kwargs) 979 log(result, filters, indent=indent) 980 return result 981 982 # Returns None on success, and an error string on failure 983 def run_job(self, job: str, auto_finalize: bool = True, 984 auto_dismiss: bool = False, 985 pre_finalize: Optional[Callable[[], None]] = None, 986 cancel: bool = False, wait: float = 60.0, 987 filters: Iterable[Callable[[Any], Any]] = (), 988 ) -> Optional[str]: 989 """ 990 run_job moves a job from creation through to dismissal. 991 992 :param job: String. ID of recently-launched job 993 :param auto_finalize: Bool. True if the job was launched with 994 auto_finalize. Defaults to True. 995 :param auto_dismiss: Bool. True if the job was launched with 996 auto_dismiss=True. Defaults to False. 997 :param pre_finalize: Callback. A callable that takes no arguments to be 998 invoked prior to issuing job-finalize, if any. 999 :param cancel: Bool. When true, cancels the job after the pre_finalize 1000 callback. 1001 :param wait: Float. Timeout value specifying how long to wait for any 1002 event, in seconds. Defaults to 60.0. 
1003 """ 1004 match_device = {'data': {'device': job}} 1005 match_id = {'data': {'id': job}} 1006 events = [ 1007 ('BLOCK_JOB_COMPLETED', match_device), 1008 ('BLOCK_JOB_CANCELLED', match_device), 1009 ('BLOCK_JOB_ERROR', match_device), 1010 ('BLOCK_JOB_READY', match_device), 1011 ('BLOCK_JOB_PENDING', match_id), 1012 ('JOB_STATUS_CHANGE', match_id) 1013 ] 1014 error = None 1015 while True: 1016 ev = filter_qmp_event(self.events_wait(events, timeout=wait)) 1017 if ev['event'] != 'JOB_STATUS_CHANGE': 1018 log(ev, filters=filters) 1019 continue 1020 status = ev['data']['status'] 1021 if status == 'aborting': 1022 result = self.qmp('query-jobs') 1023 for j in result['return']: 1024 if j['id'] == job: 1025 error = j['error'] 1026 log('Job failed: %s' % (j['error']), filters=filters) 1027 elif status == 'ready': 1028 self.qmp_log('job-complete', id=job, filters=filters) 1029 elif status == 'pending' and not auto_finalize: 1030 if pre_finalize: 1031 pre_finalize() 1032 if cancel: 1033 self.qmp_log('job-cancel', id=job, filters=filters) 1034 else: 1035 self.qmp_log('job-finalize', id=job, filters=filters) 1036 elif status == 'concluded' and not auto_dismiss: 1037 self.qmp_log('job-dismiss', id=job, filters=filters) 1038 elif status == 'null': 1039 return error 1040 1041 # Returns None on success, and an error string on failure 1042 def blockdev_create(self, options, job_id='job0', filters=None): 1043 if filters is None: 1044 filters = [filter_qmp_testfiles] 1045 result = self.qmp_log('blockdev-create', filters=filters, 1046 job_id=job_id, options=options) 1047 1048 if 'return' in result: 1049 assert result['return'] == {} 1050 job_result = self.run_job(job_id, filters=filters) 1051 else: 1052 job_result = result['error'] 1053 1054 log("") 1055 return job_result 1056 1057 def enable_migration_events(self, name): 1058 log('Enabling migration QMP events on %s...' 
% name) 1059 log(self.qmp('migrate-set-capabilities', capabilities=[ 1060 { 1061 'capability': 'events', 1062 'state': True 1063 } 1064 ])) 1065 1066 def wait_migration(self, expect_runstate: Optional[str]) -> bool: 1067 while True: 1068 event = self.event_wait('MIGRATION') 1069 # We use the default timeout, and with a timeout, event_wait() 1070 # never returns None 1071 assert event 1072 1073 log(event, filters=[filter_qmp_event]) 1074 if event['data']['status'] in ('completed', 'failed'): 1075 break 1076 1077 if event['data']['status'] == 'completed': 1078 # The event may occur in finish-migrate, so wait for the expected 1079 # post-migration runstate 1080 runstate = None 1081 while runstate != expect_runstate: 1082 runstate = self.qmp('query-status')['return']['status'] 1083 return True 1084 else: 1085 return False 1086 1087 def node_info(self, node_name): 1088 nodes = self.qmp('query-named-block-nodes') 1089 for x in nodes['return']: 1090 if x['node-name'] == node_name: 1091 return x 1092 return None 1093 1094 def query_bitmaps(self): 1095 res = self.qmp("query-named-block-nodes") 1096 return {device['node-name']: device['dirty-bitmaps'] 1097 for device in res['return'] if 'dirty-bitmaps' in device} 1098 1099 def get_bitmap(self, node_name, bitmap_name, recording=None, bitmaps=None): 1100 """ 1101 get a specific bitmap from the object returned by query_bitmaps. 1102 :param recording: If specified, filter results by the specified value. 
1103 :param bitmaps: If specified, use it instead of call query_bitmaps() 1104 """ 1105 if bitmaps is None: 1106 bitmaps = self.query_bitmaps() 1107 1108 for bitmap in bitmaps[node_name]: 1109 if bitmap.get('name', '') == bitmap_name: 1110 if recording is None or bitmap.get('recording') == recording: 1111 return bitmap 1112 return None 1113 1114 def check_bitmap_status(self, node_name, bitmap_name, fields): 1115 ret = self.get_bitmap(node_name, bitmap_name) 1116 1117 return fields.items() <= ret.items() 1118 1119 def assert_block_path(self, root, path, expected_node, graph=None): 1120 """ 1121 Check whether the node under the given path in the block graph 1122 is @expected_node. 1123 1124 @root is the node name of the node where the @path is rooted. 1125 1126 @path is a string that consists of child names separated by 1127 slashes. It must begin with a slash. 1128 1129 Examples for @root + @path: 1130 - root="qcow2-node", path="/backing/file" 1131 - root="quorum-node", path="/children.2/file" 1132 1133 Hypothetically, @path could be empty, in which case it would 1134 point to @root. However, in practice this case is not useful 1135 and hence not allowed. 1136 1137 @expected_node may be None. (All elements of the path but the 1138 leaf must still exist.) 1139 1140 @graph may be None or the result of an x-debug-query-block-graph 1141 call that has already been performed. 
1142 """ 1143 if graph is None: 1144 graph = self.qmp('x-debug-query-block-graph')['return'] 1145 1146 iter_path = iter(path.split('/')) 1147 1148 # Must start with a / 1149 assert next(iter_path) == '' 1150 1151 node = next((node for node in graph['nodes'] if node['name'] == root), 1152 None) 1153 1154 # An empty @path is not allowed, so the root node must be present 1155 assert node is not None, 'Root node %s not found' % root 1156 1157 for child_name in iter_path: 1158 assert node is not None, 'Cannot follow path %s%s' % (root, path) 1159 1160 try: 1161 node_id = next(edge['child'] for edge in graph['edges'] 1162 if (edge['parent'] == node['id'] and 1163 edge['name'] == child_name)) 1164 1165 node = next(node for node in graph['nodes'] 1166 if node['id'] == node_id) 1167 1168 except StopIteration: 1169 node = None 1170 1171 if node is None: 1172 assert expected_node is None, \ 1173 'No node found under %s (but expected %s)' % \ 1174 (path, expected_node) 1175 else: 1176 assert node['name'] == expected_node, \ 1177 'Found node %s under %s (but expected %s)' % \ 1178 (node['name'], path, expected_node) 1179 1180index_re = re.compile(r'([^\[]+)\[([^\]]+)\]') 1181 1182class QMPTestCase(unittest.TestCase): 1183 '''Abstract base class for QMP test cases''' 1184 1185 def __init__(self, *args, **kwargs): 1186 super().__init__(*args, **kwargs) 1187 # Many users of this class set a VM property we rely on heavily 1188 # in the methods below. 
1189 self.vm = None 1190 1191 def dictpath(self, d, path): 1192 '''Traverse a path in a nested dict''' 1193 for component in path.split('/'): 1194 m = index_re.match(component) 1195 if m: 1196 component, idx = m.groups() 1197 idx = int(idx) 1198 1199 if not isinstance(d, dict) or component not in d: 1200 self.fail(f'failed path traversal for "{path}" in "{d}"') 1201 d = d[component] 1202 1203 if m: 1204 if not isinstance(d, list): 1205 self.fail(f'path component "{component}" in "{path}" ' 1206 f'is not a list in "{d}"') 1207 try: 1208 d = d[idx] 1209 except IndexError: 1210 self.fail(f'invalid index "{idx}" in path "{path}" ' 1211 f'in "{d}"') 1212 return d 1213 1214 def assert_qmp_absent(self, d, path): 1215 try: 1216 result = self.dictpath(d, path) 1217 except AssertionError: 1218 return 1219 self.fail('path "%s" has value "%s"' % (path, str(result))) 1220 1221 def assert_qmp(self, d, path, value): 1222 '''Assert that the value for a specific path in a QMP dict 1223 matches. When given a list of values, assert that any of 1224 them matches.''' 1225 1226 result = self.dictpath(d, path) 1227 1228 # [] makes no sense as a list of valid values, so treat it as 1229 # an actual single value. 
1230 if isinstance(value, list) and value != []: 1231 for v in value: 1232 if result == v: 1233 return 1234 self.fail('no match for "%s" in %s' % (str(result), str(value))) 1235 else: 1236 self.assertEqual(result, value, 1237 '"%s" is "%s", expected "%s"' 1238 % (path, str(result), str(value))) 1239 1240 def assert_no_active_block_jobs(self): 1241 result = self.vm.qmp('query-block-jobs') 1242 self.assert_qmp(result, 'return', []) 1243 1244 def assert_has_block_node(self, node_name=None, file_name=None): 1245 """Issue a query-named-block-nodes and assert node_name and/or 1246 file_name is present in the result""" 1247 def check_equal_or_none(a, b): 1248 return a is None or b is None or a == b 1249 assert node_name or file_name 1250 result = self.vm.qmp('query-named-block-nodes') 1251 for x in result["return"]: 1252 if check_equal_or_none(x.get("node-name"), node_name) and \ 1253 check_equal_or_none(x.get("file"), file_name): 1254 return 1255 self.fail("Cannot find %s %s in result:\n%s" % 1256 (node_name, file_name, result)) 1257 1258 def assert_json_filename_equal(self, json_filename, reference): 1259 '''Asserts that the given filename is a json: filename and that its 1260 content is equal to the given reference object''' 1261 self.assertEqual(json_filename[:5], 'json:') 1262 self.assertEqual( 1263 self.vm.flatten_qmp_object(json.loads(json_filename[5:])), 1264 self.vm.flatten_qmp_object(reference) 1265 ) 1266 1267 def cancel_and_wait(self, drive='drive0', force=False, 1268 resume=False, wait=60.0): 1269 '''Cancel a block job and wait for it to finish, returning the event''' 1270 self.vm.cmd('block-job-cancel', device=drive, force=force) 1271 1272 if resume: 1273 self.vm.resume_drive(drive) 1274 1275 cancelled = False 1276 result = None 1277 while not cancelled: 1278 for event in self.vm.get_qmp_events(wait=wait): 1279 if event['event'] == 'BLOCK_JOB_COMPLETED' or \ 1280 event['event'] == 'BLOCK_JOB_CANCELLED': 1281 self.assert_qmp(event, 'data/device', drive) 1282 
result = event 1283 cancelled = True 1284 elif event['event'] == 'JOB_STATUS_CHANGE': 1285 self.assert_qmp(event, 'data/id', drive) 1286 1287 1288 self.assert_no_active_block_jobs() 1289 return result 1290 1291 def wait_until_completed(self, drive='drive0', check_offset=True, 1292 wait=60.0, error=None): 1293 '''Wait for a block job to finish, returning the event''' 1294 while True: 1295 for event in self.vm.get_qmp_events(wait=wait): 1296 if event['event'] == 'BLOCK_JOB_COMPLETED': 1297 self.assert_qmp(event, 'data/device', drive) 1298 if error is None: 1299 self.assert_qmp_absent(event, 'data/error') 1300 if check_offset: 1301 self.assert_qmp(event, 'data/offset', 1302 event['data']['len']) 1303 else: 1304 self.assert_qmp(event, 'data/error', error) 1305 self.assert_no_active_block_jobs() 1306 return event 1307 if event['event'] == 'JOB_STATUS_CHANGE': 1308 self.assert_qmp(event, 'data/id', drive) 1309 1310 def wait_ready(self, drive='drive0'): 1311 """Wait until a BLOCK_JOB_READY event, and return the event.""" 1312 return self.vm.events_wait([ 1313 ('BLOCK_JOB_READY', 1314 {'data': {'type': 'mirror', 'device': drive}}), 1315 ('BLOCK_JOB_READY', 1316 {'data': {'type': 'commit', 'device': drive}}) 1317 ]) 1318 1319 def wait_ready_and_cancel(self, drive='drive0'): 1320 self.wait_ready(drive=drive) 1321 event = self.cancel_and_wait(drive=drive) 1322 self.assertEqual(event['event'], 'BLOCK_JOB_COMPLETED') 1323 self.assert_qmp(event, 'data/type', 'mirror') 1324 self.assert_qmp(event, 'data/offset', event['data']['len']) 1325 1326 def complete_and_wait(self, drive='drive0', wait_ready=True, 1327 completion_error=None): 1328 '''Complete a block job and wait for it to finish''' 1329 if wait_ready: 1330 self.wait_ready(drive=drive) 1331 1332 self.vm.cmd('block-job-complete', device=drive) 1333 1334 event = self.wait_until_completed(drive=drive, error=completion_error) 1335 self.assertTrue(event['data']['type'] in ['mirror', 'commit']) 1336 1337 def pause_wait(self, 
job_id='job0'): 1338 with Timeout(3, "Timeout waiting for job to pause"): 1339 while True: 1340 result = self.vm.qmp('query-block-jobs') 1341 found = False 1342 for job in result['return']: 1343 if job['device'] == job_id: 1344 found = True 1345 if job['paused'] and not job['busy']: 1346 return job 1347 break 1348 assert found 1349 1350 def pause_job(self, job_id='job0', wait=True): 1351 self.vm.cmd('block-job-pause', device=job_id) 1352 if wait: 1353 self.pause_wait(job_id) 1354 1355 def case_skip(self, reason): 1356 '''Skip this test case''' 1357 case_notrun(reason) 1358 self.skipTest(reason) 1359 1360 1361def notrun(reason): 1362 '''Skip this test suite''' 1363 # Each test in qemu-iotests has a number ("seq") 1364 seq = os.path.basename(sys.argv[0]) 1365 1366 with open('%s/%s.notrun' % (test_dir, seq), 'w', encoding='utf-8') \ 1367 as outfile: 1368 outfile.write(reason + '\n') 1369 logger.warning("%s not run: %s", seq, reason) 1370 sys.exit(0) 1371 1372def case_notrun(reason): 1373 '''Mark this test case as not having been run (without actually 1374 skipping it, that is left to the caller). 
See 1375 QMPTestCase.case_skip() for a variant that actually skips the 1376 current test case.''' 1377 1378 # Each test in qemu-iotests has a number ("seq") 1379 seq = os.path.basename(sys.argv[0]) 1380 1381 with open('%s/%s.casenotrun' % (test_dir, seq), 'a', encoding='utf-8') \ 1382 as outfile: 1383 outfile.write(' [case not run] ' + reason + '\n') 1384 1385def _verify_image_format(supported_fmts: Sequence[str] = (), 1386 unsupported_fmts: Sequence[str] = ()) -> None: 1387 if 'generic' in supported_fmts and \ 1388 os.environ.get('IMGFMT_GENERIC', 'true') == 'true': 1389 # similar to 1390 # _supported_fmt generic 1391 # for bash tests 1392 supported_fmts = () 1393 1394 not_sup = supported_fmts and (imgfmt not in supported_fmts) 1395 if not_sup or (imgfmt in unsupported_fmts): 1396 notrun('not suitable for this image format: %s' % imgfmt) 1397 1398 if imgfmt == 'luks': 1399 verify_working_luks() 1400 1401def _verify_protocol(supported: Sequence[str] = (), 1402 unsupported: Sequence[str] = ()) -> None: 1403 assert not (supported and unsupported) 1404 1405 if 'generic' in supported: 1406 return 1407 1408 not_sup = supported and (imgproto not in supported) 1409 if not_sup or (imgproto in unsupported): 1410 notrun('not suitable for this protocol: %s' % imgproto) 1411 1412def _verify_platform(supported: Sequence[str] = (), 1413 unsupported: Sequence[str] = ()) -> None: 1414 if any((sys.platform.startswith(x) for x in unsupported)): 1415 notrun('not suitable for this OS: %s' % sys.platform) 1416 1417 if supported: 1418 if not any((sys.platform.startswith(x) for x in supported)): 1419 notrun('not suitable for this OS: %s' % sys.platform) 1420 1421def _verify_cache_mode(supported_cache_modes: Sequence[str] = ()) -> None: 1422 if supported_cache_modes and (cachemode not in supported_cache_modes): 1423 notrun('not suitable for this cache mode: %s' % cachemode) 1424 1425def _verify_aio_mode(supported_aio_modes: Sequence[str] = ()) -> None: 1426 if supported_aio_modes and 
(aiomode not in supported_aio_modes): 1427 notrun('not suitable for this aio mode: %s' % aiomode) 1428 1429def _verify_formats(required_formats: Sequence[str] = ()) -> None: 1430 usf_list = list(set(required_formats) - set(supported_formats())) 1431 if usf_list: 1432 notrun(f'formats {usf_list} are not whitelisted') 1433 1434 1435def _verify_virtio_blk() -> None: 1436 out = qemu_pipe('-M', 'none', '-device', 'help') 1437 if 'virtio-blk' not in out: 1438 notrun('Missing virtio-blk in QEMU binary') 1439 1440def verify_virtio_scsi_pci_or_ccw() -> None: 1441 out = qemu_pipe('-M', 'none', '-device', 'help') 1442 if 'virtio-scsi-pci' not in out and 'virtio-scsi-ccw' not in out: 1443 notrun('Missing virtio-scsi-pci or virtio-scsi-ccw in QEMU binary') 1444 1445 1446def _verify_imgopts(unsupported: Sequence[str] = ()) -> None: 1447 imgopts = os.environ.get('IMGOPTS') 1448 # One of usage examples for IMGOPTS is "data_file=$TEST_IMG.ext_data_file" 1449 # but it supported only for bash tests. We don't have a concept of global 1450 # TEST_IMG in iotests.py, not saying about somehow parsing $variables. 1451 # So, for simplicity let's just not support any IMGOPTS with '$' inside. 1452 unsup = list(unsupported) + ['$'] 1453 if imgopts and any(x in imgopts for x in unsup): 1454 notrun(f'not suitable for this imgopts: {imgopts}') 1455 1456 1457def supports_quorum() -> bool: 1458 return 'quorum' in qemu_img('--help').stdout 1459 1460def verify_quorum(): 1461 '''Skip test suite if quorum support is not available''' 1462 if not supports_quorum(): 1463 notrun('quorum support missing') 1464 1465def has_working_luks() -> Tuple[bool, str]: 1466 """ 1467 Check whether our LUKS driver can actually create images 1468 (this extends to LUKS encryption for qcow2). 1469 1470 If not, return the reason why. 
1471 """ 1472 1473 img_file = f'{test_dir}/luks-test.luks' 1474 res = qemu_img('create', '-f', 'luks', 1475 '--object', luks_default_secret_object, 1476 '-o', luks_default_key_secret_opt, 1477 '-o', 'iter-time=10', 1478 img_file, '1G', 1479 check=False) 1480 try: 1481 os.remove(img_file) 1482 except OSError: 1483 pass 1484 1485 if res.returncode: 1486 reason = res.stdout 1487 for line in res.stdout.splitlines(): 1488 if img_file + ':' in line: 1489 reason = line.split(img_file + ':', 1)[1].strip() 1490 break 1491 1492 return (False, reason) 1493 else: 1494 return (True, '') 1495 1496def verify_working_luks(): 1497 """ 1498 Skip test suite if LUKS does not work 1499 """ 1500 (working, reason) = has_working_luks() 1501 if not working: 1502 notrun(reason) 1503 1504def supports_qcow2_zstd_compression() -> bool: 1505 img_file = f'{test_dir}/qcow2-zstd-test.qcow2' 1506 res = qemu_img('create', '-f', 'qcow2', '-o', 'compression_type=zstd', 1507 img_file, '0', 1508 check=False) 1509 try: 1510 os.remove(img_file) 1511 except OSError: 1512 pass 1513 1514 if res.returncode == 1 and \ 1515 "'compression-type' does not accept value 'zstd'" in res.stdout: 1516 return False 1517 else: 1518 return True 1519 1520def verify_qcow2_zstd_compression(): 1521 if not supports_qcow2_zstd_compression(): 1522 notrun('zstd compression not supported') 1523 1524def qemu_pipe(*args: str) -> str: 1525 """ 1526 Run qemu with an option to print something and exit (e.g. a help option). 1527 1528 :return: QEMU's stdout output. 
1529 """ 1530 full_args = [qemu_prog] + qemu_opts + list(args) 1531 output, _ = qemu_tool_pipe_and_status('qemu', full_args) 1532 return output 1533 1534def supported_formats(read_only=False): 1535 '''Set 'read_only' to True to check ro-whitelist 1536 Otherwise, rw-whitelist is checked''' 1537 1538 if not hasattr(supported_formats, "formats"): 1539 supported_formats.formats = {} 1540 1541 if read_only not in supported_formats.formats: 1542 format_message = qemu_pipe("-drive", "format=help") 1543 line = 1 if read_only else 0 1544 supported_formats.formats[read_only] = \ 1545 format_message.splitlines()[line].split(":")[1].split() 1546 1547 return supported_formats.formats[read_only] 1548 1549def skip_if_unsupported(required_formats=(), read_only=False): 1550 '''Skip Test Decorator 1551 Runs the test if all the required formats are whitelisted''' 1552 def skip_test_decorator(func): 1553 def func_wrapper(test_case: QMPTestCase, *args: List[Any], 1554 **kwargs: Dict[str, Any]) -> None: 1555 if callable(required_formats): 1556 fmts = required_formats(test_case) 1557 else: 1558 fmts = required_formats 1559 1560 usf_list = list(set(fmts) - set(supported_formats(read_only))) 1561 if usf_list: 1562 msg = f'{test_case}: formats {usf_list} are not whitelisted' 1563 test_case.case_skip(msg) 1564 else: 1565 func(test_case, *args, **kwargs) 1566 return func_wrapper 1567 return skip_test_decorator 1568 1569def skip_for_formats(formats: Sequence[str] = ()) \ 1570 -> Callable[[Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]], 1571 Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]]: 1572 '''Skip Test Decorator 1573 Skips the test for the given formats''' 1574 def skip_test_decorator(func): 1575 def func_wrapper(test_case: QMPTestCase, *args: List[Any], 1576 **kwargs: Dict[str, Any]) -> None: 1577 if imgfmt in formats: 1578 msg = f'{test_case}: Skipped for format {imgfmt}' 1579 test_case.case_skip(msg) 1580 else: 1581 func(test_case, *args, **kwargs) 1582 return 
func_wrapper 1583 return skip_test_decorator 1584 1585def skip_if_user_is_root(func): 1586 '''Skip Test Decorator 1587 Runs the test only without root permissions''' 1588 def func_wrapper(*args, **kwargs): 1589 if os.getuid() == 0: 1590 case_notrun('{}: cannot be run as root'.format(args[0])) 1591 return None 1592 else: 1593 return func(*args, **kwargs) 1594 return func_wrapper 1595 1596# We need to filter out the time taken from the output so that 1597# qemu-iotest can reliably diff the results against master output, 1598# and hide skipped tests from the reference output. 1599 1600class ReproducibleTestResult(unittest.TextTestResult): 1601 def addSkip(self, test, reason): 1602 # Same as TextTestResult, but print dot instead of "s" 1603 unittest.TestResult.addSkip(self, test, reason) 1604 if self.showAll: 1605 self.stream.writeln("skipped {0!r}".format(reason)) 1606 elif self.dots: 1607 self.stream.write(".") 1608 self.stream.flush() 1609 1610class ReproducibleStreamWrapper: 1611 def __init__(self, stream: TextIO): 1612 self.stream = stream 1613 1614 def __getattr__(self, attr): 1615 if attr in ('stream', '__getstate__'): 1616 raise AttributeError(attr) 1617 return getattr(self.stream, attr) 1618 1619 def write(self, arg=None): 1620 arg = re.sub(r'Ran (\d+) tests? 
in [\d.]+s', r'Ran \1 tests', arg) 1621 arg = re.sub(r' \(skipped=\d+\)', r'', arg) 1622 self.stream.write(arg) 1623 1624class ReproducibleTestRunner(unittest.TextTestRunner): 1625 def __init__( 1626 self, 1627 stream: Optional[TextIO] = None, 1628 resultclass: Type[unittest.TextTestResult] = 1629 ReproducibleTestResult, 1630 **kwargs: Any 1631 ) -> None: 1632 rstream = ReproducibleStreamWrapper(stream or sys.stdout) 1633 super().__init__(stream=rstream, # type: ignore 1634 descriptions=True, 1635 resultclass=resultclass, 1636 **kwargs) 1637 1638def execute_unittest(argv: List[str], debug: bool = False) -> None: 1639 """Executes unittests within the calling module.""" 1640 1641 # Some tests have warnings, especially ResourceWarnings for unclosed 1642 # files and sockets. Ignore them for now to ensure reproducibility of 1643 # the test output. 1644 unittest.main(argv=argv, 1645 testRunner=ReproducibleTestRunner, 1646 verbosity=2 if debug else 1, 1647 warnings=None if sys.warnoptions else 'ignore') 1648 1649def execute_setup_common(supported_fmts: Sequence[str] = (), 1650 supported_platforms: Sequence[str] = (), 1651 supported_cache_modes: Sequence[str] = (), 1652 supported_aio_modes: Sequence[str] = (), 1653 unsupported_fmts: Sequence[str] = (), 1654 supported_protocols: Sequence[str] = (), 1655 unsupported_protocols: Sequence[str] = (), 1656 required_fmts: Sequence[str] = (), 1657 unsupported_imgopts: Sequence[str] = ()) -> bool: 1658 """ 1659 Perform necessary setup for either script-style or unittest-style tests. 1660 1661 :return: Bool; Whether or not debug mode has been requested via the CLI. 1662 """ 1663 # Note: Python 3.6 and pylint do not like 'Collection' so use 'Sequence'. 
1664 1665 debug = '-d' in sys.argv 1666 if debug: 1667 sys.argv.remove('-d') 1668 logging.basicConfig(level=(logging.DEBUG if debug else logging.WARN)) 1669 1670 _verify_image_format(supported_fmts, unsupported_fmts) 1671 _verify_protocol(supported_protocols, unsupported_protocols) 1672 _verify_platform(supported=supported_platforms) 1673 _verify_cache_mode(supported_cache_modes) 1674 _verify_aio_mode(supported_aio_modes) 1675 _verify_formats(required_fmts) 1676 _verify_virtio_blk() 1677 _verify_imgopts(unsupported_imgopts) 1678 1679 return debug 1680 1681def execute_test(*args, test_function=None, **kwargs): 1682 """Run either unittest or script-style tests.""" 1683 1684 debug = execute_setup_common(*args, **kwargs) 1685 if not test_function: 1686 execute_unittest(sys.argv, debug) 1687 else: 1688 test_function() 1689 1690def activate_logging(): 1691 """Activate iotests.log() output to stdout for script-style tests.""" 1692 handler = logging.StreamHandler(stream=sys.stdout) 1693 formatter = logging.Formatter('%(message)s') 1694 handler.setFormatter(formatter) 1695 test_logger.addHandler(handler) 1696 test_logger.setLevel(logging.INFO) 1697 test_logger.propagate = False 1698 1699# This is called from script-style iotests without a single point of entry 1700def script_initialize(*args, **kwargs): 1701 """Initialize script-style tests without running any tests.""" 1702 activate_logging() 1703 execute_setup_common(*args, **kwargs) 1704 1705# This is called from script-style iotests with a single point of entry 1706def script_main(test_function, *args, **kwargs): 1707 """Run script-style tests outside of the unittest framework""" 1708 activate_logging() 1709 execute_test(*args, test_function=test_function, **kwargs) 1710 1711# This is called from unittest style iotests 1712def main(*args, **kwargs): 1713 """Run tests using the unittest framework""" 1714 execute_test(*args, **kwargs) 1715