xref: /qemu/tests/functional/qemu_test/testcase.py (revision 4a119cfc6cd7affc07d4b76c1340cf96b6ff0268)
1# Test class and utilities for functional tests
2#
3# Copyright 2018, 2024 Red Hat, Inc.
4#
5# Original Author (Avocado-based tests):
6#  Cleber Rosa <crosa@redhat.com>
7#
8# Adaption for standalone version:
9#  Thomas Huth <thuth@redhat.com>
10#
11# This work is licensed under the terms of the GNU GPL, version 2 or
12# later.  See the COPYING file in the top-level directory.
13
14import logging
15import os
16from pathlib import Path
17import pycotap
18import shutil
19from subprocess import run
20import sys
21import tempfile
22import unittest
23import uuid
24
25from qemu.machine import QEMUMachine
26from qemu.utils import kvm_available, tcg_available
27
28from .archive import archive_extract
29from .asset import Asset
30from .config import BUILD_DIR
31from .uncompress import uncompress
32
33
34class QemuBaseTest(unittest.TestCase):
35
36    arch = None
37
38    workdir = None
39    log = None
40    logdir = None
41
42    '''
43    @params compressed: filename, Asset, or file-like object to uncompress
44    @params format: optional compression format (gzip, lzma)
45
46    Uncompresses @compressed into the scratch directory.
47
48    If @format is None, heuristics will be applied to guess the format
49    from the filename or Asset URL. @format must be non-None if @uncompressed
50    is a file-like object.
51
52    Returns the fully qualified path to the uncompressed file
53    '''
54    def uncompress(self, compressed, format=None):
55        self.log.debug(f"Uncompress {compressed} format={format}")
56        if type(compressed) == Asset:
57            compressed.fetch()
58
59        (name, ext) = os.path.splitext(str(compressed))
60        uncompressed = self.scratch_file(os.path.basename(name))
61
62        uncompress(compressed, uncompressed, format)
63
64        return uncompressed
65
66    '''
67    @params archive: filename, Asset, or file-like object to extract
68    @params format: optional archive format (tar, zip, deb, cpio)
69    @params sub_dir: optional sub-directory to extract into
70    @params member: optional member file to limit extraction to
71
72    Extracts @archive into the scratch directory, or a directory beneath
73    named by @sub_dir. All files are extracted unless @member specifies
74    a limit.
75
76    If @format is None, heuristics will be applied to guess the format
77    from the filename or Asset URL. @format must be non-None if @archive
78    is a file-like object.
79
80    If @member is non-None, returns the fully qualified path to @member
81    '''
82    def archive_extract(self, archive, format=None, sub_dir=None, member=None):
83        self.log.debug(f"Extract {archive} format={format}" +
84                       f"sub_dir={sub_dir} member={member}")
85        if type(archive) == Asset:
86            archive.fetch()
87        if sub_dir is None:
88            archive_extract(archive, self.scratch_file(), format, member)
89        else:
90            archive_extract(archive, self.scratch_file(sub_dir),
91                            format, member)
92
93        if member is not None:
94            return self.scratch_file(member)
95        return None
96
97    '''
98    Create a temporary directory suitable for storing UNIX
99    socket paths.
100
101    Returns: a tempfile.TemporaryDirectory instance
102    '''
103    def socket_dir(self):
104        if self.socketdir is None:
105            self.socketdir = tempfile.TemporaryDirectory(
106                prefix="qemu_func_test_sock_")
107        return self.socketdir
108
109    '''
110    @params args list of zero or more subdirectories or file
111
112    Construct a path for accessing a data file located
113    relative to the source directory that is the root for
114    functional tests.
115
116    @args may be an empty list to reference the root dir
117    itself, may be a single element to reference a file in
118    the root directory, or may be multiple elements to
119    reference a file nested below. The path components
120    will be joined using the platform appropriate path
121    separator.
122
123    Returns: string representing a file path
124    '''
125    def data_file(self, *args):
126        return str(Path(Path(__file__).parent.parent, *args))
127
128    '''
129    @params args list of zero or more subdirectories or file
130
131    Construct a path for accessing a data file located
132    relative to the build directory root.
133
134    @args may be an empty list to reference the build dir
135    itself, may be a single element to reference a file in
136    the build directory, or may be multiple elements to
137    reference a file nested below. The path components
138    will be joined using the platform appropriate path
139    separator.
140
141    Returns: string representing a file path
142    '''
143    def build_file(self, *args):
144        return str(Path(BUILD_DIR, *args))
145
146    '''
147    @params args list of zero or more subdirectories or file
148
149    Construct a path for accessing/creating a scratch file
150    located relative to a temporary directory dedicated to
151    this test case. The directory and its contents will be
152    purged upon completion of the test.
153
154    @args may be an empty list to reference the scratch dir
155    itself, may be a single element to reference a file in
156    the scratch directory, or may be multiple elements to
157    reference a file nested below. The path components
158    will be joined using the platform appropriate path
159    separator.
160
161    Returns: string representing a file path
162    '''
163    def scratch_file(self, *args):
164        return str(Path(self.workdir, *args))
165
166    '''
167    @params args list of zero or more subdirectories or file
168
169    Construct a path for accessing/creating a log file
170    located relative to a temporary directory dedicated to
171    this test case. The directory and its log files will be
172    preserved upon completion of the test.
173
174    @args may be an empty list to reference the log dir
175    itself, may be a single element to reference a file in
176    the log directory, or may be multiple elements to
177    reference a file nested below. The path components
178    will be joined using the platform appropriate path
179    separator.
180
181    Returns: string representing a file path
182    '''
183    def log_file(self, *args):
184        return str(Path(self.outputdir, *args))
185
186    def assets_available(self):
187        for name, asset in vars(self.__class__).items():
188            if name.startswith("ASSET_") and type(asset) == Asset:
189                if not asset.available():
190                    self.log.debug(f"Asset {asset.url} not available")
191                    return False
192        return True
193
194    def setUp(self):
195        self.qemu_bin = os.getenv('QEMU_TEST_QEMU_BINARY')
196        self.assertIsNotNone(self.qemu_bin, 'QEMU_TEST_QEMU_BINARY must be set')
197        self.arch = self.qemu_bin.split('-')[-1]
198        self.socketdir = None
199
200        self.outputdir = self.build_file('tests', 'functional',
201                                         self.arch, self.id())
202        self.workdir = os.path.join(self.outputdir, 'scratch')
203        os.makedirs(self.workdir, exist_ok=True)
204
205        self.log_filename = self.log_file('base.log')
206        self.log = logging.getLogger('qemu-test')
207        self.log.setLevel(logging.DEBUG)
208        self._log_fh = logging.FileHandler(self.log_filename, mode='w')
209        self._log_fh.setLevel(logging.DEBUG)
210        fileFormatter = logging.Formatter(
211            '%(asctime)s - %(levelname)s: %(message)s')
212        self._log_fh.setFormatter(fileFormatter)
213        self.log.addHandler(self._log_fh)
214
215        # Capture QEMUMachine logging
216        self.machinelog = logging.getLogger('qemu.machine')
217        self.machinelog.setLevel(logging.DEBUG)
218        self.machinelog.addHandler(self._log_fh)
219
220        if not self.assets_available():
221            self.skipTest('One or more assets is not available')
222
223    def tearDown(self):
224        if "QEMU_TEST_KEEP_SCRATCH" not in os.environ:
225            shutil.rmtree(self.workdir)
226        if self.socketdir is not None:
227            shutil.rmtree(self.socketdir.name)
228            self.socketdir = None
229        self.machinelog.removeHandler(self._log_fh)
230        self.log.removeHandler(self._log_fh)
231
232    def main():
233        path = os.path.basename(sys.argv[0])[:-3]
234
235        cache = os.environ.get("QEMU_TEST_PRECACHE", None)
236        if cache is not None:
237            Asset.precache_suites(path, cache)
238            return
239
240        tr = pycotap.TAPTestRunner(message_log = pycotap.LogMode.LogToError,
241                                   test_output_log = pycotap.LogMode.LogToError)
242        res = unittest.main(module = None, testRunner = tr, exit = False,
243                            argv=["__dummy__", path])
244        for (test, message) in res.result.errors + res.result.failures:
245
246            if hasattr(test, "log_filename"):
247                print('More information on ' + test.id() + ' could be found here:'
248                      '\n %s' % test.log_filename, file=sys.stderr)
249                if hasattr(test, 'console_log_name'):
250                    print(' %s' % test.console_log_name, file=sys.stderr)
251        sys.exit(not res.result.wasSuccessful())
252
253
254class QemuUserTest(QemuBaseTest):
255
256    def setUp(self):
257        super().setUp()
258        self._ldpath = []
259
260    def add_ldpath(self, ldpath):
261        self._ldpath.append(os.path.abspath(ldpath))
262
263    def run_cmd(self, bin_path, args=[]):
264        return run([self.qemu_bin]
265                   + ["-L %s" % ldpath for ldpath in self._ldpath]
266                   + [bin_path]
267                   + args,
268                   text=True, capture_output=True)
269
270class QemuSystemTest(QemuBaseTest):
271    """Facilitates system emulation tests."""
272
273    cpu = None
274    machine = None
275    _machinehelp = None
276
277    def setUp(self):
278        self._vms = {}
279
280        super().setUp()
281
282        console_log = logging.getLogger('console')
283        console_log.setLevel(logging.DEBUG)
284        self.console_log_name = self.log_file('console.log')
285        self._console_log_fh = logging.FileHandler(self.console_log_name,
286                                                   mode='w')
287        self._console_log_fh.setLevel(logging.DEBUG)
288        fileFormatter = logging.Formatter('%(asctime)s: %(message)s')
289        self._console_log_fh.setFormatter(fileFormatter)
290        console_log.addHandler(self._console_log_fh)
291
292    def set_machine(self, machinename):
293        # TODO: We should use QMP to get the list of available machines
294        if not self._machinehelp:
295            self._machinehelp = run(
296                [self.qemu_bin, '-M', 'help'],
297                capture_output=True, check=True, encoding='utf8').stdout
298        if self._machinehelp.find(machinename) < 0:
299            self.skipTest('no support for machine ' + machinename)
300        self.machine = machinename
301
302    def require_accelerator(self, accelerator):
303        """
304        Requires an accelerator to be available for the test to continue
305
306        It takes into account the currently set qemu binary.
307
308        If the check fails, the test is canceled.  If the check itself
309        for the given accelerator is not available, the test is also
310        canceled.
311
312        :param accelerator: name of the accelerator, such as "kvm" or "tcg"
313        :type accelerator: str
314        """
315        checker = {'tcg': tcg_available,
316                   'kvm': kvm_available}.get(accelerator)
317        if checker is None:
318            self.skipTest("Don't know how to check for the presence "
319                          "of accelerator %s" % accelerator)
320        if not checker(qemu_bin=self.qemu_bin):
321            self.skipTest("%s accelerator does not seem to be "
322                          "available" % accelerator)
323
324    def require_netdev(self, netdevname):
325        help = run([self.qemu_bin,
326                    '-M', 'none', '-netdev', 'help'],
327                   capture_output=True, check=True, encoding='utf8').stdout;
328        if help.find('\n' + netdevname + '\n') < 0:
329            self.skipTest('no support for " + netdevname + " networking')
330
331    def require_device(self, devicename):
332        help = run([self.qemu_bin,
333                    '-M', 'none', '-device', 'help'],
334                   capture_output=True, check=True, encoding='utf8').stdout;
335        if help.find(devicename) < 0:
336            self.skipTest('no support for device ' + devicename)
337
338    def _new_vm(self, name, *args):
339        vm = QEMUMachine(self.qemu_bin,
340                         name=name,
341                         base_temp_dir=self.workdir,
342                         log_dir=self.log_file())
343        self.log.debug('QEMUMachine "%s" created', name)
344        self.log.debug('QEMUMachine "%s" temp_dir: %s', name, vm.temp_dir)
345
346        sockpath = os.environ.get("QEMU_TEST_QMP_BACKDOOR", None)
347        if sockpath is not None:
348            vm.add_args("-chardev",
349                        f"socket,id=backdoor,path={sockpath},server=on,wait=off",
350                        "-mon", "chardev=backdoor,mode=control")
351
352        if args:
353            vm.add_args(*args)
354        return vm
355
356    @property
357    def vm(self):
358        return self.get_vm(name='default')
359
360    def get_vm(self, *args, name=None):
361        if not name:
362            name = str(uuid.uuid4())
363        if self._vms.get(name) is None:
364            self._vms[name] = self._new_vm(name, *args)
365            if self.cpu is not None:
366                self._vms[name].add_args('-cpu', self.cpu)
367            if self.machine is not None:
368                self._vms[name].set_machine(self.machine)
369        return self._vms[name]
370
371    def set_vm_arg(self, arg, value):
372        """
373        Set an argument to list of extra arguments to be given to the QEMU
374        binary. If the argument already exists then its value is replaced.
375
376        :param arg: the QEMU argument, such as "-cpu" in "-cpu host"
377        :type arg: str
378        :param value: the argument value, such as "host" in "-cpu host"
379        :type value: str
380        """
381        if not arg or not value:
382            return
383        if arg not in self.vm.args:
384            self.vm.args.extend([arg, value])
385        else:
386            idx = self.vm.args.index(arg) + 1
387            if idx < len(self.vm.args):
388                self.vm.args[idx] = value
389            else:
390                self.vm.args.append(value)
391
392    def tearDown(self):
393        for vm in self._vms.values():
394            vm.shutdown()
395        logging.getLogger('console').removeHandler(self._console_log_fh)
396        super().tearDown()
397