1 /* 2 * QEMU Block driver for RADOS (Ceph) 3 * 4 * Copyright (C) 2010-2011 Christian Brunner <chb@muc.de>, 5 * Josh Durgin <josh.durgin@dreamhost.com> 6 * 7 * This work is licensed under the terms of the GNU GPL, version 2. See 8 * the COPYING file in the top-level directory. 9 * 10 * Contributions after 2012-01-13 are licensed under the terms of the 11 * GNU GPL, version 2 or (at your option) any later version. 12 */ 13 14 #include "qemu/osdep.h" 15 16 #include <rbd/librbd.h> 17 #include "qapi/error.h" 18 #include "qemu/error-report.h" 19 #include "qemu/module.h" 20 #include "qemu/option.h" 21 #include "block/block-io.h" 22 #include "block/block_int.h" 23 #include "block/qdict.h" 24 #include "crypto/secret.h" 25 #include "qemu/cutils.h" 26 #include "system/replay.h" 27 #include "qobject/qstring.h" 28 #include "qobject/qdict.h" 29 #include "qobject/qjson.h" 30 #include "qobject/qlist.h" 31 #include "qapi/qobject-input-visitor.h" 32 #include "qapi/qapi-visit-block-core.h" 33 34 /* 35 * When specifying the image filename use: 36 * 37 * rbd:poolname/devicename[@snapshotname][:option1=value1[:option2=value2...]] 38 * 39 * poolname must be the name of an existing rados pool. 40 * 41 * devicename is the name of the rbd image. 42 * 43 * Each option given is used to configure rados, and may be any valid 44 * Ceph option, "id", or "conf". 45 * 46 * The "id" option indicates what user we should authenticate as to 47 * the Ceph cluster. If it is excluded we will use the Ceph default 48 * (normally 'admin'). 49 * 50 * The "conf" option specifies a Ceph configuration file to read. If 51 * it is not specified, we will read from the default Ceph locations 52 * (e.g., /etc/ceph/ceph.conf). To avoid reading _any_ configuration 53 * file, specify conf=/dev/null. 54 * 55 * Configuration values containing :, @, or = can be escaped with a 56 * leading "\". 57 */ 58 59 #define OBJ_MAX_SIZE (1UL << OBJ_DEFAULT_OBJ_ORDER) 60 61 #define RBD_MAX_SNAPS 100 62 63 #define RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN 8 64 65 static const char rbd_luks_header_verification[ 66 RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN] = { 67 'L', 'U', 'K', 'S', 0xBA, 0xBE, 0, 1 68 }; 69 70 static const char rbd_luks2_header_verification[ 71 RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN] = { 72 'L', 'U', 'K', 'S', 0xBA, 0xBE, 0, 2 73 }; 74 75 static const char rbd_layered_luks_header_verification[ 76 RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN] = { 77 'R', 'B', 'D', 'L', 0xBA, 0xBE, 0, 1 78 }; 79 80 static const char rbd_layered_luks2_header_verification[ 81 RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN] = { 82 'R', 'B', 'D', 'L', 0xBA, 0xBE, 0, 2 83 }; 84 85 typedef enum { 86 RBD_AIO_READ, 87 RBD_AIO_WRITE, 88 RBD_AIO_DISCARD, 89 RBD_AIO_FLUSH, 90 RBD_AIO_WRITE_ZEROES 91 } RBDAIOCmd; 92 93 typedef struct BDRVRBDState { 94 rados_t cluster; 95 rados_ioctx_t io_ctx; 96 rbd_image_t image; 97 char *image_name; 98 char *snap; 99 char *namespace; 100 uint64_t image_size; 101 uint64_t object_size; 102 } BDRVRBDState; 103 104 typedef struct RBDTask { 105 BlockDriverState *bs; 106 Coroutine *co; 107 bool complete; 108 int64_t ret; 109 } RBDTask; 110 111 typedef struct RBDDiffIterateReq { 112 uint64_t offs; 113 uint64_t bytes; 114 bool exists; 115 } RBDDiffIterateReq; 116 117 static int qemu_rbd_connect(rados_t *cluster, rados_ioctx_t *io_ctx, 118 BlockdevOptionsRbd *opts, bool cache, 119 const char *keypairs, const char *secretid, 120 Error **errp); 121 122 static char *qemu_rbd_strchr(char *src, char delim) 123 { 124 char *p; 125 126 for (p = src; *p; ++p) { 127 if (*p == delim) { 128 return p; 129 } 130 if (*p == '\\' && p[1] != '\0') { 131 ++p; 132 } 133 } 134 135 return NULL; 136 } 137 138 139 static char *qemu_rbd_next_tok(char *src, char delim, char **p) 140 { 141 char *end; 142 143 *p = NULL; 144 145 end = qemu_rbd_strchr(src, delim); 146 if (end) { 147 *p = end + 1; 148 *end = '\0'; 149 } 150 return src; 151 } 152 153 static void qemu_rbd_unescape(char *src) 154 { 155 char *p; 156 157 for (p = src; *src; ++src, ++p) { 158 if (*src == '\\' && src[1] != '\0') { 159 src++; 160 } 161 *p = *src; 162 } 163 *p = '\0'; 164 } 165 166 static void qemu_rbd_parse_filename(const char *filename, QDict *options, 167 Error **errp) 168 { 169 const char *start; 170 char *p, *buf; 171 QList *keypairs = NULL; 172 char *found_str, *image_name; 173 174 if (!strstart(filename, "rbd:", &start)) { 175 error_setg(errp, "File name must start with 'rbd:'"); 176 return; 177 } 178 179 buf = g_strdup(start); 180 p = buf; 181 182 found_str = qemu_rbd_next_tok(p, '/', &p); 183 if (!p) { 184 error_setg(errp, "Pool name is required"); 185 goto done; 186 } 187 qemu_rbd_unescape(found_str); 188 qdict_put_str(options, "pool", found_str); 189 190 if (qemu_rbd_strchr(p, '@')) { 191 image_name = qemu_rbd_next_tok(p, '@', &p); 192 193 found_str = qemu_rbd_next_tok(p, ':', &p); 194 qemu_rbd_unescape(found_str); 195 qdict_put_str(options, "snapshot", found_str); 196 } else { 197 image_name = qemu_rbd_next_tok(p, ':', &p); 198 } 199 /* Check for namespace in the image_name */ 200 if (qemu_rbd_strchr(image_name, '/')) { 201 found_str = qemu_rbd_next_tok(image_name, '/', &image_name); 202 qemu_rbd_unescape(found_str); 203 qdict_put_str(options, "namespace", found_str); 204 } else { 205 qdict_put_str(options, "namespace", ""); 206 } 207 qemu_rbd_unescape(image_name); 208 qdict_put_str(options, "image", image_name); 209 if (!p) { 210 goto done; 211 } 212 213 /* The following are essentially all key/value pairs, and we treat 214 * 'id' and 'conf' a bit special. Key/value pairs may be in any order. */ 215 while (p) { 216 char *name, *value; 217 name = qemu_rbd_next_tok(p, '=', &p); 218 if (!p) { 219 error_setg(errp, "conf option %s has no value", name); 220 break; 221 } 222 223 qemu_rbd_unescape(name); 224 225 value = qemu_rbd_next_tok(p, ':', &p); 226 qemu_rbd_unescape(value); 227 228 if (!strcmp(name, "conf")) { 229 qdict_put_str(options, "conf", value); 230 } else if (!strcmp(name, "id")) { 231 qdict_put_str(options, "user", value); 232 } else { 233 /* 234 * We pass these internally to qemu_rbd_set_keypairs(), so 235 * we can get away with the simpler list of [ "key1", 236 * "value1", "key2", "value2" ] rather than a raw dict 237 * { "key1": "value1", "key2": "value2" } where we can't 238 * guarantee order, or even a more correct but complex 239 * [ { "key1": "value1" }, { "key2": "value2" } ] 240 */ 241 if (!keypairs) { 242 keypairs = qlist_new(); 243 } 244 qlist_append_str(keypairs, name); 245 qlist_append_str(keypairs, value); 246 } 247 } 248 249 if (keypairs) { 250 qdict_put(options, "=keyvalue-pairs", 251 qstring_from_gstring(qobject_to_json(QOBJECT(keypairs)))); 252 } 253 254 done: 255 g_free(buf); 256 qobject_unref(keypairs); 257 } 258 259 static int qemu_rbd_set_auth(rados_t cluster, BlockdevOptionsRbd *opts, 260 Error **errp) 261 { 262 char *key, *acr; 263 int r; 264 GString *accu; 265 RbdAuthModeList *auth; 266 267 if (opts->key_secret) { 268 key = qcrypto_secret_lookup_as_base64(opts->key_secret, errp); 269 if (!key) { 270 return -EIO; 271 } 272 r = rados_conf_set(cluster, "key", key); 273 g_free(key); 274 if (r < 0) { 275 error_setg_errno(errp, -r, "Could not set 'key'"); 276 return r; 277 } 278 } 279 280 if (opts->has_auth_client_required) { 281 accu = g_string_new(""); 282 for (auth = opts->auth_client_required; auth; auth = auth->next) { 283 if (accu->str[0]) { 284 g_string_append_c(accu, ';'); 285 } 286 g_string_append(accu, RbdAuthMode_str(auth->value)); 287 } 288 acr = g_string_free(accu, FALSE); 289 r = rados_conf_set(cluster, "auth_client_required", acr); 290 g_free(acr); 291 if (r < 0) { 292 error_setg_errno(errp, -r, 293 "Could not set 'auth_client_required'"); 294 return r; 295 } 296 } 297 298 return 0; 299 } 300 301 static int qemu_rbd_set_keypairs(rados_t cluster, const char *keypairs_json, 302 Error **errp) 303 { 304 QList *keypairs; 305 QString *name; 306 QString *value; 307 const char *key; 308 size_t remaining; 309 int ret = 0; 310 311 if (!keypairs_json) { 312 return ret; 313 } 314 keypairs = qobject_to(QList, 315 qobject_from_json(keypairs_json, &error_abort)); 316 remaining = qlist_size(keypairs) / 2; 317 assert(remaining); 318 319 while (remaining--) { 320 name = qobject_to(QString, qlist_pop(keypairs)); 321 value = qobject_to(QString, qlist_pop(keypairs)); 322 assert(name && value); 323 key = qstring_get_str(name); 324 325 ret = rados_conf_set(cluster, key, qstring_get_str(value)); 326 qobject_unref(value); 327 if (ret < 0) { 328 error_setg_errno(errp, -ret, "invalid conf option %s", key); 329 qobject_unref(name); 330 ret = -EINVAL; 331 break; 332 } 333 qobject_unref(name); 334 } 335 336 qobject_unref(keypairs); 337 return ret; 338 } 339 340 #ifdef LIBRBD_SUPPORTS_ENCRYPTION 341 static int qemu_rbd_convert_luks_options( 342 RbdEncryptionOptionsLUKSBase *luks_opts, 343 char **passphrase, 344 size_t *passphrase_len, 345 Error **errp) 346 { 347 return qcrypto_secret_lookup(luks_opts->key_secret, (uint8_t **)passphrase, 348 passphrase_len, errp); 349 } 350 351 static int qemu_rbd_convert_luks_create_options( 352 RbdEncryptionCreateOptionsLUKSBase *luks_opts, 353 rbd_encryption_algorithm_t *alg, 354 char **passphrase, 355 size_t *passphrase_len, 356 Error **errp) 357 { 358 int r = 0; 359 360 r = qemu_rbd_convert_luks_options( 361 qapi_RbdEncryptionCreateOptionsLUKSBase_base(luks_opts), 362 passphrase, passphrase_len, errp); 363 if (r < 0) { 364 return r; 365 } 366 367 if (luks_opts->has_cipher_alg) { 368 switch (luks_opts->cipher_alg) { 369 case QCRYPTO_CIPHER_ALGO_AES_128: { 370 *alg = RBD_ENCRYPTION_ALGORITHM_AES128; 371 break; 372 } 373 case QCRYPTO_CIPHER_ALGO_AES_256: { 374 *alg = RBD_ENCRYPTION_ALGORITHM_AES256; 375 break; 376 } 377 default: { 378 r = -ENOTSUP; 379 error_setg_errno(errp, -r, "unknown encryption algorithm: %u", 380 luks_opts->cipher_alg); 381 return r; 382 } 383 } 384 } else { 385 /* default alg */ 386 *alg = RBD_ENCRYPTION_ALGORITHM_AES256; 387 } 388 389 return 0; 390 } 391 392 static int qemu_rbd_encryption_format(rbd_image_t image, 393 RbdEncryptionCreateOptions *encrypt, 394 Error **errp) 395 { 396 int r = 0; 397 g_autofree char *passphrase = NULL; 398 rbd_encryption_format_t format; 399 rbd_encryption_options_t opts; 400 rbd_encryption_luks1_format_options_t luks_opts; 401 rbd_encryption_luks2_format_options_t luks2_opts; 402 size_t opts_size; 403 uint64_t raw_size, effective_size; 404 405 r = rbd_get_size(image, &raw_size); 406 if (r < 0) { 407 error_setg_errno(errp, -r, "cannot get raw image size"); 408 return r; 409 } 410 411 switch (encrypt->format) { 412 case RBD_IMAGE_ENCRYPTION_FORMAT_LUKS: { 413 memset(&luks_opts, 0, sizeof(luks_opts)); 414 format = RBD_ENCRYPTION_FORMAT_LUKS1; 415 opts = &luks_opts; 416 opts_size = sizeof(luks_opts); 417 r = qemu_rbd_convert_luks_create_options( 418 qapi_RbdEncryptionCreateOptionsLUKS_base(&encrypt->u.luks), 419 &luks_opts.alg, &passphrase, &luks_opts.passphrase_size, 420 errp); 421 if (r < 0) { 422 return r; 423 } 424 luks_opts.passphrase = passphrase; 425 break; 426 } 427 case RBD_IMAGE_ENCRYPTION_FORMAT_LUKS2: { 428 memset(&luks2_opts, 0, sizeof(luks2_opts)); 429 format = RBD_ENCRYPTION_FORMAT_LUKS2; 430 opts = &luks2_opts; 431 opts_size = sizeof(luks2_opts); 432 r = qemu_rbd_convert_luks_create_options( 433 qapi_RbdEncryptionCreateOptionsLUKS2_base( 434 &encrypt->u.luks2), 435 &luks2_opts.alg, &passphrase, &luks2_opts.passphrase_size, 436 errp); 437 if (r < 0) { 438 return r; 439 } 440 luks2_opts.passphrase = passphrase; 441 break; 442 } 443 default: { 444 r = -ENOTSUP; 445 error_setg_errno( 446 errp, -r, "unknown image encryption format: %u", 447 encrypt->format); 448 return r; 449 } 450 } 451 452 r = rbd_encryption_format(image, format, opts, opts_size); 453 if (r < 0) { 454 error_setg_errno(errp, -r, "encryption format fail"); 455 return r; 456 } 457 458 r = rbd_get_size(image, &effective_size); 459 if (r < 0) { 460 error_setg_errno(errp, -r, "cannot get effective image size"); 461 return r; 462 } 463 464 r = rbd_resize(image, raw_size + (raw_size - effective_size)); 465 if (r < 0) { 466 error_setg_errno(errp, -r, "cannot resize image after format"); 467 return r; 468 } 469 470 return 0; 471 } 472 473 static int qemu_rbd_encryption_load(rbd_image_t image, 474 RbdEncryptionOptions *encrypt, 475 Error **errp) 476 { 477 int r = 0; 478 g_autofree char *passphrase = NULL; 479 rbd_encryption_luks1_format_options_t luks_opts; 480 rbd_encryption_luks2_format_options_t luks2_opts; 481 #ifdef LIBRBD_SUPPORTS_ENCRYPTION_LOAD2 482 rbd_encryption_luks_format_options_t luks_any_opts; 483 #endif 484 rbd_encryption_format_t format; 485 rbd_encryption_options_t opts; 486 size_t opts_size; 487 488 switch (encrypt->format) { 489 case RBD_IMAGE_ENCRYPTION_FORMAT_LUKS: { 490 memset(&luks_opts, 0, sizeof(luks_opts)); 491 format = RBD_ENCRYPTION_FORMAT_LUKS1; 492 opts = &luks_opts; 493 opts_size = sizeof(luks_opts); 494 r = qemu_rbd_convert_luks_options( 495 qapi_RbdEncryptionOptionsLUKS_base(&encrypt->u.luks), 496 &passphrase, &luks_opts.passphrase_size, errp); 497 if (r < 0) { 498 return r; 499 } 500 luks_opts.passphrase = passphrase; 501 break; 502 } 503 case RBD_IMAGE_ENCRYPTION_FORMAT_LUKS2: { 504 memset(&luks2_opts, 0, sizeof(luks2_opts)); 505 format = RBD_ENCRYPTION_FORMAT_LUKS2; 506 opts = &luks2_opts; 507 opts_size = sizeof(luks2_opts); 508 r = qemu_rbd_convert_luks_options( 509 qapi_RbdEncryptionOptionsLUKS2_base(&encrypt->u.luks2), 510 &passphrase, &luks2_opts.passphrase_size, errp); 511 if (r < 0) { 512 return r; 513 } 514 luks2_opts.passphrase = passphrase; 515 break; 516 } 517 #ifdef LIBRBD_SUPPORTS_ENCRYPTION_LOAD2 518 case RBD_IMAGE_ENCRYPTION_FORMAT_LUKS_ANY: { 519 memset(&luks_any_opts, 0, sizeof(luks_any_opts)); 520 format = RBD_ENCRYPTION_FORMAT_LUKS; 521 opts = &luks_any_opts; 522 opts_size = sizeof(luks_any_opts); 523 r = qemu_rbd_convert_luks_options( 524 qapi_RbdEncryptionOptionsLUKSAny_base(&encrypt->u.luks_any), 525 &passphrase, &luks_any_opts.passphrase_size, errp); 526 if (r < 0) { 527 return r; 528 } 529 luks_any_opts.passphrase = passphrase; 530 break; 531 } 532 #endif 533 default: { 534 r = -ENOTSUP; 535 error_setg_errno( 536 errp, -r, "unknown image encryption format: %u", 537 encrypt->format); 538 return r; 539 } 540 } 541 542 r = rbd_encryption_load(image, format, opts, opts_size); 543 if (r < 0) { 544 error_setg_errno(errp, -r, "encryption load fail"); 545 return r; 546 } 547 548 return 0; 549 } 550 551 #ifdef LIBRBD_SUPPORTS_ENCRYPTION_LOAD2 552 static int qemu_rbd_encryption_load2(rbd_image_t image, 553 RbdEncryptionOptions *encrypt, 554 Error **errp) 555 { 556 int r = 0; 557 int encrypt_count = 1; 558 int i; 559 RbdEncryptionOptions *curr_encrypt; 560 rbd_encryption_spec_t *specs; 561 rbd_encryption_luks1_format_options_t *luks_opts; 562 rbd_encryption_luks2_format_options_t *luks2_opts; 563 rbd_encryption_luks_format_options_t *luks_any_opts; 564 565 /* count encryption options */ 566 for (curr_encrypt = encrypt->parent; curr_encrypt; 567 curr_encrypt = curr_encrypt->parent) { 568 ++encrypt_count; 569 } 570 571 specs = g_new0(rbd_encryption_spec_t, encrypt_count); 572 573 curr_encrypt = encrypt; 574 for (i = 0; i < encrypt_count; ++i) { 575 switch (curr_encrypt->format) { 576 case RBD_IMAGE_ENCRYPTION_FORMAT_LUKS: { 577 specs[i].format = RBD_ENCRYPTION_FORMAT_LUKS1; 578 579 luks_opts = g_new0(rbd_encryption_luks1_format_options_t, 1); 580 specs[i].opts = luks_opts; 581 specs[i].opts_size = sizeof(*luks_opts); 582 583 r = qemu_rbd_convert_luks_options( 584 qapi_RbdEncryptionOptionsLUKS_base( 585 &curr_encrypt->u.luks), 586 (char **)&luks_opts->passphrase, 587 &luks_opts->passphrase_size, 588 errp); 589 break; 590 } 591 case RBD_IMAGE_ENCRYPTION_FORMAT_LUKS2: { 592 specs[i].format = RBD_ENCRYPTION_FORMAT_LUKS2; 593 594 luks2_opts = g_new0(rbd_encryption_luks2_format_options_t, 1); 595 specs[i].opts = luks2_opts; 596 specs[i].opts_size = sizeof(*luks2_opts); 597 598 r = qemu_rbd_convert_luks_options( 599 qapi_RbdEncryptionOptionsLUKS2_base( 600 &curr_encrypt->u.luks2), 601 (char **)&luks2_opts->passphrase, 602 &luks2_opts->passphrase_size, 603 errp); 604 break; 605 } 606 case RBD_IMAGE_ENCRYPTION_FORMAT_LUKS_ANY: { 607 specs[i].format = RBD_ENCRYPTION_FORMAT_LUKS; 608 609 luks_any_opts = g_new0(rbd_encryption_luks_format_options_t, 1); 610 specs[i].opts = luks_any_opts; 611 specs[i].opts_size = sizeof(*luks_any_opts); 612 613 r = qemu_rbd_convert_luks_options( 614 qapi_RbdEncryptionOptionsLUKSAny_base( 615 &curr_encrypt->u.luks_any), 616 (char **)&luks_any_opts->passphrase, 617 &luks_any_opts->passphrase_size, 618 errp); 619 break; 620 } 621 default: { 622 r = -ENOTSUP; 623 error_setg_errno( 624 errp, -r, "unknown image encryption format: %u", 625 curr_encrypt->format); 626 } 627 } 628 629 if (r < 0) { 630 goto exit; 631 } 632 633 curr_encrypt = curr_encrypt->parent; 634 } 635 636 r = rbd_encryption_load2(image, specs, encrypt_count); 637 if (r < 0) { 638 error_setg_errno(errp, -r, "layered encryption load fail"); 639 goto exit; 640 } 641 642 exit: 643 for (i = 0; i < encrypt_count; ++i) { 644 if (!specs[i].opts) { 645 break; 646 } 647 648 switch (specs[i].format) { 649 case RBD_ENCRYPTION_FORMAT_LUKS1: { 650 luks_opts = specs[i].opts; 651 g_free((void *)luks_opts->passphrase); 652 break; 653 } 654 case RBD_ENCRYPTION_FORMAT_LUKS2: { 655 luks2_opts = specs[i].opts; 656 g_free((void *)luks2_opts->passphrase); 657 break; 658 } 659 case RBD_ENCRYPTION_FORMAT_LUKS: { 660 luks_any_opts = specs[i].opts; 661 g_free((void *)luks_any_opts->passphrase); 662 break; 663 } 664 } 665 666 g_free(specs[i].opts); 667 } 668 g_free(specs); 669 return r; 670 } 671 #endif 672 #endif 673 674 /* FIXME Deprecate and remove keypairs or make it available in QMP. */ 675 static int qemu_rbd_do_create(BlockdevCreateOptions *options, 676 const char *keypairs, const char *password_secret, 677 Error **errp) 678 { 679 BlockdevCreateOptionsRbd *opts = &options->u.rbd; 680 rados_t cluster; 681 rados_ioctx_t io_ctx; 682 int obj_order = 0; 683 int ret; 684 685 assert(options->driver == BLOCKDEV_DRIVER_RBD); 686 if (opts->location->snapshot) { 687 error_setg(errp, "Can't use snapshot name for image creation"); 688 return -EINVAL; 689 } 690 691 #ifndef LIBRBD_SUPPORTS_ENCRYPTION 692 if (opts->encrypt) { 693 error_setg(errp, "RBD library does not support image encryption"); 694 return -ENOTSUP; 695 } 696 #endif 697 698 if (opts->has_cluster_size) { 699 int64_t objsize = opts->cluster_size; 700 if ((objsize - 1) & objsize) { /* not a power of 2? */ 701 error_setg(errp, "obj size needs to be power of 2"); 702 return -EINVAL; 703 } 704 if (objsize < 4096) { 705 error_setg(errp, "obj size too small"); 706 return -EINVAL; 707 } 708 obj_order = ctz32(objsize); 709 } 710 711 ret = qemu_rbd_connect(&cluster, &io_ctx, opts->location, false, keypairs, 712 password_secret, errp); 713 if (ret < 0) { 714 return ret; 715 } 716 717 ret = rbd_create(io_ctx, opts->location->image, opts->size, &obj_order); 718 if (ret < 0) { 719 error_setg_errno(errp, -ret, "error rbd create"); 720 goto out; 721 } 722 723 #ifdef LIBRBD_SUPPORTS_ENCRYPTION 724 if (opts->encrypt) { 725 rbd_image_t image; 726 727 ret = rbd_open(io_ctx, opts->location->image, &image, NULL); 728 if (ret < 0) { 729 error_setg_errno(errp, -ret, 730 "error opening image '%s' for encryption format", 731 opts->location->image); 732 goto out; 733 } 734 735 ret = qemu_rbd_encryption_format(image, opts->encrypt, errp); 736 rbd_close(image); 737 if (ret < 0) { 738 /* encryption format fail, try removing the image */ 739 rbd_remove(io_ctx, opts->location->image); 740 goto out; 741 } 742 } 743 #endif 744 745 ret = 0; 746 out: 747 rados_ioctx_destroy(io_ctx); 748 rados_shutdown(cluster); 749 return ret; 750 } 751 752 static int qemu_rbd_co_create(BlockdevCreateOptions *options, Error **errp) 753 { 754 return qemu_rbd_do_create(options, NULL, NULL, errp); 755 } 756 757 static int qemu_rbd_extract_encryption_create_options( 758 QemuOpts *opts, 759 RbdEncryptionCreateOptions **spec, 760 Error **errp) 761 { 762 QDict *opts_qdict; 763 QDict *encrypt_qdict; 764 Visitor *v; 765 int ret = 0; 766 767 opts_qdict = qemu_opts_to_qdict(opts, NULL); 768 qdict_extract_subqdict(opts_qdict, &encrypt_qdict, "encrypt."); 769 qobject_unref(opts_qdict); 770 if (!qdict_size(encrypt_qdict)) { 771 *spec = NULL; 772 goto exit; 773 } 774 775 /* Convert options into a QAPI object */ 776 v = qobject_input_visitor_new_flat_confused(encrypt_qdict, errp); 777 if (!v) { 778 ret = -EINVAL; 779 goto exit; 780 } 781 782 visit_type_RbdEncryptionCreateOptions(v, NULL, spec, errp); 783 visit_free(v); 784 if (!*spec) { 785 ret = -EINVAL; 786 goto exit; 787 } 788 789 exit: 790 qobject_unref(encrypt_qdict); 791 return ret; 792 } 793 794 static int coroutine_fn qemu_rbd_co_create_opts(BlockDriver *drv, 795 const char *filename, 796 QemuOpts *opts, 797 Error **errp) 798 { 799 BlockdevCreateOptions *create_options; 800 BlockdevCreateOptionsRbd *rbd_opts; 801 BlockdevOptionsRbd *loc; 802 RbdEncryptionCreateOptions *encrypt = NULL; 803 Error *local_err = NULL; 804 const char *keypairs, *password_secret; 805 QDict *options = NULL; 806 int ret = 0; 807 808 create_options = g_new0(BlockdevCreateOptions, 1); 809 create_options->driver = BLOCKDEV_DRIVER_RBD; 810 rbd_opts = &create_options->u.rbd; 811 812 rbd_opts->location = g_new0(BlockdevOptionsRbd, 1); 813 814 password_secret = qemu_opt_get(opts, "password-secret"); 815 816 /* Read out options */ 817 rbd_opts->size = ROUND_UP(qemu_opt_get_size_del(opts, BLOCK_OPT_SIZE, 0), 818 BDRV_SECTOR_SIZE); 819 rbd_opts->cluster_size = qemu_opt_get_size_del(opts, 820 BLOCK_OPT_CLUSTER_SIZE, 0); 821 rbd_opts->has_cluster_size = (rbd_opts->cluster_size != 0); 822 823 options = qdict_new(); 824 qemu_rbd_parse_filename(filename, options, &local_err); 825 if (local_err) { 826 ret = -EINVAL; 827 error_propagate(errp, local_err); 828 goto exit; 829 } 830 831 ret = qemu_rbd_extract_encryption_create_options(opts, &encrypt, errp); 832 if (ret < 0) { 833 goto exit; 834 } 835 rbd_opts->encrypt = encrypt; 836 837 /* 838 * Caution: while qdict_get_try_str() is fine, getting non-string 839 * types would require more care. When @options come from -blockdev 840 * or blockdev_add, its members are typed according to the QAPI 841 * schema, but when they come from -drive, they're all QString. 842 */ 843 loc = rbd_opts->location; 844 loc->pool = g_strdup(qdict_get_try_str(options, "pool")); 845 loc->conf = g_strdup(qdict_get_try_str(options, "conf")); 846 loc->user = g_strdup(qdict_get_try_str(options, "user")); 847 loc->q_namespace = g_strdup(qdict_get_try_str(options, "namespace")); 848 loc->image = g_strdup(qdict_get_try_str(options, "image")); 849 keypairs = qdict_get_try_str(options, "=keyvalue-pairs"); 850 851 ret = qemu_rbd_do_create(create_options, keypairs, password_secret, errp); 852 if (ret < 0) { 853 goto exit; 854 } 855 856 exit: 857 qobject_unref(options); 858 qapi_free_BlockdevCreateOptions(create_options); 859 return ret; 860 } 861 862 static char *qemu_rbd_mon_host(BlockdevOptionsRbd *opts, Error **errp) 863 { 864 const char **vals; 865 const char *host, *port; 866 char *rados_str; 867 InetSocketAddressBaseList *p; 868 int i, cnt; 869 870 if (!opts->has_server) { 871 return NULL; 872 } 873 874 for (cnt = 0, p = opts->server; p; p = p->next) { 875 cnt++; 876 } 877 878 vals = g_new(const char *, cnt + 1); 879 880 for (i = 0, p = opts->server; p; p = p->next, i++) { 881 host = p->value->host; 882 port = p->value->port; 883 884 if (strchr(host, ':')) { 885 vals[i] = g_strdup_printf("[%s]:%s", host, port); 886 } else { 887 vals[i] = g_strdup_printf("%s:%s", host, port); 888 } 889 } 890 vals[i] = NULL; 891 892 rados_str = i ? g_strjoinv(";", (char **)vals) : NULL; 893 g_strfreev((char **)vals); 894 return rados_str; 895 } 896 897 static int qemu_rbd_connect(rados_t *cluster, rados_ioctx_t *io_ctx, 898 BlockdevOptionsRbd *opts, bool cache, 899 const char *keypairs, const char *secretid, 900 Error **errp) 901 { 902 char *mon_host = NULL; 903 Error *local_err = NULL; 904 int r; 905 906 if (secretid) { 907 if (opts->key_secret) { 908 error_setg(errp, 909 "Legacy 'password-secret' clashes with 'key-secret'"); 910 return -EINVAL; 911 } 912 opts->key_secret = g_strdup(secretid); 913 } 914 915 mon_host = qemu_rbd_mon_host(opts, &local_err); 916 if (local_err) { 917 error_propagate(errp, local_err); 918 r = -EINVAL; 919 goto out; 920 } 921 922 r = rados_create(cluster, opts->user); 923 if (r < 0) { 924 error_setg_errno(errp, -r, "error initializing"); 925 goto out; 926 } 927 928 /* try default location when conf=NULL, but ignore failure */ 929 r = rados_conf_read_file(*cluster, opts->conf); 930 if (opts->conf && r < 0) { 931 error_setg_errno(errp, -r, "error reading conf file %s", opts->conf); 932 goto failed_shutdown; 933 } 934 935 r = qemu_rbd_set_keypairs(*cluster, keypairs, errp); 936 if (r < 0) { 937 goto failed_shutdown; 938 } 939 940 if (mon_host) { 941 r = rados_conf_set(*cluster, "mon_host", mon_host); 942 if (r < 0) { 943 goto failed_shutdown; 944 } 945 } 946 947 r = qemu_rbd_set_auth(*cluster, opts, errp); 948 if (r < 0) { 949 goto failed_shutdown; 950 } 951 952 /* 953 * Fallback to more conservative semantics if setting cache 954 * options fails. Ignore errors from setting rbd_cache because the 955 * only possible error is that the option does not exist, and 956 * librbd defaults to no caching. If write through caching cannot 957 * be set up, fall back to no caching. 958 */ 959 if (cache) { 960 rados_conf_set(*cluster, "rbd_cache", "true"); 961 } else { 962 rados_conf_set(*cluster, "rbd_cache", "false"); 963 } 964 965 r = rados_connect(*cluster); 966 if (r < 0) { 967 error_setg_errno(errp, -r, "error connecting"); 968 goto failed_shutdown; 969 } 970 971 r = rados_ioctx_create(*cluster, opts->pool, io_ctx); 972 if (r < 0) { 973 error_setg_errno(errp, -r, "error opening pool %s", opts->pool); 974 goto failed_shutdown; 975 } 976 977 #ifdef HAVE_RBD_NAMESPACE_EXISTS 978 if (opts->q_namespace && strlen(opts->q_namespace) > 0) { 979 bool exists; 980 981 r = rbd_namespace_exists(*io_ctx, opts->q_namespace, &exists); 982 if (r < 0) { 983 error_setg_errno(errp, -r, "error checking namespace"); 984 goto failed_ioctx_destroy; 985 } 986 987 if (!exists) { 988 error_setg(errp, "namespace '%s' does not exist", 989 opts->q_namespace); 990 r = -ENOENT; 991 goto failed_ioctx_destroy; 992 } 993 } 994 #endif 995 996 /* 997 * Set the namespace after opening the io context on the pool, 998 * if nspace == NULL or if nspace == "", it is just as we did nothing 999 */ 1000 rados_ioctx_set_namespace(*io_ctx, opts->q_namespace); 1001 1002 r = 0; 1003 goto out; 1004 1005 #ifdef HAVE_RBD_NAMESPACE_EXISTS 1006 failed_ioctx_destroy: 1007 rados_ioctx_destroy(*io_ctx); 1008 #endif 1009 failed_shutdown: 1010 rados_shutdown(*cluster); 1011 out: 1012 g_free(mon_host); 1013 return r; 1014 } 1015 1016 static int qemu_rbd_convert_options(QDict *options, BlockdevOptionsRbd **opts, 1017 Error **errp) 1018 { 1019 Visitor *v; 1020 1021 /* Convert the remaining options into a QAPI object */ 1022 v = qobject_input_visitor_new_flat_confused(options, errp); 1023 if (!v) { 1024 return -EINVAL; 1025 } 1026 1027 visit_type_BlockdevOptionsRbd(v, NULL, opts, errp); 1028 visit_free(v); 1029 if (!opts) { 1030 return -EINVAL; 1031 } 1032 1033 return 0; 1034 } 1035 1036 static int qemu_rbd_attempt_legacy_options(QDict *options, 1037 BlockdevOptionsRbd **opts, 1038 char **keypairs) 1039 { 1040 char *filename; 1041 int r; 1042 1043 filename = g_strdup(qdict_get_try_str(options, "filename")); 1044 if (!filename) { 1045 return -EINVAL; 1046 } 1047 qdict_del(options, "filename"); 1048 1049 qemu_rbd_parse_filename(filename, options, NULL); 1050 1051 /* keypairs freed by caller */ 1052 *keypairs = g_strdup(qdict_get_try_str(options, "=keyvalue-pairs")); 1053 if (*keypairs) { 1054 qdict_del(options, "=keyvalue-pairs"); 1055 } 1056 1057 r = qemu_rbd_convert_options(options, opts, NULL); 1058 1059 g_free(filename); 1060 return r; 1061 } 1062 1063 static int qemu_rbd_open(BlockDriverState *bs, QDict *options, int flags, 1064 Error **errp) 1065 { 1066 BDRVRBDState *s = bs->opaque; 1067 BlockdevOptionsRbd *opts = NULL; 1068 const QDictEntry *e; 1069 Error *local_err = NULL; 1070 char *keypairs, *secretid; 1071 rbd_image_info_t info; 1072 int r; 1073 1074 keypairs = g_strdup(qdict_get_try_str(options, "=keyvalue-pairs")); 1075 if (keypairs) { 1076 qdict_del(options, "=keyvalue-pairs"); 1077 } 1078 1079 secretid = g_strdup(qdict_get_try_str(options, "password-secret")); 1080 if (secretid) { 1081 qdict_del(options, "password-secret"); 1082 } 1083 1084 r = qemu_rbd_convert_options(options, &opts, &local_err); 1085 if (local_err) { 1086 /* If keypairs are present, that means some options are present in 1087 * the modern option format. Don't attempt to parse legacy option 1088 * formats, as we won't support mixed usage. */ 1089 if (keypairs) { 1090 error_propagate(errp, local_err); 1091 goto out; 1092 } 1093 1094 /* If the initial attempt to convert and process the options failed, 1095 * we may be attempting to open an image file that has the rbd options 1096 * specified in the older format consisting of all key/value pairs 1097 * encoded in the filename. Go ahead and attempt to parse the 1098 * filename, and see if we can pull out the required options. */ 1099 r = qemu_rbd_attempt_legacy_options(options, &opts, &keypairs); 1100 if (r < 0) { 1101 /* Propagate the original error, not the legacy parsing fallback 1102 * error, as the latter was just a best-effort attempt. */ 1103 error_propagate(errp, local_err); 1104 goto out; 1105 } 1106 /* Take care whenever deciding to actually deprecate; once this ability 1107 * is removed, we will not be able to open any images with legacy-styled 1108 * backing image strings. */ 1109 warn_report("RBD options encoded in the filename as keyvalue pairs " 1110 "is deprecated"); 1111 } 1112 1113 /* Remove the processed options from the QDict (the visitor processes 1114 * _all_ options in the QDict) */ 1115 while ((e = qdict_first(options))) { 1116 qdict_del(options, e->key); 1117 } 1118 1119 r = qemu_rbd_connect(&s->cluster, &s->io_ctx, opts, 1120 !(flags & BDRV_O_NOCACHE), keypairs, secretid, errp); 1121 if (r < 0) { 1122 goto out; 1123 } 1124 1125 s->snap = g_strdup(opts->snapshot); 1126 s->image_name = g_strdup(opts->image); 1127 1128 /* rbd_open is always r/w */ 1129 r = rbd_open(s->io_ctx, s->image_name, &s->image, s->snap); 1130 if (r < 0) { 1131 error_setg_errno(errp, -r, "error reading header from %s", 1132 s->image_name); 1133 goto failed_open; 1134 } 1135 1136 if (opts->encrypt) { 1137 #ifdef LIBRBD_SUPPORTS_ENCRYPTION 1138 if (opts->encrypt->parent) { 1139 #ifdef LIBRBD_SUPPORTS_ENCRYPTION_LOAD2 1140 r = qemu_rbd_encryption_load2(s->image, opts->encrypt, errp); 1141 #else 1142 r = -ENOTSUP; 1143 error_setg(errp, "RBD library does not support layered encryption"); 1144 #endif 1145 } else { 1146 r = qemu_rbd_encryption_load(s->image, opts->encrypt, errp); 1147 } 1148 if (r < 0) { 1149 goto failed_post_open; 1150 } 1151 #else 1152 r = -ENOTSUP; 1153 error_setg(errp, "RBD library does not support image encryption"); 1154 goto failed_post_open; 1155 #endif 1156 } 1157 1158 r = rbd_stat(s->image, &info, sizeof(info)); 1159 if (r < 0) { 1160 error_setg_errno(errp, -r, "error getting image info from %s", 1161 s->image_name); 1162 goto failed_post_open; 1163 } 1164 s->image_size = info.size; 1165 s->object_size = info.obj_size; 1166 1167 /* If we are using an rbd snapshot, we must be r/o, otherwise 1168 * leave as-is */ 1169 if (s->snap != NULL) { 1170 bdrv_graph_rdlock_main_loop(); 1171 r = bdrv_apply_auto_read_only(bs, "rbd snapshots are read-only", errp); 1172 bdrv_graph_rdunlock_main_loop(); 1173 if (r < 0) { 1174 goto failed_post_open; 1175 } 1176 } 1177 1178 #ifdef LIBRBD_SUPPORTS_WRITE_ZEROES 1179 bs->supported_zero_flags = BDRV_REQ_MAY_UNMAP | BDRV_REQ_NO_FALLBACK; 1180 #endif 1181 1182 /* When extending regular files, we get zeros from the OS */ 1183 bs->supported_truncate_flags = BDRV_REQ_ZERO_WRITE; 1184 1185 r = 0; 1186 goto out; 1187 1188 failed_post_open: 1189 rbd_close(s->image); 1190 failed_open: 1191 rados_ioctx_destroy(s->io_ctx); 1192 g_free(s->snap); 1193 g_free(s->image_name); 1194 rados_shutdown(s->cluster); 1195 out: 1196 qapi_free_BlockdevOptionsRbd(opts); 1197 g_free(keypairs); 1198 g_free(secretid); 1199 return r; 1200 } 1201 1202 1203 /* Since RBD is currently always opened R/W via the API, 1204 * we just need to check if we are using a snapshot or not, in 1205 * order to determine if we will allow it to be R/W */ 1206 static int qemu_rbd_reopen_prepare(BDRVReopenState *state, 1207 BlockReopenQueue *queue, Error **errp) 1208 { 1209 BDRVRBDState *s = state->bs->opaque; 1210 int ret = 0; 1211 1212 GRAPH_RDLOCK_GUARD_MAINLOOP(); 1213 1214 if (s->snap && state->flags & BDRV_O_RDWR) { 1215 error_setg(errp, 1216 "Cannot change node '%s' to r/w when using RBD snapshot", 1217 bdrv_get_device_or_node_name(state->bs)); 1218 ret = -EINVAL; 1219 } 1220 1221 return ret; 1222 } 1223 1224 static void qemu_rbd_close(BlockDriverState *bs) 1225 { 1226 BDRVRBDState *s = bs->opaque; 1227 1228 rbd_close(s->image); 1229 rados_ioctx_destroy(s->io_ctx); 1230 g_free(s->snap); 1231 g_free(s->image_name); 1232 rados_shutdown(s->cluster); 1233 } 1234 1235 /* Resize the RBD image and update the 'image_size' with the current size */ 1236 static int qemu_rbd_resize(BlockDriverState *bs, uint64_t size) 1237 { 1238 BDRVRBDState *s = bs->opaque; 1239 int r; 1240 1241 r = rbd_resize(s->image, size); 1242 if (r < 0) { 1243 return r; 1244 } 1245 1246 s->image_size = size; 1247 1248 return 0; 1249 } 1250 1251 static void qemu_rbd_finish_bh(void *opaque) 1252 { 1253 RBDTask *task = opaque; 1254 task->complete = true; 1255 aio_co_wake(task->co); 1256 } 1257 1258 /* 1259 * This is the completion callback function for all rbd aio calls 1260 * started from qemu_rbd_start_co(). 1261 * 1262 * Note: this function is being called from a non qemu thread so 1263 * we need to be careful about what we do here. Generally we only 1264 * schedule a BH, and do the rest of the io completion handling 1265 * from qemu_rbd_finish_bh() which runs in a qemu context. 1266 */ 1267 static void qemu_rbd_completion_cb(rbd_completion_t c, RBDTask *task) 1268 { 1269 task->ret = rbd_aio_get_return_value(c); 1270 rbd_aio_release(c); 1271 aio_bh_schedule_oneshot(bdrv_get_aio_context(task->bs), 1272 qemu_rbd_finish_bh, task); 1273 } 1274 1275 static int coroutine_fn qemu_rbd_start_co(BlockDriverState *bs, 1276 uint64_t offset, 1277 uint64_t bytes, 1278 QEMUIOVector *qiov, 1279 int flags, 1280 RBDAIOCmd cmd) 1281 { 1282 BDRVRBDState *s = bs->opaque; 1283 RBDTask task = { .bs = bs, .co = qemu_coroutine_self() }; 1284 rbd_completion_t c; 1285 int r; 1286 1287 assert(!qiov || qiov->size == bytes); 1288 1289 if (cmd == RBD_AIO_WRITE || cmd == RBD_AIO_WRITE_ZEROES) { 1290 /* 1291 * RBD APIs don't allow us to write more than actual size, so in order 1292 * to support growing images, we resize the image before write 1293 * operations that exceed the current size. 1294 */ 1295 if (offset + bytes > s->image_size) { 1296 r = qemu_rbd_resize(bs, offset + bytes); 1297 if (r < 0) { 1298 return r; 1299 } 1300 } 1301 } 1302 1303 r = rbd_aio_create_completion(&task, 1304 (rbd_callback_t) qemu_rbd_completion_cb, &c); 1305 if (r < 0) { 1306 return r; 1307 } 1308 1309 switch (cmd) { 1310 case RBD_AIO_READ: 1311 r = rbd_aio_readv(s->image, qiov->iov, qiov->niov, offset, c); 1312 break; 1313 case RBD_AIO_WRITE: 1314 r = rbd_aio_writev(s->image, qiov->iov, qiov->niov, offset, c); 1315 break; 1316 case RBD_AIO_DISCARD: 1317 r = rbd_aio_discard(s->image, offset, bytes, c); 1318 break; 1319 case RBD_AIO_FLUSH: 1320 r = rbd_aio_flush(s->image, c); 1321 break; 1322 #ifdef LIBRBD_SUPPORTS_WRITE_ZEROES 1323 case RBD_AIO_WRITE_ZEROES: { 1324 int zero_flags = 0; 1325 #ifdef RBD_WRITE_ZEROES_FLAG_THICK_PROVISION 1326 if (!(flags & BDRV_REQ_MAY_UNMAP)) { 1327 zero_flags = RBD_WRITE_ZEROES_FLAG_THICK_PROVISION; 1328 } 1329 #endif 1330 r = rbd_aio_write_zeroes(s->image, offset, bytes, c, zero_flags, 0); 1331 break; 1332 } 1333 #endif 1334 default: 1335 r = -EINVAL; 1336 } 1337 1338 if (r < 0) { 1339 error_report("rbd request failed early: cmd %d offset %" PRIu64 1340 " bytes %" PRIu64 " flags %d r %d (%s)", cmd, offset, 1341 bytes, flags, r, strerror(-r)); 1342 rbd_aio_release(c); 1343 return r; 1344 } 1345 1346 while (!task.complete) { 1347 qemu_coroutine_yield(); 1348 } 1349 1350 if (task.ret < 0) { 1351 error_report("rbd request failed: cmd %d offset %" PRIu64 " bytes %" 1352 PRIu64 " flags %d task.ret %" PRIi64 " (%s)", cmd, offset, 1353 bytes, flags, task.ret, strerror(-task.ret)); 1354 return task.ret; 1355 } 1356 1357 /* zero pad short reads */ 1358 if (cmd == RBD_AIO_READ && task.ret < qiov->size) { 1359 qemu_iovec_memset(qiov, task.ret, 0, qiov->size - task.ret); 1360 } 1361 1362 return 0; 1363 } 1364 1365 static int 1366 coroutine_fn qemu_rbd_co_preadv(BlockDriverState *bs, int64_t offset, 1367 int64_t bytes, QEMUIOVector *qiov, 1368 BdrvRequestFlags flags) 1369 { 1370 return qemu_rbd_start_co(bs, offset, bytes, qiov, flags, RBD_AIO_READ); 1371 } 1372 1373 static int 1374 coroutine_fn qemu_rbd_co_pwritev(BlockDriverState *bs, int64_t offset, 1375 int64_t bytes, QEMUIOVector *qiov, 1376 BdrvRequestFlags flags) 1377 { 1378 return qemu_rbd_start_co(bs, offset, bytes, qiov, flags, RBD_AIO_WRITE); 1379 } 1380 1381 static int coroutine_fn qemu_rbd_co_flush(BlockDriverState *bs) 1382 { 1383 return qemu_rbd_start_co(bs, 0, 0, NULL, 0, RBD_AIO_FLUSH); 1384 } 1385 1386 static int coroutine_fn qemu_rbd_co_pdiscard(BlockDriverState *bs, 1387 int64_t offset, int64_t bytes) 1388 { 1389 return qemu_rbd_start_co(bs, offset, bytes, NULL, 0, RBD_AIO_DISCARD); 1390 } 1391 1392 #ifdef LIBRBD_SUPPORTS_WRITE_ZEROES 1393 static int 1394 coroutine_fn qemu_rbd_co_pwrite_zeroes(BlockDriverState *bs, int64_t offset, 1395 int64_t bytes, BdrvRequestFlags flags) 1396 { 1397 return qemu_rbd_start_co(bs, offset, bytes, NULL, flags, 1398 RBD_AIO_WRITE_ZEROES); 1399 } 1400 #endif 1401 1402 static int coroutine_fn 1403 qemu_rbd_co_get_info(BlockDriverState *bs, BlockDriverInfo *bdi) 1404 { 1405 BDRVRBDState *s = bs->opaque; 1406 bdi->cluster_size = s->object_size; 1407 return 0; 1408 } 1409 1410 static ImageInfoSpecific *qemu_rbd_get_specific_info(BlockDriverState *bs, 1411 Error **errp) 1412 { 1413 BDRVRBDState *s = bs->opaque; 1414 ImageInfoSpecific *spec_info; 1415 char buf[RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN] = {0}; 1416 int r; 1417 1418 if (s->image_size >= RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN) { 1419 r = rbd_read(s->image, 0, 1420 RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN, buf); 1421 if (r < 0) { 1422 error_setg_errno(errp, -r, "cannot read image start for probe"); 1423 return NULL; 1424 } 1425 } 1426 1427 spec_info = g_new(ImageInfoSpecific, 1); 1428 *spec_info = (ImageInfoSpecific){ 1429 .type = IMAGE_INFO_SPECIFIC_KIND_RBD, 1430 .u.rbd.data = g_new0(ImageInfoSpecificRbd, 1), 1431 }; 1432 1433 if (memcmp(buf, rbd_luks_header_verification, 1434 RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN) == 0) { 1435 spec_info->u.rbd.data->encryption_format = 1436 RBD_IMAGE_ENCRYPTION_FORMAT_LUKS; 1437 spec_info->u.rbd.data->has_encryption_format = true; 1438 } else if (memcmp(buf, rbd_luks2_header_verification, 1439 RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN) == 0) { 1440 spec_info->u.rbd.data->encryption_format = 1441 RBD_IMAGE_ENCRYPTION_FORMAT_LUKS2; 1442 spec_info->u.rbd.data->has_encryption_format = true; 1443 } else if (memcmp(buf, rbd_layered_luks_header_verification, 1444 RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN) == 0) { 1445 spec_info->u.rbd.data->encryption_format = 1446 RBD_IMAGE_ENCRYPTION_FORMAT_LUKS; 1447 spec_info->u.rbd.data->has_encryption_format = true; 1448 } else if (memcmp(buf, rbd_layered_luks2_header_verification, 1449 RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN) == 0) { 1450 spec_info->u.rbd.data->encryption_format = 1451 RBD_IMAGE_ENCRYPTION_FORMAT_LUKS2; 1452 spec_info->u.rbd.data->has_encryption_format = true; 1453 } else { 1454 spec_info->u.rbd.data->has_encryption_format = false; 1455 } 1456 1457 return spec_info; 1458 } 1459 1460 /* 1461 * rbd_diff_iterate2 allows to interrupt the exection by returning a negative 1462 * value in the callback routine. Choose a value that does not conflict with 1463 * an existing exitcode and return it if we want to prematurely stop the 1464 * execution because we detected a change in the allocation status. 1465 */ 1466 #define QEMU_RBD_EXIT_DIFF_ITERATE2 -9000 1467 1468 static int qemu_rbd_diff_iterate_cb(uint64_t offs, size_t len, 1469 int exists, void *opaque) 1470 { 1471 RBDDiffIterateReq *req = opaque; 1472 1473 assert(req->offs + req->bytes <= offs); 1474 1475 /* treat a hole like an unallocated area and bail out */ 1476 if (!exists) { 1477 return 0; 1478 } 1479 1480 if (!req->exists && offs > req->offs) { 1481 /* 1482 * we started in an unallocated area and hit the first allocated 1483 * block. req->bytes must be set to the length of the unallocated area 1484 * before the allocated area. stop further processing. 1485 */ 1486 req->bytes = offs - req->offs; 1487 return QEMU_RBD_EXIT_DIFF_ITERATE2; 1488 } 1489 1490 if (req->exists && offs > req->offs + req->bytes) { 1491 /* 1492 * we started in an allocated area and jumped over an unallocated area, 1493 * req->bytes contains the length of the allocated area before the 1494 * unallocated area. stop further processing. 1495 */ 1496 return QEMU_RBD_EXIT_DIFF_ITERATE2; 1497 } 1498 1499 req->bytes += len; 1500 req->exists = true; 1501 1502 return 0; 1503 } 1504 1505 static int coroutine_fn qemu_rbd_co_block_status(BlockDriverState *bs, 1506 bool want_zero, int64_t offset, 1507 int64_t bytes, int64_t *pnum, 1508 int64_t *map, 1509 BlockDriverState **file) 1510 { 1511 BDRVRBDState *s = bs->opaque; 1512 int status, r; 1513 RBDDiffIterateReq req = { .offs = offset }; 1514 uint64_t features, flags; 1515 uint64_t head = 0; 1516 1517 assert(offset + bytes <= s->image_size); 1518 1519 /* default to all sectors allocated */ 1520 status = BDRV_BLOCK_DATA | BDRV_BLOCK_OFFSET_VALID; 1521 *map = offset; 1522 *file = bs; 1523 *pnum = bytes; 1524 1525 /* check if RBD image supports fast-diff */ 1526 r = rbd_get_features(s->image, &features); 1527 if (r < 0) { 1528 return status; 1529 } 1530 if (!(features & RBD_FEATURE_FAST_DIFF)) { 1531 return status; 1532 } 1533 1534 /* check if RBD fast-diff result is valid */ 1535 r = rbd_get_flags(s->image, &flags); 1536 if (r < 0) { 1537 return status; 1538 } 1539 if (flags & RBD_FLAG_FAST_DIFF_INVALID) { 1540 return status; 1541 } 1542 1543 #if LIBRBD_VERSION_CODE < LIBRBD_VERSION(1, 17, 0) 1544 /* 1545 * librbd had a bug until early 2022 that affected all versions of ceph that 1546 * supported fast-diff. This bug results in reporting of incorrect offsets 1547 * if the offset parameter to rbd_diff_iterate2 is not object aligned. 1548 * Work around this bug by rounding down the offset to object boundaries. 1549 * This is OK because we call rbd_diff_iterate2 with whole_object = true. 1550 * However, this workaround only works for non cloned images with default 1551 * striping. 1552 * 1553 * See: https://tracker.ceph.com/issues/53784 1554 */ 1555 1556 /* check if RBD image has non-default striping enabled */ 1557 if (features & RBD_FEATURE_STRIPINGV2) { 1558 return status; 1559 } 1560 1561 #pragma GCC diagnostic push 1562 #pragma GCC diagnostic ignored "-Wdeprecated-declarations" 1563 /* 1564 * check if RBD image is a clone (= has a parent). 1565 * 1566 * rbd_get_parent_info is deprecated from Nautilus onwards, but the 1567 * replacement rbd_get_parent is not present in Luminous and Mimic. 1568 */ 1569 if (rbd_get_parent_info(s->image, NULL, 0, NULL, 0, NULL, 0) != -ENOENT) { 1570 return status; 1571 } 1572 #pragma GCC diagnostic pop 1573 1574 head = req.offs & (s->object_size - 1); 1575 req.offs -= head; 1576 bytes += head; 1577 #endif 1578 1579 r = rbd_diff_iterate2(s->image, NULL, req.offs, bytes, true, true, 1580 qemu_rbd_diff_iterate_cb, &req); 1581 if (r < 0 && r != QEMU_RBD_EXIT_DIFF_ITERATE2) { 1582 return status; 1583 } 1584 assert(req.bytes <= bytes); 1585 if (!req.exists) { 1586 if (r == 0) { 1587 /* 1588 * rbd_diff_iterate2 does not invoke callbacks for unallocated 1589 * areas. This here catches the case where no callback was 1590 * invoked at all (req.bytes == 0). 1591 */ 1592 assert(req.bytes == 0); 1593 req.bytes = bytes; 1594 } 1595 status = BDRV_BLOCK_ZERO | BDRV_BLOCK_OFFSET_VALID; 1596 } 1597 1598 assert(req.bytes > head); 1599 *pnum = req.bytes - head; 1600 return status; 1601 } 1602 1603 static int64_t coroutine_fn qemu_rbd_co_getlength(BlockDriverState *bs) 1604 { 1605 BDRVRBDState *s = bs->opaque; 1606 int r; 1607 1608 r = rbd_get_size(s->image, &s->image_size); 1609 if (r < 0) { 1610 return r; 1611 } 1612 1613 return s->image_size; 1614 } 1615 1616 static int coroutine_fn qemu_rbd_co_truncate(BlockDriverState *bs, 1617 int64_t offset, 1618 bool exact, 1619 PreallocMode prealloc, 1620 BdrvRequestFlags flags, 1621 Error **errp) 1622 { 1623 int r; 1624 1625 if (prealloc != PREALLOC_MODE_OFF) { 1626 error_setg(errp, "Unsupported preallocation mode '%s'", 1627 PreallocMode_str(prealloc)); 1628 return -ENOTSUP; 1629 } 1630 1631 r = qemu_rbd_resize(bs, offset); 1632 if (r < 0) { 1633 error_setg_errno(errp, -r, "Failed to resize file"); 1634 return r; 1635 } 1636 1637 return 0; 1638 } 1639 1640 static int qemu_rbd_snap_create(BlockDriverState *bs, 1641 QEMUSnapshotInfo *sn_info) 1642 { 1643 BDRVRBDState *s = bs->opaque; 1644 int r; 1645 1646 if (sn_info->name[0] == '\0') { 1647 return -EINVAL; /* we need a name for rbd snapshots */ 1648 } 1649 1650 /* 1651 * rbd snapshots are using the name as the user controlled unique identifier 1652 * we can't use the rbd snapid for that purpose, as it can't be set 1653 */ 1654 if (sn_info->id_str[0] != '\0' && 1655 strcmp(sn_info->id_str, sn_info->name) != 0) { 1656 return -EINVAL; 1657 } 1658 1659 if (strlen(sn_info->name) >= sizeof(sn_info->id_str)) { 1660 return -ERANGE; 1661 } 1662 1663 r = rbd_snap_create(s->image, sn_info->name); 1664 if (r < 0) { 1665 error_report("failed to create snap: %s", strerror(-r)); 1666 return r; 1667 } 1668 1669 return 0; 1670 } 1671 1672 static int qemu_rbd_snap_remove(BlockDriverState *bs, 1673 const char *snapshot_id, 1674 const char *snapshot_name, 1675 Error **errp) 1676 { 1677 BDRVRBDState *s = bs->opaque; 1678 int r; 1679 1680 if (!snapshot_name) { 1681 error_setg(errp, "rbd need a valid snapshot name"); 1682 return -EINVAL; 1683 } 1684 1685 /* If snapshot_id is specified, it must be equal to name, see 1686 qemu_rbd_snap_list() */ 1687 if (snapshot_id && strcmp(snapshot_id, snapshot_name)) { 1688 error_setg(errp, 1689 "rbd do not support snapshot id, it should be NULL or " 1690 "equal to snapshot name"); 1691 return -EINVAL; 1692 } 1693 1694 r = rbd_snap_remove(s->image, snapshot_name); 1695 if (r < 0) { 1696 error_setg_errno(errp, -r, "Failed to remove the snapshot"); 1697 } 1698 return r; 1699 } 1700 1701 static int qemu_rbd_snap_rollback(BlockDriverState *bs, 1702 const char *snapshot_name) 1703 { 1704 BDRVRBDState *s = bs->opaque; 1705 1706 return rbd_snap_rollback(s->image, snapshot_name); 1707 } 1708 1709 static int qemu_rbd_snap_list(BlockDriverState *bs, 1710 QEMUSnapshotInfo **psn_tab) 1711 { 1712 BDRVRBDState *s = bs->opaque; 1713 QEMUSnapshotInfo *sn_info, *sn_tab = NULL; 1714 int i, snap_count; 1715 rbd_snap_info_t *snaps; 1716 int max_snaps = RBD_MAX_SNAPS; 1717 1718 do { 1719 snaps = g_new(rbd_snap_info_t, max_snaps); 1720 snap_count = rbd_snap_list(s->image, snaps, &max_snaps); 1721 if (snap_count <= 0) { 1722 g_free(snaps); 1723 } 1724 } while (snap_count == -ERANGE); 1725 1726 if (snap_count <= 0) { 1727 goto done; 1728 } 1729 1730 sn_tab = g_new0(QEMUSnapshotInfo, snap_count); 1731 1732 for (i = 0; i < snap_count; i++) { 1733 const char *snap_name = snaps[i].name; 1734 1735 sn_info = sn_tab + i; 1736 pstrcpy(sn_info->id_str, sizeof(sn_info->id_str), snap_name); 1737 pstrcpy(sn_info->name, sizeof(sn_info->name), snap_name); 1738 1739 sn_info->vm_state_size = snaps[i].size; 1740 sn_info->date_sec = 0; 1741 sn_info->date_nsec = 0; 1742 sn_info->vm_clock_nsec = 0; 1743 } 1744 rbd_snap_list_end(snaps); 1745 g_free(snaps); 1746 1747 done: 1748 *psn_tab = sn_tab; 1749 return snap_count; 1750 } 1751 1752 static void coroutine_fn qemu_rbd_co_invalidate_cache(BlockDriverState *bs, 1753 Error **errp) 1754 { 1755 BDRVRBDState *s = bs->opaque; 1756 int r = rbd_invalidate_cache(s->image); 1757 if (r < 0) { 1758 error_setg_errno(errp, -r, "Failed to invalidate the cache"); 1759 } 1760 } 1761 1762 static QemuOptsList qemu_rbd_create_opts = { 1763 .name = "rbd-create-opts", 1764 .head = QTAILQ_HEAD_INITIALIZER(qemu_rbd_create_opts.head), 1765 .desc = { 1766 { 1767 .name = BLOCK_OPT_SIZE, 1768 .type = QEMU_OPT_SIZE, 1769 .help = "Virtual disk size" 1770 }, 1771 { 1772 .name = BLOCK_OPT_CLUSTER_SIZE, 1773 .type = QEMU_OPT_SIZE, 1774 .help = "RBD object size" 1775 }, 1776 { 1777 .name = "password-secret", 1778 .type = QEMU_OPT_STRING, 1779 .help = "ID of secret providing the password", 1780 }, 1781 { 1782 .name = "encrypt.format", 1783 .type = QEMU_OPT_STRING, 1784 .help = "Encrypt the image, format choices: 'luks', 'luks2'", 1785 }, 1786 { 1787 .name = "encrypt.cipher-alg", 1788 .type = QEMU_OPT_STRING, 1789 .help = "Name of encryption cipher algorithm" 1790 " (allowed values: aes-128, aes-256)", 1791 }, 1792 { 1793 .name = "encrypt.key-secret", 1794 .type = QEMU_OPT_STRING, 1795 .help = "ID of secret providing LUKS passphrase", 1796 }, 1797 { /* end of list */ } 1798 } 1799 }; 1800 1801 static const char *const qemu_rbd_strong_runtime_opts[] = { 1802 "pool", 1803 "namespace", 1804 "image", 1805 "conf", 1806 "snapshot", 1807 "user", 1808 "server.", 1809 "password-secret", 1810 1811 NULL 1812 }; 1813 1814 static BlockDriver bdrv_rbd = { 1815 .format_name = "rbd", 1816 .instance_size = sizeof(BDRVRBDState), 1817 1818 .bdrv_parse_filename = qemu_rbd_parse_filename, 1819 .bdrv_open = qemu_rbd_open, 1820 .bdrv_close = qemu_rbd_close, 1821 .bdrv_reopen_prepare = qemu_rbd_reopen_prepare, 1822 .bdrv_co_create = qemu_rbd_co_create, 1823 .bdrv_co_create_opts = qemu_rbd_co_create_opts, 1824 .bdrv_has_zero_init = bdrv_has_zero_init_1, 1825 .bdrv_co_get_info = qemu_rbd_co_get_info, 1826 .bdrv_get_specific_info = qemu_rbd_get_specific_info, 1827 .create_opts = &qemu_rbd_create_opts, 1828 .bdrv_co_getlength = qemu_rbd_co_getlength, 1829 .bdrv_co_truncate = qemu_rbd_co_truncate, 1830 .protocol_name = "rbd", 1831 1832 .bdrv_co_preadv = qemu_rbd_co_preadv, 1833 .bdrv_co_pwritev = qemu_rbd_co_pwritev, 1834 .bdrv_co_flush_to_disk = qemu_rbd_co_flush, 1835 .bdrv_co_pdiscard = qemu_rbd_co_pdiscard, 1836 #ifdef LIBRBD_SUPPORTS_WRITE_ZEROES 1837 .bdrv_co_pwrite_zeroes = qemu_rbd_co_pwrite_zeroes, 1838 #endif 1839 .bdrv_co_block_status = qemu_rbd_co_block_status, 1840 1841 .bdrv_snapshot_create = qemu_rbd_snap_create, 1842 .bdrv_snapshot_delete = qemu_rbd_snap_remove, 1843 .bdrv_snapshot_list = qemu_rbd_snap_list, 1844 .bdrv_snapshot_goto = qemu_rbd_snap_rollback, 1845 .bdrv_co_invalidate_cache = qemu_rbd_co_invalidate_cache, 1846 1847 .strong_runtime_opts = qemu_rbd_strong_runtime_opts, 1848 }; 1849 1850 static void bdrv_rbd_init(void) 1851 { 1852 bdrv_register(&bdrv_rbd); 1853 } 1854 1855 block_init(bdrv_rbd_init); 1856