xref: /qemu/tests/functional/qemu_test/cmd.py (revision 0da341a78f00d6feae98f38d1dfbe2e9f88d0b93)
1# Test class and utilities for functional tests
2#
3# Copyright 2018, 2024 Red Hat, Inc.
4#
5# Original Author (Avocado-based tests):
6#  Cleber Rosa <crosa@redhat.com>
7#
8# Adaption for standalone version:
9#  Thomas Huth <thuth@redhat.com>
10#
11# This work is licensed under the terms of the GNU GPL, version 2 or
12# later.  See the COPYING file in the top-level directory.
13
14import logging
15import os
16import os.path
17import subprocess
18
19from .config import BUILD_DIR
20
21def which(tool):
22    """ looks up the full path for @tool, returns None if not found
23        or if @tool does not have executable permissions.
24    """
25    paths=os.getenv('PATH')
26    for p in paths.split(os.path.pathsep):
27        p = os.path.join(p, tool)
28        if os.path.exists(p) and os.access(p, os.X_OK):
29            return p
30    return None
31
32def has_cmd(name, args=None):
33    """
34    This function is for use in a @skipUnless decorator, e.g.:
35
36        @skipUnless(*has_cmd('sudo -n', ('sudo', '-n', 'true')))
37        def test_something_that_needs_sudo(self):
38            ...
39    """
40
41    if args is None:
42        args = ('which', name)
43
44    try:
45        _, stderr, exitcode = run_cmd(args)
46    except Exception as e:
47        exitcode = -1
48        stderr = str(e)
49
50    if exitcode != 0:
51        cmd_line = ' '.join(args)
52        err = f'{name} required, but "{cmd_line}" failed: {stderr.strip()}'
53        return (False, err)
54    else:
55        return (True, '')
56
57def has_cmds(*cmds):
58    """
59    This function is for use in a @skipUnless decorator and
60    allows checking for the availability of multiple commands, e.g.:
61
62        @skipUnless(*has_cmds(('cmd1', ('cmd1', '--some-parameter')),
63                              'cmd2', 'cmd3'))
64        def test_something_that_needs_cmd1_and_cmd2(self):
65            ...
66    """
67
68    for cmd in cmds:
69        if isinstance(cmd, str):
70            cmd = (cmd,)
71
72        ok, errstr = has_cmd(*cmd)
73        if not ok:
74            return (False, errstr)
75
76    return (True, '')
77
78def run_cmd(args):
79    subp = subprocess.Popen(args,
80                            stdout=subprocess.PIPE,
81                            stderr=subprocess.PIPE,
82                            universal_newlines=True)
83    stdout, stderr = subp.communicate()
84    ret = subp.returncode
85
86    return (stdout, stderr, ret)
87
88def is_readable_executable_file(path):
89    return os.path.isfile(path) and os.access(path, os.R_OK | os.X_OK)
90
91# @test: functional test to fail if @failure is seen
92# @vm: the VM whose console to process
93# @success: a non-None string to look for
94# @failure: a string to look for that triggers test failure, or None
95#
96# Read up to 1 line of text from @vm, looking for @success
97# and optionally @failure.
98#
99# If @success or @failure are seen, immediately return True,
100# even if end of line is not yet seen. ie remainder of the
101# line is left unread.
102#
103# If end of line is seen, with neither @success or @failure
104# return False
105#
106# If @failure is seen, then mark @test as failed
107def _console_read_line_until_match(test, vm, success, failure):
108    msg = bytes([])
109    done = False
110    while True:
111        c = vm.console_socket.recv(1)
112        if c is None:
113            done = True
114            test.fail(
115                f"EOF in console, expected '{success}'")
116            break
117        msg += c
118
119        if success in msg:
120            done = True
121            break
122        if failure and failure in msg:
123            done = True
124            vm.console_socket.close()
125            test.fail(
126                f"'{failure}' found in console, expected '{success}'")
127
128        if c == b'\n':
129            break
130
131    console_logger = logging.getLogger('console')
132    try:
133        console_logger.debug(msg.decode().strip())
134    except:
135        console_logger.debug(msg)
136
137    return done
138
139def _console_interaction(test, success_message, failure_message,
140                         send_string, keep_sending=False, vm=None):
141    assert not keep_sending or send_string
142    assert success_message or send_string
143
144    if vm is None:
145        vm = test.vm
146
147    test.log.debug(
148        f"Console interaction: success_msg='{success_message}' " +
149        f"failure_msg='{failure_message}' send_string='{send_string}'")
150
151    # We'll process console in bytes, to avoid having to
152    # deal with unicode decode errors from receiving
153    # partial utf8 byte sequences
154    success_message_b = None
155    if success_message is not None:
156        success_message_b = success_message.encode()
157
158    failure_message_b = None
159    if failure_message is not None:
160        failure_message_b = failure_message.encode()
161
162    while True:
163        if send_string:
164            vm.console_socket.sendall(send_string.encode())
165            if not keep_sending:
166                send_string = None # send only once
167
168        # Only consume console output if waiting for something
169        if success_message is None:
170            if send_string is None:
171                break
172            continue
173
174        if _console_read_line_until_match(test, vm,
175                                          success_message_b,
176                                          failure_message_b):
177            break
178
179def interrupt_interactive_console_until_pattern(test, success_message,
180                                                failure_message=None,
181                                                interrupt_string='\r'):
182    """
183    Keep sending a string to interrupt a console prompt, while logging the
184    console output. Typical use case is to break a boot loader prompt, such:
185
186        Press a key within 5 seconds to interrupt boot process.
187        5
188        4
189        3
190        2
191        1
192        Booting default image...
193
194    :param test: a  test containing a VM that will have its console
195                 read and probed for a success or failure message
196    :type test: :class:`qemu_test.QemuSystemTest`
197    :param success_message: if this message appears, test succeeds
198    :param failure_message: if this message appears, test fails
199    :param interrupt_string: a string to send to the console before trying
200                             to read a new line
201    """
202    assert success_message
203    _console_interaction(test, success_message, failure_message,
204                         interrupt_string, True)
205
206def wait_for_console_pattern(test, success_message, failure_message=None,
207                             vm=None):
208    """
209    Waits for messages to appear on the console, while logging the content
210
211    :param test: a test containing a VM that will have its console
212                 read and probed for a success or failure message
213    :type test: :class:`qemu_test.QemuSystemTest`
214    :param success_message: if this message appears, test succeeds
215    :param failure_message: if this message appears, test fails
216    """
217    assert success_message
218    _console_interaction(test, success_message, failure_message, None, vm=vm)
219
220def exec_command(test, command):
221    """
222    Send a command to a console (appending CRLF characters), while logging
223    the content.
224
225    :param test: a test containing a VM.
226    :type test: :class:`qemu_test.QemuSystemTest`
227    :param command: the command to send
228    :type command: str
229    """
230    _console_interaction(test, None, None, command + '\r')
231
232def exec_command_and_wait_for_pattern(test, command,
233                                      success_message, failure_message=None):
234    """
235    Send a command to a console (appending CRLF characters), then wait
236    for success_message to appear on the console, while logging the.
237    content. Mark the test as failed if failure_message is found instead.
238
239    :param test: a test containing a VM that will have its console
240                 read and probed for a success or failure message
241    :type test: :class:`qemu_test.QemuSystemTest`
242    :param command: the command to send
243    :param success_message: if this message appears, test succeeds
244    :param failure_message: if this message appears, test fails
245    """
246    assert success_message
247    _console_interaction(test, success_message, failure_message, command + '\r')
248
249def get_qemu_img(test):
250    test.log.debug('Looking for and selecting a qemu-img binary')
251
252    # If qemu-img has been built, use it, otherwise the system wide one
253    # will be used.
254    qemu_img = os.path.join(BUILD_DIR, 'qemu-img')
255    if os.path.exists(qemu_img):
256        return qemu_img
257    (has_system_qemu_img, errmsg) = has_cmd('qemu-img')
258    if has_system_qemu_img:
259        return 'qemu-img'
260    test.skipTest(errmsg)
261