# Test class and utilities for functional tests
#
# Copyright 2018, 2024 Red Hat, Inc.
#
# Original Author (Avocado-based tests):
#  Cleber Rosa <crosa@redhat.com>
#
# Adaption for standalone version:
#  Thomas Huth <thuth@redhat.com>
#
# This work is licensed under the terms of the GNU GPL, version 2 or
# later.  See the COPYING file in the top-level directory.

import logging
import os
from pathlib import Path
import pycotap
import shutil
from subprocess import run
import sys
import tempfile
import unittest
import uuid

from qemu.machine import QEMUMachine
from qemu.utils import kvm_available, tcg_available

from .archive import archive_extract
from .asset import Asset
from .config import BUILD_DIR, dso_suffix
from .uncompress import uncompress


class QemuBaseTest(unittest.TestCase):
    """
    Common base class for the QEMU functional tests.

    setUp() derives the target architecture from the QEMU binary named
    by the QEMU_TEST_QEMU_BINARY environment variable, creates the
    per-test output and scratch directories below the build directory
    and attaches a file handler for 'base.log' to the 'qemu-test' and
    'qemu.machine' loggers.
    """

    arch = None

    workdir = None
    log = None
    logdir = None

    def uncompress(self, compressed, format=None):
        '''
        @params compressed: filename, Asset, or file-like object to uncompress
        @params format: optional compression format (gzip, lzma)

        Uncompresses @compressed into the scratch directory.

        If @format is None, heuristics will be applied to guess the format
        from the filename or Asset URL. @format must be non-None if
        @compressed is a file-like object.

        Returns the fully qualified path to the uncompressed file
        '''
        self.log.debug(f"Uncompress {compressed} format={format}")
        if isinstance(compressed, Asset):
            compressed.fetch()

        # The output file is named after the input with its last
        # extension stripped, e.g. "foo.img.gz" -> "foo.img"
        (name, ext) = os.path.splitext(str(compressed))
        uncompressed = self.scratch_file(os.path.basename(name))

        # Inside a method this resolves to the module level helper
        # imported from .uncompress, not to this method
        uncompress(compressed, uncompressed, format)

        return uncompressed

    def archive_extract(self, archive, format=None, sub_dir=None, member=None):
        '''
        @params archive: filename, Asset, or file-like object to extract
        @params format: optional archive format (tar, zip, deb, cpio)
        @params sub_dir: optional sub-directory to extract into
        @params member: optional member file to limit extraction to

        Extracts @archive into the scratch directory, or a directory beneath
        named by @sub_dir. All files are extracted unless @member specifies
        a limit.

        If @format is None, heuristics will be applied to guess the format
        from the filename or Asset URL. @format must be non-None if @archive
        is a file-like object.

        If @member is non-None, returns the fully qualified path to @member,
        otherwise returns None
        '''
        self.log.debug(f"Extract {archive} format={format} "
                       f"sub_dir={sub_dir} member={member}")
        if isinstance(archive, Asset):
            archive.fetch()

        if sub_dir is None:
            dest_parts = ()
        else:
            dest_parts = (sub_dir,)

        # Module level helper imported from .archive
        archive_extract(archive, self.scratch_file(*dest_parts),
                        format, member)

        if member is None:
            return None
        # The member was extracted below the destination directory, so
        # @sub_dir has to be part of the returned path as well
        return self.scratch_file(*dest_parts, member)

    def socket_dir(self):
        '''
        Create a temporary directory suitable for storing UNIX
        socket paths. The directory is created on first use and
        the same instance is returned by subsequent calls until
        tearDown() has removed it.

        Returns: a tempfile.TemporaryDirectory instance
        '''
        if self.socketdir is None:
            self.socketdir = tempfile.TemporaryDirectory(
                prefix="qemu_func_test_sock_")
        return self.socketdir

    def data_file(self, *args):
        '''
        @params args list of zero or more subdirectories or file

        Construct a path for accessing a data file located
        relative to the source directory that is the root for
        functional tests.

        @args may be an empty list to reference the root dir
        itself, may be a single element to reference a file in
        the root directory, or may be multiple elements to
        reference a file nested below. The path components
        will be joined using the platform appropriate path
        separator.

        Returns: string representing a file path
        '''
        return str(Path(Path(__file__).parent.parent, *args))

    def build_file(self, *args):
        '''
        @params args list of zero or more subdirectories or file

        Construct a path for accessing a data file located
        relative to the build directory root.

        @args may be an empty list to reference the build dir
        itself, may be a single element to reference a file in
        the build directory, or may be multiple elements to
        reference a file nested below. The path components
        will be joined using the platform appropriate path
        separator.

        Returns: string representing a file path
        '''
        return str(Path(BUILD_DIR, *args))

    def scratch_file(self, *args):
        '''
        @params args list of zero or more subdirectories or file

        Construct a path for accessing/creating a scratch file
        located relative to a temporary directory dedicated to
        this test case. The directory and its contents will be
        purged upon completion of the test, unless the
        QEMU_TEST_KEEP_SCRATCH environment variable is set.

        @args may be an empty list to reference the scratch dir
        itself, may be a single element to reference a file in
        the scratch directory, or may be multiple elements to
        reference a file nested below. The path components
        will be joined using the platform appropriate path
        separator.

        Returns: string representing a file path
        '''
        return str(Path(self.workdir, *args))

    def log_file(self, *args):
        '''
        @params args list of zero or more subdirectories or file

        Construct a path for accessing/creating a log file
        located relative to a temporary directory dedicated to
        this test case. The directory and its log files will be
        preserved upon completion of the test.

        @args may be an empty list to reference the log dir
        itself, may be a single element to reference a file in
        the log directory, or may be multiple elements to
        reference a file nested below. The path components
        will be joined using the platform appropriate path
        separator.

        Returns: string representing a file path
        '''
        return str(Path(self.outputdir, *args))

    def plugin_file(self, plugin_name):
        '''
        @params plugin name

        Return the path to the plugin below 'tests/tcg/plugins',
        taking into account any host OS specific suffixes. The
        returned path is relative, not anchored at the build dir.
        '''
        sfx = dso_suffix()
        return os.path.join('tests', 'tcg', 'plugins', f'{plugin_name}.{sfx}')

    def assets_available(self):
        '''
        Check all the Asset instances that are declared as ASSET_*
        attributes directly on the class of this test.

        Returns: False as soon as one asset reports that it is not
        available, True otherwise
        '''
        for name, asset in vars(self.__class__).items():
            if name.startswith("ASSET_") and isinstance(asset, Asset):
                if not asset.available():
                    self.log.debug(f"Asset {asset.url} not available")
                    return False
        return True

    def setUp(self):
        '''
        Prepare the directories and the logging for a single test.

        Fails the test if QEMU_TEST_QEMU_BINARY is not set and skips
        it if one of the assets of the test class is not available.
        '''
        self.qemu_bin = os.getenv('QEMU_TEST_QEMU_BINARY')
        self.assertIsNotNone(self.qemu_bin, 'QEMU_TEST_QEMU_BINARY must be set')
        # The architecture is whatever follows the last '-' of the binary
        self.arch = self.qemu_bin.split('-')[-1]
        self.socketdir = None

        self.outputdir = self.build_file('tests', 'functional',
                                         self.arch, self.id())
        self.workdir = os.path.join(self.outputdir, 'scratch')
        os.makedirs(self.workdir, exist_ok=True)

        self.log_filename = self.log_file('base.log')
        self.log = logging.getLogger('qemu-test')
        self.log.setLevel(logging.DEBUG)
        self._log_fh = logging.FileHandler(self.log_filename, mode='w')
        self._log_fh.setLevel(logging.DEBUG)
        fileFormatter = logging.Formatter(
            '%(asctime)s - %(levelname)s: %(message)s')
        self._log_fh.setFormatter(fileFormatter)
        self.log.addHandler(self._log_fh)

        # Capture QEMUMachine logging
        self.machinelog = logging.getLogger('qemu.machine')
        self.machinelog.setLevel(logging.DEBUG)
        self.machinelog.addHandler(self._log_fh)

        if not self.assets_available():
            self.skipTest('One or more assets is not available')

    def tearDown(self):
        '''
        Remove the scratch directory (unless QEMU_TEST_KEEP_SCRATCH is
        set in the environment) and the socket directory, then detach
        and close the log file handler.
        '''
        try:
            if "QEMU_TEST_KEEP_SCRATCH" not in os.environ:
                shutil.rmtree(self.workdir)
            if self.socketdir is not None:
                # cleanup() removes the directory and also disarms the
                # finalizer of the TemporaryDirectory object
                self.socketdir.cleanup()
                self.socketdir = None
        finally:
            # The loggers are shared between all tests, so the handler
            # must be detached even if the cleanup above has failed
            self.machinelog.removeHandler(self._log_fh)
            self.log.removeHandler(self._log_fh)
            # Release the file descriptor of the log file
            self._log_fh.close()

    @staticmethod
    def main():
        '''
        Entry point for running a test file as a standalone program.

        The name of the test module is derived from sys.argv[0] by
        dropping the directory and the last three characters (the
        ".py" suffix).

        If QEMU_TEST_PRECACHE is set in the environment, only the assets
        are pre-cached and the function returns without running tests.
        Otherwise the tests are run with a TAP test runner and the
        process exits with a non-zero status if they did not succeed.
        '''
        path = os.path.basename(sys.argv[0])[:-3]

        cache = os.environ.get("QEMU_TEST_PRECACHE", None)
        if cache is not None:
            Asset.precache_suites(path, cache)
            return

        tr = pycotap.TAPTestRunner(message_log = pycotap.LogMode.LogToError,
                                   test_output_log = pycotap.LogMode.LogToError)
        res = unittest.main(module = None, testRunner = tr, exit = False,
                            argv=["__dummy__", path])
        # Point the user to the log files of each test that went wrong
        for (test, message) in res.result.errors + res.result.failures:
            if hasattr(test, "log_filename"):
                print('More information on ' + test.id() + ' could be found here:'
                      '\n %s' % test.log_filename, file=sys.stderr)
                if hasattr(test, 'console_log_name'):
                    print(' %s' % test.console_log_name, file=sys.stderr)
        sys.exit(not res.result.wasSuccessful())


class QemuUserTest(QemuBaseTest):
    """Facilitates user-mode emulation tests."""

    def setUp(self):
        super().setUp()
        self._ldpath = []

    def add_ldpath(self, ldpath):
        """
        Register a directory that run_cmd() passes to QEMU with "-L".

        :param ldpath: the directory, it is stored as an absolute path
        :type ldpath: str
        """
        self._ldpath.append(os.path.abspath(ldpath))

    def run_cmd(self, bin_path, args=None):
        """
        Run a program with the QEMU binary under test.

        :param bin_path: path of the program to run
        :type bin_path: str
        :param args: optional list of arguments for the program
        :type args: list
        :returns: the subprocess.CompletedProcess of the finished run,
                  with stdout and stderr captured as text
        """
        cmd = [self.qemu_bin]
        for ldpath in self._ldpath:
            # Option and value must be separate argv elements, since
            # there is no shell that would split a "-L path" string
            cmd += ["-L", ldpath]
        cmd.append(bin_path)
        if args:
            cmd += list(args)
        return run(cmd, text=True, capture_output=True)


class QemuSystemTest(QemuBaseTest):
    """Facilitates system emulation tests."""

    cpu = None
    machine = None
    _machinehelp = None

    def setUp(self):
        self._vms = {}

        super().setUp()

        # Everything that is sent to the 'console' logger during the
        # test ends up in console.log in the log directory of the test
        console_log = logging.getLogger('console')
        console_log.setLevel(logging.DEBUG)
        self.console_log_name = self.log_file('console.log')
        self._console_log_fh = logging.FileHandler(self.console_log_name,
                                                   mode='w')
        self._console_log_fh.setLevel(logging.DEBUG)
        fileFormatter = logging.Formatter('%(asctime)s: %(message)s')
        self._console_log_fh.setFormatter(fileFormatter)
        console_log.addHandler(self._console_log_fh)

    def set_machine(self, machinename):
        """
        Select the machine type for the VMs of this test.

        The test is skipped if the name of the machine does not show up
        in the output of "-M help" of the QEMU binary.

        :param machinename: name of the machine
        :type machinename: str
        """
        # TODO: We should use QMP to get the list of available machines
        if not self._machinehelp:
            # Run QEMU only once per test and keep the output around
            self._machinehelp = run(
                [self.qemu_bin, '-M', 'help'],
                capture_output=True, check=True, encoding='utf8').stdout
        if machinename not in self._machinehelp:
            self.skipTest('no support for machine ' + machinename)
        self.machine = machinename

    def require_accelerator(self, accelerator):
        """
        Requires an accelerator to be available for the test to continue

        It takes into account the currently set qemu binary.

        If the check fails, the test is canceled.  If the check itself
        for the given accelerator is not available, the test is also
        canceled.

        :param accelerator: name of the accelerator, such as "kvm" or "tcg"
        :type accelerator: str
        """
        checker = {'tcg': tcg_available,
                   'kvm': kvm_available}.get(accelerator)
        if checker is None:
            self.skipTest("Don't know how to check for the presence "
                          "of accelerator %s" % accelerator)
        if not checker(qemu_bin=self.qemu_bin):
            self.skipTest("%s accelerator does not seem to be "
                          "available" % accelerator)

    def require_netdev(self, netdevname):
        """
        Skip the test if the QEMU binary lacks a network backend.

        :param netdevname: name of the backend, which has to show up on
                           a line of its own in the "-netdev help" output
        :type netdevname: str
        """
        netdev_help = run([self.qemu_bin,
                           '-M', 'none', '-netdev', 'help'],
                          capture_output=True, check=True,
                          encoding='utf8').stdout
        if ('\n' + netdevname + '\n') not in netdev_help:
            self.skipTest('no support for ' + netdevname + ' networking')

    def require_device(self, devicename):
        """
        Skip the test if the QEMU binary lacks a device.

        :param devicename: name of the device, which has to show up
                           somewhere in the "-device help" output
        :type devicename: str
        """
        device_help = run([self.qemu_bin,
                           '-M', 'none', '-device', 'help'],
                          capture_output=True, check=True,
                          encoding='utf8').stdout
        if devicename not in device_help:
            self.skipTest('no support for device ' + devicename)

    def _new_vm(self, name, *args):
        """
        Create a QEMUMachine for the QEMU binary under test.

        If QEMU_TEST_QMP_BACKDOOR is set in the environment, an extra
        QMP monitor is added on a UNIX socket with that path.

        :param name: name of the new machine
        :param args: extra command line arguments for QEMU
        :returns: the new QEMUMachine instance
        """
        vm = QEMUMachine(self.qemu_bin,
                         name=name,
                         base_temp_dir=self.workdir,
                         log_dir=self.log_file())
        self.log.debug('QEMUMachine "%s" created', name)
        self.log.debug('QEMUMachine "%s" temp_dir: %s', name, vm.temp_dir)

        sockpath = os.environ.get("QEMU_TEST_QMP_BACKDOOR", None)
        if sockpath is not None:
            vm.add_args("-chardev",
                        f"socket,id=backdoor,path={sockpath},server=on,wait=off",
                        "-mon", "chardev=backdoor,mode=control")

        if args:
            vm.add_args(*args)
        return vm

    @property
    def vm(self):
        """The VM with the name 'default', created on first access."""
        return self.get_vm(name='default')

    def get_vm(self, *args, name=None):
        """
        Get the VM with the given name, creating it if necessary.

        :param args: extra command line arguments for QEMU, only used
                     if the VM has to be created
        :param name: name of the VM, a random UUID is used if empty
        :returns: the QEMUMachine instance
        """
        if not name:
            name = str(uuid.uuid4())
        if self._vms.get(name) is None:
            self._vms[name] = self._new_vm(name, *args)
            if self.cpu is not None:
                self._vms[name].add_args('-cpu', self.cpu)
            if self.machine is not None:
                self._vms[name].set_machine(self.machine)
        return self._vms[name]

    def set_vm_arg(self, arg, value):
        """
        Set an argument to list of extra arguments to be given to the QEMU
        binary. If the argument already exists then its value is replaced.

        Nothing is done if @arg or @value is empty.

        :param arg: the QEMU argument, such as "-cpu" in "-cpu host"
        :type arg: str
        :param value: the argument value, such as "host" in "-cpu host"
        :type value: str
        """
        if not arg or not value:
            return
        vm = self.vm
        if arg not in vm.args:
            vm.args.extend([arg, value])
        else:
            # The value is the element right after the argument
            idx = vm.args.index(arg) + 1
            if idx < len(vm.args):
                vm.args[idx] = value
            else:
                vm.args.append(value)

    def tearDown(self):
        """
        Shut down all the VMs of the test, then detach and close the
        console log handler before running the base class cleanup.
        """
        try:
            for vm in self._vms.values():
                vm.shutdown()
        finally:
            # The 'console' logger is shared between all tests, so the
            # handler must go away even if a shutdown has failed
            logging.getLogger('console').removeHandler(self._console_log_fh)
            self._console_log_fh.close()
            super().tearDown()