xref: /qemu/tests/functional/qemu_test/testcase.py (revision fd4abcb00839a310fc668d86177278e789f51688)
1# Test class and utilities for functional tests
2#
3# Copyright 2018, 2024 Red Hat, Inc.
4#
5# Original Author (Avocado-based tests):
6#  Cleber Rosa <crosa@redhat.com>
7#
8# Adaption for standalone version:
9#  Thomas Huth <thuth@redhat.com>
10#
11# This work is licensed under the terms of the GNU GPL, version 2 or
12# later.  See the COPYING file in the top-level directory.
13
14import logging
15import os
16from pathlib import Path
17import pycotap
18import shutil
19import subprocess
20import sys
21import tempfile
22import unittest
23import uuid
24
25from qemu.machine import QEMUMachine
26from qemu.utils import kvm_available, tcg_available
27
28from .archive import archive_extract
29from .asset import Asset
30from .cmd import run_cmd
31from .config import BUILD_DIR
32from .uncompress import uncompress
33
34
class QemuBaseTest(unittest.TestCase):
    """
    Common base class for the QEMU functional tests.

    Provides per-test output/scratch directories, a per-test log file,
    helpers for locating data/build/scratch/log files and helpers for
    unpacking compressed files and archives into the scratch directory.
    """

    # Path of the QEMU binary under test, taken from the environment
    qemu_bin = os.getenv('QEMU_TEST_QEMU_BINARY')
    # Last '-' separated component of qemu_bin, filled in by setUp()
    arch = None

    workdir = None
    log = None
    logdir = None
    # Both are (re)initialised by setUp(); the class level defaults only
    # make sure that the attributes always exist.
    outputdir = None
    socketdir = None

    def uncompress(self, compressed, format=None):
        """
        @params compressed: filename, Asset, or file-like object to uncompress
        @params format: optional compression format (gzip, lzma)

        Uncompresses @compressed into the scratch directory.

        If @format is None, heuristics will be applied to guess the format
        from the filename or Asset URL. @format must be non-None if
        @compressed is a file-like object.

        Returns the fully qualified path to the uncompressed file
        """
        self.log.debug(f"Uncompress {compressed} format={format}")
        if isinstance(compressed, Asset):
            compressed.fetch()

        # The output file is named like the input with its last
        # extension dropped
        (name, _ext) = os.path.splitext(str(compressed))
        uncompressed = self.scratch_file(os.path.basename(name))

        # Inside a method this name resolves to the module level
        # uncompress() helper, not to this method
        uncompress(compressed, uncompressed, format)

        return uncompressed

    def archive_extract(self, archive, format=None, sub_dir=None, member=None):
        """
        @params archive: filename, Asset, or file-like object to extract
        @params format: optional archive format (tar, zip, deb, cpio)
        @params sub_dir: optional sub-directory to extract into
        @params member: optional member file to limit extraction to

        Extracts @archive into the scratch directory, or a directory beneath
        named by @sub_dir. All files are extracted unless @member specifies
        a limit.

        If @format is None, heuristics will be applied to guess the format
        from the filename or Asset URL. @format must be non-None if @archive
        is a file-like object.

        If @member is non-None, returns the fully qualified path to @member
        (located below @sub_dir if that was given), otherwise returns None
        """
        self.log.debug(f"Extract {archive} format={format} " +
                       f"sub_dir={sub_dir} member={member}")
        if isinstance(archive, Asset):
            archive.fetch()

        if sub_dir is None:
            dest_dir = self.scratch_file()
        else:
            dest_dir = self.scratch_file(sub_dir)

        # Inside a method this name resolves to the module level
        # archive_extract() helper, not to this method
        archive_extract(archive, dest_dir, format, member)

        if member is not None:
            # The member was extracted below dest_dir, so the returned
            # path has to take @sub_dir into account, too
            return str(Path(dest_dir, member))
        return None

    def socket_dir(self):
        """
        Create a temporary directory suitable for storing UNIX
        socket paths. The directory is created on first use and
        the same object is returned on subsequent calls.

        Returns: a tempfile.TemporaryDirectory instance
        """
        if self.socketdir is None:
            self.socketdir = tempfile.TemporaryDirectory(
                prefix="qemu_func_test_sock_")
        return self.socketdir

    def data_file(self, *args):
        """
        @params args list of zero or more subdirectories or file

        Construct a path for accessing a data file located
        relative to the source directory that is the root for
        functional tests.

        @args may be an empty list to reference the root dir
        itself, may be a single element to reference a file in
        the root directory, or may be multiple elements to
        reference a file nested below. The path components
        will be joined using the platform appropriate path
        separator.

        Returns: string representing a file path
        """
        # The root is the parent of the directory containing this file
        return str(Path(Path(__file__).parent.parent, *args))

    def build_file(self, *args):
        """
        @params args list of zero or more subdirectories or file

        Construct a path for accessing a data file located
        relative to the build directory root.

        @args may be an empty list to reference the build dir
        itself, may be a single element to reference a file in
        the build directory, or may be multiple elements to
        reference a file nested below. The path components
        will be joined using the platform appropriate path
        separator.

        Returns: string representing a file path
        """
        return str(Path(BUILD_DIR, *args))

    def scratch_file(self, *args):
        """
        @params args list of zero or more subdirectories or file

        Construct a path for accessing/creating a scratch file
        located relative to a temporary directory dedicated to
        this test case. The directory and its contents will be
        purged upon completion of the test.

        @args may be an empty list to reference the scratch dir
        itself, may be a single element to reference a file in
        the scratch directory, or may be multiple elements to
        reference a file nested below. The path components
        will be joined using the platform appropriate path
        separator.

        Returns: string representing a file path
        """
        return str(Path(self.workdir, *args))

    def log_file(self, *args):
        """
        @params args list of zero or more subdirectories or file

        Construct a path for accessing/creating a log file
        located relative to a temporary directory dedicated to
        this test case. The directory and its log files will be
        preserved upon completion of the test.

        @args may be an empty list to reference the log dir
        itself, may be a single element to reference a file in
        the log directory, or may be multiple elements to
        reference a file nested below. The path components
        will be joined using the platform appropriate path
        separator.

        Returns: string representing a file path
        """
        return str(Path(self.outputdir, *args))

    def setUp(self, bin_prefix):
        """
        @params bin_prefix: binary name prefix passed in by the
                            subclasses; not used by this method

        Prepare the output directory, the scratch directory and
        the 'base.log' log file for the test. The test fails if
        QEMU_TEST_QEMU_BINARY was not set in the environment.
        """
        self.assertIsNotNone(self.qemu_bin, 'QEMU_TEST_QEMU_BINARY must be set')
        self.arch = self.qemu_bin.split('-')[-1]
        self.socketdir = None

        self.outputdir = self.build_file('tests', 'functional',
                                         self.arch, self.id())
        self.workdir = os.path.join(self.outputdir, 'scratch')
        os.makedirs(self.workdir, exist_ok=True)

        self.log_filename = self.log_file('base.log')
        self.log = logging.getLogger('qemu-test')
        self.log.setLevel(logging.DEBUG)
        self._log_fh = logging.FileHandler(self.log_filename, mode='w')
        self._log_fh.setLevel(logging.DEBUG)
        fileFormatter = logging.Formatter(
            '%(asctime)s - %(levelname)s: %(message)s')
        self._log_fh.setFormatter(fileFormatter)
        self.log.addHandler(self._log_fh)

        # Capture QEMUMachine logging
        self.machinelog = logging.getLogger('qemu.machine')
        self.machinelog.setLevel(logging.DEBUG)
        self.machinelog.addHandler(self._log_fh)

    def tearDown(self):
        """
        Remove the scratch directory (unless QEMU_TEST_KEEP_SCRATCH is
        set in the environment) and the socket directory, then detach
        and close the log file handler installed by setUp().
        """
        try:
            if "QEMU_TEST_KEEP_SCRATCH" not in os.environ:
                shutil.rmtree(self.workdir)
            if self.socketdir is not None:
                # cleanup() removes the directory and also disarms the
                # finalizer of the TemporaryDirectory object
                self.socketdir.cleanup()
                self.socketdir = None
        finally:
            # The loggers are shared between tests, so the handler must
            # be detached even if the cleanup above failed; closing it
            # releases the underlying log file.
            self.machinelog.removeHandler(self._log_fh)
            self.log.removeHandler(self._log_fh)
            self._log_fh.close()

    @staticmethod
    def main():
        """
        Entry point for running a test file as a script.

        The name of the test module is derived from sys.argv[0] by
        dropping the directory and the file extension. If the
        QEMU_TEST_PRECACHE environment variable is set, only
        Asset.precache_suites() is called. Otherwise the tests of the
        module are run with a TAP test runner, the log file locations
        of failed tests are printed to stderr and the process exits
        with a non-zero status if the run was not successful.
        """
        path = os.path.splitext(os.path.basename(sys.argv[0]))[0]

        cache = os.environ.get("QEMU_TEST_PRECACHE", None)
        if cache is not None:
            Asset.precache_suites(path, cache)
            return

        tr = pycotap.TAPTestRunner(message_log=pycotap.LogMode.LogToError,
                                   test_output_log=pycotap.LogMode.LogToError)
        res = unittest.main(module=None, testRunner=tr, exit=False,
                            argv=["__dummy__", path])
        for (test, _message) in res.result.errors + res.result.failures:
            # Entries that are not test case instances set up by
            # setUp() do not have a log file to point at
            if hasattr(test, "log_filename"):
                print(f'More information on {test.id()} could be found here:'
                      f'\n {test.log_filename}', file=sys.stderr)
                if hasattr(test, 'console_log_name'):
                    print(' %s' % test.console_log_name, file=sys.stderr)
        sys.exit(not res.result.wasSuccessful())
242
243
class QemuUserTest(QemuBaseTest):
    """Facilitates user-mode emulation tests."""

    def setUp(self):
        super().setUp('qemu-')
        # Directories that run_cmd() passes to QEMU via "-L"
        self._ldpath = []

    def add_ldpath(self, ldpath):
        """
        Remember @ldpath (converted to an absolute path) so that
        run_cmd() passes it to QEMU with the "-L" option.
        """
        self._ldpath.append(os.path.abspath(ldpath))

    def run_cmd(self, bin_path, args=None):
        """
        Run the guest program @bin_path with the QEMU binary under test.

        :param bin_path: path of the guest binary to execute
        :param args: optional list of arguments for the guest binary
        :returns: the subprocess.CompletedProcess object of the run,
                  with stdout and stderr captured as text
        """
        cmd = [self.qemu_bin]
        for ldpath in self._ldpath:
            # The option and its value must be separate argv elements;
            # a single "-L <path>" string is not recognized as "-L"
            cmd.extend(['-L', ldpath])
        cmd.append(bin_path)
        if args:
            cmd.extend(args)
        return subprocess.run(cmd, text=True, capture_output=True)
259
class QemuSystemTest(QemuBaseTest):
    """Facilitates system emulation tests."""

    # Value for the "-cpu" option of newly created VMs (if not None)
    cpu = None
    # Machine type for newly created VMs (if not None), see set_machine()
    machine = None
    # Cached output of "<qemu_bin> -M help"
    _machinehelp = None

    def setUp(self):
        """
        Run the base class setup and additionally record everything
        logged to the 'console' logger in the 'console.log' log file.
        """
        # VMs created through get_vm(), indexed by name
        self._vms = {}

        super().setUp('qemu-system-')

        console_log = logging.getLogger('console')
        console_log.setLevel(logging.DEBUG)
        self.console_log_name = self.log_file('console.log')
        self._console_log_fh = logging.FileHandler(self.console_log_name,
                                                   mode='w')
        self._console_log_fh.setLevel(logging.DEBUG)
        fileFormatter = logging.Formatter('%(asctime)s: %(message)s')
        self._console_log_fh.setFormatter(fileFormatter)
        console_log.addHandler(self._console_log_fh)

    def set_machine(self, machinename):
        """
        Select the machine type for VMs created afterwards.

        The test is skipped if @machinename does not show up in the
        output of "-M help" of the QEMU binary.

        :param machinename: name of the machine type
        :type machinename: str
        """
        # TODO: We should use QMP to get the list of available machines
        if not self._machinehelp:
            self._machinehelp = run_cmd([self.qemu_bin, '-M', 'help'])[0]
        if self._machinehelp.find(machinename) < 0:
            self.skipTest('no support for machine ' + machinename)
        self.machine = machinename

    def require_accelerator(self, accelerator):
        """
        Requires an accelerator to be available for the test to continue

        It takes into account the currently set qemu binary.

        If the check fails, the test is skipped.  If the check itself
        for the given accelerator is not available, the test is also
        skipped.

        :param accelerator: name of the accelerator, such as "kvm" or "tcg"
        :type accelerator: str
        """
        checker = {'tcg': tcg_available,
                   'kvm': kvm_available}.get(accelerator)
        if checker is None:
            self.skipTest("Don't know how to check for the presence "
                          "of accelerator %s" % accelerator)
        if not checker(qemu_bin=self.qemu_bin):
            self.skipTest("%s accelerator does not seem to be "
                          "available" % accelerator)

    def require_netdev(self, netdevname):
        """
        Skip the test if @netdevname is not listed on a line of its
        own in the output of "-netdev help" of the QEMU binary.

        :param netdevname: name of the network backend
        :type netdevname: str
        """
        netdevhelp = run_cmd([self.qemu_bin,
                              '-M', 'none', '-netdev', 'help'])[0]
        if netdevhelp.find('\n' + netdevname + '\n') < 0:
            self.skipTest('no support for ' + netdevname + ' networking')

    def require_device(self, devicename):
        """
        Skip the test if @devicename does not show up in the output
        of "-device help" of the QEMU binary.

        :param devicename: name of the device
        :type devicename: str
        """
        devhelp = run_cmd([self.qemu_bin,
                           '-M', 'none', '-device', 'help'])[0]
        if devhelp.find(devicename) < 0:
            self.skipTest('no support for device ' + devicename)

    def _new_vm(self, name, *args):
        """
        Create a QEMUMachine called @name for the QEMU binary under
        test, using the scratch directory as base temporary directory
        and the log directory of the test for its logs. @args are
        added to the command line arguments of the machine.
        """
        vm = QEMUMachine(self.qemu_bin,
                         name=name,
                         base_temp_dir=self.workdir,
                         log_dir=self.log_file())
        self.log.debug('QEMUMachine "%s" created', name)
        self.log.debug('QEMUMachine "%s" temp_dir: %s', name, vm.temp_dir)

        # Optionally add an extra QMP monitor on a UNIX socket whose
        # path is given through the environment
        sockpath = os.environ.get("QEMU_TEST_QMP_BACKDOOR", None)
        if sockpath is not None:
            vm.add_args("-chardev",
                        f"socket,id=backdoor,path={sockpath},server=on,wait=off",
                        "-mon", "chardev=backdoor,mode=control")

        if args:
            vm.add_args(*args)
        return vm

    @property
    def vm(self):
        """The VM named 'default', created on first access."""
        return self.get_vm(name='default')

    def get_vm(self, *args, name=None):
        """
        Return the VM called @name, creating it if it does not exist yet.

        A new VM gets @args as extra command line arguments, the "-cpu"
        option if self.cpu is set and the machine type if self.machine
        is set. If @name is empty, a random UUID is used as name, so a
        new VM is created on each such call.
        """
        if not name:
            name = str(uuid.uuid4())
        if self._vms.get(name) is None:
            self._vms[name] = self._new_vm(name, *args)
            if self.cpu is not None:
                self._vms[name].add_args('-cpu', self.cpu)
            if self.machine is not None:
                self._vms[name].set_machine(self.machine)
        return self._vms[name]

    def set_vm_arg(self, arg, value):
        """
        Set an argument to list of extra arguments to be given to the QEMU
        binary. If the argument already exists then its value is replaced.

        :param arg: the QEMU argument, such as "-cpu" in "-cpu host"
        :type arg: str
        :param value: the argument value, such as "host" in "-cpu host"
        :type value: str
        """
        if not arg or not value:
            return
        vm_args = self.vm.args
        if arg not in vm_args:
            vm_args.extend([arg, value])
            return
        idx = vm_args.index(arg) + 1
        if idx < len(vm_args):
            vm_args[idx] = value
        else:
            # @arg was the last element, i.e. it had no value so far
            vm_args.append(value)

    def tearDown(self):
        """
        Shut down all VMs created through get_vm(), detach and close
        the console log handler and run the base class cleanup.
        """
        try:
            for vm in self._vms.values():
                vm.shutdown()
        finally:
            # The 'console' logger is shared between tests, so the
            # handler must go away even if a shutdown failed; closing
            # it releases the underlying log file.
            logging.getLogger('console').removeHandler(self._console_log_fh)
            self._console_log_fh.close()
            super().tearDown()
383