xref: /qemu/tests/functional/qemu_test/testcase.py (revision 9f85aff93f5dcedb70819a5ed7796b6df90fdf2d)
1# Test class and utilities for functional tests
2#
3# Copyright 2018, 2024 Red Hat, Inc.
4#
5# Original Author (Avocado-based tests):
6#  Cleber Rosa <crosa@redhat.com>
7#
8# Adaption for standalone version:
9#  Thomas Huth <thuth@redhat.com>
10#
11# This work is licensed under the terms of the GNU GPL, version 2 or
12# later.  See the COPYING file in the top-level directory.
13
14import logging
15import os
16import pycotap
17import shutil
18import subprocess
19import sys
20import unittest
21import uuid
22
23from qemu.machine import QEMUMachine
24from qemu.utils import kvm_available, tcg_available
25
26from .asset import Asset
27from .cmd import run_cmd
28from .config import BUILD_DIR
29
30
class QemuBaseTest(unittest.TestCase):
    """Common base class of the QEMU functional tests.

    setUp() derives the target architecture from the QEMU binary name,
    creates a per-test output and scratch directory below BUILD_DIR and
    attaches a file handler to the 'qemu-test' logger.  main() is the
    entry point that runs the tests of the calling script.
    """

    # QEMU binary under test; read from the environment once, when the
    # class body is executed.
    qemu_bin = os.getenv('QEMU_TEST_QEMU_BINARY')
    # Last '-'-separated component of the binary name, set by setUp()
    arch = None

    # Scratch directory, logger and log directory; all set by setUp()
    workdir = None
    log = None
    logdir = None

    def setUp(self, bin_prefix):
        """
        Create the output/scratch directories and the per-test log file.

        :param bin_prefix: prefix of the QEMU binary name as passed by the
                           subclasses; not used by this method itself
        :type bin_prefix: str
        """
        self.assertIsNotNone(self.qemu_bin,
                             'QEMU_TEST_QEMU_BINARY must be set')
        self.arch = self.qemu_bin.split('-')[-1]

        self.outputdir = os.path.join(BUILD_DIR, 'tests', 'functional',
                                      self.arch, self.id())
        self.workdir = os.path.join(self.outputdir, 'scratch')
        os.makedirs(self.workdir, exist_ok=True)

        self.logdir = self.outputdir
        self.log_filename = os.path.join(self.logdir, 'base.log')
        self.log = logging.getLogger('qemu-test')
        self.log.setLevel(logging.DEBUG)
        # mode='w' truncates the log of a previous run of the same test
        self._log_fh = logging.FileHandler(self.log_filename, mode='w')
        self._log_fh.setLevel(logging.DEBUG)
        fileFormatter = logging.Formatter(
            '%(asctime)s - %(levelname)s: %(message)s')
        self._log_fh.setFormatter(fileFormatter)
        self.log.addHandler(self._log_fh)

    def tearDown(self):
        """
        Remove the scratch directory and detach the per-test log file.

        The scratch directory is kept if QEMU_TEST_KEEP_SCRATCH is present
        in the environment (whatever its value).
        """
        if "QEMU_TEST_KEEP_SCRATCH" not in os.environ:
            shutil.rmtree(self.workdir)
        self.log.removeHandler(self._log_fh)
        # removeHandler() only detaches the handler; close it explicitly
        # so that the log file does not stay open until garbage collection.
        self._log_fh.close()

    @staticmethod
    def main():
        """
        Run the tests of the calling script and exit the interpreter.

        The module to run is derived from sys.argv[0] by dropping the last
        three characters of its base name (presumably the '.py' suffix).
        If QEMU_TEST_PRECACHE is set, only the assets are pre-cached and
        the function returns without running any test.  Otherwise the
        tests are run with a TAP runner, the log file locations of all
        failed tests are printed to stderr and the process exits with
        status 0 on success and 1 otherwise.
        """
        path = os.path.basename(sys.argv[0])[:-3]

        cache = os.environ.get("QEMU_TEST_PRECACHE", None)
        if cache is not None:
            Asset.precache_suites(path, cache)
            return

        tr = pycotap.TAPTestRunner(message_log=pycotap.LogMode.LogToError,
                                   test_output_log=pycotap.LogMode.LogToError)
        res = unittest.main(module=None, testRunner=tr, exit=False,
                            argv=["__dummy__", path])
        for (test, _message) in res.result.errors + res.result.failures:
            # Entries without a log file (e.g. a test whose setUp() failed
            # before the log was created, or a unittest-internal
            # placeholder for a class/module level error) have nothing to
            # point to; skip them instead of raising AttributeError.
            if not hasattr(test, 'log_filename'):
                continue
            print('More information on %s could be found here:\n %s'
                  % (test.id(), test.log_filename), file=sys.stderr)
            if hasattr(test, 'console_log_name'):
                print(' %s' % test.console_log_name, file=sys.stderr)
        sys.exit(not res.result.wasSuccessful())
83
84
class QemuUserTest(QemuBaseTest):
    """Base class for tests that run a guest program via the QEMU binary."""

    def setUp(self):
        """Run the common setup and start with an empty list of -L paths."""
        super().setUp('qemu-')
        self._ldpath = []

    def add_ldpath(self, ldpath):
        """
        Register a path that run_cmd() passes to QEMU with the -L option.

        :param ldpath: the path; it is stored in its absolute form
        :type ldpath: str
        """
        self._ldpath.append(os.path.abspath(ldpath))

    def run_cmd(self, bin_path, args=None):
        """
        Run a program with the QEMU binary and capture its output.

        :param bin_path: path of the program to run
        :type bin_path: str
        :param args: optional list of arguments for the program
        :type args: list of str
        :returns: the subprocess.CompletedProcess of the run, with stdout
                  and stderr captured as text
        """
        cmd = [self.qemu_bin]
        for ldpath in self._ldpath:
            # The option and its value have to be separate argv entries:
            # no shell is involved, so a single "-L <path>" string would
            # reach QEMU as one unsplit argument.
            cmd += ['-L', ldpath]
        cmd.append(bin_path)
        if args:
            cmd.extend(args)
        return subprocess.run(cmd, text=True, capture_output=True)
100
class QemuSystemTest(QemuBaseTest):
    """Facilitates system emulation tests."""

    # CPU model and machine type applied to every VM created by get_vm();
    # None means that nothing is set.
    cpu = None
    machine = None
    # Cached output of "-M help", filled in lazily by set_machine()
    _machinehelp = None

    def setUp(self):
        """Run the common setup and attach a file to the 'console' logger."""
        self._vms = {}

        super().setUp('qemu-system-')

        console_log = logging.getLogger('console')
        console_log.setLevel(logging.DEBUG)
        self.console_log_name = os.path.join(self.logdir, 'console.log')
        self._console_log_fh = logging.FileHandler(self.console_log_name,
                                                   mode='w')
        self._console_log_fh.setLevel(logging.DEBUG)
        fileFormatter = logging.Formatter('%(asctime)s: %(message)s')
        self._console_log_fh.setFormatter(fileFormatter)
        console_log.addHandler(self._console_log_fh)

    def set_machine(self, machinename):
        """
        Select the machine type for the VMs created afterwards.

        The test is skipped if the name does not occur anywhere in the
        output of "-M help" of the QEMU binary (plain substring search).

        :param machinename: name of the machine type
        :type machinename: str
        """
        # TODO: We should use QMP to get the list of available machines
        if not self._machinehelp:
            self._machinehelp = run_cmd([self.qemu_bin, '-M', 'help'])[0]
        if self._machinehelp.find(machinename) < 0:
            self.skipTest('no support for machine ' + machinename)
        self.machine = machinename

    def require_accelerator(self, accelerator):
        """
        Requires an accelerator to be available for the test to continue

        It takes into account the currently set qemu binary.

        If the check fails, the test is skipped.  If the check itself
        for the given accelerator is not available, the test is also
        skipped.

        :param accelerator: name of the accelerator, such as "kvm" or "tcg"
        :type accelerator: str
        """
        checker = {'tcg': tcg_available,
                   'kvm': kvm_available}.get(accelerator)
        if checker is None:
            self.skipTest("Don't know how to check for the presence "
                          "of accelerator %s" % accelerator)
        if not checker(qemu_bin=self.qemu_bin):
            self.skipTest("%s accelerator does not seem to be "
                          "available" % accelerator)

    def require_netdev(self, netdevname):
        """
        Skip the test if the QEMU binary lacks the given netdev backend.

        The name has to match a complete line of the "-netdev help" output.

        :param netdevname: name of the netdev backend
        :type netdevname: str
        """
        netdevhelp = run_cmd([self.qemu_bin,
                              '-M', 'none', '-netdev', 'help'])[0]
        if netdevhelp.find('\n' + netdevname + '\n') < 0:
            self.skipTest('no support for ' + netdevname + ' networking')

    def require_device(self, devicename):
        """
        Skip the test if the QEMU binary lacks the given device.

        The name only has to occur somewhere in the "-device help" output
        (plain substring search).

        :param devicename: name of the device
        :type devicename: str
        """
        devhelp = run_cmd([self.qemu_bin,
                           '-M', 'none', '-device', 'help'])[0]
        if devhelp.find(devicename) < 0:
            self.skipTest('no support for device ' + devicename)

    def _new_vm(self, name, *args):
        """
        Create a QEMUMachine that uses the test's scratch and log dirs.

        :param name: name of the VM
        :type name: str
        :param args: extra command line arguments added to the VM, if any
        :returns: the new QEMUMachine instance
        """
        vm = QEMUMachine(self.qemu_bin,
                         name=name,
                         base_temp_dir=self.workdir,
                         log_dir=self.logdir)
        self.log.debug('QEMUMachine "%s" created', name)
        self.log.debug('QEMUMachine "%s" temp_dir: %s', name, vm.temp_dir)
        if args:
            vm.add_args(*args)
        return vm

    @property
    def vm(self):
        """The VM named 'default'; it is created on first access."""
        return self.get_vm(name='default')

    def get_vm(self, *args, name=None):
        """
        Return the VM with the given name, creating it if necessary.

        A newly created VM gets the "-cpu" argument and the machine type
        from self.cpu and self.machine when these are set.

        :param args: extra command line arguments; only used when the VM
                     is created, ignored if it already exists
        :param name: name of the VM; a random UUID is used if it is empty,
                     so that a new VM is created
        :type name: str
        :returns: the QEMUMachine instance registered under that name
        """
        if not name:
            name = str(uuid.uuid4())
        if self._vms.get(name) is None:
            self._vms[name] = self._new_vm(name, *args)
            if self.cpu is not None:
                self._vms[name].add_args('-cpu', self.cpu)
            if self.machine is not None:
                self._vms[name].set_machine(self.machine)
        return self._vms[name]

    def set_vm_arg(self, arg, value):
        """
        Set an argument to list of extra arguments to be given to the QEMU
        binary. If the argument already exists then its value is replaced.

        Nothing is done if either the argument or the value is empty.

        :param arg: the QEMU argument, such as "-cpu" in "-cpu host"
        :type arg: str
        :param value: the argument value, such as "host" in "-cpu host"
        :type value: str
        """
        if not arg or not value:
            return
        if arg not in self.vm.args:
            self.vm.args.extend([arg, value])
        else:
            # Only the first occurrence of the argument is updated
            idx = self.vm.args.index(arg) + 1
            if idx < len(self.vm.args):
                self.vm.args[idx] = value
            else:
                # The argument was the last entry and had no value yet
                self.vm.args.append(value)

    def tearDown(self):
        """Shut down all VMs of the test and detach the console log file."""
        for vm in self._vms.values():
            vm.shutdown()
        console_log = logging.getLogger('console')
        console_log.removeHandler(self._console_log_fh)
        # removeHandler() only detaches the handler; close it explicitly
        # so that the console log file does not stay open.
        self._console_log_fh.close()
        super().tearDown()
217