xref: /qemu/tests/functional/qemu_test/archive.py (revision c283afbf658cdc2156250a84be433fc0bdd002ac)
1# SPDX-License-Identifier: GPL-2.0-or-later
2#
3# Utilities for python-based QEMU tests
4#
5# Copyright 2024 Red Hat, Inc.
6#
7# Authors:
8#  Thomas Huth <thuth@redhat.com>
9
10import os
11from subprocess import check_call, run, DEVNULL
12import tarfile
13from urllib.parse import urlparse
14import zipfile
15
16from .asset import Asset
17from .cmd import run_cmd
18
19
def tar_extract(archive, dest_dir, member=None):
    '''
    @param archive: filename, or seekable binary file-like object,
                    of a (possibly compressed) tar archive
    @param dest_dir: target directory to extract into
    @param member: optional member file to limit extraction to

    Extracts @archive into @dest_dir. All files are extracted
    unless @member is set, in which case only that member is.
    '''
    # tarfile.open() expects paths in 'name' and already opened streams
    # in 'fileobj'; handing a stream over as 'name' would fail.
    if hasattr(archive, 'read'):
        opened = tarfile.open(fileobj=archive)
    else:
        opened = tarfile.open(archive)

    with opened as tf:
        # Opt in to the 'data' extraction filter when the running Python
        # provides it; older releases have no filter support at all.
        if hasattr(tarfile, 'data_filter'):
            tf.extraction_filter = tarfile.data_filter
        if member:
            tf.extract(member=member, path=dest_dir)
        else:
            tf.extractall(path=dest_dir)
29
def cpio_extract(archive, output_path):
    '''
    @param archive: filename (str or path-like), or file-like object
                    whose read() returns the cpio archive content
    @param output_path: directory to unpack the archive into

    Unpacks @archive into @output_path by running the 'cpio' program.
    The exit status of cpio is deliberately ignored (see below).
    '''
    # Let the child process start in the target directory rather than
    # os.chdir()ing this process: nothing has to be restored afterwards,
    # so an exception cannot leave the caller in the wrong directory.
    #
    # Not passing 'check=True' as cpio exits with non-zero
    # status if the archive contains any device nodes :-(
    if isinstance(archive, (str, os.PathLike)):
        # Resolve against the caller's working directory before cpio
        # starts inside output_path.
        run(['cpio', '-i', '-F', os.path.abspath(archive)],
            cwd=output_path,
            stdout=DEVNULL, stderr=DEVNULL)
    else:
        run(['cpio', '-i'],
            input=archive.read(),
            cwd=output_path,
            stdout=DEVNULL, stderr=DEVNULL)
43
def zip_extract(archive, dest_dir, member=None):
    '''
    @param archive: zip archive to read, handed to zipfile.ZipFile
    @param dest_dir: target directory to extract into
    @param member: optional member file to limit extraction to

    Extracts @archive into @dest_dir. All files are extracted
    unless @member is set, in which case only that member is.
    '''
    with zipfile.ZipFile(archive, 'r') as bundle:
        if not member:
            # No restriction requested: unpack everything.
            bundle.extractall(path=dest_dir)
            return
        bundle.extract(member=member, path=dest_dir)
50
def deb_extract(archive, dest_dir, member=None):
    '''
    @param archive: filename of the Debian package (.deb / .udeb)
    @param dest_dir: target directory to extract into
    @param member: optional member of the data tarball to limit
                   extraction to

    Unpacks the package's data tarball into @dest_dir with the 'ar'
    program (the tarball itself is left there) and then extracts that
    tarball into @dest_dir as well.

    Raises subprocess.CalledProcessError if 'ar' fails, and Exception
    if the package has no data tarball.
    '''
    # 'ar x' runs inside dest_dir (via cwd=), so the package path must
    # not depend on the working directory.
    archive = os.path.abspath(archive)

    listing = run(['ar', 't', archive],
                  check=True, capture_output=True, encoding='utf8')
    # Pick the payload by name instead of by its position in the
    # listing, so an unexpected layout gives a clear error.
    payloads = [name for name in listing.stdout.split()
                if name.startswith('data.tar')]
    if not payloads:
        raise Exception(f"No data tarball found in deb archive {archive}")
    data_tar = payloads[0]

    check_call(['ar', 'x', archive, data_tar],
               cwd=dest_dir, stdout=DEVNULL, stderr=DEVNULL)
    tar_extract(os.path.join(dest_dir, data_tar), dest_dir, member)
61
def archive_extract(archive, dest_dir, format=None, member=None):
    '''
    @param archive: filename, Asset, or file-like object to extract
    @param dest_dir: target directory to extract into
    @param format: optional archive format, one of "tar", "zip",
                   "cpio" or "deb"
    @param member: optional member file to limit extraction to

    Extracts @archive into @dest_dir. All files are extracted
    unless @member specifies a limit.

    If @format is None, heuristics will be applied to guess the format
    from the filename or Asset URL. @format must be non-None if @archive
    is a file-like object.

    Raises Exception if the format is unknown, if @member is given for
    a cpio archive, or if a deb archive is not given as a filename.
    '''
    if format is None:
        format = guess_archive_format(archive)
    if isinstance(archive, Asset):
        archive = str(archive)

    if format == "tar":
        tar_extract(archive, dest_dir, member)
    elif format == "zip":
        zip_extract(archive, dest_dir, member)
    elif format == "cpio":
        if member is not None:
            raise Exception("Unable to filter cpio extraction")
        cpio_extract(archive, dest_dir)
    elif format == "deb":
        if not isinstance(archive, str):
            raise Exception("Unable to use file-like object with deb archives")
        # The member name is passed on with a "./" prefix; only build
        # it when a member was requested, as None cannot be concatenated.
        deb_member = None if member is None else "./" + member
        deb_extract(archive, dest_dir, deb_member)
    else:
        raise Exception(f"Unknown archive format {format}")
94
def guess_archive_format(archive):
    '''
    @param archive: filename, or Asset to guess

    Guess the format of @archive from its filename or, for an Asset,
    from the path component of its URL. Returns one of "tar", "zip",
    "cpio" or "deb", raising an exception if no format can be
    determined.
    '''
    if isinstance(archive, str):
        path = archive
    elif isinstance(archive, Asset):
        path = urlparse(archive.url).path
    else:
        raise Exception(f"Unable to guess archive format for {archive}")

    # Compressed tarballs (".tar.gz", ...), "tgz" and plain ".tar"
    if ".tar." in path or path.endswith((".tar", "tgz")):
        return "tar"
    if path.endswith(".zip"):
        return "zip"
    if path.endswith(".cpio"):
        return "cpio"
    if path.endswith((".deb", ".udeb")):
        return "deb"
    raise Exception(f"Unknown archive format for {path}")
117