# SPDX-License-Identifier: GPL-2.0
#
# Runs UML kernel, collects output, and handles errors.
#
# Copyright (C) 2019, Google LLC.
# Author: Felix Guo <felixguoxiuping@gmail.com>
# Author: Brendan Higgins <brendanhiggins@google.com>

import importlib.abc
import importlib.util
import logging
import subprocess
import os
import shlex
import shutil
import signal
import sys
import threading
from typing import Iterator, List, Optional, Tuple, Any
from types import FrameType

import kunit_config
import qemu_config

KCONFIG_PATH = '.config'
KUNITCONFIG_PATH = '.kunitconfig'
OLD_KUNITCONFIG_PATH = 'last_used_kunitconfig'
DEFAULT_KUNITCONFIG_PATH = 'tools/testing/kunit/configs/default.config'
ALL_TESTS_CONFIG_PATH = 'tools/testing/kunit/configs/all_tests.config'
UML_KCONFIG_PATH = 'tools/testing/kunit/configs/arch_uml.config'
OUTFILE_PATH = 'test.log'
ABS_TOOL_PATH = os.path.abspath(os.path.dirname(__file__))
QEMU_CONFIGS_DIR = os.path.join(ABS_TOOL_PATH, 'qemu_configs')


class ConfigError(Exception):
    """Represents an error trying to configure the Linux kernel."""


class BuildError(Exception):
    """Represents an error trying to build the Linux kernel."""


class LinuxSourceTreeOperations:
    """An abstraction over command line operations performed on a source tree."""

    def __init__(self, linux_arch: str, cross_compile: Optional[str]):
        self._linux_arch = linux_arch
        self._cross_compile = cross_compile

    def make_mrproper(self) -> None:
        """Runs `make mrproper`.

        Raises:
            ConfigError: if make could not be executed or exited non-zero
                (in which case the message is make's combined output).
        """
        try:
            subprocess.check_output(['make', 'mrproper'], stderr=subprocess.STDOUT)
        except OSError as e:
            raise ConfigError('Could not call make command: ' + str(e)) from e
        except subprocess.CalledProcessError as e:
            raise ConfigError(e.output.decode()) from e

    def make_arch_config(self, base_kunitconfig: kunit_config.Kconfig) -> kunit_config.Kconfig:
        """Returns the config to build with; the base class adds nothing."""
        return base_kunitconfig

    def make_olddefconfig(self, build_dir: str, make_options: Optional[List[str]]) -> None:
        """Runs `make olddefconfig` for this arch in build_dir.

        Raises:
            ConfigError: if make could not be executed or exited non-zero
                (in which case the message is make's combined output).
        """
        command = ['make', 'ARCH=' + self._linux_arch, 'O=' + build_dir, 'olddefconfig']
        if self._cross_compile:
            command += ['CROSS_COMPILE=' + self._cross_compile]
        if make_options:
            command.extend(make_options)
        print('Populating config with:\n$', ' '.join(command))
        try:
            subprocess.check_output(command, stderr=subprocess.STDOUT)
        except OSError as e:
            raise ConfigError('Could not call make command: ' + str(e)) from e
        except subprocess.CalledProcessError as e:
            raise ConfigError(e.output.decode()) from e

    def make(self, jobs: int, build_dir: str, make_options: Optional[List[str]]) -> None:
        """Builds the kernel (plus compile_commands.json and scripts_gdb).

        make's stdout is discarded; its stderr is captured and printed when
        the build succeeds.

        Raises:
            BuildError: if make could not be executed, or exited non-zero
                (in which case the message is make's stderr).
        """
        command = ['make', 'all', 'compile_commands.json', 'scripts_gdb',
                   'ARCH=' + self._linux_arch, 'O=' + build_dir, '--jobs=' + str(jobs)]
        if make_options:
            command.extend(make_options)
        if self._cross_compile:
            command += ['CROSS_COMPILE=' + self._cross_compile]
        print('Building with:\n$', ' '.join(command))
        # Popen() only reports failure to launch (OSError); a failing build is
        # detected from the return code below, not via CalledProcessError.
        try:
            proc = subprocess.Popen(command,
                                    stderr=subprocess.PIPE,
                                    stdout=subprocess.DEVNULL)
        except OSError as e:
            raise BuildError('Could not call execute make: ' + str(e)) from e
        _, stderr = proc.communicate()
        if proc.returncode != 0:
            raise BuildError(stderr.decode())
        if stderr:  # likely only due to build warnings
            print(stderr.decode())

    def start(self, params: List[str], build_dir: str) -> subprocess.Popen:
        """Starts the built kernel; must be overridden by subclasses."""
        raise RuntimeError('not implemented!')


class LinuxSourceTreeOperationsQemu(LinuxSourceTreeOperations):
    """Source tree operations for kernels that are run under QEMU."""

    def __init__(self, qemu_arch_params: qemu_config.QemuArchParams, cross_compile: Optional[str]):
        super().__init__(linux_arch=qemu_arch_params.linux_arch,
                         cross_compile=cross_compile)
        self._kconfig = qemu_arch_params.kconfig
        self._qemu_arch = qemu_arch_params.qemu_arch
        self._kernel_path = qemu_arch_params.kernel_path
        self._kernel_command_line = qemu_arch_params.kernel_command_line
        # Only add a shutdown mode if the arch config did not pick one itself.
        if 'kunit_shutdown=' not in self._kernel_command_line:
            self._kernel_command_line += ' kunit_shutdown=reboot'
        self._extra_qemu_params = qemu_arch_params.extra_qemu_params
        self._serial = qemu_arch_params.serial

    def make_arch_config(self, base_kunitconfig: kunit_config.Kconfig) -> kunit_config.Kconfig:
        """Returns the arch's Kconfig fragment with base_kunitconfig merged in."""
        kconfig = kunit_config.parse_from_string(self._kconfig)
        kconfig.merge_in_entries(base_kunitconfig)
        return kconfig

    def start(self, params: List[str], build_dir: str) -> subprocess.Popen:
        """Boots the kernel image under qemu-system-<arch>.

        params are prepended to the arch's kernel command line. The returned
        process has stderr merged into a text-mode stdout pipe.
        """
        kernel_path = os.path.join(build_dir, self._kernel_path)
        qemu_command = ['qemu-system-' + self._qemu_arch,
                        '-nodefaults',
                        '-m', '1024',
                        '-kernel', kernel_path,
                        '-append', ' '.join(params + [self._kernel_command_line]),
                        '-no-reboot',
                        '-nographic',
                        '-accel', 'kvm',
                        '-accel', 'hvf',
                        '-accel', 'tcg',
                        '-serial', self._serial] + self._extra_qemu_params
        # Note: shlex.join() does what we want, but requires python 3.8+.
        print('Running tests with:\n$', ' '.join(shlex.quote(arg) for arg in qemu_command))
        return subprocess.Popen(qemu_command,
                                stdin=subprocess.PIPE,
                                stdout=subprocess.PIPE,
                                stderr=subprocess.STDOUT,
                                text=True, errors='backslashreplace')


class LinuxSourceTreeOperationsUml(LinuxSourceTreeOperations):
    """An abstraction over command line operations performed on a source tree."""

    def __init__(self, cross_compile: Optional[str]=None):
        super().__init__(linux_arch='um', cross_compile=cross_compile)

    def make_arch_config(self, base_kunitconfig: kunit_config.Kconfig) -> kunit_config.Kconfig:
        """Returns the UML Kconfig fragment with base_kunitconfig merged in."""
        kconfig = kunit_config.parse_file(UML_KCONFIG_PATH)
        kconfig.merge_in_entries(base_kunitconfig)
        return kconfig

    def start(self, params: List[str], build_dir: str) -> subprocess.Popen:
        """Runs the Linux UML binary. Must be named 'linux'."""
        linux_bin = os.path.join(build_dir, 'linux')
        # Build a fresh list instead of extending `params` in place, so a
        # caller that reuses its list does not accumulate these flags.
        uml_params = [*params, 'mem=1G', 'console=tty', 'kunit_shutdown=halt']
        print('Running tests with:\n$', linux_bin, ' '.join(shlex.quote(arg) for arg in uml_params))
        return subprocess.Popen([linux_bin] + uml_params,
                                stdin=subprocess.PIPE,
                                stdout=subprocess.PIPE,
                                stderr=subprocess.STDOUT,
                                text=True, errors='backslashreplace')


def get_kconfig_path(build_dir: str) -> str:
    """Returns the path of the .config inside build_dir."""
    return os.path.join(build_dir, KCONFIG_PATH)


def get_kunitconfig_path(build_dir: str) -> str:
    """Returns the path of the .kunitconfig inside build_dir."""
    return os.path.join(build_dir, KUNITCONFIG_PATH)


def get_old_kunitconfig_path(build_dir: str) -> str:
    """Returns the path of the last-used kunitconfig inside build_dir."""
    return os.path.join(build_dir, OLD_KUNITCONFIG_PATH)


def get_parsed_kunitconfig(build_dir: str,
                           kunitconfig_paths: Optional[List[str]]=None) -> kunit_config.Kconfig:
    """Parses the kunitconfig(s) to use for this run.

    With no explicit paths, build_dir's .kunitconfig is used, and is first
    created from the default config if it does not exist. Otherwise every
    given path (a directory means the .kunitconfig inside it) is parsed and
    merged.

    Raises:
        ConfigError: if a given path does not exist, or two of the given
            files specify different values for the same option.
    """
    if not kunitconfig_paths:
        path = get_kunitconfig_path(build_dir)
        if not os.path.exists(path):
            shutil.copyfile(DEFAULT_KUNITCONFIG_PATH, path)
        return kunit_config.parse_file(path)

    merged = kunit_config.Kconfig()

    for path in kunitconfig_paths:
        if os.path.isdir(path):
            path = os.path.join(path, KUNITCONFIG_PATH)
        if not os.path.exists(path):
            raise ConfigError(f'Specified kunitconfig ({path}) does not exist')

        partial = kunit_config.parse_file(path)
        diff = merged.conflicting_options(partial)
        if diff:
            diff_str = '\n\n'.join(f'{a}\n vs from {path}\n{b}' for a, b in diff)
            raise ConfigError(f'Multiple values specified for {len(diff)} options in kunitconfig:\n{diff_str}')
        merged.merge_in_entries(partial)
    return merged


def get_outfile_path(build_dir: str) -> str:
    """Returns the path of the kernel output log inside build_dir."""
    return os.path.join(build_dir, OUTFILE_PATH)


def _default_qemu_config_path(arch: str) -> str:
    """Returns the bundled qemu config file for arch.

    The special arch 'help' prints the available architectures and exits.

    Raises:
        ConfigError: if there is no bundled config for arch.
    """
    config_path = os.path.join(QEMU_CONFIGS_DIR, arch + '.py')
    if os.path.isfile(config_path):
        return config_path

    options = [f[:-3] for f in os.listdir(QEMU_CONFIGS_DIR) if f.endswith('.py')]

    if arch == 'help':
        print('um')
        for option in options:
            print(option)
        sys.exit()

    raise ConfigError(arch + ' is not a valid arch, options are ' + str(sorted(options)))


def _get_qemu_ops(config_path: str,
                  extra_qemu_args: Optional[List[str]],
                  cross_compile: Optional[str]) -> Tuple[str, LinuxSourceTreeOperations]:
    """Loads a qemu config file and builds the matching operations object.

    Returns:
        A (linux_arch, operations) tuple.

    Raises:
        ValueError: if the loaded module does not define QEMU_ARCH.
    """
    # The module name/path has very little to do with where the actual file
    # exists (I learned this through experimentation and could not find it
    # anywhere in the Python documentation).
    #
    # Basically, we completely ignore the actual file location of the config
    # we are loading and just tell Python that the module lives in the
    # QEMU_CONFIGS_DIR for import purposes regardless of where it actually
    # exists as a file.
    module_path = '.' + os.path.join(os.path.basename(QEMU_CONFIGS_DIR), os.path.basename(config_path))
    spec = importlib.util.spec_from_file_location(module_path, config_path)
    assert spec is not None
    config = importlib.util.module_from_spec(spec)
    # See https://github.com/python/typeshed/pull/2626 for context.
    assert isinstance(spec.loader, importlib.abc.Loader)
    spec.loader.exec_module(config)

    if not hasattr(config, 'QEMU_ARCH'):
        raise ValueError('qemu_config module missing "QEMU_ARCH": ' + config_path)
    params: qemu_config.QemuArchParams = config.QEMU_ARCH
    if extra_qemu_args:
        params.extra_qemu_params.extend(extra_qemu_args)
    return params.linux_arch, LinuxSourceTreeOperationsQemu(
        params, cross_compile=cross_compile)


class LinuxSourceTree:
    """Represents a Linux kernel source tree with KUnit tests."""

    def __init__(
            self,
            build_dir: str,
            kunitconfig_paths: Optional[List[str]]=None,
            kconfig_add: Optional[List[str]]=None,
            arch: Optional[str]=None,
            cross_compile: Optional[str]=None,
            qemu_config_path: Optional[str]=None,
            extra_qemu_args: Optional[List[str]]=None) -> None:
        # signal_handler() reads self._process, so it has to exist before the
        # handler is installed; otherwise a SIGINT delivered during the setup
        # below would hit an AttributeError inside the handler.
        self._process: Optional[subprocess.Popen[Any]] = None
        signal.signal(signal.SIGINT, self.signal_handler)
        if qemu_config_path:
            self._arch, self._ops = _get_qemu_ops(qemu_config_path, extra_qemu_args, cross_compile)
        else:
            self._arch = 'um' if arch is None else arch
            if self._arch == 'um':
                self._ops = LinuxSourceTreeOperationsUml(cross_compile=cross_compile)
            else:
                qemu_config_path = _default_qemu_config_path(self._arch)
                _, self._ops = _get_qemu_ops(qemu_config_path, extra_qemu_args, cross_compile)

        self._kconfig = get_parsed_kunitconfig(build_dir, kunitconfig_paths)
        if kconfig_add:
            kconfig = kunit_config.parse_from_string('\n'.join(kconfig_add))
            self._kconfig.merge_in_entries(kconfig)

    def arch(self) -> str:
        """Returns the Linux architecture name this tree is built for."""
        return self._arch

    def clean(self) -> bool:
        """Runs `make mrproper`; logs the error and returns False on failure."""
        try:
            self._ops.make_mrproper()
        except ConfigError as e:
            logging.error(e)
            return False
        return True

    def validate_config(self, build_dir: str) -> bool:
        """Checks that the requested kunitconfig is a subset of build_dir's .config.

        Logs the missing options and returns False if it is not.
        """
        kconfig_path = get_kconfig_path(build_dir)
        validated_kconfig = kunit_config.parse_file(kconfig_path)
        if self._kconfig.is_subset_of(validated_kconfig):
            return True
        missing = set(self._kconfig.as_entries()) - set(validated_kconfig.as_entries())
        message = 'Not all Kconfig options selected in kunitconfig were in the generated .config.\n' \
                  'This is probably due to unsatisfied dependencies.\n' \
                  'Missing: ' + ', '.join(str(e) for e in missing)
        if self._arch == 'um':
            message += '\nNote: many Kconfig options aren\'t available on UML. You can try running ' \
                       'on a different architecture with something like "--arch=x86_64".'
        logging.error(message)
        return False

    def build_config(self, build_dir: str, make_options: Optional[List[str]]) -> bool:
        """Writes a fresh .config into build_dir and validates it.

        On success the config that was used is also recorded as the last-used
        kunitconfig. Returns False (after logging) if configuring or
        validation fails.
        """
        kconfig_path = get_kconfig_path(build_dir)
        if build_dir and not os.path.exists(build_dir):
            os.mkdir(build_dir)
        try:
            self._kconfig = self._ops.make_arch_config(self._kconfig)
            self._kconfig.write_to_file(kconfig_path)
            self._ops.make_olddefconfig(build_dir, make_options)
        except ConfigError as e:
            logging.error(e)
            return False
        if not self.validate_config(build_dir):
            return False

        old_path = get_old_kunitconfig_path(build_dir)
        if os.path.exists(old_path):
            os.remove(old_path)  # write_to_file appends to the file
        self._kconfig.write_to_file(old_path)
        return True

    def _kunitconfig_changed(self, build_dir: str) -> bool:
        """Returns True if the current config differs from the last-used one.

        A missing last-used kunitconfig counts as changed.
        """
        old_path = get_old_kunitconfig_path(build_dir)
        if not os.path.exists(old_path):
            return True

        old_kconfig = kunit_config.parse_file(old_path)
        return old_kconfig != self._kconfig

    def build_reconfig(self, build_dir: str, make_options: Optional[List[str]]) -> bool:
        """Creates or regenerates build_dir's .config when needed.

        The existing .config is kept only if the kunitconfig is a subset of it
        and the kunitconfig is unchanged since it was last used; otherwise the
        .config is removed and rebuilt via build_config().
        """
        kconfig_path = get_kconfig_path(build_dir)
        if not os.path.exists(kconfig_path):
            print('Generating .config ...')
            return self.build_config(build_dir, make_options)

        existing_kconfig = kunit_config.parse_file(kconfig_path)
        self._kconfig = self._ops.make_arch_config(self._kconfig)

        if self._kconfig.is_subset_of(existing_kconfig) and not self._kunitconfig_changed(build_dir):
            return True
        print('Regenerating .config ...')
        os.remove(kconfig_path)
        return self.build_config(build_dir, make_options)

    def build_kernel(self, jobs: int, build_dir: str, make_options: Optional[List[str]]) -> bool:
        """Builds the kernel and re-validates the resulting .config.

        Returns False (after logging) if configuring, building or validation
        fails.
        """
        try:
            self._ops.make_olddefconfig(build_dir, make_options)
            self._ops.make(jobs, build_dir, make_options)
        except (ConfigError, BuildError) as e:
            logging.error(e)
            return False
        return self.validate_config(build_dir)

    def _restore_terminal_if_tty(self) -> None:
        """Runs `stty sane` when stdin is a terminal."""
        # stty requires a controlling terminal; skip headless runs.
        if sys.stdin is None or not sys.stdin.isatty():
            return
        subprocess.call(['stty', 'sane'])

    def run_kernel(self, args: Optional[List[str]]=None, build_dir: str='',
                   filter_glob: str='', filter: str='',
                   filter_action: Optional[str]=None,
                   timeout: Optional[int]=None) -> Iterator[str]:
        """Runs the kernel and yields its output line by line.

        Every line is also written to the output log in build_dir. If timeout
        is given, the kernel is terminated once it has run that long.
        """
        # Copy to avoid mutating the caller-supplied list. exec_tests() reuses
        # the same args across repeated run_kernel() calls (e.g. --run_isolated),
        # so appending to the original would accumulate stale flags on each call.
        args = list(args) if args else []
        if filter_glob:
            args.append('kunit.filter_glob=' + filter_glob)
        if filter:
            args.append('kunit.filter="' + filter + '"')
        if filter_action:
            args.append('kunit.filter_action=' + filter_action)
        args.append('kunit.enable=1')

        self._process = self._ops.start(args, build_dir)
        assert self._process is not None  # tell mypy it's set
        assert self._process.stdout is not None  # tell mypy it's set

        # Enforce the timeout in a background thread.
        def _wait_proc() -> None:
            try:
                if self._process:
                    self._process.wait(timeout=timeout)
            except Exception as e:
                # Reached when wait() times out (or fails for any other
                # reason): report it and stop the kernel.
                print(e)
                if self._process:
                    self._process.terminate()
                    self._process.wait()
        waiter = threading.Thread(target=_wait_proc)
        waiter.start()

        output = open(get_outfile_path(build_dir), 'w')
        try:
            # Tee the output to the file and to our caller in real time.
            for line in self._process.stdout:
                output.write(line)
                yield line
        # This runs even if our caller doesn't consume every line.
        finally:
            # Flush any leftover output to the file
            if self._process:
                if self._process.stdout:
                    output.write(self._process.stdout.read())
                    self._process.stdout.close()
                self._process = None
            output.close()

            waiter.join()
            self._restore_terminal_if_tty()

    def signal_handler(self, unused_sig: int, unused_frame: Optional[FrameType]) -> None:
        """SIGINT handler: stops any running kernel and restores the terminal."""
        logging.error('Build interruption occurred. Cleaning console.')
        if self._process:
            self._process.terminate()
            self._process.wait()
        self._restore_terminal_if_tty()