1# SPDX-License-Identifier: GPL-2.0-or-later 2# 3# Utilities for python-based QEMU tests 4# 5# Copyright 2024 Red Hat, Inc. 6# 7# Authors: 8# Thomas Huth <thuth@redhat.com> 9 10import os 11from subprocess import check_call, run, DEVNULL 12import tarfile 13from urllib.parse import urlparse 14import zipfile 15 16from .asset import Asset 17 18 19def tar_extract(archive, dest_dir, member=None): 20 with tarfile.open(archive) as tf: 21 if hasattr(tarfile, 'data_filter'): 22 tf.extraction_filter = getattr(tarfile, 'data_filter', 23 (lambda member, path: member)) 24 if member: 25 tf.extract(member=member, path=dest_dir) 26 else: 27 tf.extractall(path=dest_dir) 28 29def cpio_extract(archive, output_path): 30 cwd = os.getcwd() 31 os.chdir(output_path) 32 # Not passing 'check=True' as cpio exits with non-zero 33 # status if the archive contains any device nodes :-( 34 if type(archive) == str: 35 run(['cpio', '-i', '-F', archive], 36 stdout=DEVNULL, stderr=DEVNULL) 37 else: 38 run(['cpio', '-i'], 39 input=archive.read(), 40 stdout=DEVNULL, stderr=DEVNULL) 41 os.chdir(cwd) 42 43def zip_extract(archive, dest_dir, member=None): 44 with zipfile.ZipFile(archive, 'r') as zf: 45 if member: 46 zf.extract(member=member, path=dest_dir) 47 else: 48 zf.extractall(path=dest_dir) 49 50def deb_extract(archive, dest_dir, member=None): 51 cwd = os.getcwd() 52 os.chdir(dest_dir) 53 try: 54 proc = run(['ar', 't', archive], 55 check=True, capture_output=True, encoding='utf8') 56 file_path = proc.stdout.split()[2] 57 check_call(['ar', 'x', archive, file_path], 58 stdout=DEVNULL, stderr=DEVNULL) 59 tar_extract(file_path, dest_dir, member) 60 finally: 61 os.chdir(cwd) 62 63''' 64@params archive: filename, Asset, or file-like object to extract 65@params dest_dir: target directory to extract into 66@params member: optional member file to limit extraction to 67 68Extracts @archive into @dest_dir. All files are extracted 69unless @member specifies a limit. 70 71If @format is None, heuristics will be applied to guess the format 72from the filename or Asset URL. @format must be non-None if @archive 73is a file-like object. 74''' 75def archive_extract(archive, dest_dir, format=None, member=None): 76 if format is None: 77 format = guess_archive_format(archive) 78 if type(archive) == Asset: 79 archive = str(archive) 80 81 if format == "tar": 82 tar_extract(archive, dest_dir, member) 83 elif format == "zip": 84 zip_extract(archive, dest_dir, member) 85 elif format == "cpio": 86 if member is not None: 87 raise Exception("Unable to filter cpio extraction") 88 cpio_extract(archive, dest_dir) 89 elif format == "deb": 90 if type(archive) != str: 91 raise Exception("Unable to use file-like object with deb archives") 92 deb_extract(archive, dest_dir, "./" + member) 93 else: 94 raise Exception(f"Unknown archive format {format}") 95 96''' 97@params archive: filename, or Asset to guess 98 99Guess the format of @compressed, raising an exception if 100no format can be determined 101''' 102def guess_archive_format(archive): 103 if type(archive) == Asset: 104 archive = urlparse(archive.url).path 105 elif type(archive) != str: 106 raise Exception(f"Unable to guess archive format for {archive}") 107 108 if ".tar." in archive or archive.endswith("tgz"): 109 return "tar" 110 elif archive.endswith(".zip"): 111 return "zip" 112 elif archive.endswith(".cpio"): 113 return "cpio" 114 elif archive.endswith(".deb") or archive.endswith(".udeb"): 115 return "deb" 116 else: 117 raise Exception(f"Unknown archive format for {archive}") 118