xref: /qemu/tests/functional/qemu_test/testcase.py (revision 537600df6141640d411cda5deea742081d2f9962)
1# Test class and utilities for functional tests
2#
3# Copyright 2018, 2024 Red Hat, Inc.
4#
5# Original Author (Avocado-based tests):
6#  Cleber Rosa <crosa@redhat.com>
7#
8# Adaption for standalone version:
9#  Thomas Huth <thuth@redhat.com>
10#
11# This work is licensed under the terms of the GNU GPL, version 2 or
12# later.  See the COPYING file in the top-level directory.
13
14import logging
15import os
16from pathlib import Path
17import pycotap
18import shutil
19from subprocess import run
20import sys
21import tempfile
22import unittest
23import uuid
24
25from qemu.machine import QEMUMachine
26from qemu.utils import kvm_available, tcg_available
27
28from .archive import archive_extract
29from .asset import Asset
30from .config import BUILD_DIR
31from .uncompress import uncompress
32
33
class QemuBaseTest(unittest.TestCase):
    """
    Common base class for the QEMU functional tests.

    setUp() creates a per-test output directory with a scratch
    sub-directory and attaches a file logger; the helper methods
    build paths relative to the source, build, scratch and log
    directories.
    """

    # Path to the QEMU binary under test, taken from the environment
    qemu_bin = os.getenv('QEMU_TEST_QEMU_BINARY')
    # Last '-' separated component of qemu_bin, filled in by setUp()
    arch = None

    # Scratch directory of the running test, created by setUp()
    workdir = None
    # 'qemu-test' logger, configured by setUp()
    log = None
    logdir = None

    def uncompress(self, compressed, format=None):
        """
        @params compressed: filename, Asset, or file-like object to uncompress
        @params format: optional compression format (gzip, lzma)

        Uncompresses @compressed into the scratch directory.

        If @format is None, heuristics will be applied to guess the format
        from the filename or Asset URL. @format must be non-None if
        @compressed is a file-like object.

        Returns the fully qualified path to the uncompressed file
        """
        self.log.debug(f"Uncompress {compressed} format={format}")
        if isinstance(compressed, Asset):
            compressed.fetch()

        # The output file is named after the input, minus its last extension
        name, _ = os.path.splitext(str(compressed))
        uncompressed = self.scratch_file(os.path.basename(name))

        uncompress(compressed, uncompressed, format)

        return uncompressed

    def archive_extract(self, archive, format=None, sub_dir=None, member=None):
        """
        @params archive: filename, Asset, or file-like object to extract
        @params format: optional archive format (tar, zip, deb, cpio)
        @params sub_dir: optional sub-directory to extract into
        @params member: optional member file to limit extraction to

        Extracts @archive into the scratch directory, or a directory beneath
        named by @sub_dir. All files are extracted unless @member specifies
        a limit.

        If @format is None, heuristics will be applied to guess the format
        from the filename or Asset URL. @format must be non-None if @archive
        is a file-like object.

        If @member is non-None, returns the fully qualified path to @member
        """
        self.log.debug(f"Extract {archive} format={format} "
                       f"sub_dir={sub_dir} member={member}")
        if isinstance(archive, Asset):
            archive.fetch()

        if sub_dir is None:
            dest = self.scratch_file()
        else:
            dest = self.scratch_file(sub_dir)
        archive_extract(archive, dest, format, member)

        if member is not None:
            # NOTE(review): the returned path ignores @sub_dir; confirm
            # whether callers combining @sub_dir and @member rely on this.
            return self.scratch_file(member)
        return None

    def socket_dir(self):
        """
        Create a temporary directory suitable for storing UNIX
        socket paths. The directory is created on first use and
        the same object is returned by subsequent calls.

        Returns: a tempfile.TemporaryDirectory instance
        """
        if self.socketdir is None:
            self.socketdir = tempfile.TemporaryDirectory(
                prefix="qemu_func_test_sock_")
        return self.socketdir

    def data_file(self, *args):
        """
        @params args list of zero or more subdirectories or file

        Construct a path for accessing a data file located
        relative to the source directory that is the root for
        functional tests.

        @args may be an empty list to reference the root dir
        itself, may be a single element to reference a file in
        the root directory, or may be multiple elements to
        reference a file nested below. The path components
        will be joined using the platform appropriate path
        separator.

        Returns: string representing a file path
        """
        return str(Path(Path(__file__).parent.parent, *args))

    def build_file(self, *args):
        """
        @params args list of zero or more subdirectories or file

        Construct a path for accessing a data file located
        relative to the build directory root.

        @args may be an empty list to reference the build dir
        itself, may be a single element to reference a file in
        the build directory, or may be multiple elements to
        reference a file nested below. The path components
        will be joined using the platform appropriate path
        separator.

        Returns: string representing a file path
        """
        return str(Path(BUILD_DIR, *args))

    def scratch_file(self, *args):
        """
        @params args list of zero or more subdirectories or file

        Construct a path for accessing/creating a scratch file
        located relative to a temporary directory dedicated to
        this test case. The directory and its contents will be
        purged upon completion of the test.

        @args may be an empty list to reference the scratch dir
        itself, may be a single element to reference a file in
        the scratch directory, or may be multiple elements to
        reference a file nested below. The path components
        will be joined using the platform appropriate path
        separator.

        Returns: string representing a file path
        """
        return str(Path(self.workdir, *args))

    def log_file(self, *args):
        """
        @params args list of zero or more subdirectories or file

        Construct a path for accessing/creating a log file
        located relative to a temporary directory dedicated to
        this test case. The directory and its log files will be
        preserved upon completion of the test.

        @args may be an empty list to reference the log dir
        itself, may be a single element to reference a file in
        the log directory, or may be multiple elements to
        reference a file nested below. The path components
        will be joined using the platform appropriate path
        separator.

        Returns: string representing a file path
        """
        return str(Path(self.outputdir, *args))

    def assets_available(self):
        """
        Check every class attribute named ASSET_* that is an Asset.

        Returns: False as soon as one asset reports it is not
        available, True otherwise
        """
        for name, asset in vars(self.__class__).items():
            if name.startswith("ASSET_") and isinstance(asset, Asset):
                if not asset.available():
                    self.log.debug(f"Asset {asset.url} not available")
                    return False
        return True

    def setUp(self, bin_prefix):
        """
        @params bin_prefix: binary name prefix supplied by the subclass;
                            not used by this method

        Prepare the output and scratch directories and the file
        logging for the test, then skip the test if any of its
        assets is unavailable.
        """
        self.assertIsNotNone(self.qemu_bin, 'QEMU_TEST_QEMU_BINARY must be set')
        self.arch = self.qemu_bin.split('-')[-1]
        self.socketdir = None

        self.outputdir = self.build_file('tests', 'functional',
                                         self.arch, self.id())
        self.workdir = os.path.join(self.outputdir, 'scratch')
        os.makedirs(self.workdir, exist_ok=True)

        self.log_filename = self.log_file('base.log')
        self.log = logging.getLogger('qemu-test')
        self.log.setLevel(logging.DEBUG)
        self._log_fh = logging.FileHandler(self.log_filename, mode='w')
        self._log_fh.setLevel(logging.DEBUG)
        fileFormatter = logging.Formatter(
            '%(asctime)s - %(levelname)s: %(message)s')
        self._log_fh.setFormatter(fileFormatter)
        self.log.addHandler(self._log_fh)

        # Capture QEMUMachine logging
        self.machinelog = logging.getLogger('qemu.machine')
        self.machinelog.setLevel(logging.DEBUG)
        self.machinelog.addHandler(self._log_fh)

        if not self.assets_available():
            # unittest does not call tearDown() when setUp() raises
            # (SkipTest included), so release what was set up above
            # before skipping. Call the base implementation directly:
            # subclass state has not been initialised yet at this point.
            QemuBaseTest.tearDown(self)
            self.skipTest('One or more assets is not available')

    def tearDown(self):
        """
        Remove the scratch directory (unless QEMU_TEST_KEEP_SCRATCH is
        set in the environment) and the socket directory, then detach
        and close the log file handler.
        """
        if "QEMU_TEST_KEEP_SCRATCH" not in os.environ:
            shutil.rmtree(self.workdir)
        if self.socketdir is not None:
            # cleanup() removes the directory and disarms the object's
            # finalizer, unlike removing the directory behind its back
            self.socketdir.cleanup()
            self.socketdir = None
        self.machinelog.removeHandler(self._log_fh)
        self.log.removeHandler(self._log_fh)
        # Release the file descriptor held by the handler
        self._log_fh.close()

    @staticmethod
    def main():
        """
        Entry point for a test script: either pre-cache the assets of
        the script (when QEMU_TEST_PRECACHE is set) or run its tests
        with TAP output, then exit with a non-zero status on failure.
        """
        # Module name of the running script, i.e. its file name
        # without the trailing extension
        path = Path(sys.argv[0]).stem

        cache = os.environ.get("QEMU_TEST_PRECACHE", None)
        if cache is not None:
            Asset.precache_suites(path, cache)
            return

        tr = pycotap.TAPTestRunner(message_log=pycotap.LogMode.LogToError,
                                   test_output_log=pycotap.LogMode.LogToError)
        res = unittest.main(module=None, testRunner=tr, exit=False,
                            argv=["__dummy__", path])
        # Point the user at the log files of every test that went wrong
        for (test, message) in res.result.errors + res.result.failures:

            if hasattr(test, "log_filename"):
                print('More information on ' + test.id() + ' could be found here:'
                      '\n %s' % test.log_filename, file=sys.stderr)
                if hasattr(test, 'console_log_name'):
                    print(' %s' % test.console_log_name, file=sys.stderr)
        sys.exit(not res.result.wasSuccessful())
252
253
class QemuUserTest(QemuBaseTest):
    """Facilitates user-mode emulation tests."""

    def setUp(self):
        """Run the base setup and start with no library search paths."""
        super().setUp('qemu-')
        self._ldpath = []

    def add_ldpath(self, ldpath):
        """
        Register a directory to be passed to QEMU with -L by run_cmd().

        :param ldpath: directory path; stored as an absolute path
        :type ldpath: str
        """
        self._ldpath.append(os.path.abspath(ldpath))

    def run_cmd(self, bin_path, args=None):
        """
        Run a guest binary under the QEMU binary being tested.

        :param bin_path: path to the guest executable
        :type bin_path: str
        :param args: optional extra arguments appended after @bin_path
        :type args: list of str
        :returns: the subprocess.CompletedProcess of the run, with
                  stdout and stderr captured as text
        """
        cmd = [self.qemu_bin]
        for ldpath in self._ldpath:
            # The option and its value must be separate argv entries;
            # a single "-L <path>" element is not a valid option name
            cmd += ["-L", ldpath]
        cmd.append(bin_path)
        if args:
            cmd += list(args)
        return run(cmd, text=True, capture_output=True)
269
class QemuSystemTest(QemuBaseTest):
    """Facilitates system emulation tests."""

    # Value passed with -cpu to every VM created by get_vm(), if set
    cpu = None
    # Machine type applied to every VM created by get_vm(), if set
    machine = None
    # Cached output of "<qemu_bin> -M help", filled in by set_machine()
    _machinehelp = None

    def setUp(self):
        """Run the base setup and attach a file handler for console logs."""
        self._vms = {}

        super().setUp('qemu-system-')

        console_log = logging.getLogger('console')
        console_log.setLevel(logging.DEBUG)
        self.console_log_name = self.log_file('console.log')
        self._console_log_fh = logging.FileHandler(self.console_log_name,
                                                   mode='w')
        self._console_log_fh.setLevel(logging.DEBUG)
        fileFormatter = logging.Formatter('%(asctime)s: %(message)s')
        self._console_log_fh.setFormatter(fileFormatter)
        console_log.addHandler(self._console_log_fh)

    def set_machine(self, machinename):
        """
        Select the machine type for VMs created afterwards.

        The test is skipped if @machinename does not appear in the
        output of "-M help" of the QEMU binary.

        :param machinename: name of the machine type
        :type machinename: str
        """
        # TODO: We should use QMP to get the list of available machines
        if not self._machinehelp:
            self._machinehelp = run(
                [self.qemu_bin, '-M', 'help'],
                capture_output=True, check=True, encoding='utf8').stdout
        if machinename not in self._machinehelp:
            self.skipTest('no support for machine ' + machinename)
        self.machine = machinename

    def require_accelerator(self, accelerator):
        """
        Requires an accelerator to be available for the test to continue

        It takes into account the currently set qemu binary.

        If the check fails, the test is canceled.  If the check itself
        for the given accelerator is not available, the test is also
        canceled.

        :param accelerator: name of the accelerator, such as "kvm" or "tcg"
        :type accelerator: str
        """
        checker = {'tcg': tcg_available,
                   'kvm': kvm_available}.get(accelerator)
        if checker is None:
            self.skipTest("Don't know how to check for the presence "
                          "of accelerator %s" % accelerator)
        if not checker(qemu_bin=self.qemu_bin):
            self.skipTest("%s accelerator does not seem to be "
                          "available" % accelerator)

    def require_netdev(self, netdevname):
        """
        Skip the test unless the QEMU binary lists @netdevname on a
        line of its own in the output of "-netdev help".

        :param netdevname: name of the network backend
        :type netdevname: str
        """
        help_text = run([self.qemu_bin,
                         '-M', 'none', '-netdev', 'help'],
                        capture_output=True, check=True,
                        encoding='utf8').stdout
        if '\n' + netdevname + '\n' not in help_text:
            self.skipTest('no support for ' + netdevname + ' networking')

    def require_device(self, devicename):
        """
        Skip the test unless @devicename appears in the output of
        "-device help" of the QEMU binary.

        :param devicename: name of the device
        :type devicename: str
        """
        help_text = run([self.qemu_bin,
                         '-M', 'none', '-device', 'help'],
                        capture_output=True, check=True,
                        encoding='utf8').stdout
        if devicename not in help_text:
            self.skipTest('no support for device ' + devicename)

    def _new_vm(self, name, *args):
        """
        Create a QEMUMachine for the QEMU binary under test, using the
        scratch directory for temporary files and the log directory
        for its logs.

        If QEMU_TEST_QMP_BACKDOOR is set in the environment, an extra
        QMP monitor is added on a UNIX socket at that path.

        :param name: name given to the QEMUMachine
        :param args: extra command line arguments for the VM
        :returns: the new QEMUMachine
        """
        vm = QEMUMachine(self.qemu_bin,
                         name=name,
                         base_temp_dir=self.workdir,
                         log_dir=self.log_file())
        self.log.debug('QEMUMachine "%s" created', name)
        self.log.debug('QEMUMachine "%s" temp_dir: %s', name, vm.temp_dir)

        sockpath = os.environ.get("QEMU_TEST_QMP_BACKDOOR", None)
        if sockpath is not None:
            vm.add_args("-chardev",
                        f"socket,id=backdoor,path={sockpath},server=on,wait=off",
                        "-mon", "chardev=backdoor,mode=control")

        if args:
            vm.add_args(*args)
        return vm

    @property
    def vm(self):
        """The VM named 'default', created on first access."""
        return self.get_vm(name='default')

    def get_vm(self, *args, name=None):
        """
        Return the VM registered under @name, creating it if needed.

        A newly created VM gets @args plus the class-level cpu and
        machine settings, when those are set. @args is ignored when
        the VM already exists.

        :param args: extra command line arguments for a new VM
        :param name: name of the VM; a random UUID is used if empty
        :returns: the QEMUMachine for @name
        """
        if not name:
            name = str(uuid.uuid4())
        if self._vms.get(name) is None:
            self._vms[name] = self._new_vm(name, *args)
            if self.cpu is not None:
                self._vms[name].add_args('-cpu', self.cpu)
            if self.machine is not None:
                self._vms[name].set_machine(self.machine)
        return self._vms[name]

    def set_vm_arg(self, arg, value):
        """
        Set an argument to list of extra arguments to be given to the QEMU
        binary. If the argument already exists then its value is replaced.

        :param arg: the QEMU argument, such as "-cpu" in "-cpu host"
        :type arg: str
        :param value: the argument value, such as "host" in "-cpu host"
        :type value: str
        """
        if not arg or not value:
            return
        if arg not in self.vm.args:
            self.vm.args.extend([arg, value])
        else:
            idx = self.vm.args.index(arg) + 1
            if idx < len(self.vm.args):
                self.vm.args[idx] = value
            else:
                # @arg was the last element, so it had no value yet
                self.vm.args.append(value)

    def tearDown(self):
        """Shut down all VMs, release the console log, then run base teardown."""
        for vm in self._vms.values():
            vm.shutdown()
        logging.getLogger('console').removeHandler(self._console_log_fh)
        # Release the file descriptor held by the handler
        self._console_log_fh.close()
        super().tearDown()
397