xref: /qemu/block/rbd.c (revision 599f2762ed8c86a6eea03b9f91d49d14a874a95c)
1 /*
2  * QEMU Block driver for RADOS (Ceph)
3  *
4  * Copyright (C) 2010-2011 Christian Brunner <chb@muc.de>,
5  *                         Josh Durgin <josh.durgin@dreamhost.com>
6  *
7  * This work is licensed under the terms of the GNU GPL, version 2.  See
8  * the COPYING file in the top-level directory.
9  *
10  * Contributions after 2012-01-13 are licensed under the terms of the
11  * GNU GPL, version 2 or (at your option) any later version.
12  */
13 
14 #include "qemu/osdep.h"
15 
16 #include <rbd/librbd.h>
17 #include "qapi/error.h"
18 #include "qemu/error-report.h"
19 #include "qemu/module.h"
20 #include "qemu/option.h"
21 #include "block/block-io.h"
22 #include "block/block_int.h"
23 #include "block/qdict.h"
24 #include "crypto/secret.h"
25 #include "qemu/cutils.h"
26 #include "system/replay.h"
27 #include "qobject/qstring.h"
28 #include "qobject/qdict.h"
29 #include "qobject/qjson.h"
30 #include "qobject/qlist.h"
31 #include "qapi/qobject-input-visitor.h"
32 #include "qapi/qapi-visit-block-core.h"
33 
/*
 * When specifying the image filename use:
 *
 * rbd:poolname/[namespace/]devicename[@snapshotname][:option1=value1[:option2=value2...]]
 *
 * poolname must be the name of an existing rados pool.
 *
 * namespace is optional; when it is omitted the empty namespace is used.
 *
 * devicename is the name of the rbd image.
 *
 * Each option given is used to configure rados, and may be any valid
 * Ceph option, "id", or "conf".
 *
 * The "id" option indicates which user we should authenticate as to
 * the Ceph cluster.  If it is omitted we will use the Ceph default
 * (normally 'admin').
 *
 * The "conf" option specifies a Ceph configuration file to read.  If
 * it is not specified, we will read from the default Ceph locations
 * (e.g., /etc/ceph/ceph.conf).  To avoid reading _any_ configuration
 * file, specify conf=/dev/null.
 *
 * Configuration values containing :, @, or = can be escaped with a
 * leading "\".
 */
58 
/*
 * NOTE(review): OBJ_DEFAULT_OBJ_ORDER is not defined in the visible part of
 * this file; confirm it is still provided elsewhere before using this macro.
 */
#define OBJ_MAX_SIZE (1UL << OBJ_DEFAULT_OBJ_ORDER)

#define RBD_MAX_SNAPS 100

/* Length in bytes of each of the header signatures below. */
#define RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN 8

/*
 * Header signatures.  Each is a 4-character tag ("LUKS" or "RBDL")
 * followed by the bytes 0xBA 0xBE and two trailing bytes (0, 1 or 0, 2;
 * presumably the format version).
 */
static const char rbd_luks_header_verification[
        RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN] = {
    'L', 'U', 'K', 'S', 0xBA, 0xBE, 0, 1
};

static const char rbd_luks2_header_verification[
        RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN] = {
    'L', 'U', 'K', 'S', 0xBA, 0xBE, 0, 2
};

static const char rbd_layered_luks_header_verification[
        RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN] = {
    'R', 'B', 'D', 'L', 0xBA, 0xBE, 0, 1
};

static const char rbd_layered_luks2_header_verification[
        RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN] = {
    'R', 'B', 'D', 'L', 0xBA, 0xBE, 0, 2
};
84 
85 typedef enum {
86     RBD_AIO_READ,
87     RBD_AIO_WRITE,
88     RBD_AIO_DISCARD,
89     RBD_AIO_FLUSH,
90     RBD_AIO_WRITE_ZEROES
91 } RBDAIOCmd;
92 
93 typedef struct BDRVRBDState {
94     rados_t cluster;
95     rados_ioctx_t io_ctx;
96     rbd_image_t image;
97     char *image_name;
98     char *snap;
99     char *namespace;
100     uint64_t image_size;
101     uint64_t object_size;
102 } BDRVRBDState;
103 
104 typedef struct RBDTask {
105     BlockDriverState *bs;
106     Coroutine *co;
107     bool complete;
108     int64_t ret;
109 } RBDTask;
110 
111 typedef struct RBDDiffIterateReq {
112     uint64_t offs;
113     uint64_t bytes;
114     bool exists;
115 } RBDDiffIterateReq;
116 
117 static int qemu_rbd_connect(rados_t *cluster, rados_ioctx_t *io_ctx,
118                             BlockdevOptionsRbd *opts, bool cache,
119                             const char *keypairs, const char *secretid,
120                             Error **errp);
121 
/*
 * Find the first unescaped occurrence of @delim in @src.
 *
 * A backslash escapes the character that follows it, so an escaped
 * delimiter is not a match.  A backslash that is the last character of
 * the string escapes nothing.
 *
 * Returns a pointer to the delimiter inside @src, or NULL if @src
 * contains no unescaped @delim.
 */
static char *qemu_rbd_strchr(char *src, char delim)
{
    char *p;

    for (p = src; *p; ++p) {
        if (*p == delim) {
            return p;
        }
        if (*p == '\\' && p[1] != '\0') {
            /* Step onto the escaped character; the loop increment skips it */
            ++p;
        }
    }

    return NULL;
}
137 
138 
/*
 * Split off the token at the start of @src, in place.
 *
 * The token ends at the first unescaped @delim, which is overwritten
 * with a NUL terminator.  *@p is set to the character after the
 * delimiter, or to NULL when @src contains no unescaped @delim (in which
 * case the whole of @src is the token and nothing is modified).
 *
 * Returns @src, i.e. the start of the token.  Escapes are left in place;
 * the caller unescapes the token separately.
 */
static char *qemu_rbd_next_tok(char *src, char delim, char **p)
{
    char *end;

    *p = NULL;

    end = qemu_rbd_strchr(src, delim);
    if (end) {
        *p = end + 1;
        *end = '\0';
    }
    return src;
}
152 
/*
 * Remove backslash escapes from @src in place.
 *
 * Each backslash that is followed by another character is dropped and
 * the following character is kept literally.  A backslash that is the
 * last character of the string is kept as is.
 */
static void qemu_rbd_unescape(char *src)
{
    char *p;

    /* @src is the read cursor, @p the write cursor; @p never passes @src */
    for (p = src; *src; ++src, ++p) {
        if (*src == '\\' && src[1] != '\0') {
            src++;
        }
        *p = *src;
    }
    *p = '\0';
}
165 
/*
 * Parse a legacy "rbd:..." filename into @options.
 *
 * Accepted form:
 *   rbd:pool/[namespace/]image[@snapshot][:key=value[:key=value...]]
 *
 * Keys stored in @options (all as strings):
 *   "pool", "image"      - always, once the pool separator was found
 *   "namespace"          - always; the empty string when none was given
 *   "snapshot"           - only when an '@' component is present
 *   "conf"               - from the "conf" option
 *   "user"               - from the "id" option
 *   "=keyvalue-pairs"    - JSON list [ "key1", "value1", ... ] holding
 *                          every other option, in the order given
 *
 * On failure @errp is set: when @filename does not start with "rbd:",
 * when no '/' follows the pool name, or when an option lacks "=value".
 * @options may already have been partially filled in at that point.
 */
static void qemu_rbd_parse_filename(const char *filename, QDict *options,
                                    Error **errp)
{
    const char *start;
    char *p, *buf;
    QList *keypairs = NULL;
    char *found_str, *image_name;

    if (!strstart(filename, "rbd:", &start)) {
        error_setg(errp, "File name must start with 'rbd:'");
        return;
    }

    /* Work on a private copy: the tokenizer writes NULs into the buffer */
    buf = g_strdup(start);
    p = buf;

    found_str = qemu_rbd_next_tok(p, '/', &p);
    if (!p) {
        error_setg(errp, "Pool name is required");
        goto done;
    }
    qemu_rbd_unescape(found_str);
    qdict_put_str(options, "pool", found_str);

    if (qemu_rbd_strchr(p, '@')) {
        image_name = qemu_rbd_next_tok(p, '@', &p);

        found_str = qemu_rbd_next_tok(p, ':', &p);
        qemu_rbd_unescape(found_str);
        qdict_put_str(options, "snapshot", found_str);
    } else {
        image_name = qemu_rbd_next_tok(p, ':', &p);
    }
    /* Check for namespace in the image_name */
    if (qemu_rbd_strchr(image_name, '/')) {
        found_str = qemu_rbd_next_tok(image_name, '/', &image_name);
        qemu_rbd_unescape(found_str);
        qdict_put_str(options, "namespace", found_str);
    } else {
        qdict_put_str(options, "namespace", "");
    }
    qemu_rbd_unescape(image_name);
    qdict_put_str(options, "image", image_name);
    if (!p) {
        /* No ":key=value" options follow */
        goto done;
    }

    /* The following are essentially all key/value pairs, and we treat
     * 'id' and 'conf' a bit special.  Key/value pairs may be in any order. */
    while (p) {
        char *name, *value;
        name = qemu_rbd_next_tok(p, '=', &p);
        if (!p) {
            error_setg(errp, "conf option %s has no value", name);
            break;
        }

        qemu_rbd_unescape(name);

        value = qemu_rbd_next_tok(p, ':', &p);
        qemu_rbd_unescape(value);

        if (!strcmp(name, "conf")) {
            qdict_put_str(options, "conf", value);
        } else if (!strcmp(name, "id")) {
            qdict_put_str(options, "user", value);
        } else {
            /*
             * We pass these internally to qemu_rbd_set_keypairs(), so
             * we can get away with the simpler list of [ "key1",
             * "value1", "key2", "value2" ] rather than a raw dict
             * { "key1": "value1", "key2": "value2" } where we can't
             * guarantee order, or even a more correct but complex
             * [ { "key1": "value1" }, { "key2": "value2" } ]
             */
            if (!keypairs) {
                keypairs = qlist_new();
            }
            qlist_append_str(keypairs, name);
            qlist_append_str(keypairs, value);
        }
    }

    if (keypairs) {
        qdict_put(options, "=keyvalue-pairs",
                  qstring_from_gstring(qobject_to_json(QOBJECT(keypairs))));
    }

done:
    g_free(buf);
    qobject_unref(keypairs);
}
258 
/*
 * Apply the authentication settings from @opts to @cluster.
 *
 * - If @opts->key_secret is set, the secret is looked up in base64 form
 *   and stored as the rados "key" option.
 * - If @opts->has_auth_client_required is set, the listed auth modes are
 *   joined with ';' and stored as "auth_client_required".
 *
 * Returns 0 on success.  On failure returns a negative errno value and
 * sets @errp: -EIO when the secret lookup fails, otherwise the error
 * returned by rados_conf_set().
 */
static int qemu_rbd_set_auth(rados_t cluster, BlockdevOptionsRbd *opts,
                             Error **errp)
{
    char *key, *acr;
    int r;
    GString *accu;
    RbdAuthModeList *auth;

    if (opts->key_secret) {
        key = qcrypto_secret_lookup_as_base64(opts->key_secret, errp);
        if (!key) {
            return -EIO;
        }
        r = rados_conf_set(cluster, "key", key);
        g_free(key);
        if (r < 0) {
            error_setg_errno(errp, -r, "Could not set 'key'");
            return r;
        }
    }

    if (opts->has_auth_client_required) {
        accu = g_string_new("");
        for (auth = opts->auth_client_required; auth; auth = auth->next) {
            /* Separator goes before every entry except the first */
            if (accu->str[0]) {
                g_string_append_c(accu, ';');
            }
            g_string_append(accu, RbdAuthMode_str(auth->value));
        }
        /* Keep the character data, release only the GString wrapper */
        acr = g_string_free(accu, FALSE);
        r = rados_conf_set(cluster, "auth_client_required", acr);
        g_free(acr);
        if (r < 0) {
            error_setg_errno(errp, -r,
                             "Could not set 'auth_client_required'");
            return r;
        }
    }

    return 0;
}
300 
/*
 * Apply generic Ceph configuration options to @cluster.
 *
 * @keypairs_json is either NULL (nothing to do) or the JSON text of a
 * flat list [ "key1", "value1", "key2", "value2", ... ] as produced by
 * qemu_rbd_parse_filename().  Malformed JSON aborts (&error_abort), and
 * an empty list or non-string members trip an assertion.
 *
 * Returns 0 on success.  If rados_conf_set() rejects an option, sets
 * @errp, stops processing and returns -EINVAL.
 */
static int qemu_rbd_set_keypairs(rados_t cluster, const char *keypairs_json,
                                 Error **errp)
{
    QList *keypairs;
    QString *name;
    QString *value;
    const char *key;
    size_t remaining;
    int ret = 0;

    if (!keypairs_json) {
        return ret;
    }
    keypairs = qobject_to(QList,
                          qobject_from_json(keypairs_json, &error_abort));
    remaining = qlist_size(keypairs) / 2;
    assert(remaining);

    while (remaining--) {
        /* Entries alternate: name first, then its value */
        name = qobject_to(QString, qlist_pop(keypairs));
        value = qobject_to(QString, qlist_pop(keypairs));
        assert(name && value);
        key = qstring_get_str(name);

        ret = rados_conf_set(cluster, key, qstring_get_str(value));
        qobject_unref(value);
        if (ret < 0) {
            /* @key points into @name, so report before dropping @name */
            error_setg_errno(errp, -ret, "invalid conf option %s", key);
            qobject_unref(name);
            ret = -EINVAL;
            break;
        }
        qobject_unref(name);
    }

    qobject_unref(keypairs);
    return ret;
}
339 
340 #ifdef LIBRBD_SUPPORTS_ENCRYPTION
/*
 * Look up the secret named by @luks_opts->key_secret.
 *
 * On success the secret data is stored in *@passphrase and its length in
 * *@passphrase_len.  The callers in this file release *@passphrase with
 * g_free() / g_autofree.
 *
 * Returns the result of qcrypto_secret_lookup(); callers treat a
 * negative value as failure, with @errp set by the lookup.
 */
static int qemu_rbd_convert_luks_options(
        RbdEncryptionOptionsLUKSBase *luks_opts,
        char **passphrase,
        size_t *passphrase_len,
        Error **errp)
{
    return qcrypto_secret_lookup(luks_opts->key_secret, (uint8_t **)passphrase,
                                 passphrase_len, errp);
}
350 
/*
 * Translate LUKS creation options into librbd terms.
 *
 * Fetches the passphrase (see qemu_rbd_convert_luks_options()) and maps
 * the optional cipher algorithm onto *@alg.  AES-128 and AES-256 are
 * accepted; AES-256 is used when no algorithm was specified.
 *
 * Returns 0 on success, a negative value when the passphrase lookup
 * fails, or -ENOTSUP (with @errp set) for any other cipher algorithm.
 * Note that *@passphrase may already have been filled in when -ENOTSUP
 * is returned; it remains the caller's to release.
 */
static int qemu_rbd_convert_luks_create_options(
        RbdEncryptionCreateOptionsLUKSBase *luks_opts,
        rbd_encryption_algorithm_t *alg,
        char **passphrase,
        size_t *passphrase_len,
        Error **errp)
{
    int r = 0;

    r = qemu_rbd_convert_luks_options(
            qapi_RbdEncryptionCreateOptionsLUKSBase_base(luks_opts),
            passphrase, passphrase_len, errp);
    if (r < 0) {
        return r;
    }

    if (luks_opts->has_cipher_alg) {
        switch (luks_opts->cipher_alg) {
            case QCRYPTO_CIPHER_ALGO_AES_128: {
                *alg = RBD_ENCRYPTION_ALGORITHM_AES128;
                break;
            }
            case QCRYPTO_CIPHER_ALGO_AES_256: {
                *alg = RBD_ENCRYPTION_ALGORITHM_AES256;
                break;
            }
            default: {
                r = -ENOTSUP;
                error_setg_errno(errp, -r, "unknown encryption algorithm: %u",
                                 luks_opts->cipher_alg);
                return r;
            }
        }
    } else {
        /* default alg */
        *alg = RBD_ENCRYPTION_ALGORITHM_AES256;
    }

    return 0;
}
391 
/*
 * Apply an encryption format (LUKS1 or LUKS2) to the open @image.
 *
 * The image size is read before and after formatting.  The image is then
 * resized to raw_size + (raw_size - effective_size), i.e. grown by the
 * amount the reported size shrank, presumably so that the usable size
 * matches what was originally requested.
 *
 * The passphrase buffer is released automatically on return (g_autofree).
 *
 * Returns 0 on success or a negative errno value with @errp set;
 * -ENOTSUP for an encryption format other than LUKS / LUKS2.
 */
static int qemu_rbd_encryption_format(rbd_image_t image,
                                      RbdEncryptionCreateOptions *encrypt,
                                      Error **errp)
{
    int r = 0;
    g_autofree char *passphrase = NULL;
    rbd_encryption_format_t format;
    rbd_encryption_options_t opts;
    rbd_encryption_luks1_format_options_t luks_opts;
    rbd_encryption_luks2_format_options_t luks2_opts;
    size_t opts_size;
    uint64_t raw_size, effective_size;

    r = rbd_get_size(image, &raw_size);
    if (r < 0) {
        error_setg_errno(errp, -r, "cannot get raw image size");
        return r;
    }

    switch (encrypt->format) {
        case RBD_IMAGE_ENCRYPTION_FORMAT_LUKS: {
            memset(&luks_opts, 0, sizeof(luks_opts));
            format = RBD_ENCRYPTION_FORMAT_LUKS1;
            opts = &luks_opts;
            opts_size = sizeof(luks_opts);
            r = qemu_rbd_convert_luks_create_options(
                    qapi_RbdEncryptionCreateOptionsLUKS_base(&encrypt->u.luks),
                    &luks_opts.alg, &passphrase, &luks_opts.passphrase_size,
                    errp);
            if (r < 0) {
                return r;
            }
            luks_opts.passphrase = passphrase;
            break;
        }
        case RBD_IMAGE_ENCRYPTION_FORMAT_LUKS2: {
            memset(&luks2_opts, 0, sizeof(luks2_opts));
            format = RBD_ENCRYPTION_FORMAT_LUKS2;
            opts = &luks2_opts;
            opts_size = sizeof(luks2_opts);
            r = qemu_rbd_convert_luks_create_options(
                    qapi_RbdEncryptionCreateOptionsLUKS2_base(
                            &encrypt->u.luks2),
                    &luks2_opts.alg, &passphrase, &luks2_opts.passphrase_size,
                    errp);
            if (r < 0) {
                return r;
            }
            luks2_opts.passphrase = passphrase;
            break;
        }
        default: {
            r = -ENOTSUP;
            error_setg_errno(
                    errp, -r, "unknown image encryption format: %u",
                    encrypt->format);
            return r;
        }
    }

    r = rbd_encryption_format(image, format, opts, opts_size);
    if (r < 0) {
        error_setg_errno(errp, -r, "encryption format fail");
        return r;
    }

    r = rbd_get_size(image, &effective_size);
    if (r < 0) {
        error_setg_errno(errp, -r, "cannot get effective image size");
        return r;
    }

    /* Grow by the difference between the pre- and post-format sizes */
    r = rbd_resize(image, raw_size + (raw_size - effective_size));
    if (r < 0) {
        error_setg_errno(errp, -r, "cannot resize image after format");
        return r;
    }

    return 0;
}
472 
/*
 * Load a single encryption layer on the open @image.
 *
 * Builds the librbd option structure matching @encrypt->format (LUKS1,
 * LUKS2, or - when LIBRBD_SUPPORTS_ENCRYPTION_LOAD2 is defined - "any
 * LUKS"), fills in the passphrase from the configured secret and calls
 * rbd_encryption_load().
 *
 * The passphrase buffer is released automatically on return (g_autofree).
 *
 * Returns 0 on success or a negative errno value with @errp set;
 * -ENOTSUP for an unsupported encryption format.
 */
static int qemu_rbd_encryption_load(rbd_image_t image,
                                    RbdEncryptionOptions *encrypt,
                                    Error **errp)
{
    int r = 0;
    g_autofree char *passphrase = NULL;
    rbd_encryption_luks1_format_options_t luks_opts;
    rbd_encryption_luks2_format_options_t luks2_opts;
#ifdef LIBRBD_SUPPORTS_ENCRYPTION_LOAD2
    rbd_encryption_luks_format_options_t luks_any_opts;
#endif
    rbd_encryption_format_t format;
    rbd_encryption_options_t opts;
    size_t opts_size;

    switch (encrypt->format) {
        case RBD_IMAGE_ENCRYPTION_FORMAT_LUKS: {
            memset(&luks_opts, 0, sizeof(luks_opts));
            format = RBD_ENCRYPTION_FORMAT_LUKS1;
            opts = &luks_opts;
            opts_size = sizeof(luks_opts);
            r = qemu_rbd_convert_luks_options(
                    qapi_RbdEncryptionOptionsLUKS_base(&encrypt->u.luks),
                    &passphrase, &luks_opts.passphrase_size, errp);
            if (r < 0) {
                return r;
            }
            luks_opts.passphrase = passphrase;
            break;
        }
        case RBD_IMAGE_ENCRYPTION_FORMAT_LUKS2: {
            memset(&luks2_opts, 0, sizeof(luks2_opts));
            format = RBD_ENCRYPTION_FORMAT_LUKS2;
            opts = &luks2_opts;
            opts_size = sizeof(luks2_opts);
            r = qemu_rbd_convert_luks_options(
                    qapi_RbdEncryptionOptionsLUKS2_base(&encrypt->u.luks2),
                    &passphrase, &luks2_opts.passphrase_size, errp);
            if (r < 0) {
                return r;
            }
            luks2_opts.passphrase = passphrase;
            break;
        }
#ifdef LIBRBD_SUPPORTS_ENCRYPTION_LOAD2
        case RBD_IMAGE_ENCRYPTION_FORMAT_LUKS_ANY: {
            memset(&luks_any_opts, 0, sizeof(luks_any_opts));
            format = RBD_ENCRYPTION_FORMAT_LUKS;
            opts = &luks_any_opts;
            opts_size = sizeof(luks_any_opts);
            r = qemu_rbd_convert_luks_options(
                    qapi_RbdEncryptionOptionsLUKSAny_base(&encrypt->u.luks_any),
                    &passphrase, &luks_any_opts.passphrase_size, errp);
            if (r < 0) {
                return r;
            }
            luks_any_opts.passphrase = passphrase;
            break;
        }
#endif
        default: {
            r = -ENOTSUP;
            error_setg_errno(
                    errp, -r, "unknown image encryption format: %u",
                    encrypt->format);
            return r;
        }
    }

    r = rbd_encryption_load(image, format, opts, opts_size);
    if (r < 0) {
        error_setg_errno(errp, -r, "encryption load fail");
        return r;
    }

    return 0;
}
550 
#ifdef LIBRBD_SUPPORTS_ENCRYPTION_LOAD2
/*
 * Load layered encryption on the open @image.
 *
 * @encrypt heads a chain linked through ->parent.  One
 * rbd_encryption_spec_t is built per chain element, in chain order
 * (@encrypt itself first), and the whole array is handed to
 * rbd_encryption_load2().
 *
 * Every per-layer option structure and passphrase is heap-allocated here
 * and released again before returning, on success and on failure.
 *
 * Returns the result of rbd_encryption_load2() on success (not
 * negative), or a negative errno value with @errp set; -ENOTSUP for an
 * unknown encryption format anywhere in the chain.
 */
static int qemu_rbd_encryption_load2(rbd_image_t image,
                                     RbdEncryptionOptions *encrypt,
                                     Error **errp)
{
    int r = 0;
    int encrypt_count = 1;
    int i;
    RbdEncryptionOptions *curr_encrypt;
    rbd_encryption_spec_t *specs;
    rbd_encryption_luks1_format_options_t *luks_opts;
    rbd_encryption_luks2_format_options_t *luks2_opts;
    rbd_encryption_luks_format_options_t *luks_any_opts;

    /* count encryption options */
    for (curr_encrypt = encrypt->parent; curr_encrypt;
         curr_encrypt = curr_encrypt->parent) {
        ++encrypt_count;
    }

    /* Zero-filled, so untouched entries have opts == NULL for the cleanup */
    specs = g_new0(rbd_encryption_spec_t, encrypt_count);

    curr_encrypt = encrypt;
    for (i = 0; i < encrypt_count; ++i) {
        switch (curr_encrypt->format) {
            case RBD_IMAGE_ENCRYPTION_FORMAT_LUKS: {
                specs[i].format = RBD_ENCRYPTION_FORMAT_LUKS1;

                luks_opts = g_new0(rbd_encryption_luks1_format_options_t, 1);
                specs[i].opts = luks_opts;
                specs[i].opts_size = sizeof(*luks_opts);

                r = qemu_rbd_convert_luks_options(
                        qapi_RbdEncryptionOptionsLUKS_base(
                                &curr_encrypt->u.luks),
                        (char **)&luks_opts->passphrase,
                        &luks_opts->passphrase_size,
                        errp);
                break;
            }
            case RBD_IMAGE_ENCRYPTION_FORMAT_LUKS2: {
                specs[i].format = RBD_ENCRYPTION_FORMAT_LUKS2;

                luks2_opts = g_new0(rbd_encryption_luks2_format_options_t, 1);
                specs[i].opts = luks2_opts;
                specs[i].opts_size = sizeof(*luks2_opts);

                r = qemu_rbd_convert_luks_options(
                        qapi_RbdEncryptionOptionsLUKS2_base(
                                &curr_encrypt->u.luks2),
                        (char **)&luks2_opts->passphrase,
                        &luks2_opts->passphrase_size,
                        errp);
                break;
            }
            case RBD_IMAGE_ENCRYPTION_FORMAT_LUKS_ANY: {
                specs[i].format = RBD_ENCRYPTION_FORMAT_LUKS;

                luks_any_opts = g_new0(rbd_encryption_luks_format_options_t, 1);
                specs[i].opts = luks_any_opts;
                specs[i].opts_size = sizeof(*luks_any_opts);

                r = qemu_rbd_convert_luks_options(
                        qapi_RbdEncryptionOptionsLUKSAny_base(
                                &curr_encrypt->u.luks_any),
                        (char **)&luks_any_opts->passphrase,
                        &luks_any_opts->passphrase_size,
                        errp);
                break;
            }
            default: {
                /* specs[i].opts stays NULL, which ends the cleanup loop */
                r = -ENOTSUP;
                error_setg_errno(
                        errp, -r, "unknown image encryption format: %u",
                        curr_encrypt->format);
            }
        }

        if (r < 0) {
            goto exit;
        }

        curr_encrypt = curr_encrypt->parent;
    }

    r = rbd_encryption_load2(image, specs, encrypt_count);
    if (r < 0) {
        error_setg_errno(errp, -r, "layered encryption load fail");
        goto exit;
    }

exit:
    for (i = 0; i < encrypt_count; ++i) {
        /* Entries are filled in order; the first empty one ends the list */
        if (!specs[i].opts) {
            break;
        }

        switch (specs[i].format) {
            case RBD_ENCRYPTION_FORMAT_LUKS1: {
                luks_opts = specs[i].opts;
                g_free((void *)luks_opts->passphrase);
                break;
            }
            case RBD_ENCRYPTION_FORMAT_LUKS2: {
                luks2_opts = specs[i].opts;
                g_free((void *)luks2_opts->passphrase);
                break;
            }
            case RBD_ENCRYPTION_FORMAT_LUKS: {
                luks_any_opts = specs[i].opts;
                g_free((void *)luks_any_opts->passphrase);
                break;
            }
        }

        g_free(specs[i].opts);
    }
    g_free(specs);
    return r;
}
#endif
672 #endif
673 
674 /* FIXME Deprecate and remove keypairs or make it available in QMP. */
qemu_rbd_do_create(BlockdevCreateOptions * options,const char * keypairs,const char * password_secret,Error ** errp)675 static int qemu_rbd_do_create(BlockdevCreateOptions *options,
676                               const char *keypairs, const char *password_secret,
677                               Error **errp)
678 {
679     BlockdevCreateOptionsRbd *opts = &options->u.rbd;
680     rados_t cluster;
681     rados_ioctx_t io_ctx;
682     int obj_order = 0;
683     int ret;
684 
685     assert(options->driver == BLOCKDEV_DRIVER_RBD);
686     if (opts->location->snapshot) {
687         error_setg(errp, "Can't use snapshot name for image creation");
688         return -EINVAL;
689     }
690 
691 #ifndef LIBRBD_SUPPORTS_ENCRYPTION
692     if (opts->encrypt) {
693         error_setg(errp, "RBD library does not support image encryption");
694         return -ENOTSUP;
695     }
696 #endif
697 
698     if (opts->has_cluster_size) {
699         int64_t objsize = opts->cluster_size;
700         if ((objsize - 1) & objsize) {    /* not a power of 2? */
701             error_setg(errp, "obj size needs to be power of 2");
702             return -EINVAL;
703         }
704         if (objsize < 4096) {
705             error_setg(errp, "obj size too small");
706             return -EINVAL;
707         }
708         obj_order = ctz32(objsize);
709     }
710 
711     ret = qemu_rbd_connect(&cluster, &io_ctx, opts->location, false, keypairs,
712                            password_secret, errp);
713     if (ret < 0) {
714         return ret;
715     }
716 
717     ret = rbd_create(io_ctx, opts->location->image, opts->size, &obj_order);
718     if (ret < 0) {
719         error_setg_errno(errp, -ret, "error rbd create");
720         goto out;
721     }
722 
723 #ifdef LIBRBD_SUPPORTS_ENCRYPTION
724     if (opts->encrypt) {
725         rbd_image_t image;
726 
727         ret = rbd_open(io_ctx, opts->location->image, &image, NULL);
728         if (ret < 0) {
729             error_setg_errno(errp, -ret,
730                              "error opening image '%s' for encryption format",
731                              opts->location->image);
732             goto out;
733         }
734 
735         ret = qemu_rbd_encryption_format(image, opts->encrypt, errp);
736         rbd_close(image);
737         if (ret < 0) {
738             /* encryption format fail, try removing the image */
739             rbd_remove(io_ctx, opts->location->image);
740             goto out;
741         }
742     }
743 #endif
744 
745     ret = 0;
746 out:
747     rados_ioctx_destroy(io_ctx);
748     rados_shutdown(cluster);
749     return ret;
750 }
751 
qemu_rbd_co_create(BlockdevCreateOptions * options,Error ** errp)752 static int qemu_rbd_co_create(BlockdevCreateOptions *options, Error **errp)
753 {
754     return qemu_rbd_do_create(options, NULL, NULL, errp);
755 }
756 
/*
 * Build an RbdEncryptionCreateOptions object from the "encrypt.*"
 * entries of @opts.
 *
 * When @opts has no "encrypt." entries, *@spec is set to NULL and 0 is
 * returned.  Otherwise the entries are run through a QAPI input visitor
 * and the resulting object is stored in *@spec.
 *
 * NOTE(review): on the visitor-creation failure path *@spec is left
 * untouched, so callers should initialise it to NULL (the caller in
 * this file does).
 *
 * Returns 0 on success, -EINVAL with @errp set on failure.
 */
static int qemu_rbd_extract_encryption_create_options(
        QemuOpts *opts,
        RbdEncryptionCreateOptions **spec,
        Error **errp)
{
    QDict *opts_qdict;
    QDict *encrypt_qdict;
    Visitor *v;
    int ret = 0;

    opts_qdict = qemu_opts_to_qdict(opts, NULL);
    qdict_extract_subqdict(opts_qdict, &encrypt_qdict, "encrypt.");
    qobject_unref(opts_qdict);
    if (!qdict_size(encrypt_qdict)) {
        *spec = NULL;
        goto exit;
    }

    /* Convert options into a QAPI object */
    v = qobject_input_visitor_new_flat_confused(encrypt_qdict, errp);
    if (!v) {
        ret = -EINVAL;
        goto exit;
    }

    visit_type_RbdEncryptionCreateOptions(v, NULL, spec, errp);
    visit_free(v);
    if (!*spec) {
        ret = -EINVAL;
        goto exit;
    }

exit:
    qobject_unref(encrypt_qdict);
    return ret;
}
793 
qemu_rbd_co_create_opts(BlockDriver * drv,const char * filename,QemuOpts * opts,Error ** errp)794 static int coroutine_fn qemu_rbd_co_create_opts(BlockDriver *drv,
795                                                 const char *filename,
796                                                 QemuOpts *opts,
797                                                 Error **errp)
798 {
799     BlockdevCreateOptions *create_options;
800     BlockdevCreateOptionsRbd *rbd_opts;
801     BlockdevOptionsRbd *loc;
802     RbdEncryptionCreateOptions *encrypt = NULL;
803     Error *local_err = NULL;
804     const char *keypairs, *password_secret;
805     QDict *options = NULL;
806     int ret = 0;
807 
808     create_options = g_new0(BlockdevCreateOptions, 1);
809     create_options->driver = BLOCKDEV_DRIVER_RBD;
810     rbd_opts = &create_options->u.rbd;
811 
812     rbd_opts->location = g_new0(BlockdevOptionsRbd, 1);
813 
814     password_secret = qemu_opt_get(opts, "password-secret");
815 
816     /* Read out options */
817     rbd_opts->size = ROUND_UP(qemu_opt_get_size_del(opts, BLOCK_OPT_SIZE, 0),
818                               BDRV_SECTOR_SIZE);
819     rbd_opts->cluster_size = qemu_opt_get_size_del(opts,
820                                                    BLOCK_OPT_CLUSTER_SIZE, 0);
821     rbd_opts->has_cluster_size = (rbd_opts->cluster_size != 0);
822 
823     options = qdict_new();
824     qemu_rbd_parse_filename(filename, options, &local_err);
825     if (local_err) {
826         ret = -EINVAL;
827         error_propagate(errp, local_err);
828         goto exit;
829     }
830 
831     ret = qemu_rbd_extract_encryption_create_options(opts, &encrypt, errp);
832     if (ret < 0) {
833         goto exit;
834     }
835     rbd_opts->encrypt     = encrypt;
836 
837     /*
838      * Caution: while qdict_get_try_str() is fine, getting non-string
839      * types would require more care.  When @options come from -blockdev
840      * or blockdev_add, its members are typed according to the QAPI
841      * schema, but when they come from -drive, they're all QString.
842      */
843     loc = rbd_opts->location;
844     loc->pool        = g_strdup(qdict_get_try_str(options, "pool"));
845     loc->conf        = g_strdup(qdict_get_try_str(options, "conf"));
846     loc->user        = g_strdup(qdict_get_try_str(options, "user"));
847     loc->q_namespace = g_strdup(qdict_get_try_str(options, "namespace"));
848     loc->image       = g_strdup(qdict_get_try_str(options, "image"));
849     keypairs         = qdict_get_try_str(options, "=keyvalue-pairs");
850 
851     ret = qemu_rbd_do_create(create_options, keypairs, password_secret, errp);
852     if (ret < 0) {
853         goto exit;
854     }
855 
856 exit:
857     qobject_unref(options);
858     qapi_free_BlockdevCreateOptions(create_options);
859     return ret;
860 }
861 
qemu_rbd_mon_host(BlockdevOptionsRbd * opts,Error ** errp)862 static char *qemu_rbd_mon_host(BlockdevOptionsRbd *opts, Error **errp)
863 {
864     const char **vals;
865     const char *host, *port;
866     char *rados_str;
867     InetSocketAddressBaseList *p;
868     int i, cnt;
869 
870     if (!opts->has_server) {
871         return NULL;
872     }
873 
874     for (cnt = 0, p = opts->server; p; p = p->next) {
875         cnt++;
876     }
877 
878     vals = g_new(const char *, cnt + 1);
879 
880     for (i = 0, p = opts->server; p; p = p->next, i++) {
881         host = p->value->host;
882         port = p->value->port;
883 
884         if (strchr(host, ':')) {
885             vals[i] = g_strdup_printf("[%s]:%s", host, port);
886         } else {
887             vals[i] = g_strdup_printf("%s:%s", host, port);
888         }
889     }
890     vals[i] = NULL;
891 
892     rados_str = i ? g_strjoinv(";", (char **)vals) : NULL;
893     g_strfreev((char **)vals);
894     return rados_str;
895 }
896 
/*
 * Create a rados cluster handle, configure it, connect to the cluster and
 * open an I/O context on opts->pool.
 *
 * @cluster:  receives the cluster handle
 * @io_ctx:   receives the I/O context
 * @opts:     connection options; opts->key_secret is filled in from
 *            @secretid when the latter is given
 * @cache:    selects the value ("true"/"false") of the "rbd_cache" option
 * @keypairs: extra key/value configuration handed to
 *            qemu_rbd_set_keypairs(), may be NULL
 * @secretid: legacy 'password-secret' id, may be NULL; it is an error to
 *            pass it together with opts->key_secret
 * @errp:     error object
 *
 * Returns 0 on success.  On failure a negative errno value is returned, the
 * cluster handle (if it was created) has been shut down again and the I/O
 * context (if it was created) destroyed.
 */
static int qemu_rbd_connect(rados_t *cluster, rados_ioctx_t *io_ctx,
                            BlockdevOptionsRbd *opts, bool cache,
                            const char *keypairs, const char *secretid,
                            Error **errp)
{
    char *mon_host = NULL;
    Error *local_err = NULL;
    int r;

    if (secretid) {
        if (opts->key_secret) {
            error_setg(errp,
                       "Legacy 'password-secret' clashes with 'key-secret'");
            return -EINVAL;
        }
        opts->key_secret = g_strdup(secretid);
    }

    mon_host = qemu_rbd_mon_host(opts, &local_err);
    if (local_err) {
        error_propagate(errp, local_err);
        r = -EINVAL;
        goto out;
    }

    r = rados_create(cluster, opts->user);
    if (r < 0) {
        error_setg_errno(errp, -r, "error initializing");
        goto out;
    }

    /* try default location when conf=NULL, but ignore failure */
    r = rados_conf_read_file(*cluster, opts->conf);
    if (opts->conf && r < 0) {
        error_setg_errno(errp, -r, "error reading conf file %s", opts->conf);
        goto failed_shutdown;
    }

    r = qemu_rbd_set_keypairs(*cluster, keypairs, errp);
    if (r < 0) {
        goto failed_shutdown;
    }

    if (mon_host) {
        r = rados_conf_set(*cluster, "mon_host", mon_host);
        if (r < 0) {
            /*
             * Every failure return must come with an error object, do not
             * fail silently here.
             */
            error_setg_errno(errp, -r, "error setting mon_host to '%s'",
                             mon_host);
            goto failed_shutdown;
        }
    }

    r = qemu_rbd_set_auth(*cluster, opts, errp);
    if (r < 0) {
        goto failed_shutdown;
    }

    /*
     * Ignore errors from setting rbd_cache because the only possible
     * error is that the option does not exist, and librbd defaults to
     * no caching.
     */
    if (cache) {
        rados_conf_set(*cluster, "rbd_cache", "true");
    } else {
        rados_conf_set(*cluster, "rbd_cache", "false");
    }

    r = rados_connect(*cluster);
    if (r < 0) {
        error_setg_errno(errp, -r, "error connecting");
        goto failed_shutdown;
    }

    r = rados_ioctx_create(*cluster, opts->pool, io_ctx);
    if (r < 0) {
        error_setg_errno(errp, -r, "error opening pool %s", opts->pool);
        goto failed_shutdown;
    }

#ifdef HAVE_RBD_NAMESPACE_EXISTS
    /* Refuse to continue with a non-empty namespace that does not exist */
    if (opts->q_namespace && strlen(opts->q_namespace) > 0) {
        bool exists;

        r = rbd_namespace_exists(*io_ctx, opts->q_namespace, &exists);
        if (r < 0) {
            error_setg_errno(errp, -r, "error checking namespace");
            goto failed_ioctx_destroy;
        }

        if (!exists) {
            error_setg(errp, "namespace '%s' does not exist",
                       opts->q_namespace);
            r = -ENOENT;
            goto failed_ioctx_destroy;
        }
    }
#endif

    /*
     * Set the namespace after opening the io context on the pool,
     * if nspace == NULL or if nspace == "", it is just as we did nothing
     */
    rados_ioctx_set_namespace(*io_ctx, opts->q_namespace);

    r = 0;
    goto out;

#ifdef HAVE_RBD_NAMESPACE_EXISTS
failed_ioctx_destroy:
    rados_ioctx_destroy(*io_ctx);
#endif
failed_shutdown:
    rados_shutdown(*cluster);
out:
    g_free(mon_host);
    return r;
}
1015 
/*
 * Convert the flat option dictionary @options into a BlockdevOptionsRbd
 * QAPI object stored in *@opts.
 *
 * Returns 0 on success and -EINVAL if the visitor could not be created or
 * the options could not be converted.  The return value is reliable even
 * when @errp is NULL, so callers that discard the error can still detect
 * the failure.
 */
static int qemu_rbd_convert_options(QDict *options, BlockdevOptionsRbd **opts,
                                    Error **errp)
{
    Visitor *v;
    bool ok;

    /* Convert the remaining options into a QAPI object */
    v = qobject_input_visitor_new_flat_confused(options, errp);
    if (!v) {
        return -EINVAL;
    }

    /*
     * The visit result has to be checked: @opts itself is the caller's
     * out-parameter and never NULL, so testing it says nothing about
     * whether the conversion worked.
     */
    ok = visit_type_BlockdevOptionsRbd(v, NULL, opts, errp);
    visit_free(v);
    if (!ok) {
        return -EINVAL;
    }

    return 0;
}
1035 
/*
 * Fallback for options given in the legacy format, i.e. encoded in the
 * "filename" entry of @options.
 *
 * The filename is removed from @options and parsed into it with
 * qemu_rbd_parse_filename(); parse errors are not collected.  A resulting
 * "=keyvalue-pairs" entry is moved into *@keypairs (to be freed by the
 * caller), and the remaining options are converted into *@opts.
 *
 * Returns -EINVAL if there is no "filename" entry, otherwise the result of
 * qemu_rbd_convert_options().
 */
static int qemu_rbd_attempt_legacy_options(QDict *options,
                                           BlockdevOptionsRbd **opts,
                                           char **keypairs)
{
    const char *value = qdict_get_try_str(options, "filename");
    char *legacy_name;
    int ret;

    if (!value) {
        return -EINVAL;
    }

    /* take a private copy, the dict entry goes away right below */
    legacy_name = g_strdup(value);
    qdict_del(options, "filename");

    qemu_rbd_parse_filename(legacy_name, options, NULL);

    /* keypairs freed by caller */
    value = qdict_get_try_str(options, "=keyvalue-pairs");
    *keypairs = g_strdup(value);
    if (value) {
        qdict_del(options, "=keyvalue-pairs");
    }

    ret = qemu_rbd_convert_options(options, opts, NULL);
    g_free(legacy_name);

    return ret;
}
1062 
/*
 * Open the RBD image described by @options for block node @bs.
 *
 * "=keyvalue-pairs" and "password-secret" are taken out of @options first,
 * the rest is converted into a BlockdevOptionsRbd object.  If that fails
 * and no key/value pairs were given, the legacy format (everything encoded
 * in "filename") is tried as a fallback.  All entries are removed from
 * @options before connecting.
 *
 * On success the cluster connection, I/O context, image handle, image and
 * snapshot names as well as image and object size are stored in the
 * driver state and 0 is returned.  On failure everything acquired here is
 * released again, @errp is set and a negative errno value is returned.
 */
static int qemu_rbd_open(BlockDriverState *bs, QDict *options, int flags,
                         Error **errp)
{
    BDRVRBDState *s = bs->opaque;
    BlockdevOptionsRbd *opts = NULL;
    const QDictEntry *e;
    Error *local_err = NULL;
    char *keypairs, *secretid;
    rbd_image_info_t info;
    int r;

    keypairs = g_strdup(qdict_get_try_str(options, "=keyvalue-pairs"));
    if (keypairs) {
        qdict_del(options, "=keyvalue-pairs");
    }

    secretid = g_strdup(qdict_get_try_str(options, "password-secret"));
    if (secretid) {
        qdict_del(options, "password-secret");
    }

    r = qemu_rbd_convert_options(options, &opts, &local_err);
    if (local_err) {
        /* If keypairs are present, that means some options are present in
         * the modern option format.  Don't attempt to parse legacy option
         * formats, as we won't support mixed usage. */
        if (keypairs) {
            error_propagate(errp, local_err);
            /* An error is pending, so never report success from here */
            if (r >= 0) {
                r = -EINVAL;
            }
            goto out;
        }

        /* If the initial attempt to convert and process the options failed,
         * we may be attempting to open an image file that has the rbd options
         * specified in the older format consisting of all key/value pairs
         * encoded in the filename.  Go ahead and attempt to parse the
         * filename, and see if we can pull out the required options. */
        r = qemu_rbd_attempt_legacy_options(options, &opts, &keypairs);
        if (r < 0 || !opts) {
            /* Propagate the original error, not the legacy parsing fallback
             * error, as the latter was just a best-effort attempt.  Without
             * an options object there is nothing we could continue with. */
            error_propagate(errp, local_err);
            if (r >= 0) {
                r = -EINVAL;
            }
            goto out;
        }

        /* The fallback worked, so the original error is not reported to
         * anyone and has to be released here. */
        error_free(local_err);
        local_err = NULL;

        /* Take care whenever deciding to actually deprecate; once this ability
         * is removed, we will not be able to open any images with legacy-styled
         * backing image strings. */
        warn_report("RBD options encoded in the filename as keyvalue pairs "
                    "is deprecated");
    }

    /* Remove the processed options from the QDict (the visitor processes
     * _all_ options in the QDict) */
    while ((e = qdict_first(options))) {
        qdict_del(options, e->key);
    }

    r = qemu_rbd_connect(&s->cluster, &s->io_ctx, opts,
                         !(flags & BDRV_O_NOCACHE), keypairs, secretid, errp);
    if (r < 0) {
        goto out;
    }

    s->snap = g_strdup(opts->snapshot);
    s->image_name = g_strdup(opts->image);

    /* rbd_open is always r/w */
    r = rbd_open(s->io_ctx, s->image_name, &s->image, s->snap);
    if (r < 0) {
        error_setg_errno(errp, -r, "error reading header from %s",
                         s->image_name);
        goto failed_open;
    }

    if (opts->encrypt) {
#ifdef LIBRBD_SUPPORTS_ENCRYPTION
        if (opts->encrypt->parent) {
#ifdef LIBRBD_SUPPORTS_ENCRYPTION_LOAD2
            r = qemu_rbd_encryption_load2(s->image, opts->encrypt, errp);
#else
            r = -ENOTSUP;
            error_setg(errp, "RBD library does not support layered encryption");
#endif
        } else {
            r = qemu_rbd_encryption_load(s->image, opts->encrypt, errp);
        }
        if (r < 0) {
            goto failed_post_open;
        }
#else
        r = -ENOTSUP;
        error_setg(errp, "RBD library does not support image encryption");
        goto failed_post_open;
#endif
    }

    r = rbd_stat(s->image, &info, sizeof(info));
    if (r < 0) {
        error_setg_errno(errp, -r, "error getting image info from %s",
                         s->image_name);
        goto failed_post_open;
    }
    s->image_size = info.size;
    s->object_size = info.obj_size;

    /* If we are using an rbd snapshot, we must be r/o, otherwise
     * leave as-is */
    if (s->snap != NULL) {
        bdrv_graph_rdlock_main_loop();
        r = bdrv_apply_auto_read_only(bs, "rbd snapshots are read-only", errp);
        bdrv_graph_rdunlock_main_loop();
        if (r < 0) {
            goto failed_post_open;
        }
    }

#ifdef LIBRBD_SUPPORTS_WRITE_ZEROES
    bs->supported_zero_flags = BDRV_REQ_MAY_UNMAP | BDRV_REQ_NO_FALLBACK;
#endif

    /* When extending regular files, we get zeros from the OS */
    bs->supported_truncate_flags = BDRV_REQ_ZERO_WRITE;

    r = 0;
    goto out;

failed_post_open:
    rbd_close(s->image);
failed_open:
    rados_ioctx_destroy(s->io_ctx);
    g_free(s->snap);
    g_free(s->image_name);
    rados_shutdown(s->cluster);
out:
    qapi_free_BlockdevOptionsRbd(opts);
    g_free(keypairs);
    g_free(secretid);
    return r;
}
1201 
1202 
1203 /* Since RBD is currently always opened R/W via the API,
1204  * we just need to check if we are using a snapshot or not, in
1205  * order to determine if we will allow it to be R/W */
qemu_rbd_reopen_prepare(BDRVReopenState * state,BlockReopenQueue * queue,Error ** errp)1206 static int qemu_rbd_reopen_prepare(BDRVReopenState *state,
1207                                    BlockReopenQueue *queue, Error **errp)
1208 {
1209     BDRVRBDState *s = state->bs->opaque;
1210     int ret = 0;
1211 
1212     GRAPH_RDLOCK_GUARD_MAINLOOP();
1213 
1214     if (s->snap && state->flags & BDRV_O_RDWR) {
1215         error_setg(errp,
1216                    "Cannot change node '%s' to r/w when using RBD snapshot",
1217                    bdrv_get_device_or_node_name(state->bs));
1218         ret = -EINVAL;
1219     }
1220 
1221     return ret;
1222 }
1223 
/*
 * Release everything held in the driver state of @bs: the image handle,
 * the I/O context, the cluster connection and the copies of the snapshot
 * and image names.
 */
static void qemu_rbd_close(BlockDriverState *bs)
{
    BDRVRBDState *state = bs->opaque;

    /* image first, then its I/O context, the cluster handle last */
    rbd_close(state->image);
    rados_ioctx_destroy(state->io_ctx);
    rados_shutdown(state->cluster);

    g_free(state->snap);
    g_free(state->image_name);
}
1234 
1235 /* Resize the RBD image and update the 'image_size' with the current size */
qemu_rbd_resize(BlockDriverState * bs,uint64_t size)1236 static int qemu_rbd_resize(BlockDriverState *bs, uint64_t size)
1237 {
1238     BDRVRBDState *s = bs->opaque;
1239     int r;
1240 
1241     r = rbd_resize(s->image, size);
1242     if (r < 0) {
1243         return r;
1244     }
1245 
1246     s->image_size = size;
1247 
1248     return 0;
1249 }
1250 
qemu_rbd_finish_bh(void * opaque)1251 static void qemu_rbd_finish_bh(void *opaque)
1252 {
1253     RBDTask *task = opaque;
1254     task->complete = true;
1255     aio_co_wake(task->co);
1256 }
1257 
1258 /*
1259  * This is the completion callback function for all rbd aio calls
1260  * started from qemu_rbd_start_co().
1261  *
1262  * Note: this function is being called from a non qemu thread so
1263  * we need to be careful about what we do here. Generally we only
1264  * schedule a BH, and do the rest of the io completion handling
1265  * from qemu_rbd_finish_bh() which runs in a qemu context.
1266  */
qemu_rbd_completion_cb(rbd_completion_t c,RBDTask * task)1267 static void qemu_rbd_completion_cb(rbd_completion_t c, RBDTask *task)
1268 {
1269     task->ret = rbd_aio_get_return_value(c);
1270     rbd_aio_release(c);
1271     aio_bh_schedule_oneshot(bdrv_get_aio_context(task->bs),
1272                             qemu_rbd_finish_bh, task);
1273 }
1274 
qemu_rbd_start_co(BlockDriverState * bs,uint64_t offset,uint64_t bytes,QEMUIOVector * qiov,int flags,RBDAIOCmd cmd)1275 static int coroutine_fn qemu_rbd_start_co(BlockDriverState *bs,
1276                                           uint64_t offset,
1277                                           uint64_t bytes,
1278                                           QEMUIOVector *qiov,
1279                                           int flags,
1280                                           RBDAIOCmd cmd)
1281 {
1282     BDRVRBDState *s = bs->opaque;
1283     RBDTask task = { .bs = bs, .co = qemu_coroutine_self() };
1284     rbd_completion_t c;
1285     int r;
1286 
1287     assert(!qiov || qiov->size == bytes);
1288 
1289     if (cmd == RBD_AIO_WRITE || cmd == RBD_AIO_WRITE_ZEROES) {
1290         /*
1291          * RBD APIs don't allow us to write more than actual size, so in order
1292          * to support growing images, we resize the image before write
1293          * operations that exceed the current size.
1294          */
1295         if (offset + bytes > s->image_size) {
1296             r = qemu_rbd_resize(bs, offset + bytes);
1297             if (r < 0) {
1298                 return r;
1299             }
1300         }
1301     }
1302 
1303     r = rbd_aio_create_completion(&task,
1304                                   (rbd_callback_t) qemu_rbd_completion_cb, &c);
1305     if (r < 0) {
1306         return r;
1307     }
1308 
1309     switch (cmd) {
1310     case RBD_AIO_READ:
1311         r = rbd_aio_readv(s->image, qiov->iov, qiov->niov, offset, c);
1312         break;
1313     case RBD_AIO_WRITE:
1314         r = rbd_aio_writev(s->image, qiov->iov, qiov->niov, offset, c);
1315         break;
1316     case RBD_AIO_DISCARD:
1317         r = rbd_aio_discard(s->image, offset, bytes, c);
1318         break;
1319     case RBD_AIO_FLUSH:
1320         r = rbd_aio_flush(s->image, c);
1321         break;
1322 #ifdef LIBRBD_SUPPORTS_WRITE_ZEROES
1323     case RBD_AIO_WRITE_ZEROES: {
1324         int zero_flags = 0;
1325 #ifdef RBD_WRITE_ZEROES_FLAG_THICK_PROVISION
1326         if (!(flags & BDRV_REQ_MAY_UNMAP)) {
1327             zero_flags = RBD_WRITE_ZEROES_FLAG_THICK_PROVISION;
1328         }
1329 #endif
1330         r = rbd_aio_write_zeroes(s->image, offset, bytes, c, zero_flags, 0);
1331         break;
1332     }
1333 #endif
1334     default:
1335         r = -EINVAL;
1336     }
1337 
1338     if (r < 0) {
1339         error_report("rbd request failed early: cmd %d offset %" PRIu64
1340                      " bytes %" PRIu64 " flags %d r %d (%s)", cmd, offset,
1341                      bytes, flags, r, strerror(-r));
1342         rbd_aio_release(c);
1343         return r;
1344     }
1345 
1346     while (!task.complete) {
1347         qemu_coroutine_yield();
1348     }
1349 
1350     if (task.ret < 0) {
1351         error_report("rbd request failed: cmd %d offset %" PRIu64 " bytes %"
1352                      PRIu64 " flags %d task.ret %" PRIi64 " (%s)", cmd, offset,
1353                      bytes, flags, task.ret, strerror(-task.ret));
1354         return task.ret;
1355     }
1356 
1357     /* zero pad short reads */
1358     if (cmd == RBD_AIO_READ && task.ret < qiov->size) {
1359         qemu_iovec_memset(qiov, task.ret, 0, qiov->size - task.ret);
1360     }
1361 
1362     return 0;
1363 }
1364 
/*
 * Read @bytes bytes starting at @offset into @qiov by running an
 * RBD_AIO_READ request through qemu_rbd_start_co().  @flags is passed on
 * unchanged.
 *
 * Returns the result of qemu_rbd_start_co().
 */
static int
coroutine_fn qemu_rbd_co_preadv(BlockDriverState *bs, int64_t offset,
                                int64_t bytes, QEMUIOVector *qiov,
                                BdrvRequestFlags flags)
{
    return qemu_rbd_start_co(bs, offset, bytes, qiov, flags, RBD_AIO_READ);
}
1372 
/*
 * Write @bytes bytes from @qiov starting at @offset by running an
 * RBD_AIO_WRITE request through qemu_rbd_start_co().  @flags is passed on
 * unchanged.
 *
 * Returns the result of qemu_rbd_start_co().
 */
static int
coroutine_fn qemu_rbd_co_pwritev(BlockDriverState *bs, int64_t offset,
                                 int64_t bytes, QEMUIOVector *qiov,
                                 BdrvRequestFlags flags)
{
    return qemu_rbd_start_co(bs, offset, bytes, qiov, flags, RBD_AIO_WRITE);
}
1380 
/*
 * Flush the image by running an RBD_AIO_FLUSH request through
 * qemu_rbd_start_co(); offset, length, I/O vector and flags are all unset.
 *
 * Returns the result of qemu_rbd_start_co().
 */
static int coroutine_fn qemu_rbd_co_flush(BlockDriverState *bs)
{
    return qemu_rbd_start_co(bs, 0, 0, NULL, 0, RBD_AIO_FLUSH);
}
1385 
/*
 * Discard @bytes bytes starting at @offset by running an RBD_AIO_DISCARD
 * request through qemu_rbd_start_co().  No I/O vector and no flags are
 * passed.
 *
 * Returns the result of qemu_rbd_start_co().
 */
static int coroutine_fn qemu_rbd_co_pdiscard(BlockDriverState *bs,
                                             int64_t offset, int64_t bytes)
{
    return qemu_rbd_start_co(bs, offset, bytes, NULL, 0, RBD_AIO_DISCARD);
}
1391 
1392 #ifdef LIBRBD_SUPPORTS_WRITE_ZEROES
/*
 * Zero @bytes bytes starting at @offset by running an
 * RBD_AIO_WRITE_ZEROES request through qemu_rbd_start_co().  @flags is
 * passed on unchanged; no I/O vector is needed.
 *
 * Returns the result of qemu_rbd_start_co().
 */
static int
coroutine_fn qemu_rbd_co_pwrite_zeroes(BlockDriverState *bs, int64_t offset,
                                       int64_t bytes, BdrvRequestFlags flags)
{
    return qemu_rbd_start_co(bs, offset, bytes, NULL, flags,
                             RBD_AIO_WRITE_ZEROES);
}
1400 #endif
1401 
/*
 * Fill in @bdi for @bs: the cluster size reported is the object size
 * cached in the driver state.  No other field of @bdi is written here.
 *
 * Always returns 0.
 */
static int coroutine_fn
qemu_rbd_co_get_info(BlockDriverState *bs, BlockDriverInfo *bdi)
{
    BDRVRBDState *s = bs->opaque;
    bdi->cluster_size = s->object_size;
    return 0;
}
1409 
qemu_rbd_get_specific_info(BlockDriverState * bs,Error ** errp)1410 static ImageInfoSpecific *qemu_rbd_get_specific_info(BlockDriverState *bs,
1411                                                      Error **errp)
1412 {
1413     BDRVRBDState *s = bs->opaque;
1414     ImageInfoSpecific *spec_info;
1415     char buf[RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN] = {0};
1416     int r;
1417 
1418     if (s->image_size >= RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN) {
1419         r = rbd_read(s->image, 0,
1420                      RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN, buf);
1421         if (r < 0) {
1422             error_setg_errno(errp, -r, "cannot read image start for probe");
1423             return NULL;
1424         }
1425     }
1426 
1427     spec_info = g_new(ImageInfoSpecific, 1);
1428     *spec_info = (ImageInfoSpecific){
1429         .type  = IMAGE_INFO_SPECIFIC_KIND_RBD,
1430         .u.rbd.data = g_new0(ImageInfoSpecificRbd, 1),
1431     };
1432 
1433     if (memcmp(buf, rbd_luks_header_verification,
1434                RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN) == 0) {
1435         spec_info->u.rbd.data->encryption_format =
1436                 RBD_IMAGE_ENCRYPTION_FORMAT_LUKS;
1437         spec_info->u.rbd.data->has_encryption_format = true;
1438     } else if (memcmp(buf, rbd_luks2_header_verification,
1439                RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN) == 0) {
1440         spec_info->u.rbd.data->encryption_format =
1441                 RBD_IMAGE_ENCRYPTION_FORMAT_LUKS2;
1442         spec_info->u.rbd.data->has_encryption_format = true;
1443     } else if (memcmp(buf, rbd_layered_luks_header_verification,
1444                RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN) == 0) {
1445         spec_info->u.rbd.data->encryption_format =
1446                 RBD_IMAGE_ENCRYPTION_FORMAT_LUKS;
1447         spec_info->u.rbd.data->has_encryption_format = true;
1448     } else if (memcmp(buf, rbd_layered_luks2_header_verification,
1449                RBD_ENCRYPTION_LUKS_HEADER_VERIFICATION_LEN) == 0) {
1450         spec_info->u.rbd.data->encryption_format =
1451                 RBD_IMAGE_ENCRYPTION_FORMAT_LUKS2;
1452         spec_info->u.rbd.data->has_encryption_format = true;
1453     } else {
1454         spec_info->u.rbd.data->has_encryption_format = false;
1455     }
1456 
1457     return spec_info;
1458 }
1459 
/*
 * rbd_diff_iterate2 allows the execution to be interrupted by returning a
 * negative value from the callback routine. Choose a value that does not
 * conflict with an existing exit code and return it if we want to stop the
 * execution prematurely because we detected a change in the allocation
 * status.
 */
1466 #define QEMU_RBD_EXIT_DIFF_ITERATE2 -9000
1467 
qemu_rbd_diff_iterate_cb(uint64_t offs,size_t len,int exists,void * opaque)1468 static int qemu_rbd_diff_iterate_cb(uint64_t offs, size_t len,
1469                                     int exists, void *opaque)
1470 {
1471     RBDDiffIterateReq *req = opaque;
1472 
1473     assert(req->offs + req->bytes <= offs);
1474 
1475     /* treat a hole like an unallocated area and bail out */
1476     if (!exists) {
1477         return 0;
1478     }
1479 
1480     if (!req->exists && offs > req->offs) {
1481         /*
1482          * we started in an unallocated area and hit the first allocated
1483          * block. req->bytes must be set to the length of the unallocated area
1484          * before the allocated area. stop further processing.
1485          */
1486         req->bytes = offs - req->offs;
1487         return QEMU_RBD_EXIT_DIFF_ITERATE2;
1488     }
1489 
1490     if (req->exists && offs > req->offs + req->bytes) {
1491         /*
1492          * we started in an allocated area and jumped over an unallocated area,
1493          * req->bytes contains the length of the allocated area before the
1494          * unallocated area. stop further processing.
1495          */
1496         return QEMU_RBD_EXIT_DIFF_ITERATE2;
1497     }
1498 
1499     req->bytes += len;
1500     req->exists = true;
1501 
1502     return 0;
1503 }
1504 
/*
 * Report the allocation status of the range starting at @offset.
 *
 * @bs:     block node to query
 * @mode:   not evaluated by this function
 * @offset: start of the range, @offset + @bytes must not exceed the image
 *          size
 * @bytes:  length of the range
 * @pnum:   receives the number of bytes the returned status applies to
 * @map:    receives @offset
 * @file:   receives @bs
 *
 * The range is reported as allocated data unless fast-diff information is
 * available and valid and rbd_diff_iterate2() succeeds; every failure on
 * that way falls back to this default instead of returning an error.
 *
 * Returns BDRV_BLOCK_DATA | BDRV_BLOCK_OFFSET_VALID or
 * BDRV_BLOCK_ZERO | BDRV_BLOCK_OFFSET_VALID.
 */
static int coroutine_fn qemu_rbd_co_block_status(BlockDriverState *bs,
                                                 unsigned int mode,
                                                 int64_t offset, int64_t bytes,
                                                 int64_t *pnum, int64_t *map,
                                                 BlockDriverState **file)
{
    BDRVRBDState *s = bs->opaque;
    int status, r;
    RBDDiffIterateReq req = { .offs = offset };
    uint64_t features, flags;
    /* bytes added in front of @offset by the alignment workaround below */
    uint64_t head = 0;

    assert(offset + bytes <= s->image_size);

    /* default to all sectors allocated */
    status = BDRV_BLOCK_DATA | BDRV_BLOCK_OFFSET_VALID;
    *map = offset;
    *file = bs;
    *pnum = bytes;

    /* check if RBD image supports fast-diff */
    r = rbd_get_features(s->image, &features);
    if (r < 0) {
        return status;
    }
    if (!(features & RBD_FEATURE_FAST_DIFF)) {
        return status;
    }

    /* check if RBD fast-diff result is valid */
    r = rbd_get_flags(s->image, &flags);
    if (r < 0) {
        return status;
    }
    if (flags & RBD_FLAG_FAST_DIFF_INVALID) {
        return status;
    }

#if LIBRBD_VERSION_CODE < LIBRBD_VERSION(1, 17, 0)
    /*
     * librbd had a bug until early 2022 that affected all versions of ceph that
     * supported fast-diff. This bug results in reporting of incorrect offsets
     * if the offset parameter to rbd_diff_iterate2 is not object aligned.
     * Work around this bug by rounding down the offset to object boundaries.
     * This is OK because we call rbd_diff_iterate2 with whole_object = true.
     * However, this workaround only works for non cloned images with default
     * striping.
     *
     * See: https://tracker.ceph.com/issues/53784
     */

    /* check if RBD image has non-default striping enabled */
    if (features & RBD_FEATURE_STRIPINGV2) {
        return status;
    }

#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wdeprecated-declarations"
    /*
     * check if RBD image is a clone (= has a parent).
     *
     * rbd_get_parent_info is deprecated from Nautilus onwards, but the
     * replacement rbd_get_parent is not present in Luminous and Mimic.
     */
    if (rbd_get_parent_info(s->image, NULL, 0, NULL, 0, NULL, 0) != -ENOENT) {
        return status;
    }
#pragma GCC diagnostic pop

    /*
     * Move the start of the query down by the offset within the object and
     * grow the length accordingly.
     * NOTE(review): the mask assumes object_size is a power of two - confirm.
     */
    head = req.offs & (s->object_size - 1);
    req.offs -= head;
    bytes += head;
#endif

    r = rbd_diff_iterate2(s->image, NULL, req.offs, bytes, true, true,
                          qemu_rbd_diff_iterate_cb, &req);
    /* our own exit code only means the callback stopped the iteration */
    if (r < 0 && r != QEMU_RBD_EXIT_DIFF_ITERATE2) {
        return status;
    }
    assert(req.bytes <= bytes);
    if (!req.exists) {
        if (r == 0) {
            /*
             * rbd_diff_iterate2 does not invoke callbacks for unallocated
             * areas. This here catches the case where no callback was
             * invoked at all (req.bytes == 0).
             */
            assert(req.bytes == 0);
            req.bytes = bytes;
        }
        status = BDRV_BLOCK_ZERO | BDRV_BLOCK_OFFSET_VALID;
    }

    /* do not count the bytes in front of the requested offset */
    assert(req.bytes > head);
    *pnum = req.bytes - head;
    return status;
}
1602 
qemu_rbd_co_getlength(BlockDriverState * bs)1603 static int64_t coroutine_fn qemu_rbd_co_getlength(BlockDriverState *bs)
1604 {
1605     BDRVRBDState *s = bs->opaque;
1606     int r;
1607 
1608     r = rbd_get_size(s->image, &s->image_size);
1609     if (r < 0) {
1610         return r;
1611     }
1612 
1613     return s->image_size;
1614 }
1615 
qemu_rbd_co_truncate(BlockDriverState * bs,int64_t offset,bool exact,PreallocMode prealloc,BdrvRequestFlags flags,Error ** errp)1616 static int coroutine_fn qemu_rbd_co_truncate(BlockDriverState *bs,
1617                                              int64_t offset,
1618                                              bool exact,
1619                                              PreallocMode prealloc,
1620                                              BdrvRequestFlags flags,
1621                                              Error **errp)
1622 {
1623     int r;
1624 
1625     if (prealloc != PREALLOC_MODE_OFF) {
1626         error_setg(errp, "Unsupported preallocation mode '%s'",
1627                    PreallocMode_str(prealloc));
1628         return -ENOTSUP;
1629     }
1630 
1631     r = qemu_rbd_resize(bs, offset);
1632     if (r < 0) {
1633         error_setg_errno(errp, -r, "Failed to resize file");
1634         return r;
1635     }
1636 
1637     return 0;
1638 }
1639 
/*
 * Create an rbd snapshot named after sn_info->name.
 *
 * Returns 0 on success, -EINVAL if the name is empty or a non-empty id
 * differs from the name, -ERANGE if the name does not fit into the id
 * field, or the negative value from rbd_snap_create().
 */
static int qemu_rbd_snap_create(BlockDriverState *bs,
                                QEMUSnapshotInfo *sn_info)
{
    BDRVRBDState *s = bs->opaque;
    const char *name = sn_info->name;
    const char *id = sn_info->id_str;
    int ret;

    /* we need a name for rbd snapshots */
    if (*name == '\0') {
        return -EINVAL;
    }

    /*
     * rbd snapshots are using the name as the user controlled unique
     * identifier, we can't use the rbd snapid for that purpose, as it
     * can't be set
     */
    if (*id != '\0' && strcmp(id, name) != 0) {
        return -EINVAL;
    }

    if (strlen(name) >= sizeof(sn_info->id_str)) {
        return -ERANGE;
    }

    ret = rbd_snap_create(s->image, name);
    if (ret < 0) {
        error_report("failed to create snap: %s", strerror(-ret));
        return ret;
    }

    return 0;
}
1671 
/*
 * Remove the rbd snapshot @snapshot_name.
 *
 * @snapshot_name is mandatory.  @snapshot_id may be NULL; if it is given
 * it has to be identical to the name.
 *
 * Returns the result of rbd_snap_remove(), or -EINVAL for invalid
 * arguments.  @errp is set on every failure.
 */
static int qemu_rbd_snap_remove(BlockDriverState *bs,
                                const char *snapshot_id,
                                const char *snapshot_name,
                                Error **errp)
{
    BDRVRBDState *s = bs->opaque;
    int ret;

    if (snapshot_name == NULL) {
        error_setg(errp, "rbd need a valid snapshot name");
        return -EINVAL;
    }

    /* an id is only accepted if it equals the name, see
       qemu_rbd_snap_list() */
    if (snapshot_id != NULL && strcmp(snapshot_id, snapshot_name) != 0) {
        error_setg(errp,
                   "rbd do not support snapshot id, it should be NULL or "
                   "equal to snapshot name");
        return -EINVAL;
    }

    ret = rbd_snap_remove(s->image, snapshot_name);
    if (ret < 0) {
        error_setg_errno(errp, -ret, "Failed to remove the snapshot");
    }

    return ret;
}
1700 
/*
 * Roll the image of @bs back to the snapshot @snapshot_name.
 *
 * Returns the result of rbd_snap_rollback() unchanged; nothing is
 * validated or reported here.
 */
static int qemu_rbd_snap_rollback(BlockDriverState *bs,
                                  const char *snapshot_name)
{
    BDRVRBDState *s = bs->opaque;

    return rbd_snap_rollback(s->image, snapshot_name);
}
1708 
/*
 * List the snapshots of the image.
 *
 * On success *@psn_tab receives a newly allocated array with one entry per
 * snapshot; both id and name of an entry are set to the rbd snapshot name,
 * and all time stamps are zero.  If there are no snapshots or the listing
 * failed, *@psn_tab is set to NULL.
 *
 * Returns the number of snapshots, or the negative value from
 * rbd_snap_list().
 */
static int qemu_rbd_snap_list(BlockDriverState *bs,
                              QEMUSnapshotInfo **psn_tab)
{
    BDRVRBDState *s = bs->opaque;
    QEMUSnapshotInfo *table;
    rbd_snap_info_t *snaps;
    int max_snaps = RBD_MAX_SNAPS;
    int snap_count;
    int idx;

    /* retry for as long as librbd reports the buffer as too small */
    for (;;) {
        snaps = g_new(rbd_snap_info_t, max_snaps);
        snap_count = rbd_snap_list(s->image, snaps, &max_snaps);
        if (snap_count > 0) {
            break;
        }

        g_free(snaps);
        if (snap_count != -ERANGE) {
            *psn_tab = NULL;
            return snap_count;
        }
    }

    table = g_new0(QEMUSnapshotInfo, snap_count);

    for (idx = 0; idx < snap_count; idx++) {
        QEMUSnapshotInfo *entry = &table[idx];
        const char *snap_name = snaps[idx].name;

        pstrcpy(entry->id_str, sizeof(entry->id_str), snap_name);
        pstrcpy(entry->name, sizeof(entry->name), snap_name);

        entry->vm_state_size = snaps[idx].size;
        entry->date_sec = 0;
        entry->date_nsec = 0;
        entry->vm_clock_nsec = 0;
    }

    rbd_snap_list_end(snaps);
    g_free(snaps);

    *psn_tab = table;
    return snap_count;
}
1751 
qemu_rbd_co_invalidate_cache(BlockDriverState * bs,Error ** errp)1752 static void coroutine_fn qemu_rbd_co_invalidate_cache(BlockDriverState *bs,
1753                                                       Error **errp)
1754 {
1755     BDRVRBDState *s = bs->opaque;
1756     int r = rbd_invalidate_cache(s->image);
1757     if (r < 0) {
1758         error_setg_errno(errp, -r, "Failed to invalidate the cache");
1759     }
1760 }
1761 
1762 static QemuOptsList qemu_rbd_create_opts = {
1763     .name = "rbd-create-opts",
1764     .head = QTAILQ_HEAD_INITIALIZER(qemu_rbd_create_opts.head),
1765     .desc = {
1766         {
1767             .name = BLOCK_OPT_SIZE,
1768             .type = QEMU_OPT_SIZE,
1769             .help = "Virtual disk size"
1770         },
1771         {
1772             .name = BLOCK_OPT_CLUSTER_SIZE,
1773             .type = QEMU_OPT_SIZE,
1774             .help = "RBD object size"
1775         },
1776         {
1777             .name = "password-secret",
1778             .type = QEMU_OPT_STRING,
1779             .help = "ID of secret providing the password",
1780         },
1781         {
1782             .name = "encrypt.format",
1783             .type = QEMU_OPT_STRING,
1784             .help = "Encrypt the image, format choices: 'luks', 'luks2'",
1785         },
1786         {
1787             .name = "encrypt.cipher-alg",
1788             .type = QEMU_OPT_STRING,
1789             .help = "Name of encryption cipher algorithm"
1790                     " (allowed values: aes-128, aes-256)",
1791         },
1792         {
1793             .name = "encrypt.key-secret",
1794             .type = QEMU_OPT_STRING,
1795             .help = "ID of secret providing LUKS passphrase",
1796         },
1797         { /* end of list */ }
1798     }
1799 };
1800 
1801 static const char *const qemu_rbd_strong_runtime_opts[] = {
1802     "pool",
1803     "namespace",
1804     "image",
1805     "conf",
1806     "snapshot",
1807     "user",
1808     "server.",
1809     "password-secret",
1810 
1811     NULL
1812 };
1813 
1814 static BlockDriver bdrv_rbd = {
1815     .format_name            = "rbd",
1816     .instance_size          = sizeof(BDRVRBDState),
1817 
1818     .bdrv_parse_filename    = qemu_rbd_parse_filename,
1819     .bdrv_open              = qemu_rbd_open,
1820     .bdrv_close             = qemu_rbd_close,
1821     .bdrv_reopen_prepare    = qemu_rbd_reopen_prepare,
1822     .bdrv_co_create         = qemu_rbd_co_create,
1823     .bdrv_co_create_opts    = qemu_rbd_co_create_opts,
1824     .bdrv_has_zero_init     = bdrv_has_zero_init_1,
1825     .bdrv_co_get_info       = qemu_rbd_co_get_info,
1826     .bdrv_get_specific_info = qemu_rbd_get_specific_info,
1827     .create_opts            = &qemu_rbd_create_opts,
1828     .bdrv_co_getlength      = qemu_rbd_co_getlength,
1829     .bdrv_co_truncate       = qemu_rbd_co_truncate,
1830     .protocol_name          = "rbd",
1831 
1832     .bdrv_co_preadv         = qemu_rbd_co_preadv,
1833     .bdrv_co_pwritev        = qemu_rbd_co_pwritev,
1834     .bdrv_co_flush_to_disk  = qemu_rbd_co_flush,
1835     .bdrv_co_pdiscard       = qemu_rbd_co_pdiscard,
1836 #ifdef LIBRBD_SUPPORTS_WRITE_ZEROES
1837     .bdrv_co_pwrite_zeroes  = qemu_rbd_co_pwrite_zeroes,
1838 #endif
1839     .bdrv_co_block_status   = qemu_rbd_co_block_status,
1840 
1841     .bdrv_snapshot_create   = qemu_rbd_snap_create,
1842     .bdrv_snapshot_delete   = qemu_rbd_snap_remove,
1843     .bdrv_snapshot_list     = qemu_rbd_snap_list,
1844     .bdrv_snapshot_goto     = qemu_rbd_snap_rollback,
1845     .bdrv_co_invalidate_cache = qemu_rbd_co_invalidate_cache,
1846 
1847     .strong_runtime_opts    = qemu_rbd_strong_runtime_opts,
1848 };
1849 
bdrv_rbd_init(void)1850 static void bdrv_rbd_init(void)
1851 {
1852     bdrv_register(&bdrv_rbd);
1853 }
1854 
1855 block_init(bdrv_rbd_init);
1856