1# Test class and utilities for functional tests 2# 3# Copyright 2018, 2024 Red Hat, Inc. 4# 5# Original Author (Avocado-based tests): 6# Cleber Rosa <crosa@redhat.com> 7# 8# Adaption for standalone version: 9# Thomas Huth <thuth@redhat.com> 10# 11# This work is licensed under the terms of the GNU GPL, version 2 or 12# later. See the COPYING file in the top-level directory. 13 14import logging 15import os 16from pathlib import Path 17import pycotap 18import shutil 19import subprocess 20import sys 21import tempfile 22import unittest 23import uuid 24 25from qemu.machine import QEMUMachine 26from qemu.utils import kvm_available, tcg_available 27 28from .asset import Asset 29from .cmd import run_cmd 30from .config import BUILD_DIR 31 32 33class QemuBaseTest(unittest.TestCase): 34 35 qemu_bin = os.getenv('QEMU_TEST_QEMU_BINARY') 36 arch = None 37 38 workdir = None 39 log = None 40 logdir = None 41 42 ''' 43 Create a temporary directory suitable for storing UNIX 44 socket paths. 45 46 Returns: a tempfile.TemporaryDirectory instance 47 ''' 48 def socket_dir(self): 49 if self.socketdir is None: 50 self.socketdir = tempfile.TemporaryDirectory( 51 prefix="qemu_func_test_sock_") 52 return self.socketdir 53 54 ''' 55 @params args list of zero or more subdirectories or file 56 57 Construct a path for accessing a data file located 58 relative to the source directory that is the root for 59 functional tests. 60 61 @args may be an empty list to reference the root dir 62 itself, may be a single element to reference a file in 63 the root directory, or may be multiple elements to 64 reference a file nested below. The path components 65 will be joined using the platform appropriate path 66 separator. 67 68 Returns: string representing a file path 69 ''' 70 def data_file(self, *args): 71 return str(Path(Path(__file__).parent.parent, *args)) 72 73 ''' 74 @params args list of zero or more subdirectories or file 75 76 Construct a path for accessing a data file located 77 relative to the build directory root. 78 79 @args may be an empty list to reference the build dir 80 itself, may be a single element to reference a file in 81 the build directory, or may be multiple elements to 82 reference a file nested below. The path components 83 will be joined using the platform appropriate path 84 separator. 85 86 Returns: string representing a file path 87 ''' 88 def build_file(self, *args): 89 return str(Path(BUILD_DIR, *args)) 90 91 ''' 92 @params args list of zero or more subdirectories or file 93 94 Construct a path for accessing/creating a scratch file 95 located relative to a temporary directory dedicated to 96 this test case. The directory and its contents will be 97 purged upon completion of the test. 98 99 @args may be an empty list to reference the scratch dir 100 itself, may be a single element to reference a file in 101 the scratch directory, or may be multiple elements to 102 reference a file nested below. The path components 103 will be joined using the platform appropriate path 104 separator. 105 106 Returns: string representing a file path 107 ''' 108 def scratch_file(self, *args): 109 return str(Path(self.workdir, *args)) 110 111 ''' 112 @params args list of zero or more subdirectories or file 113 114 Construct a path for accessing/creating a log file 115 located relative to a temporary directory dedicated to 116 this test case. The directory and its log files will be 117 preserved upon completion of the test. 118 119 @args may be an empty list to reference the log dir 120 itself, may be a single element to reference a file in 121 the log directory, or may be multiple elements to 122 reference a file nested below. The path components 123 will be joined using the platform appropriate path 124 separator. 125 126 Returns: string representing a file path 127 ''' 128 def log_file(self, *args): 129 return str(Path(self.logdir, *args)) 130 131 def setUp(self, bin_prefix): 132 self.assertIsNotNone(self.qemu_bin, 'QEMU_TEST_QEMU_BINARY must be set') 133 self.arch = self.qemu_bin.split('-')[-1] 134 self.socketdir = None 135 136 self.outputdir = os.path.join(BUILD_DIR, 'tests', 'functional', 137 self.arch, self.id()) 138 self.workdir = os.path.join(self.outputdir, 'scratch') 139 os.makedirs(self.workdir, exist_ok=True) 140 141 self.logdir = self.outputdir 142 self.log_filename = os.path.join(self.logdir, 'base.log') 143 self.log = logging.getLogger('qemu-test') 144 self.log.setLevel(logging.DEBUG) 145 self._log_fh = logging.FileHandler(self.log_filename, mode='w') 146 self._log_fh.setLevel(logging.DEBUG) 147 fileFormatter = logging.Formatter( 148 '%(asctime)s - %(levelname)s: %(message)s') 149 self._log_fh.setFormatter(fileFormatter) 150 self.log.addHandler(self._log_fh) 151 152 # Capture QEMUMachine logging 153 self.machinelog = logging.getLogger('qemu.machine') 154 self.machinelog.setLevel(logging.DEBUG) 155 self.machinelog.addHandler(self._log_fh) 156 157 def tearDown(self): 158 if "QEMU_TEST_KEEP_SCRATCH" not in os.environ: 159 shutil.rmtree(self.workdir) 160 if self.socketdir is not None: 161 shutil.rmtree(self.socketdir.name) 162 self.socketdir = None 163 self.machinelog.removeHandler(self._log_fh) 164 self.log.removeHandler(self._log_fh) 165 166 def main(): 167 path = os.path.basename(sys.argv[0])[:-3] 168 169 cache = os.environ.get("QEMU_TEST_PRECACHE", None) 170 if cache is not None: 171 Asset.precache_suites(path, cache) 172 return 173 174 tr = pycotap.TAPTestRunner(message_log = pycotap.LogMode.LogToError, 175 test_output_log = pycotap.LogMode.LogToError) 176 res = unittest.main(module = None, testRunner = tr, exit = False, 177 argv=["__dummy__", path]) 178 for (test, message) in res.result.errors + res.result.failures: 179 180 if hasattr(test, "log_filename"): 181 print('More information on ' + test.id() + ' could be found here:' 182 '\n %s' % test.log_filename, file=sys.stderr) 183 if hasattr(test, 'console_log_name'): 184 print(' %s' % test.console_log_name, file=sys.stderr) 185 sys.exit(not res.result.wasSuccessful()) 186 187 188class QemuUserTest(QemuBaseTest): 189 190 def setUp(self): 191 super().setUp('qemu-') 192 self._ldpath = [] 193 194 def add_ldpath(self, ldpath): 195 self._ldpath.append(os.path.abspath(ldpath)) 196 197 def run_cmd(self, bin_path, args=[]): 198 return subprocess.run([self.qemu_bin] 199 + ["-L %s" % ldpath for ldpath in self._ldpath] 200 + [bin_path] 201 + args, 202 text=True, capture_output=True) 203 204class QemuSystemTest(QemuBaseTest): 205 """Facilitates system emulation tests.""" 206 207 cpu = None 208 machine = None 209 _machinehelp = None 210 211 def setUp(self): 212 self._vms = {} 213 214 super().setUp('qemu-system-') 215 216 console_log = logging.getLogger('console') 217 console_log.setLevel(logging.DEBUG) 218 self.console_log_name = os.path.join(self.logdir, 'console.log') 219 self._console_log_fh = logging.FileHandler(self.console_log_name, 220 mode='w') 221 self._console_log_fh.setLevel(logging.DEBUG) 222 fileFormatter = logging.Formatter('%(asctime)s: %(message)s') 223 self._console_log_fh.setFormatter(fileFormatter) 224 console_log.addHandler(self._console_log_fh) 225 226 def set_machine(self, machinename): 227 # TODO: We should use QMP to get the list of available machines 228 if not self._machinehelp: 229 self._machinehelp = run_cmd([self.qemu_bin, '-M', 'help'])[0]; 230 if self._machinehelp.find(machinename) < 0: 231 self.skipTest('no support for machine ' + machinename) 232 self.machine = machinename 233 234 def require_accelerator(self, accelerator): 235 """ 236 Requires an accelerator to be available for the test to continue 237 238 It takes into account the currently set qemu binary. 239 240 If the check fails, the test is canceled. If the check itself 241 for the given accelerator is not available, the test is also 242 canceled. 243 244 :param accelerator: name of the accelerator, such as "kvm" or "tcg" 245 :type accelerator: str 246 """ 247 checker = {'tcg': tcg_available, 248 'kvm': kvm_available}.get(accelerator) 249 if checker is None: 250 self.skipTest("Don't know how to check for the presence " 251 "of accelerator %s" % accelerator) 252 if not checker(qemu_bin=self.qemu_bin): 253 self.skipTest("%s accelerator does not seem to be " 254 "available" % accelerator) 255 256 def require_netdev(self, netdevname): 257 netdevhelp = run_cmd([self.qemu_bin, 258 '-M', 'none', '-netdev', 'help'])[0]; 259 if netdevhelp.find('\n' + netdevname + '\n') < 0: 260 self.skipTest('no support for " + netdevname + " networking') 261 262 def require_device(self, devicename): 263 devhelp = run_cmd([self.qemu_bin, 264 '-M', 'none', '-device', 'help'])[0]; 265 if devhelp.find(devicename) < 0: 266 self.skipTest('no support for device ' + devicename) 267 268 def _new_vm(self, name, *args): 269 vm = QEMUMachine(self.qemu_bin, 270 name=name, 271 base_temp_dir=self.workdir, 272 log_dir=self.logdir) 273 self.log.debug('QEMUMachine "%s" created', name) 274 self.log.debug('QEMUMachine "%s" temp_dir: %s', name, vm.temp_dir) 275 276 sockpath = os.environ.get("QEMU_TEST_QMP_BACKDOOR", None) 277 if sockpath is not None: 278 vm.add_args("-chardev", 279 f"socket,id=backdoor,path={sockpath},server=on,wait=off", 280 "-mon", "chardev=backdoor,mode=control") 281 282 if args: 283 vm.add_args(*args) 284 return vm 285 286 @property 287 def vm(self): 288 return self.get_vm(name='default') 289 290 def get_vm(self, *args, name=None): 291 if not name: 292 name = str(uuid.uuid4()) 293 if self._vms.get(name) is None: 294 self._vms[name] = self._new_vm(name, *args) 295 if self.cpu is not None: 296 self._vms[name].add_args('-cpu', self.cpu) 297 if self.machine is not None: 298 self._vms[name].set_machine(self.machine) 299 return self._vms[name] 300 301 def set_vm_arg(self, arg, value): 302 """ 303 Set an argument to list of extra arguments to be given to the QEMU 304 binary. If the argument already exists then its value is replaced. 305 306 :param arg: the QEMU argument, such as "-cpu" in "-cpu host" 307 :type arg: str 308 :param value: the argument value, such as "host" in "-cpu host" 309 :type value: str 310 """ 311 if not arg or not value: 312 return 313 if arg not in self.vm.args: 314 self.vm.args.extend([arg, value]) 315 else: 316 idx = self.vm.args.index(arg) + 1 317 if idx < len(self.vm.args): 318 self.vm.args[idx] = value 319 else: 320 self.vm.args.append(value) 321 322 def tearDown(self): 323 for vm in self._vms.values(): 324 vm.shutdown() 325 logging.getLogger('console').removeHandler(self._console_log_fh) 326 super().tearDown() 327