# Test class and utilities for functional tests
#
# Copyright 2018, 2024 Red Hat, Inc.
#
# Original Author (Avocado-based tests):
#  Cleber Rosa <crosa@redhat.com>
#
# Adaption for standalone version:
#  Thomas Huth <thuth@redhat.com>
#
# This work is licensed under the terms of the GNU GPL, version 2 or
# later.  See the COPYING file in the top-level directory.

import logging
import os
from pathlib import Path
import pycotap
import shutil
from subprocess import run
import sys
import tempfile
import unittest
import uuid

from qemu.machine import QEMUMachine
from qemu.utils import kvm_available, tcg_available

from .archive import archive_extract
from .asset import Asset
from .config import BUILD_DIR, dso_suffix
from .uncompress import uncompress


class QemuBaseTest(unittest.TestCase):
    """
    Common base class for the functional tests.

    setUp() reads the QEMU binary from the QEMU_TEST_QEMU_BINARY
    environment variable, creates a per-test output directory (with a
    'scratch' sub-directory) below the build directory and routes the
    'qemu-test' and 'qemu.machine' loggers into a 'base.log' file there.
    """

    def uncompress(self, compressed, format=None):
        '''
        @params compressed: filename, Asset, or file-like object to uncompress
        @params format: optional compression format (gzip, lzma)

        Uncompresses @compressed into the scratch directory.

        If @format is None, heuristics will be applied to guess the format
        from the filename or Asset URL. @format must be non-None if
        @compressed is a file-like object.

        Returns the fully qualified path to the uncompressed file
        '''
        self.log.debug(f"Uncompress {compressed} format={format}")
        if isinstance(compressed, Asset):
            compressed.fetch()

        # The output file is named after the input with its last
        # extension stripped.
        name, _ = os.path.splitext(str(compressed))
        uncompressed = self.scratch_file(os.path.basename(name))

        # Inside a method the bare name resolves to the module-level
        # uncompress() helper imported above, not to this method.
        uncompress(compressed, uncompressed, format)

        return uncompressed

    def archive_extract(self, archive, format=None, sub_dir=None, member=None):
        '''
        @params archive: filename, Asset, or file-like object to extract
        @params format: optional archive format (tar, zip, deb, cpio)
        @params sub_dir: optional sub-directory to extract into
        @params member: optional member file to limit extraction to

        Extracts @archive into the scratch directory, or a directory beneath
        named by @sub_dir. All files are extracted unless @member specifies
        a limit.

        If @format is None, heuristics will be applied to guess the format
        from the filename or Asset URL. @format must be non-None if @archive
        is a file-like object.

        If @member is non-None, returns the fully qualified path to @member,
        otherwise returns None
        '''
        self.log.debug(f"Extract {archive} format={format} "
                       f"sub_dir={sub_dir} member={member}")
        if isinstance(archive, Asset):
            archive.fetch()

        if sub_dir is None:
            dest = self.scratch_file()
        else:
            dest = self.scratch_file(sub_dir)
        # Module-level archive_extract() helper, see note in uncompress().
        archive_extract(archive, dest, format, member)

        if member is not None:
            return self.scratch_file(member)
        return None

    def socket_dir(self):
        '''
        Create a temporary directory suitable for storing UNIX
        socket paths. The directory is created on first use and the
        same instance is returned by later calls within one test.

        Returns: a tempfile.TemporaryDirectory instance
        '''
        if self.socketdir is None:
            self.socketdir = tempfile.TemporaryDirectory(
                prefix="qemu_func_test_sock_")
        return self.socketdir

    def data_file(self, *args):
        '''
        @params args list of zero or more subdirectories or file

        Construct a path for accessing a data file located
        relative to the source directory that is the root for
        functional tests.

        @args may be an empty list to reference the root dir
        itself, may be a single element to reference a file in
        the root directory, or may be multiple elements to
        reference a file nested below. The path components
        will be joined using the platform appropriate path
        separator.

        Returns: string representing a file path
        '''
        return str(Path(Path(__file__).parent.parent, *args))

    def build_file(self, *args):
        '''
        @params args list of zero or more subdirectories or file

        Construct a path for accessing a data file located
        relative to the build directory root.

        @args may be an empty list to reference the build dir
        itself, may be a single element to reference a file in
        the build directory, or may be multiple elements to
        reference a file nested below. The path components
        will be joined using the platform appropriate path
        separator.

        Returns: string representing a file path
        '''
        return str(Path(BUILD_DIR, *args))

    def scratch_file(self, *args):
        '''
        @params args list of zero or more subdirectories or file

        Construct a path for accessing/creating a scratch file
        located relative to a temporary directory dedicated to
        this test case. The directory and its contents will be
        purged upon completion of the test (unless the
        QEMU_TEST_KEEP_SCRATCH environment variable is set).

        @args may be an empty list to reference the scratch dir
        itself, may be a single element to reference a file in
        the scratch directory, or may be multiple elements to
        reference a file nested below. The path components
        will be joined using the platform appropriate path
        separator.

        Returns: string representing a file path
        '''
        return str(Path(self.workdir, *args))

    def log_file(self, *args):
        '''
        @params args list of zero or more subdirectories or file

        Construct a path for accessing/creating a log file
        located relative to a temporary directory dedicated to
        this test case. The directory and its log files will be
        preserved upon completion of the test.

        @args may be an empty list to reference the log dir
        itself, may be a single element to reference a file in
        the log directory, or may be multiple elements to
        reference a file nested below. The path components
        will be joined using the platform appropriate path
        separator.

        Returns: string representing a file path
        '''
        return str(Path(self.outputdir, *args))

    def plugin_file(self, plugin_name):
        '''
        @params plugin_name: name of the plugin, without any suffix

        Return the path to the plugin taking into account any host OS
        specific suffixes.

        The returned path is relative ('tests/tcg/plugins/<name>.<sfx>');
        NOTE(review): presumably it is meant to be resolved against the
        build directory - confirm with the callers.
        '''
        sfx = dso_suffix()
        return os.path.join('tests', 'tcg', 'plugins', f'{plugin_name}.{sfx}')

    def assets_available(self):
        '''
        Check all Asset instances that are declared as "ASSET_*"
        attributes directly on the class of this test.

        Returns: False as soon as one asset reports that it is not
        available, True otherwise
        '''
        for name, asset in vars(self.__class__).items():
            if name.startswith("ASSET_") and isinstance(asset, Asset):
                if not asset.available():
                    self.log.debug(f"Asset {asset.url} not available")
                    return False
        return True

    def setUp(self):
        '''
        Prepare the per-test directories and logging.

        Fails the test if QEMU_TEST_QEMU_BINARY is not set and skips it
        if one of the class' assets is not available.
        '''
        self.qemu_bin = os.getenv('QEMU_TEST_QEMU_BINARY')
        self.assertIsNotNone(self.qemu_bin, 'QEMU_TEST_QEMU_BINARY must be set')
        # The target architecture is the part after the last '-' of the
        # binary name.
        self.arch = self.qemu_bin.split('-')[-1]
        self.socketdir = None

        self.outputdir = self.build_file('tests', 'functional',
                                         self.arch, self.id())
        self.workdir = os.path.join(self.outputdir, 'scratch')
        os.makedirs(self.workdir, exist_ok=True)

        self.log_filename = self.log_file('base.log')
        self.log = logging.getLogger('qemu-test')
        self.log.setLevel(logging.DEBUG)
        self._log_fh = logging.FileHandler(self.log_filename, mode='w')
        self._log_fh.setLevel(logging.DEBUG)
        file_formatter = logging.Formatter(
            '%(asctime)s - %(levelname)s: %(message)s')
        self._log_fh.setFormatter(file_formatter)
        self.log.addHandler(self._log_fh)

        # Capture QEMUMachine logging
        self.machinelog = logging.getLogger('qemu.machine')
        self.machinelog.setLevel(logging.DEBUG)
        self.machinelog.addHandler(self._log_fh)

        if not self.assets_available():
            # unittest does not run tearDown() when setUp() raises, so
            # the handler has to be detached here; otherwise it would
            # stay attached to the shared loggers and keep the log file
            # open for the rest of the run.
            self._detach_log_handler()
            self.skipTest('One or more assets is not available')

    def _detach_log_handler(self):
        '''
        Detach the base.log file handler from both loggers and close it.
        '''
        self.machinelog.removeHandler(self._log_fh)
        self.log.removeHandler(self._log_fh)
        # removeHandler() does not close the underlying file.
        self._log_fh.close()

    def tearDown(self):
        '''
        Remove the scratch directory (unless QEMU_TEST_KEEP_SCRATCH is
        set in the environment) and the socket directory, then stop
        logging to base.log.
        '''
        try:
            if "QEMU_TEST_KEEP_SCRATCH" not in os.environ:
                shutil.rmtree(self.workdir)
            if self.socketdir is not None:
                # Let TemporaryDirectory remove itself so that its
                # finalizer is detached, too.
                self.socketdir.cleanup()
                self.socketdir = None
        finally:
            # Always release the log file, even if the removal failed.
            self._detach_log_handler()

    @staticmethod
    def main():
        '''
        Entry point for running a test script stand-alone.

        The test module name is derived from the script name in
        sys.argv[0]. If QEMU_TEST_PRECACHE is set in the environment,
        only the assets are pre-cached and no test is run. Otherwise
        the tests are run with TAP output and the process exits with
        status 0 on success or 1 on failure.
        '''
        # Script file name without directory and without its extension
        path = Path(sys.argv[0]).stem

        cache = os.environ.get("QEMU_TEST_PRECACHE", None)
        if cache is not None:
            Asset.precache_suites(path, cache)
            return

        tr = pycotap.TAPTestRunner(message_log=pycotap.LogMode.LogToError,
                                   test_output_log=pycotap.LogMode.LogToError)
        res = unittest.main(module=None, testRunner=tr, exit=False,
                            argv=["__dummy__", path])
        for (test, message) in res.result.errors + res.result.failures:
            # Entries that are not instances of the classes in this file
            # do not carry the log file attributes.
            if hasattr(test, "log_filename"):
                print(f'More information on {test.id()} could be found here:'
                      f'\n {test.log_filename}', file=sys.stderr)
                if hasattr(test, 'console_log_name'):
                    print(f' {test.console_log_name}', file=sys.stderr)
        sys.exit(not res.result.wasSuccessful())


class QemuUserTest(QemuBaseTest):
    """Facilitates user-mode emulation tests."""

    def setUp(self):
        super().setUp()
        self._ldpath = []

    def add_ldpath(self, ldpath):
        '''
        @params ldpath: directory to be passed to QEMU with "-L"

        The path is stored in its absolute form and used by run_cmd().
        '''
        self._ldpath.append(os.path.abspath(ldpath))

    def run_cmd(self, bin_path, args=None):
        '''
        @params bin_path: path of the guest binary to run
        @params args: optional list of arguments for the guest binary

        Run @bin_path with the QEMU binary of this test, passing one
        "-L <path>" option for each path registered via add_ldpath().

        Returns: the subprocess.CompletedProcess instance, with stdout
        and stderr captured as text
        '''
        cmd = [self.qemu_bin]
        for ldpath in self._ldpath:
            # Option and value must be separate argv entries; a single
            # "-L <path>" element would reach QEMU as one argument
            # containing a space.
            cmd += ["-L", ldpath]
        cmd.append(bin_path)
        if args is not None:
            cmd.extend(args)
        return run(cmd, text=True, capture_output=True)


class QemuSystemTest(QemuBaseTest):
    """Facilitates system emulation tests."""

    # CPU model and machine type that get_vm() applies to newly created
    # VMs; None means that nothing is passed to the VM.
    cpu = None
    machine = None
    # Cached output of "-M help", filled on first use by set_machine()
    _machinehelp = None

    def setUp(self):
        # Initialized before the base class setUp() is run
        self._vms = {}

        super().setUp()

        console_log = logging.getLogger('console')
        console_log.setLevel(logging.DEBUG)
        self.console_log_name = self.log_file('console.log')
        self._console_log_fh = logging.FileHandler(self.console_log_name,
                                                   mode='w')
        self._console_log_fh.setLevel(logging.DEBUG)
        file_formatter = logging.Formatter('%(asctime)s: %(message)s')
        self._console_log_fh.setFormatter(file_formatter)
        console_log.addHandler(self._console_log_fh)

    def set_machine(self, machinename):
        '''
        @params machinename: name of the machine type to use

        Skips the test if @machinename does not show up in the output
        of "-M help" of the QEMU binary, otherwise remembers it as the
        machine for VMs that are created afterwards.
        '''
        # TODO: We should use QMP to get the list of available machines
        if not self._machinehelp:
            self._machinehelp = run(
                [self.qemu_bin, '-M', 'help'],
                capture_output=True, check=True, encoding='utf8').stdout
        # Plain substring match on the help text
        if machinename not in self._machinehelp:
            self.skipTest('no support for machine ' + machinename)
        self.machine = machinename

    def require_accelerator(self, accelerator):
        """
        Requires an accelerator to be available for the test to continue

        It takes into account the currently set qemu binary.

        If the check fails, the test is skipped.  If the check itself
        for the given accelerator is not available, the test is also
        skipped.

        :param accelerator: name of the accelerator, such as "kvm" or "tcg"
        :type accelerator: str
        """
        checker = {'tcg': tcg_available,
                   'kvm': kvm_available}.get(accelerator)
        if checker is None:
            self.skipTest("Don't know how to check for the presence "
                          "of accelerator %s" % accelerator)
        if not checker(qemu_bin=self.qemu_bin):
            self.skipTest("%s accelerator does not seem to be "
                          "available" % accelerator)

    def require_netdev(self, netdevname):
        '''
        @params netdevname: name of the network backend, e.g. "user"

        Skips the test if @netdevname is not listed on a line of its
        own in the output of "-netdev help" of the QEMU binary.
        '''
        help_text = run([self.qemu_bin,
                         '-M', 'none', '-netdev', 'help'],
                        capture_output=True, check=True,
                        encoding='utf8').stdout
        if '\n' + netdevname + '\n' not in help_text:
            self.skipTest(f'no support for {netdevname} networking')

    def require_device(self, devicename):
        '''
        @params devicename: name of the device

        Skips the test if @devicename does not show up in the output
        of "-device help" of the QEMU binary.
        '''
        help_text = run([self.qemu_bin,
                         '-M', 'none', '-device', 'help'],
                        capture_output=True, check=True,
                        encoding='utf8').stdout
        # Plain substring match on the help text
        if devicename not in help_text:
            self.skipTest('no support for device ' + devicename)

    def _new_vm(self, name, *args):
        '''
        Create a QEMUMachine called @name that keeps its temporary
        files in the scratch directory and its logs in the log
        directory of this test, and add @args to its command line.

        If QEMU_TEST_QMP_BACKDOOR is set in the environment, an
        additional QMP monitor is set up on a UNIX socket at that path.
        '''
        vm = QEMUMachine(self.qemu_bin,
                         name=name,
                         base_temp_dir=self.workdir,
                         log_dir=self.log_file())
        self.log.debug('QEMUMachine "%s" created', name)
        self.log.debug('QEMUMachine "%s" temp_dir: %s', name, vm.temp_dir)

        sockpath = os.environ.get("QEMU_TEST_QMP_BACKDOOR", None)
        if sockpath is not None:
            vm.add_args("-chardev",
                        f"socket,id=backdoor,path={sockpath},server=on,wait=off",
                        "-mon", "chardev=backdoor,mode=control")

        if args:
            vm.add_args(*args)
        return vm

    @property
    def vm(self):
        '''The VM named "default", created on first access.'''
        return self.get_vm(name='default')

    def get_vm(self, *args, name=None):
        '''
        @params args: extra QEMU arguments, only used if the VM is created
        @params name: name of the VM, a random UUID is used if not given

        Returns the VM called @name, creating it first if it does not
        exist yet. The cpu and machine settings of the test are applied
        to a VM when it is created.
        '''
        if not name:
            name = str(uuid.uuid4())
        if self._vms.get(name) is None:
            vm = self._new_vm(name, *args)
            self._vms[name] = vm
            if self.cpu is not None:
                vm.add_args('-cpu', self.cpu)
            if self.machine is not None:
                vm.set_machine(self.machine)
        return self._vms[name]

    def set_vm_arg(self, arg, value):
        """
        Set an argument to list of extra arguments to be given to the QEMU
        binary. If the argument already exists then its value is replaced.

        Nothing is done if either arg or value is empty.

        :param arg: the QEMU argument, such as "-cpu" in "-cpu host"
        :type arg: str
        :param value: the argument value, such as "host" in "-cpu host"
        :type value: str
        """
        if not arg or not value:
            return
        vm_args = self.vm.args
        if arg not in vm_args:
            vm_args.extend([arg, value])
            return
        # The value is the element that follows the first occurrence of
        # the argument; append it if the argument is the last element.
        idx = vm_args.index(arg) + 1
        if idx < len(vm_args):
            vm_args[idx] = value
        else:
            vm_args.append(value)

    def tearDown(self):
        '''
        Shut down all VMs of this test and stop the console logging.
        '''
        try:
            for vm in self._vms.values():
                vm.shutdown()
        finally:
            # Release the console log and run the base class cleanup
            # even if shutting down a VM failed.
            console_log = logging.getLogger('console')
            console_log.removeHandler(self._console_log_fh)
            self._console_log_fh.close()
            super().tearDown()