# Test class and utilities for functional tests
#
# Copyright 2018, 2024 Red Hat, Inc.
#
# Original Author (Avocado-based tests):
#  Cleber Rosa <crosa@redhat.com>
#
# Adaption for standalone version:
#  Thomas Huth <thuth@redhat.com>
#
# This work is licensed under the terms of the GNU GPL, version 2 or
# later.  See the COPYING file in the top-level directory.

import logging
import os
from pathlib import Path
import pycotap
import shutil
from subprocess import run
import sys
import tempfile
import unittest
import uuid

from qemu.machine import QEMUMachine
from qemu.utils import kvm_available, tcg_available

from .archive import archive_extract
from .asset import Asset
from .config import BUILD_DIR
from .uncompress import uncompress


class QemuBaseTest(unittest.TestCase):
    '''
    Common base class for the functional tests.

    setUp() creates a per-test output directory (with a 'scratch'
    sub-directory) below the build directory and attaches a file
    handler for 'base.log' to the 'qemu-test' and 'qemu.machine'
    loggers; tearDown() undoes this.
    '''

    # Path of the QEMU binary under test, taken from the environment
    qemu_bin = os.getenv('QEMU_TEST_QEMU_BINARY')
    # Last '-' separated component of qemu_bin, filled in by setUp()
    arch = None

    workdir = None
    log = None
    logdir = None

    def uncompress(self, compressed, format=None):
        '''
        @params compressed: filename, Asset, or file-like object to uncompress
        @params format: optional compression format (gzip, lzma)

        Uncompresses @compressed into the scratch directory.

        If @format is None, heuristics will be applied to guess the format
        from the filename or Asset URL. @format must be non-None if
        @compressed is a file-like object.

        Returns the fully qualified path to the uncompressed file
        '''
        self.log.debug(f"Uncompress {compressed} format={format}")
        if isinstance(compressed, Asset):
            compressed.fetch()

        # The output file is named after the input with its last
        # extension stripped
        name, _ = os.path.splitext(str(compressed))
        uncompressed = self.scratch_file(os.path.basename(name))

        # This resolves to the module level uncompress() helper,
        # not to this method
        uncompress(compressed, uncompressed, format)

        return uncompressed

    def archive_extract(self, archive, format=None, sub_dir=None, member=None):
        '''
        @params archive: filename, Asset, or file-like object to extract
        @params format: optional archive format (tar, zip, deb, cpio)
        @params sub_dir: optional sub-directory to extract into
        @params member: optional member file to limit extraction to

        Extracts @archive into the scratch directory, or a directory beneath
        named by @sub_dir. All files are extracted unless @member specifies
        a limit.

        If @format is None, heuristics will be applied to guess the format
        from the filename or Asset URL. @format must be non-None if @archive
        is a file-like object.

        If @member is non-None, returns the fully qualified path to @member
        '''
        self.log.debug(f"Extract {archive} format={format} "
                       f"sub_dir={sub_dir} member={member}")
        if isinstance(archive, Asset):
            archive.fetch()

        if sub_dir is None:
            dest = self.scratch_file()
        else:
            dest = self.scratch_file(sub_dir)
        # This resolves to the module level archive_extract() helper,
        # not to this method
        archive_extract(archive, dest, format, member)

        if member is not None:
            # NOTE(review): the returned path ignores @sub_dir - confirm
            # whether callers that pass both expect the path beneath it
            return self.scratch_file(member)
        return None

    def socket_dir(self):
        '''
        Create a temporary directory suitable for storing UNIX
        socket paths.

        The directory is created on first use and the same instance
        is returned by subsequent calls within one test.

        Returns: a tempfile.TemporaryDirectory instance
        '''
        if self.socketdir is None:
            self.socketdir = tempfile.TemporaryDirectory(
                prefix="qemu_func_test_sock_")
        return self.socketdir

    def data_file(self, *args):
        '''
        @params args list of zero or more subdirectories or file

        Construct a path for accessing a data file located
        relative to the source directory that is the root for
        functional tests.

        @args may be an empty list to reference the root dir
        itself, may be a single element to reference a file in
        the root directory, or may be multiple elements to
        reference a file nested below. The path components
        will be joined using the platform appropriate path
        separator.

        Returns: string representing a file path
        '''
        return str(Path(Path(__file__).parent.parent, *args))

    def build_file(self, *args):
        '''
        @params args list of zero or more subdirectories or file

        Construct a path for accessing a data file located
        relative to the build directory root.

        @args may be an empty list to reference the build dir
        itself, may be a single element to reference a file in
        the build directory, or may be multiple elements to
        reference a file nested below. The path components
        will be joined using the platform appropriate path
        separator.

        Returns: string representing a file path
        '''
        return str(Path(BUILD_DIR, *args))

    def scratch_file(self, *args):
        '''
        @params args list of zero or more subdirectories or file

        Construct a path for accessing/creating a scratch file
        located relative to a temporary directory dedicated to
        this test case. The directory and its contents will be
        purged upon completion of the test.

        @args may be an empty list to reference the scratch dir
        itself, may be a single element to reference a file in
        the scratch directory, or may be multiple elements to
        reference a file nested below. The path components
        will be joined using the platform appropriate path
        separator.

        Returns: string representing a file path
        '''
        return str(Path(self.workdir, *args))

    def log_file(self, *args):
        '''
        @params args list of zero or more subdirectories or file

        Construct a path for accessing/creating a log file
        located relative to a temporary directory dedicated to
        this test case. The directory and its log files will be
        preserved upon completion of the test.

        @args may be an empty list to reference the log dir
        itself, may be a single element to reference a file in
        the log directory, or may be multiple elements to
        reference a file nested below. The path components
        will be joined using the platform appropriate path
        separator.

        Returns: string representing a file path
        '''
        return str(Path(self.outputdir, *args))

    def assets_available(self):
        '''
        Check the Asset instances declared directly on the test class
        under a name starting with "ASSET_".

        Returns: False as soon as one asset reports that it is not
        available, True otherwise
        '''
        for name, asset in vars(self.__class__).items():
            if name.startswith("ASSET_") and isinstance(asset, Asset):
                if not asset.available():
                    self.log.debug(f"Asset {asset.url} not available")
                    return False
        return True

    def setUp(self, bin_prefix):
        '''
        @params bin_prefix: binary name prefix passed in by the
                            subclasses (not used by this method)

        Prepare the output and scratch directories and the logging
        for the test. The test is skipped if one of its assets is
        not available.
        '''
        self.assertIsNotNone(self.qemu_bin, 'QEMU_TEST_QEMU_BINARY must be set')
        self.arch = self.qemu_bin.split('-')[-1]
        self.socketdir = None

        self.outputdir = self.build_file('tests', 'functional',
                                         self.arch, self.id())
        self.workdir = os.path.join(self.outputdir, 'scratch')
        os.makedirs(self.workdir, exist_ok=True)

        self.log_filename = self.log_file('base.log')
        self.log = logging.getLogger('qemu-test')
        self.log.setLevel(logging.DEBUG)
        self._log_fh = logging.FileHandler(self.log_filename, mode='w')
        self._log_fh.setLevel(logging.DEBUG)
        fileFormatter = logging.Formatter(
            '%(asctime)s - %(levelname)s: %(message)s')
        self._log_fh.setFormatter(fileFormatter)
        self.log.addHandler(self._log_fh)

        # Capture QEMUMachine logging
        self.machinelog = logging.getLogger('qemu.machine')
        self.machinelog.setLevel(logging.DEBUG)
        self.machinelog.addHandler(self._log_fh)

        if not self.assets_available():
            self.skipTest('One or more assets is not available')

    def tearDown(self):
        '''
        Remove the scratch directory (unless QEMU_TEST_KEEP_SCRATCH is
        set in the environment) and the socket directory, then detach
        and close the log file handler installed by setUp().
        '''
        if "QEMU_TEST_KEEP_SCRATCH" not in os.environ:
            shutil.rmtree(self.workdir)
        if self.socketdir is not None:
            # cleanup() removes the directory and also detaches the
            # finalizer of the TemporaryDirectory object
            self.socketdir.cleanup()
            self.socketdir = None
        self.machinelog.removeHandler(self._log_fh)
        self.log.removeHandler(self._log_fh)
        # Removing a handler does not close its file; do it explicitly
        # so that each test does not leak a file descriptor
        self._log_fh.close()

    @staticmethod
    def main():
        '''
        Entry point for running a test script standalone.

        The module to run is derived from sys.argv[0] with its last
        three characters (the ".py" suffix) removed. If
        QEMU_TEST_PRECACHE is set in the environment, the assets are
        only pre-cached and no test is run. Otherwise the tests are run
        with a TAP test runner, the log file locations of tests with
        errors or failures are printed to stderr, and the process exits
        with a non-zero status if the run was not successful.
        '''
        path = os.path.basename(sys.argv[0])[:-3]

        cache = os.environ.get("QEMU_TEST_PRECACHE", None)
        if cache is not None:
            Asset.precache_suites(path, cache)
            return

        tr = pycotap.TAPTestRunner(message_log=pycotap.LogMode.LogToError,
                                   test_output_log=pycotap.LogMode.LogToError)
        res = unittest.main(module=None, testRunner=tr, exit=False,
                            argv=["__dummy__", path])
        for test, _ in res.result.errors + res.result.failures:
            if hasattr(test, "log_filename"):
                print('More information on ' + test.id() + ' could be found here:'
                      '\n %s' % test.log_filename, file=sys.stderr)
                if hasattr(test, 'console_log_name'):
                    print(' %s' % test.console_log_name, file=sys.stderr)
        sys.exit(not res.result.wasSuccessful())


class QemuUserTest(QemuBaseTest):
    '''
    Helpers for tests that run a program through the QEMU binary,
    optionally with "-L" library paths.
    '''

    def setUp(self):
        super().setUp('qemu-')
        self._ldpath = []

    def add_ldpath(self, ldpath):
        '''
        @params ldpath: directory to pass to the QEMU binary with "-L";
                        it is stored as an absolute path
        '''
        self._ldpath.append(os.path.abspath(ldpath))

    def run_cmd(self, bin_path, args=None):
        '''
        @params bin_path: path of the program to run
        @params args: optional list of arguments for the program

        Runs the QEMU binary with the registered "-L" paths, followed
        by @bin_path and @args.

        Returns: the subprocess.CompletedProcess instance, with stdout
        and stderr captured as text
        '''
        ldpath_opts = []
        for ldpath in self._ldpath:
            # The option and its value must be separate argv elements;
            # "-L <path>" as one element is not split again on spaces
            ldpath_opts += ["-L", ldpath]
        cmd = [self.qemu_bin, *ldpath_opts, bin_path, *(args or [])]
        return run(cmd, text=True, capture_output=True)


class QemuSystemTest(QemuBaseTest):
    """Facilitates system emulation tests."""

    cpu = None
    machine = None
    # Cached output of "-M help", filled in lazily by set_machine()
    _machinehelp = None

    def setUp(self):
        '''
        Run the base class set up and additionally route the 'console'
        logger into a 'console.log' file in the log directory.
        '''
        self._vms = {}

        super().setUp('qemu-system-')

        console_log = logging.getLogger('console')
        console_log.setLevel(logging.DEBUG)
        self.console_log_name = self.log_file('console.log')
        self._console_log_fh = logging.FileHandler(self.console_log_name,
                                                   mode='w')
        self._console_log_fh.setLevel(logging.DEBUG)
        fileFormatter = logging.Formatter('%(asctime)s: %(message)s')
        self._console_log_fh.setFormatter(fileFormatter)
        console_log.addHandler(self._console_log_fh)

    def set_machine(self, machinename):
        '''
        Select the machine type for the VMs of this test.

        The test is skipped if @machinename does not occur in the
        output of "-M help" of the QEMU binary.
        '''
        # TODO: We should use QMP to get the list of available machines
        if not self._machinehelp:
            self._machinehelp = run(
                [self.qemu_bin, '-M', 'help'],
                capture_output=True, check=True, encoding='utf8').stdout
        if machinename not in self._machinehelp:
            self.skipTest('no support for machine ' + machinename)
        self.machine = machinename

    def require_accelerator(self, accelerator):
        """
        Requires an accelerator to be available for the test to continue

        It takes into account the currently set qemu binary.

        If the check fails, the test is canceled.  If the check itself
        for the given accelerator is not available, the test is also
        canceled.

        :param accelerator: name of the accelerator, such as "kvm" or "tcg"
        :type accelerator: str
        """
        checker = {'tcg': tcg_available,
                   'kvm': kvm_available}.get(accelerator)
        if checker is None:
            self.skipTest("Don't know how to check for the presence "
                          "of accelerator %s" % accelerator)
        if not checker(qemu_bin=self.qemu_bin):
            self.skipTest("%s accelerator does not seem to be "
                          "available" % accelerator)

    def require_netdev(self, netdevname):
        '''
        Skip the test if @netdevname is not listed on a line of its own
        in the output of "-netdev help" of the QEMU binary.
        '''
        netdev_help = run([self.qemu_bin,
                           '-M', 'none', '-netdev', 'help'],
                          capture_output=True, check=True,
                          encoding='utf8').stdout
        if ('\n' + netdevname + '\n') not in netdev_help:
            self.skipTest(f'no support for {netdevname} networking')

    def require_device(self, devicename):
        '''
        Skip the test if @devicename does not occur in the output of
        "-device help" of the QEMU binary.
        '''
        device_help = run([self.qemu_bin,
                           '-M', 'none', '-device', 'help'],
                          capture_output=True, check=True,
                          encoding='utf8').stdout
        if devicename not in device_help:
            self.skipTest('no support for device ' + devicename)

    def _new_vm(self, name, *args):
        '''
        Create a QEMUMachine called @name that uses the scratch
        directory as temporary directory and the log directory for its
        logs. Any @args are added to its command line.

        If QEMU_TEST_QMP_BACKDOOR is set in the environment, an extra
        monitor in control mode is added on a socket chardev at that
        path.
        '''
        vm = QEMUMachine(self.qemu_bin,
                         name=name,
                         base_temp_dir=self.workdir,
                         log_dir=self.log_file())
        self.log.debug('QEMUMachine "%s" created', name)
        self.log.debug('QEMUMachine "%s" temp_dir: %s', name, vm.temp_dir)

        sockpath = os.environ.get("QEMU_TEST_QMP_BACKDOOR", None)
        if sockpath is not None:
            vm.add_args("-chardev",
                        f"socket,id=backdoor,path={sockpath},server=on,wait=off",
                        "-mon", "chardev=backdoor,mode=control")

        if args:
            vm.add_args(*args)
        return vm

    @property
    def vm(self):
        '''The VM named "default", created on first access.'''
        return self.get_vm(name='default')

    def get_vm(self, *args, name=None):
        '''
        Return the VM called @name, creating it with @args if it does
        not exist yet. Without a @name, a random UUID is used as name,
        so a new VM is returned.

        On every call, "-cpu" is added if self.cpu is set, and the
        machine type is set if self.machine is set.
        '''
        if not name:
            name = str(uuid.uuid4())
        if self._vms.get(name) is None:
            self._vms[name] = self._new_vm(name, *args)
        vm = self._vms[name]
        if self.cpu is not None:
            vm.add_args('-cpu', self.cpu)
        if self.machine is not None:
            vm.set_machine(self.machine)
        return vm

    def set_vm_arg(self, arg, value):
        """
        Set an argument to list of extra arguments to be given to the QEMU
        binary. If the argument already exists then its value is replaced.

        :param arg: the QEMU argument, such as "-cpu" in "-cpu host"
        :type arg: str
        :param value: the argument value, such as "host" in "-cpu host"
        :type value: str
        """
        if not arg or not value:
            return
        vm_args = self.vm.args
        if arg not in vm_args:
            vm_args.extend([arg, value])
            return
        # The value is expected right after the first occurrence of arg
        idx = vm_args.index(arg) + 1
        if idx < len(vm_args):
            vm_args[idx] = value
        else:
            vm_args.append(value)

    def tearDown(self):
        '''
        Shut down all VMs created by the test, then detach and close
        the console log handler and run the base class tear down.
        '''
        try:
            for vm in self._vms.values():
                vm.shutdown()
        finally:
            # Release the logging resources and the directories even if
            # shutting down one of the VMs raised an exception
            console_log = logging.getLogger('console')
            console_log.removeHandler(self._console_log_fh)
            self._console_log_fh.close()
            super().tearDown()