1# SPDX-License-Identifier: GPL-2.0-or-later 2# 3# Utilities for python-based QEMU tests 4# 5# Copyright 2024 Red Hat, Inc. 6# 7# Authors: 8# Thomas Huth <thuth@redhat.com> 9 10import gzip 11import lzma 12import os 13import stat 14import shutil 15from urllib.parse import urlparse 16from subprocess import check_call, CalledProcessError 17 18from .asset import Asset 19 20 21def gzip_uncompress(gz_path, output_path): 22 if os.path.exists(output_path): 23 return 24 with gzip.open(gz_path, 'rb') as gz_in: 25 try: 26 with open(output_path, 'wb') as raw_out: 27 shutil.copyfileobj(gz_in, raw_out) 28 except: 29 os.remove(output_path) 30 raise 31 32def lzma_uncompress(xz_path, output_path): 33 if os.path.exists(output_path): 34 return 35 with lzma.open(xz_path, 'rb') as lzma_in: 36 try: 37 with open(output_path, 'wb') as raw_out: 38 shutil.copyfileobj(lzma_in, raw_out) 39 except: 40 os.remove(output_path) 41 raise 42 43 44def zstd_uncompress(zstd_path, output_path): 45 if os.path.exists(output_path): 46 return 47 48 try: 49 check_call(['zstd', "-f", "-d", zstd_path, 50 "-o", output_path]) 51 except CalledProcessError as e: 52 os.remove(output_path) 53 raise Exception( 54 f"Unable to decompress zstd file {zstd_path} with {e}") from e 55 56 # zstd copies source archive permissions for the output 57 # file, so must make this writable for QEMU 58 os.chmod(output_path, stat.S_IRUSR | stat.S_IWUSR) 59 60 61''' 62@params compressed: filename, Asset, or file-like object to uncompress 63@params uncompressed: filename to uncompress into 64@params format: optional compression format (gzip, lzma) 65 66Uncompresses @compressed into @uncompressed 67 68If @format is None, heuristics will be applied to guess the format 69from the filename or Asset URL. @format must be non-None if @uncompressed 70is a file-like object. 71 72Returns the fully qualified path to the uncompessed file 73''' 74def uncompress(compressed, uncompressed, format=None): 75 if format is None: 76 format = guess_uncompress_format(compressed) 77 78 if format == "xz": 79 lzma_uncompress(str(compressed), uncompressed) 80 elif format == "gz": 81 gzip_uncompress(str(compressed), uncompressed) 82 elif format == "zstd": 83 zstd_uncompress(str(compressed), uncompressed) 84 else: 85 raise Exception(f"Unknown compression format {format}") 86 87''' 88@params compressed: filename, Asset, or file-like object to guess 89 90Guess the format of @compressed, raising an exception if 91no format can be determined 92''' 93def guess_uncompress_format(compressed): 94 if type(compressed) == Asset: 95 compressed = urlparse(compressed.url).path 96 elif type(compressed) != str: 97 raise Exception(f"Unable to guess compression cformat for {compressed}") 98 99 (name, ext) = os.path.splitext(compressed) 100 if ext == ".xz": 101 return "xz" 102 elif ext == ".gz": 103 return "gz" 104 elif ext in [".zstd", ".zst"]: 105 return 'zstd' 106 else: 107 raise Exception(f"Unknown compression format for {compressed}") 108