1#!/usr/bin/env python3 2# 3# Manipulations with qcow2 image 4# 5# This program is free software; you can redistribute it and/or modify 6# it under the terms of the GNU General Public License as published by 7# the Free Software Foundation; either version 2 of the License, or 8# (at your option) any later version. 9# 10# This program is distributed in the hope that it will be useful, 11# but WITHOUT ANY WARRANTY; without even the implied warranty of 12# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 13# GNU General Public License for more details. 14# 15# You should have received a copy of the GNU General Public License 16# along with this program. If not, see <http://www.gnu.org/licenses/>. 17# 18 19import sys 20import struct 21import string 22 23 24class QcowHeaderExtension: 25 26 def __init__(self, magic, length, data): 27 if length % 8 != 0: 28 padding = 8 - (length % 8) 29 data += b"\0" * padding 30 31 self.magic = magic 32 self.length = length 33 self.data = data 34 35 @classmethod 36 def create(cls, magic, data): 37 return QcowHeaderExtension(magic, len(data), data) 38 39 40class QcowHeader: 41 42 uint32_t = 'I' 43 uint64_t = 'Q' 44 45 fields = [ 46 # Version 2 header fields 47 [uint32_t, '%#x', 'magic'], 48 [uint32_t, '%d', 'version'], 49 [uint64_t, '%#x', 'backing_file_offset'], 50 [uint32_t, '%#x', 'backing_file_size'], 51 [uint32_t, '%d', 'cluster_bits'], 52 [uint64_t, '%d', 'size'], 53 [uint32_t, '%d', 'crypt_method'], 54 [uint32_t, '%d', 'l1_size'], 55 [uint64_t, '%#x', 'l1_table_offset'], 56 [uint64_t, '%#x', 'refcount_table_offset'], 57 [uint32_t, '%d', 'refcount_table_clusters'], 58 [uint32_t, '%d', 'nb_snapshots'], 59 [uint64_t, '%#x', 'snapshot_offset'], 60 61 # Version 3 header fields 62 [uint64_t, 'mask', 'incompatible_features'], 63 [uint64_t, 'mask', 'compatible_features'], 64 [uint64_t, 'mask', 'autoclear_features'], 65 [uint32_t, '%d', 'refcount_order'], 66 [uint32_t, '%d', 'header_length'], 67 ] 68 69 fmt = '>' + ''.join(field[0] for field in fields) 70 71 def __init__(self, fd): 72 73 buf_size = struct.calcsize(QcowHeader.fmt) 74 75 fd.seek(0) 76 buf = fd.read(buf_size) 77 78 header = struct.unpack(QcowHeader.fmt, buf) 79 self.__dict__ = dict((field[2], header[i]) 80 for i, field in enumerate(QcowHeader.fields)) 81 82 self.set_defaults() 83 self.cluster_size = 1 << self.cluster_bits 84 85 fd.seek(self.header_length) 86 self.load_extensions(fd) 87 88 if self.backing_file_offset: 89 fd.seek(self.backing_file_offset) 90 self.backing_file = fd.read(self.backing_file_size) 91 else: 92 self.backing_file = None 93 94 def set_defaults(self): 95 if self.version == 2: 96 self.incompatible_features = 0 97 self.compatible_features = 0 98 self.autoclear_features = 0 99 self.refcount_order = 4 100 self.header_length = 72 101 102 def load_extensions(self, fd): 103 self.extensions = [] 104 105 if self.backing_file_offset != 0: 106 end = min(self.cluster_size, self.backing_file_offset) 107 else: 108 end = self.cluster_size 109 110 while fd.tell() < end: 111 (magic, length) = struct.unpack('>II', fd.read(8)) 112 if magic == 0: 113 break 114 else: 115 padded = (length + 7) & ~7 116 data = fd.read(padded) 117 self.extensions.append(QcowHeaderExtension(magic, length, 118 data)) 119 120 def update_extensions(self, fd): 121 122 fd.seek(self.header_length) 123 extensions = self.extensions 124 extensions.append(QcowHeaderExtension(0, 0, b"")) 125 for ex in extensions: 126 buf = struct.pack('>II', ex.magic, ex.length) 127 fd.write(buf) 128 fd.write(ex.data) 129 130 if self.backing_file is not None: 131 self.backing_file_offset = fd.tell() 132 fd.write(self.backing_file) 133 134 if fd.tell() > self.cluster_size: 135 raise Exception("I think I just broke the image...") 136 137 def update(self, fd): 138 header_bytes = self.header_length 139 140 self.update_extensions(fd) 141 142 fd.seek(0) 143 header = tuple(self.__dict__[f] for t, p, f in QcowHeader.fields) 144 buf = struct.pack(QcowHeader.fmt, *header) 145 buf = buf[0:header_bytes-1] 146 fd.write(buf) 147 148 def dump(self): 149 for f in QcowHeader.fields: 150 value = self.__dict__[f[2]] 151 if f[1] == 'mask': 152 bits = [] 153 for bit in range(64): 154 if value & (1 << bit): 155 bits.append(bit) 156 value_str = str(bits) 157 else: 158 value_str = f[1] % value 159 160 print("%-25s" % f[2], value_str) 161 print("") 162 163 def dump_extensions(self): 164 for ex in self.extensions: 165 166 data = ex.data[:ex.length] 167 if all(c in string.printable.encode('ascii') for c in data): 168 data = "'%s'" % data.decode('ascii') 169 else: 170 data = "<binary>" 171 172 print("Header extension:") 173 print("%-25s %#x" % ("magic", ex.magic)) 174 print("%-25s %d" % ("length", ex.length)) 175 print("%-25s %s" % ("data", data)) 176 print("") 177 178 179def cmd_dump_header(fd): 180 h = QcowHeader(fd) 181 h.dump() 182 h.dump_extensions() 183 184 185def cmd_dump_header_exts(fd): 186 h = QcowHeader(fd) 187 h.dump_extensions() 188 189 190def cmd_set_header(fd, name, value): 191 try: 192 value = int(value, 0) 193 except ValueError: 194 print("'%s' is not a valid number" % value) 195 sys.exit(1) 196 197 fields = (field[2] for field in QcowHeader.fields) 198 if name not in fields: 199 print("'%s' is not a known header field" % name) 200 sys.exit(1) 201 202 h = QcowHeader(fd) 203 h.__dict__[name] = value 204 h.update(fd) 205 206 207def cmd_add_header_ext(fd, magic, data): 208 try: 209 magic = int(magic, 0) 210 except ValueError: 211 print("'%s' is not a valid magic number" % magic) 212 sys.exit(1) 213 214 h = QcowHeader(fd) 215 h.extensions.append(QcowHeaderExtension.create(magic, 216 data.encode('ascii'))) 217 h.update(fd) 218 219 220def cmd_add_header_ext_stdio(fd, magic): 221 data = sys.stdin.read() 222 cmd_add_header_ext(fd, magic, data) 223 224 225def cmd_del_header_ext(fd, magic): 226 try: 227 magic = int(magic, 0) 228 except ValueError: 229 print("'%s' is not a valid magic number" % magic) 230 sys.exit(1) 231 232 h = QcowHeader(fd) 233 found = False 234 235 for ex in h.extensions: 236 if ex.magic == magic: 237 found = True 238 h.extensions.remove(ex) 239 240 if not found: 241 print("No such header extension") 242 return 243 244 h.update(fd) 245 246 247def cmd_set_feature_bit(fd, group, bit): 248 try: 249 bit = int(bit, 0) 250 if bit < 0 or bit >= 64: 251 raise ValueError 252 except ValueError: 253 print("'%s' is not a valid bit number in range [0, 64)" % bit) 254 sys.exit(1) 255 256 h = QcowHeader(fd) 257 if group == 'incompatible': 258 h.incompatible_features |= 1 << bit 259 elif group == 'compatible': 260 h.compatible_features |= 1 << bit 261 elif group == 'autoclear': 262 h.autoclear_features |= 1 << bit 263 else: 264 print("'%s' is not a valid group, try " 265 "'incompatible', 'compatible', or 'autoclear'" % group) 266 sys.exit(1) 267 268 h.update(fd) 269 270 271cmds = [ 272 ['dump-header', cmd_dump_header, 0, 273 'Dump image header and header extensions'], 274 ['dump-header-exts', cmd_dump_header_exts, 0, 275 'Dump image header extensions'], 276 ['set-header', cmd_set_header, 2, 'Set a field in the header'], 277 ['add-header-ext', cmd_add_header_ext, 2, 'Add a header extension'], 278 ['add-header-ext-stdio', cmd_add_header_ext_stdio, 1, 279 'Add a header extension, data from stdin'], 280 ['del-header-ext', cmd_del_header_ext, 1, 'Delete a header extension'], 281 ['set-feature-bit', cmd_set_feature_bit, 2, 'Set a feature bit'], 282] 283 284 285def main(filename, cmd, args): 286 fd = open(filename, "r+b") 287 try: 288 for name, handler, num_args, desc in cmds: 289 if name != cmd: 290 continue 291 elif len(args) != num_args: 292 usage() 293 return 294 else: 295 handler(fd, *args) 296 return 297 print("Unknown command '%s'" % cmd) 298 finally: 299 fd.close() 300 301 302def usage(): 303 print("Usage: %s <file> <cmd> [<arg>, ...]" % sys.argv[0]) 304 print("") 305 print("Supported commands:") 306 for name, handler, num_args, desc in cmds: 307 print(" %-20s - %s" % (name, desc)) 308 309 310if __name__ == '__main__': 311 if len(sys.argv) < 3: 312 usage() 313 sys.exit(1) 314 315 main(sys.argv[1], sys.argv[2], sys.argv[3:]) 316