xref: /qemu/tests/functional/qemu_test/testcase.py (revision cc3d262aa93a42e19c38f6acb6d0f6012a71eb4b)
1# Test class and utilities for functional tests
2#
3# Copyright 2018, 2024 Red Hat, Inc.
4#
5# Original Author (Avocado-based tests):
6#  Cleber Rosa <crosa@redhat.com>
7#
8# Adaption for standalone version:
9#  Thomas Huth <thuth@redhat.com>
10#
11# This work is licensed under the terms of the GNU GPL, version 2 or
12# later.  See the COPYING file in the top-level directory.
13
14import logging
15import os
16from pathlib import Path
17import pycotap
18import shutil
19from subprocess import run
20import sys
21import tempfile
22import unittest
23import uuid
24
25from qemu.machine import QEMUMachine
26from qemu.utils import kvm_available, tcg_available
27
28from .archive import archive_extract
29from .asset import Asset
30from .config import BUILD_DIR, dso_suffix
31from .uncompress import uncompress
32
33
class QemuBaseTest(unittest.TestCase):
    """
    Common base class for the QEMU functional tests.

    setUp() reads the QEMU binary from the QEMU_TEST_QEMU_BINARY
    environment variable, creates a per-test output directory with a
    'scratch' sub-directory beneath it, and attaches a file handler
    writing 'base.log' into the output directory. The remaining methods
    are helpers for building paths and for unpacking test assets.
    """

    # Text after the last '-' of the QEMU binary name; set in setUp()
    arch = None

    # Scratch directory of the running test; set in setUp()
    workdir = None
    # The 'qemu-test' logger; set in setUp()
    log = None
    # NOTE(review): never assigned in the code visible here (log_file()
    # uses outputdir instead); kept in case other code refers to it.
    logdir = None
    # Per-test output directory holding the log files; set in setUp()
    outputdir = None
    # Lazily created by socket_dir(); reset in setUp() and tearDown()
    socketdir = None

    def uncompress(self, compressed, format=None):
        """
        @params compressed: filename, Asset, or file-like object to uncompress
        @params format: optional compression format (gzip, lzma)

        Uncompresses @compressed into the scratch directory.

        If @format is None, heuristics will be applied to guess the format
        from the filename or Asset URL. @format must be non-None if
        @compressed is a file-like object.

        Returns the fully qualified path to the uncompressed file
        """
        self.log.debug(f"Uncompress {compressed} format={format}")
        if isinstance(compressed, Asset):
            compressed.fetch()

        # The output file is named after the input with its last
        # extension stripped.
        (name, _ext) = os.path.splitext(str(compressed))
        uncompressed = self.scratch_file(os.path.basename(name))

        # This resolves to the module-level uncompress() helper, not to
        # this method.
        uncompress(compressed, uncompressed, format)

        return uncompressed

    def archive_extract(self, archive, format=None, sub_dir=None, member=None):
        """
        @params archive: filename, Asset, or file-like object to extract
        @params format: optional archive format (tar, zip, deb, cpio)
        @params sub_dir: optional sub-directory to extract into
        @params member: optional member file to limit extraction to

        Extracts @archive into the scratch directory, or a directory beneath
        named by @sub_dir. All files are extracted unless @member specifies
        a limit.

        If @format is None, heuristics will be applied to guess the format
        from the filename or Asset URL. @format must be non-None if @archive
        is a file-like object.

        If @member is non-None, returns the fully qualified path to @member
        (located below @sub_dir when that is given), otherwise returns None
        """
        self.log.debug(f"Extract {archive} format={format} "
                       f"sub_dir={sub_dir} member={member}")
        if isinstance(archive, Asset):
            archive.fetch()

        # Path components of the extraction directory below the scratch dir
        dest_parts = () if sub_dir is None else (sub_dir,)
        # This resolves to the module-level archive_extract() helper.
        archive_extract(archive, self.scratch_file(*dest_parts),
                        format, member)

        if member is None:
            return None
        # The member lives below the directory that was extracted into,
        # so @sub_dir has to be part of the returned path.
        return self.scratch_file(*dest_parts, member)

    def socket_dir(self):
        """
        Create a temporary directory suitable for storing UNIX
        socket paths. The directory is created on first use and the
        same object is returned by later calls within the test.

        Returns: a tempfile.TemporaryDirectory instance
        """
        if self.socketdir is None:
            self.socketdir = tempfile.TemporaryDirectory(
                prefix="qemu_func_test_sock_")
        return self.socketdir

    def data_file(self, *args):
        """
        @params args list of zero or more subdirectories or file

        Construct a path for accessing a data file located
        relative to the source directory that is the root for
        functional tests.

        @args may be an empty list to reference the root dir
        itself, may be a single element to reference a file in
        the root directory, or may be multiple elements to
        reference a file nested below. The path components
        will be joined using the platform appropriate path
        separator.

        Returns: string representing a file path
        """
        # The root is the parent of the directory containing this file.
        return str(Path(Path(__file__).parent.parent, *args))

    def build_file(self, *args):
        """
        @params args list of zero or more subdirectories or file

        Construct a path for accessing a data file located
        relative to the build directory root.

        @args may be an empty list to reference the build dir
        itself, may be a single element to reference a file in
        the build directory, or may be multiple elements to
        reference a file nested below. The path components
        will be joined using the platform appropriate path
        separator.

        Returns: string representing a file path
        """
        return str(Path(BUILD_DIR, *args))

    def scratch_file(self, *args):
        """
        @params args list of zero or more subdirectories or file

        Construct a path for accessing/creating a scratch file
        located relative to a temporary directory dedicated to
        this test case. The directory and its contents will be
        purged upon completion of the test.

        @args may be an empty list to reference the scratch dir
        itself, may be a single element to reference a file in
        the scratch directory, or may be multiple elements to
        reference a file nested below. The path components
        will be joined using the platform appropriate path
        separator.

        Returns: string representing a file path
        """
        return str(Path(self.workdir, *args))

    def log_file(self, *args):
        """
        @params args list of zero or more subdirectories or file

        Construct a path for accessing/creating a log file
        located relative to a temporary directory dedicated to
        this test case. The directory and its log files will be
        preserved upon completion of the test.

        @args may be an empty list to reference the log dir
        itself, may be a single element to reference a file in
        the log directory, or may be multiple elements to
        reference a file nested below. The path components
        will be joined using the platform appropriate path
        separator.

        Returns: string representing a file path
        """
        return str(Path(self.outputdir, *args))

    def plugin_file(self, plugin_name):
        """
        @params plugin name

        Return the path to the plugin taking into account any host OS
        specific suffixes. The result is the relative path
        tests/tcg/plugins/<plugin_name>.<suffix>.
        """
        sfx = dso_suffix()
        return os.path.join('tests', 'tcg', 'plugins', f'{plugin_name}.{sfx}')

    def assets_available(self):
        """
        Check the Asset objects stored in ASSET_* attributes defined
        directly on the test's own class (attributes inherited from
        parent classes are not inspected).

        Returns: False as soon as one asset reports that it is not
                 available, True otherwise
        """
        for name, asset in vars(self.__class__).items():
            if name.startswith("ASSET_") and isinstance(asset, Asset):
                if not asset.available():
                    self.log.debug(f"Asset {asset.url} not available")
                    return False
        return True

    def setUp(self):
        """
        Prepare the directories and logging for a single test.

        Fails the test if QEMU_TEST_QEMU_BINARY is not set, and skips
        it if one of the assets of the test class is not available.
        """
        self.qemu_bin = os.getenv('QEMU_TEST_QEMU_BINARY')
        self.assertIsNotNone(self.qemu_bin, 'QEMU_TEST_QEMU_BINARY must be set')
        # The architecture is whatever follows the last '-' in the binary
        self.arch = self.qemu_bin.split('-')[-1]
        self.socketdir = None

        self.outputdir = self.build_file('tests', 'functional',
                                         self.arch, self.id())
        self.workdir = os.path.join(self.outputdir, 'scratch')
        os.makedirs(self.workdir, exist_ok=True)

        self.log_filename = self.log_file('base.log')
        self.log = logging.getLogger('qemu-test')
        self.log.setLevel(logging.DEBUG)
        self._log_fh = logging.FileHandler(self.log_filename, mode='w')
        self._log_fh.setLevel(logging.DEBUG)
        file_formatter = logging.Formatter(
            '%(asctime)s - %(levelname)s: %(message)s')
        self._log_fh.setFormatter(file_formatter)
        self.log.addHandler(self._log_fh)

        # Capture QEMUMachine logging
        self.machinelog = logging.getLogger('qemu.machine')
        self.machinelog.setLevel(logging.DEBUG)
        self.machinelog.addHandler(self._log_fh)

        if not self.assets_available():
            self.skipTest('One or more assets is not available')

    def tearDown(self):
        """
        Remove the scratch directory (unless QEMU_TEST_KEEP_SCRATCH is
        present in the environment) and the socket directory, then
        detach and close the log file handler installed by setUp().
        """
        if "QEMU_TEST_KEEP_SCRATCH" not in os.environ:
            shutil.rmtree(self.workdir)
        if self.socketdir is not None:
            # cleanup() removes the directory and also disarms the
            # finalizer of the TemporaryDirectory object.
            self.socketdir.cleanup()
            self.socketdir = None
        self.machinelog.removeHandler(self._log_fh)
        self.log.removeHandler(self._log_fh)
        # Removing a handler does not close its file
        self._log_fh.close()

    @staticmethod
    def main():
        """
        Entry point for a test script.

        The module to run is named after the running script: the
        basename of sys.argv[0] with its last three characters (the
        '.py' suffix) removed.

        If QEMU_TEST_PRECACHE is set in the environment, the assets of
        that module are pre-cached and no test is run. Otherwise the
        tests are run with a TAP test runner, the log file names of
        errored or failed tests are printed to stderr, and the process
        exits with status 0 on success or 1 otherwise.
        """
        path = os.path.basename(sys.argv[0])[:-3]

        cache = os.environ.get("QEMU_TEST_PRECACHE", None)
        if cache is not None:
            Asset.precache_suites(path, cache)
            return

        tr = pycotap.TAPTestRunner(message_log=pycotap.LogMode.LogToError,
                                   test_output_log=pycotap.LogMode.LogToError)
        res = unittest.main(module=None, testRunner=tr, exit=False,
                            argv=["__dummy__", path])
        for (test, _message) in res.result.errors + res.result.failures:
            # Only tests whose setUp() got far enough have log files
            if hasattr(test, "log_filename"):
                print(f'More information on {test.id()} could be found here:'
                      f'\n {test.log_filename}', file=sys.stderr)
                if hasattr(test, 'console_log_name'):
                    print(f' {test.console_log_name}', file=sys.stderr)
        sys.exit(not res.result.wasSuccessful())
262
263
class QemuUserTest(QemuBaseTest):
    """Base class for tests that run a program through the QEMU binary."""

    def setUp(self):
        """Run the base setup and start with an empty list of -L paths."""
        super().setUp()
        self._ldpath = []

    def add_ldpath(self, ldpath):
        """
        Register a path to be passed to QEMU with the -L option by
        run_cmd(). The path is stored in absolute form.

        :param ldpath: the path to add
        :type ldpath: str
        """
        self._ldpath.append(os.path.abspath(ldpath))

    def run_cmd(self, bin_path, args=None):
        """
        Run a program under the QEMU binary of this test.

        :param bin_path: path of the program to run
        :type bin_path: str
        :param args: optional arguments handed to the program
        :type args: list of str
        :returns: the subprocess.CompletedProcess of the run, with stdout
                  and stderr captured as text
        """
        cmd = [self.qemu_bin]
        for ldpath in self._ldpath:
            # The option and its value must be separate argv elements;
            # a single "-L <path>" string is not recognized as "-L".
            cmd += ["-L", ldpath]
        cmd.append(bin_path)
        if args:
            cmd.extend(args)
        return run(cmd, text=True, capture_output=True)
279
class QemuSystemTest(QemuBaseTest):
    """Facilitates system emulation tests."""

    # Value for the -cpu option of newly created VMs (None: not passed)
    cpu = None
    # Machine type applied to newly created VMs (None: not set)
    machine = None
    # Cached output of "<qemu binary> -M help", filled by set_machine()
    _machinehelp = None

    def setUp(self):
        """
        Run the base setup and attach a file handler writing
        'console.log' in the log directory to the 'console' logger.
        """
        self._vms = {}

        super().setUp()

        console_log = logging.getLogger('console')
        console_log.setLevel(logging.DEBUG)
        self.console_log_name = self.log_file('console.log')
        self._console_log_fh = logging.FileHandler(self.console_log_name,
                                                   mode='w')
        self._console_log_fh.setLevel(logging.DEBUG)
        file_formatter = logging.Formatter('%(asctime)s: %(message)s')
        self._console_log_fh.setFormatter(file_formatter)
        console_log.addHandler(self._console_log_fh)

    def set_machine(self, machinename):
        """
        Select the machine type for VMs created afterwards. The test is
        skipped if the name does not occur in the output of
        "<qemu binary> -M help".

        :param machinename: name of the machine type
        :type machinename: str
        """
        # TODO: We should use QMP to get the list of available machines
        if not self._machinehelp:
            self._machinehelp = run(
                [self.qemu_bin, '-M', 'help'],
                capture_output=True, check=True, encoding='utf8').stdout
        # NOTE(review): plain substring search, so a name that is a
        # prefix of another machine name is accepted as well.
        if self._machinehelp.find(machinename) < 0:
            self.skipTest('no support for machine ' + machinename)
        self.machine = machinename

    def require_accelerator(self, accelerator):
        """
        Requires an accelerator to be available for the test to continue

        It takes into account the currently set qemu binary.

        If the check fails, the test is skipped.  If the check itself
        for the given accelerator is not available, the test is also
        skipped.

        :param accelerator: name of the accelerator, such as "kvm" or "tcg"
        :type accelerator: str
        """
        checker = {'tcg': tcg_available,
                   'kvm': kvm_available}.get(accelerator)
        if checker is None:
            self.skipTest("Don't know how to check for the presence "
                          "of accelerator %s" % accelerator)
        if not checker(qemu_bin=self.qemu_bin):
            self.skipTest("%s accelerator does not seem to be "
                          "available" % accelerator)

    def require_netdev(self, netdevname):
        """
        Skip the test unless @netdevname appears on a line of its own in
        the output of "<qemu binary> -M none -netdev help".

        :param netdevname: name of the network backend
        :type netdevname: str
        """
        helptext = run([self.qemu_bin,
                        '-M', 'none', '-netdev', 'help'],
                       capture_output=True, check=True, encoding='utf8').stdout
        if helptext.find('\n' + netdevname + '\n') < 0:
            self.skipTest('no support for ' + netdevname + ' networking')

    def require_device(self, devicename):
        """
        Skip the test unless @devicename occurs in the output of
        "<qemu binary> -M none -device help".

        :param devicename: name of the device
        :type devicename: str
        """
        helptext = run([self.qemu_bin,
                        '-M', 'none', '-device', 'help'],
                       capture_output=True, check=True, encoding='utf8').stdout
        if helptext.find(devicename) < 0:
            self.skipTest('no support for device ' + devicename)

    def _new_vm(self, name, *args):
        """
        Create a QEMUMachine for the QEMU binary of this test, using the
        scratch directory as temp dir base and the log directory for logs.

        If QEMU_TEST_QMP_BACKDOOR is set in the environment, its value is
        used as the path of an additional socket chardev with a monitor
        in control mode attached to it.

        :param name: name given to the QEMUMachine
        :param args: extra command line arguments added to the VM
        :returns: the new QEMUMachine
        """
        vm = QEMUMachine(self.qemu_bin,
                         name=name,
                         base_temp_dir=self.workdir,
                         log_dir=self.log_file())
        self.log.debug('QEMUMachine "%s" created', name)
        self.log.debug('QEMUMachine "%s" temp_dir: %s', name, vm.temp_dir)

        sockpath = os.environ.get("QEMU_TEST_QMP_BACKDOOR", None)
        if sockpath is not None:
            vm.add_args("-chardev",
                        f"socket,id=backdoor,path={sockpath},server=on,wait=off",
                        "-mon", "chardev=backdoor,mode=control")

        if args:
            vm.add_args(*args)
        return vm

    @property
    def vm(self):
        """The VM named 'default', created on first access."""
        return self.get_vm(name='default')

    def get_vm(self, *args, name=None):
        """
        Return the VM registered under @name, creating it first if needed.

        On creation the class-level cpu and machine settings are applied
        when they are not None. @args are only used when the VM is
        created. Without a @name a random UUID is used as the name, so
        each such call creates another VM.

        :param args: extra command line arguments for a newly created VM
        :param name: name of the VM
        :returns: the QEMUMachine registered under the name
        """
        if not name:
            name = str(uuid.uuid4())
        if self._vms.get(name) is None:
            vm = self._new_vm(name, *args)
            self._vms[name] = vm
            if self.cpu is not None:
                vm.add_args('-cpu', self.cpu)
            if self.machine is not None:
                vm.set_machine(self.machine)
        return self._vms[name]

    def set_vm_arg(self, arg, value):
        """
        Set an argument to list of extra arguments to be given to the QEMU
        binary. If the argument already exists then its value is replaced.

        Nothing is done if @arg or @value is empty. Only the first
        occurrence of @arg in the list is considered.

        :param arg: the QEMU argument, such as "-cpu" in "-cpu host"
        :type arg: str
        :param value: the argument value, such as "host" in "-cpu host"
        :type value: str
        """
        if not arg or not value:
            return
        vm_args = self.vm.args
        if arg not in vm_args:
            vm_args.extend([arg, value])
            return
        idx = vm_args.index(arg) + 1
        if idx < len(vm_args):
            vm_args[idx] = value
        else:
            # @arg was the last element, so there is no value to replace
            vm_args.append(value)

    def tearDown(self):
        """
        Shut down all VMs created through get_vm(), then detach and
        close the console log handler and run the base class tearDown().
        """
        try:
            for vm in self._vms.values():
                vm.shutdown()
        finally:
            # Clean up even if a shutdown raised, so that the handler
            # does not stay attached for the tests that follow.
            logging.getLogger('console').removeHandler(self._console_log_fh)
            self._console_log_fh.close()
            super().tearDown()
407