# SPDX-License-Identifier: GPL-2.0-or-later
#
# Utilities for python-based QEMU tests
#
# Copyright 2024 Red Hat, Inc.
#
# Authors:
#  Thomas Huth <thuth@redhat.com>

import os
from subprocess import check_call, run, DEVNULL
import tarfile
from urllib.parse import urlparse
import zipfile

from .asset import Asset
from .cmd import run_cmd


def tar_extract(archive, dest_dir, member=None):
    '''
    @params archive: filename or binary file-like object of the tarball
    @params dest_dir: target directory to extract into
    @params member: optional member file to limit extraction to

    Extracts the tar @archive into @dest_dir. All files are extracted
    unless @member is set (a false value such as None or "" means
    "everything").
    '''
    # tarfile.open() takes a path as 'name' but an already opened
    # stream as 'fileobj'; passing a stream as 'name' raises TypeError.
    if isinstance(archive, (str, bytes, os.PathLike)):
        tf = tarfile.open(name=archive)
    else:
        tf = tarfile.open(fileobj=archive)

    with tf:
        # Only Python versions that provide tarfile.data_filter
        # support extraction filters; older ones are left untouched.
        if hasattr(tarfile, 'data_filter'):
            tf.extraction_filter = tarfile.data_filter
        if member:
            tf.extract(member=member, path=dest_dir)
        else:
            tf.extractall(path=dest_dir)


def cpio_extract(archive, output_path):
    '''
    @params archive: filename or file-like object of the cpio archive
    @params output_path: directory to extract into

    Extracts @archive into @output_path by running the external 'cpio'
    tool with @output_path as its working directory. A relative
    filename is therefore resolved relative to @output_path.
    '''
    # Run cpio with cwd=output_path instead of os.chdir(), so that the
    # working directory of the calling process is never modified, even
    # if running the command raises an exception.
    #
    # Not passing 'check=True' as cpio exits with non-zero
    # status if the archive contains any device nodes :-(
    if isinstance(archive, str):
        run(['cpio', '-i', '-F', archive],
            cwd=output_path,
            stdout=DEVNULL, stderr=DEVNULL)
    else:
        run(['cpio', '-i'],
            cwd=output_path,
            input=archive.read(),
            stdout=DEVNULL, stderr=DEVNULL)


def zip_extract(archive, dest_dir, member=None):
    '''
    @params archive: filename or file-like object of the zip archive
    @params dest_dir: target directory to extract into
    @params member: optional member file to limit extraction to

    Extracts the zip @archive into @dest_dir. All files are extracted
    unless @member is set (a false value such as None or "" means
    "everything").
    '''
    with zipfile.ZipFile(archive, 'r') as zf:
        if member:
            zf.extract(member=member, path=dest_dir)
        else:
            zf.extractall(path=dest_dir)


def deb_extract(archive, dest_dir, member=None):
    '''
    @params archive: filename of the deb package
    @params dest_dir: target directory to extract into
    @params member: optional member file to limit extraction to

    Unpacks the third member listed by 'ar t' from @archive into
    @dest_dir using the external 'ar' tool, then extracts that member
    as a tarball into @dest_dir, optionally limited to @member.
    '''
    # Make the target absolute *before* changing directory, otherwise a
    # relative @dest_dir would be resolved a second time (relative to
    # itself) by tar_extract() below.
    dest_dir = os.path.abspath(dest_dir)
    cwd = os.getcwd()
    os.chdir(dest_dir)
    try:
        stdout, _, _ = run_cmd(['ar', 't', archive])
        # The tarball to unpack is taken to be the third entry of the
        # 'ar' listing.
        file_path = stdout.split()[2]
        run_cmd(['ar', 'x', archive, file_path])
        tar_extract(file_path, dest_dir, member)
    finally:
        os.chdir(cwd)


def archive_extract(archive, dest_dir, format=None, member=None):
    '''
    @params archive: filename, Asset, or file-like object to extract
    @params dest_dir: target directory to extract into
    @params format: optional archive format ("tar", "zip", "cpio", "deb")
    @params member: optional member file to limit extraction to

    Extracts @archive into @dest_dir. All files are extracted
    unless @member specifies a limit.

    If @format is None, heuristics will be applied to guess the format
    from the filename or Asset URL. @format must be non-None if @archive
    is a file-like object.

    Raises an Exception if the format is unknown, if @member is given
    for a cpio archive, or if a deb archive is not given as a filename
    or Asset.
    '''
    if format is None:
        format = guess_archive_format(archive)
    if isinstance(archive, Asset):
        archive = str(archive)

    if format == "tar":
        tar_extract(archive, dest_dir, member)
    elif format == "zip":
        zip_extract(archive, dest_dir, member)
    elif format == "cpio":
        if member is not None:
            raise Exception("Unable to filter cpio extraction")
        cpio_extract(archive, dest_dir)
    elif format == "deb":
        if not isinstance(archive, str):
            raise Exception("Unable to use file-like object with deb archives")
        # The member is looked up with a "./" prefix; only add it when
        # a member was actually requested, so that member=None keeps
        # meaning "extract everything" instead of raising TypeError.
        deb_member = None if member is None else "./" + member
        deb_extract(archive, dest_dir, deb_member)
    else:
        raise Exception(f"Unknown archive format {format}")


def guess_archive_format(archive):
    '''
    @params archive: filename, or Asset to guess

    Guess the format of @archive, raising an exception if
    no format can be determined. For an Asset, the path component
    of its URL is examined.

    Returns one of "tar", "zip", "cpio" or "deb".
    '''
    if isinstance(archive, Asset):
        archive = urlparse(archive.url).path
    elif not isinstance(archive, str):
        raise Exception(f"Unable to guess archive format for {archive}")

    # ".tar." covers compressed tarballs (.tar.gz, .tar.xz, ...),
    # the ".tar" suffix covers plain uncompressed tarballs.
    if ".tar." in archive or archive.endswith((".tar", "tgz")):
        return "tar"
    elif archive.endswith(".zip"):
        return "zip"
    elif archive.endswith(".cpio"):
        return "cpio"
    elif archive.endswith((".deb", ".udeb")):
        return "deb"
    else:
        raise Exception(f"Unknown archive format for {archive}")