xref: /qemu/tests/qemu-iotests/qcow2.py (revision 02756054e1e4e662e2fbdb283754a4a32e9bbd91)
1#!/usr/bin/env python3
2
3import sys
4import struct
5import string
6
7
class QcowHeaderExtension:
    """One qcow2 header extension: a magic number, a length and a payload.

    Attributes:
        magic: extension type identifier.
        length: payload size in bytes, without padding.
        data: payload bytes, zero-padded up to a multiple of 8 bytes.
    """

    def __init__(self, magic, length, data):
        """Store the extension, padding *data* to an 8-byte boundary.

        *data* may be given either unpadded (``len(data) == length``) or
        already padded; only the bytes that are actually missing are added.
        Padding unconditionally would pad already-padded data a second
        time and misalign everything written after it.
        """
        padded_length = (length + 7) & ~7
        missing = padded_length - len(data)
        if missing > 0:
            data += b"\0" * missing

        self.magic = magic
        self.length = length
        self.data = data

    @classmethod
    def create(cls, magic, data):
        """Build an extension from an unpadded payload; length is len(data)."""
        return cls(magic, len(data), data)
22
23
class QcowHeader:
    """In-memory copy of a qcow2 image header.

    The header fields listed in ``fields`` become instance attributes.
    In addition the instance carries ``cluster_size``, ``extensions``
    (a list of header extension objects) and ``backing_file`` (raw bytes
    of the backing file name, or None).
    """

    uint32_t = 'I'
    uint64_t = 'Q'

    # Each entry is [struct format code, print format, attribute name].
    # The print format 'mask' means "print the list of set bit numbers".
    fields = [
        # Version 2 header fields
        [uint32_t, '%#x',  'magic'],
        [uint32_t, '%d',   'version'],
        [uint64_t, '%#x',  'backing_file_offset'],
        [uint32_t, '%#x',  'backing_file_size'],
        [uint32_t, '%d',   'cluster_bits'],
        [uint64_t, '%d',   'size'],
        [uint32_t, '%d',   'crypt_method'],
        [uint32_t, '%d',   'l1_size'],
        [uint64_t, '%#x',  'l1_table_offset'],
        [uint64_t, '%#x',  'refcount_table_offset'],
        [uint32_t, '%d',   'refcount_table_clusters'],
        [uint32_t, '%d',   'nb_snapshots'],
        [uint64_t, '%#x',  'snapshot_offset'],

        # Version 3 header fields
        [uint64_t, 'mask', 'incompatible_features'],
        [uint64_t, 'mask', 'compatible_features'],
        [uint64_t, 'mask', 'autoclear_features'],
        [uint32_t, '%d',   'refcount_order'],
        [uint32_t, '%d',   'header_length'],
    ]

    # Big-endian struct format covering every entry of ``fields`` in order.
    fmt = '>' + ''.join(field[0] for field in fields)

    def __init__(self, fd):
        """Read the header, its extensions and the backing file name.

        fd must be a seekable binary file object positioned anywhere; it
        is repositioned as needed.  Raises struct.error if fewer bytes
        than the full header structure can be read.
        """
        buf_size = struct.calcsize(QcowHeader.fmt)

        fd.seek(0)
        buf = fd.read(buf_size)

        header = struct.unpack(QcowHeader.fmt, buf)
        self.__dict__ = {name: value for (_, _, name), value
                         in zip(QcowHeader.fields, header)}

        self.set_defaults()
        self.cluster_size = 1 << self.cluster_bits

        # Header extensions start right after the fixed header.
        fd.seek(self.header_length)
        self.load_extensions(fd)

        if self.backing_file_offset:
            fd.seek(self.backing_file_offset)
            self.backing_file = fd.read(self.backing_file_size)
        else:
            self.backing_file = None

    def set_defaults(self):
        """Override the version 3 fields with fixed values for version 2.

        For a version 2 image whatever bytes were unpacked into the
        version 3 attributes are replaced by the constants below.
        """
        if self.version == 2:
            self.incompatible_features = 0
            self.compatible_features = 0
            self.autoclear_features = 0
            self.refcount_order = 4
            self.header_length = 72

    def load_extensions(self, fd):
        """Parse header extensions starting at the current position of fd.

        Reading stops at an extension with magic 0, at the end of the
        first cluster, at the backing file name (if it lies inside the
        first cluster), or when the file ends prematurely.
        """
        self.extensions = []

        if self.backing_file_offset != 0:
            end = min(self.cluster_size, self.backing_file_offset)
        else:
            end = self.cluster_size

        while fd.tell() < end:
            ext_header = fd.read(8)
            if len(ext_header) < 8:
                # Truncated image: nothing more to parse.
                break

            (magic, length) = struct.unpack('>II', ext_header)
            if magic == 0:
                break

            # Consume the payload together with its padding so that the
            # file position lands on the next extension, but hand over
            # only the payload itself: QcowHeaderExtension adds the
            # padding again, so passing padded data would pad it twice.
            padded = (length + 7) & ~7
            data = fd.read(padded)[:length]
            self.extensions.append(QcowHeaderExtension(magic, length, data))

    def update_extensions(self, fd):
        """Write all extensions, an end marker and the backing file name.

        Writing starts at header_length.  If a backing file name is
        present it is written right after the end marker and
        backing_file_offset is updated to its new position.

        Raises Exception if the written data extends past the first
        cluster.
        """
        fd.seek(self.header_length)
        for ex in self.extensions:
            fd.write(struct.pack('>II', ex.magic, ex.length))
            fd.write(ex.data)

        # End-of-extensions marker (magic 0, length 0).  It is written
        # directly instead of being appended to self.extensions, so that
        # the in-memory list is not modified by saving it.
        fd.write(struct.pack('>II', 0, 0))

        if self.backing_file is not None:
            self.backing_file_offset = fd.tell()
            fd.write(self.backing_file)

        if fd.tell() > self.cluster_size:
            raise Exception("I think I just broke the image...")

    def update(self, fd):
        """Write extensions and then the fixed header fields back to fd.

        At most header_length bytes of the packed header are written, so
        a version 2 header does not overwrite the data that follows it.
        """
        header_bytes = self.header_length

        # Must run first: it may change backing_file_offset, which is
        # part of the header packed below.
        self.update_extensions(fd)

        fd.seek(0)
        header = tuple(getattr(self, name) for _, _, name in QcowHeader.fields)
        buf = struct.pack(QcowHeader.fmt, *header)
        fd.write(buf[:header_bytes])

    def dump(self):
        """Print every header field, one per line, followed by a blank line."""
        for _, print_fmt, name in QcowHeader.fields:
            value = self.__dict__[name]
            if print_fmt == 'mask':
                # Feature masks are shown as the list of set bit numbers.
                bits = [bit for bit in range(64) if value & (1 << bit)]
                value_str = str(bits)
            else:
                value_str = print_fmt % value

            print("%-25s" % name, value_str)
        print("")

    def dump_extensions(self):
        """Print magic, length and data of every loaded header extension.

        Data consisting only of printable ASCII is shown quoted; anything
        else is shown as ``<binary>``.
        """
        printable = frozenset(string.printable.encode('ascii'))

        for ex in self.extensions:

            # Strip the padding before deciding how to display the data.
            data = ex.data[:ex.length]
            if all(c in printable for c in data):
                data = "'%s'" % data.decode('ascii')
            else:
                data = "<binary>"

            print("Header extension:")
            print("%-25s %#x" % ("magic", ex.magic))
            print("%-25s %d" % ("length", ex.length))
            print("%-25s %s" % ("data", data))
            print("")
161
162
def cmd_dump_header(fd):
    """Print the header fields and then the header extensions of fd."""
    header = QcowHeader(fd)
    for show in (header.dump, header.dump_extensions):
        show()
167
168
def cmd_dump_header_exts(fd):
    """Print only the header extensions of the image open in fd."""
    QcowHeader(fd).dump_extensions()
172
173
def cmd_set_header(fd, name, value):
    """Set header field *name* to the integer *value* and write it back.

    *value* is parsed with base auto-detection (so "0x10" and "16" both
    work).  An unparsable value or an unknown field name prints a message
    and exits with status 1.
    """
    try:
        number = int(value, 0)
    except ValueError:
        print("'%s' is not a valid number" % value)
        sys.exit(1)

    is_known = any(field[2] == name for field in QcowHeader.fields)
    if not is_known:
        print("'%s' is not a known header field" % name)
        sys.exit(1)

    header = QcowHeader(fd)
    header.__dict__[name] = number
    header.update(fd)
189
190
def cmd_add_header_ext(fd, magic, data):
    """Append a header extension with the given magic and ASCII data.

    *magic* is parsed with base auto-detection; an unparsable magic
    prints a message and exits with status 1.  *data* is a str that is
    encoded as ASCII before being stored.
    """
    try:
        magic_number = int(magic, 0)
    except ValueError:
        print("'%s' is not a valid magic number" % magic)
        sys.exit(1)

    header = QcowHeader(fd)
    payload = data.encode('ascii')
    header.extensions.append(QcowHeaderExtension.create(magic_number, payload))
    header.update(fd)
202
203
def cmd_add_header_ext_stdio(fd, magic):
    """Add a header extension whose data is everything read from stdin."""
    cmd_add_header_ext(fd, magic, sys.stdin.read())
207
208
def cmd_del_header_ext(fd, magic):
    """Delete every header extension whose magic equals *magic*.

    *magic* is parsed with base auto-detection; an unparsable magic
    prints a message and exits with status 1.  If no extension matches,
    a message is printed and the image is left untouched.
    """
    try:
        magic = int(magic, 0)
    except ValueError:
        print("'%s' is not a valid magic number" % magic)
        sys.exit(1)

    h = QcowHeader(fd)

    # Build the filtered list instead of removing while iterating:
    # removing an element during iteration skips the one that follows
    # it, so adjacent extensions with the same magic would survive.
    remaining = [ex for ex in h.extensions if ex.magic != magic]

    if len(remaining) == len(h.extensions):
        print("No such header extension")
        return

    h.extensions = remaining
    h.update(fd)
229
230
def cmd_set_feature_bit(fd, group, bit):
    """Set bit number *bit* in one of the three feature masks.

    *group* selects the mask: 'incompatible', 'compatible' or
    'autoclear'.  *bit* is parsed with base auto-detection and must lie
    in [0, 64).  An invalid bit or group prints a message and exits with
    status 1.
    """
    try:
        bit = int(bit, 0)
        if not 0 <= bit < 64:
            raise ValueError
    except ValueError:
        print("'%s' is not a valid bit number in range [0, 64)" % bit)
        sys.exit(1)

    header = QcowHeader(fd)

    mask_attr = {
        'incompatible': 'incompatible_features',
        'compatible': 'compatible_features',
        'autoclear': 'autoclear_features',
    }.get(group)

    if mask_attr is None:
        print("'%s' is not a valid group, try "
              "'incompatible', 'compatible', or 'autoclear'" % group)
        sys.exit(1)

    setattr(header, mask_attr, getattr(header, mask_attr) | (1 << bit))
    header.update(fd)
253
254
# Command table used by main() for dispatch and by usage() for the help
# text.  Each entry is:
#   [command name, handler function, number of arguments after the
#    command, one-line description]
# Handlers are called as handler(fd, *args).
cmds = [
    ['dump-header', cmd_dump_header, 0,
     'Dump image header and header extensions'],
    ['dump-header-exts', cmd_dump_header_exts, 0,
     'Dump image header extensions'],
    ['set-header', cmd_set_header, 2, 'Set a field in the header'],
    ['add-header-ext', cmd_add_header_ext, 2, 'Add a header extension'],
    ['add-header-ext-stdio', cmd_add_header_ext_stdio, 1,
     'Add a header extension, data from stdin'],
    ['del-header-ext', cmd_del_header_ext, 1, 'Delete a header extension'],
    ['set-feature-bit', cmd_set_feature_bit, 2, 'Set a feature bit'],
]
267
268
def main(filename, cmd, args):
    """Open *filename* read/write and run command *cmd* with *args* on it.

    An unknown command prints a message; a wrong number of arguments
    prints the usage text.  The file is closed in every case.
    """
    with open(filename, "r+b") as image:
        matching = [entry for entry in cmds if entry[0] == cmd]
        if not matching:
            print("Unknown command '%s'" % cmd)
            return

        _name, handler, num_args, _desc = matching[0]
        if len(args) != num_args:
            usage()
            return

        handler(image, *args)
284
285
def usage():
    """Print the command-line synopsis and the list of supported commands."""
    lines = [
        "Usage: %s <file> <cmd> [<arg>, ...]" % sys.argv[0],
        "",
        "Supported commands:",
    ]
    lines.extend("    %-20s - %s" % (entry[0], entry[3]) for entry in cmds)
    print("\n".join(lines))
292
293
if __name__ == '__main__':
    # An image file name and a command are mandatory; everything after
    # them is passed to the command as its arguments.
    argv = sys.argv
    if len(argv) >= 3:
        main(argv[1], argv[2], argv[3:])
    else:
        usage()
        sys.exit(1)
300