xref: /qemu/tests/functional/qemu_test/testcase.py (revision 527dede083d3e3e5a13ee996776926e0a0c4e258)
1# Test class and utilities for functional tests
2#
3# Copyright 2018, 2024 Red Hat, Inc.
4#
5# Original Author (Avocado-based tests):
6#  Cleber Rosa <crosa@redhat.com>
7#
8# Adaption for standalone version:
9#  Thomas Huth <thuth@redhat.com>
10#
11# This work is licensed under the terms of the GNU GPL, version 2 or
12# later.  See the COPYING file in the top-level directory.
13
14import logging
15import os
16from pathlib import Path
17import pycotap
18import shutil
19from subprocess import run
20import sys
21import tempfile
22import unittest
23import uuid
24
25from qemu.machine import QEMUMachine
26from qemu.utils import kvm_available, tcg_available
27
28from .archive import archive_extract
29from .asset import Asset
30from .config import BUILD_DIR, dso_suffix
31from .uncompress import uncompress
32
33
class QemuBaseTest(unittest.TestCase):
    '''
    Common base class for the QEMU functional tests.

    setUp() derives the target architecture from the QEMU binary named by
    the QEMU_TEST_QEMU_BINARY environment variable, creates a per-test
    output directory with a scratch sub-directory, and attaches a
    'base.log' file handler to the 'qemu-test' and 'qemu.machine' loggers.
    '''

    def uncompress(self, compressed, format=None):
        '''
        @params compressed: filename, Asset, or file-like object to uncompress
        @params format: optional compression format (gzip, lzma)

        Uncompresses @compressed into the scratch directory.

        If @format is None, heuristics will be applied to guess the format
        from the filename or Asset URL. @format must be non-None if
        @compressed is a file-like object.

        Returns the fully qualified path to the uncompressed file
        '''
        self.log.debug(f"Uncompress {compressed} format={format}")
        if isinstance(compressed, Asset):
            compressed.fetch()

        # The output file is named after the input with its last
        # extension stripped
        (name, _ext) = os.path.splitext(str(compressed))
        uncompressed = self.scratch_file(os.path.basename(name))

        uncompress(compressed, uncompressed, format)

        return uncompressed

    def archive_extract(self, archive, format=None, sub_dir=None, member=None):
        '''
        @params archive: filename, Asset, or file-like object to extract
        @params format: optional archive format (tar, zip, deb, cpio)
        @params sub_dir: optional sub-directory to extract into
        @params member: optional member file to limit extraction to

        Extracts @archive into the scratch directory, or a directory beneath
        named by @sub_dir. All files are extracted unless @member specifies
        a limit.

        If @format is None, heuristics will be applied to guess the format
        from the filename or Asset URL. @format must be non-None if @archive
        is a file-like object.

        If @member is non-None, returns the fully qualified path to @member,
        otherwise returns None
        '''
        self.log.debug(f"Extract {archive} format={format} "
                       f"sub_dir={sub_dir} member={member}")
        if isinstance(archive, Asset):
            archive.fetch()

        if sub_dir is None:
            dest_dir = self.scratch_file()
        else:
            dest_dir = self.scratch_file(sub_dir)
        archive_extract(archive, dest_dir, format, member)

        if member is None:
            return None
        # NOTE(review): the returned path does not include @sub_dir even
        # when one was given - confirm whether callers rely on this.
        return self.scratch_file(member)

    def socket_dir(self):
        '''
        Create a temporary directory suitable for storing UNIX
        socket paths. The directory is created on first use and
        the same object is returned by subsequent calls until
        tearDown() disposes of it.

        Returns: a tempfile.TemporaryDirectory instance
        '''
        if self.socketdir is None:
            self.socketdir = tempfile.TemporaryDirectory(
                prefix="qemu_func_test_sock_")
        return self.socketdir

    def data_file(self, *args):
        '''
        @params args list of zero or more subdirectories or file

        Construct a path for accessing a data file located
        relative to the source directory that is the root for
        functional tests.

        @args may be an empty list to reference the root dir
        itself, may be a single element to reference a file in
        the root directory, or may be multiple elements to
        reference a file nested below. The path components
        will be joined using the platform appropriate path
        separator.

        Returns: string representing a file path
        '''
        return str(Path(Path(__file__).parent.parent, *args))

    def build_file(self, *args):
        '''
        @params args list of zero or more subdirectories or file

        Construct a path for accessing a data file located
        relative to the build directory root.

        @args may be an empty list to reference the build dir
        itself, may be a single element to reference a file in
        the build directory, or may be multiple elements to
        reference a file nested below. The path components
        will be joined using the platform appropriate path
        separator.

        Returns: string representing a file path
        '''
        return str(Path(BUILD_DIR, *args))

    def scratch_file(self, *args):
        '''
        @params args list of zero or more subdirectories or file

        Construct a path for accessing/creating a scratch file
        located relative to a temporary directory dedicated to
        this test case. The directory and its contents will be
        purged upon completion of the test, unless the
        QEMU_TEST_KEEP_SCRATCH environment variable is set.

        @args may be an empty list to reference the scratch dir
        itself, may be a single element to reference a file in
        the scratch directory, or may be multiple elements to
        reference a file nested below. The path components
        will be joined using the platform appropriate path
        separator.

        Returns: string representing a file path
        '''
        return str(Path(self.workdir, *args))

    def log_file(self, *args):
        '''
        @params args list of zero or more subdirectories or file

        Construct a path for accessing/creating a log file
        located relative to a temporary directory dedicated to
        this test case. The directory and its log files will be
        preserved upon completion of the test.

        @args may be an empty list to reference the log dir
        itself, may be a single element to reference a file in
        the log directory, or may be multiple elements to
        reference a file nested below. The path components
        will be joined using the platform appropriate path
        separator.

        Returns: string representing a file path
        '''
        return str(Path(self.outputdir, *args))

    def plugin_file(self, plugin_name):
        '''
        @params plugin name

        Return the path to the plugin below 'tests/tcg/plugins', taking
        into account any host OS specific suffixes.
        '''
        sfx = dso_suffix()
        return os.path.join('tests', 'tcg', 'plugins', f'{plugin_name}.{sfx}')

    def assets_available(self):
        '''
        Check the Asset objects declared as ASSET_* attributes directly
        on the test class.

        Returns: False as soon as one asset reports that it is not
        available, True otherwise
        '''
        for name, asset in vars(self.__class__).items():
            if name.startswith("ASSET_") and isinstance(asset, Asset):
                if not asset.available():
                    self.log.debug(f"Asset {asset.url} not available")
                    return False
        return True

    def _detach_log_handler(self):
        '''
        Detach the per-test log file handler from the loggers it was
        attached to in setUp() and close the underlying file.
        '''
        self.machinelog.removeHandler(self._log_fh)
        self.log.removeHandler(self._log_fh)
        self._log_fh.close()

    def setUp(self):
        '''
        Prepare the output and scratch directories and the log file for
        this test. The test is skipped if any of its assets is not
        available.
        '''
        self.qemu_bin = os.getenv('QEMU_TEST_QEMU_BINARY')
        self.assertIsNotNone(self.qemu_bin, 'QEMU_TEST_QEMU_BINARY must be set')
        # The architecture is whatever follows the last '-' in the
        # binary name
        self.arch = self.qemu_bin.split('-')[-1]
        self.socketdir = None

        self.outputdir = self.build_file('tests', 'functional',
                                         self.arch, self.id())
        self.workdir = os.path.join(self.outputdir, 'scratch')
        os.makedirs(self.workdir, exist_ok=True)

        self.log_filename = self.log_file('base.log')
        self.log = logging.getLogger('qemu-test')
        self.log.setLevel(logging.DEBUG)
        self._log_fh = logging.FileHandler(self.log_filename, mode='w')
        self._log_fh.setLevel(logging.DEBUG)
        fileFormatter = logging.Formatter(
            '%(asctime)s - %(levelname)s: %(message)s')
        self._log_fh.setFormatter(fileFormatter)
        self.log.addHandler(self._log_fh)

        # Capture QEMUMachine logging
        self.machinelog = logging.getLogger('qemu.machine')
        self.machinelog.setLevel(logging.DEBUG)
        self.machinelog.addHandler(self._log_fh)

        if not self.assets_available():
            # unittest does not call tearDown() when setUp() raises, so
            # release the log handler here to avoid leaving it attached
            # to the shared loggers
            self._detach_log_handler()
            self.skipTest('One or more assets is not available')

    def tearDown(self):
        '''
        Remove the scratch directory (unless QEMU_TEST_KEEP_SCRATCH is
        set in the environment) and the socket directory, then detach
        and close the log file handler.
        '''
        try:
            if "QEMU_TEST_KEEP_SCRATCH" not in os.environ:
                shutil.rmtree(self.workdir)
            if self.socketdir is not None:
                # cleanup() removes the directory and also disarms the
                # TemporaryDirectory finalizer
                self.socketdir.cleanup()
                self.socketdir = None
        finally:
            self._detach_log_handler()

    @staticmethod
    def main():
        '''
        Entry point for running a test script standalone.

        The test module name is taken from sys.argv[0] with its last
        three characters (the file extension) removed. If
        QEMU_TEST_PRECACHE is set in the environment, only the assets
        are pre-cached. Otherwise the tests are run with a TAP runner,
        the log file locations of failed tests are printed to stderr
        and the process exits with a non-zero status on failure.
        '''
        path = os.path.basename(sys.argv[0])[:-3]

        cache = os.environ.get("QEMU_TEST_PRECACHE", None)
        if cache is not None:
            Asset.precache_suites(path, cache)
            return

        tr = pycotap.TAPTestRunner(message_log=pycotap.LogMode.LogToError,
                                   test_output_log=pycotap.LogMode.LogToError)
        res = unittest.main(module=None, testRunner=tr, exit=False,
                            argv=["__dummy__", path])
        for (test, _message) in res.result.errors + res.result.failures:
            # Not every entry has the attributes set up by setUp(), so
            # check before use
            if hasattr(test, "log_filename"):
                print(f'More information on {test.id()} could be found here:'
                      f'\n {test.log_filename}', file=sys.stderr)
                if hasattr(test, 'console_log_name'):
                    print(f' {test.console_log_name}', file=sys.stderr)
        sys.exit(not res.result.wasSuccessful())
256
257
class QemuUserTest(QemuBaseTest):
    """Base class for tests that run a binary under the QEMU test binary."""

    def setUp(self):
        """Run the base set-up and start with an empty list of -L paths."""
        super().setUp()
        self._ldpath = []

    def add_ldpath(self, ldpath):
        """
        Register a directory to be passed with '-L' by run_cmd().

        :param ldpath: directory path; stored as an absolute path
        :type ldpath: str
        """
        self._ldpath.append(os.path.abspath(ldpath))

    def run_cmd(self, bin_path, args=None):
        """
        Run @bin_path under the QEMU binary and capture its output.

        :param bin_path: path of the program to execute
        :type bin_path: str
        :param args: optional list of arguments passed to the program
        :type args: list
        :returns: the subprocess.CompletedProcess of the run, with
                  stdout and stderr captured as text
        """
        ldpath_args = []
        for ldpath in self._ldpath:
            # The option and its value must be separate argv elements;
            # a single "-L <path>" string is not recognised as the
            # '-L' option
            ldpath_args += ['-L', ldpath]
        extra_args = [] if args is None else list(args)
        return run([self.qemu_bin, *ldpath_args, bin_path, *extra_args],
                   text=True, capture_output=True)
273
class QemuSystemTest(QemuBaseTest):
    """Facilitates system emulation tests."""

    # Optional '-cpu' value applied to every VM created by get_vm()
    cpu = None
    # Optional machine type applied to every VM created by get_vm()
    machine = None
    # Cached output of '-M help', filled in lazily by set_machine()
    _machinehelp = None

    def setUp(self):
        """
        Run the base set-up and attach a 'console.log' file handler to
        the 'console' logger.
        """
        self._vms = {}

        super().setUp()

        console_log = logging.getLogger('console')
        console_log.setLevel(logging.DEBUG)
        self.console_log_name = self.log_file('console.log')
        self._console_log_fh = logging.FileHandler(self.console_log_name,
                                                   mode='w')
        self._console_log_fh.setLevel(logging.DEBUG)
        fileFormatter = logging.Formatter('%(asctime)s: %(message)s')
        self._console_log_fh.setFormatter(fileFormatter)
        console_log.addHandler(self._console_log_fh)

    def set_machine(self, machinename):
        """
        Select the machine type for VMs created afterwards.

        The test is skipped if @machinename does not occur in the
        output of '-M help' of the QEMU binary.

        :param machinename: name of the machine type
        :type machinename: str
        """
        # TODO: We should use QMP to get the list of available machines
        if not self._machinehelp:
            self._machinehelp = run(
                [self.qemu_bin, '-M', 'help'],
                capture_output=True, check=True, encoding='utf8').stdout
        if machinename not in self._machinehelp:
            self.skipTest('no support for machine ' + machinename)
        self.machine = machinename

    def require_accelerator(self, accelerator):
        """
        Requires an accelerator to be available for the test to continue

        It takes into account the currently set qemu binary.

        If the check fails, the test is skipped.  If the check itself
        for the given accelerator is not available, the test is also
        skipped.

        :param accelerator: name of the accelerator, such as "kvm" or "tcg"
        :type accelerator: str
        """
        checker = {'tcg': tcg_available,
                   'kvm': kvm_available}.get(accelerator)
        if checker is None:
            self.skipTest("Don't know how to check for the presence "
                          "of accelerator %s" % accelerator)
        if not checker(qemu_bin=self.qemu_bin):
            self.skipTest("%s accelerator does not seem to be "
                          "available" % accelerator)

    def require_netdev(self, netdevname):
        """
        Skip the test unless @netdevname is listed on a line of its own
        in the output of '-netdev help'.

        :param netdevname: name of the network backend
        :type netdevname: str
        """
        help_text = run([self.qemu_bin,
                         '-M', 'none', '-netdev', 'help'],
                        capture_output=True, check=True,
                        encoding='utf8').stdout
        if '\n' + netdevname + '\n' not in help_text:
            self.skipTest(f'no support for {netdevname} networking')

    def require_device(self, devicename):
        """
        Skip the test unless @devicename occurs in the output of
        '-device help'.

        :param devicename: name of the device
        :type devicename: str
        """
        help_text = run([self.qemu_bin,
                         '-M', 'none', '-device', 'help'],
                        capture_output=True, check=True,
                        encoding='utf8').stdout
        if devicename not in help_text:
            self.skipTest('no support for device ' + devicename)

    def _new_vm(self, name, *args):
        """
        Create a QEMUMachine using the scratch directory as its temporary
        directory base and the test's log directory for its logs.

        If QEMU_TEST_QMP_BACKDOOR is set in the environment, an extra
        monitor in control mode is added on a socket chardev at that path.

        :param name: name given to the QEMUMachine
        :param args: extra command line arguments added to the VM
        :returns: the new QEMUMachine instance
        """
        vm = QEMUMachine(self.qemu_bin,
                         name=name,
                         base_temp_dir=self.workdir,
                         log_dir=self.log_file())
        self.log.debug('QEMUMachine "%s" created', name)
        self.log.debug('QEMUMachine "%s" temp_dir: %s', name, vm.temp_dir)

        sockpath = os.environ.get("QEMU_TEST_QMP_BACKDOOR", None)
        if sockpath is not None:
            vm.add_args("-chardev",
                        f"socket,id=backdoor,path={sockpath},server=on,wait=off",
                        "-mon", "chardev=backdoor,mode=control")

        if args:
            vm.add_args(*args)
        return vm

    @property
    def vm(self):
        """The VM named 'default', created on first access."""
        return self.get_vm(name='default')

    def get_vm(self, *args, name=None):
        """
        Return the VM registered under @name, creating it if needed.

        A newly created VM gets @args plus the class level 'cpu' and
        'machine' settings, when those are set. @args is ignored if the
        VM already exists.

        :param args: extra command line arguments for a newly created VM
        :param name: name of the VM; a random UUID is used if not given
        :returns: the QEMUMachine instance
        """
        if not name:
            name = str(uuid.uuid4())
        if self._vms.get(name) is None:
            vm = self._new_vm(name, *args)
            self._vms[name] = vm
            if self.cpu is not None:
                vm.add_args('-cpu', self.cpu)
            if self.machine is not None:
                vm.set_machine(self.machine)
        return self._vms[name]

    def set_vm_arg(self, arg, value):
        """
        Set an argument to list of extra arguments to be given to the QEMU
        binary. If the argument already exists then its value is replaced.

        Nothing is done if either @arg or @value is empty.

        :param arg: the QEMU argument, such as "-cpu" in "-cpu host"
        :type arg: str
        :param value: the argument value, such as "host" in "-cpu host"
        :type value: str
        """
        if not arg or not value:
            return
        if arg not in self.vm.args:
            self.vm.args.extend([arg, value])
        else:
            # The value is expected right after the first occurrence
            # of the argument
            idx = self.vm.args.index(arg) + 1
            if idx < len(self.vm.args):
                self.vm.args[idx] = value
            else:
                self.vm.args.append(value)

    def tearDown(self):
        """
        Shut down all VMs created by the test, detach and close the
        console log handler, then run the base tear-down.
        """
        try:
            for vm in self._vms.values():
                vm.shutdown()
        finally:
            # Always release the handler so that it does not stay
            # attached to the shared 'console' logger
            logging.getLogger('console').removeHandler(self._console_log_fh)
            self._console_log_fh.close()
        super().tearDown()
401