1# Test class and utilities for functional tests 2# 3# Copyright 2018, 2024 Red Hat, Inc. 4# 5# Original Author (Avocado-based tests): 6# Cleber Rosa <crosa@redhat.com> 7# 8# Adaption for standalone version: 9# Thomas Huth <thuth@redhat.com> 10# 11# This work is licensed under the terms of the GNU GPL, version 2 or 12# later. See the COPYING file in the top-level directory. 13 14import logging 15import os 16import os.path 17import subprocess 18 19from .config import BUILD_DIR 20 21def which(tool): 22 """ looks up the full path for @tool, returns None if not found 23 or if @tool does not have executable permissions. 24 """ 25 paths=os.getenv('PATH') 26 for p in paths.split(os.path.pathsep): 27 p = os.path.join(p, tool) 28 if os.path.exists(p) and os.access(p, os.X_OK): 29 return p 30 return None 31 32def has_cmd(name, args=None): 33 """ 34 This function is for use in a @skipUnless decorator, e.g.: 35 36 @skipUnless(*has_cmd('sudo -n', ('sudo', '-n', 'true'))) 37 def test_something_that_needs_sudo(self): 38 ... 39 """ 40 41 if args is None: 42 args = ('which', name) 43 44 try: 45 _, stderr, exitcode = run_cmd(args) 46 except Exception as e: 47 exitcode = -1 48 stderr = str(e) 49 50 if exitcode != 0: 51 cmd_line = ' '.join(args) 52 err = f'{name} required, but "{cmd_line}" failed: {stderr.strip()}' 53 return (False, err) 54 else: 55 return (True, '') 56 57def has_cmds(*cmds): 58 """ 59 This function is for use in a @skipUnless decorator and 60 allows checking for the availability of multiple commands, e.g.: 61 62 @skipUnless(*has_cmds(('cmd1', ('cmd1', '--some-parameter')), 63 'cmd2', 'cmd3')) 64 def test_something_that_needs_cmd1_and_cmd2(self): 65 ... 66 """ 67 68 for cmd in cmds: 69 if isinstance(cmd, str): 70 cmd = (cmd,) 71 72 ok, errstr = has_cmd(*cmd) 73 if not ok: 74 return (False, errstr) 75 76 return (True, '') 77 78def run_cmd(args): 79 subp = subprocess.Popen(args, 80 stdout=subprocess.PIPE, 81 stderr=subprocess.PIPE, 82 universal_newlines=True) 83 stdout, stderr = subp.communicate() 84 ret = subp.returncode 85 86 return (stdout, stderr, ret) 87 88def is_readable_executable_file(path): 89 return os.path.isfile(path) and os.access(path, os.R_OK | os.X_OK) 90 91# @test: functional test to fail if @failure is seen 92# @vm: the VM whose console to process 93# @success: a non-None string to look for 94# @failure: a string to look for that triggers test failure, or None 95# 96# Read up to 1 line of text from @vm, looking for @success 97# and optionally @failure. 98# 99# If @success or @failure are seen, immediately return True, 100# even if end of line is not yet seen. ie remainder of the 101# line is left unread. 102# 103# If end of line is seen, with neither @success or @failure 104# return False 105# 106# If @failure is seen, then mark @test as failed 107def _console_read_line_until_match(test, vm, success, failure): 108 msg = bytes([]) 109 done = False 110 while True: 111 c = vm.console_socket.recv(1) 112 if c is None: 113 done = True 114 test.fail( 115 f"EOF in console, expected '{success}'") 116 break 117 msg += c 118 119 if success in msg: 120 done = True 121 break 122 if failure and failure in msg: 123 done = True 124 vm.console_socket.close() 125 test.fail( 126 f"'{failure}' found in console, expected '{success}'") 127 128 if c == b'\n': 129 break 130 131 console_logger = logging.getLogger('console') 132 try: 133 console_logger.debug(msg.decode().strip()) 134 except: 135 console_logger.debug(msg) 136 137 return done 138 139def _console_interaction(test, success_message, failure_message, 140 send_string, keep_sending=False, vm=None): 141 assert not keep_sending or send_string 142 assert success_message or send_string 143 144 if vm is None: 145 vm = test.vm 146 147 test.log.debug( 148 f"Console interaction: success_msg='{success_message}' " + 149 f"failure_msg='{failure_message}' send_string='{send_string}'") 150 151 # We'll process console in bytes, to avoid having to 152 # deal with unicode decode errors from receiving 153 # partial utf8 byte sequences 154 success_message_b = None 155 if success_message is not None: 156 success_message_b = success_message.encode() 157 158 failure_message_b = None 159 if failure_message is not None: 160 failure_message_b = failure_message.encode() 161 162 while True: 163 if send_string: 164 vm.console_socket.sendall(send_string.encode()) 165 if not keep_sending: 166 send_string = None # send only once 167 168 # Only consume console output if waiting for something 169 if success_message is None: 170 if send_string is None: 171 break 172 continue 173 174 if _console_read_line_until_match(test, vm, 175 success_message_b, 176 failure_message_b): 177 break 178 179def interrupt_interactive_console_until_pattern(test, success_message, 180 failure_message=None, 181 interrupt_string='\r'): 182 """ 183 Keep sending a string to interrupt a console prompt, while logging the 184 console output. Typical use case is to break a boot loader prompt, such: 185 186 Press a key within 5 seconds to interrupt boot process. 187 5 188 4 189 3 190 2 191 1 192 Booting default image... 193 194 :param test: a test containing a VM that will have its console 195 read and probed for a success or failure message 196 :type test: :class:`qemu_test.QemuSystemTest` 197 :param success_message: if this message appears, test succeeds 198 :param failure_message: if this message appears, test fails 199 :param interrupt_string: a string to send to the console before trying 200 to read a new line 201 """ 202 assert success_message 203 _console_interaction(test, success_message, failure_message, 204 interrupt_string, True) 205 206def wait_for_console_pattern(test, success_message, failure_message=None, 207 vm=None): 208 """ 209 Waits for messages to appear on the console, while logging the content 210 211 :param test: a test containing a VM that will have its console 212 read and probed for a success or failure message 213 :type test: :class:`qemu_test.QemuSystemTest` 214 :param success_message: if this message appears, test succeeds 215 :param failure_message: if this message appears, test fails 216 """ 217 assert success_message 218 _console_interaction(test, success_message, failure_message, None, vm=vm) 219 220def exec_command(test, command): 221 """ 222 Send a command to a console (appending CRLF characters), while logging 223 the content. 224 225 :param test: a test containing a VM. 226 :type test: :class:`qemu_test.QemuSystemTest` 227 :param command: the command to send 228 :type command: str 229 """ 230 _console_interaction(test, None, None, command + '\r') 231 232def exec_command_and_wait_for_pattern(test, command, 233 success_message, failure_message=None): 234 """ 235 Send a command to a console (appending CRLF characters), then wait 236 for success_message to appear on the console, while logging the. 237 content. Mark the test as failed if failure_message is found instead. 238 239 :param test: a test containing a VM that will have its console 240 read and probed for a success or failure message 241 :type test: :class:`qemu_test.QemuSystemTest` 242 :param command: the command to send 243 :param success_message: if this message appears, test succeeds 244 :param failure_message: if this message appears, test fails 245 """ 246 assert success_message 247 _console_interaction(test, success_message, failure_message, command + '\r') 248 249def get_qemu_img(test): 250 test.log.debug('Looking for and selecting a qemu-img binary') 251 252 # If qemu-img has been built, use it, otherwise the system wide one 253 # will be used. 254 qemu_img = os.path.join(BUILD_DIR, 'qemu-img') 255 if os.path.exists(qemu_img): 256 return qemu_img 257 (has_system_qemu_img, errmsg) = has_cmd('qemu-img') 258 if has_system_qemu_img: 259 return 'qemu-img' 260 test.skipTest(errmsg) 261