1# Test class and utilities for functional tests 2# 3# Copyright 2018, 2024 Red Hat, Inc. 4# 5# Original Author (Avocado-based tests): 6# Cleber Rosa <crosa@redhat.com> 7# 8# Adaption for standalone version: 9# Thomas Huth <thuth@redhat.com> 10# 11# This work is licensed under the terms of the GNU GPL, version 2 or 12# later. See the COPYING file in the top-level directory. 13 14import logging 15import os 16from pathlib import Path 17import pycotap 18import shutil 19from subprocess import run 20import sys 21import tempfile 22import unittest 23import uuid 24 25from qemu.machine import QEMUMachine 26from qemu.utils import hvf_available, kvm_available, tcg_available 27 28from .archive import archive_extract 29from .asset import Asset 30from .config import BUILD_DIR, dso_suffix 31from .uncompress import uncompress 32 33 34class QemuBaseTest(unittest.TestCase): 35 36 ''' 37 @params compressed: filename, Asset, or file-like object to uncompress 38 @params format: optional compression format (gzip, lzma) 39 40 Uncompresses @compressed into the scratch directory. 41 42 If @format is None, heuristics will be applied to guess the format 43 from the filename or Asset URL. @format must be non-None if @uncompressed 44 is a file-like object. 45 46 Returns the fully qualified path to the uncompressed file 47 ''' 48 def uncompress(self, compressed, format=None): 49 self.log.debug(f"Uncompress {compressed} format={format}") 50 if type(compressed) == Asset: 51 compressed.fetch() 52 53 (name, ext) = os.path.splitext(str(compressed)) 54 uncompressed = self.scratch_file(os.path.basename(name)) 55 56 uncompress(compressed, uncompressed, format) 57 58 return uncompressed 59 60 ''' 61 @params archive: filename, Asset, or file-like object to extract 62 @params format: optional archive format (tar, zip, deb, cpio) 63 @params sub_dir: optional sub-directory to extract into 64 @params member: optional member file to limit extraction to 65 66 Extracts @archive into the scratch directory, or a directory beneath 67 named by @sub_dir. All files are extracted unless @member specifies 68 a limit. 69 70 If @format is None, heuristics will be applied to guess the format 71 from the filename or Asset URL. @format must be non-None if @archive 72 is a file-like object. 73 74 If @member is non-None, returns the fully qualified path to @member 75 ''' 76 def archive_extract(self, archive, format=None, sub_dir=None, member=None): 77 self.log.debug(f"Extract {archive} format={format}" + 78 f"sub_dir={sub_dir} member={member}") 79 if type(archive) == Asset: 80 archive.fetch() 81 if sub_dir is None: 82 archive_extract(archive, self.scratch_file(), format, member) 83 else: 84 archive_extract(archive, self.scratch_file(sub_dir), 85 format, member) 86 87 if member is not None: 88 return self.scratch_file(member) 89 return None 90 91 ''' 92 Create a temporary directory suitable for storing UNIX 93 socket paths. 94 95 Returns: a tempfile.TemporaryDirectory instance 96 ''' 97 def socket_dir(self): 98 if self.socketdir is None: 99 self.socketdir = tempfile.TemporaryDirectory( 100 prefix="qemu_func_test_sock_") 101 return self.socketdir 102 103 ''' 104 @params args list of zero or more subdirectories or file 105 106 Construct a path for accessing a data file located 107 relative to the source directory that is the root for 108 functional tests. 109 110 @args may be an empty list to reference the root dir 111 itself, may be a single element to reference a file in 112 the root directory, or may be multiple elements to 113 reference a file nested below. The path components 114 will be joined using the platform appropriate path 115 separator. 116 117 Returns: string representing a file path 118 ''' 119 def data_file(self, *args): 120 return str(Path(Path(__file__).parent.parent, *args)) 121 122 ''' 123 @params args list of zero or more subdirectories or file 124 125 Construct a path for accessing a data file located 126 relative to the build directory root. 127 128 @args may be an empty list to reference the build dir 129 itself, may be a single element to reference a file in 130 the build directory, or may be multiple elements to 131 reference a file nested below. The path components 132 will be joined using the platform appropriate path 133 separator. 134 135 Returns: string representing a file path 136 ''' 137 def build_file(self, *args): 138 return str(Path(BUILD_DIR, *args)) 139 140 ''' 141 @params args list of zero or more subdirectories or file 142 143 Construct a path for accessing/creating a scratch file 144 located relative to a temporary directory dedicated to 145 this test case. The directory and its contents will be 146 purged upon completion of the test. 147 148 @args may be an empty list to reference the scratch dir 149 itself, may be a single element to reference a file in 150 the scratch directory, or may be multiple elements to 151 reference a file nested below. The path components 152 will be joined using the platform appropriate path 153 separator. 154 155 Returns: string representing a file path 156 ''' 157 def scratch_file(self, *args): 158 return str(Path(self.workdir, *args)) 159 160 ''' 161 @params args list of zero or more subdirectories or file 162 163 Construct a path for accessing/creating a log file 164 located relative to a temporary directory dedicated to 165 this test case. The directory and its log files will be 166 preserved upon completion of the test. 167 168 @args may be an empty list to reference the log dir 169 itself, may be a single element to reference a file in 170 the log directory, or may be multiple elements to 171 reference a file nested below. The path components 172 will be joined using the platform appropriate path 173 separator. 174 175 Returns: string representing a file path 176 ''' 177 def log_file(self, *args): 178 return str(Path(self.outputdir, *args)) 179 180 ''' 181 @params plugin name 182 183 Return the full path to the plugin taking into account any host OS 184 specific suffixes. 185 ''' 186 def plugin_file(self, plugin_name): 187 sfx = dso_suffix() 188 return os.path.join('tests', 'tcg', 'plugins', f'{plugin_name}.{sfx}') 189 190 def assets_available(self): 191 for name, asset in vars(self.__class__).items(): 192 if name.startswith("ASSET_") and type(asset) == Asset: 193 if not asset.available(): 194 self.log.debug(f"Asset {asset.url} not available") 195 return False 196 return True 197 198 def setUp(self): 199 self.qemu_bin = os.getenv('QEMU_TEST_QEMU_BINARY') 200 self.assertIsNotNone(self.qemu_bin, 'QEMU_TEST_QEMU_BINARY must be set') 201 self.arch = self.qemu_bin.split('-')[-1] 202 self.socketdir = None 203 204 self.outputdir = self.build_file('tests', 'functional', 205 self.arch, self.id()) 206 self.workdir = os.path.join(self.outputdir, 'scratch') 207 os.makedirs(self.workdir, exist_ok=True) 208 209 self.log_filename = self.log_file('base.log') 210 self.log = logging.getLogger('qemu-test') 211 self.log.setLevel(logging.DEBUG) 212 self._log_fh = logging.FileHandler(self.log_filename, mode='w') 213 self._log_fh.setLevel(logging.DEBUG) 214 fileFormatter = logging.Formatter( 215 '%(asctime)s - %(levelname)s: %(message)s') 216 self._log_fh.setFormatter(fileFormatter) 217 self.log.addHandler(self._log_fh) 218 219 # Capture QEMUMachine logging 220 self.machinelog = logging.getLogger('qemu.machine') 221 self.machinelog.setLevel(logging.DEBUG) 222 self.machinelog.addHandler(self._log_fh) 223 224 if not self.assets_available(): 225 self.skipTest('One or more assets is not available') 226 227 def tearDown(self): 228 if "QEMU_TEST_KEEP_SCRATCH" not in os.environ: 229 shutil.rmtree(self.workdir) 230 if self.socketdir is not None: 231 shutil.rmtree(self.socketdir.name) 232 self.socketdir = None 233 self.machinelog.removeHandler(self._log_fh) 234 self.log.removeHandler(self._log_fh) 235 236 def main(): 237 path = os.path.basename(sys.argv[0])[:-3] 238 239 cache = os.environ.get("QEMU_TEST_PRECACHE", None) 240 if cache is not None: 241 Asset.precache_suites(path, cache) 242 return 243 244 tr = pycotap.TAPTestRunner(message_log = pycotap.LogMode.LogToError, 245 test_output_log = pycotap.LogMode.LogToError) 246 res = unittest.main(module = None, testRunner = tr, exit = False, 247 argv=["__dummy__", path]) 248 for (test, message) in res.result.errors + res.result.failures: 249 250 if hasattr(test, "log_filename"): 251 print('More information on ' + test.id() + ' could be found here:' 252 '\n %s' % test.log_filename, file=sys.stderr) 253 if hasattr(test, 'console_log_name'): 254 print(' %s' % test.console_log_name, file=sys.stderr) 255 sys.exit(not res.result.wasSuccessful()) 256 257 258class QemuUserTest(QemuBaseTest): 259 260 def setUp(self): 261 super().setUp() 262 self._ldpath = [] 263 264 def add_ldpath(self, ldpath): 265 self._ldpath.append(os.path.abspath(ldpath)) 266 267 def run_cmd(self, bin_path, args=[]): 268 return run([self.qemu_bin] 269 + ["-L %s" % ldpath for ldpath in self._ldpath] 270 + [bin_path] 271 + args, 272 text=True, capture_output=True) 273 274class QemuSystemTest(QemuBaseTest): 275 """Facilitates system emulation tests.""" 276 277 cpu = None 278 machine = None 279 _machinehelp = None 280 281 def setUp(self): 282 self._vms = {} 283 284 super().setUp() 285 286 console_log = logging.getLogger('console') 287 console_log.setLevel(logging.DEBUG) 288 self.console_log_name = self.log_file('console.log') 289 self._console_log_fh = logging.FileHandler(self.console_log_name, 290 mode='w') 291 self._console_log_fh.setLevel(logging.DEBUG) 292 fileFormatter = logging.Formatter('%(asctime)s: %(message)s') 293 self._console_log_fh.setFormatter(fileFormatter) 294 console_log.addHandler(self._console_log_fh) 295 296 def set_machine(self, machinename): 297 # TODO: We should use QMP to get the list of available machines 298 if not self._machinehelp: 299 self._machinehelp = run( 300 [self.qemu_bin, '-M', 'help'], 301 capture_output=True, check=True, encoding='utf8').stdout 302 if self._machinehelp.find(machinename) < 0: 303 self.skipTest('no support for machine ' + machinename) 304 self.machine = machinename 305 306 def require_accelerator(self, accelerator): 307 """ 308 Requires an accelerator to be available for the test to continue 309 310 It takes into account the currently set qemu binary. 311 312 If the check fails, the test is canceled. If the check itself 313 for the given accelerator is not available, the test is also 314 canceled. 315 316 :param accelerator: name of the accelerator, such as "kvm" or "tcg" 317 :type accelerator: str 318 """ 319 checker = {'tcg': tcg_available, 320 'kvm': kvm_available, 321 'hvf': hvf_available, 322 }.get(accelerator) 323 if checker is None: 324 self.skipTest("Don't know how to check for the presence " 325 "of accelerator %s" % accelerator) 326 if not checker(qemu_bin=self.qemu_bin): 327 self.skipTest("%s accelerator does not seem to be " 328 "available" % accelerator) 329 330 def require_netdev(self, netdevname): 331 help = run([self.qemu_bin, 332 '-M', 'none', '-netdev', 'help'], 333 capture_output=True, check=True, encoding='utf8').stdout; 334 if help.find('\n' + netdevname + '\n') < 0: 335 self.skipTest('no support for " + netdevname + " networking') 336 337 def require_device(self, devicename): 338 help = run([self.qemu_bin, 339 '-M', 'none', '-device', 'help'], 340 capture_output=True, check=True, encoding='utf8').stdout; 341 if help.find(devicename) < 0: 342 self.skipTest('no support for device ' + devicename) 343 344 def _new_vm(self, name, *args): 345 vm = QEMUMachine(self.qemu_bin, 346 name=name, 347 base_temp_dir=self.workdir, 348 log_dir=self.log_file()) 349 self.log.debug('QEMUMachine "%s" created', name) 350 self.log.debug('QEMUMachine "%s" temp_dir: %s', name, vm.temp_dir) 351 352 sockpath = os.environ.get("QEMU_TEST_QMP_BACKDOOR", None) 353 if sockpath is not None: 354 vm.add_args("-chardev", 355 f"socket,id=backdoor,path={sockpath},server=on,wait=off", 356 "-mon", "chardev=backdoor,mode=control") 357 358 if args: 359 vm.add_args(*args) 360 return vm 361 362 @property 363 def vm(self): 364 return self.get_vm(name='default') 365 366 def get_vm(self, *args, name=None): 367 if not name: 368 name = str(uuid.uuid4()) 369 if self._vms.get(name) is None: 370 self._vms[name] = self._new_vm(name, *args) 371 if self.cpu is not None: 372 self._vms[name].add_args('-cpu', self.cpu) 373 if self.machine is not None: 374 self._vms[name].set_machine(self.machine) 375 return self._vms[name] 376 377 def set_vm_arg(self, arg, value): 378 """ 379 Set an argument to list of extra arguments to be given to the QEMU 380 binary. If the argument already exists then its value is replaced. 381 382 :param arg: the QEMU argument, such as "-cpu" in "-cpu host" 383 :type arg: str 384 :param value: the argument value, such as "host" in "-cpu host" 385 :type value: str 386 """ 387 if not arg or not value: 388 return 389 if arg not in self.vm.args: 390 self.vm.args.extend([arg, value]) 391 else: 392 idx = self.vm.args.index(arg) + 1 393 if idx < len(self.vm.args): 394 self.vm.args[idx] = value 395 else: 396 self.vm.args.append(value) 397 398 def tearDown(self): 399 for vm in self._vms.values(): 400 vm.shutdown() 401 logging.getLogger('console').removeHandler(self._console_log_fh) 402 super().tearDown() 403