1# Common utilities and Python wrappers for qemu-iotests 2# 3# Copyright (C) 2012 IBM Corp. 4# 5# This program is free software; you can redistribute it and/or modify 6# it under the terms of the GNU General Public License as published by 7# the Free Software Foundation; either version 2 of the License, or 8# (at your option) any later version. 9# 10# This program is distributed in the hope that it will be useful, 11# but WITHOUT ANY WARRANTY; without even the implied warranty of 12# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 13# GNU General Public License for more details. 14# 15# You should have received a copy of the GNU General Public License 16# along with this program. If not, see <http://www.gnu.org/licenses/>. 17# 18 19import argparse 20import atexit 21import bz2 22from collections import OrderedDict 23import faulthandler 24import json 25import logging 26import os 27import re 28import shutil 29import signal 30import struct 31import subprocess 32import sys 33import time 34from typing import (Any, Callable, Dict, Iterable, Iterator, 35 List, Optional, Sequence, TextIO, Tuple, Type, TypeVar) 36import unittest 37 38from contextlib import contextmanager 39 40from qemu.machine import qtest 41from qemu.qmp.legacy import QMPMessage, QMPReturnValue, QEMUMonitorProtocol 42from qemu.utils import VerboseProcessError 43 44# Use this logger for logging messages directly from the iotests module 45logger = logging.getLogger('qemu.iotests') 46logger.addHandler(logging.NullHandler()) 47 48# Use this logger for messages that ought to be used for diff output. 49test_logger = logging.getLogger('qemu.iotests.diff_io') 50 51 52faulthandler.enable() 53 54# This will not work if arguments contain spaces but is necessary if we 55# want to support the override options that ./check supports. 56qemu_img_args = [os.environ.get('QEMU_IMG_PROG', 'qemu-img')] 57if os.environ.get('QEMU_IMG_OPTIONS'): 58 qemu_img_args += os.environ['QEMU_IMG_OPTIONS'].strip().split(' ') 59 60qemu_io_args = [os.environ.get('QEMU_IO_PROG', 'qemu-io')] 61if os.environ.get('QEMU_IO_OPTIONS'): 62 qemu_io_args += os.environ['QEMU_IO_OPTIONS'].strip().split(' ') 63 64qemu_io_args_no_fmt = [os.environ.get('QEMU_IO_PROG', 'qemu-io')] 65if os.environ.get('QEMU_IO_OPTIONS_NO_FMT'): 66 qemu_io_args_no_fmt += \ 67 os.environ['QEMU_IO_OPTIONS_NO_FMT'].strip().split(' ') 68 69qemu_nbd_prog = os.environ.get('QEMU_NBD_PROG', 'qemu-nbd') 70qemu_nbd_args = [qemu_nbd_prog] 71if os.environ.get('QEMU_NBD_OPTIONS'): 72 qemu_nbd_args += os.environ['QEMU_NBD_OPTIONS'].strip().split(' ') 73 74qemu_prog = os.environ.get('QEMU_PROG', 'qemu') 75qemu_opts = os.environ.get('QEMU_OPTIONS', '').strip().split(' ') 76 77qsd_prog = os.environ.get('QSD_PROG', 'qemu-storage-daemon') 78 79gdb_qemu_env = os.environ.get('GDB_OPTIONS') 80qemu_gdb = [] 81if gdb_qemu_env: 82 qemu_gdb = ['gdbserver'] + gdb_qemu_env.strip().split(' ') 83 84qemu_print = os.environ.get('PRINT_QEMU', False) 85 86imgfmt = os.environ.get('IMGFMT', 'raw') 87imgproto = os.environ.get('IMGPROTO', 'file') 88 89try: 90 test_dir = os.environ['TEST_DIR'] 91 sock_dir = os.environ['SOCK_DIR'] 92 cachemode = os.environ['CACHEMODE'] 93 aiomode = os.environ['AIOMODE'] 94 qemu_default_machine = os.environ['QEMU_DEFAULT_MACHINE'] 95except KeyError: 96 # We are using these variables as proxies to indicate that we're 97 # not being run via "check". There may be other things set up by 98 # "check" that individual test cases rely on. 99 sys.stderr.write('Please run this test via the "check" script\n') 100 sys.exit(os.EX_USAGE) 101 102qemu_valgrind = [] 103if os.environ.get('VALGRIND_QEMU') == "y" and \ 104 os.environ.get('NO_VALGRIND') != "y": 105 valgrind_logfile = "--log-file=" + test_dir 106 # %p allows to put the valgrind process PID, since 107 # we don't know it a priori (subprocess.Popen is 108 # not yet invoked) 109 valgrind_logfile += "/%p.valgrind" 110 111 qemu_valgrind = ['valgrind', valgrind_logfile, '--error-exitcode=99'] 112 113luks_default_secret_object = 'secret,id=keysec0,data=' + \ 114 os.environ.get('IMGKEYSECRET', '') 115luks_default_key_secret_opt = 'key-secret=keysec0' 116 117sample_img_dir = os.environ['SAMPLE_IMG_DIR'] 118 119 120@contextmanager 121def change_log_level( 122 logger_name: str, level: int = logging.CRITICAL) -> Iterator[None]: 123 """ 124 Utility function for temporarily changing the log level of a logger. 125 126 This can be used to silence errors that are expected or uninteresting. 127 """ 128 _logger = logging.getLogger(logger_name) 129 current_level = _logger.level 130 _logger.setLevel(level) 131 132 try: 133 yield 134 finally: 135 _logger.setLevel(current_level) 136 137 138def unarchive_sample_image(sample, fname): 139 sample_fname = os.path.join(sample_img_dir, sample + '.bz2') 140 with bz2.open(sample_fname) as f_in, open(fname, 'wb') as f_out: 141 shutil.copyfileobj(f_in, f_out) 142 143 144def qemu_tool_popen(args: Sequence[str], 145 connect_stderr: bool = True) -> 'subprocess.Popen[str]': 146 stderr = subprocess.STDOUT if connect_stderr else None 147 # pylint: disable=consider-using-with 148 return subprocess.Popen(args, 149 stdout=subprocess.PIPE, 150 stderr=stderr, 151 universal_newlines=True) 152 153 154def qemu_tool_pipe_and_status(tool: str, args: Sequence[str], 155 connect_stderr: bool = True, 156 drop_successful_output: bool = False) \ 157 -> Tuple[str, int]: 158 """ 159 Run a tool and return both its output and its exit code 160 """ 161 with qemu_tool_popen(args, connect_stderr) as subp: 162 output = subp.communicate()[0] 163 if subp.returncode < 0: 164 cmd = ' '.join(args) 165 sys.stderr.write(f'{tool} received signal \ 166 {-subp.returncode}: {cmd}\n') 167 if drop_successful_output and subp.returncode == 0: 168 output = '' 169 return (output, subp.returncode) 170 171def qemu_img_create_prepare_args(args: List[str]) -> List[str]: 172 if not args or args[0] != 'create': 173 return list(args) 174 args = args[1:] 175 176 p = argparse.ArgumentParser(allow_abbrev=False) 177 # -o option may be specified several times 178 p.add_argument('-o', action='append', default=[]) 179 p.add_argument('-f') 180 parsed, remaining = p.parse_known_args(args) 181 182 opts_list = parsed.o 183 184 result = ['create'] 185 if parsed.f is not None: 186 result += ['-f', parsed.f] 187 188 # IMGOPTS most probably contain options specific for the selected format, 189 # like extended_l2 or compression_type for qcow2. Test may want to create 190 # additional images in other formats that doesn't support these options. 191 # So, use IMGOPTS only for images created in imgfmt format. 192 imgopts = os.environ.get('IMGOPTS') 193 if imgopts and parsed.f == imgfmt: 194 opts_list.insert(0, imgopts) 195 196 # default luks support 197 if parsed.f == 'luks' and \ 198 all('key-secret' not in opts for opts in opts_list): 199 result += ['--object', luks_default_secret_object] 200 opts_list.append(luks_default_key_secret_opt) 201 202 for opts in opts_list: 203 result += ['-o', opts] 204 205 result += remaining 206 207 return result 208 209 210def qemu_tool(*args: str, check: bool = True, combine_stdio: bool = True 211 ) -> 'subprocess.CompletedProcess[str]': 212 """ 213 Run a qemu tool and return its status code and console output. 214 215 :param args: full command line to run. 216 :param check: Enforce a return code of zero. 217 :param combine_stdio: set to False to keep stdout/stderr separated. 218 219 :raise VerboseProcessError: 220 When the return code is negative, or on any non-zero exit code 221 when 'check=True' was provided (the default). This exception has 222 'stdout', 'stderr', and 'returncode' properties that may be 223 inspected to show greater detail. If this exception is not 224 handled, the command-line, return code, and all console output 225 will be included at the bottom of the stack trace. 226 227 :return: 228 a CompletedProcess. This object has args, returncode, and stdout 229 properties. If streams are not combined, it will also have a 230 stderr property. 231 """ 232 subp = subprocess.run( 233 args, 234 stdout=subprocess.PIPE, 235 stderr=subprocess.STDOUT if combine_stdio else subprocess.PIPE, 236 universal_newlines=True, 237 check=False 238 ) 239 240 if check and subp.returncode or (subp.returncode < 0): 241 raise VerboseProcessError( 242 subp.returncode, args, 243 output=subp.stdout, 244 stderr=subp.stderr, 245 ) 246 247 return subp 248 249 250def qemu_img(*args: str, check: bool = True, combine_stdio: bool = True 251 ) -> 'subprocess.CompletedProcess[str]': 252 """ 253 Run QEMU_IMG_PROG and return its status code and console output. 254 255 This function always prepends QEMU_IMG_OPTIONS and may further alter 256 the args for 'create' commands. 257 258 See `qemu_tool()` for greater detail. 259 """ 260 full_args = qemu_img_args + qemu_img_create_prepare_args(list(args)) 261 return qemu_tool(*full_args, check=check, combine_stdio=combine_stdio) 262 263 264def ordered_qmp(qmsg, conv_keys=True): 265 # Dictionaries are not ordered prior to 3.6, therefore: 266 if isinstance(qmsg, list): 267 return [ordered_qmp(atom) for atom in qmsg] 268 if isinstance(qmsg, dict): 269 od = OrderedDict() 270 for k, v in sorted(qmsg.items()): 271 if conv_keys: 272 k = k.replace('_', '-') 273 od[k] = ordered_qmp(v, conv_keys=False) 274 return od 275 return qmsg 276 277def qemu_img_create(*args: str) -> 'subprocess.CompletedProcess[str]': 278 return qemu_img('create', *args) 279 280def qemu_img_json(*args: str) -> Any: 281 """ 282 Run qemu-img and return its output as deserialized JSON. 283 284 :raise CalledProcessError: 285 When qemu-img crashes, or returns a non-zero exit code without 286 producing a valid JSON document to stdout. 287 :raise JSONDecoderError: 288 When qemu-img returns 0, but failed to produce a valid JSON document. 289 290 :return: A deserialized JSON object; probably a dict[str, Any]. 291 """ 292 try: 293 res = qemu_img(*args, combine_stdio=False) 294 except subprocess.CalledProcessError as exc: 295 # Terminated due to signal. Don't bother. 296 if exc.returncode < 0: 297 raise 298 299 # Commands like 'check' can return failure (exit codes 2 and 3) 300 # to indicate command completion, but with errors found. For 301 # multi-command flexibility, ignore the exact error codes and 302 # *try* to load JSON. 303 try: 304 return json.loads(exc.stdout) 305 except json.JSONDecodeError: 306 # Nope. This thing is toast. Raise the /process/ error. 307 pass 308 raise 309 310 return json.loads(res.stdout) 311 312def qemu_img_measure(*args: str) -> Any: 313 return qemu_img_json("measure", "--output", "json", *args) 314 315def qemu_img_check(*args: str) -> Any: 316 return qemu_img_json("check", "--output", "json", *args) 317 318def qemu_img_info(*args: str) -> Any: 319 return qemu_img_json('info', "--output", "json", *args) 320 321def qemu_img_map(*args: str) -> Any: 322 return qemu_img_json('map', "--output", "json", *args) 323 324def qemu_img_log(*args: str, check: bool = True 325 ) -> 'subprocess.CompletedProcess[str]': 326 result = qemu_img(*args, check=check) 327 log(result.stdout, filters=[filter_testfiles]) 328 return result 329 330def img_info_log(filename: str, filter_path: Optional[str] = None, 331 use_image_opts: bool = False, extra_args: Sequence[str] = (), 332 check: bool = True, drop_child_info: bool = True, 333 ) -> None: 334 args = ['info'] 335 if use_image_opts: 336 args.append('--image-opts') 337 else: 338 args += ['-f', imgfmt] 339 args += extra_args 340 args.append(filename) 341 342 output = qemu_img(*args, check=check).stdout 343 if not filter_path: 344 filter_path = filename 345 log(filter_img_info(output, filter_path, drop_child_info)) 346 347def qemu_io_wrap_args(args: Sequence[str]) -> List[str]: 348 if '-f' in args or '--image-opts' in args: 349 return qemu_io_args_no_fmt + list(args) 350 else: 351 return qemu_io_args + list(args) 352 353def qemu_io_popen(*args): 354 return qemu_tool_popen(qemu_io_wrap_args(args)) 355 356def qemu_io(*args: str, check: bool = True, combine_stdio: bool = True 357 ) -> 'subprocess.CompletedProcess[str]': 358 """ 359 Run QEMU_IO_PROG and return the status code and console output. 360 361 This function always prepends either QEMU_IO_OPTIONS or 362 QEMU_IO_OPTIONS_NO_FMT. 363 """ 364 return qemu_tool(*qemu_io_wrap_args(args), 365 check=check, combine_stdio=combine_stdio) 366 367def qemu_io_log(*args: str, check: bool = True 368 ) -> 'subprocess.CompletedProcess[str]': 369 result = qemu_io(*args, check=check) 370 log(result.stdout, filters=[filter_testfiles, filter_qemu_io]) 371 return result 372 373class QemuIoInteractive: 374 def __init__(self, *args): 375 self.args = qemu_io_wrap_args(args) 376 # We need to keep the Popen objext around, and not 377 # close it immediately. Therefore, disable the pylint check: 378 # pylint: disable=consider-using-with 379 self._p = subprocess.Popen(self.args, stdin=subprocess.PIPE, 380 stdout=subprocess.PIPE, 381 stderr=subprocess.STDOUT, 382 universal_newlines=True) 383 out = self._p.stdout.read(9) 384 if out != 'qemu-io> ': 385 # Most probably qemu-io just failed to start. 386 # Let's collect the whole output and exit. 387 out += self._p.stdout.read() 388 self._p.wait(timeout=1) 389 raise ValueError(out) 390 391 def close(self): 392 self._p.communicate('q\n') 393 394 def _read_output(self): 395 pattern = 'qemu-io> ' 396 n = len(pattern) 397 pos = 0 398 s = [] 399 while pos != n: 400 c = self._p.stdout.read(1) 401 # check unexpected EOF 402 assert c != '' 403 s.append(c) 404 if c == pattern[pos]: 405 pos += 1 406 else: 407 pos = 0 408 409 return ''.join(s[:-n]) 410 411 def cmd(self, cmd): 412 # quit command is in close(), '\n' is added automatically 413 assert '\n' not in cmd 414 cmd = cmd.strip() 415 assert cmd not in ('q', 'quit') 416 self._p.stdin.write(cmd + '\n') 417 self._p.stdin.flush() 418 return self._read_output() 419 420 421class QemuStorageDaemon: 422 _qmp: Optional[QEMUMonitorProtocol] = None 423 _qmpsock: Optional[str] = None 424 # Python < 3.8 would complain if this type were not a string literal 425 # (importing `annotations` from `__future__` would work; but not on <= 3.6) 426 _p: 'Optional[subprocess.Popen[bytes]]' = None 427 428 def __init__(self, *args: str, instance_id: str = 'a', qmp: bool = False): 429 assert '--pidfile' not in args 430 self.pidfile = os.path.join(test_dir, f'qsd-{instance_id}-pid') 431 all_args = [qsd_prog] + list(args) + ['--pidfile', self.pidfile] 432 433 if qmp: 434 self._qmpsock = os.path.join(sock_dir, f'qsd-{instance_id}.sock') 435 all_args += ['--chardev', 436 f'socket,id=qmp-sock,path={self._qmpsock}', 437 '--monitor', 'qmp-sock'] 438 439 self._qmp = QEMUMonitorProtocol(self._qmpsock, server=True) 440 441 # Cannot use with here, we want the subprocess to stay around 442 # pylint: disable=consider-using-with 443 self._p = subprocess.Popen(all_args) 444 if self._qmp is not None: 445 self._qmp.accept() 446 while not os.path.exists(self.pidfile): 447 if self._p.poll() is not None: 448 cmd = ' '.join(all_args) 449 raise RuntimeError( 450 'qemu-storage-daemon terminated with exit code ' + 451 f'{self._p.returncode}: {cmd}') 452 453 time.sleep(0.01) 454 455 with open(self.pidfile, encoding='utf-8') as f: 456 self._pid = int(f.read().strip()) 457 458 assert self._pid == self._p.pid 459 460 def qmp(self, cmd: str, args: Optional[Dict[str, object]] = None) \ 461 -> QMPMessage: 462 assert self._qmp is not None 463 return self._qmp.cmd_raw(cmd, args) 464 465 def get_qmp(self) -> QEMUMonitorProtocol: 466 assert self._qmp is not None 467 return self._qmp 468 469 def cmd(self, cmd: str, args: Optional[Dict[str, object]] = None) \ 470 -> QMPReturnValue: 471 assert self._qmp is not None 472 return self._qmp.cmd(cmd, **(args or {})) 473 474 def stop(self, kill_signal=15): 475 self._p.send_signal(kill_signal) 476 self._p.wait() 477 self._p = None 478 479 if self._qmp: 480 self._qmp.close() 481 482 if self._qmpsock is not None: 483 try: 484 os.remove(self._qmpsock) 485 except OSError: 486 pass 487 try: 488 os.remove(self.pidfile) 489 except OSError: 490 pass 491 492 def __del__(self): 493 if self._p is not None: 494 self.stop(kill_signal=9) 495 496 497def qemu_nbd(*args): 498 '''Run qemu-nbd in daemon mode and return the parent's exit code''' 499 return subprocess.call(qemu_nbd_args + ['--fork'] + list(args)) 500 501def qemu_nbd_early_pipe(*args: str) -> Tuple[int, str]: 502 '''Run qemu-nbd in daemon mode and return both the parent's exit code 503 and its output in case of an error''' 504 full_args = qemu_nbd_args + ['--fork'] + list(args) 505 output, returncode = qemu_tool_pipe_and_status('qemu-nbd', full_args, 506 connect_stderr=False) 507 return returncode, output if returncode else '' 508 509def qemu_nbd_list_log(*args: str) -> str: 510 '''Run qemu-nbd to list remote exports''' 511 full_args = [qemu_nbd_prog, '-L'] + list(args) 512 output, _ = qemu_tool_pipe_and_status('qemu-nbd', full_args) 513 log(output, filters=[filter_testfiles, filter_nbd_exports]) 514 return output 515 516@contextmanager 517def qemu_nbd_popen(*args): 518 '''Context manager running qemu-nbd within the context''' 519 pid_file = file_path("qemu_nbd_popen-nbd-pid-file") 520 521 assert not os.path.exists(pid_file) 522 523 cmd = list(qemu_nbd_args) 524 cmd.extend(('--persistent', '--pid-file', pid_file)) 525 cmd.extend(args) 526 527 log('Start NBD server') 528 with subprocess.Popen(cmd) as p: 529 try: 530 while not os.path.exists(pid_file): 531 if p.poll() is not None: 532 raise RuntimeError( 533 "qemu-nbd terminated with exit code {}: {}" 534 .format(p.returncode, ' '.join(cmd))) 535 536 time.sleep(0.01) 537 yield 538 finally: 539 if os.path.exists(pid_file): 540 os.remove(pid_file) 541 log('Kill NBD server') 542 p.kill() 543 p.wait() 544 545def compare_images(img1: str, img2: str, 546 fmt1: str = imgfmt, fmt2: str = imgfmt) -> bool: 547 """ 548 Compare two images with QEMU_IMG; return True if they are identical. 549 550 :raise CalledProcessError: 551 when qemu-img crashes or returns a status code of anything other 552 than 0 (identical) or 1 (different). 553 """ 554 try: 555 qemu_img('compare', '-f', fmt1, '-F', fmt2, img1, img2) 556 return True 557 except subprocess.CalledProcessError as exc: 558 if exc.returncode == 1: 559 return False 560 raise 561 562def create_image(name, size): 563 '''Create a fully-allocated raw image with sector markers''' 564 with open(name, 'wb') as file: 565 i = 0 566 while i < size: 567 sector = struct.pack('>l504xl', i // 512, i // 512) 568 file.write(sector) 569 i = i + 512 570 571def image_size(img: str) -> int: 572 """Return image's virtual size""" 573 value = qemu_img_info('-f', imgfmt, img)['virtual-size'] 574 if not isinstance(value, int): 575 type_name = type(value).__name__ 576 raise TypeError("Expected 'int' for 'virtual-size', " 577 f"got '{value}' of type '{type_name}'") 578 return value 579 580def is_str(val): 581 return isinstance(val, str) 582 583test_dir_re = re.compile(r"%s" % test_dir) 584def filter_test_dir(msg): 585 return test_dir_re.sub("TEST_DIR", msg) 586 587win32_re = re.compile(r"\r") 588def filter_win32(msg): 589 return win32_re.sub("", msg) 590 591qemu_io_re = re.compile(r"[0-9]* ops; [0-9\/:. sec]* " 592 r"\([0-9\/.inf]* [EPTGMKiBbytes]*\/sec " 593 r"and [0-9\/.inf]* ops\/sec\)") 594def filter_qemu_io(msg): 595 msg = filter_win32(msg) 596 return qemu_io_re.sub("X ops; XX:XX:XX.X " 597 "(XXX YYY/sec and XXX ops/sec)", msg) 598 599chown_re = re.compile(r"chown [0-9]+:[0-9]+") 600def filter_chown(msg): 601 return chown_re.sub("chown UID:GID", msg) 602 603def filter_qmp_event(event): 604 '''Filter the timestamp of a QMP event dict''' 605 event = dict(event) 606 if 'timestamp' in event: 607 event['timestamp']['seconds'] = 'SECS' 608 event['timestamp']['microseconds'] = 'USECS' 609 return event 610 611def filter_block_job(event): 612 '''Filter the offset and length of a QMP block job event dict''' 613 event = dict(event) 614 if 'data' in event: 615 if 'offset' in event['data']: 616 event['data']['offset'] = 'OFFSET' 617 if 'len' in event['data']: 618 event['data']['len'] = 'LEN' 619 return event 620 621def filter_qmp(qmsg, filter_fn): 622 '''Given a string filter, filter a QMP object's values. 623 filter_fn takes a (key, value) pair.''' 624 # Iterate through either lists or dicts; 625 if isinstance(qmsg, list): 626 items = enumerate(qmsg) 627 elif isinstance(qmsg, dict): 628 items = qmsg.items() 629 else: 630 return filter_fn(None, qmsg) 631 632 for k, v in items: 633 if isinstance(v, (dict, list)): 634 qmsg[k] = filter_qmp(v, filter_fn) 635 else: 636 qmsg[k] = filter_fn(k, v) 637 return qmsg 638 639def filter_testfiles(msg): 640 pref1 = os.path.join(test_dir, "%s-" % (os.getpid())) 641 pref2 = os.path.join(sock_dir, "%s-" % (os.getpid())) 642 return msg.replace(pref1, 'TEST_DIR/PID-').replace(pref2, 'SOCK_DIR/PID-') 643 644def filter_qmp_testfiles(qmsg): 645 def _filter(_key, value): 646 if is_str(value): 647 return filter_testfiles(value) 648 return value 649 return filter_qmp(qmsg, _filter) 650 651def filter_virtio_scsi(output: str) -> str: 652 return re.sub(r'(virtio-scsi)-(ccw|pci)', r'\1', output) 653 654def filter_qmp_virtio_scsi(qmsg): 655 def _filter(_key, value): 656 if is_str(value): 657 return filter_virtio_scsi(value) 658 return value 659 return filter_qmp(qmsg, _filter) 660 661def filter_generated_node_ids(msg): 662 return re.sub("#block[0-9]+", "NODE_NAME", msg) 663 664def filter_qmp_generated_node_ids(qmsg): 665 def _filter(_key, value): 666 if is_str(value): 667 return filter_generated_node_ids(value) 668 return value 669 return filter_qmp(qmsg, _filter) 670 671def filter_img_info(output: str, filename: str, 672 drop_child_info: bool = True) -> str: 673 lines = [] 674 drop_indented = False 675 for line in output.split('\n'): 676 if 'disk size' in line or 'actual-size' in line: 677 continue 678 679 # Drop child node info 680 if drop_indented: 681 if line.startswith(' '): 682 continue 683 drop_indented = False 684 if drop_child_info and "Child node '/" in line: 685 drop_indented = True 686 continue 687 688 line = line.replace(filename, 'TEST_IMG') 689 line = filter_testfiles(line) 690 line = line.replace(imgfmt, 'IMGFMT') 691 line = re.sub('iters: [0-9]+', 'iters: XXX', line) 692 line = re.sub('uuid: [-a-f0-9]+', 693 'uuid: XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX', 694 line) 695 line = re.sub('cid: [0-9]+', 'cid: XXXXXXXXXX', line) 696 line = re.sub('(compression type: )(zlib|zstd)', r'\1COMPRESSION_TYPE', 697 line) 698 lines.append(line) 699 return '\n'.join(lines) 700 701def filter_imgfmt(msg): 702 return msg.replace(imgfmt, 'IMGFMT') 703 704def filter_qmp_imgfmt(qmsg): 705 def _filter(_key, value): 706 if is_str(value): 707 return filter_imgfmt(value) 708 return value 709 return filter_qmp(qmsg, _filter) 710 711def filter_nbd_exports(output: str) -> str: 712 return re.sub(r'((min|opt|max) block): [0-9]+', r'\1: XXX', output) 713 714def filter_qtest(output: str) -> str: 715 output = re.sub(r'^\[I \d+\.\d+\] OPENED\n', '', output) 716 output = re.sub(r'\n?\[I \+\d+\.\d+\] CLOSED\n?$', '', output) 717 return output 718 719Msg = TypeVar('Msg', Dict[str, Any], List[Any], str) 720 721def log(msg: Msg, 722 filters: Iterable[Callable[[Msg], Msg]] = (), 723 indent: Optional[int] = None) -> None: 724 """ 725 Logs either a string message or a JSON serializable message (like QMP). 726 If indent is provided, JSON serializable messages are pretty-printed. 727 """ 728 for flt in filters: 729 msg = flt(msg) 730 if isinstance(msg, (dict, list)): 731 # Don't sort if it's already sorted 732 do_sort = not isinstance(msg, OrderedDict) 733 test_logger.info(json.dumps(msg, sort_keys=do_sort, indent=indent)) 734 else: 735 test_logger.info(msg) 736 737class Timeout: 738 def __init__(self, seconds, errmsg="Timeout"): 739 self.seconds = seconds 740 self.errmsg = errmsg 741 def __enter__(self): 742 if qemu_gdb or qemu_valgrind: 743 return self 744 signal.signal(signal.SIGALRM, self.timeout) 745 signal.setitimer(signal.ITIMER_REAL, self.seconds) 746 return self 747 def __exit__(self, exc_type, value, traceback): 748 if qemu_gdb or qemu_valgrind: 749 return False 750 signal.setitimer(signal.ITIMER_REAL, 0) 751 return False 752 def timeout(self, signum, frame): 753 raise TimeoutError(self.errmsg) 754 755def file_pattern(name): 756 return "{0}-{1}".format(os.getpid(), name) 757 758class FilePath: 759 """ 760 Context manager generating multiple file names. The generated files are 761 removed when exiting the context. 762 763 Example usage: 764 765 with FilePath('a.img', 'b.img') as (img_a, img_b): 766 # Use img_a and img_b here... 767 768 # a.img and b.img are automatically removed here. 769 770 By default images are created in iotests.test_dir. To create sockets use 771 iotests.sock_dir: 772 773 with FilePath('a.sock', base_dir=iotests.sock_dir) as sock: 774 775 For convenience, calling with one argument yields a single file instead of 776 a tuple with one item. 777 778 """ 779 def __init__(self, *names, base_dir=test_dir): 780 self.paths = [os.path.join(base_dir, file_pattern(name)) 781 for name in names] 782 783 def __enter__(self): 784 if len(self.paths) == 1: 785 return self.paths[0] 786 else: 787 return self.paths 788 789 def __exit__(self, exc_type, exc_val, exc_tb): 790 for path in self.paths: 791 try: 792 os.remove(path) 793 except OSError: 794 pass 795 return False 796 797 798def try_remove(img): 799 try: 800 os.remove(img) 801 except OSError: 802 pass 803 804def file_path_remover(): 805 for path in reversed(file_path_remover.paths): 806 try_remove(path) 807 808 809def file_path(*names, base_dir=test_dir): 810 ''' Another way to get auto-generated filename that cleans itself up. 811 812 Use is as simple as: 813 814 img_a, img_b = file_path('a.img', 'b.img') 815 sock = file_path('socket') 816 ''' 817 818 if not hasattr(file_path_remover, 'paths'): 819 file_path_remover.paths = [] 820 atexit.register(file_path_remover) 821 822 paths = [] 823 for name in names: 824 filename = file_pattern(name) 825 path = os.path.join(base_dir, filename) 826 file_path_remover.paths.append(path) 827 paths.append(path) 828 829 return paths[0] if len(paths) == 1 else paths 830 831def remote_filename(path): 832 if imgproto == 'file': 833 return path 834 elif imgproto == 'ssh': 835 return "ssh://%s@127.0.0.1:22%s" % (os.environ.get('USER'), path) 836 else: 837 raise ValueError("Protocol %s not supported" % (imgproto)) 838 839class VM(qtest.QEMUQtestMachine): 840 '''A QEMU VM''' 841 842 def __init__(self, path_suffix=''): 843 name = "qemu%s-%d" % (path_suffix, os.getpid()) 844 timer = 15.0 if not (qemu_gdb or qemu_valgrind) else None 845 if qemu_gdb and qemu_valgrind: 846 sys.stderr.write('gdb and valgrind are mutually exclusive\n') 847 sys.exit(1) 848 wrapper = qemu_gdb if qemu_gdb else qemu_valgrind 849 super().__init__(qemu_prog, qemu_opts, wrapper=wrapper, 850 name=name, 851 base_temp_dir=test_dir, 852 qmp_timer=timer) 853 self._num_drives = 0 854 855 def _post_shutdown(self) -> None: 856 super()._post_shutdown() 857 if not qemu_valgrind or not self._popen: 858 return 859 valgrind_filename = f"{test_dir}/{self._popen.pid}.valgrind" 860 if self.exitcode() == 99: 861 with open(valgrind_filename, encoding='utf-8') as f: 862 print(f.read()) 863 else: 864 os.remove(valgrind_filename) 865 866 def _pre_launch(self) -> None: 867 super()._pre_launch() 868 if qemu_print: 869 # set QEMU binary output to stdout 870 self._close_qemu_log_file() 871 872 def add_object(self, opts): 873 self._args.append('-object') 874 self._args.append(opts) 875 return self 876 877 def add_device(self, opts): 878 self._args.append('-device') 879 self._args.append(opts) 880 return self 881 882 def add_drive_raw(self, opts): 883 self._args.append('-drive') 884 self._args.append(opts) 885 return self 886 887 def add_drive(self, path, opts='', interface='virtio', img_format=imgfmt): 888 '''Add a virtio-blk drive to the VM''' 889 options = ['if=%s' % interface, 890 'id=drive%d' % self._num_drives] 891 892 if path is not None: 893 options.append('file=%s' % path) 894 options.append('format=%s' % img_format) 895 options.append('cache=%s' % cachemode) 896 options.append('aio=%s' % aiomode) 897 898 if opts: 899 options.append(opts) 900 901 if img_format == 'luks' and 'key-secret' not in opts: 902 # default luks support 903 if luks_default_secret_object not in self._args: 904 self.add_object(luks_default_secret_object) 905 906 options.append(luks_default_key_secret_opt) 907 908 self._args.append('-drive') 909 self._args.append(','.join(options)) 910 self._num_drives += 1 911 return self 912 913 def add_blockdev(self, opts): 914 self._args.append('-blockdev') 915 if isinstance(opts, str): 916 self._args.append(opts) 917 else: 918 self._args.append(','.join(opts)) 919 return self 920 921 def add_incoming(self, addr): 922 self._args.append('-incoming') 923 self._args.append(addr) 924 return self 925 926 def add_paused(self): 927 self._args.append('-S') 928 return self 929 930 def hmp(self, command_line: str, use_log: bool = False) -> QMPMessage: 931 cmd = 'human-monitor-command' 932 kwargs: Dict[str, Any] = {'command-line': command_line} 933 if use_log: 934 return self.qmp_log(cmd, **kwargs) 935 else: 936 return self.qmp(cmd, **kwargs) 937 938 def pause_drive(self, drive: str, event: Optional[str] = None) -> None: 939 """Pause drive r/w operations""" 940 if not event: 941 self.pause_drive(drive, "read_aio") 942 self.pause_drive(drive, "write_aio") 943 return 944 self.hmp(f'qemu-io {drive} "break {event} bp_{drive}"') 945 946 def resume_drive(self, drive: str) -> None: 947 """Resume drive r/w operations""" 948 self.hmp(f'qemu-io {drive} "remove_break bp_{drive}"') 949 950 def hmp_qemu_io(self, drive: str, cmd: str, 951 use_log: bool = False, qdev: bool = False) -> QMPMessage: 952 """Write to a given drive using an HMP command""" 953 d = '-d ' if qdev else '' 954 return self.hmp(f'qemu-io {d}{drive} "{cmd}"', use_log=use_log) 955 956 def flatten_qmp_object(self, obj, output=None, basestr=''): 957 if output is None: 958 output = {} 959 if isinstance(obj, list): 960 for i, item in enumerate(obj): 961 self.flatten_qmp_object(item, output, basestr + str(i) + '.') 962 elif isinstance(obj, dict): 963 for key in obj: 964 self.flatten_qmp_object(obj[key], output, basestr + key + '.') 965 else: 966 output[basestr[:-1]] = obj # Strip trailing '.' 967 return output 968 969 def qmp_to_opts(self, obj): 970 obj = self.flatten_qmp_object(obj) 971 output_list = [] 972 for key in obj: 973 output_list += [key + '=' + obj[key]] 974 return ','.join(output_list) 975 976 def get_qmp_events_filtered(self, wait=60.0): 977 result = [] 978 for ev in self.get_qmp_events(wait=wait): 979 result.append(filter_qmp_event(ev)) 980 return result 981 982 def qmp_log(self, cmd, filters=(), indent=None, **kwargs): 983 full_cmd = OrderedDict(( 984 ("execute", cmd), 985 ("arguments", ordered_qmp(kwargs)) 986 )) 987 log(full_cmd, filters, indent=indent) 988 result = self.qmp(cmd, **kwargs) 989 log(result, filters, indent=indent) 990 return result 991 992 # Returns None on success, and an error string on failure 993 def run_job(self, job: str, auto_finalize: bool = True, 994 auto_dismiss: bool = False, 995 pre_finalize: Optional[Callable[[], None]] = None, 996 cancel: bool = False, wait: float = 60.0, 997 filters: Iterable[Callable[[Any], Any]] = (), 998 ) -> Optional[str]: 999 """ 1000 run_job moves a job from creation through to dismissal. 1001 1002 :param job: String. ID of recently-launched job 1003 :param auto_finalize: Bool. True if the job was launched with 1004 auto_finalize. Defaults to True. 1005 :param auto_dismiss: Bool. True if the job was launched with 1006 auto_dismiss=True. Defaults to False. 1007 :param pre_finalize: Callback. A callable that takes no arguments to be 1008 invoked prior to issuing job-finalize, if any. 1009 :param cancel: Bool. When true, cancels the job after the pre_finalize 1010 callback. 1011 :param wait: Float. Timeout value specifying how long to wait for any 1012 event, in seconds. Defaults to 60.0. 1013 """ 1014 match_device = {'data': {'device': job}} 1015 match_id = {'data': {'id': job}} 1016 events = [ 1017 ('BLOCK_JOB_COMPLETED', match_device), 1018 ('BLOCK_JOB_CANCELLED', match_device), 1019 ('BLOCK_JOB_ERROR', match_device), 1020 ('BLOCK_JOB_READY', match_device), 1021 ('BLOCK_JOB_PENDING', match_id), 1022 ('JOB_STATUS_CHANGE', match_id) 1023 ] 1024 error = None 1025 while True: 1026 ev = filter_qmp_event(self.events_wait(events, timeout=wait)) 1027 if ev['event'] != 'JOB_STATUS_CHANGE': 1028 log(ev, filters=filters) 1029 continue 1030 status = ev['data']['status'] 1031 if status == 'aborting': 1032 result = self.qmp('query-jobs') 1033 for j in result['return']: 1034 if j['id'] == job: 1035 error = j['error'] 1036 log('Job failed: %s' % (j['error']), filters=filters) 1037 elif status == 'ready': 1038 self.qmp_log('job-complete', id=job, filters=filters) 1039 elif status == 'pending' and not auto_finalize: 1040 if pre_finalize: 1041 pre_finalize() 1042 if cancel: 1043 self.qmp_log('job-cancel', id=job, filters=filters) 1044 else: 1045 self.qmp_log('job-finalize', id=job, filters=filters) 1046 elif status == 'concluded' and not auto_dismiss: 1047 self.qmp_log('job-dismiss', id=job, filters=filters) 1048 elif status == 'null': 1049 return error 1050 1051 # Returns None on success, and an error string on failure 1052 def blockdev_create(self, options, job_id='job0', filters=None): 1053 if filters is None: 1054 filters = [filter_qmp_testfiles] 1055 result = self.qmp_log('blockdev-create', filters=filters, 1056 job_id=job_id, options=options) 1057 1058 if 'return' in result: 1059 assert result['return'] == {} 1060 job_result = self.run_job(job_id, filters=filters) 1061 else: 1062 job_result = result['error'] 1063 1064 log("") 1065 return job_result 1066 1067 def enable_migration_events(self, name): 1068 log('Enabling migration QMP events on %s...' % name) 1069 log(self.qmp('migrate-set-capabilities', capabilities=[ 1070 { 1071 'capability': 'events', 1072 'state': True 1073 } 1074 ])) 1075 1076 def wait_migration(self, expect_runstate: Optional[str]) -> bool: 1077 while True: 1078 event = self.event_wait('MIGRATION') 1079 # We use the default timeout, and with a timeout, event_wait() 1080 # never returns None 1081 assert event 1082 1083 log(event, filters=[filter_qmp_event]) 1084 if event['data']['status'] in ('completed', 'failed'): 1085 break 1086 1087 if event['data']['status'] == 'completed': 1088 # The event may occur in finish-migrate, so wait for the expected 1089 # post-migration runstate 1090 runstate = None 1091 while runstate != expect_runstate: 1092 runstate = self.qmp('query-status')['return']['status'] 1093 return True 1094 else: 1095 return False 1096 1097 def node_info(self, node_name): 1098 nodes = self.qmp('query-named-block-nodes') 1099 for x in nodes['return']: 1100 if x['node-name'] == node_name: 1101 return x 1102 return None 1103 1104 def query_bitmaps(self): 1105 res = self.qmp("query-named-block-nodes") 1106 return {device['node-name']: device['dirty-bitmaps'] 1107 for device in res['return'] if 'dirty-bitmaps' in device} 1108 1109 def get_bitmap(self, node_name, bitmap_name, recording=None, bitmaps=None): 1110 """ 1111 get a specific bitmap from the object returned by query_bitmaps. 1112 :param recording: If specified, filter results by the specified value. 1113 :param bitmaps: If specified, use it instead of call query_bitmaps() 1114 """ 1115 if bitmaps is None: 1116 bitmaps = self.query_bitmaps() 1117 1118 for bitmap in bitmaps[node_name]: 1119 if bitmap.get('name', '') == bitmap_name: 1120 if recording is None or bitmap.get('recording') == recording: 1121 return bitmap 1122 return None 1123 1124 def check_bitmap_status(self, node_name, bitmap_name, fields): 1125 ret = self.get_bitmap(node_name, bitmap_name) 1126 1127 return fields.items() <= ret.items() 1128 1129 def assert_block_path(self, root, path, expected_node, graph=None): 1130 """ 1131 Check whether the node under the given path in the block graph 1132 is @expected_node. 1133 1134 @root is the node name of the node where the @path is rooted. 1135 1136 @path is a string that consists of child names separated by 1137 slashes. It must begin with a slash. 1138 1139 Examples for @root + @path: 1140 - root="qcow2-node", path="/backing/file" 1141 - root="quorum-node", path="/children.2/file" 1142 1143 Hypothetically, @path could be empty, in which case it would 1144 point to @root. However, in practice this case is not useful 1145 and hence not allowed. 1146 1147 @expected_node may be None. (All elements of the path but the 1148 leaf must still exist.) 1149 1150 @graph may be None or the result of an x-debug-query-block-graph 1151 call that has already been performed. 1152 """ 1153 if graph is None: 1154 graph = self.qmp('x-debug-query-block-graph')['return'] 1155 1156 iter_path = iter(path.split('/')) 1157 1158 # Must start with a / 1159 assert next(iter_path) == '' 1160 1161 node = next((node for node in graph['nodes'] if node['name'] == root), 1162 None) 1163 1164 # An empty @path is not allowed, so the root node must be present 1165 assert node is not None, 'Root node %s not found' % root 1166 1167 for child_name in iter_path: 1168 assert node is not None, 'Cannot follow path %s%s' % (root, path) 1169 1170 try: 1171 node_id = next(edge['child'] for edge in graph['edges'] 1172 if (edge['parent'] == node['id'] and 1173 edge['name'] == child_name)) 1174 1175 node = next(node for node in graph['nodes'] 1176 if node['id'] == node_id) 1177 1178 except StopIteration: 1179 node = None 1180 1181 if node is None: 1182 assert expected_node is None, \ 1183 'No node found under %s (but expected %s)' % \ 1184 (path, expected_node) 1185 else: 1186 assert node['name'] == expected_node, \ 1187 'Found node %s under %s (but expected %s)' % \ 1188 (node['name'], path, expected_node) 1189 1190index_re = re.compile(r'([^\[]+)\[([^\]]+)\]') 1191 1192class QMPTestCase(unittest.TestCase): 1193 '''Abstract base class for QMP test cases''' 1194 1195 def __init__(self, *args, **kwargs): 1196 super().__init__(*args, **kwargs) 1197 # Many users of this class set a VM property we rely on heavily 1198 # in the methods below. 1199 self.vm = None 1200 1201 def dictpath(self, d, path): 1202 '''Traverse a path in a nested dict''' 1203 for component in path.split('/'): 1204 m = index_re.match(component) 1205 if m: 1206 component, idx = m.groups() 1207 idx = int(idx) 1208 1209 if not isinstance(d, dict) or component not in d: 1210 self.fail(f'failed path traversal for "{path}" in "{d}"') 1211 d = d[component] 1212 1213 if m: 1214 if not isinstance(d, list): 1215 self.fail(f'path component "{component}" in "{path}" ' 1216 f'is not a list in "{d}"') 1217 try: 1218 d = d[idx] 1219 except IndexError: 1220 self.fail(f'invalid index "{idx}" in path "{path}" ' 1221 f'in "{d}"') 1222 return d 1223 1224 def assert_qmp_absent(self, d, path): 1225 try: 1226 result = self.dictpath(d, path) 1227 except AssertionError: 1228 return 1229 self.fail('path "%s" has value "%s"' % (path, str(result))) 1230 1231 def assert_qmp(self, d, path, value): 1232 '''Assert that the value for a specific path in a QMP dict 1233 matches. When given a list of values, assert that any of 1234 them matches.''' 1235 1236 result = self.dictpath(d, path) 1237 1238 # [] makes no sense as a list of valid values, so treat it as 1239 # an actual single value. 1240 if isinstance(value, list) and value != []: 1241 for v in value: 1242 if result == v: 1243 return 1244 self.fail('no match for "%s" in %s' % (str(result), str(value))) 1245 else: 1246 self.assertEqual(result, value, 1247 '"%s" is "%s", expected "%s"' 1248 % (path, str(result), str(value))) 1249 1250 def assert_no_active_block_jobs(self): 1251 result = self.vm.qmp('query-block-jobs') 1252 self.assert_qmp(result, 'return', []) 1253 1254 def assert_has_block_node(self, node_name=None, file_name=None): 1255 """Issue a query-named-block-nodes and assert node_name and/or 1256 file_name is present in the result""" 1257 def check_equal_or_none(a, b): 1258 return a is None or b is None or a == b 1259 assert node_name or file_name 1260 result = self.vm.qmp('query-named-block-nodes') 1261 for x in result["return"]: 1262 if check_equal_or_none(x.get("node-name"), node_name) and \ 1263 check_equal_or_none(x.get("file"), file_name): 1264 return 1265 self.fail("Cannot find %s %s in result:\n%s" % 1266 (node_name, file_name, result)) 1267 1268 def assert_json_filename_equal(self, json_filename, reference): 1269 '''Asserts that the given filename is a json: filename and that its 1270 content is equal to the given reference object''' 1271 self.assertEqual(json_filename[:5], 'json:') 1272 self.assertEqual( 1273 self.vm.flatten_qmp_object(json.loads(json_filename[5:])), 1274 self.vm.flatten_qmp_object(reference) 1275 ) 1276 1277 def cancel_and_wait(self, drive='drive0', force=False, 1278 resume=False, wait=60.0): 1279 '''Cancel a block job and wait for it to finish, returning the event''' 1280 self.vm.cmd('block-job-cancel', device=drive, force=force) 1281 1282 if resume: 1283 self.vm.resume_drive(drive) 1284 1285 cancelled = False 1286 result = None 1287 while not cancelled: 1288 for event in self.vm.get_qmp_events(wait=wait): 1289 if event['event'] == 'BLOCK_JOB_COMPLETED' or \ 1290 event['event'] == 'BLOCK_JOB_CANCELLED': 1291 self.assert_qmp(event, 'data/device', drive) 1292 result = event 1293 cancelled = True 1294 elif event['event'] == 'JOB_STATUS_CHANGE': 1295 self.assert_qmp(event, 'data/id', drive) 1296 1297 1298 self.assert_no_active_block_jobs() 1299 return result 1300 1301 def wait_until_completed(self, drive='drive0', check_offset=True, 1302 wait=60.0, error=None): 1303 '''Wait for a block job to finish, returning the event''' 1304 while True: 1305 for event in self.vm.get_qmp_events(wait=wait): 1306 if event['event'] == 'BLOCK_JOB_COMPLETED': 1307 self.assert_qmp(event, 'data/device', drive) 1308 if error is None: 1309 self.assert_qmp_absent(event, 'data/error') 1310 if check_offset: 1311 self.assert_qmp(event, 'data/offset', 1312 event['data']['len']) 1313 else: 1314 self.assert_qmp(event, 'data/error', error) 1315 self.assert_no_active_block_jobs() 1316 return event 1317 if event['event'] == 'JOB_STATUS_CHANGE': 1318 self.assert_qmp(event, 'data/id', drive) 1319 1320 def wait_ready(self, drive='drive0'): 1321 """Wait until a BLOCK_JOB_READY event, and return the event.""" 1322 return self.vm.events_wait([ 1323 ('BLOCK_JOB_READY', 1324 {'data': {'type': 'mirror', 'device': drive}}), 1325 ('BLOCK_JOB_READY', 1326 {'data': {'type': 'commit', 'device': drive}}) 1327 ]) 1328 1329 def wait_ready_and_cancel(self, drive='drive0'): 1330 self.wait_ready(drive=drive) 1331 event = self.cancel_and_wait(drive=drive) 1332 self.assertEqual(event['event'], 'BLOCK_JOB_COMPLETED') 1333 self.assert_qmp(event, 'data/type', 'mirror') 1334 self.assert_qmp(event, 'data/offset', event['data']['len']) 1335 1336 def complete_and_wait(self, drive='drive0', wait_ready=True, 1337 completion_error=None): 1338 '''Complete a block job and wait for it to finish''' 1339 if wait_ready: 1340 self.wait_ready(drive=drive) 1341 1342 self.vm.cmd('block-job-complete', device=drive) 1343 1344 event = self.wait_until_completed(drive=drive, error=completion_error) 1345 self.assertTrue(event['data']['type'] in ['mirror', 'commit']) 1346 1347 def pause_wait(self, job_id='job0'): 1348 with Timeout(3, "Timeout waiting for job to pause"): 1349 while True: 1350 result = self.vm.qmp('query-block-jobs') 1351 found = False 1352 for job in result['return']: 1353 if job['device'] == job_id: 1354 found = True 1355 if job['paused'] and not job['busy']: 1356 return job 1357 break 1358 assert found 1359 1360 def pause_job(self, job_id='job0', wait=True): 1361 self.vm.cmd('block-job-pause', device=job_id) 1362 if wait: 1363 self.pause_wait(job_id) 1364 1365 def case_skip(self, reason): 1366 '''Skip this test case''' 1367 case_notrun(reason) 1368 self.skipTest(reason) 1369 1370 1371def notrun(reason): 1372 '''Skip this test suite''' 1373 # Each test in qemu-iotests has a number ("seq") 1374 seq = os.path.basename(sys.argv[0]) 1375 1376 with open('%s/%s.notrun' % (test_dir, seq), 'w', encoding='utf-8') \ 1377 as outfile: 1378 outfile.write(reason + '\n') 1379 logger.warning("%s not run: %s", seq, reason) 1380 sys.exit(0) 1381 1382def case_notrun(reason): 1383 '''Mark this test case as not having been run (without actually 1384 skipping it, that is left to the caller). See 1385 QMPTestCase.case_skip() for a variant that actually skips the 1386 current test case.''' 1387 1388 # Each test in qemu-iotests has a number ("seq") 1389 seq = os.path.basename(sys.argv[0]) 1390 1391 with open('%s/%s.casenotrun' % (test_dir, seq), 'a', encoding='utf-8') \ 1392 as outfile: 1393 outfile.write(' [case not run] ' + reason + '\n') 1394 1395def _verify_image_format(supported_fmts: Sequence[str] = (), 1396 unsupported_fmts: Sequence[str] = ()) -> None: 1397 if 'generic' in supported_fmts and \ 1398 os.environ.get('IMGFMT_GENERIC', 'true') == 'true': 1399 # similar to 1400 # _supported_fmt generic 1401 # for bash tests 1402 supported_fmts = () 1403 1404 not_sup = supported_fmts and (imgfmt not in supported_fmts) 1405 if not_sup or (imgfmt in unsupported_fmts): 1406 notrun('not suitable for this image format: %s' % imgfmt) 1407 1408 if imgfmt == 'luks': 1409 verify_working_luks() 1410 1411def _verify_protocol(supported: Sequence[str] = (), 1412 unsupported: Sequence[str] = ()) -> None: 1413 assert not (supported and unsupported) 1414 1415 if 'generic' in supported: 1416 return 1417 1418 not_sup = supported and (imgproto not in supported) 1419 if not_sup or (imgproto in unsupported): 1420 notrun('not suitable for this protocol: %s' % imgproto) 1421 1422def _verify_platform(supported: Sequence[str] = (), 1423 unsupported: Sequence[str] = ()) -> None: 1424 if any((sys.platform.startswith(x) for x in unsupported)): 1425 notrun('not suitable for this OS: %s' % sys.platform) 1426 1427 if supported: 1428 if not any((sys.platform.startswith(x) for x in supported)): 1429 notrun('not suitable for this OS: %s' % sys.platform) 1430 1431def _verify_cache_mode(supported_cache_modes: Sequence[str] = ()) -> None: 1432 if supported_cache_modes and (cachemode not in supported_cache_modes): 1433 notrun('not suitable for this cache mode: %s' % cachemode) 1434 1435def _verify_aio_mode(supported_aio_modes: Sequence[str] = ()) -> None: 1436 if supported_aio_modes and (aiomode not in supported_aio_modes): 1437 notrun('not suitable for this aio mode: %s' % aiomode) 1438 1439def _verify_formats(required_formats: Sequence[str] = ()) -> None: 1440 usf_list = list(set(required_formats) - set(supported_formats())) 1441 if usf_list: 1442 notrun(f'formats {usf_list} are not whitelisted') 1443 1444 1445def _verify_virtio_blk() -> None: 1446 out = qemu_pipe('-M', 'none', '-device', 'help') 1447 if 'virtio-blk' not in out: 1448 notrun('Missing virtio-blk in QEMU binary') 1449 1450def verify_virtio_scsi_pci_or_ccw() -> None: 1451 out = qemu_pipe('-M', 'none', '-device', 'help') 1452 if 'virtio-scsi-pci' not in out and 'virtio-scsi-ccw' not in out: 1453 notrun('Missing virtio-scsi-pci or virtio-scsi-ccw in QEMU binary') 1454 1455 1456def _verify_imgopts(unsupported: Sequence[str] = ()) -> None: 1457 imgopts = os.environ.get('IMGOPTS') 1458 # One of usage examples for IMGOPTS is "data_file=$TEST_IMG.ext_data_file" 1459 # but it supported only for bash tests. We don't have a concept of global 1460 # TEST_IMG in iotests.py, not saying about somehow parsing $variables. 1461 # So, for simplicity let's just not support any IMGOPTS with '$' inside. 1462 unsup = list(unsupported) + ['$'] 1463 if imgopts and any(x in imgopts for x in unsup): 1464 notrun(f'not suitable for this imgopts: {imgopts}') 1465 1466 1467def supports_quorum() -> bool: 1468 return 'quorum' in qemu_img('--help').stdout 1469 1470def verify_quorum(): 1471 '''Skip test suite if quorum support is not available''' 1472 if not supports_quorum(): 1473 notrun('quorum support missing') 1474 1475def has_working_luks() -> Tuple[bool, str]: 1476 """ 1477 Check whether our LUKS driver can actually create images 1478 (this extends to LUKS encryption for qcow2). 1479 1480 If not, return the reason why. 1481 """ 1482 1483 img_file = f'{test_dir}/luks-test.luks' 1484 res = qemu_img('create', '-f', 'luks', 1485 '--object', luks_default_secret_object, 1486 '-o', luks_default_key_secret_opt, 1487 '-o', 'iter-time=10', 1488 img_file, '1G', 1489 check=False) 1490 try: 1491 os.remove(img_file) 1492 except OSError: 1493 pass 1494 1495 if res.returncode: 1496 reason = res.stdout 1497 for line in res.stdout.splitlines(): 1498 if img_file + ':' in line: 1499 reason = line.split(img_file + ':', 1)[1].strip() 1500 break 1501 1502 return (False, reason) 1503 else: 1504 return (True, '') 1505 1506def verify_working_luks(): 1507 """ 1508 Skip test suite if LUKS does not work 1509 """ 1510 (working, reason) = has_working_luks() 1511 if not working: 1512 notrun(reason) 1513 1514def supports_qcow2_zstd_compression() -> bool: 1515 img_file = f'{test_dir}/qcow2-zstd-test.qcow2' 1516 res = qemu_img('create', '-f', 'qcow2', '-o', 'compression_type=zstd', 1517 img_file, '0', 1518 check=False) 1519 try: 1520 os.remove(img_file) 1521 except OSError: 1522 pass 1523 1524 if res.returncode == 1 and \ 1525 "'compression-type' does not accept value 'zstd'" in res.stdout: 1526 return False 1527 else: 1528 return True 1529 1530def verify_qcow2_zstd_compression(): 1531 if not supports_qcow2_zstd_compression(): 1532 notrun('zstd compression not supported') 1533 1534def qemu_pipe(*args: str) -> str: 1535 """ 1536 Run qemu with an option to print something and exit (e.g. a help option). 1537 1538 :return: QEMU's stdout output. 1539 """ 1540 full_args = [qemu_prog] + qemu_opts + list(args) 1541 output, _ = qemu_tool_pipe_and_status('qemu', full_args) 1542 return output 1543 1544def supported_formats(read_only=False): 1545 '''Set 'read_only' to True to check ro-whitelist 1546 Otherwise, rw-whitelist is checked''' 1547 1548 if not hasattr(supported_formats, "formats"): 1549 supported_formats.formats = {} 1550 1551 if read_only not in supported_formats.formats: 1552 format_message = qemu_pipe("-drive", "format=help") 1553 line = 1 if read_only else 0 1554 supported_formats.formats[read_only] = \ 1555 format_message.splitlines()[line].split(":")[1].split() 1556 1557 return supported_formats.formats[read_only] 1558 1559def skip_if_unsupported(required_formats=(), read_only=False): 1560 '''Skip Test Decorator 1561 Runs the test if all the required formats are whitelisted''' 1562 def skip_test_decorator(func): 1563 def func_wrapper(test_case: QMPTestCase, *args: List[Any], 1564 **kwargs: Dict[str, Any]) -> None: 1565 if callable(required_formats): 1566 fmts = required_formats(test_case) 1567 else: 1568 fmts = required_formats 1569 1570 usf_list = list(set(fmts) - set(supported_formats(read_only))) 1571 if usf_list: 1572 msg = f'{test_case}: formats {usf_list} are not whitelisted' 1573 test_case.case_skip(msg) 1574 else: 1575 func(test_case, *args, **kwargs) 1576 return func_wrapper 1577 return skip_test_decorator 1578 1579def skip_for_formats(formats: Sequence[str] = ()) \ 1580 -> Callable[[Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]], 1581 Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]]: 1582 '''Skip Test Decorator 1583 Skips the test for the given formats''' 1584 def skip_test_decorator(func): 1585 def func_wrapper(test_case: QMPTestCase, *args: List[Any], 1586 **kwargs: Dict[str, Any]) -> None: 1587 if imgfmt in formats: 1588 msg = f'{test_case}: Skipped for format {imgfmt}' 1589 test_case.case_skip(msg) 1590 else: 1591 func(test_case, *args, **kwargs) 1592 return func_wrapper 1593 return skip_test_decorator 1594 1595def skip_if_user_is_root(func): 1596 '''Skip Test Decorator 1597 Runs the test only without root permissions''' 1598 def func_wrapper(*args, **kwargs): 1599 if os.getuid() == 0: 1600 case_notrun('{}: cannot be run as root'.format(args[0])) 1601 return None 1602 else: 1603 return func(*args, **kwargs) 1604 return func_wrapper 1605 1606# We need to filter out the time taken from the output so that 1607# qemu-iotest can reliably diff the results against master output, 1608# and hide skipped tests from the reference output. 1609 1610class ReproducibleTestResult(unittest.TextTestResult): 1611 def addSkip(self, test, reason): 1612 # Same as TextTestResult, but print dot instead of "s" 1613 unittest.TestResult.addSkip(self, test, reason) 1614 if self.showAll: 1615 self.stream.writeln("skipped {0!r}".format(reason)) 1616 elif self.dots: 1617 self.stream.write(".") 1618 self.stream.flush() 1619 1620class ReproducibleStreamWrapper: 1621 def __init__(self, stream: TextIO): 1622 self.stream = stream 1623 1624 def __getattr__(self, attr): 1625 if attr in ('stream', '__getstate__'): 1626 raise AttributeError(attr) 1627 return getattr(self.stream, attr) 1628 1629 def write(self, arg=None): 1630 arg = re.sub(r'Ran (\d+) tests? in [\d.]+s', r'Ran \1 tests', arg) 1631 arg = re.sub(r' \(skipped=\d+\)', r'', arg) 1632 self.stream.write(arg) 1633 1634class ReproducibleTestRunner(unittest.TextTestRunner): 1635 def __init__( 1636 self, 1637 stream: Optional[TextIO] = None, 1638 resultclass: Type[unittest.TextTestResult] = 1639 ReproducibleTestResult, 1640 **kwargs: Any 1641 ) -> None: 1642 rstream = ReproducibleStreamWrapper(stream or sys.stdout) 1643 super().__init__(stream=rstream, # type: ignore 1644 descriptions=True, 1645 resultclass=resultclass, 1646 **kwargs) 1647 1648def execute_unittest(argv: List[str], debug: bool = False) -> None: 1649 """Executes unittests within the calling module.""" 1650 1651 # Some tests have warnings, especially ResourceWarnings for unclosed 1652 # files and sockets. Ignore them for now to ensure reproducibility of 1653 # the test output. 1654 unittest.main(argv=argv, 1655 testRunner=ReproducibleTestRunner, 1656 verbosity=2 if debug else 1, 1657 warnings=None if sys.warnoptions else 'ignore') 1658 1659def execute_setup_common(supported_fmts: Sequence[str] = (), 1660 supported_platforms: Sequence[str] = (), 1661 supported_cache_modes: Sequence[str] = (), 1662 supported_aio_modes: Sequence[str] = (), 1663 unsupported_fmts: Sequence[str] = (), 1664 supported_protocols: Sequence[str] = (), 1665 unsupported_protocols: Sequence[str] = (), 1666 required_fmts: Sequence[str] = (), 1667 unsupported_imgopts: Sequence[str] = ()) -> bool: 1668 """ 1669 Perform necessary setup for either script-style or unittest-style tests. 1670 1671 :return: Bool; Whether or not debug mode has been requested via the CLI. 1672 """ 1673 # Note: Python 3.6 and pylint do not like 'Collection' so use 'Sequence'. 1674 1675 debug = '-d' in sys.argv 1676 if debug: 1677 sys.argv.remove('-d') 1678 logging.basicConfig(level=(logging.DEBUG if debug else logging.WARN)) 1679 1680 _verify_image_format(supported_fmts, unsupported_fmts) 1681 _verify_protocol(supported_protocols, unsupported_protocols) 1682 _verify_platform(supported=supported_platforms) 1683 _verify_cache_mode(supported_cache_modes) 1684 _verify_aio_mode(supported_aio_modes) 1685 _verify_formats(required_fmts) 1686 _verify_virtio_blk() 1687 _verify_imgopts(unsupported_imgopts) 1688 1689 return debug 1690 1691def execute_test(*args, test_function=None, **kwargs): 1692 """Run either unittest or script-style tests.""" 1693 1694 debug = execute_setup_common(*args, **kwargs) 1695 if not test_function: 1696 execute_unittest(sys.argv, debug) 1697 else: 1698 test_function() 1699 1700def activate_logging(): 1701 """Activate iotests.log() output to stdout for script-style tests.""" 1702 handler = logging.StreamHandler(stream=sys.stdout) 1703 formatter = logging.Formatter('%(message)s') 1704 handler.setFormatter(formatter) 1705 test_logger.addHandler(handler) 1706 test_logger.setLevel(logging.INFO) 1707 test_logger.propagate = False 1708 1709# This is called from script-style iotests without a single point of entry 1710def script_initialize(*args, **kwargs): 1711 """Initialize script-style tests without running any tests.""" 1712 activate_logging() 1713 execute_setup_common(*args, **kwargs) 1714 1715# This is called from script-style iotests with a single point of entry 1716def script_main(test_function, *args, **kwargs): 1717 """Run script-style tests outside of the unittest framework""" 1718 activate_logging() 1719 execute_test(*args, test_function=test_function, **kwargs) 1720 1721# This is called from unittest style iotests 1722def main(*args, **kwargs): 1723 """Run tests using the unittest framework""" 1724 execute_test(*args, **kwargs) 1725