1# Test class and utilities for functional tests 2# 3# Copyright 2018, 2024 Red Hat, Inc. 4# 5# Original Author (Avocado-based tests): 6# Cleber Rosa <crosa@redhat.com> 7# 8# Adaption for standalone version: 9# Thomas Huth <thuth@redhat.com> 10# 11# This work is licensed under the terms of the GNU GPL, version 2 or 12# later. See the COPYING file in the top-level directory. 13 14import logging 15import os 16from pathlib import Path 17import pycotap 18import shutil 19import subprocess 20import sys 21import tempfile 22import unittest 23import uuid 24 25from qemu.machine import QEMUMachine 26from qemu.utils import kvm_available, tcg_available 27 28from .archive import archive_extract 29from .asset import Asset 30from .cmd import run_cmd 31from .config import BUILD_DIR 32from .uncompress import uncompress 33 34 35class QemuBaseTest(unittest.TestCase): 36 37 qemu_bin = os.getenv('QEMU_TEST_QEMU_BINARY') 38 arch = None 39 40 workdir = None 41 log = None 42 logdir = None 43 44 ''' 45 @params compressed: filename, Asset, or file-like object to uncompress 46 @params format: optional compression format (gzip, lzma) 47 48 Uncompresses @compressed into the scratch directory. 49 50 If @format is None, heuristics will be applied to guess the format 51 from the filename or Asset URL. @format must be non-None if @uncompressed 52 is a file-like object. 53 54 Returns the fully qualified path to the uncompressed file 55 ''' 56 def uncompress(self, compressed, format=None): 57 self.log.debug(f"Uncompress {compressed} format={format}") 58 if type(compressed) == Asset: 59 compressed.fetch() 60 61 (name, ext) = os.path.splitext(str(compressed)) 62 uncompressed = self.scratch_file(os.path.basename(name)) 63 64 uncompress(compressed, uncompressed, format) 65 66 return uncompressed 67 68 ''' 69 @params archive: filename, Asset, or file-like object to extract 70 @params format: optional archive format (tar, zip, deb, cpio) 71 @params sub_dir: optional sub-directory to extract into 72 @params member: optional member file to limit extraction to 73 74 Extracts @archive into the scratch directory, or a directory beneath 75 named by @sub_dir. All files are extracted unless @member specifies 76 a limit. 77 78 If @format is None, heuristics will be applied to guess the format 79 from the filename or Asset URL. @format must be non-None if @archive 80 is a file-like object. 81 82 If @member is non-None, returns the fully qualified path to @member 83 ''' 84 def archive_extract(self, archive, format=None, sub_dir=None, member=None): 85 self.log.debug(f"Extract {archive} format={format}" + 86 f"sub_dir={sub_dir} member={member}") 87 if type(archive) == Asset: 88 archive.fetch() 89 if sub_dir is None: 90 archive_extract(archive, self.scratch_file(), format, member) 91 else: 92 archive_extract(archive, self.scratch_file(sub_dir), 93 format, member) 94 95 if member is not None: 96 return self.scratch_file(member) 97 return None 98 99 ''' 100 Create a temporary directory suitable for storing UNIX 101 socket paths. 102 103 Returns: a tempfile.TemporaryDirectory instance 104 ''' 105 def socket_dir(self): 106 if self.socketdir is None: 107 self.socketdir = tempfile.TemporaryDirectory( 108 prefix="qemu_func_test_sock_") 109 return self.socketdir 110 111 ''' 112 @params args list of zero or more subdirectories or file 113 114 Construct a path for accessing a data file located 115 relative to the source directory that is the root for 116 functional tests. 117 118 @args may be an empty list to reference the root dir 119 itself, may be a single element to reference a file in 120 the root directory, or may be multiple elements to 121 reference a file nested below. The path components 122 will be joined using the platform appropriate path 123 separator. 124 125 Returns: string representing a file path 126 ''' 127 def data_file(self, *args): 128 return str(Path(Path(__file__).parent.parent, *args)) 129 130 ''' 131 @params args list of zero or more subdirectories or file 132 133 Construct a path for accessing a data file located 134 relative to the build directory root. 135 136 @args may be an empty list to reference the build dir 137 itself, may be a single element to reference a file in 138 the build directory, or may be multiple elements to 139 reference a file nested below. The path components 140 will be joined using the platform appropriate path 141 separator. 142 143 Returns: string representing a file path 144 ''' 145 def build_file(self, *args): 146 return str(Path(BUILD_DIR, *args)) 147 148 ''' 149 @params args list of zero or more subdirectories or file 150 151 Construct a path for accessing/creating a scratch file 152 located relative to a temporary directory dedicated to 153 this test case. The directory and its contents will be 154 purged upon completion of the test. 155 156 @args may be an empty list to reference the scratch dir 157 itself, may be a single element to reference a file in 158 the scratch directory, or may be multiple elements to 159 reference a file nested below. The path components 160 will be joined using the platform appropriate path 161 separator. 162 163 Returns: string representing a file path 164 ''' 165 def scratch_file(self, *args): 166 return str(Path(self.workdir, *args)) 167 168 ''' 169 @params args list of zero or more subdirectories or file 170 171 Construct a path for accessing/creating a log file 172 located relative to a temporary directory dedicated to 173 this test case. The directory and its log files will be 174 preserved upon completion of the test. 175 176 @args may be an empty list to reference the log dir 177 itself, may be a single element to reference a file in 178 the log directory, or may be multiple elements to 179 reference a file nested below. The path components 180 will be joined using the platform appropriate path 181 separator. 182 183 Returns: string representing a file path 184 ''' 185 def log_file(self, *args): 186 return str(Path(self.outputdir, *args)) 187 188 def setUp(self, bin_prefix): 189 self.assertIsNotNone(self.qemu_bin, 'QEMU_TEST_QEMU_BINARY must be set') 190 self.arch = self.qemu_bin.split('-')[-1] 191 self.socketdir = None 192 193 self.outputdir = self.build_file('tests', 'functional', 194 self.arch, self.id()) 195 self.workdir = os.path.join(self.outputdir, 'scratch') 196 os.makedirs(self.workdir, exist_ok=True) 197 198 self.log_filename = self.log_file('base.log') 199 self.log = logging.getLogger('qemu-test') 200 self.log.setLevel(logging.DEBUG) 201 self._log_fh = logging.FileHandler(self.log_filename, mode='w') 202 self._log_fh.setLevel(logging.DEBUG) 203 fileFormatter = logging.Formatter( 204 '%(asctime)s - %(levelname)s: %(message)s') 205 self._log_fh.setFormatter(fileFormatter) 206 self.log.addHandler(self._log_fh) 207 208 # Capture QEMUMachine logging 209 self.machinelog = logging.getLogger('qemu.machine') 210 self.machinelog.setLevel(logging.DEBUG) 211 self.machinelog.addHandler(self._log_fh) 212 213 def tearDown(self): 214 if "QEMU_TEST_KEEP_SCRATCH" not in os.environ: 215 shutil.rmtree(self.workdir) 216 if self.socketdir is not None: 217 shutil.rmtree(self.socketdir.name) 218 self.socketdir = None 219 self.machinelog.removeHandler(self._log_fh) 220 self.log.removeHandler(self._log_fh) 221 222 def main(): 223 path = os.path.basename(sys.argv[0])[:-3] 224 225 cache = os.environ.get("QEMU_TEST_PRECACHE", None) 226 if cache is not None: 227 Asset.precache_suites(path, cache) 228 return 229 230 tr = pycotap.TAPTestRunner(message_log = pycotap.LogMode.LogToError, 231 test_output_log = pycotap.LogMode.LogToError) 232 res = unittest.main(module = None, testRunner = tr, exit = False, 233 argv=["__dummy__", path]) 234 for (test, message) in res.result.errors + res.result.failures: 235 236 if hasattr(test, "log_filename"): 237 print('More information on ' + test.id() + ' could be found here:' 238 '\n %s' % test.log_filename, file=sys.stderr) 239 if hasattr(test, 'console_log_name'): 240 print(' %s' % test.console_log_name, file=sys.stderr) 241 sys.exit(not res.result.wasSuccessful()) 242 243 244class QemuUserTest(QemuBaseTest): 245 246 def setUp(self): 247 super().setUp('qemu-') 248 self._ldpath = [] 249 250 def add_ldpath(self, ldpath): 251 self._ldpath.append(os.path.abspath(ldpath)) 252 253 def run_cmd(self, bin_path, args=[]): 254 return subprocess.run([self.qemu_bin] 255 + ["-L %s" % ldpath for ldpath in self._ldpath] 256 + [bin_path] 257 + args, 258 text=True, capture_output=True) 259 260class QemuSystemTest(QemuBaseTest): 261 """Facilitates system emulation tests.""" 262 263 cpu = None 264 machine = None 265 _machinehelp = None 266 267 def setUp(self): 268 self._vms = {} 269 270 super().setUp('qemu-system-') 271 272 console_log = logging.getLogger('console') 273 console_log.setLevel(logging.DEBUG) 274 self.console_log_name = self.log_file('console.log') 275 self._console_log_fh = logging.FileHandler(self.console_log_name, 276 mode='w') 277 self._console_log_fh.setLevel(logging.DEBUG) 278 fileFormatter = logging.Formatter('%(asctime)s: %(message)s') 279 self._console_log_fh.setFormatter(fileFormatter) 280 console_log.addHandler(self._console_log_fh) 281 282 def set_machine(self, machinename): 283 # TODO: We should use QMP to get the list of available machines 284 if not self._machinehelp: 285 self._machinehelp = run_cmd([self.qemu_bin, '-M', 'help'])[0]; 286 if self._machinehelp.find(machinename) < 0: 287 self.skipTest('no support for machine ' + machinename) 288 self.machine = machinename 289 290 def require_accelerator(self, accelerator): 291 """ 292 Requires an accelerator to be available for the test to continue 293 294 It takes into account the currently set qemu binary. 295 296 If the check fails, the test is canceled. If the check itself 297 for the given accelerator is not available, the test is also 298 canceled. 299 300 :param accelerator: name of the accelerator, such as "kvm" or "tcg" 301 :type accelerator: str 302 """ 303 checker = {'tcg': tcg_available, 304 'kvm': kvm_available}.get(accelerator) 305 if checker is None: 306 self.skipTest("Don't know how to check for the presence " 307 "of accelerator %s" % accelerator) 308 if not checker(qemu_bin=self.qemu_bin): 309 self.skipTest("%s accelerator does not seem to be " 310 "available" % accelerator) 311 312 def require_netdev(self, netdevname): 313 netdevhelp = run_cmd([self.qemu_bin, 314 '-M', 'none', '-netdev', 'help'])[0]; 315 if netdevhelp.find('\n' + netdevname + '\n') < 0: 316 self.skipTest('no support for " + netdevname + " networking') 317 318 def require_device(self, devicename): 319 devhelp = run_cmd([self.qemu_bin, 320 '-M', 'none', '-device', 'help'])[0]; 321 if devhelp.find(devicename) < 0: 322 self.skipTest('no support for device ' + devicename) 323 324 def _new_vm(self, name, *args): 325 vm = QEMUMachine(self.qemu_bin, 326 name=name, 327 base_temp_dir=self.workdir, 328 log_dir=self.log_file()) 329 self.log.debug('QEMUMachine "%s" created', name) 330 self.log.debug('QEMUMachine "%s" temp_dir: %s', name, vm.temp_dir) 331 332 sockpath = os.environ.get("QEMU_TEST_QMP_BACKDOOR", None) 333 if sockpath is not None: 334 vm.add_args("-chardev", 335 f"socket,id=backdoor,path={sockpath},server=on,wait=off", 336 "-mon", "chardev=backdoor,mode=control") 337 338 if args: 339 vm.add_args(*args) 340 return vm 341 342 @property 343 def vm(self): 344 return self.get_vm(name='default') 345 346 def get_vm(self, *args, name=None): 347 if not name: 348 name = str(uuid.uuid4()) 349 if self._vms.get(name) is None: 350 self._vms[name] = self._new_vm(name, *args) 351 if self.cpu is not None: 352 self._vms[name].add_args('-cpu', self.cpu) 353 if self.machine is not None: 354 self._vms[name].set_machine(self.machine) 355 return self._vms[name] 356 357 def set_vm_arg(self, arg, value): 358 """ 359 Set an argument to list of extra arguments to be given to the QEMU 360 binary. If the argument already exists then its value is replaced. 361 362 :param arg: the QEMU argument, such as "-cpu" in "-cpu host" 363 :type arg: str 364 :param value: the argument value, such as "host" in "-cpu host" 365 :type value: str 366 """ 367 if not arg or not value: 368 return 369 if arg not in self.vm.args: 370 self.vm.args.extend([arg, value]) 371 else: 372 idx = self.vm.args.index(arg) + 1 373 if idx < len(self.vm.args): 374 self.vm.args[idx] = value 375 else: 376 self.vm.args.append(value) 377 378 def tearDown(self): 379 for vm in self._vms.values(): 380 vm.shutdown() 381 logging.getLogger('console').removeHandler(self._console_log_fh) 382 super().tearDown() 383