xref: /qemu/tests/functional/qemu_test/uncompress.py (revision 70ce076fa6dff60585c229a4b641b13e64bf03cf)
1# SPDX-License-Identifier: GPL-2.0-or-later
2#
3# Utilities for python-based QEMU tests
4#
5# Copyright 2024 Red Hat, Inc.
6#
7# Authors:
8#  Thomas Huth <thuth@redhat.com>
9
10import gzip
11import lzma
12import os
13import stat
14import shutil
15from urllib.parse import urlparse
16from subprocess import check_call, CalledProcessError
17
18from .asset import Asset
19
20
def gzip_uncompress(gz_path, output_path):
    '''
    @params gz_path: filename of the gzip compressed input
    @params output_path: filename to uncompress into

    Uncompresses @gz_path into @output_path. Nothing is done if
    @output_path already exists.

    If uncompressing fails, any partially written @output_path is
    removed and the original exception is re-raised.
    '''
    if os.path.exists(output_path):
        return
    with gzip.open(gz_path, 'rb') as gz_in:
        try:
            with open(output_path, 'wb') as raw_out:
                shutil.copyfileobj(gz_in, raw_out)
        except BaseException:
            # Deliberately catches everything (including
            # KeyboardInterrupt): a truncated output file must not be
            # left behind, since the exists() check above would treat
            # it as a complete result on the next call.
            try:
                os.remove(output_path)
            except FileNotFoundError:
                # open() itself failed, so there is nothing to clean
                # up; don't let that mask the real error
                pass
            raise
31
def lzma_uncompress(xz_path, output_path):
    '''
    @params xz_path: filename of the xz/lzma compressed input
    @params output_path: filename to uncompress into

    Uncompresses @xz_path into @output_path. Nothing is done if
    @output_path already exists.

    If uncompressing fails, any partially written @output_path is
    removed and the original exception is re-raised.
    '''
    if os.path.exists(output_path):
        return
    with lzma.open(xz_path, 'rb') as lzma_in:
        try:
            with open(output_path, 'wb') as raw_out:
                shutil.copyfileobj(lzma_in, raw_out)
        except BaseException:
            # Deliberately catches everything (including
            # KeyboardInterrupt): a truncated output file must not be
            # left behind, since the exists() check above would treat
            # it as a complete result on the next call.
            try:
                os.remove(output_path)
            except FileNotFoundError:
                # open() itself failed, so there is nothing to clean
                # up; don't let that mask the real error
                pass
            raise
42
43
def zstd_uncompress(zstd_path, output_path):
    '''
    @params zstd_path: filename of the zstd compressed input
    @params output_path: filename to uncompress into

    Uncompresses @zstd_path into @output_path by running the external
    'zstd' program. Nothing is done if @output_path already exists.

    If 'zstd' exits with an error, any @output_path it left behind is
    removed and an Exception describing the failure is raised.
    '''
    if os.path.exists(output_path):
        return

    try:
        check_call(['zstd', "-f", "-d", zstd_path,
                    "-o", output_path])
    except CalledProcessError as e:
        try:
            os.remove(output_path)
        except FileNotFoundError:
            # zstd may fail without leaving an output file; a missing
            # file must not hide the decompression error reported below
            pass
        raise Exception(
            f"Unable to decompress zstd file {zstd_path} with {e}") from e

    # zstd copies source archive permissions for the output
    # file, so must make this writable for QEMU
    os.chmod(output_path, stat.S_IRUSR | stat.S_IWUSR)
59
60
def uncompress(compressed, uncompressed, format=None):
    '''
    @params compressed: filename or Asset to uncompress
    @params uncompressed: filename to uncompress into
    @params format: optional compression format (xz, gz, zstd)

    Uncompresses @compressed into @uncompressed

    If @format is None, heuristics will be applied to guess the format
    from the filename or Asset URL. @format must be non-None if
    @compressed is neither a filename nor an Asset, since no format
    can be guessed in that case.

    @compressed is converted with str() to obtain the path of the
    file that is handed to the decompressor.

    Returns nothing; raises an Exception if @format is not one of
    the supported formats
    '''
    if format is None:
        format = guess_uncompress_format(compressed)

    if format == "xz":
        lzma_uncompress(str(compressed), uncompressed)
    elif format == "gz":
        gzip_uncompress(str(compressed), uncompressed)
    elif format == "zstd":
        zstd_uncompress(str(compressed), uncompressed)
    else:
        raise Exception(f"Unknown compression format {format}")
86
def guess_uncompress_format(compressed):
    '''
    @params compressed: filename or Asset to guess the format of

    Guess the compression format of @compressed from the extension
    of the filename, or of the path component of the Asset URL

    Returns one of "xz", "gz" or "zstd", raising an exception if
    no format can be determined
    '''
    # isinstance() rather than exact type comparison, so that
    # subclasses of str / Asset are handled as well
    if isinstance(compressed, str):
        path = compressed
    elif isinstance(compressed, Asset):
        # Only the path component matters; a query string or fragment
        # in the URL must not be mistaken for the file extension
        path = urlparse(compressed.url).path
    else:
        raise Exception(f"Unable to guess compression format for {compressed}")

    ext = os.path.splitext(path)[1]
    if ext == ".xz":
        return "xz"
    if ext == ".gz":
        return "gz"
    if ext in (".zstd", ".zst"):
        return "zstd"
    raise Exception(f"Unknown compression format for {path}")
108