xref: /qemu/tests/functional/qemu_test/testcase.py (revision 8b5a0dd3a8a4526bb91430b7f548c95d46093dc1)
1# Test class and utilities for functional tests
2#
3# Copyright 2018, 2024 Red Hat, Inc.
4#
5# Original Author (Avocado-based tests):
6#  Cleber Rosa <crosa@redhat.com>
7#
8# Adaption for standalone version:
9#  Thomas Huth <thuth@redhat.com>
10#
11# This work is licensed under the terms of the GNU GPL, version 2 or
12# later.  See the COPYING file in the top-level directory.
13
14import logging
15import os
16from pathlib import Path
17import pycotap
18import shutil
19import subprocess
20import sys
21import tempfile
22import unittest
23import uuid
24
25from qemu.machine import QEMUMachine
26from qemu.utils import kvm_available, tcg_available
27
28from .asset import Asset
29from .cmd import run_cmd
30from .config import BUILD_DIR
31
32
class QemuBaseTest(unittest.TestCase):
    """
    Common base class for the QEMU functional tests.

    setUp() creates a per-test output directory below the build tree
    that holds the log files, plus a 'scratch' sub-directory for
    temporary data. tearDown() removes the scratch directory again
    (unless QEMU_TEST_KEEP_SCRATCH is set in the environment) and
    releases the logging resources.
    """

    # Path of the QEMU binary under test, taken from the environment
    qemu_bin = os.getenv('QEMU_TEST_QEMU_BINARY')
    # Last '-' separated component of the binary name, set in setUp()
    arch = None

    # Per-test scratch directory, set in setUp()
    workdir = None
    # The 'qemu-test' logger, set in setUp()
    log = None
    logdir = None
    # TemporaryDirectory created on demand by socket_dir()
    socketdir = None

    def socket_dir(self):
        """
        Create a temporary directory suitable for storing UNIX
        socket paths.

        The directory is created on first use and the same instance
        is handed out on every later call within the test.

        Returns: a tempfile.TemporaryDirectory instance
        """
        if self.socketdir is None:
            self.socketdir = tempfile.TemporaryDirectory(
                prefix="qemu_func_test_sock_")
        return self.socketdir

    def data_file(self, *args):
        """
        Construct a path for accessing a data file located
        relative to the source directory that is the root for
        functional tests.

        @args may be an empty list to reference the root dir
        itself, may be a single element to reference a file in
        the root directory, or may be multiple elements to
        reference a file nested below. The path components
        will be joined using the platform appropriate path
        separator.

        :param args: zero or more subdirectory or file names

        Returns: string representing a file path
        """
        # The root is the parent of the directory holding this module
        return str(Path(Path(__file__).parent.parent, *args))

    def build_file(self, *args):
        """
        Construct a path for accessing a data file located
        relative to the build directory root.

        @args may be an empty list to reference the build dir
        itself, may be a single element to reference a file in
        the build directory, or may be multiple elements to
        reference a file nested below. The path components
        will be joined using the platform appropriate path
        separator.

        :param args: zero or more subdirectory or file names

        Returns: string representing a file path
        """
        return str(Path(BUILD_DIR, *args))

    def scratch_file(self, *args):
        """
        Construct a path for accessing/creating a scratch file
        located relative to a temporary directory dedicated to
        this test case. The directory and its contents will be
        purged upon completion of the test.

        @args may be an empty list to reference the scratch dir
        itself, may be a single element to reference a file in
        the scratch directory, or may be multiple elements to
        reference a file nested below. The path components
        will be joined using the platform appropriate path
        separator.

        :param args: zero or more subdirectory or file names

        Returns: string representing a file path
        """
        return str(Path(self.workdir, *args))

    def log_file(self, *args):
        """
        Construct a path for accessing/creating a log file
        located relative to a temporary directory dedicated to
        this test case. The directory and its log files will be
        preserved upon completion of the test.

        @args may be an empty list to reference the log dir
        itself, may be a single element to reference a file in
        the log directory, or may be multiple elements to
        reference a file nested below. The path components
        will be joined using the platform appropriate path
        separator.

        :param args: zero or more subdirectory or file names

        Returns: string representing a file path
        """
        return str(Path(self.outputdir, *args))

    def setUp(self, bin_prefix):
        """
        Prepare the output/scratch directories and the log files.

        :param bin_prefix: expected prefix of the QEMU binary name;
                           accepted for the subclasses' benefit but
                           not evaluated here
        """
        self.assertIsNotNone(self.qemu_bin, 'QEMU_TEST_QEMU_BINARY must be set')
        self.arch = self.qemu_bin.split('-')[-1]
        self.socketdir = None

        self.outputdir = self.build_file('tests', 'functional',
                                         self.arch, self.id())
        self.workdir = os.path.join(self.outputdir, 'scratch')
        os.makedirs(self.workdir, exist_ok=True)

        self.log_filename = self.log_file('base.log')
        self.log = logging.getLogger('qemu-test')
        self.log.setLevel(logging.DEBUG)
        self._log_fh = logging.FileHandler(self.log_filename, mode='w')
        self._log_fh.setLevel(logging.DEBUG)
        fileFormatter = logging.Formatter(
            '%(asctime)s - %(levelname)s: %(message)s')
        self._log_fh.setFormatter(fileFormatter)
        self.log.addHandler(self._log_fh)

        # Capture QEMUMachine logging
        self.machinelog = logging.getLogger('qemu.machine')
        self.machinelog.setLevel(logging.DEBUG)
        self.machinelog.addHandler(self._log_fh)

    def tearDown(self):
        """
        Remove the scratch and socket directories and release the
        log file handler.

        The scratch directory is kept if QEMU_TEST_KEEP_SCRATCH is
        set in the environment.
        """
        try:
            if "QEMU_TEST_KEEP_SCRATCH" not in os.environ:
                shutil.rmtree(self.workdir)
            if self.socketdir is not None:
                # cleanup() also disarms the object's finalizer, so no
                # second removal is attempted when it gets collected
                self.socketdir.cleanup()
                self.socketdir = None
        finally:
            # Always detach the handler, otherwise later tests would
            # keep writing into this test's log file
            self.machinelog.removeHandler(self._log_fh)
            self.log.removeHandler(self._log_fh)
            # removeHandler() does not close the underlying file
            self._log_fh.close()

    @staticmethod
    def main():
        """
        Entry point for running a test file as a standalone script.

        If QEMU_TEST_PRECACHE is set in the environment, only the
        assets of the test are pre-cached. Otherwise the tests are run
        with a TAP test runner, the locations of the log files of
        failed tests are printed to stderr, and the process exits with
        a non-zero status if any test was unsuccessful.
        """
        # Name of the test module: the script name without its ".py"
        path = os.path.splitext(os.path.basename(sys.argv[0]))[0]

        cache = os.environ.get("QEMU_TEST_PRECACHE", None)
        if cache is not None:
            Asset.precache_suites(path, cache)
            return

        tr = pycotap.TAPTestRunner(message_log=pycotap.LogMode.LogToError,
                                   test_output_log=pycotap.LogMode.LogToError)
        res = unittest.main(module=None, testRunner=tr, exit=False,
                            argv=["__dummy__", path])
        for (test, _) in res.result.errors + res.result.failures:
            # Entries that never ran setUp() have no log file to show
            if hasattr(test, "log_filename"):
                print('More information on ' + test.id() + ' could be found here:'
                      '\n %s' % test.log_filename, file=sys.stderr)
                if hasattr(test, 'console_log_name'):
                    print(' %s' % test.console_log_name, file=sys.stderr)
        sys.exit(not res.result.wasSuccessful())
185
186
class QemuUserTest(QemuBaseTest):
    """Facilitates user-mode emulation tests."""

    def setUp(self):
        """Run the common setup and start with no extra '-L' paths."""
        super().setUp('qemu-')
        self._ldpath = []

    def add_ldpath(self, ldpath):
        """
        Register a directory to be passed to QEMU with the '-L' option.

        :param ldpath: directory path; stored as an absolute path
        :type ldpath: str
        """
        self._ldpath.append(os.path.abspath(ldpath))

    def run_cmd(self, bin_path, args=None):
        """
        Run a guest binary under the QEMU user-mode emulator.

        :param bin_path: path of the guest binary to execute
        :type bin_path: str
        :param args: extra arguments handed to the guest binary
        :type args: list of str, or None for no arguments
        :returns: the subprocess.CompletedProcess of the run, with
                  stdout and stderr captured as text
        """
        cmd = [self.qemu_bin]
        for ldpath in self._ldpath:
            # The option and its value must be separate argv entries;
            # a single "-L <path>" string is not a valid option name
            cmd += ['-L', ldpath]
        cmd.append(bin_path)
        if args:
            cmd.extend(args)
        return subprocess.run(cmd, text=True, capture_output=True)
202
class QemuSystemTest(QemuBaseTest):
    """Facilitates system emulation tests."""

    # Value for the '-cpu' option of newly created VMs, if not None
    cpu = None
    # Machine type set on newly created VMs, if not None
    machine = None
    # Cached output of "-M help", filled in by set_machine()
    _machinehelp = None

    def setUp(self):
        """Run the common setup and start logging the guest console."""
        self._vms = {}

        super().setUp('qemu-system-')

        console_log = logging.getLogger('console')
        console_log.setLevel(logging.DEBUG)
        self.console_log_name = self.log_file('console.log')
        self._console_log_fh = logging.FileHandler(self.console_log_name,
                                                   mode='w')
        self._console_log_fh.setLevel(logging.DEBUG)
        fileFormatter = logging.Formatter('%(asctime)s: %(message)s')
        self._console_log_fh.setFormatter(fileFormatter)
        console_log.addHandler(self._console_log_fh)

    def set_machine(self, machinename):
        """
        Select the machine type for VMs created afterwards.

        The test is skipped if the name does not show up in the
        "-M help" output of the QEMU binary.

        :param machinename: name of the machine type
        :type machinename: str
        """
        # TODO: We should use QMP to get the list of available machines
        if not self._machinehelp:
            self._machinehelp = run_cmd([self.qemu_bin, '-M', 'help'])[0]
        if self._machinehelp.find(machinename) < 0:
            self.skipTest('no support for machine ' + machinename)
        self.machine = machinename

    def require_accelerator(self, accelerator):
        """
        Requires an accelerator to be available for the test to continue

        It takes into account the currently set qemu binary.

        If the check fails, the test is canceled.  If the check itself
        for the given accelerator is not available, the test is also
        canceled.

        :param accelerator: name of the accelerator, such as "kvm" or "tcg"
        :type accelerator: str
        """
        checker = {'tcg': tcg_available,
                   'kvm': kvm_available}.get(accelerator)
        if checker is None:
            self.skipTest("Don't know how to check for the presence "
                          "of accelerator %s" % accelerator)
        if not checker(qemu_bin=self.qemu_bin):
            self.skipTest("%s accelerator does not seem to be "
                          "available" % accelerator)

    def require_netdev(self, netdevname):
        """
        Skip the test unless the QEMU binary supports the given netdev.

        :param netdevname: name of the network backend; it must appear
                           on a line of its own in the "-netdev help"
                           output
        :type netdevname: str
        """
        netdevhelp = run_cmd([self.qemu_bin,
                             '-M', 'none', '-netdev', 'help'])[0]
        if netdevhelp.find('\n' + netdevname + '\n') < 0:
            self.skipTest('no support for ' + netdevname + ' networking')

    def require_device(self, devicename):
        """
        Skip the test unless the QEMU binary lists the given device.

        :param devicename: name of the device; searched for as a
                           substring of the "-device help" output
        :type devicename: str
        """
        devhelp = run_cmd([self.qemu_bin,
                           '-M', 'none', '-device', 'help'])[0]
        if devhelp.find(devicename) < 0:
            self.skipTest('no support for device ' + devicename)

    def _new_vm(self, name, *args):
        """
        Create a QEMUMachine that uses this test's scratch and log dirs.

        If QEMU_TEST_QMP_BACKDOOR is set in the environment, an extra
        QMP monitor listening on that UNIX socket path is added.

        :param name: name given to the QEMUMachine
        :param args: extra command line arguments for the VM
        :returns: the new QEMUMachine instance
        """
        vm = QEMUMachine(self.qemu_bin,
                         name=name,
                         base_temp_dir=self.workdir,
                         log_dir=self.log_file())
        self.log.debug('QEMUMachine "%s" created', name)
        self.log.debug('QEMUMachine "%s" temp_dir: %s', name, vm.temp_dir)

        sockpath = os.environ.get("QEMU_TEST_QMP_BACKDOOR", None)
        if sockpath is not None:
            vm.add_args("-chardev",
                        f"socket,id=backdoor,path={sockpath},server=on,wait=off",
                        "-mon", "chardev=backdoor,mode=control")

        if args:
            vm.add_args(*args)
        return vm

    @property
    def vm(self):
        """The VM named 'default', created on first access."""
        return self.get_vm(name='default')

    def get_vm(self, *args, name=None):
        """
        Return the VM with the given name, creating it if necessary.

        A new VM gets the test's cpu and machine settings applied.
        The extra arguments are only used when the VM is created.

        :param args: extra command line arguments for a new VM
        :param name: name of the VM; a random UUID is used if omitted
        :returns: the QEMUMachine instance registered under that name
        """
        if not name:
            name = str(uuid.uuid4())
        if self._vms.get(name) is None:
            self._vms[name] = self._new_vm(name, *args)
            if self.cpu is not None:
                self._vms[name].add_args('-cpu', self.cpu)
            if self.machine is not None:
                self._vms[name].set_machine(self.machine)
        return self._vms[name]

    def set_vm_arg(self, arg, value):
        """
        Set an argument to list of extra arguments to be given to the QEMU
        binary. If the argument already exists then its value is replaced.

        Nothing is done if either the argument or the value is empty.

        :param arg: the QEMU argument, such as "-cpu" in "-cpu host"
        :type arg: str
        :param value: the argument value, such as "host" in "-cpu host"
        :type value: str
        """
        if not arg or not value:
            return
        vm_args = self.vm.args
        if arg not in vm_args:
            vm_args.extend([arg, value])
            return
        # The value is expected right after the (first) argument
        idx = vm_args.index(arg) + 1
        if idx < len(vm_args):
            vm_args[idx] = value
        else:
            vm_args.append(value)

    def tearDown(self):
        """
        Shut down all VMs of this test and release the console log.

        The log handler and the base class resources are released
        even if shutting down a VM raises an exception.
        """
        try:
            for vm in self._vms.values():
                vm.shutdown()
        finally:
            logging.getLogger('console').removeHandler(self._console_log_fh)
            # removeHandler() does not close the underlying file
            self._console_log_fh.close()
            super().tearDown()
326