1#!/usr/bin/env python3 2 3import sys 4import struct 5import string 6 7 8class QcowHeaderExtension: 9 10 def __init__(self, magic, length, data): 11 if length % 8 != 0: 12 padding = 8 - (length % 8) 13 data += b"\0" * padding 14 15 self.magic = magic 16 self.length = length 17 self.data = data 18 19 @classmethod 20 def create(cls, magic, data): 21 return QcowHeaderExtension(magic, len(data), data) 22 23 24class QcowHeader: 25 26 uint32_t = 'I' 27 uint64_t = 'Q' 28 29 fields = [ 30 # Version 2 header fields 31 [uint32_t, '%#x', 'magic'], 32 [uint32_t, '%d', 'version'], 33 [uint64_t, '%#x', 'backing_file_offset'], 34 [uint32_t, '%#x', 'backing_file_size'], 35 [uint32_t, '%d', 'cluster_bits'], 36 [uint64_t, '%d', 'size'], 37 [uint32_t, '%d', 'crypt_method'], 38 [uint32_t, '%d', 'l1_size'], 39 [uint64_t, '%#x', 'l1_table_offset'], 40 [uint64_t, '%#x', 'refcount_table_offset'], 41 [uint32_t, '%d', 'refcount_table_clusters'], 42 [uint32_t, '%d', 'nb_snapshots'], 43 [uint64_t, '%#x', 'snapshot_offset'], 44 45 # Version 3 header fields 46 [uint64_t, 'mask', 'incompatible_features'], 47 [uint64_t, 'mask', 'compatible_features'], 48 [uint64_t, 'mask', 'autoclear_features'], 49 [uint32_t, '%d', 'refcount_order'], 50 [uint32_t, '%d', 'header_length'], 51 ] 52 53 fmt = '>' + ''.join(field[0] for field in fields) 54 55 def __init__(self, fd): 56 57 buf_size = struct.calcsize(QcowHeader.fmt) 58 59 fd.seek(0) 60 buf = fd.read(buf_size) 61 62 header = struct.unpack(QcowHeader.fmt, buf) 63 self.__dict__ = dict((field[2], header[i]) 64 for i, field in enumerate(QcowHeader.fields)) 65 66 self.set_defaults() 67 self.cluster_size = 1 << self.cluster_bits 68 69 fd.seek(self.header_length) 70 self.load_extensions(fd) 71 72 if self.backing_file_offset: 73 fd.seek(self.backing_file_offset) 74 self.backing_file = fd.read(self.backing_file_size) 75 else: 76 self.backing_file = None 77 78 def set_defaults(self): 79 if self.version == 2: 80 self.incompatible_features = 0 81 self.compatible_features = 0 82 self.autoclear_features = 0 83 self.refcount_order = 4 84 self.header_length = 72 85 86 def load_extensions(self, fd): 87 self.extensions = [] 88 89 if self.backing_file_offset != 0: 90 end = min(self.cluster_size, self.backing_file_offset) 91 else: 92 end = self.cluster_size 93 94 while fd.tell() < end: 95 (magic, length) = struct.unpack('>II', fd.read(8)) 96 if magic == 0: 97 break 98 else: 99 padded = (length + 7) & ~7 100 data = fd.read(padded) 101 self.extensions.append(QcowHeaderExtension(magic, length, 102 data)) 103 104 def update_extensions(self, fd): 105 106 fd.seek(self.header_length) 107 extensions = self.extensions 108 extensions.append(QcowHeaderExtension(0, 0, b"")) 109 for ex in extensions: 110 buf = struct.pack('>II', ex.magic, ex.length) 111 fd.write(buf) 112 fd.write(ex.data) 113 114 if self.backing_file is not None: 115 self.backing_file_offset = fd.tell() 116 fd.write(self.backing_file) 117 118 if fd.tell() > self.cluster_size: 119 raise Exception("I think I just broke the image...") 120 121 def update(self, fd): 122 header_bytes = self.header_length 123 124 self.update_extensions(fd) 125 126 fd.seek(0) 127 header = tuple(self.__dict__[f] for t, p, f in QcowHeader.fields) 128 buf = struct.pack(QcowHeader.fmt, *header) 129 buf = buf[0:header_bytes-1] 130 fd.write(buf) 131 132 def dump(self): 133 for f in QcowHeader.fields: 134 value = self.__dict__[f[2]] 135 if f[1] == 'mask': 136 bits = [] 137 for bit in range(64): 138 if value & (1 << bit): 139 bits.append(bit) 140 value_str = str(bits) 141 else: 142 value_str = f[1] % value 143 144 print("%-25s" % f[2], value_str) 145 print("") 146 147 def dump_extensions(self): 148 for ex in self.extensions: 149 150 data = ex.data[:ex.length] 151 if all(c in string.printable.encode('ascii') for c in data): 152 data = "'%s'" % data.decode('ascii') 153 else: 154 data = "<binary>" 155 156 print("Header extension:") 157 print("%-25s %#x" % ("magic", ex.magic)) 158 print("%-25s %d" % ("length", ex.length)) 159 print("%-25s %s" % ("data", data)) 160 print("") 161 162 163def cmd_dump_header(fd): 164 h = QcowHeader(fd) 165 h.dump() 166 h.dump_extensions() 167 168 169def cmd_dump_header_exts(fd): 170 h = QcowHeader(fd) 171 h.dump_extensions() 172 173 174def cmd_set_header(fd, name, value): 175 try: 176 value = int(value, 0) 177 except ValueError: 178 print("'%s' is not a valid number" % value) 179 sys.exit(1) 180 181 fields = (field[2] for field in QcowHeader.fields) 182 if name not in fields: 183 print("'%s' is not a known header field" % name) 184 sys.exit(1) 185 186 h = QcowHeader(fd) 187 h.__dict__[name] = value 188 h.update(fd) 189 190 191def cmd_add_header_ext(fd, magic, data): 192 try: 193 magic = int(magic, 0) 194 except ValueError: 195 print("'%s' is not a valid magic number" % magic) 196 sys.exit(1) 197 198 h = QcowHeader(fd) 199 h.extensions.append(QcowHeaderExtension.create(magic, 200 data.encode('ascii'))) 201 h.update(fd) 202 203 204def cmd_add_header_ext_stdio(fd, magic): 205 data = sys.stdin.read() 206 cmd_add_header_ext(fd, magic, data) 207 208 209def cmd_del_header_ext(fd, magic): 210 try: 211 magic = int(magic, 0) 212 except ValueError: 213 print("'%s' is not a valid magic number" % magic) 214 sys.exit(1) 215 216 h = QcowHeader(fd) 217 found = False 218 219 for ex in h.extensions: 220 if ex.magic == magic: 221 found = True 222 h.extensions.remove(ex) 223 224 if not found: 225 print("No such header extension") 226 return 227 228 h.update(fd) 229 230 231def cmd_set_feature_bit(fd, group, bit): 232 try: 233 bit = int(bit, 0) 234 if bit < 0 or bit >= 64: 235 raise ValueError 236 except ValueError: 237 print("'%s' is not a valid bit number in range [0, 64)" % bit) 238 sys.exit(1) 239 240 h = QcowHeader(fd) 241 if group == 'incompatible': 242 h.incompatible_features |= 1 << bit 243 elif group == 'compatible': 244 h.compatible_features |= 1 << bit 245 elif group == 'autoclear': 246 h.autoclear_features |= 1 << bit 247 else: 248 print("'%s' is not a valid group, try " 249 "'incompatible', 'compatible', or 'autoclear'" % group) 250 sys.exit(1) 251 252 h.update(fd) 253 254 255cmds = [ 256 ['dump-header', cmd_dump_header, 0, 257 'Dump image header and header extensions'], 258 ['dump-header-exts', cmd_dump_header_exts, 0, 259 'Dump image header extensions'], 260 ['set-header', cmd_set_header, 2, 'Set a field in the header'], 261 ['add-header-ext', cmd_add_header_ext, 2, 'Add a header extension'], 262 ['add-header-ext-stdio', cmd_add_header_ext_stdio, 1, 263 'Add a header extension, data from stdin'], 264 ['del-header-ext', cmd_del_header_ext, 1, 'Delete a header extension'], 265 ['set-feature-bit', cmd_set_feature_bit, 2, 'Set a feature bit'], 266] 267 268 269def main(filename, cmd, args): 270 fd = open(filename, "r+b") 271 try: 272 for name, handler, num_args, desc in cmds: 273 if name != cmd: 274 continue 275 elif len(args) != num_args: 276 usage() 277 return 278 else: 279 handler(fd, *args) 280 return 281 print("Unknown command '%s'" % cmd) 282 finally: 283 fd.close() 284 285 286def usage(): 287 print("Usage: %s <file> <cmd> [<arg>, ...]" % sys.argv[0]) 288 print("") 289 print("Supported commands:") 290 for name, handler, num_args, desc in cmds: 291 print(" %-20s - %s" % (name, desc)) 292 293 294if __name__ == '__main__': 295 if len(sys.argv) < 3: 296 usage() 297 sys.exit(1) 298 299 main(sys.argv[1], sys.argv[2], sys.argv[3:]) 300