xref: /qemu/tests/functional/qemu_test/testcase.py (revision f84f8e71eb4bdd193e89c6550a015277b686b0af)
1# Test class and utilities for functional tests
2#
3# Copyright 2018, 2024 Red Hat, Inc.
4#
5# Original Author (Avocado-based tests):
6#  Cleber Rosa <crosa@redhat.com>
7#
8# Adaption for standalone version:
9#  Thomas Huth <thuth@redhat.com>
10#
11# This work is licensed under the terms of the GNU GPL, version 2 or
12# later.  See the COPYING file in the top-level directory.
13
14import logging
15import os
16from pathlib import Path
17import pycotap
18import shutil
19import subprocess
20import sys
21import tempfile
22import unittest
23import uuid
24
25from qemu.machine import QEMUMachine
26from qemu.utils import kvm_available, tcg_available
27
28from .asset import Asset
29from .cmd import run_cmd
30from .config import BUILD_DIR
31
32
class QemuBaseTest(unittest.TestCase):
    """
    Common base class for the QEMU functional tests.

    It sets up a per-test output directory (with a scratch sub-directory
    and a 'base.log' log file) and offers helpers for constructing paths
    relative to the source, build, scratch and log directories.
    """

    # QEMU binary under test, read from the environment at import time
    qemu_bin = os.getenv('QEMU_TEST_QEMU_BINARY')
    # Last '-' separated component of the binary name, set by setUp()
    arch = None

    workdir = None
    log = None
    logdir = None
    # Created on demand by socket_dir(). Declared here like the other
    # per-test attributes so that it can be consulted before setUp().
    socketdir = None

    def socket_dir(self):
        """
        Create a temporary directory suitable for storing UNIX
        socket paths.

        The directory is created on first use and the same object is
        handed out on subsequent calls until tearDown() disposes of it.

        Returns: a tempfile.TemporaryDirectory instance
        """
        if self.socketdir is None:
            self.socketdir = tempfile.TemporaryDirectory(
                prefix="qemu_func_test_sock_")
        return self.socketdir

    def data_file(self, *args):
        """
        Construct a path for accessing a data file located
        relative to the source directory that is the root for
        functional tests.

        :param args: zero or more subdirectory or file names. May be
                     empty to reference the root dir itself, a single
                     element to reference a file in the root directory,
                     or multiple elements to reference a file nested
                     below. The components are joined using the
                     platform appropriate path separator.

        Returns: string representing a file path
        """
        # The root is the parent of the directory holding this module
        return str(Path(Path(__file__).parent.parent, *args))

    def build_file(self, *args):
        """
        Construct a path for accessing a data file located
        relative to the build directory root.

        :param args: zero or more subdirectory or file names. May be
                     empty to reference the build dir itself, a single
                     element to reference a file in the build directory,
                     or multiple elements to reference a file nested
                     below. The components are joined using the
                     platform appropriate path separator.

        Returns: string representing a file path
        """
        return str(Path(BUILD_DIR, *args))

    def scratch_file(self, *args):
        """
        Construct a path for accessing/creating a scratch file
        located relative to a temporary directory dedicated to
        this test case. The directory and its contents will be
        purged upon completion of the test (unless the
        QEMU_TEST_KEEP_SCRATCH environment variable is set).

        :param args: zero or more subdirectory or file names. May be
                     empty to reference the scratch dir itself, a single
                     element to reference a file in the scratch
                     directory, or multiple elements to reference a file
                     nested below. The components are joined using the
                     platform appropriate path separator.

        Returns: string representing a file path
        """
        return str(Path(self.workdir, *args))

    def log_file(self, *args):
        """
        Construct a path for accessing/creating a log file
        located relative to a temporary directory dedicated to
        this test case. The directory and its log files will be
        preserved upon completion of the test.

        :param args: zero or more subdirectory or file names. May be
                     empty to reference the log dir itself, a single
                     element to reference a file in the log directory,
                     or multiple elements to reference a file nested
                     below. The components are joined using the
                     platform appropriate path separator.

        Returns: string representing a file path
        """
        return str(Path(self.logdir, *args))

    def setUp(self, bin_prefix):
        """
        Prepare the output directories and the logging for one test.

        :param bin_prefix: binary name prefix handed in by the
                           subclasses ('qemu-' or 'qemu-system-');
                           it is not evaluated by this method
        """
        self.assertIsNotNone(self.qemu_bin, 'QEMU_TEST_QEMU_BINARY must be set')
        self.arch = self.qemu_bin.split('-')[-1]
        self.socketdir = None

        self.outputdir = os.path.join(BUILD_DIR, 'tests', 'functional',
                                      self.arch, self.id())
        self.workdir = os.path.join(self.outputdir, 'scratch')
        # Creating the scratch dir also creates the output/log dir above it
        os.makedirs(self.workdir, exist_ok=True)

        self.logdir = self.outputdir
        self.log_filename = os.path.join(self.logdir, 'base.log')
        self.log = logging.getLogger('qemu-test')
        self.log.setLevel(logging.DEBUG)
        self._log_fh = logging.FileHandler(self.log_filename, mode='w')
        self._log_fh.setLevel(logging.DEBUG)
        fileFormatter = logging.Formatter(
            '%(asctime)s - %(levelname)s: %(message)s')
        self._log_fh.setFormatter(fileFormatter)
        self.log.addHandler(self._log_fh)

        # Capture QEMUMachine logging
        self.machinelog = logging.getLogger('qemu.machine')
        self.machinelog.setLevel(logging.DEBUG)
        self.machinelog.addHandler(self._log_fh)

    def tearDown(self):
        """
        Remove the scratch and socket directories and detach the logging.

        The scratch directory is kept if the QEMU_TEST_KEEP_SCRATCH
        environment variable is set.
        """
        if "QEMU_TEST_KEEP_SCRATCH" not in os.environ:
            shutil.rmtree(self.workdir)
        if self.socketdir is not None:
            # cleanup() removes the directory and also disarms the
            # finalizer of the TemporaryDirectory object, which would
            # otherwise complain about an implicit cleanup later.
            self.socketdir.cleanup()
            self.socketdir = None
        self.machinelog.removeHandler(self._log_fh)
        self.log.removeHandler(self._log_fh)
        # Removing the handler from the loggers does not close the
        # underlying file, so do it explicitly to avoid leaking it.
        self._log_fh.close()

    @staticmethod
    def main():
        """
        Entry point for the individual test scripts.

        The name of the test module is derived from sys.argv[0] by
        dropping its last three characters (the '.py' suffix). If the
        QEMU_TEST_PRECACHE environment variable is set, only the assets
        get pre-cached. Otherwise the tests of the module are run with
        a TAP test runner and the process exits with a status that
        reflects whether the run was successful.
        """
        path = os.path.basename(sys.argv[0])[:-3]

        cache = os.environ.get("QEMU_TEST_PRECACHE", None)
        if cache is not None:
            Asset.precache_suites(path, cache)
            return

        tr = pycotap.TAPTestRunner(message_log = pycotap.LogMode.LogToError,
                                   test_output_log = pycotap.LogMode.LogToError)
        res = unittest.main(module = None, testRunner = tr, exit = False,
                            argv=["__dummy__", path])
        # Point the user to the log files of each test that went wrong
        for (test, _message) in res.result.errors + res.result.failures:
            if hasattr(test, "log_filename"):
                print('More information on ' + test.id() + ' could be found here:'
                      '\n %s' % test.log_filename, file=sys.stderr)
                if hasattr(test, 'console_log_name'):
                    print(' %s' % test.console_log_name, file=sys.stderr)
        sys.exit(not res.result.wasSuccessful())
186
187
class QemuUserTest(QemuBaseTest):
    """Facilitates user-mode emulation tests."""

    def setUp(self):
        """Run the common setup and start with an empty list of -L paths."""
        super().setUp('qemu-')
        self._ldpath = []

    def add_ldpath(self, ldpath):
        """
        Register a path that run_cmd() hands to QEMU via the -L option.

        :param ldpath: the path; it is stored in its absolute form
        :type ldpath: str
        """
        self._ldpath.append(os.path.abspath(ldpath))

    def run_cmd(self, bin_path, args=None):
        """
        Run a program with the QEMU binary under test.

        :param bin_path: path of the program to be run by QEMU
        :type bin_path: str
        :param args: optional list of arguments for the program
        :type args: list of str

        Returns: the subprocess.CompletedProcess instance, with the
                 output captured as text
        """
        cmd = [self.qemu_bin]
        for ldpath in self._ldpath:
            # No shell is involved here, so the option and its value
            # must be two separate elements of the argument vector.
            cmd += ['-L', ldpath]
        cmd.append(bin_path)
        if args:
            cmd.extend(args)
        return subprocess.run(cmd, text=True, capture_output=True)
203
class QemuSystemTest(QemuBaseTest):
    """Facilitates system emulation tests."""

    # Optional CPU model and machine type that get_vm() applies to
    # every newly created VM
    cpu = None
    machine = None
    # Cached output of "-M help", filled in by set_machine()
    _machinehelp = None

    def setUp(self):
        """Run the common setup and start logging to 'console.log'."""
        self._vms = {}

        super().setUp('qemu-system-')

        console_log = logging.getLogger('console')
        console_log.setLevel(logging.DEBUG)
        self.console_log_name = os.path.join(self.logdir, 'console.log')
        self._console_log_fh = logging.FileHandler(self.console_log_name,
                                                   mode='w')
        self._console_log_fh.setLevel(logging.DEBUG)
        fileFormatter = logging.Formatter('%(asctime)s: %(message)s')
        self._console_log_fh.setFormatter(fileFormatter)
        console_log.addHandler(self._console_log_fh)

    def set_machine(self, machinename):
        """
        Select the machine type for the VMs of this test.

        The test is skipped if the name does not show up in the
        "-M help" output of the QEMU binary.

        :param machinename: name of the machine type
        :type machinename: str
        """
        # TODO: We should use QMP to get the list of available machines
        if not self._machinehelp:
            self._machinehelp = run_cmd([self.qemu_bin, '-M', 'help'])[0]
        # NOTE: plain substring search, so a name that is merely part
        # of another machine name is accepted, too
        if self._machinehelp.find(machinename) < 0:
            self.skipTest('no support for machine ' + machinename)
        self.machine = machinename

    def require_accelerator(self, accelerator):
        """
        Requires an accelerator to be available for the test to continue

        It takes into account the currently set qemu binary.

        If the check fails, the test is skipped.  If the check itself
        for the given accelerator is not available, the test is also
        skipped.

        :param accelerator: name of the accelerator, such as "kvm" or "tcg"
        :type accelerator: str
        """
        checker = {'tcg': tcg_available,
                   'kvm': kvm_available}.get(accelerator)
        if checker is None:
            self.skipTest("Don't know how to check for the presence "
                          "of accelerator %s" % accelerator)
        if not checker(qemu_bin=self.qemu_bin):
            self.skipTest("%s accelerator does not seem to be "
                          "available" % accelerator)

    def require_netdev(self, netdevname):
        """
        Skip the test if the QEMU binary lacks the given network backend.

        The name has to be found on a line of its own in the output of
        "-netdev help".

        :param netdevname: name of the network backend
        :type netdevname: str
        """
        netdevhelp = run_cmd([self.qemu_bin,
                             '-M', 'none', '-netdev', 'help'])[0]
        if netdevhelp.find('\n' + netdevname + '\n') < 0:
            self.skipTest('no support for ' + netdevname + ' networking')

    def require_device(self, devicename):
        """
        Skip the test if the QEMU binary lacks the given device.

        The name is searched for as a substring of the "-device help"
        output.

        :param devicename: name of the device
        :type devicename: str
        """
        devhelp = run_cmd([self.qemu_bin,
                           '-M', 'none', '-device', 'help'])[0]
        if devhelp.find(devicename) < 0:
            self.skipTest('no support for device ' + devicename)

    def _new_vm(self, name, *args):
        """
        Create a QEMUMachine that uses the directories of this test.

        If the QEMU_TEST_QMP_BACKDOOR environment variable is set, an
        additional QMP monitor is added on a UNIX socket with that path.

        :param name: name of the new VM
        :param args: extra command line arguments for the QEMU binary

        Returns: the new QEMUMachine instance
        """
        vm = QEMUMachine(self.qemu_bin,
                         name=name,
                         base_temp_dir=self.workdir,
                         log_dir=self.logdir)
        self.log.debug('QEMUMachine "%s" created', name)
        self.log.debug('QEMUMachine "%s" temp_dir: %s', name, vm.temp_dir)

        sockpath = os.environ.get("QEMU_TEST_QMP_BACKDOOR", None)
        if sockpath is not None:
            vm.add_args("-chardev",
                        f"socket,id=backdoor,path={sockpath},server=on,wait=off",
                        "-mon", "chardev=backdoor,mode=control")

        if args:
            vm.add_args(*args)
        return vm

    @property
    def vm(self):
        """The VM called 'default', created on first access."""
        return self.get_vm(name='default')

    def get_vm(self, *args, name=None):
        """
        Look up a VM by name, creating it if it does not exist yet.

        A new VM gets the "cpu" and "machine" settings of the test
        applied if these are set.

        :param args: extra command line arguments, only used when the
                     VM gets created by this call
        :param name: name of the VM; a random UUID is used if empty

        Returns: the QEMUMachine instance
        """
        if not name:
            name = str(uuid.uuid4())
        if self._vms.get(name) is None:
            self._vms[name] = self._new_vm(name, *args)
            if self.cpu is not None:
                self._vms[name].add_args('-cpu', self.cpu)
            if self.machine is not None:
                self._vms[name].set_machine(self.machine)
        return self._vms[name]

    def set_vm_arg(self, arg, value):
        """
        Set an argument to list of extra arguments to be given to the QEMU
        binary. If the argument already exists then its value is replaced.

        Nothing is done if either the argument or the value is empty.

        :param arg: the QEMU argument, such as "-cpu" in "-cpu host"
        :type arg: str
        :param value: the argument value, such as "host" in "-cpu host"
        :type value: str
        """
        if not arg or not value:
            return
        vm_args = self.vm.args
        if arg not in vm_args:
            vm_args.extend([arg, value])
            return
        # The value is expected right behind the (first) occurrence
        idx = vm_args.index(arg) + 1
        if idx < len(vm_args):
            vm_args[idx] = value
        else:
            vm_args.append(value)

    def tearDown(self):
        """Shut down all VMs and stop the console logging."""
        for vm in self._vms.values():
            vm.shutdown()
        logging.getLogger('console').removeHandler(self._console_log_fh)
        # Detaching the handler does not close the log file by itself
        self._console_log_fh.close()
        super().tearDown()
327