xref: /qemu/tests/functional/qemu_test/archive.py (revision c055f1d26fbcc18de02e59569f3e2a224595835a)
1cfcb4484SDaniel P. Berrangé# SPDX-License-Identifier: GPL-2.0-or-later
2cfcb4484SDaniel P. Berrangé#
3cfcb4484SDaniel P. Berrangé# Utilities for python-based QEMU tests
4cfcb4484SDaniel P. Berrangé#
5cfcb4484SDaniel P. Berrangé# Copyright 2024 Red Hat, Inc.
6cfcb4484SDaniel P. Berrangé#
7cfcb4484SDaniel P. Berrangé# Authors:
8cfcb4484SDaniel P. Berrangé#  Thomas Huth <thuth@redhat.com>
9cfcb4484SDaniel P. Berrangé
10cfcb4484SDaniel P. Berrangéimport os
11*c055f1d2SDaniel P. Berrangéfrom subprocess import check_call, run, DEVNULL
12cfcb4484SDaniel P. Berrangéimport tarfile
13379ee839SDaniel P. Berrangéimport zipfile
14cfcb4484SDaniel P. Berrangé
15512fe088SDaniel P. Berrangéfrom .cmd import run_cmd
16512fe088SDaniel P. Berrangé
17cfcb4484SDaniel P. Berrangé
18cfcb4484SDaniel P. Berrangédef tar_extract(archive, dest_dir, member=None):
19cfcb4484SDaniel P. Berrangé    with tarfile.open(archive) as tf:
20cfcb4484SDaniel P. Berrangé        if hasattr(tarfile, 'data_filter'):
21cfcb4484SDaniel P. Berrangé            tf.extraction_filter = getattr(tarfile, 'data_filter',
22cfcb4484SDaniel P. Berrangé                                           (lambda member, path: member))
23cfcb4484SDaniel P. Berrangé        if member:
24cfcb4484SDaniel P. Berrangé            tf.extract(member=member, path=dest_dir)
25cfcb4484SDaniel P. Berrangé        else:
26cfcb4484SDaniel P. Berrangé            tf.extractall(path=dest_dir)
27cfcb4484SDaniel P. Berrangé
28*c055f1d2SDaniel P. Berrangédef cpio_extract(archive, output_path):
29cfcb4484SDaniel P. Berrangé    cwd = os.getcwd()
30cfcb4484SDaniel P. Berrangé    os.chdir(output_path)
31*c055f1d2SDaniel P. Berrangé    # Not passing 'check=True' as cpio exits with non-zero
32*c055f1d2SDaniel P. Berrangé    # status if the archive contains any device nodes :-(
33*c055f1d2SDaniel P. Berrangé    if type(archive) == str:
34*c055f1d2SDaniel P. Berrangé        run(['cpio', '-i', '-F', archive],
35*c055f1d2SDaniel P. Berrangé            stdout=DEVNULL, stderr=DEVNULL)
36*c055f1d2SDaniel P. Berrangé    else:
37*c055f1d2SDaniel P. Berrangé        run(['cpio', '-i'],
38*c055f1d2SDaniel P. Berrangé            input=archive.read(),
39*c055f1d2SDaniel P. Berrangé            stdout=DEVNULL, stderr=DEVNULL)
40cfcb4484SDaniel P. Berrangé    os.chdir(cwd)
41379ee839SDaniel P. Berrangé
42379ee839SDaniel P. Berrangédef zip_extract(archive, dest_dir, member=None):
43379ee839SDaniel P. Berrangé    with zipfile.ZipFile(archive, 'r') as zf:
44379ee839SDaniel P. Berrangé        if member:
45379ee839SDaniel P. Berrangé            zf.extract(member=member, path=dest_dir)
46379ee839SDaniel P. Berrangé        else:
47379ee839SDaniel P. Berrangé            zf.extractall(path=dest_dir)
48512fe088SDaniel P. Berrangé
49512fe088SDaniel P. Berrangédef deb_extract(archive, dest_dir, member=None):
50512fe088SDaniel P. Berrangé    cwd = os.getcwd()
51512fe088SDaniel P. Berrangé    os.chdir(dest_dir)
52512fe088SDaniel P. Berrangé    try:
53512fe088SDaniel P. Berrangé        (stdout, stderr, ret) = run_cmd(['ar', 't', archive])
54512fe088SDaniel P. Berrangé        file_path = stdout.split()[2]
55512fe088SDaniel P. Berrangé        run_cmd(['ar', 'x', archive, file_path])
56512fe088SDaniel P. Berrangé        tar_extract(file_path, dest_dir, member)
57512fe088SDaniel P. Berrangé    finally:
58512fe088SDaniel P. Berrangé        os.chdir(cwd)
59