xref: /qemu/tests/functional/qemu_test/testcase.py (revision 7698afc42b5af9e55f12ab2236618e38e5a1c23f)
1# Test class and utilities for functional tests
2#
3# Copyright 2018, 2024 Red Hat, Inc.
4#
5# Original Author (Avocado-based tests):
6#  Cleber Rosa <crosa@redhat.com>
7#
8# Adaption for standalone version:
9#  Thomas Huth <thuth@redhat.com>
10#
11# This work is licensed under the terms of the GNU GPL, version 2 or
12# later.  See the COPYING file in the top-level directory.
13
14import logging
15import os
16from pathlib import Path
17import pycotap
18import shutil
19from subprocess import run
20import sys
21import tempfile
22import unittest
23import uuid
24
25from qemu.machine import QEMUMachine
26from qemu.utils import hvf_available, kvm_available, tcg_available
27
28from .archive import archive_extract
29from .asset import Asset
30from .config import BUILD_DIR, dso_suffix
31from .uncompress import uncompress
32
33
class QemuBaseTest(unittest.TestCase):
    """
    Common base class for the QEMU functional tests.

    setUp() derives the target architecture from the QEMU binary named
    by $QEMU_TEST_QEMU_BINARY, creates a per-test output directory (with
    a 'scratch' sub-directory) below the build directory and attaches a
    file handler writing 'base.log' to the 'qemu-test' and 'qemu.machine'
    loggers. The remaining methods are helpers for constructing paths
    and for unpacking test assets.
    """

    def uncompress(self, compressed, format=None):
        """
        Uncompresses @compressed into the scratch directory.

        If @format is None, heuristics will be applied to guess the format
        from the filename or Asset URL. @format must be non-None if
        @compressed is a file-like object.

        @params compressed: filename, Asset, or file-like object to
                            uncompress
        @params format: optional compression format (gzip, lzma)

        Returns the fully qualified path to the uncompressed file
        """
        self.log.debug(f"Uncompress {compressed} format={format}")
        if isinstance(compressed, Asset):
            compressed.fetch()

        # The output file is named after the input with its last
        # extension stripped, and placed in the scratch directory.
        (name, _ext) = os.path.splitext(str(compressed))
        uncompressed = self.scratch_file(os.path.basename(name))

        # This resolves to the module level uncompress() helper, not
        # to this method.
        uncompress(compressed, uncompressed, format)

        return uncompressed

    def archive_extract(self, archive, format=None, sub_dir=None, member=None):
        """
        Extracts @archive into the scratch directory, or a directory beneath
        named by @sub_dir. All files are extracted unless @member specifies
        a limit.

        If @format is None, heuristics will be applied to guess the format
        from the filename or Asset URL. @format must be non-None if @archive
        is a file-like object.

        @params archive: filename, Asset, or file-like object to extract
        @params format: optional archive format (tar, zip, deb, cpio)
        @params sub_dir: optional sub-directory to extract into
        @params member: optional member file to limit extraction to

        If @member is non-None, returns the fully qualified path to @member,
        otherwise returns None.
        """
        self.log.debug(f"Extract {archive} format={format} "
                       f"sub_dir={sub_dir} member={member}")
        if isinstance(archive, Asset):
            archive.fetch()

        if sub_dir is None:
            dest = self.scratch_file()
        else:
            dest = self.scratch_file(sub_dir)
        # This resolves to the module level archive_extract() helper,
        # not to this method.
        archive_extract(archive, dest, format, member)

        # NOTE(review): the returned path is relative to the scratch
        # directory root even when @sub_dir is given - confirm that
        # callers expect this.
        if member is not None:
            return self.scratch_file(member)
        return None

    def socket_dir(self):
        """
        Create a temporary directory suitable for storing UNIX
        socket paths. The directory is created on first use and the
        same instance is returned by later calls within the test.

        Returns: a tempfile.TemporaryDirectory instance
        """
        if self.socketdir is None:
            self.socketdir = tempfile.TemporaryDirectory(
                prefix="qemu_func_test_sock_")
        return self.socketdir

    def data_file(self, *args):
        """
        Construct a path for accessing a data file located
        relative to the source directory that is the root for
        functional tests (the parent of the directory holding
        this module).

        @args may be an empty list to reference the root dir
        itself, may be a single element to reference a file in
        the root directory, or may be multiple elements to
        reference a file nested below. The path components
        will be joined using the platform appropriate path
        separator.

        @params args list of zero or more subdirectories or file

        Returns: string representing a file path
        """
        return str(Path(Path(__file__).parent.parent, *args))

    def build_file(self, *args):
        """
        Construct a path for accessing a data file located
        relative to the build directory root (BUILD_DIR).

        @args may be an empty list to reference the build dir
        itself, may be a single element to reference a file in
        the build directory, or may be multiple elements to
        reference a file nested below. The path components
        will be joined using the platform appropriate path
        separator.

        @params args list of zero or more subdirectories or file

        Returns: string representing a file path
        """
        return str(Path(BUILD_DIR, *args))

    def scratch_file(self, *args):
        """
        Construct a path for accessing/creating a scratch file
        located relative to a temporary directory dedicated to
        this test case. The directory and its contents will be
        purged upon completion of the test, unless the
        QEMU_TEST_KEEP_SCRATCH environment variable is set.

        @args may be an empty list to reference the scratch dir
        itself, may be a single element to reference a file in
        the scratch directory, or may be multiple elements to
        reference a file nested below. The path components
        will be joined using the platform appropriate path
        separator.

        @params args list of zero or more subdirectories or file

        Returns: string representing a file path
        """
        return str(Path(self.workdir, *args))

    def log_file(self, *args):
        """
        Construct a path for accessing/creating a log file
        located relative to the output directory dedicated to
        this test case. Unlike the scratch directory, this
        directory is not removed by tearDown().

        @args may be an empty list to reference the log dir
        itself, may be a single element to reference a file in
        the log directory, or may be multiple elements to
        reference a file nested below. The path components
        will be joined using the platform appropriate path
        separator.

        @params args list of zero or more subdirectories or file

        Returns: string representing a file path
        """
        return str(Path(self.outputdir, *args))

    def plugin_file(self, plugin_name):
        """
        Return the path to a TCG plugin, taking into account any host OS
        specific suffix as reported by dso_suffix().

        @params plugin_name: name of the plugin, without suffix

        Returns: the relative path 'tests/tcg/plugins/<name>.<suffix>'
        (presumably meant to be resolved against the build directory -
        TODO confirm against callers)
        """
        sfx = dso_suffix()
        return os.path.join('tests', 'tcg', 'plugins', f'{plugin_name}.{sfx}')

    def assets_available(self):
        """
        Check whether every Asset declared on the test class is available.

        Only attributes defined directly on the concrete test class whose
        name starts with "ASSET_" are considered; attributes inherited
        from parent classes are not inspected.

        Returns: False as soon as one asset reports it is not available,
                 True otherwise
        """
        for name, asset in vars(self.__class__).items():
            if not name.startswith("ASSET_") or not isinstance(asset, Asset):
                continue
            if not asset.available():
                self.log.debug(f"Asset {asset.url} not available")
                return False
        return True

    def setUp(self):
        """
        Prepare the per-test directories and logging.

        Fails the test if $QEMU_TEST_QEMU_BINARY is not set, and skips
        it if any asset declared by the test class is not available.
        """
        self.qemu_bin = os.getenv('QEMU_TEST_QEMU_BINARY')
        self.assertIsNotNone(self.qemu_bin, 'QEMU_TEST_QEMU_BINARY must be set')
        # The architecture is whatever follows the last '-' of the
        # binary path, e.g. 'x86_64' for '.../qemu-system-x86_64'
        self.arch = self.qemu_bin.split('-')[-1]
        self.socketdir = None

        self.outputdir = self.build_file('tests', 'functional',
                                         self.arch, self.id())
        self.workdir = os.path.join(self.outputdir, 'scratch')
        os.makedirs(self.workdir, exist_ok=True)

        self.log_filename = self.log_file('base.log')
        self.log = logging.getLogger('qemu-test')
        self.log.setLevel(logging.DEBUG)
        self._log_fh = logging.FileHandler(self.log_filename, mode='w')
        self._log_fh.setLevel(logging.DEBUG)
        fileFormatter = logging.Formatter(
            '%(asctime)s - %(levelname)s: %(message)s')
        self._log_fh.setFormatter(fileFormatter)
        self.log.addHandler(self._log_fh)

        # Capture QEMUMachine logging
        self.machinelog = logging.getLogger('qemu.machine')
        self.machinelog.setLevel(logging.DEBUG)
        self.machinelog.addHandler(self._log_fh)

        if not self.assets_available():
            self.skipTest('One or more assets is not available')

    def tearDown(self):
        """
        Remove the scratch directory (unless $QEMU_TEST_KEEP_SCRATCH is
        set) and the socket directory if one was created, then detach
        and close the log file handler installed by setUp().
        """
        if "QEMU_TEST_KEEP_SCRATCH" not in os.environ:
            shutil.rmtree(self.workdir)
        if self.socketdir is not None:
            # cleanup() removes the directory and also disarms the
            # finalizer of the TemporaryDirectory object
            self.socketdir.cleanup()
            self.socketdir = None
        self.machinelog.removeHandler(self._log_fh)
        self.log.removeHandler(self._log_fh)
        # Removing a handler does not close it; release the log file
        self._log_fh.close()

    @staticmethod
    def main():
        """
        Entry point to be called from a test script.

        The name of the test module is taken from the basename of
        sys.argv[0] with its last three characters (the '.py' suffix)
        removed.

        If $QEMU_TEST_PRECACHE is set, only Asset.precache_suites() is
        invoked and the function returns without running any test.
        Otherwise the tests are run with the pycotap TAP runner, the
        log file locations of tests with errors or failures are printed
        on stderr, and the process exits with a non-zero status unless
        all tests were successful.
        """
        path = os.path.basename(sys.argv[0])[:-3]

        cache = os.environ.get("QEMU_TEST_PRECACHE", None)
        if cache is not None:
            Asset.precache_suites(path, cache)
            return

        tr = pycotap.TAPTestRunner(message_log=pycotap.LogMode.LogToError,
                                   test_output_log=pycotap.LogMode.LogToError)
        res = unittest.main(module=None, testRunner=tr, exit=False,
                            argv=["__dummy__", path])
        for (test, _message) in res.result.errors + res.result.failures:
            # Entries that are not test instances set up by setUp()
            # have no log file to point at
            if not hasattr(test, "log_filename"):
                continue
            print(f'More information on {test.id()} could be found here:'
                  f'\n {test.log_filename}', file=sys.stderr)
            if hasattr(test, 'console_log_name'):
                print(f' {test.console_log_name}', file=sys.stderr)
        sys.exit(not res.result.wasSuccessful())
256
257
class QemuUserTest(QemuBaseTest):
    """Facilitates user-mode emulation tests."""

    def setUp(self):
        """Run the base setup and start with an empty library path list."""
        super().setUp()
        self._ldpath = []

    def add_ldpath(self, ldpath):
        """
        Register a directory to be passed to QEMU with '-L' by run_cmd().

        :param ldpath: directory path; it is stored as an absolute path
        :type ldpath: str
        """
        self._ldpath.append(os.path.abspath(ldpath))

    def run_cmd(self, bin_path, args=None):
        """
        Run a guest binary under the QEMU binary selected for this test.

        :param bin_path: path of the guest program to execute
        :type bin_path: str
        :param args: optional arguments passed to the guest program
        :type args: list of str
        :returns: the subprocess.CompletedProcess of the run, with
                  stdout and stderr captured as text
        """
        cmd = [self.qemu_bin]
        for ldpath in self._ldpath:
            # The option and its value must be separate argv elements;
            # a single "-L <path>" string is seen as one unknown option.
            cmd.extend(["-L", ldpath])
        cmd.append(bin_path)
        if args:
            cmd.extend(args)
        return run(cmd, text=True, capture_output=True)
273
class QemuSystemTest(QemuBaseTest):
    """Facilitates system emulation tests."""

    # Value for the '-cpu' option of newly created VMs, None to omit it
    cpu = None
    # Machine type applied to newly created VMs, see set_machine()
    machine = None
    # Output of "<qemu_bin> -M help", filled in lazily by set_machine()
    _machinehelp = None

    def setUp(self):
        """
        Run the base setup and attach a file handler writing
        'console.log' in the test's log directory to the 'console' logger.
        """
        self._vms = {}

        super().setUp()

        console_log = logging.getLogger('console')
        console_log.setLevel(logging.DEBUG)
        self.console_log_name = self.log_file('console.log')
        self._console_log_fh = logging.FileHandler(self.console_log_name,
                                                   mode='w')
        self._console_log_fh.setLevel(logging.DEBUG)
        fileFormatter = logging.Formatter('%(asctime)s: %(message)s')
        self._console_log_fh.setFormatter(fileFormatter)
        console_log.addHandler(self._console_log_fh)

    def set_machine(self, machinename):
        """
        Select the machine type for VMs created afterwards.

        The test is skipped if @machinename does not occur in the
        output of "<qemu_bin> -M help". This is a plain substring
        search of the help text, which is cached on the test instance.

        :param machinename: name of the machine type
        :type machinename: str
        """
        # TODO: We should use QMP to get the list of available machines
        if not self._machinehelp:
            self._machinehelp = run(
                [self.qemu_bin, '-M', 'help'],
                capture_output=True, check=True, encoding='utf8').stdout
        if machinename not in self._machinehelp:
            self.skipTest('no support for machine ' + machinename)
        self.machine = machinename

    def require_accelerator(self, accelerator):
        """
        Requires an accelerator to be available for the test to continue

        It takes into account the currently set qemu binary.

        If the check fails, the test is canceled.  If the check itself
        for the given accelerator is not available, the test is also
        canceled.

        :param accelerator: name of the accelerator, such as "kvm" or "tcg"
        :type accelerator: str
        """
        checker = {'tcg': tcg_available,
                   'kvm': kvm_available,
                   'hvf': hvf_available,
                  }.get(accelerator)
        if checker is None:
            self.skipTest("Don't know how to check for the presence "
                          "of accelerator %s" % accelerator)
        if not checker(qemu_bin=self.qemu_bin):
            self.skipTest("%s accelerator does not seem to be "
                          "available" % accelerator)

    def require_netdev(self, netdevname):
        """
        Skip the test unless the QEMU binary lists the given netdev.

        The name must appear on a line of its own in the output of
        "<qemu_bin> -M none -netdev help".

        :param netdevname: name of the network backend, such as "user"
        :type netdevname: str
        """
        netdev_help = run([self.qemu_bin,
                           '-M', 'none', '-netdev', 'help'],
                          capture_output=True, check=True,
                          encoding='utf8').stdout
        if ('\n' + netdevname + '\n') not in netdev_help:
            self.skipTest('no support for ' + netdevname + ' networking')

    def require_device(self, devicename):
        """
        Skip the test unless the QEMU binary lists the given device.

        This is a plain substring search in the output of
        "<qemu_bin> -M none -device help".

        :param devicename: name of the device model
        :type devicename: str
        """
        device_help = run([self.qemu_bin,
                           '-M', 'none', '-device', 'help'],
                          capture_output=True, check=True,
                          encoding='utf8').stdout
        if devicename not in device_help:
            self.skipTest('no support for device ' + devicename)

    def _new_vm(self, name, *args):
        """
        Create a QEMUMachine using the scratch directory as base for its
        temporary files and the test's log directory for its logs.

        If $QEMU_TEST_QMP_BACKDOOR is set, an additional QMP monitor
        listening on a UNIX socket at that path is added to the VM.

        :param name: name given to the QEMUMachine
        :param args: extra command line arguments added to the VM
        :returns: the new QEMUMachine instance
        """
        vm = QEMUMachine(self.qemu_bin,
                         name=name,
                         base_temp_dir=self.workdir,
                         log_dir=self.log_file())
        self.log.debug('QEMUMachine "%s" created', name)
        self.log.debug('QEMUMachine "%s" temp_dir: %s', name, vm.temp_dir)

        sockpath = os.environ.get("QEMU_TEST_QMP_BACKDOOR", None)
        if sockpath is not None:
            vm.add_args("-chardev",
                        f"socket,id=backdoor,path={sockpath},server=on,wait=off",
                        "-mon", "chardev=backdoor,mode=control")

        if args:
            vm.add_args(*args)
        return vm

    @property
    def vm(self):
        """The VM named 'default', created on first access."""
        return self.get_vm(name='default')

    def get_vm(self, *args, name=None):
        """
        Return the VM registered under @name, creating it if needed.

        A newly created VM gets '-cpu' and the machine type applied when
        the 'cpu' / 'machine' attributes are set. @args are only used
        when the VM is created, not when an existing one is returned.

        :param args: extra command line arguments for a new VM
        :param name: name of the VM; a random UUID is used if empty
        :returns: the QEMUMachine instance
        """
        if not name:
            name = str(uuid.uuid4())
        if self._vms.get(name) is None:
            vm = self._new_vm(name, *args)
            self._vms[name] = vm
            if self.cpu is not None:
                vm.add_args('-cpu', self.cpu)
            if self.machine is not None:
                vm.set_machine(self.machine)
        return self._vms[name]

    def set_vm_arg(self, arg, value):
        """
        Set an argument to list of extra arguments to be given to the QEMU
        binary. If the argument already exists then its value is replaced.

        Nothing is done if @arg or @value is empty. Only the first
        occurrence of @arg is considered.

        :param arg: the QEMU argument, such as "-cpu" in "-cpu host"
        :type arg: str
        :param value: the argument value, such as "host" in "-cpu host"
        :type value: str
        """
        if not arg or not value:
            return
        vm_args = self.vm.args
        if arg not in vm_args:
            vm_args.extend([arg, value])
            return
        idx = vm_args.index(arg) + 1
        if idx < len(vm_args):
            vm_args[idx] = value
        else:
            # @arg was the last element and had no value yet
            vm_args.append(value)

    def tearDown(self):
        """
        Shut down all VMs created by the test, detach and close the
        console log handler and run the base class cleanup.
        """
        try:
            for vm in self._vms.values():
                vm.shutdown()
        finally:
            # Release the logging resources and the scratch directory
            # even when shutting down a VM raised an exception
            console_log = logging.getLogger('console')
            console_log.removeHandler(self._console_log_fh)
            # Removing a handler does not close it; release the log file
            self._console_log_fh.close()
            super().tearDown()
403