# Test class and utilities for functional tests
#
# Copyright 2018, 2024 Red Hat, Inc.
#
# Original Author (Avocado-based tests):
#  Cleber Rosa <crosa@redhat.com>
#
# Adaption for standalone version:
#  Thomas Huth <thuth@redhat.com>
#
# This work is licensed under the terms of the GNU GPL, version 2 or
# later.  See the COPYING file in the top-level directory.

import logging
import os
from pathlib import Path
import pycotap
import shutil
import subprocess
import sys
import tempfile
import unittest
import uuid

from qemu.machine import QEMUMachine
from qemu.utils import kvm_available, tcg_available

from .asset import Asset
from .cmd import run_cmd
from .config import BUILD_DIR


class QemuBaseTest(unittest.TestCase):
    """
    Common base class for the functional tests.

    Provides per-test output/scratch directories, a per-test log file
    and helpers for building paths relative to the source, build,
    scratch and log directories.
    """

    # Path of the QEMU binary under test, taken from the environment.
    qemu_bin = os.getenv('QEMU_TEST_QEMU_BINARY')
    # Derived in setUp() from the last '-' separated part of qemu_bin.
    arch = None

    workdir = None
    log = None
    logdir = None
    # Lazily created by socket_dir(); declared here so that socket_dir()
    # does not raise AttributeError when invoked before setUp().
    socketdir = None

    def socket_dir(self):
        """
        Create a temporary directory suitable for storing UNIX
        socket paths.

        The directory is created on first use and the same object is
        returned by subsequent calls.

        Returns: a tempfile.TemporaryDirectory instance
        """
        if self.socketdir is None:
            self.socketdir = tempfile.TemporaryDirectory(
                prefix="qemu_func_test_sock_")
        return self.socketdir

    def data_file(self, *args):
        """
        @params args list of zero or more subdirectories or file

        Construct a path for accessing a data file located
        relative to the source directory that is the root for
        functional tests.

        @args may be an empty list to reference the root dir
        itself, may be a single element to reference a file in
        the root directory, or may be multiple elements to
        reference a file nested below. The path components
        will be joined using the platform appropriate path
        separator.

        Returns: string representing a file path
        """
        return str(Path(Path(__file__).parent.parent, *args))

    def build_file(self, *args):
        """
        @params args list of zero or more subdirectories or file

        Construct a path for accessing a data file located
        relative to the build directory root.

        @args may be an empty list to reference the build dir
        itself, may be a single element to reference a file in
        the build directory, or may be multiple elements to
        reference a file nested below. The path components
        will be joined using the platform appropriate path
        separator.

        Returns: string representing a file path
        """
        return str(Path(BUILD_DIR, *args))

    def scratch_file(self, *args):
        """
        @params args list of zero or more subdirectories or file

        Construct a path for accessing/creating a scratch file
        located relative to a temporary directory dedicated to
        this test case. The directory and its contents will be
        purged upon completion of the test.

        @args may be an empty list to reference the scratch dir
        itself, may be a single element to reference a file in
        the scratch directory, or may be multiple elements to
        reference a file nested below. The path components
        will be joined using the platform appropriate path
        separator.

        Returns: string representing a file path
        """
        return str(Path(self.workdir, *args))

    def log_file(self, *args):
        """
        @params args list of zero or more subdirectories or file

        Construct a path for accessing/creating a log file
        located relative to a temporary directory dedicated to
        this test case. The directory and its log files will be
        preserved upon completion of the test.

        @args may be an empty list to reference the log dir
        itself, may be a single element to reference a file in
        the log directory, or may be multiple elements to
        reference a file nested below. The path components
        will be joined using the platform appropriate path
        separator.

        Returns: string representing a file path
        """
        return str(Path(self.outputdir, *args))

    def setUp(self, bin_prefix):
        """
        Prepare the output/scratch directories and the log handlers.

        @params bin_prefix binary name prefix passed in by the
                subclasses; it is not used by this implementation
        """
        self.assertIsNotNone(self.qemu_bin,
                             'QEMU_TEST_QEMU_BINARY must be set')
        self.arch = self.qemu_bin.split('-')[-1]
        self.socketdir = None

        self.outputdir = self.build_file('tests', 'functional',
                                         self.arch, self.id())
        self.workdir = os.path.join(self.outputdir, 'scratch')
        os.makedirs(self.workdir, exist_ok=True)

        self.log_filename = self.log_file('base.log')
        self.log = logging.getLogger('qemu-test')
        self.log.setLevel(logging.DEBUG)
        self._log_fh = logging.FileHandler(self.log_filename, mode='w')
        self._log_fh.setLevel(logging.DEBUG)
        fileFormatter = logging.Formatter(
            '%(asctime)s - %(levelname)s: %(message)s')
        self._log_fh.setFormatter(fileFormatter)
        self.log.addHandler(self._log_fh)

        # Capture QEMUMachine logging
        self.machinelog = logging.getLogger('qemu.machine')
        self.machinelog.setLevel(logging.DEBUG)
        self.machinelog.addHandler(self._log_fh)

    def tearDown(self):
        """
        Remove the scratch and socket directories and detach the log
        handler installed by setUp().

        The scratch directory is kept when QEMU_TEST_KEEP_SCRATCH is
        present in the environment.
        """
        if "QEMU_TEST_KEEP_SCRATCH" not in os.environ:
            shutil.rmtree(self.workdir)
        if self.socketdir is not None:
            # Use the TemporaryDirectory API so that its finalizer is
            # detached instead of firing later on a vanished directory.
            self.socketdir.cleanup()
            self.socketdir = None
        self.machinelog.removeHandler(self._log_fh)
        self.log.removeHandler(self._log_fh)
        # Release the underlying log file instead of leaking it.
        self._log_fh.close()

    @staticmethod
    def main():
        """
        Entry point for running a test file as a standalone script.

        The test module name is derived from sys.argv[0] with its last
        three characters (the '.py' suffix) removed. If
        QEMU_TEST_PRECACHE is set, only the assets are pre-cached and
        no test is run. Otherwise the tests are run with a TAP runner
        and the process exits with a non-zero status on failure.
        """
        path = os.path.basename(sys.argv[0])[:-3]

        cache = os.environ.get("QEMU_TEST_PRECACHE", None)
        if cache is not None:
            Asset.precache_suites(path, cache)
            return

        tr = pycotap.TAPTestRunner(message_log=pycotap.LogMode.LogToError,
                                   test_output_log=pycotap.LogMode.LogToError)
        res = unittest.main(module=None, testRunner=tr, exit=False,
                            argv=["__dummy__", path])
        # Point the user at the log files of every failed test.
        for (test, _message) in res.result.errors + res.result.failures:

            if hasattr(test, "log_filename"):
                print('More information on ' + test.id() + ' could be found here:'
                      '\n %s' % test.log_filename, file=sys.stderr)
                if hasattr(test, 'console_log_name'):
                    print(' %s' % test.console_log_name, file=sys.stderr)
        sys.exit(not res.result.wasSuccessful())


class QemuUserTest(QemuBaseTest):
    """Facilitates user-mode emulation tests."""

    def setUp(self):
        super().setUp('qemu-')
        self._ldpath = []

    def add_ldpath(self, ldpath):
        """Register @ldpath (made absolute) to be passed via -L."""
        self._ldpath.append(os.path.abspath(ldpath))

    def run_cmd(self, bin_path, args=None):
        """
        Run @bin_path under the QEMU binary and capture its output.

        :param bin_path: path of the guest program to execute
        :param args: optional list of arguments for the guest program
        :returns: the subprocess.CompletedProcess of the run, with
                  stdout/stderr captured as text
        """
        cmd = [self.qemu_bin]
        for ldpath in self._ldpath:
            # The option and its value have to be separate argv
            # elements; a single "-L <path>" string is not split again.
            cmd += ['-L', ldpath]
        cmd.append(bin_path)
        if args:
            cmd += list(args)
        return subprocess.run(cmd, text=True, capture_output=True)


class QemuSystemTest(QemuBaseTest):
    """Facilitates system emulation tests."""

    cpu = None
    machine = None
    _machinehelp = None

    def setUp(self):
        """Set up the base test and a dedicated console log file."""
        self._vms = {}

        super().setUp('qemu-system-')

        console_log = logging.getLogger('console')
        console_log.setLevel(logging.DEBUG)
        self.console_log_name = self.log_file('console.log')
        self._console_log_fh = logging.FileHandler(self.console_log_name,
                                                   mode='w')
        self._console_log_fh.setLevel(logging.DEBUG)
        fileFormatter = logging.Formatter('%(asctime)s: %(message)s')
        self._console_log_fh.setFormatter(fileFormatter)
        console_log.addHandler(self._console_log_fh)

    def set_machine(self, machinename):
        """
        Select the machine type for VMs created by this test.

        The test is skipped if @machinename does not show up in the
        output of "-M help" of the QEMU binary.

        :param machinename: name of the machine type
        :type machinename: str
        """
        # TODO: We should use QMP to get the list of available machines
        if not self._machinehelp:
            # The first element of run_cmd()'s result is used as the
            # help text.
            self._machinehelp = run_cmd([self.qemu_bin, '-M', 'help'])[0]
        if self._machinehelp.find(machinename) < 0:
            self.skipTest('no support for machine ' + machinename)
        self.machine = machinename

    def require_accelerator(self, accelerator):
        """
        Requires an accelerator to be available for the test to continue

        It takes into account the currently set qemu binary.

        If the check fails, the test is canceled.  If the check itself
        for the given accelerator is not available, the test is also
        canceled.

        :param accelerator: name of the accelerator, such as "kvm" or "tcg"
        :type accelerator: str
        """
        checker = {'tcg': tcg_available,
                   'kvm': kvm_available}.get(accelerator)
        if checker is None:
            self.skipTest("Don't know how to check for the presence "
                          "of accelerator %s" % accelerator)
        if not checker(qemu_bin=self.qemu_bin):
            self.skipTest("%s accelerator does not seem to be "
                          "available" % accelerator)

    def require_netdev(self, netdevname):
        """
        Skip the test unless @netdevname appears on a line of its own
        in the output of "-netdev help" of the QEMU binary.

        :param netdevname: name of the network backend
        :type netdevname: str
        """
        netdevhelp = run_cmd([self.qemu_bin,
                              '-M', 'none', '-netdev', 'help'])[0]
        if netdevhelp.find('\n' + netdevname + '\n') < 0:
            self.skipTest('no support for ' + netdevname + ' networking')

    def require_device(self, devicename):
        """
        Skip the test unless @devicename appears in the output of
        "-device help" of the QEMU binary.

        :param devicename: name of the device
        :type devicename: str
        """
        devhelp = run_cmd([self.qemu_bin,
                           '-M', 'none', '-device', 'help'])[0]
        if devhelp.find(devicename) < 0:
            self.skipTest('no support for device ' + devicename)

    def _new_vm(self, name, *args):
        """
        Create a QEMUMachine called @name using the scratch directory
        as temp dir and the log directory for its logs.

        If QEMU_TEST_QMP_BACKDOOR is set in the environment, an extra
        QMP monitor on a UNIX socket at that path is added. Any extra
        @args are appended to the VM command line.
        """
        vm = QEMUMachine(self.qemu_bin,
                         name=name,
                         base_temp_dir=self.workdir,
                         log_dir=self.log_file())
        self.log.debug('QEMUMachine "%s" created', name)
        self.log.debug('QEMUMachine "%s" temp_dir: %s', name, vm.temp_dir)

        sockpath = os.environ.get("QEMU_TEST_QMP_BACKDOOR", None)
        if sockpath is not None:
            vm.add_args("-chardev",
                        f"socket,id=backdoor,path={sockpath},server=on,wait=off",
                        "-mon", "chardev=backdoor,mode=control")

        if args:
            vm.add_args(*args)
        return vm

    @property
    def vm(self):
        """The VM named 'default', created on first access."""
        return self.get_vm(name='default')

    def get_vm(self, *args, name=None):
        """
        Return the VM registered under @name, creating it if needed.

        :param args: extra command line arguments, only used when the
                     VM is created by this call
        :param name: name of the VM; a random UUID is used when empty
        :returns: the QEMUMachine instance
        """
        if not name:
            name = str(uuid.uuid4())
        if self._vms.get(name) is None:
            self._vms[name] = self._new_vm(name, *args)
            if self.cpu is not None:
                self._vms[name].add_args('-cpu', self.cpu)
            if self.machine is not None:
                self._vms[name].set_machine(self.machine)
        return self._vms[name]

    def set_vm_arg(self, arg, value):
        """
        Set an argument to list of extra arguments to be given to the QEMU
        binary. If the argument already exists then its value is replaced.

        :param arg: the QEMU argument, such as "-cpu" in "-cpu host"
        :type arg: str
        :param value: the argument value, such as "host" in "-cpu host"
        :type value: str
        """
        if not arg or not value:
            return
        if arg not in self.vm.args:
            self.vm.args.extend([arg, value])
        else:
            idx = self.vm.args.index(arg) + 1
            if idx < len(self.vm.args):
                self.vm.args[idx] = value
            else:
                # @arg was the last element: there is no value to replace.
                self.vm.args.append(value)

    def tearDown(self):
        """Shut down all VMs and detach the console log handler."""
        for vm in self._vms.values():
            vm.shutdown()
        logging.getLogger('console').removeHandler(self._console_log_fh)
        # Release the console log file instead of leaking it.
        self._console_log_fh.close()
        super().tearDown()