# Test class and utilities for functional tests
#
# Copyright 2018, 2024 Red Hat, Inc.
#
# Original Author (Avocado-based tests):
#  Cleber Rosa <crosa@redhat.com>
#
# Adaption for standalone version:
#  Thomas Huth <thuth@redhat.com>
#
# This work is licensed under the terms of the GNU GPL, version 2 or
# later.  See the COPYING file in the top-level directory.

import logging
import os
from pathlib import Path
import pycotap
import shutil
from subprocess import run
import sys
import tempfile
import unittest
import uuid

from qemu.machine import QEMUMachine
from qemu.utils import kvm_available, tcg_available

from .archive import archive_extract
from .asset import Asset
from .config import BUILD_DIR
from .uncompress import uncompress


class QemuBaseTest(unittest.TestCase):
    """
    Common base class for the functional tests.

    setUp() creates a per-test output directory with a 'scratch'
    sub-directory and a 'base.log' file; the remaining methods are
    helpers for building paths and unpacking assets.
    """

    arch = None

    workdir = None
    log = None
    logdir = None

    def uncompress(self, compressed, format=None):
        """
        Uncompresses @compressed into the scratch directory.

        If @format is None, heuristics will be applied to guess the format
        from the filename or Asset URL. @format must be non-None if
        @compressed is a file-like object.

        :param compressed: filename, Asset, or file-like object to
                           uncompress
        :param format: optional compression format (gzip, lzma)
        :returns: the fully qualified path to the uncompressed file
        """
        self.log.debug(f"Uncompress {compressed} format={format}")
        if isinstance(compressed, Asset):
            compressed.fetch()

        # The output file is named after the input with its last
        # extension stripped.
        name, _ = os.path.splitext(str(compressed))
        uncompressed = self.scratch_file(os.path.basename(name))

        # Inside a method body this resolves to the module-level
        # uncompress() helper, not to this method.
        uncompress(compressed, uncompressed, format)

        return uncompressed

    def archive_extract(self, archive, format=None, sub_dir=None, member=None):
        """
        Extracts @archive into the scratch directory, or a directory beneath
        named by @sub_dir. All files are extracted unless @member specifies
        a limit.

        If @format is None, heuristics will be applied to guess the format
        from the filename or Asset URL. @format must be non-None if @archive
        is a file-like object.

        :param archive: filename, Asset, or file-like object to extract
        :param format: optional archive format (tar, zip, deb, cpio)
        :param sub_dir: optional sub-directory to extract into
        :param member: optional member file to limit extraction to
        :returns: the fully qualified path to @member if it is non-None,
                  otherwise None
        """
        self.log.debug(f"Extract {archive} format={format} "
                       f"sub_dir={sub_dir} member={member}")
        if isinstance(archive, Asset):
            archive.fetch()

        # Path components of the extraction directory, relative to the
        # scratch directory.
        dest = () if sub_dir is None else (sub_dir,)
        archive_extract(archive, self.scratch_file(*dest), format, member)

        if member is not None:
            # The member lives beneath the extraction directory, so the
            # returned path has to include @sub_dir when one was given.
            return self.scratch_file(*dest, member)
        return None

    def socket_dir(self):
        """
        Create a temporary directory suitable for storing UNIX
        socket paths.

        The directory is created on first use and the same object is
        returned by later calls until tearDown() removes it.

        :returns: a tempfile.TemporaryDirectory instance
        """
        if self.socketdir is None:
            self.socketdir = tempfile.TemporaryDirectory(
                prefix="qemu_func_test_sock_")
        return self.socketdir

    def data_file(self, *args):
        """
        Construct a path for accessing a data file located
        relative to the source directory that is the root for
        functional tests.

        @args may be an empty list to reference the root dir
        itself, may be a single element to reference a file in
        the root directory, or may be multiple elements to
        reference a file nested below. The path components
        will be joined using the platform appropriate path
        separator.

        :param args: zero or more subdirectories or file
        :returns: string representing a file path
        """
        # The root is the parent of the directory holding this module.
        return str(Path(Path(__file__).parent.parent, *args))

    def build_file(self, *args):
        """
        Construct a path for accessing a data file located
        relative to the build directory root.

        @args may be an empty list to reference the build dir
        itself, may be a single element to reference a file in
        the build directory, or may be multiple elements to
        reference a file nested below. The path components
        will be joined using the platform appropriate path
        separator.

        :param args: zero or more subdirectories or file
        :returns: string representing a file path
        """
        return str(Path(BUILD_DIR, *args))

    def scratch_file(self, *args):
        """
        Construct a path for accessing/creating a scratch file
        located relative to a temporary directory dedicated to
        this test case. The directory and its contents will be
        purged upon completion of the test.

        @args may be an empty list to reference the scratch dir
        itself, may be a single element to reference a file in
        the scratch directory, or may be multiple elements to
        reference a file nested below. The path components
        will be joined using the platform appropriate path
        separator.

        :param args: zero or more subdirectories or file
        :returns: string representing a file path
        """
        return str(Path(self.workdir, *args))

    def log_file(self, *args):
        """
        Construct a path for accessing/creating a log file
        located relative to a temporary directory dedicated to
        this test case. The directory and its log files will be
        preserved upon completion of the test.

        @args may be an empty list to reference the log dir
        itself, may be a single element to reference a file in
        the log directory, or may be multiple elements to
        reference a file nested below. The path components
        will be joined using the platform appropriate path
        separator.

        :param args: zero or more subdirectories or file
        :returns: string representing a file path
        """
        return str(Path(self.outputdir, *args))

    def assets_available(self):
        """
        Check the Asset objects declared on the test class.

        Only attributes defined directly on the concrete class whose
        name starts with "ASSET_" are examined.

        :returns: False as soon as one asset reports that it is not
                  available, True otherwise
        """
        for name, asset in vars(self.__class__).items():
            if name.startswith("ASSET_") and isinstance(asset, Asset):
                if not asset.available():
                    self.log.debug(f"Asset {asset.url} not available")
                    return False
        return True

    def setUp(self):
        """
        Prepare the output directory, scratch directory and logging.

        Fails the test if QEMU_TEST_QEMU_BINARY is not set in the
        environment and skips it if one of the class assets is not
        available.
        """
        self.qemu_bin = os.getenv('QEMU_TEST_QEMU_BINARY')
        self.assertIsNotNone(self.qemu_bin, 'QEMU_TEST_QEMU_BINARY must be set')
        # The architecture is whatever follows the last '-' in the
        # binary name.
        self.arch = self.qemu_bin.split('-')[-1]
        self.socketdir = None

        self.outputdir = self.build_file('tests', 'functional',
                                         self.arch, self.id())
        self.workdir = os.path.join(self.outputdir, 'scratch')
        os.makedirs(self.workdir, exist_ok=True)

        self.log_filename = self.log_file('base.log')
        self.log = logging.getLogger('qemu-test')
        self.log.setLevel(logging.DEBUG)
        self._log_fh = logging.FileHandler(self.log_filename, mode='w')
        self._log_fh.setLevel(logging.DEBUG)
        fileFormatter = logging.Formatter(
            '%(asctime)s - %(levelname)s: %(message)s')
        self._log_fh.setFormatter(fileFormatter)
        self.log.addHandler(self._log_fh)

        # Capture QEMUMachine logging
        self.machinelog = logging.getLogger('qemu.machine')
        self.machinelog.setLevel(logging.DEBUG)
        self.machinelog.addHandler(self._log_fh)

        if not self.assets_available():
            self.skipTest('One or more assets is not available')

    def tearDown(self):
        """
        Remove the scratch and socket directories and detach logging.

        The scratch directory is kept when QEMU_TEST_KEEP_SCRATCH is
        present in the environment.
        """
        if "QEMU_TEST_KEEP_SCRATCH" not in os.environ:
            shutil.rmtree(self.workdir)
        if self.socketdir is not None:
            # Let the TemporaryDirectory remove itself so that its
            # finalizer is detached as well.
            self.socketdir.cleanup()
            self.socketdir = None
        self.machinelog.removeHandler(self._log_fh)
        self.log.removeHandler(self._log_fh)
        # Release the log file descriptor; a new handler is created for
        # every test.
        self._log_fh.close()

    @staticmethod
    def main():
        """
        Entry point for running a test script.

        The test module name is taken from sys.argv[0] with its last
        three characters (the ".py" suffix) removed. If
        QEMU_TEST_PRECACHE is set, only Asset.precache_suites() is
        called. Otherwise the tests are run with a TAP runner, the log
        file locations of failed tests are printed to stderr, and the
        process exits with a non-zero status if the run was not
        successful.
        """
        path = os.path.basename(sys.argv[0])[:-3]

        cache = os.environ.get("QEMU_TEST_PRECACHE", None)
        if cache is not None:
            Asset.precache_suites(path, cache)
            return

        tr = pycotap.TAPTestRunner(message_log=pycotap.LogMode.LogToError,
                                   test_output_log=pycotap.LogMode.LogToError)
        res = unittest.main(module=None, testRunner=tr, exit=False,
                            argv=["__dummy__", path])
        for (test, message) in res.result.errors + res.result.failures:
            # Entries that are not test instances created by setUp()
            # have no log file attributes.
            if hasattr(test, "log_filename"):
                print(f"More information on {test.id()} could be found here:"
                      f"\n {test.log_filename}", file=sys.stderr)
                if hasattr(test, 'console_log_name'):
                    print(f" {test.console_log_name}", file=sys.stderr)
        sys.exit(not res.result.wasSuccessful())


class QemuUserTest(QemuBaseTest):
    """Helpers for tests that run a program under the QEMU binary."""

    def setUp(self):
        """Run the base setup and start with no -L search paths."""
        super().setUp()
        self._ldpath = []

    def add_ldpath(self, ldpath):
        """
        Record a path that run_cmd() passes to QEMU with -L.

        :param ldpath: path, stored in absolute form
        """
        self._ldpath.append(os.path.abspath(ldpath))

    def run_cmd(self, bin_path, args=None):
        """
        Run @bin_path under the QEMU binary and capture its output.

        :param bin_path: path of the program to run
        :param args: optional list of arguments for the program
        :returns: the subprocess.CompletedProcess of the run, with
                  stdout and stderr captured as text
        """
        cmd = [self.qemu_bin]
        for ldpath in self._ldpath:
            # The command is a list, so there is no shell to split
            # "-L path": option and value must be separate elements.
            cmd += ["-L", ldpath]
        cmd.append(bin_path)
        if args:
            cmd.extend(args)
        return run(cmd, text=True, capture_output=True)


class QemuSystemTest(QemuBaseTest):
    """Facilitates system emulation tests."""

    cpu = None
    machine = None
    _machinehelp = None

    def setUp(self):
        """Run the base setup and add a 'console.log' file handler."""
        self._vms = {}

        super().setUp()

        console_log = logging.getLogger('console')
        console_log.setLevel(logging.DEBUG)
        self.console_log_name = self.log_file('console.log')
        self._console_log_fh = logging.FileHandler(self.console_log_name,
                                                   mode='w')
        self._console_log_fh.setLevel(logging.DEBUG)
        fileFormatter = logging.Formatter('%(asctime)s: %(message)s')
        self._console_log_fh.setFormatter(fileFormatter)
        console_log.addHandler(self._console_log_fh)

    def set_machine(self, machinename):
        """
        Select the machine type used for VMs created by this test.

        The test is skipped if @machinename does not occur in the
        output of "-M help" of the QEMU binary.

        :param machinename: name of the machine type
        """
        # TODO: We should use QMP to get the list of available machines
        if not self._machinehelp:
            self._machinehelp = run(
                [self.qemu_bin, '-M', 'help'],
                capture_output=True, check=True, encoding='utf8').stdout
        if machinename not in self._machinehelp:
            self.skipTest('no support for machine ' + machinename)
        self.machine = machinename

    def require_accelerator(self, accelerator):
        """
        Requires an accelerator to be available for the test to continue

        It takes into account the currently set qemu binary.

        If the check fails, the test is canceled.  If the check itself
        for the given accelerator is not available, the test is also
        canceled.

        :param accelerator: name of the accelerator, such as "kvm" or "tcg"
        :type accelerator: str
        """
        checker = {'tcg': tcg_available,
                   'kvm': kvm_available}.get(accelerator)
        if checker is None:
            self.skipTest("Don't know how to check for the presence "
                          "of accelerator %s" % accelerator)
        if not checker(qemu_bin=self.qemu_bin):
            self.skipTest("%s accelerator does not seem to be "
                          "available" % accelerator)

    def require_netdev(self, netdevname):
        """
        Skip the test unless the QEMU binary lists @netdevname.

        :param netdevname: name that must appear on a line of its own
                           in the output of "-netdev help"
        """
        netdev_help = run([self.qemu_bin,
                           '-M', 'none', '-netdev', 'help'],
                          capture_output=True, check=True,
                          encoding='utf8').stdout
        if '\n' + netdevname + '\n' not in netdev_help:
            self.skipTest('no support for ' + netdevname + ' networking')

    def require_device(self, devicename):
        """
        Skip the test unless the QEMU binary lists @devicename.

        :param devicename: text that must occur somewhere in the output
                           of "-device help"
        """
        device_help = run([self.qemu_bin,
                           '-M', 'none', '-device', 'help'],
                          capture_output=True, check=True,
                          encoding='utf8').stdout
        if devicename not in device_help:
            self.skipTest('no support for device ' + devicename)

    def _new_vm(self, name, *args):
        """
        Create a QEMUMachine for this test.

        If QEMU_TEST_QMP_BACKDOOR is set in the environment, a socket
        chardev at that path and a control-mode monitor attached to it
        are added to the command line.

        :param name: name given to the QEMUMachine
        :param args: extra command line arguments for the machine
        :returns: the new QEMUMachine instance
        """
        vm = QEMUMachine(self.qemu_bin,
                         name=name,
                         base_temp_dir=self.workdir,
                         log_dir=self.log_file())
        self.log.debug('QEMUMachine "%s" created', name)
        self.log.debug('QEMUMachine "%s" temp_dir: %s', name, vm.temp_dir)

        sockpath = os.environ.get("QEMU_TEST_QMP_BACKDOOR", None)
        if sockpath is not None:
            vm.add_args("-chardev",
                        f"socket,id=backdoor,path={sockpath},server=on,wait=off",
                        "-mon", "chardev=backdoor,mode=control")

        if args:
            vm.add_args(*args)
        return vm

    @property
    def vm(self):
        """The VM named 'default', created on first access."""
        return self.get_vm(name='default')

    def get_vm(self, *args, name=None):
        """
        Return the VM registered under @name, creating it if needed.

        A newly created VM gets the class 'cpu' and 'machine' settings
        applied when they are not None. @args are only used when the VM
        is created.

        :param args: extra command line arguments for a new VM
        :param name: name of the VM; a random UUID is used if empty
        :returns: the QEMUMachine instance
        """
        if not name:
            name = str(uuid.uuid4())
        if self._vms.get(name) is None:
            self._vms[name] = self._new_vm(name, *args)
            if self.cpu is not None:
                self._vms[name].add_args('-cpu', self.cpu)
            if self.machine is not None:
                self._vms[name].set_machine(self.machine)
        return self._vms[name]

    def set_vm_arg(self, arg, value):
        """
        Set an argument to list of extra arguments to be given to the QEMU
        binary. If the argument already exists then its value is replaced.

        :param arg: the QEMU argument, such as "-cpu" in "-cpu host"
        :type arg: str
        :param value: the argument value, such as "host" in "-cpu host"
        :type value: str
        """
        if not arg or not value:
            return
        vm_args = self.vm.args
        if arg not in vm_args:
            vm_args.extend([arg, value])
            return
        # The value is expected right after the first occurrence of
        # @arg; append it if @arg is the last element.
        idx = vm_args.index(arg) + 1
        if idx < len(vm_args):
            vm_args[idx] = value
        else:
            vm_args.append(value)

    def tearDown(self):
        """Shut down all VMs, detach the console log and clean up."""
        for vm in self._vms.values():
            vm.shutdown()
        logging.getLogger('console').removeHandler(self._console_log_fh)
        # Release the console log file descriptor; a new handler is
        # created for every test.
        self._console_log_fh.close()
        super().tearDown()