xref: /qemu/migration/rdma.c (revision b6f568262b48ca0cb3dc018909908017551b5679)
1 /*
2  * RDMA protocol and interfaces
3  *
4  * Copyright IBM, Corp. 2010-2013
5  * Copyright Red Hat, Inc. 2015-2016
6  *
7  * Authors:
8  *  Michael R. Hines <mrhines@us.ibm.com>
9  *  Jiuxing Liu <jl@us.ibm.com>
10  *  Daniel P. Berrange <berrange@redhat.com>
11  *
12  * This work is licensed under the terms of the GNU GPL, version 2 or
13  * later.  See the COPYING file in the top-level directory.
14  *
15  */
16 
17 #include "qemu/osdep.h"
18 #include "qapi/error.h"
19 #include "qemu/cutils.h"
20 #include "exec/target_page.h"
21 #include "rdma.h"
22 #include "migration.h"
23 #include "migration-stats.h"
24 #include "qemu-file.h"
25 #include "ram.h"
26 #include "qemu/error-report.h"
27 #include "qemu/main-loop.h"
28 #include "qemu/module.h"
29 #include "qemu/rcu.h"
30 #include "qemu/sockets.h"
31 #include "qemu/bitmap.h"
32 #include "qemu/coroutine.h"
33 #include "system/memory.h"
34 #include <sys/socket.h>
35 #include <netdb.h>
36 #include <arpa/inet.h>
37 #include <rdma/rdma_cma.h>
38 #include "trace.h"
39 #include "qom/object.h"
40 #include "options.h"
41 #include <poll.h>
42 
/* Timeout (ms) handed to rdma_resolve_addr() and rdma_resolve_route(). */
#define RDMA_RESOLVE_TIMEOUT_MS 10000

/* Do not merge data if larger than this. */
#define RDMA_MERGE_MAX (2 * 1024 * 1024)
/*
 * RDMA_MERGE_MAX expressed in 4096-byte units. Sizes the unregistrations[]
 * array in RDMAContext and, multiplied by three, the receive completion
 * queue.
 */
#define RDMA_SIGNALED_SEND_MAX (RDMA_MERGE_MAX / 4096)

/* log2 of the chunk size used by the ram_chunk_*() helpers. */
#define RDMA_REG_CHUNK_SHIFT 20 /* 1 MB */
50 
51 /*
52  * This is only for non-live state being migrated.
53  * Instead of RDMA_WRITE messages, we use RDMA_SEND
54  * messages for that state, which requires a different
55  * delivery design than main memory.
56  */
57 #define RDMA_SEND_INCREMENT 32768
58 
59 /*
60  * Maximum size infiniband SEND message
61  */
/* Also the size of the control[] buffer in RDMAWorkRequestData. */
#define RDMA_CONTROL_MAX_BUFFER (512 * 1024)
/* NOTE(review): presumably bounds the header 'repeat' count - confirm. */
#define RDMA_CONTROL_MAX_COMMANDS_PER_MESSAGE 4096

/* NOTE(review): presumably the RDMACapabilities version we speak - confirm. */
#define RDMA_CONTROL_VERSION_CURRENT 1
66 /*
67  * Capabilities for negotiation.
68  */
69 #define RDMA_CAPABILITY_PIN_ALL 0x01
70 
71 /*
72  * Add the other flags above to this list of known capabilities
73  * as they are introduced.
74  */
75 static uint32_t known_capabilities = RDMA_CAPABILITY_PIN_ALL;
76 
77 /*
78  * A work request ID is 64-bits and we split up these bits
79  * into 3 parts:
80  *
81  * bits 0-15 : type of control message, 2^16
82  * bits 16-29: ram block index, 2^14
83  * bits 30-63: ram block chunk number, 2^34
84  *
85  * The last two bit ranges are only used for RDMA writes,
86  * in order to track their completion and potentially
87  * also track unregistration status of the message.
88  */
/*
 * Shift amounts and masks for the three fields of a work request ID.
 *
 * A work request ID is 64 bits wide, so the masks are built from
 * unsigned long long constants. With unsigned long they would be only
 * 32 bits wide on hosts where long is 32 bits, and masking a 64-bit ID
 * with RDMA_WRID_CHUNK_MASK would silently drop bits 32-63.
 */
#define RDMA_WRID_TYPE_SHIFT  0UL
#define RDMA_WRID_BLOCK_SHIFT 16UL
#define RDMA_WRID_CHUNK_SHIFT 30UL

/* bits 0-15 */
#define RDMA_WRID_TYPE_MASK \
    ((1ULL << RDMA_WRID_BLOCK_SHIFT) - 1ULL)

/* bits 16-29 */
#define RDMA_WRID_BLOCK_MASK \
    (~RDMA_WRID_TYPE_MASK & ((1ULL << RDMA_WRID_CHUNK_SHIFT) - 1ULL))

/* bits 30-63 */
#define RDMA_WRID_CHUNK_MASK (~RDMA_WRID_BLOCK_MASK & ~RDMA_WRID_TYPE_MASK)
100 
101 /*
102  * RDMA migration protocol:
103  * 1. RDMA Writes (data messages, i.e. RAM)
104  * 2. IB Send/Recv (control channel messages)
105  */
/*
 * NOTE(review): presumably these are the values carried in the "type"
 * field (bits 0-15) of a work request ID - confirm against the users.
 */
enum {
    RDMA_WRID_NONE = 0,
    RDMA_WRID_RDMA_WRITE = 1,       /* data message (RDMA write) */
    RDMA_WRID_SEND_CONTROL = 2000,  /* control channel, send side */
    RDMA_WRID_RECV_CONTROL = 4000,  /* control channel, receive side */
};
112 
113 /*
114  * Work request IDs for IB SEND messages only (not RDMA writes).
115  * This is used by the migration protocol to transmit
116  * control messages (such as device state and registration commands)
117  *
118  * We could use more WRs, but we have enough for now.
119  */
enum {
    RDMA_WRID_READY = 0,
    RDMA_WRID_DATA,
    RDMA_WRID_CONTROL,
    RDMA_WRID_MAX,      /* entry count; sizes RDMAContext.wr_data[] */
};
126 
127 /*
128  * SEND/RECV IB Control Messages.
129  */
/*
 * The values double as indices into the name table of control_desc(),
 * which treats anything above RDMA_CONTROL_UNREGISTER_FINISHED as invalid;
 * keep both in sync when adding a message type.
 */
enum {
    RDMA_CONTROL_NONE = 0,
    RDMA_CONTROL_ERROR,
    RDMA_CONTROL_READY,               /* ready to receive */
    RDMA_CONTROL_QEMU_FILE,           /* QEMUFile-transmitted bytes */
    RDMA_CONTROL_RAM_BLOCKS_REQUEST,  /* RAMBlock synchronization */
    RDMA_CONTROL_RAM_BLOCKS_RESULT,   /* RAMBlock synchronization */
    RDMA_CONTROL_COMPRESS,            /* page contains repeat values */
    RDMA_CONTROL_REGISTER_REQUEST,    /* dynamic page registration */
    RDMA_CONTROL_REGISTER_RESULT,     /* key to use after registration */
    RDMA_CONTROL_REGISTER_FINISHED,   /* current iteration finished */
    RDMA_CONTROL_UNREGISTER_REQUEST,  /* dynamic UN-registration */
    RDMA_CONTROL_UNREGISTER_FINISHED, /* unpinning finished */
};
144 
145 
146 /*
147  * Memory and MR structures used to represent an IB Send/Recv work request.
148  * This is *not* used for RDMA writes, only IB Send/Recv.
149  */
/* RDMAContext keeps one of these per RDMA_WRID_* slot (wr_data[]). */
typedef struct {
    uint8_t  control[RDMA_CONTROL_MAX_BUFFER]; /* actual buffer to register */
    struct   ibv_mr *control_mr;               /* registration metadata */
    size_t   control_len;                      /* length of the message */
    uint8_t *control_curr;                     /* start of unconsumed bytes */
} RDMAWorkRequestData;
156 
157 /*
158  * Negotiate RDMA capabilities during connection-setup time.
159  */
/*
 * Both fields are byte-swapped as 32-bit values by caps_to_network() and
 * network_to_caps().
 */
typedef struct {
    uint32_t version;
    uint32_t flags;   /* NOTE(review): presumably RDMA_CAPABILITY_* bits */
} RDMACapabilities;
164 
/* Convert the capability fields from host to network byte order, in place. */
static void caps_to_network(RDMACapabilities *cap)
{
    const uint32_t host_version = cap->version;
    const uint32_t host_flags = cap->flags;

    cap->flags = htonl(host_flags);
    cap->version = htonl(host_version);
}
170 
/* Convert the capability fields from network to host byte order, in place. */
static void network_to_caps(RDMACapabilities *cap)
{
    const uint32_t wire_version = cap->version;
    const uint32_t wire_flags = cap->flags;

    cap->flags = ntohl(wire_flags);
    cap->version = ntohl(wire_version);
}
176 
177 /*
178  * Representation of a RAMBlock from an RDMA perspective.
179  * This is not transmitted, only local.
180  * This and subsequent structures cannot be linked lists
181  * because we're using a single IB message to transmit
182  * the information. It's small anyway, so a list is overkill.
183  */
typedef struct RDMALocalBlock {
    char          *block_name;      /* private g_strdup() copy of the name */
    uint8_t       *local_host_addr; /* local virtual address */
    uint64_t       remote_host_addr; /* remote virtual address */
    uint64_t       offset;          /* block offset; also the blockmap key */
    uint64_t       length;          /* length in bytes from local_host_addr */
    struct         ibv_mr **pmr;    /* MRs for chunk-level registration */
    struct         ibv_mr *mr;      /* MR for non-chunk-level registration */
    uint32_t      *remote_keys;     /* rkeys for chunk-level registration */
    uint32_t       remote_rkey;     /* rkeys for non-chunk-level registration */
    int            index;           /* which block are we */
    unsigned int   src_index;       /* (Only used on dest) */
    bool           is_ram_block;    /* added before RDMALocalBlocks.init was set */
    int            nb_chunks;       /* chunk count, computed by rdma_add_block() */
    unsigned long *transit_bitmap;  /* nb_chunks bits, cleared on creation */
    unsigned long *unregister_bitmap; /* nb_chunks bits, cleared on creation */
} RDMALocalBlock;
201 
202 /*
203  * Also represents a RAMblock, but only on the dest.
204  * This gets transmitted by the dest during connection-time
205  * to the source VM and then is used to populate the
206  * corresponding RDMALocalBlock with
207  * the information needed to perform the actual RDMA.
208  */
/*
 * dest_block_to_network() and network_to_dest_block() byte-swap every field
 * except 'padding'.
 */
typedef struct QEMU_PACKED RDMADestBlock {
    uint64_t remote_host_addr;
    uint64_t offset;
    uint64_t length;
    uint32_t remote_rkey;
    uint32_t padding;
} RDMADestBlock;
216 
control_desc(unsigned int rdma_control)217 static const char *control_desc(unsigned int rdma_control)
218 {
219     static const char *strs[] = {
220         [RDMA_CONTROL_NONE] = "NONE",
221         [RDMA_CONTROL_ERROR] = "ERROR",
222         [RDMA_CONTROL_READY] = "READY",
223         [RDMA_CONTROL_QEMU_FILE] = "QEMU FILE",
224         [RDMA_CONTROL_RAM_BLOCKS_REQUEST] = "RAM BLOCKS REQUEST",
225         [RDMA_CONTROL_RAM_BLOCKS_RESULT] = "RAM BLOCKS RESULT",
226         [RDMA_CONTROL_COMPRESS] = "COMPRESS",
227         [RDMA_CONTROL_REGISTER_REQUEST] = "REGISTER REQUEST",
228         [RDMA_CONTROL_REGISTER_RESULT] = "REGISTER RESULT",
229         [RDMA_CONTROL_REGISTER_FINISHED] = "REGISTER FINISHED",
230         [RDMA_CONTROL_UNREGISTER_REQUEST] = "UNREGISTER REQUEST",
231         [RDMA_CONTROL_UNREGISTER_FINISHED] = "UNREGISTER FINISHED",
232     };
233 
234     if (rdma_control > RDMA_CONTROL_UNREGISTER_FINISHED) {
235         return "??BAD CONTROL VALUE??";
236     }
237 
238     return strs[rdma_control];
239 }
240 
#if !defined(htonll)
/*
 * Convert a 64-bit value from host to network byte order: the upper 32 bits
 * go first, each half converted with htonl().
 */
static uint64_t htonll(uint64_t v)
{
    const uint32_t upper = htonl(v >> 32);
    const uint32_t lower = htonl(v & 0xFFFFFFFFULL);
    union { uint32_t lv[2]; uint64_t llv; } u = { .lv = { upper, lower } };

    return u.llv;
}
#endif
250 
#if !defined(ntohll)
/*
 * Convert a 64-bit value from network to host byte order: the first 32-bit
 * word in memory becomes the upper half of the result.
 */
static uint64_t ntohll(uint64_t v)
{
    union { uint32_t lv[2]; uint64_t llv; } u = { .llv = v };
    const uint64_t upper = ntohl(u.lv[0]);
    const uint64_t lower = ntohl(u.lv[1]);

    return (upper << 32) | lower;
}
#endif
259 
/*
 * Convert a destination block description from host to network byte order,
 * in place. The padding field is not touched.
 */
static void dest_block_to_network(RDMADestBlock *db)
{
    const uint64_t addr = db->remote_host_addr;
    const uint64_t off = db->offset;
    const uint64_t len = db->length;
    const uint32_t rkey = db->remote_rkey;

    db->remote_rkey = htonl(rkey);
    db->length = htonll(len);
    db->offset = htonll(off);
    db->remote_host_addr = htonll(addr);
}
267 
/*
 * Convert a destination block description from network to host byte order,
 * in place. The padding field is not touched.
 */
static void network_to_dest_block(RDMADestBlock *db)
{
    const uint64_t addr = db->remote_host_addr;
    const uint64_t off = db->offset;
    const uint64_t len = db->length;
    const uint32_t rkey = db->remote_rkey;

    db->remote_rkey = ntohl(rkey);
    db->length = ntohll(len);
    db->offset = ntohll(off);
    db->remote_host_addr = ntohll(addr);
}
275 
276 /*
277  * Virtual address of the above structures used for transmitting
278  * the RAMBlock descriptions at connection-time.
279  * This structure is *not* transmitted.
280  */
typedef struct RDMALocalBlocks {
    int nb_blocks;             /* number of entries in block[] */
    bool     init;             /* main memory init complete */
    /*
     * Array of nb_blocks entries. rdma_add_block() and rdma_delete_block()
     * replace it with a freshly allocated array, so element addresses change.
     */
    RDMALocalBlock *block;
} RDMALocalBlocks;
286 
287 /*
288  * Main data structure for RDMA state.
289  * While there is only one copy of this structure being allocated right now,
290  * this is the place where one would start if you wanted to consider
291  * having more than one RDMA connection open at the same time.
292  */
typedef struct RDMAContext {
    /* Peer address; qemu_rdma_resolve_host() rejects a NULL or empty host. */
    char *host;
    int port;

    /* One control buffer per RDMA_WRID_* slot. */
    RDMAWorkRequestData wr_data[RDMA_WRID_MAX];

    /*
     * This is used by *_exchange_send() to figure out whether or not
     * the initial "READY" message has already been received or not.
     * This is because other functions may potentially poll() and detect
     * the READY message before send() does, in which case we need to
     * know if it completed.
     */
    int control_ready_expected;

    /* number of outstanding writes */
    int nb_sent;

    /* store info about current buffer so that we can
       merge it with future sends */
    uint64_t current_addr;
    uint64_t current_length;
    /* index of ram block the current buffer belongs to */
    int current_index;
    /* index of the chunk in the current ram block */
    int current_chunk;

    bool pin_all;

    /*
     * infiniband-specific variables for opening the device
     * and maintaining connection state and so forth.
     *
     * cm_id also has ibv_context, rdma_event_channel, and ibv_qp in
     * cm_id->verbs, cm_id->channel, and cm_id->qp.
     */
    struct rdma_cm_id *cm_id;               /* connection manager ID */
    struct rdma_cm_id *listen_id;
    bool connected;

    struct ibv_context          *verbs;
    struct rdma_event_channel   *channel;
    struct ibv_qp *qp;                      /* queue pair */
    struct ibv_comp_channel *recv_comp_channel;  /* recv completion channel */
    struct ibv_comp_channel *send_comp_channel;  /* send completion channel */
    struct ibv_pd *pd;                      /* protection domain */
    struct ibv_cq *recv_cq;                 /* receive completion queue */
    struct ibv_cq *send_cq;                 /* send completion queue */

    /*
     * If a previous write failed (perhaps because of a failed
     * memory registration), then do not attempt any future work
     * and remember the error state.
     */
    bool errored;
    bool error_reported;    /* rdma_errored() already printed its message */
    bool received_error;

    /*
     * Description of ram blocks used throughout the code.
     */
    RDMALocalBlocks local_ram_blocks;
    RDMADestBlock  *dest_blocks;

    /* Index of the next RAMBlock received during block registration */
    unsigned int    next_src_index;

    /*
     * Migration on *destination* started.
     * Then use coroutine yield function.
     * Source runs in a thread, so we don't care.
     */
    int migration_started_on_destination;

    /* Decremented by rdma_delete_block() for every MR it deregisters. */
    int total_registrations;
    int total_writes;

    int unregister_current, unregister_next;
    uint64_t unregistrations[RDMA_SIGNALED_SEND_MAX];

    /* When non-NULL, maps a block offset to its RDMALocalBlock. */
    GHashTable *blockmap;

    /* the RDMAContext for return path */
    struct RDMAContext *return_path;
    bool is_return_path;
} RDMAContext;
379 
380 #define TYPE_QIO_CHANNEL_RDMA "qio-channel-rdma"
381 OBJECT_DECLARE_SIMPLE_TYPE(QIOChannelRDMA, QIO_CHANNEL_RDMA)
382 
383 
384 
/* QIOChannel subclass (the parent object is embedded as the first member). */
struct QIOChannelRDMA {
    QIOChannel parent;
    /* NOTE(review): presumably the contexts for each direction - confirm. */
    RDMAContext *rdmain;
    RDMAContext *rdmaout;
    QEMUFile *file;
    bool blocking; /* XXX we don't actually honour this yet */
};
392 
393 /*
394  * Main structure for IB Send/Recv control messages.
395  * This gets prepended at the beginning of every Send/Recv.
396  */
/*
 * control_to_network() and network_to_control() byte-swap len, type and
 * repeat; 'padding' is left as is.
 */
typedef struct QEMU_PACKED {
    uint32_t len;     /* Total length of data portion */
    uint32_t type;    /* which control command to perform */
    uint32_t repeat;  /* number of commands in data portion of same type */
    uint32_t padding;
} RDMAControlHeader;
403 
/*
 * Convert a control header from host to network byte order, in place.
 * The padding field is not touched.
 */
static void control_to_network(RDMAControlHeader *control)
{
    const uint32_t host_len = control->len;
    const uint32_t host_type = control->type;
    const uint32_t host_repeat = control->repeat;

    control->len = htonl(host_len);
    control->type = htonl(host_type);
    control->repeat = htonl(host_repeat);
}
410 
/*
 * Convert a control header from network to host byte order, in place.
 * The padding field is not touched.
 */
static void network_to_control(RDMAControlHeader *control)
{
    const uint32_t wire_len = control->len;
    const uint32_t wire_type = control->type;
    const uint32_t wire_repeat = control->repeat;

    control->len = ntohl(wire_len);
    control->type = ntohl(wire_type);
    control->repeat = ntohl(wire_repeat);
}
417 
/*
 * Register a single Chunk.
 * Information sent by the source VM to inform the dest
 * to register a single chunk of memory before we can perform
 * the actual RDMA operation.
 *
 * register_to_network() and network_to_register() byte-swap the key,
 * current_index and chunks; 'padding' is left as is.
 */
typedef struct QEMU_PACKED {
    /* Both members share the same 64 bits. */
    union QEMU_PACKED {
        uint64_t current_addr;  /* offset into the ram_addr_t space */
        uint64_t chunk;         /* chunk to lookup if unregistering */
    } key;
    uint32_t current_index; /* which ramblock the chunk belongs to */
    uint32_t padding;
    uint64_t chunks;            /* how many sequential chunks to register */
} RDMARegister;
433 
/*
 * Report whether the context is in the error state. The first call that
 * finds it errored also prints a one-time message.
 */
static bool rdma_errored(RDMAContext *rdma)
{
    if (!rdma->errored) {
        return false;
    }

    if (!rdma->error_reported) {
        error_report("RDMA is in an error state waiting migration"
                     " to abort!");
        rdma->error_reported = true;
    }
    return true;
}
443 
/*
 * Prepare a registration request for the wire, in place.
 *
 * For RAM blocks the address arrives in the local ram_addr_t space and is
 * rebased onto the matching destination block's offset. All multi-byte
 * fields are then converted to network byte order.
 */
static void register_to_network(RDMAContext *rdma, RDMARegister *reg)
{
    const uint32_t idx = reg->current_index;
    const RDMALocalBlock *src = &rdma->local_ram_blocks.block[idx];
    uint64_t addr = reg->key.current_addr;

    if (src->is_ram_block) {
        /* translate from the local block's offset to the destination's */
        addr = addr - src->offset + rdma->dest_blocks[idx].offset;
    }

    reg->key.current_addr = htonll(addr);
    reg->current_index = htonl(idx);
    reg->chunks = htonll(reg->chunks);
}
461 
/* Convert a received registration request to host byte order, in place. */
static void network_to_register(RDMARegister *reg)
{
    const uint64_t wire_addr = reg->key.current_addr;
    const uint32_t wire_index = reg->current_index;
    const uint64_t wire_chunks = reg->chunks;

    reg->key.current_addr = ntohll(wire_addr);
    reg->current_index = ntohl(wire_index);
    reg->chunks = ntohll(wire_chunks);
}
468 
/*
 * Every field is byte-swapped by compress_to_network() and
 * network_to_compress(); the sender additionally rebases 'offset' onto the
 * destination block.
 */
typedef struct QEMU_PACKED {
    uint32_t value;     /* if zero, we will madvise() */
    uint32_t block_idx; /* which ram block index */
    uint64_t offset;    /* Address in remote ram_addr_t space */
    uint64_t length;    /* length of the chunk */
} RDMACompress;
475 
/*
 * Prepare a compress command for the wire, in place.
 *
 * The offset arrives in the local ram_addr_t space and is rebased onto the
 * destination block with the same index before every field is converted to
 * network byte order.
 */
static void compress_to_network(RDMAContext *rdma, RDMACompress *comp)
{
    const uint32_t idx = comp->block_idx;
    const uint64_t local_base = rdma->local_ram_blocks.block[idx].offset;
    const uint64_t dest_base = rdma->dest_blocks[idx].offset;
    const uint64_t dest_offset = comp->offset - local_base + dest_base;

    comp->value = htonl(comp->value);
    comp->block_idx = htonl(idx);
    comp->offset = htonll(dest_offset);
    comp->length = htonll(comp->length);
}
489 
/* Convert a received compress command to host byte order, in place. */
static void network_to_compress(RDMACompress *comp)
{
    const uint32_t wire_value = comp->value;
    const uint32_t wire_idx = comp->block_idx;
    const uint64_t wire_offset = comp->offset;
    const uint64_t wire_length = comp->length;

    comp->length = ntohll(wire_length);
    comp->offset = ntohll(wire_offset);
    comp->block_idx = ntohl(wire_idx);
    comp->value = ntohl(wire_value);
}
497 
498 /*
499  * The result of the dest's memory registration produces an "rkey"
500  * which the source VM must reference in order to perform
501  * the RDMA operation.
502  */
/*
 * result_to_network() and network_to_result() byte-swap rkey and host_addr;
 * 'padding' is left as is.
 */
typedef struct QEMU_PACKED {
    uint32_t rkey;
    uint32_t padding;
    uint64_t host_addr;
} RDMARegisterResult;
508 
/*
 * Convert a registration result from host to network byte order, in place.
 * The padding field is not touched.
 */
static void result_to_network(RDMARegisterResult *result)
{
    result->rkey = htonl(result->rkey);
    result->host_addr = htonll(result->host_addr);
}
514 
/*
 * Convert a registration result from network to host byte order, in place.
 * The padding field is not touched.
 */
static void network_to_result(RDMARegisterResult *result)
{
    result->rkey = ntohl(result->rkey);
    result->host_addr = ntohll(result->host_addr);
}
520 
/*
 * Forward declaration; the definition is not part of this section.
 * NOTE(review): judging by the parameter names, head/data describe the
 * outgoing control message and resp/resp_idx receive an optional reply;
 * confirm against the definition before relying on this.
 */
static int qemu_rdma_exchange_send(RDMAContext *rdma, RDMAControlHeader *head,
                                   uint8_t *data, RDMAControlHeader *resp,
                                   int *resp_idx,
                                   int (*callback)(RDMAContext *rdma,
                                                   Error **errp),
                                   Error **errp);
527 
ram_chunk_index(const uint8_t * start,const uint8_t * host)528 static inline uint64_t ram_chunk_index(const uint8_t *start,
529                                        const uint8_t *host)
530 {
531     return ((uintptr_t) host - (uintptr_t) start) >> RDMA_REG_CHUNK_SHIFT;
532 }
533 
ram_chunk_start(const RDMALocalBlock * rdma_ram_block,uint64_t i)534 static inline uint8_t *ram_chunk_start(const RDMALocalBlock *rdma_ram_block,
535                                        uint64_t i)
536 {
537     return (uint8_t *)(uintptr_t)(rdma_ram_block->local_host_addr +
538                                   (i << RDMA_REG_CHUNK_SHIFT));
539 }
540 
ram_chunk_end(const RDMALocalBlock * rdma_ram_block,uint64_t i)541 static inline uint8_t *ram_chunk_end(const RDMALocalBlock *rdma_ram_block,
542                                      uint64_t i)
543 {
544     uint8_t *result = ram_chunk_start(rdma_ram_block, i) +
545                                          (1UL << RDMA_REG_CHUNK_SHIFT);
546 
547     if (result > (rdma_ram_block->local_host_addr + rdma_ram_block->length)) {
548         result = rdma_ram_block->local_host_addr + rdma_ram_block->length;
549     }
550 
551     return result;
552 }
553 
/*
 * Append a new block description to rdma->local_ram_blocks.
 *
 * The block array is replaced by a copy that is one entry larger, so every
 * existing blockmap entry is re-pointed at the new array before the old one
 * is freed. The new entry gets a private copy of @block_name, cleared
 * transit/unregister bitmaps and a zeroed remote key table sized by its
 * chunk count.
 */
static void rdma_add_block(RDMAContext *rdma, const char *block_name,
                           void *host_addr,
                           ram_addr_t block_offset, uint64_t length)
{
    RDMALocalBlocks *local = &rdma->local_ram_blocks;
    RDMALocalBlock *previous = local->block;
    RDMALocalBlock *entry;

    local->block = g_new0(RDMALocalBlock, local->nb_blocks + 1);

    if (local->nb_blocks) {
        if (rdma->blockmap) {
            /* existing entries move: refresh the pointers stored in the map */
            for (int i = 0; i < local->nb_blocks; i++) {
                void *key = (void *)(uintptr_t)previous[i].offset;

                g_hash_table_remove(rdma->blockmap, key);
                g_hash_table_insert(rdma->blockmap, key, &local->block[i]);
            }
        }
        memcpy(local->block, previous,
               sizeof(RDMALocalBlock) * local->nb_blocks);
        g_free(previous);
    }

    entry = &local->block[local->nb_blocks];

    entry->block_name = g_strdup(block_name);
    entry->local_host_addr = host_addr;
    entry->offset = block_offset;
    entry->length = length;
    entry->index = local->nb_blocks;
    entry->src_index = ~0U; /* Filled in by the receipt of the block list */
    entry->nb_chunks = ram_chunk_index(host_addr, host_addr + length) + 1UL;

    entry->transit_bitmap = bitmap_new(entry->nb_chunks);
    bitmap_clear(entry->transit_bitmap, 0, entry->nb_chunks);
    entry->unregister_bitmap = bitmap_new(entry->nb_chunks);
    bitmap_clear(entry->unregister_bitmap, 0, entry->nb_chunks);
    entry->remote_keys = g_new0(uint32_t, entry->nb_chunks);

    /* blocks added once main memory init has completed are not RAM blocks */
    entry->is_ram_block = !local->init;

    if (rdma->blockmap) {
        g_hash_table_insert(rdma->blockmap, (void *)(uintptr_t)block_offset,
                            entry);
    }

    trace_rdma_add_block(block_name, local->nb_blocks,
                         (uintptr_t) entry->local_host_addr,
                         entry->offset, entry->length,
                         (uintptr_t) (entry->local_host_addr + entry->length),
                         BITS_TO_LONGS(entry->nb_chunks) *
                             sizeof(unsigned long) * 8,
                         entry->nb_chunks);

    local->nb_blocks++;
}
609 
610 /*
611  * Memory regions need to be registered with the device and queue pairs setup
612  * in advanced before the migration starts. This tells us where the RAM blocks
613  * are so that we can register them individually.
614  */
qemu_rdma_init_one_block(RAMBlock * rb,void * opaque)615 static int qemu_rdma_init_one_block(RAMBlock *rb, void *opaque)
616 {
617     const char *block_name = qemu_ram_get_idstr(rb);
618     void *host_addr = qemu_ram_get_host_addr(rb);
619     ram_addr_t block_offset = qemu_ram_get_offset(rb);
620     ram_addr_t length = qemu_ram_get_used_length(rb);
621     rdma_add_block(opaque, block_name, host_addr, block_offset, length);
622     return 0;
623 }
624 
625 /*
626  * Identify the RAMBlocks and their quantity. They will be references to
627  * identify chunk boundaries inside each RAMBlock and also be referenced
628  * during dynamic page registration.
629  */
qemu_rdma_init_ram_blocks(RDMAContext * rdma)630 static void qemu_rdma_init_ram_blocks(RDMAContext *rdma)
631 {
632     RDMALocalBlocks *local = &rdma->local_ram_blocks;
633     int ret;
634 
635     assert(rdma->blockmap == NULL);
636     memset(local, 0, sizeof *local);
637     ret = foreach_not_ignored_block(qemu_rdma_init_one_block, rdma);
638     assert(!ret);
639     trace_qemu_rdma_init_ram_blocks(local->nb_blocks);
640     rdma->dest_blocks = g_new0(RDMADestBlock,
641                                rdma->local_ram_blocks.nb_blocks);
642     local->init = true;
643 }
644 
/*
 * Remove @block from rdma->local_ram_blocks and release everything it owns:
 * its memory registrations, bitmaps, remote key table and name.
 *
 * @block must point into the current block array. The array is replaced by
 * a copy that is one entry smaller (or by NULL when the last block goes
 * away), the indices of the entries that followed @block are decremented,
 * and the blockmap, if any, is rebuilt to point at the new array.
 *
 * Note: If used outside of cleanup, the caller must ensure that the destination
 * block structures are also updated
 */
static void rdma_delete_block(RDMAContext *rdma, RDMALocalBlock *block)
{
    RDMALocalBlocks *local = &rdma->local_ram_blocks;
    RDMALocalBlock *old = local->block;

    if (rdma->blockmap) {
        g_hash_table_remove(rdma->blockmap, (void *)(uintptr_t)block->offset);
    }
    /* Drop the per-chunk registrations, skipping chunks never registered. */
    if (block->pmr) {
        for (int j = 0; j < block->nb_chunks; j++) {
            if (!block->pmr[j]) {
                continue;
            }
            ibv_dereg_mr(block->pmr[j]);
            rdma->total_registrations--;
        }
        g_free(block->pmr);
        block->pmr = NULL;
    }

    /* Drop the whole-block registration, if there is one. */
    if (block->mr) {
        ibv_dereg_mr(block->mr);
        rdma->total_registrations--;
        block->mr = NULL;
    }

    g_free(block->transit_bitmap);
    block->transit_bitmap = NULL;

    g_free(block->unregister_bitmap);
    block->unregister_bitmap = NULL;

    g_free(block->remote_keys);
    block->remote_keys = NULL;

    g_free(block->block_name);
    block->block_name = NULL;

    /*
     * Every remaining map entry points into the old array, which is about
     * to be freed: remove them all, they are re-inserted at the end.
     */
    if (rdma->blockmap) {
        for (int x = 0; x < local->nb_blocks; x++) {
            g_hash_table_remove(rdma->blockmap,
                                (void *)(uintptr_t)old[x].offset);
        }
    }

    if (local->nb_blocks > 1) {

        local->block = g_new0(RDMALocalBlock, local->nb_blocks - 1);

        /* entries in front of the deleted one keep their position */
        if (block->index) {
            memcpy(local->block, old, sizeof(RDMALocalBlock) * block->index);
        }

        /* entries behind it move down by one slot and are renumbered */
        if (block->index < (local->nb_blocks - 1)) {
            memcpy(local->block + block->index, old + (block->index + 1),
                sizeof(RDMALocalBlock) *
                    (local->nb_blocks - (block->index + 1)));
            for (int x = block->index; x < local->nb_blocks - 1; x++) {
                local->block[x].index--;
            }
        }
    } else {
        assert(block == local->block);
        local->block = NULL;
    }

    /* 'block' still points into 'old' here, so trace before freeing it. */
    trace_rdma_delete_block(block, (uintptr_t)block->local_host_addr,
                           block->offset, block->length,
                            (uintptr_t)(block->local_host_addr + block->length),
                           BITS_TO_LONGS(block->nb_chunks) *
                               sizeof(unsigned long) * 8, block->nb_chunks);

    g_free(old);

    local->nb_blocks--;

    /* Rebuild the map so that it refers to the new array. */
    if (local->nb_blocks && rdma->blockmap) {
        for (int x = 0; x < local->nb_blocks; x++) {
            g_hash_table_insert(rdma->blockmap,
                                (void *)(uintptr_t)local->block[x].offset,
                                &local->block[x]);
        }
    }
}
733 
734 /*
735  * Trace RDMA device open, with device details.
736  */
qemu_rdma_dump_id(const char * who,struct ibv_context * verbs)737 static void qemu_rdma_dump_id(const char *who, struct ibv_context *verbs)
738 {
739     struct ibv_port_attr port;
740 
741     if (ibv_query_port(verbs, 1, &port)) {
742         trace_qemu_rdma_dump_id_failed(who);
743         return;
744     }
745 
746     trace_qemu_rdma_dump_id(who,
747                 verbs->device->name,
748                 verbs->device->dev_name,
749                 verbs->device->dev_path,
750                 verbs->device->ibdev_path,
751                 port.link_layer,
752                 port.link_layer == IBV_LINK_LAYER_INFINIBAND ? "Infiniband"
753                 : port.link_layer == IBV_LINK_LAYER_ETHERNET ? "Ethernet"
754                 : "Unknown");
755 }
756 
757 /*
758  * Trace RDMA gid addressing information.
759  * Useful for understanding the RDMA device hierarchy in the kernel.
760  */
qemu_rdma_dump_gid(const char * who,struct rdma_cm_id * id)761 static void qemu_rdma_dump_gid(const char *who, struct rdma_cm_id *id)
762 {
763     char sgid[33];
764     char dgid[33];
765     inet_ntop(AF_INET6, &id->route.addr.addr.ibaddr.sgid, sgid, sizeof sgid);
766     inet_ntop(AF_INET6, &id->route.addr.addr.ibaddr.dgid, dgid, sizeof dgid);
767     trace_qemu_rdma_dump_gid(who, sgid, dgid);
768 }
769 
770 /*
771  * Figure out which RDMA device corresponds to the requested IP hostname
772  * Also create the initial connection manager identifiers for opening
773  * the connection.
774  */
qemu_rdma_resolve_host(RDMAContext * rdma,Error ** errp)775 static int qemu_rdma_resolve_host(RDMAContext *rdma, Error **errp)
776 {
777     int ret;
778     struct rdma_addrinfo *res;
779     char port_str[16];
780     struct rdma_cm_event *cm_event;
781     char ip[40] = "unknown";
782 
783     if (rdma->host == NULL || !strcmp(rdma->host, "")) {
784         error_setg(errp, "RDMA ERROR: RDMA hostname has not been set");
785         return -1;
786     }
787 
788     /* create CM channel */
789     rdma->channel = rdma_create_event_channel();
790     if (!rdma->channel) {
791         error_setg(errp, "RDMA ERROR: could not create CM channel");
792         return -1;
793     }
794 
795     /* create CM id */
796     ret = rdma_create_id(rdma->channel, &rdma->cm_id, NULL, RDMA_PS_TCP);
797     if (ret < 0) {
798         error_setg(errp, "RDMA ERROR: could not create channel id");
799         goto err_resolve_create_id;
800     }
801 
802     snprintf(port_str, 16, "%d", rdma->port);
803     port_str[15] = '\0';
804 
805     ret = rdma_getaddrinfo(rdma->host, port_str, NULL, &res);
806     if (ret) {
807         error_setg(errp, "RDMA ERROR: could not rdma_getaddrinfo address %s",
808                    rdma->host);
809         goto err_resolve_get_addr;
810     }
811 
812     /* Try all addresses, exit loop on first success of resolving address */
813     for (struct rdma_addrinfo *e = res; e != NULL; e = e->ai_next) {
814 
815         inet_ntop(e->ai_family,
816             &((struct sockaddr_in *) e->ai_dst_addr)->sin_addr, ip, sizeof ip);
817         trace_qemu_rdma_resolve_host_trying(rdma->host, ip);
818 
819         ret = rdma_resolve_addr(rdma->cm_id, NULL, e->ai_dst_addr,
820                 RDMA_RESOLVE_TIMEOUT_MS);
821         if (ret >= 0) {
822             goto route;
823         }
824     }
825 
826     rdma_freeaddrinfo(res);
827     error_setg(errp, "RDMA ERROR: could not resolve address %s", rdma->host);
828     goto err_resolve_get_addr;
829 
830 route:
831     rdma_freeaddrinfo(res);
832     qemu_rdma_dump_gid("source_resolve_addr", rdma->cm_id);
833 
834     ret = rdma_get_cm_event(rdma->channel, &cm_event);
835     if (ret < 0) {
836         error_setg(errp, "RDMA ERROR: could not perform event_addr_resolved");
837         goto err_resolve_get_addr;
838     }
839 
840     if (cm_event->event != RDMA_CM_EVENT_ADDR_RESOLVED) {
841         error_setg(errp,
842                    "RDMA ERROR: result not equal to event_addr_resolved %s",
843                    rdma_event_str(cm_event->event));
844         rdma_ack_cm_event(cm_event);
845         goto err_resolve_get_addr;
846     }
847     rdma_ack_cm_event(cm_event);
848 
849     /* resolve route */
850     ret = rdma_resolve_route(rdma->cm_id, RDMA_RESOLVE_TIMEOUT_MS);
851     if (ret < 0) {
852         error_setg(errp, "RDMA ERROR: could not resolve rdma route");
853         goto err_resolve_get_addr;
854     }
855 
856     ret = rdma_get_cm_event(rdma->channel, &cm_event);
857     if (ret < 0) {
858         error_setg(errp, "RDMA ERROR: could not perform event_route_resolved");
859         goto err_resolve_get_addr;
860     }
861     if (cm_event->event != RDMA_CM_EVENT_ROUTE_RESOLVED) {
862         error_setg(errp, "RDMA ERROR: "
863                    "result not equal to event_route_resolved: %s",
864                    rdma_event_str(cm_event->event));
865         rdma_ack_cm_event(cm_event);
866         goto err_resolve_get_addr;
867     }
868     rdma_ack_cm_event(cm_event);
869     rdma->verbs = rdma->cm_id->verbs;
870     qemu_rdma_dump_id("source_resolve_host", rdma->cm_id->verbs);
871     qemu_rdma_dump_gid("source_resolve_host", rdma->cm_id);
872     return 0;
873 
874 err_resolve_get_addr:
875     rdma_destroy_id(rdma->cm_id);
876     rdma->cm_id = NULL;
877 err_resolve_create_id:
878     rdma_destroy_event_channel(rdma->channel);
879     rdma->channel = NULL;
880     return -1;
881 }
882 
883 /*
884  * Create protection domain and completion queues
885  */
qemu_rdma_alloc_pd_cq(RDMAContext * rdma,Error ** errp)886 static int qemu_rdma_alloc_pd_cq(RDMAContext *rdma, Error **errp)
887 {
888     /* allocate pd */
889     rdma->pd = ibv_alloc_pd(rdma->verbs);
890     if (!rdma->pd) {
891         error_setg(errp, "failed to allocate protection domain");
892         return -1;
893     }
894 
895     /* create receive completion channel */
896     rdma->recv_comp_channel = ibv_create_comp_channel(rdma->verbs);
897     if (!rdma->recv_comp_channel) {
898         error_setg(errp, "failed to allocate receive completion channel");
899         goto err_alloc_pd_cq;
900     }
901 
902     /*
903      * Completion queue can be filled by read work requests.
904      */
905     rdma->recv_cq = ibv_create_cq(rdma->verbs, (RDMA_SIGNALED_SEND_MAX * 3),
906                                   NULL, rdma->recv_comp_channel, 0);
907     if (!rdma->recv_cq) {
908         error_setg(errp, "failed to allocate receive completion queue");
909         goto err_alloc_pd_cq;
910     }
911 
912     /* create send completion channel */
913     rdma->send_comp_channel = ibv_create_comp_channel(rdma->verbs);
914     if (!rdma->send_comp_channel) {
915         error_setg(errp, "failed to allocate send completion channel");
916         goto err_alloc_pd_cq;
917     }
918 
919     rdma->send_cq = ibv_create_cq(rdma->verbs, (RDMA_SIGNALED_SEND_MAX * 3),
920                                   NULL, rdma->send_comp_channel, 0);
921     if (!rdma->send_cq) {
922         error_setg(errp, "failed to allocate send completion queue");
923         goto err_alloc_pd_cq;
924     }
925 
926     return 0;
927 
928 err_alloc_pd_cq:
929     if (rdma->pd) {
930         ibv_dealloc_pd(rdma->pd);
931     }
932     if (rdma->recv_comp_channel) {
933         ibv_destroy_comp_channel(rdma->recv_comp_channel);
934     }
935     if (rdma->send_comp_channel) {
936         ibv_destroy_comp_channel(rdma->send_comp_channel);
937     }
938     if (rdma->recv_cq) {
939         ibv_destroy_cq(rdma->recv_cq);
940         rdma->recv_cq = NULL;
941     }
942     rdma->pd = NULL;
943     rdma->recv_comp_channel = NULL;
944     rdma->send_comp_channel = NULL;
945     return -1;
946 
947 }
948 
949 /*
950  * Create queue pairs.
951  */
qemu_rdma_alloc_qp(RDMAContext * rdma)952 static int qemu_rdma_alloc_qp(RDMAContext *rdma)
953 {
954     struct ibv_qp_init_attr attr = { 0 };
955 
956     attr.cap.max_send_wr = RDMA_SIGNALED_SEND_MAX;
957     attr.cap.max_recv_wr = 3;
958     attr.cap.max_send_sge = 1;
959     attr.cap.max_recv_sge = 1;
960     attr.send_cq = rdma->send_cq;
961     attr.recv_cq = rdma->recv_cq;
962     attr.qp_type = IBV_QPT_RC;
963 
964     if (rdma_create_qp(rdma->cm_id, rdma->pd, &attr) < 0) {
965         return -1;
966     }
967 
968     rdma->qp = rdma->cm_id->qp;
969     return 0;
970 }
971 
972 /* Check whether On-Demand Paging is supported by RDAM device */
rdma_support_odp(struct ibv_context * dev)973 static bool rdma_support_odp(struct ibv_context *dev)
974 {
975     struct ibv_device_attr_ex attr = {0};
976 
977     if (ibv_query_device_ex(dev, NULL, &attr)) {
978         return false;
979     }
980 
981     if (attr.odp_caps.general_caps & IBV_ODP_SUPPORT) {
982         return true;
983     }
984 
985     return false;
986 }
987 
/*
 * Use ibv_advise_mr() to avoid RNR NAK errors as far as possible:
 * a responder whose MR is registered with ODP sends an RNR NAK back to
 * the requester when it runs into a page fault.
 *
 * @wr selects the write-prefetch advice instead of the plain prefetch
 * advice.  The result of ibv_advise_mr() is only traced, never acted on.
 * When HAVE_IBV_ADVISE_MR is not defined this function does nothing.
 */
static void qemu_rdma_advise_prefetch_mr(struct ibv_pd *pd, uint64_t addr,
                                         uint32_t len,  uint32_t lkey,
                                         const char *name, bool wr)
{
#ifdef HAVE_IBV_ADVISE_MR
    struct ibv_sge sge = {
        .addr = addr,
        .length = len,
        .lkey = lkey,
    };
    int advice;
    int rc;

    if (wr) {
        advice = IBV_ADVISE_MR_ADVICE_PREFETCH_WRITE;
    } else {
        advice = IBV_ADVISE_MR_ADVICE_PREFETCH;
    }

    rc = ibv_advise_mr(pd, advice, IBV_ADVISE_MR_FLAG_FLUSH, &sge, 1);
    /* ignore the error */
    trace_qemu_rdma_advise_mr(name, len, addr, strerror(rc));
#endif
}
1009 
qemu_rdma_reg_whole_ram_blocks(RDMAContext * rdma,Error ** errp)1010 static int qemu_rdma_reg_whole_ram_blocks(RDMAContext *rdma, Error **errp)
1011 {
1012     int i;
1013     RDMALocalBlocks *local = &rdma->local_ram_blocks;
1014 
1015     for (i = 0; i < local->nb_blocks; i++) {
1016         int access = IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE;
1017 
1018         local->block[i].mr =
1019             ibv_reg_mr(rdma->pd,
1020                     local->block[i].local_host_addr,
1021                     local->block[i].length, access
1022                     );
1023         /*
1024          * ibv_reg_mr() is not documented to set errno.  If it does,
1025          * it's somebody else's doc bug.  If it doesn't, the use of
1026          * errno below is wrong.
1027          * TODO Find out whether ibv_reg_mr() sets errno.
1028          */
1029         if (!local->block[i].mr &&
1030             errno == ENOTSUP && rdma_support_odp(rdma->verbs)) {
1031                 access |= IBV_ACCESS_ON_DEMAND;
1032                 /* register ODP mr */
1033                 local->block[i].mr =
1034                     ibv_reg_mr(rdma->pd,
1035                                local->block[i].local_host_addr,
1036                                local->block[i].length, access);
1037                 trace_qemu_rdma_register_odp_mr(local->block[i].block_name);
1038 
1039                 if (local->block[i].mr) {
1040                     qemu_rdma_advise_prefetch_mr(rdma->pd,
1041                                     (uintptr_t)local->block[i].local_host_addr,
1042                                     local->block[i].length,
1043                                     local->block[i].mr->lkey,
1044                                     local->block[i].block_name,
1045                                     true);
1046                 }
1047         }
1048 
1049         if (!local->block[i].mr) {
1050             error_setg_errno(errp, errno,
1051                              "Failed to register local dest ram block!");
1052             goto err;
1053         }
1054         rdma->total_registrations++;
1055     }
1056 
1057     return 0;
1058 
1059 err:
1060     for (i--; i >= 0; i--) {
1061         ibv_dereg_mr(local->block[i].mr);
1062         local->block[i].mr = NULL;
1063         rdma->total_registrations--;
1064     }
1065 
1066     return -1;
1067 
1068 }
1069 
1070 /*
1071  * Find the ram block that corresponds to the page requested to be
1072  * transmitted by QEMU.
1073  *
1074  * Once the block is found, also identify which 'chunk' within that
1075  * block that the page belongs to.
1076  */
qemu_rdma_search_ram_block(RDMAContext * rdma,uintptr_t block_offset,uint64_t offset,uint64_t length,uint64_t * block_index,uint64_t * chunk_index)1077 static void qemu_rdma_search_ram_block(RDMAContext *rdma,
1078                                        uintptr_t block_offset,
1079                                        uint64_t offset,
1080                                        uint64_t length,
1081                                        uint64_t *block_index,
1082                                        uint64_t *chunk_index)
1083 {
1084     uint64_t current_addr = block_offset + offset;
1085     RDMALocalBlock *block = g_hash_table_lookup(rdma->blockmap,
1086                                                 (void *) block_offset);
1087     assert(block);
1088     assert(current_addr >= block->offset);
1089     assert((current_addr + length) <= (block->offset + block->length));
1090 
1091     *block_index = block->index;
1092     *chunk_index = ram_chunk_index(block->local_host_addr,
1093                 block->local_host_addr + (current_addr - block->offset));
1094 }
1095 
1096 /*
1097  * Register a chunk with IB. If the chunk was already registered
1098  * previously, then skip.
1099  *
1100  * Also return the keys associated with the registration needed
1101  * to perform the actual RDMA operation.
1102  */
qemu_rdma_register_and_get_keys(RDMAContext * rdma,RDMALocalBlock * block,uintptr_t host_addr,uint32_t * lkey,uint32_t * rkey,int chunk,uint8_t * chunk_start,uint8_t * chunk_end)1103 static int qemu_rdma_register_and_get_keys(RDMAContext *rdma,
1104         RDMALocalBlock *block, uintptr_t host_addr,
1105         uint32_t *lkey, uint32_t *rkey, int chunk,
1106         uint8_t *chunk_start, uint8_t *chunk_end)
1107 {
1108     if (block->mr) {
1109         if (lkey) {
1110             *lkey = block->mr->lkey;
1111         }
1112         if (rkey) {
1113             *rkey = block->mr->rkey;
1114         }
1115         return 0;
1116     }
1117 
1118     /* allocate memory to store chunk MRs */
1119     if (!block->pmr) {
1120         block->pmr = g_new0(struct ibv_mr *, block->nb_chunks);
1121     }
1122 
1123     /*
1124      * If 'rkey', then we're the destination, so grant access to the source.
1125      *
1126      * If 'lkey', then we're the source VM, so grant access only to ourselves.
1127      */
1128     if (!block->pmr[chunk]) {
1129         uint64_t len = chunk_end - chunk_start;
1130         int access = rkey ? IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE :
1131                      0;
1132 
1133         trace_qemu_rdma_register_and_get_keys(len, chunk_start);
1134 
1135         block->pmr[chunk] = ibv_reg_mr(rdma->pd, chunk_start, len, access);
1136         /*
1137          * ibv_reg_mr() is not documented to set errno.  If it does,
1138          * it's somebody else's doc bug.  If it doesn't, the use of
1139          * errno below is wrong.
1140          * TODO Find out whether ibv_reg_mr() sets errno.
1141          */
1142         if (!block->pmr[chunk] &&
1143             errno == ENOTSUP && rdma_support_odp(rdma->verbs)) {
1144             access |= IBV_ACCESS_ON_DEMAND;
1145             /* register ODP mr */
1146             block->pmr[chunk] = ibv_reg_mr(rdma->pd, chunk_start, len, access);
1147             trace_qemu_rdma_register_odp_mr(block->block_name);
1148 
1149             if (block->pmr[chunk]) {
1150                 qemu_rdma_advise_prefetch_mr(rdma->pd, (uintptr_t)chunk_start,
1151                                             len, block->pmr[chunk]->lkey,
1152                                             block->block_name, rkey);
1153 
1154             }
1155         }
1156     }
1157     if (!block->pmr[chunk]) {
1158         return -1;
1159     }
1160     rdma->total_registrations++;
1161 
1162     if (lkey) {
1163         *lkey = block->pmr[chunk]->lkey;
1164     }
1165     if (rkey) {
1166         *rkey = block->pmr[chunk]->rkey;
1167     }
1168     return 0;
1169 }
1170 
1171 /*
1172  * Register (at connection time) the memory used for control
1173  * channel messages.
1174  */
qemu_rdma_reg_control(RDMAContext * rdma,int idx)1175 static int qemu_rdma_reg_control(RDMAContext *rdma, int idx)
1176 {
1177     rdma->wr_data[idx].control_mr = ibv_reg_mr(rdma->pd,
1178             rdma->wr_data[idx].control, RDMA_CONTROL_MAX_BUFFER,
1179             IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE);
1180     if (rdma->wr_data[idx].control_mr) {
1181         rdma->total_registrations++;
1182         return 0;
1183     }
1184     return -1;
1185 }
1186 
1187 /*
1188  * Perform a non-optimized memory unregistration after every transfer
1189  * for demonstration purposes, only if pin-all is not requested.
1190  *
1191  * Potential optimizations:
1192  * 1. Start a new thread to run this function continuously
1193         - for bit clearing
1194         - and for receipt of unregister messages
1195  * 2. Use an LRU.
1196  * 3. Use workload hints.
1197  */
qemu_rdma_unregister_waiting(RDMAContext * rdma)1198 static int qemu_rdma_unregister_waiting(RDMAContext *rdma)
1199 {
1200     Error *err = NULL;
1201 
1202     while (rdma->unregistrations[rdma->unregister_current]) {
1203         int ret;
1204         uint64_t wr_id = rdma->unregistrations[rdma->unregister_current];
1205         uint64_t chunk =
1206             (wr_id & RDMA_WRID_CHUNK_MASK) >> RDMA_WRID_CHUNK_SHIFT;
1207         uint64_t index =
1208             (wr_id & RDMA_WRID_BLOCK_MASK) >> RDMA_WRID_BLOCK_SHIFT;
1209         RDMALocalBlock *block =
1210             &(rdma->local_ram_blocks.block[index]);
1211         RDMARegister reg = { .current_index = index };
1212         RDMAControlHeader resp = { .type = RDMA_CONTROL_UNREGISTER_FINISHED,
1213                                  };
1214         RDMAControlHeader head = { .len = sizeof(RDMARegister),
1215                                    .type = RDMA_CONTROL_UNREGISTER_REQUEST,
1216                                    .repeat = 1,
1217                                  };
1218 
1219         trace_qemu_rdma_unregister_waiting_proc(chunk,
1220                                                 rdma->unregister_current);
1221 
1222         rdma->unregistrations[rdma->unregister_current] = 0;
1223         rdma->unregister_current++;
1224 
1225         if (rdma->unregister_current == RDMA_SIGNALED_SEND_MAX) {
1226             rdma->unregister_current = 0;
1227         }
1228 
1229 
1230         /*
1231          * Unregistration is speculative (because migration is single-threaded
1232          * and we cannot break the protocol's inifinband message ordering).
1233          * Thus, if the memory is currently being used for transmission,
1234          * then abort the attempt to unregister and try again
1235          * later the next time a completion is received for this memory.
1236          */
1237         clear_bit(chunk, block->unregister_bitmap);
1238 
1239         if (test_bit(chunk, block->transit_bitmap)) {
1240             trace_qemu_rdma_unregister_waiting_inflight(chunk);
1241             continue;
1242         }
1243 
1244         trace_qemu_rdma_unregister_waiting_send(chunk);
1245 
1246         ret = ibv_dereg_mr(block->pmr[chunk]);
1247         block->pmr[chunk] = NULL;
1248         block->remote_keys[chunk] = 0;
1249 
1250         if (ret != 0) {
1251             error_report("unregistration chunk failed: %s",
1252                          strerror(ret));
1253             return -1;
1254         }
1255         rdma->total_registrations--;
1256 
1257         reg.key.chunk = chunk;
1258         register_to_network(rdma, &reg);
1259         ret = qemu_rdma_exchange_send(rdma, &head, (uint8_t *) &reg,
1260                                       &resp, NULL, NULL, &err);
1261         if (ret < 0) {
1262             error_report_err(err);
1263             return -1;
1264         }
1265 
1266         trace_qemu_rdma_unregister_waiting_complete(chunk);
1267     }
1268 
1269     return 0;
1270 }
1271 
qemu_rdma_make_wrid(uint64_t wr_id,uint64_t index,uint64_t chunk)1272 static uint64_t qemu_rdma_make_wrid(uint64_t wr_id, uint64_t index,
1273                                          uint64_t chunk)
1274 {
1275     uint64_t result = wr_id & RDMA_WRID_TYPE_MASK;
1276 
1277     result |= (index << RDMA_WRID_BLOCK_SHIFT);
1278     result |= (chunk << RDMA_WRID_CHUNK_SHIFT);
1279 
1280     return result;
1281 }
1282 
1283 /*
1284  * Consult the connection manager to see a work request
1285  * (of any kind) has completed.
1286  * Return the work request ID that completed.
1287  */
qemu_rdma_poll(RDMAContext * rdma,struct ibv_cq * cq,uint64_t * wr_id_out,uint32_t * byte_len)1288 static int qemu_rdma_poll(RDMAContext *rdma, struct ibv_cq *cq,
1289                           uint64_t *wr_id_out, uint32_t *byte_len)
1290 {
1291     int ret;
1292     struct ibv_wc wc;
1293     uint64_t wr_id;
1294 
1295     ret = ibv_poll_cq(cq, 1, &wc);
1296 
1297     if (!ret) {
1298         *wr_id_out = RDMA_WRID_NONE;
1299         return 0;
1300     }
1301 
1302     if (ret < 0) {
1303         return -1;
1304     }
1305 
1306     wr_id = wc.wr_id & RDMA_WRID_TYPE_MASK;
1307 
1308     if (wc.status != IBV_WC_SUCCESS) {
1309         return -1;
1310     }
1311 
1312     if (rdma->control_ready_expected &&
1313         (wr_id >= RDMA_WRID_RECV_CONTROL)) {
1314         trace_qemu_rdma_poll_recv(wr_id - RDMA_WRID_RECV_CONTROL, wr_id,
1315                                   rdma->nb_sent);
1316         rdma->control_ready_expected = 0;
1317     }
1318 
1319     if (wr_id == RDMA_WRID_RDMA_WRITE) {
1320         uint64_t chunk =
1321             (wc.wr_id & RDMA_WRID_CHUNK_MASK) >> RDMA_WRID_CHUNK_SHIFT;
1322         uint64_t index =
1323             (wc.wr_id & RDMA_WRID_BLOCK_MASK) >> RDMA_WRID_BLOCK_SHIFT;
1324         RDMALocalBlock *block = &(rdma->local_ram_blocks.block[index]);
1325 
1326         trace_qemu_rdma_poll_write(wr_id, rdma->nb_sent,
1327                                    index, chunk, block->local_host_addr,
1328                                    (void *)(uintptr_t)block->remote_host_addr);
1329 
1330         clear_bit(chunk, block->transit_bitmap);
1331 
1332         if (rdma->nb_sent > 0) {
1333             rdma->nb_sent--;
1334         }
1335     } else {
1336         trace_qemu_rdma_poll_other(wr_id, rdma->nb_sent);
1337     }
1338 
1339     *wr_id_out = wc.wr_id;
1340     if (byte_len) {
1341         *byte_len = wc.byte_len;
1342     }
1343 
1344     return  0;
1345 }
1346 
1347 /* Wait for activity on the completion channel.
1348  * Returns 0 on success, none-0 on error.
1349  */
qemu_rdma_wait_comp_channel(RDMAContext * rdma,struct ibv_comp_channel * comp_channel)1350 static int qemu_rdma_wait_comp_channel(RDMAContext *rdma,
1351                                        struct ibv_comp_channel *comp_channel)
1352 {
1353     struct rdma_cm_event *cm_event;
1354 
1355     /*
1356      * Coroutine doesn't start until migration_fd_process_incoming()
1357      * so don't yield unless we know we're running inside of a coroutine.
1358      */
1359     if (rdma->migration_started_on_destination &&
1360         migration_incoming_get_current()->state == MIGRATION_STATUS_ACTIVE) {
1361         yield_until_fd_readable(comp_channel->fd);
1362     } else {
1363         /* This is the source side, we're in a separate thread
1364          * or destination prior to migration_fd_process_incoming()
1365          * after postcopy, the destination also in a separate thread.
1366          * we can't yield; so we have to poll the fd.
1367          * But we need to be able to handle 'cancel' or an error
1368          * without hanging forever.
1369          */
1370         while (!rdma->errored && !rdma->received_error) {
1371             GPollFD pfds[2];
1372             pfds[0].fd = comp_channel->fd;
1373             pfds[0].events = G_IO_IN | G_IO_HUP | G_IO_ERR;
1374             pfds[0].revents = 0;
1375 
1376             pfds[1].fd = rdma->channel->fd;
1377             pfds[1].events = G_IO_IN | G_IO_HUP | G_IO_ERR;
1378             pfds[1].revents = 0;
1379 
1380             /* 0.1s timeout, should be fine for a 'cancel' */
1381             switch (qemu_poll_ns(pfds, 2, 100 * 1000 * 1000)) {
1382             case 2:
1383             case 1: /* fd active */
1384                 if (pfds[0].revents) {
1385                     return 0;
1386                 }
1387 
1388                 if (pfds[1].revents) {
1389                     if (rdma_get_cm_event(rdma->channel, &cm_event) < 0) {
1390                         return -1;
1391                     }
1392 
1393                     if (cm_event->event == RDMA_CM_EVENT_DISCONNECTED ||
1394                         cm_event->event == RDMA_CM_EVENT_DEVICE_REMOVAL) {
1395                         rdma_ack_cm_event(cm_event);
1396                         return -1;
1397                     }
1398                     rdma_ack_cm_event(cm_event);
1399                 }
1400                 break;
1401 
1402             case 0: /* Timeout, go around again */
1403                 break;
1404 
1405             default: /* Error of some type -
1406                       * I don't trust errno from qemu_poll_ns
1407                      */
1408                 return -1;
1409             }
1410 
1411             if (migrate_get_current()->state == MIGRATION_STATUS_CANCELLING) {
1412                 /* Bail out and let the cancellation happen */
1413                 return -1;
1414             }
1415         }
1416     }
1417 
1418     if (rdma->received_error) {
1419         return -1;
1420     }
1421     return -rdma->errored;
1422 }
1423 
/*
 * Select the completion channel for a work request ID: IDs below
 * RDMA_WRID_RECV_CONTROL use the send channel, all others the receive
 * channel.
 */
static struct ibv_comp_channel *to_channel(RDMAContext *rdma, uint64_t wrid)
{
    if (wrid >= RDMA_WRID_RECV_CONTROL) {
        return rdma->recv_comp_channel;
    }
    return rdma->send_comp_channel;
}
1429 
/*
 * Select the completion queue for a work request ID: IDs below
 * RDMA_WRID_RECV_CONTROL use the send CQ, all others the receive CQ.
 */
static struct ibv_cq *to_cq(RDMAContext *rdma, uint64_t wrid)
{
    if (wrid >= RDMA_WRID_RECV_CONTROL) {
        return rdma->recv_cq;
    }
    return rdma->send_cq;
}
1434 
1435 /*
1436  * Block until the next work request has completed.
1437  *
1438  * First poll to see if a work request has already completed,
1439  * otherwise block.
1440  *
1441  * If we encounter completed work requests for IDs other than
1442  * the one we're interested in, then that's generally an error.
1443  *
1444  * The only exception is actual RDMA Write completions. These
1445  * completions only need to be recorded, but do not actually
1446  * need further processing.
1447  */
qemu_rdma_block_for_wrid(RDMAContext * rdma,uint64_t wrid_requested,uint32_t * byte_len)1448 static int qemu_rdma_block_for_wrid(RDMAContext *rdma,
1449                                     uint64_t wrid_requested,
1450                                     uint32_t *byte_len)
1451 {
1452     int num_cq_events = 0, ret;
1453     struct ibv_cq *cq;
1454     void *cq_ctx;
1455     uint64_t wr_id = RDMA_WRID_NONE, wr_id_in;
1456     struct ibv_comp_channel *ch = to_channel(rdma, wrid_requested);
1457     struct ibv_cq *poll_cq = to_cq(rdma, wrid_requested);
1458 
1459     if (ibv_req_notify_cq(poll_cq, 0)) {
1460         return -1;
1461     }
1462     /* poll cq first */
1463     while (wr_id != wrid_requested) {
1464         ret = qemu_rdma_poll(rdma, poll_cq, &wr_id_in, byte_len);
1465         if (ret < 0) {
1466             return -1;
1467         }
1468 
1469         wr_id = wr_id_in & RDMA_WRID_TYPE_MASK;
1470 
1471         if (wr_id == RDMA_WRID_NONE) {
1472             break;
1473         }
1474         if (wr_id != wrid_requested) {
1475             trace_qemu_rdma_block_for_wrid_miss(wrid_requested, wr_id);
1476         }
1477     }
1478 
1479     if (wr_id == wrid_requested) {
1480         return 0;
1481     }
1482 
1483     while (1) {
1484         ret = qemu_rdma_wait_comp_channel(rdma, ch);
1485         if (ret < 0) {
1486             goto err_block_for_wrid;
1487         }
1488 
1489         ret = ibv_get_cq_event(ch, &cq, &cq_ctx);
1490         if (ret < 0) {
1491             goto err_block_for_wrid;
1492         }
1493 
1494         num_cq_events++;
1495 
1496         if (ibv_req_notify_cq(cq, 0)) {
1497             goto err_block_for_wrid;
1498         }
1499 
1500         while (wr_id != wrid_requested) {
1501             ret = qemu_rdma_poll(rdma, poll_cq, &wr_id_in, byte_len);
1502             if (ret < 0) {
1503                 goto err_block_for_wrid;
1504             }
1505 
1506             wr_id = wr_id_in & RDMA_WRID_TYPE_MASK;
1507 
1508             if (wr_id == RDMA_WRID_NONE) {
1509                 break;
1510             }
1511             if (wr_id != wrid_requested) {
1512                 trace_qemu_rdma_block_for_wrid_miss(wrid_requested, wr_id);
1513             }
1514         }
1515 
1516         if (wr_id == wrid_requested) {
1517             goto success_block_for_wrid;
1518         }
1519     }
1520 
1521 success_block_for_wrid:
1522     if (num_cq_events) {
1523         ibv_ack_cq_events(cq, num_cq_events);
1524     }
1525     return 0;
1526 
1527 err_block_for_wrid:
1528     if (num_cq_events) {
1529         ibv_ack_cq_events(cq, num_cq_events);
1530     }
1531 
1532     rdma->errored = true;
1533     return -1;
1534 }
1535 
1536 /*
1537  * Post a SEND message work request for the control channel
1538  * containing some data and block until the post completes.
1539  */
qemu_rdma_post_send_control(RDMAContext * rdma,uint8_t * buf,RDMAControlHeader * head,Error ** errp)1540 static int qemu_rdma_post_send_control(RDMAContext *rdma, uint8_t *buf,
1541                                        RDMAControlHeader *head,
1542                                        Error **errp)
1543 {
1544     int ret;
1545     RDMAWorkRequestData *wr = &rdma->wr_data[RDMA_WRID_CONTROL];
1546     struct ibv_send_wr *bad_wr;
1547     struct ibv_sge sge = {
1548                            .addr = (uintptr_t)(wr->control),
1549                            .length = head->len + sizeof(RDMAControlHeader),
1550                            .lkey = wr->control_mr->lkey,
1551                          };
1552     struct ibv_send_wr send_wr = {
1553                                    .wr_id = RDMA_WRID_SEND_CONTROL,
1554                                    .opcode = IBV_WR_SEND,
1555                                    .send_flags = IBV_SEND_SIGNALED,
1556                                    .sg_list = &sge,
1557                                    .num_sge = 1,
1558                                 };
1559 
1560     trace_qemu_rdma_post_send_control(control_desc(head->type));
1561 
1562     /*
1563      * We don't actually need to do a memcpy() in here if we used
1564      * the "sge" properly, but since we're only sending control messages
1565      * (not RAM in a performance-critical path), then its OK for now.
1566      *
1567      * The copy makes the RDMAControlHeader simpler to manipulate
1568      * for the time being.
1569      */
1570     assert(head->len <= RDMA_CONTROL_MAX_BUFFER - sizeof(*head));
1571     memcpy(wr->control, head, sizeof(RDMAControlHeader));
1572     control_to_network((void *) wr->control);
1573 
1574     if (buf) {
1575         memcpy(wr->control + sizeof(RDMAControlHeader), buf, head->len);
1576     }
1577 
1578 
1579     ret = ibv_post_send(rdma->qp, &send_wr, &bad_wr);
1580 
1581     if (ret > 0) {
1582         error_setg(errp, "Failed to use post IB SEND for control");
1583         return -1;
1584     }
1585 
1586     ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_SEND_CONTROL, NULL);
1587     if (ret < 0) {
1588         error_setg(errp, "rdma migration: send polling control error");
1589         return -1;
1590     }
1591 
1592     return 0;
1593 }
1594 
1595 /*
1596  * Post a RECV work request in anticipation of some future receipt
1597  * of data on the control channel.
1598  */
qemu_rdma_post_recv_control(RDMAContext * rdma,int idx,Error ** errp)1599 static int qemu_rdma_post_recv_control(RDMAContext *rdma, int idx,
1600                                        Error **errp)
1601 {
1602     struct ibv_recv_wr *bad_wr;
1603     struct ibv_sge sge = {
1604                             .addr = (uintptr_t)(rdma->wr_data[idx].control),
1605                             .length = RDMA_CONTROL_MAX_BUFFER,
1606                             .lkey = rdma->wr_data[idx].control_mr->lkey,
1607                          };
1608 
1609     struct ibv_recv_wr recv_wr = {
1610                                     .wr_id = RDMA_WRID_RECV_CONTROL + idx,
1611                                     .sg_list = &sge,
1612                                     .num_sge = 1,
1613                                  };
1614 
1615 
1616     if (ibv_post_recv(rdma->qp, &recv_wr, &bad_wr)) {
1617         error_setg(errp, "error posting control recv");
1618         return -1;
1619     }
1620 
1621     return 0;
1622 }
1623 
1624 /*
1625  * Block and wait for a RECV control channel message to arrive.
1626  */
qemu_rdma_exchange_get_response(RDMAContext * rdma,RDMAControlHeader * head,uint32_t expecting,int idx,Error ** errp)1627 static int qemu_rdma_exchange_get_response(RDMAContext *rdma,
1628                 RDMAControlHeader *head, uint32_t expecting, int idx,
1629                 Error **errp)
1630 {
1631     uint32_t byte_len;
1632     int ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RECV_CONTROL + idx,
1633                                        &byte_len);
1634 
1635     if (ret < 0) {
1636         error_setg(errp, "rdma migration: recv polling control error!");
1637         return -1;
1638     }
1639 
1640     network_to_control((void *) rdma->wr_data[idx].control);
1641     memcpy(head, rdma->wr_data[idx].control, sizeof(RDMAControlHeader));
1642 
1643     trace_qemu_rdma_exchange_get_response_start(control_desc(expecting));
1644 
1645     if (expecting == RDMA_CONTROL_NONE) {
1646         trace_qemu_rdma_exchange_get_response_none(control_desc(head->type),
1647                                              head->type);
1648     } else if (head->type != expecting || head->type == RDMA_CONTROL_ERROR) {
1649         error_setg(errp, "Was expecting a %s (%d) control message"
1650                 ", but got: %s (%d), length: %d",
1651                 control_desc(expecting), expecting,
1652                 control_desc(head->type), head->type, head->len);
1653         if (head->type == RDMA_CONTROL_ERROR) {
1654             rdma->received_error = true;
1655         }
1656         return -1;
1657     }
1658     if (head->len > RDMA_CONTROL_MAX_BUFFER - sizeof(*head)) {
1659         error_setg(errp, "too long length: %d", head->len);
1660         return -1;
1661     }
1662     if (sizeof(*head) + head->len != byte_len) {
1663         error_setg(errp, "Malformed length: %d byte_len %d",
1664                    head->len, byte_len);
1665         return -1;
1666     }
1667 
1668     return 0;
1669 }
1670 
1671 /*
1672  * When a RECV work request has completed, the work request's
1673  * buffer is pointed at the header.
1674  *
1675  * This will advance the pointer to the data portion
1676  * of the control message of the work request's buffer that
1677  * was populated after the work request finished.
1678  */
qemu_rdma_move_header(RDMAContext * rdma,int idx,RDMAControlHeader * head)1679 static void qemu_rdma_move_header(RDMAContext *rdma, int idx,
1680                                   RDMAControlHeader *head)
1681 {
1682     rdma->wr_data[idx].control_len = head->len;
1683     rdma->wr_data[idx].control_curr =
1684         rdma->wr_data[idx].control + sizeof(RDMAControlHeader);
1685 }
1686 
1687 /*
1688  * This is an 'atomic' high-level operation to deliver a single, unified
1689  * control-channel message.
1690  *
1691  * Additionally, if the user is expecting some kind of reply to this message,
1692  * they can request a 'resp' response message be filled in by posting an
1693  * additional work request on behalf of the user and waiting for an additional
1694  * completion.
1695  *
1696  * The extra (optional) response is used during registration to us from having
1697  * to perform an *additional* exchange of message just to provide a response by
1698  * instead piggy-backing on the acknowledgement.
1699  */
qemu_rdma_exchange_send(RDMAContext * rdma,RDMAControlHeader * head,uint8_t * data,RDMAControlHeader * resp,int * resp_idx,int (* callback)(RDMAContext * rdma,Error ** errp),Error ** errp)1700 static int qemu_rdma_exchange_send(RDMAContext *rdma, RDMAControlHeader *head,
1701                                    uint8_t *data, RDMAControlHeader *resp,
1702                                    int *resp_idx,
1703                                    int (*callback)(RDMAContext *rdma,
1704                                                    Error **errp),
1705                                    Error **errp)
1706 {
1707     int ret;
1708 
1709     /*
1710      * Wait until the dest is ready before attempting to deliver the message
1711      * by waiting for a READY message.
1712      */
1713     if (rdma->control_ready_expected) {
1714         RDMAControlHeader resp_ignored;
1715 
1716         ret = qemu_rdma_exchange_get_response(rdma, &resp_ignored,
1717                                               RDMA_CONTROL_READY,
1718                                               RDMA_WRID_READY, errp);
1719         if (ret < 0) {
1720             return -1;
1721         }
1722     }
1723 
1724     /*
1725      * If the user is expecting a response, post a WR in anticipation of it.
1726      */
1727     if (resp) {
1728         ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_DATA, errp);
1729         if (ret < 0) {
1730             return -1;
1731         }
1732     }
1733 
1734     /*
1735      * Post a WR to replace the one we just consumed for the READY message.
1736      */
1737     ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY, errp);
1738     if (ret < 0) {
1739         return -1;
1740     }
1741 
1742     /*
1743      * Deliver the control message that was requested.
1744      */
1745     ret = qemu_rdma_post_send_control(rdma, data, head, errp);
1746 
1747     if (ret < 0) {
1748         return -1;
1749     }
1750 
1751     /*
1752      * If we're expecting a response, block and wait for it.
1753      */
1754     if (resp) {
1755         if (callback) {
1756             trace_qemu_rdma_exchange_send_issue_callback();
1757             ret = callback(rdma, errp);
1758             if (ret < 0) {
1759                 return -1;
1760             }
1761         }
1762 
1763         trace_qemu_rdma_exchange_send_waiting(control_desc(resp->type));
1764         ret = qemu_rdma_exchange_get_response(rdma, resp,
1765                                               resp->type, RDMA_WRID_DATA,
1766                                               errp);
1767 
1768         if (ret < 0) {
1769             return -1;
1770         }
1771 
1772         qemu_rdma_move_header(rdma, RDMA_WRID_DATA, resp);
1773         if (resp_idx) {
1774             *resp_idx = RDMA_WRID_DATA;
1775         }
1776         trace_qemu_rdma_exchange_send_received(control_desc(resp->type));
1777     }
1778 
1779     rdma->control_ready_expected = 1;
1780 
1781     return 0;
1782 }
1783 
1784 /*
1785  * This is an 'atomic' high-level operation to receive a single, unified
1786  * control-channel message.
1787  */
qemu_rdma_exchange_recv(RDMAContext * rdma,RDMAControlHeader * head,uint32_t expecting,Error ** errp)1788 static int qemu_rdma_exchange_recv(RDMAContext *rdma, RDMAControlHeader *head,
1789                                    uint32_t expecting, Error **errp)
1790 {
1791     RDMAControlHeader ready = {
1792                                 .len = 0,
1793                                 .type = RDMA_CONTROL_READY,
1794                                 .repeat = 1,
1795                               };
1796     int ret;
1797 
1798     /*
1799      * Inform the source that we're ready to receive a message.
1800      */
1801     ret = qemu_rdma_post_send_control(rdma, NULL, &ready, errp);
1802 
1803     if (ret < 0) {
1804         return -1;
1805     }
1806 
1807     /*
1808      * Block and wait for the message.
1809      */
1810     ret = qemu_rdma_exchange_get_response(rdma, head,
1811                                           expecting, RDMA_WRID_READY, errp);
1812 
1813     if (ret < 0) {
1814         return -1;
1815     }
1816 
1817     qemu_rdma_move_header(rdma, RDMA_WRID_READY, head);
1818 
1819     /*
1820      * Post a new RECV work request to replace the one we just consumed.
1821      */
1822     ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY, errp);
1823     if (ret < 0) {
1824         return -1;
1825     }
1826 
1827     return 0;
1828 }
1829 
1830 /*
1831  * Write an actual chunk of memory using RDMA.
1832  *
1833  * If we're using dynamic registration on the dest-side, we have to
1834  * send a registration command first.
1835  */
qemu_rdma_write_one(RDMAContext * rdma,int current_index,uint64_t current_addr,uint64_t length,Error ** errp)1836 static int qemu_rdma_write_one(RDMAContext *rdma,
1837                                int current_index, uint64_t current_addr,
1838                                uint64_t length, Error **errp)
1839 {
1840     struct ibv_sge sge;
1841     struct ibv_send_wr send_wr = { 0 };
1842     struct ibv_send_wr *bad_wr;
1843     int reg_result_idx, ret, count = 0;
1844     uint64_t chunk, chunks;
1845     uint8_t *chunk_start, *chunk_end;
1846     RDMALocalBlock *block = &(rdma->local_ram_blocks.block[current_index]);
1847     RDMARegister reg;
1848     RDMARegisterResult *reg_result;
1849     RDMAControlHeader resp = { .type = RDMA_CONTROL_REGISTER_RESULT };
1850     RDMAControlHeader head = { .len = sizeof(RDMARegister),
1851                                .type = RDMA_CONTROL_REGISTER_REQUEST,
1852                                .repeat = 1,
1853                              };
1854 
1855 retry:
1856     sge.addr = (uintptr_t)(block->local_host_addr +
1857                             (current_addr - block->offset));
1858     sge.length = length;
1859 
1860     chunk = ram_chunk_index(block->local_host_addr,
1861                             (uint8_t *)(uintptr_t)sge.addr);
1862     chunk_start = ram_chunk_start(block, chunk);
1863 
1864     if (block->is_ram_block) {
1865         chunks = length / (1UL << RDMA_REG_CHUNK_SHIFT);
1866 
1867         if (chunks && ((length % (1UL << RDMA_REG_CHUNK_SHIFT)) == 0)) {
1868             chunks--;
1869         }
1870     } else {
1871         chunks = block->length / (1UL << RDMA_REG_CHUNK_SHIFT);
1872 
1873         if (chunks && ((block->length % (1UL << RDMA_REG_CHUNK_SHIFT)) == 0)) {
1874             chunks--;
1875         }
1876     }
1877 
1878     trace_qemu_rdma_write_one_top(chunks + 1,
1879                                   (chunks + 1) *
1880                                   (1UL << RDMA_REG_CHUNK_SHIFT) / 1024 / 1024);
1881 
1882     chunk_end = ram_chunk_end(block, chunk + chunks);
1883 
1884 
1885     while (test_bit(chunk, block->transit_bitmap)) {
1886         (void)count;
1887         trace_qemu_rdma_write_one_block(count++, current_index, chunk,
1888                 sge.addr, length, rdma->nb_sent, block->nb_chunks);
1889 
1890         ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RDMA_WRITE, NULL);
1891 
1892         if (ret < 0) {
1893             error_setg(errp, "Failed to Wait for previous write to complete "
1894                     "block %d chunk %" PRIu64
1895                     " current %" PRIu64 " len %" PRIu64 " %d",
1896                     current_index, chunk, sge.addr, length, rdma->nb_sent);
1897             return -1;
1898         }
1899     }
1900 
1901     if (!rdma->pin_all || !block->is_ram_block) {
1902         if (!block->remote_keys[chunk]) {
1903             /*
1904              * This chunk has not yet been registered, so first check to see
1905              * if the entire chunk is zero. If so, tell the other size to
1906              * memset() + madvise() the entire chunk without RDMA.
1907              */
1908 
1909             if (buffer_is_zero((void *)(uintptr_t)sge.addr, length)) {
1910                 RDMACompress comp = {
1911                                         .offset = current_addr,
1912                                         .value = 0,
1913                                         .block_idx = current_index,
1914                                         .length = length,
1915                                     };
1916 
1917                 head.len = sizeof(comp);
1918                 head.type = RDMA_CONTROL_COMPRESS;
1919 
1920                 trace_qemu_rdma_write_one_zero(chunk, sge.length,
1921                                                current_index, current_addr);
1922 
1923                 compress_to_network(rdma, &comp);
1924                 ret = qemu_rdma_exchange_send(rdma, &head,
1925                                 (uint8_t *) &comp, NULL, NULL, NULL, errp);
1926 
1927                 if (ret < 0) {
1928                     return -1;
1929                 }
1930 
1931                 /*
1932                  * TODO: Here we are sending something, but we are not
1933                  * accounting for anything transferred.  The following is wrong:
1934                  *
1935                  * stat64_add(&mig_stats.rdma_bytes, sge.length);
1936                  *
1937                  * because we are using some kind of compression.  I
1938                  * would think that head.len would be the more similar
1939                  * thing to a correct value.
1940                  */
1941                 stat64_add(&mig_stats.zero_pages,
1942                            sge.length / qemu_target_page_size());
1943                 return 1;
1944             }
1945 
1946             /*
1947              * Otherwise, tell other side to register.
1948              */
1949             reg.current_index = current_index;
1950             if (block->is_ram_block) {
1951                 reg.key.current_addr = current_addr;
1952             } else {
1953                 reg.key.chunk = chunk;
1954             }
1955             reg.chunks = chunks;
1956 
1957             trace_qemu_rdma_write_one_sendreg(chunk, sge.length, current_index,
1958                                               current_addr);
1959 
1960             register_to_network(rdma, &reg);
1961             ret = qemu_rdma_exchange_send(rdma, &head, (uint8_t *) &reg,
1962                                     &resp, &reg_result_idx, NULL, errp);
1963             if (ret < 0) {
1964                 return -1;
1965             }
1966 
1967             /* try to overlap this single registration with the one we sent. */
1968             if (qemu_rdma_register_and_get_keys(rdma, block, sge.addr,
1969                                                 &sge.lkey, NULL, chunk,
1970                                                 chunk_start, chunk_end)) {
1971                 error_setg(errp, "cannot get lkey");
1972                 return -1;
1973             }
1974 
1975             reg_result = (RDMARegisterResult *)
1976                     rdma->wr_data[reg_result_idx].control_curr;
1977 
1978             network_to_result(reg_result);
1979 
1980             trace_qemu_rdma_write_one_recvregres(block->remote_keys[chunk],
1981                                                  reg_result->rkey, chunk);
1982 
1983             block->remote_keys[chunk] = reg_result->rkey;
1984             block->remote_host_addr = reg_result->host_addr;
1985         } else {
1986             /* already registered before */
1987             if (qemu_rdma_register_and_get_keys(rdma, block, sge.addr,
1988                                                 &sge.lkey, NULL, chunk,
1989                                                 chunk_start, chunk_end)) {
1990                 error_setg(errp, "cannot get lkey!");
1991                 return -1;
1992             }
1993         }
1994 
1995         send_wr.wr.rdma.rkey = block->remote_keys[chunk];
1996     } else {
1997         send_wr.wr.rdma.rkey = block->remote_rkey;
1998 
1999         if (qemu_rdma_register_and_get_keys(rdma, block, sge.addr,
2000                                                      &sge.lkey, NULL, chunk,
2001                                                      chunk_start, chunk_end)) {
2002             error_setg(errp, "cannot get lkey!");
2003             return -1;
2004         }
2005     }
2006 
2007     /*
2008      * Encode the ram block index and chunk within this wrid.
2009      * We will use this information at the time of completion
2010      * to figure out which bitmap to check against and then which
2011      * chunk in the bitmap to look for.
2012      */
2013     send_wr.wr_id = qemu_rdma_make_wrid(RDMA_WRID_RDMA_WRITE,
2014                                         current_index, chunk);
2015 
2016     send_wr.opcode = IBV_WR_RDMA_WRITE;
2017     send_wr.send_flags = IBV_SEND_SIGNALED;
2018     send_wr.sg_list = &sge;
2019     send_wr.num_sge = 1;
2020     send_wr.wr.rdma.remote_addr = block->remote_host_addr +
2021                                 (current_addr - block->offset);
2022 
2023     trace_qemu_rdma_write_one_post(chunk, sge.addr, send_wr.wr.rdma.remote_addr,
2024                                    sge.length);
2025 
2026     /*
2027      * ibv_post_send() does not return negative error numbers,
2028      * per the specification they are positive - no idea why.
2029      */
2030     ret = ibv_post_send(rdma->qp, &send_wr, &bad_wr);
2031 
2032     if (ret == ENOMEM) {
2033         trace_qemu_rdma_write_one_queue_full();
2034         ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RDMA_WRITE, NULL);
2035         if (ret < 0) {
2036             error_setg(errp, "rdma migration: failed to make "
2037                          "room in full send queue!");
2038             return -1;
2039         }
2040 
2041         goto retry;
2042 
2043     } else if (ret > 0) {
2044         error_setg_errno(errp, ret,
2045                          "rdma migration: post rdma write failed");
2046         return -1;
2047     }
2048 
2049     set_bit(chunk, block->transit_bitmap);
2050     stat64_add(&mig_stats.normal_pages, sge.length / qemu_target_page_size());
2051     /*
2052      * We are adding to transferred the amount of data written, but no
2053      * overhead at all.  I will assume that RDMA is magicaly and don't
2054      * need to transfer (at least) the addresses where it wants to
2055      * write the pages.  Here it looks like it should be something
2056      * like:
2057      *     sizeof(send_wr) + sge.length
2058      * but this being RDMA, who knows.
2059      */
2060     stat64_add(&mig_stats.rdma_bytes, sge.length);
2061     ram_transferred_add(sge.length);
2062     rdma->total_writes++;
2063 
2064     return 0;
2065 }
2066 
2067 /*
2068  * Push out any unwritten RDMA operations.
2069  *
2070  * We support sending out multiple chunks at the same time.
2071  * Not all of them need to get signaled in the completion queue.
2072  */
qemu_rdma_write_flush(RDMAContext * rdma,Error ** errp)2073 static int qemu_rdma_write_flush(RDMAContext *rdma, Error **errp)
2074 {
2075     int ret;
2076 
2077     if (!rdma->current_length) {
2078         return 0;
2079     }
2080 
2081     ret = qemu_rdma_write_one(rdma, rdma->current_index, rdma->current_addr,
2082                               rdma->current_length, errp);
2083 
2084     if (ret < 0) {
2085         return -1;
2086     }
2087 
2088     if (ret == 0) {
2089         rdma->nb_sent++;
2090         trace_qemu_rdma_write_flush(rdma->nb_sent);
2091     }
2092 
2093     rdma->current_length = 0;
2094     rdma->current_addr = 0;
2095 
2096     return 0;
2097 }
2098 
/*
 * Tell whether @len bytes at @offset can be appended to the write that
 * is currently being accumulated.
 *
 * That is the case only if a block and a chunk are selected, something
 * is already pending, the new bytes start exactly where the pending
 * bytes end, and they lie entirely inside both the current block and
 * the current chunk.
 *
 * The host pointer is derived only after @offset and @len were checked
 * against the block, so that no out-of-range pointer is ever formed,
 * and the range check is written so that it cannot wrap around.
 */
static inline bool qemu_rdma_buffer_mergeable(RDMAContext *rdma,
                    uint64_t offset, uint64_t len)
{
    RDMALocalBlock *block;
    uint64_t block_off;
    uint8_t *host_addr;
    uint8_t *chunk_end;

    if (rdma->current_index < 0) {
        return false;
    }

    if (rdma->current_chunk < 0) {
        return false;
    }

    if (rdma->current_length == 0) {
        return false;
    }

    /*
     * Only merge into chunk sequentially.
     */
    if (offset != (rdma->current_addr + rdma->current_length)) {
        return false;
    }

    block = &(rdma->local_ram_blocks.block[rdma->current_index]);

    if (offset < block->offset) {
        return false;
    }

    /* Same as offset + len > block->offset + block->length, minus overflow */
    block_off = offset - block->offset;
    if (block_off > block->length || len > block->length - block_off) {
        return false;
    }

    /* [host_addr, host_addr + len) is known to be inside the block now. */
    host_addr = block->local_host_addr + block_off;
    chunk_end = ram_chunk_end(block, rdma->current_chunk);

    if ((host_addr + len) > chunk_end) {
        return false;
    }

    return true;
}
2143 
2144 /*
2145  * We're not actually writing here, but doing three things:
2146  *
2147  * 1. Identify the chunk the buffer belongs to.
2148  * 2. If the chunk is full or the buffer doesn't belong to the current
2149  *    chunk, then start a new chunk and flush() the old chunk.
2150  * 3. To keep the hardware busy, we also group chunks into batches
2151  *    and only require that a batch gets acknowledged in the completion
2152  *    queue instead of each individual chunk.
2153  */
qemu_rdma_write(RDMAContext * rdma,uint64_t block_offset,uint64_t offset,uint64_t len,Error ** errp)2154 static int qemu_rdma_write(RDMAContext *rdma,
2155                            uint64_t block_offset, uint64_t offset,
2156                            uint64_t len, Error **errp)
2157 {
2158     uint64_t current_addr = block_offset + offset;
2159     uint64_t index = rdma->current_index;
2160     uint64_t chunk = rdma->current_chunk;
2161 
2162     /* If we cannot merge it, we flush the current buffer first. */
2163     if (!qemu_rdma_buffer_mergeable(rdma, current_addr, len)) {
2164         if (qemu_rdma_write_flush(rdma, errp) < 0) {
2165             return -1;
2166         }
2167         rdma->current_length = 0;
2168         rdma->current_addr = current_addr;
2169 
2170         qemu_rdma_search_ram_block(rdma, block_offset,
2171                                    offset, len, &index, &chunk);
2172         rdma->current_index = index;
2173         rdma->current_chunk = chunk;
2174     }
2175 
2176     /* merge it */
2177     rdma->current_length += len;
2178 
2179     /* flush it if buffer is too large */
2180     if (rdma->current_length >= RDMA_MERGE_MAX) {
2181         return qemu_rdma_write_flush(rdma, errp);
2182     }
2183 
2184     return 0;
2185 }
2186 
/*
 * Tear down whatever @rdma currently holds.
 *
 * While still connected, an ERROR control message is sent first if this
 * side has failed or the migration is being cancelled, unless the peer
 * already reported an error itself.  Each handle is reset to NULL after
 * it has been released.
 */
static void qemu_rdma_cleanup(RDMAContext *rdma)
{
    Error *err = NULL;

    if (rdma->cm_id && rdma->connected) {
        bool aborting = rdma->errored ||
            migrate_get_current()->state == MIGRATION_STATUS_CANCELLING;

        if (aborting && !rdma->received_error) {
            RDMAControlHeader error_msg = {
                .len = 0,
                .type = RDMA_CONTROL_ERROR,
                .repeat = 1,
            };

            warn_report("Early error. Sending error.");
            if (qemu_rdma_post_send_control(rdma, NULL, &error_msg,
                                            &err) < 0) {
                warn_report_err(err);
            }
        }

        rdma_disconnect(rdma->cm_id);
        trace_qemu_rdma_cleanup_disconnect();
        rdma->connected = false;
    }

    /* Stop watching the event channel's file descriptor. */
    if (rdma->channel) {
        qemu_set_fd_handler(rdma->channel->fd, NULL, NULL, NULL);
    }
    g_free(rdma->dest_blocks);
    rdma->dest_blocks = NULL;

    /* Deregister the control buffers. */
    for (int i = 0; i < RDMA_WRID_MAX; i++) {
        struct ibv_mr *mr = rdma->wr_data[i].control_mr;

        if (mr) {
            rdma->total_registrations--;
            ibv_dereg_mr(mr);
        }
        rdma->wr_data[i].control_mr = NULL;
    }

    /* Keep deleting the first block until none is left. */
    if (rdma->local_ram_blocks.block) {
        while (rdma->local_ram_blocks.nb_blocks) {
            rdma_delete_block(rdma, &rdma->local_ram_blocks.block[0]);
        }
    }

    if (rdma->qp) {
        rdma_destroy_qp(rdma->cm_id);
        rdma->qp = NULL;
    }
    if (rdma->recv_cq) {
        ibv_destroy_cq(rdma->recv_cq);
        rdma->recv_cq = NULL;
    }
    if (rdma->send_cq) {
        ibv_destroy_cq(rdma->send_cq);
        rdma->send_cq = NULL;
    }
    if (rdma->recv_comp_channel) {
        ibv_destroy_comp_channel(rdma->recv_comp_channel);
        rdma->recv_comp_channel = NULL;
    }
    if (rdma->send_comp_channel) {
        ibv_destroy_comp_channel(rdma->send_comp_channel);
        rdma->send_comp_channel = NULL;
    }
    if (rdma->pd) {
        ibv_dealloc_pd(rdma->pd);
        rdma->pd = NULL;
    }
    if (rdma->cm_id) {
        rdma_destroy_id(rdma->cm_id);
        rdma->cm_id = NULL;
    }

    /*
     * On the destination side, listen_id and channel are shared with
     * the return path context: only the context that is not the return
     * path destroys them, the other one merely drops its references.
     */
    if (rdma->listen_id) {
        bool owner = !rdma->is_return_path;

        if (owner) {
            rdma_destroy_id(rdma->listen_id);
        }
        rdma->listen_id = NULL;

        if (rdma->channel) {
            if (owner) {
                rdma_destroy_event_channel(rdma->channel);
            }
            rdma->channel = NULL;
        }
    }

    /* A channel that is still set at this point is destroyed here. */
    if (rdma->channel) {
        rdma_destroy_event_channel(rdma->channel);
        rdma->channel = NULL;
    }
    g_free(rdma->host);
    rdma->host = NULL;
}
2281 
2282 
qemu_rdma_source_init(RDMAContext * rdma,bool pin_all,Error ** errp)2283 static int qemu_rdma_source_init(RDMAContext *rdma, bool pin_all, Error **errp)
2284 {
2285     int ret;
2286 
2287     /*
2288      * Will be validated against destination's actual capabilities
2289      * after the connect() completes.
2290      */
2291     rdma->pin_all = pin_all;
2292 
2293     ret = qemu_rdma_resolve_host(rdma, errp);
2294     if (ret < 0) {
2295         goto err_rdma_source_init;
2296     }
2297 
2298     ret = qemu_rdma_alloc_pd_cq(rdma, errp);
2299     if (ret < 0) {
2300         goto err_rdma_source_init;
2301     }
2302 
2303     ret = qemu_rdma_alloc_qp(rdma);
2304     if (ret < 0) {
2305         error_setg(errp, "RDMA ERROR: rdma migration: error allocating qp!");
2306         goto err_rdma_source_init;
2307     }
2308 
2309     qemu_rdma_init_ram_blocks(rdma);
2310 
2311     /* Build the hash that maps from offset to RAMBlock */
2312     rdma->blockmap = g_hash_table_new(g_direct_hash, g_direct_equal);
2313     for (int i = 0; i < rdma->local_ram_blocks.nb_blocks; i++) {
2314         g_hash_table_insert(rdma->blockmap,
2315                 (void *)(uintptr_t)rdma->local_ram_blocks.block[i].offset,
2316                 &rdma->local_ram_blocks.block[i]);
2317     }
2318 
2319     for (int i = 0; i < RDMA_WRID_MAX; i++) {
2320         ret = qemu_rdma_reg_control(rdma, i);
2321         if (ret < 0) {
2322             error_setg(errp, "RDMA ERROR: rdma migration: error "
2323                        "registering %d control!", i);
2324             goto err_rdma_source_init;
2325         }
2326     }
2327 
2328     return 0;
2329 
2330 err_rdma_source_init:
2331     qemu_rdma_cleanup(rdma);
2332     return -1;
2333 }
2334 
/*
 * Fetch the next connection manager event from rdma->channel, waiting
 * no longer than @msec milliseconds for it to become readable.
 *
 * poll() is restarted when a signal interrupts it; each restart waits
 * for the full @msec again.
 *
 * On success 0 is returned and *@cm_event holds the event.  On timeout,
 * poll failure, a wakeup without POLLIN, or failure to read the event,
 * -1 is returned and @errp is set.
 */
static int qemu_get_cm_event_timeout(RDMAContext *rdma,
                                     struct rdma_cm_event **cm_event,
                                     long msec, Error **errp)
{
    int ret;
    struct pollfd poll_fd = {
                                .fd = rdma->channel->fd,
                                .events = POLLIN,
                                .revents = 0
                            };

    do {
        ret = poll(&poll_fd, 1, msec);
    } while (ret < 0 && errno == EINTR);

    if (ret == 0) {
        error_setg(errp, "RDMA ERROR: poll cm event timeout");
        return -1;
    }

    if (ret < 0) {
        error_setg(errp, "RDMA ERROR: failed to poll cm event, errno=%i",
                   errno);
        return -1;
    }

    if (!(poll_fd.revents & POLLIN)) {
        error_setg(errp, "RDMA ERROR: no POLLIN event, revent=%x",
                   poll_fd.revents);
        return -1;
    }

    if (rdma_get_cm_event(rdma->channel, cm_event) < 0) {
        /*
         * Include errno in the report, the same way the untimed
         * rdma_get_cm_event() call in qemu_rdma_connect() does.
         */
        error_setg_errno(errp, errno,
                         "RDMA ERROR: failed to get cm event");
        return -1;
    }

    return 0;
}
2369 
/*
 * Connect to the destination and negotiate capabilities.
 *
 * The capabilities wanted by the source travel as private data of the
 * connection request; the destination's answer comes back as private
 * data of the ESTABLISHED event.  A capability that was requested but is
 * missing from the answer is switched off.
 *
 * With @return_path set, waiting for the connection event is limited to
 * 5000 ms; otherwise it blocks.
 *
 * Returns 0 on success.  On failure @errp is set, qemu_rdma_cleanup()
 * is run on @rdma and -1 is returned.
 */
static int qemu_rdma_connect(RDMAContext *rdma, bool return_path,
                             Error **errp)
{
    RDMACapabilities cap = {
                                .version = RDMA_CONTROL_VERSION_CURRENT,
                                .flags = 0,
                           };
    struct rdma_conn_param conn_param = { .initiator_depth = 2,
                                          .retry_count = 5,
                                          .private_data = &cap,
                                          .private_data_len = sizeof(cap),
                                        };
    struct rdma_cm_event *cm_event;
    int ret;

    /*
     * Only negotiate the capability with destination if the user
     * on the source first requested the capability.
     */
    if (rdma->pin_all) {
        trace_qemu_rdma_connect_pin_all_requested();
        cap.flags |= RDMA_CAPABILITY_PIN_ALL;
    }

    caps_to_network(&cap);

    /* Have a RECV posted for the first READY message before connecting. */
    ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY, errp);
    if (ret < 0) {
        goto err_rdma_source_connect;
    }

    ret = rdma_connect(rdma->cm_id, &conn_param);
    if (ret < 0) {
        error_setg_errno(errp, errno,
                         "RDMA ERROR: connecting to destination!");
        goto err_rdma_source_connect;
    }

    if (return_path) {
        ret = qemu_get_cm_event_timeout(rdma, &cm_event, 5000, errp);
    } else {
        ret = rdma_get_cm_event(rdma->channel, &cm_event);
        if (ret < 0) {
            error_setg_errno(errp, errno,
                             "RDMA ERROR: failed to get cm event");
        }
    }
    if (ret < 0) {
        goto err_rdma_source_connect;
    }

    if (cm_event->event != RDMA_CM_EVENT_ESTABLISHED) {
        error_setg(errp, "RDMA ERROR: connecting to destination!");
        rdma_ack_cm_event(cm_event);
        goto err_rdma_source_connect;
    }
    rdma->connected = true;

    /*
     * The private data is supplied by the peer: make sure it is there
     * and large enough before copying sizeof(cap) bytes out of it.
     */
    if (!cm_event->param.conn.private_data ||
        cm_event->param.conn.private_data_len < sizeof(cap)) {
        error_setg(errp, "RDMA ERROR: destination sent no or truncated "
                   "capability data");
        rdma_ack_cm_event(cm_event);
        goto err_rdma_source_connect;
    }

    memcpy(&cap, cm_event->param.conn.private_data, sizeof(cap));
    network_to_caps(&cap);

    /*
     * Verify that the *requested* capabilities are supported by the destination
     * and disable them otherwise.
     */
    if (rdma->pin_all && !(cap.flags & RDMA_CAPABILITY_PIN_ALL)) {
        warn_report("RDMA: Server cannot support pinning all memory. "
                    "Will register memory dynamically.");
        rdma->pin_all = false;
    }

    trace_qemu_rdma_connect_pin_all_outcome(rdma->pin_all);

    rdma_ack_cm_event(cm_event);

    /* The first control send has to wait for a READY message. */
    rdma->control_ready_expected = 1;
    rdma->nb_sent = 0;
    return 0;

err_rdma_source_connect:
    qemu_rdma_cleanup(rdma);
    return -1;
}
2453 
qemu_rdma_dest_init(RDMAContext * rdma,Error ** errp)2454 static int qemu_rdma_dest_init(RDMAContext *rdma, Error **errp)
2455 {
2456     int ret;
2457     struct rdma_cm_id *listen_id;
2458     char ip[40] = "unknown";
2459     struct rdma_addrinfo *res, *e;
2460     char port_str[16];
2461     int reuse = 1;
2462 
2463     for (int i = 0; i < RDMA_WRID_MAX; i++) {
2464         rdma->wr_data[i].control_len = 0;
2465         rdma->wr_data[i].control_curr = NULL;
2466     }
2467 
2468     if (!rdma->host || !rdma->host[0]) {
2469         error_setg(errp, "RDMA ERROR: RDMA host is not set!");
2470         rdma->errored = true;
2471         return -1;
2472     }
2473     /* create CM channel */
2474     rdma->channel = rdma_create_event_channel();
2475     if (!rdma->channel) {
2476         error_setg(errp, "RDMA ERROR: could not create rdma event channel");
2477         rdma->errored = true;
2478         return -1;
2479     }
2480 
2481     /* create CM id */
2482     ret = rdma_create_id(rdma->channel, &listen_id, NULL, RDMA_PS_TCP);
2483     if (ret < 0) {
2484         error_setg(errp, "RDMA ERROR: could not create cm_id!");
2485         goto err_dest_init_create_listen_id;
2486     }
2487 
2488     snprintf(port_str, 16, "%d", rdma->port);
2489     port_str[15] = '\0';
2490 
2491     ret = rdma_getaddrinfo(rdma->host, port_str, NULL, &res);
2492     if (ret) {
2493         error_setg(errp, "RDMA ERROR: could not rdma_getaddrinfo address %s",
2494                    rdma->host);
2495         goto err_dest_init_bind_addr;
2496     }
2497 
2498     ret = rdma_set_option(listen_id, RDMA_OPTION_ID, RDMA_OPTION_ID_REUSEADDR,
2499                           &reuse, sizeof reuse);
2500     if (ret < 0) {
2501         error_setg(errp, "RDMA ERROR: Error: could not set REUSEADDR option");
2502         goto err_dest_init_bind_addr;
2503     }
2504 
2505     /* Try all addresses */
2506     for (e = res; e != NULL; e = e->ai_next) {
2507 
2508         inet_ntop(e->ai_family,
2509             &((struct sockaddr_in *) e->ai_dst_addr)->sin_addr, ip, sizeof ip);
2510         trace_qemu_rdma_dest_init_trying(rdma->host, ip);
2511         ret = rdma_bind_addr(listen_id, e->ai_dst_addr);
2512         if (ret < 0) {
2513             continue;
2514         }
2515         break;
2516     }
2517 
2518     rdma_freeaddrinfo(res);
2519     if (!e) {
2520         error_setg(errp, "RDMA ERROR: Error: could not rdma_bind_addr!");
2521         goto err_dest_init_bind_addr;
2522     }
2523 
2524     rdma->listen_id = listen_id;
2525     qemu_rdma_dump_gid("dest_init", listen_id);
2526     return 0;
2527 
2528 err_dest_init_bind_addr:
2529     rdma_destroy_id(listen_id);
2530 err_dest_init_create_listen_id:
2531     rdma_destroy_event_channel(rdma->channel);
2532     rdma->channel = NULL;
2533     rdma->errored = true;
2534     return -1;
2535 
2536 }
2537 
/*
 * Set up @rdma_return_path as the return path context belonging to
 * @rdma on the destination side.
 *
 * The return path gets no event channel or listening id of its own; it
 * uses the ones of @rdma.  Both contexts are cross-linked through their
 * return_path pointers, and @rdma_return_path is marked as being the
 * return path.
 */
static void qemu_rdma_return_path_dest_init(RDMAContext *rdma_return_path,
                                            RDMAContext *rdma)
{
    int slot;

    /* Start with empty control buffers. */
    for (slot = 0; slot < RDMA_WRID_MAX; slot++) {
        rdma_return_path->wr_data[slot].control_curr = NULL;
        rdma_return_path->wr_data[slot].control_len = 0;
    }

    /* The CM channel and the CM id are shared with the main context. */
    rdma_return_path->channel = rdma->channel;
    rdma_return_path->listen_id = rdma->listen_id;

    /* Link the two contexts to each other. */
    rdma->return_path = rdma_return_path;
    rdma_return_path->return_path = rdma;
    rdma_return_path->is_return_path = true;
}
2554 
qemu_rdma_data_init(InetSocketAddress * saddr,Error ** errp)2555 static RDMAContext *qemu_rdma_data_init(InetSocketAddress *saddr, Error **errp)
2556 {
2557     RDMAContext *rdma = NULL;
2558 
2559     rdma = g_new0(RDMAContext, 1);
2560     rdma->current_index = -1;
2561     rdma->current_chunk = -1;
2562 
2563     rdma->host = g_strdup(saddr->host);
2564     rdma->port = atoi(saddr->port);
2565     return rdma;
2566 }
2567 
2568 /*
2569  * QEMUFile interface to the control channel.
2570  * SEND messages for control only.
2571  * VM's ram is handled with regular RDMA messages.
2572  */
qio_channel_rdma_writev(QIOChannel * ioc,const struct iovec * iov,size_t niov,int * fds,size_t nfds,int flags,Error ** errp)2573 static ssize_t qio_channel_rdma_writev(QIOChannel *ioc,
2574                                        const struct iovec *iov,
2575                                        size_t niov,
2576                                        int *fds,
2577                                        size_t nfds,
2578                                        int flags,
2579                                        Error **errp)
2580 {
2581     QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
2582     RDMAContext *rdma;
2583     int ret;
2584     ssize_t done = 0;
2585     size_t len;
2586 
2587     RCU_READ_LOCK_GUARD();
2588     rdma = qatomic_rcu_read(&rioc->rdmaout);
2589 
2590     if (!rdma) {
2591         error_setg(errp, "RDMA control channel output is not set");
2592         return -1;
2593     }
2594 
2595     if (rdma->errored) {
2596         error_setg(errp,
2597                    "RDMA is in an error state waiting migration to abort!");
2598         return -1;
2599     }
2600 
2601     /*
2602      * Push out any writes that
2603      * we're queued up for VM's ram.
2604      */
2605     ret = qemu_rdma_write_flush(rdma, errp);
2606     if (ret < 0) {
2607         rdma->errored = true;
2608         return -1;
2609     }
2610 
2611     for (int i = 0; i < niov; i++) {
2612         size_t remaining = iov[i].iov_len;
2613         uint8_t * data = (void *)iov[i].iov_base;
2614         while (remaining) {
2615             RDMAControlHeader head = {};
2616 
2617             len = MIN(remaining, RDMA_SEND_INCREMENT);
2618             remaining -= len;
2619 
2620             head.len = len;
2621             head.type = RDMA_CONTROL_QEMU_FILE;
2622 
2623             ret = qemu_rdma_exchange_send(rdma, &head,
2624                                           data, NULL, NULL, NULL, errp);
2625 
2626             if (ret < 0) {
2627                 rdma->errored = true;
2628                 return -1;
2629             }
2630 
2631             data += len;
2632             done += len;
2633         }
2634     }
2635 
2636     return done;
2637 }
2638 
/*
 * Hand out control data that was already received into work request slot
 * @idx: copy at most @size bytes into @buf and advance the slot's read
 * position by the amount copied.
 *
 * Returns the number of bytes copied, 0 when the slot holds no data.
 */
static size_t qemu_rdma_fill(RDMAContext *rdma, uint8_t *buf,
                             size_t size, int idx)
{
    size_t copied;

    /* Nothing buffered for this slot */
    if (!rdma->wr_data[idx].control_len) {
        return 0;
    }

    trace_qemu_rdma_fill(rdma->wr_data[idx].control_len, size);

    copied = MIN(size, rdma->wr_data[idx].control_len);
    memcpy(buf, rdma->wr_data[idx].control_curr, copied);
    rdma->wr_data[idx].control_len -= copied;
    rdma->wr_data[idx].control_curr += copied;

    return copied;
}
2655 
2656 /*
2657  * QEMUFile interface to the control channel.
2658  * RDMA links don't use bytestreams, so we have to
2659  * return bytes to QEMUFile opportunistically.
2660  */
qio_channel_rdma_readv(QIOChannel * ioc,const struct iovec * iov,size_t niov,int ** fds,size_t * nfds,int flags,Error ** errp)2661 static ssize_t qio_channel_rdma_readv(QIOChannel *ioc,
2662                                       const struct iovec *iov,
2663                                       size_t niov,
2664                                       int **fds,
2665                                       size_t *nfds,
2666                                       int flags,
2667                                       Error **errp)
2668 {
2669     QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
2670     RDMAContext *rdma;
2671     RDMAControlHeader head;
2672     int ret;
2673     ssize_t done = 0;
2674     size_t len;
2675 
2676     RCU_READ_LOCK_GUARD();
2677     rdma = qatomic_rcu_read(&rioc->rdmain);
2678 
2679     if (!rdma) {
2680         error_setg(errp, "RDMA control channel input is not set");
2681         return -1;
2682     }
2683 
2684     if (rdma->errored) {
2685         error_setg(errp,
2686                    "RDMA is in an error state waiting migration to abort!");
2687         return -1;
2688     }
2689 
2690     for (int i = 0; i < niov; i++) {
2691         size_t want = iov[i].iov_len;
2692         uint8_t *data = (void *)iov[i].iov_base;
2693 
2694         /*
2695          * First, we hold on to the last SEND message we
2696          * were given and dish out the bytes until we run
2697          * out of bytes.
2698          */
2699         len = qemu_rdma_fill(rdma, data, want, 0);
2700         done += len;
2701         want -= len;
2702         /* Got what we needed, so go to next iovec */
2703         if (want == 0) {
2704             continue;
2705         }
2706 
2707         /* If we got any data so far, then don't wait
2708          * for more, just return what we have */
2709         if (done > 0) {
2710             break;
2711         }
2712 
2713 
2714         /* We've got nothing at all, so lets wait for
2715          * more to arrive
2716          */
2717         ret = qemu_rdma_exchange_recv(rdma, &head, RDMA_CONTROL_QEMU_FILE,
2718                                       errp);
2719 
2720         if (ret < 0) {
2721             rdma->errored = true;
2722             return -1;
2723         }
2724 
2725         /*
2726          * SEND was received with new bytes, now try again.
2727          */
2728         len = qemu_rdma_fill(rdma, data, want, 0);
2729         done += len;
2730         want -= len;
2731 
2732         /* Still didn't get enough, so lets just return */
2733         if (want) {
2734             if (done == 0) {
2735                 return QIO_CHANNEL_ERR_BLOCK;
2736             } else {
2737                 break;
2738             }
2739         }
2740     }
2741     return done;
2742 }
2743 
2744 /*
2745  * Block until all the outstanding chunks have been delivered by the hardware.
2746  */
qemu_rdma_drain_cq(RDMAContext * rdma)2747 static int qemu_rdma_drain_cq(RDMAContext *rdma)
2748 {
2749     Error *err = NULL;
2750 
2751     if (qemu_rdma_write_flush(rdma, &err) < 0) {
2752         error_report_err(err);
2753         return -1;
2754     }
2755 
2756     while (rdma->nb_sent) {
2757         if (qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RDMA_WRITE, NULL) < 0) {
2758             error_report("rdma migration: complete polling error!");
2759             return -1;
2760         }
2761     }
2762 
2763     qemu_rdma_unregister_waiting(rdma);
2764 
2765     return 0;
2766 }
2767 
2768 
/*
 * Record the requested blocking mode on the channel.
 *
 * Only the flag is stored here; always returns 0 and never sets @errp.
 */
static int qio_channel_rdma_set_blocking(QIOChannel *ioc,
                                         bool blocking,
                                         Error **errp)
{
    /* XXX we should make readv/writev actually honour this :-) */
    QIO_CHANNEL_RDMA(ioc)->blocking = blocking;

    return 0;
}
2778 
2779 
2780 typedef struct QIOChannelRDMASource QIOChannelRDMASource;
2781 struct QIOChannelRDMASource {
2782     GSource parent;
2783     QIOChannelRDMA *rioc;
2784     GIOCondition condition;
2785 };
2786 
2787 static gboolean
qio_channel_rdma_source_prepare(GSource * source,gint * timeout)2788 qio_channel_rdma_source_prepare(GSource *source,
2789                                 gint *timeout)
2790 {
2791     QIOChannelRDMASource *rsource = (QIOChannelRDMASource *)source;
2792     RDMAContext *rdma;
2793     GIOCondition cond = 0;
2794     *timeout = -1;
2795 
2796     RCU_READ_LOCK_GUARD();
2797     if (rsource->condition == G_IO_IN) {
2798         rdma = qatomic_rcu_read(&rsource->rioc->rdmain);
2799     } else {
2800         rdma = qatomic_rcu_read(&rsource->rioc->rdmaout);
2801     }
2802 
2803     if (!rdma) {
2804         error_report("RDMAContext is NULL when prepare Gsource");
2805         return FALSE;
2806     }
2807 
2808     if (rdma->wr_data[0].control_len) {
2809         cond |= G_IO_IN;
2810     }
2811     cond |= G_IO_OUT;
2812 
2813     return cond & rsource->condition;
2814 }
2815 
2816 static gboolean
qio_channel_rdma_source_check(GSource * source)2817 qio_channel_rdma_source_check(GSource *source)
2818 {
2819     QIOChannelRDMASource *rsource = (QIOChannelRDMASource *)source;
2820     RDMAContext *rdma;
2821     GIOCondition cond = 0;
2822 
2823     RCU_READ_LOCK_GUARD();
2824     if (rsource->condition == G_IO_IN) {
2825         rdma = qatomic_rcu_read(&rsource->rioc->rdmain);
2826     } else {
2827         rdma = qatomic_rcu_read(&rsource->rioc->rdmaout);
2828     }
2829 
2830     if (!rdma) {
2831         error_report("RDMAContext is NULL when check Gsource");
2832         return FALSE;
2833     }
2834 
2835     if (rdma->wr_data[0].control_len) {
2836         cond |= G_IO_IN;
2837     }
2838     cond |= G_IO_OUT;
2839 
2840     return cond & rsource->condition;
2841 }
2842 
2843 static gboolean
qio_channel_rdma_source_dispatch(GSource * source,GSourceFunc callback,gpointer user_data)2844 qio_channel_rdma_source_dispatch(GSource *source,
2845                                  GSourceFunc callback,
2846                                  gpointer user_data)
2847 {
2848     QIOChannelFunc func = (QIOChannelFunc)callback;
2849     QIOChannelRDMASource *rsource = (QIOChannelRDMASource *)source;
2850     RDMAContext *rdma;
2851     GIOCondition cond = 0;
2852 
2853     RCU_READ_LOCK_GUARD();
2854     if (rsource->condition == G_IO_IN) {
2855         rdma = qatomic_rcu_read(&rsource->rioc->rdmain);
2856     } else {
2857         rdma = qatomic_rcu_read(&rsource->rioc->rdmaout);
2858     }
2859 
2860     if (!rdma) {
2861         error_report("RDMAContext is NULL when dispatch Gsource");
2862         return FALSE;
2863     }
2864 
2865     if (rdma->wr_data[0].control_len) {
2866         cond |= G_IO_IN;
2867     }
2868     cond |= G_IO_OUT;
2869 
2870     return (*func)(QIO_CHANNEL(rsource->rioc),
2871                    (cond & rsource->condition),
2872                    user_data);
2873 }
2874 
2875 static void
qio_channel_rdma_source_finalize(GSource * source)2876 qio_channel_rdma_source_finalize(GSource *source)
2877 {
2878     QIOChannelRDMASource *ssource = (QIOChannelRDMASource *)source;
2879 
2880     object_unref(OBJECT(ssource->rioc));
2881 }
2882 
2883 static GSourceFuncs qio_channel_rdma_source_funcs = {
2884     qio_channel_rdma_source_prepare,
2885     qio_channel_rdma_source_check,
2886     qio_channel_rdma_source_dispatch,
2887     qio_channel_rdma_source_finalize
2888 };
2889 
qio_channel_rdma_create_watch(QIOChannel * ioc,GIOCondition condition)2890 static GSource *qio_channel_rdma_create_watch(QIOChannel *ioc,
2891                                               GIOCondition condition)
2892 {
2893     QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
2894     QIOChannelRDMASource *ssource;
2895     GSource *source;
2896 
2897     source = g_source_new(&qio_channel_rdma_source_funcs,
2898                           sizeof(QIOChannelRDMASource));
2899     ssource = (QIOChannelRDMASource *)source;
2900 
2901     ssource->rioc = rioc;
2902     object_ref(OBJECT(rioc));
2903 
2904     ssource->condition = condition;
2905 
2906     return source;
2907 }
2908 
/*
 * Install @io_read / @io_write on the completion channel fds of one
 * direction of the channel.
 *
 * With a read handler, the input context's recv and send completion
 * channels are registered in @read_ctx; without one, the output context's
 * are registered in @write_ctx.
 */
static void qio_channel_rdma_set_aio_fd_handler(QIOChannel *ioc,
                                                AioContext *read_ctx,
                                                IOHandler *io_read,
                                                AioContext *write_ctx,
                                                IOHandler *io_write,
                                                void *opaque)
{
    QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
    AioContext *ctx;
    RDMAContext *rdma;

    if (io_read) {
        ctx = read_ctx;
        rdma = rioc->rdmain;
    } else {
        ctx = write_ctx;
        rdma = rioc->rdmaout;
    }

    aio_set_fd_handler(ctx, rdma->recv_comp_channel->fd,
                       io_read, io_write, NULL, NULL, opaque);
    aio_set_fd_handler(ctx, rdma->send_comp_channel->fd,
                       io_read, io_write, NULL, NULL, opaque);
}
2929 
2930 struct rdma_close_rcu {
2931     struct rcu_head rcu;
2932     RDMAContext *rdmain;
2933     RDMAContext *rdmaout;
2934 };
2935 
2936 /* callback from qio_channel_rdma_close via call_rcu */
qio_channel_rdma_close_rcu(struct rdma_close_rcu * rcu)2937 static void qio_channel_rdma_close_rcu(struct rdma_close_rcu *rcu)
2938 {
2939     if (rcu->rdmain) {
2940         qemu_rdma_cleanup(rcu->rdmain);
2941     }
2942 
2943     if (rcu->rdmaout) {
2944         qemu_rdma_cleanup(rcu->rdmaout);
2945     }
2946 
2947     g_free(rcu->rdmain);
2948     g_free(rcu->rdmaout);
2949     g_free(rcu);
2950 }
2951 
/*
 * Close the channel: detach both RDMA contexts from it and hand them to
 * qio_channel_rdma_close_rcu() through call_rcu for the actual teardown.
 *
 * Always returns 0; @errp is never set.
 */
static int qio_channel_rdma_close(QIOChannel *ioc,
                                  Error **errp)
{
    QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
    struct rdma_close_rcu *rcu = g_new(struct rdma_close_rcu, 1);

    trace_qemu_rdma_close();

    /* Unpublish each context that is still attached */
    rcu->rdmain = rioc->rdmain;
    if (rcu->rdmain) {
        qatomic_rcu_set(&rioc->rdmain, NULL);
    }

    rcu->rdmaout = rioc->rdmaout;
    if (rcu->rdmaout) {
        qatomic_rcu_set(&rioc->rdmaout, NULL);
    }

    call_rcu(rcu, qio_channel_rdma_close_rcu, rcu);

    return 0;
}
2977 
2978 static int
qio_channel_rdma_shutdown(QIOChannel * ioc,QIOChannelShutdown how,Error ** errp)2979 qio_channel_rdma_shutdown(QIOChannel *ioc,
2980                             QIOChannelShutdown how,
2981                             Error **errp)
2982 {
2983     QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
2984     RDMAContext *rdmain, *rdmaout;
2985 
2986     RCU_READ_LOCK_GUARD();
2987 
2988     rdmain = qatomic_rcu_read(&rioc->rdmain);
2989     rdmaout = qatomic_rcu_read(&rioc->rdmain);
2990 
2991     switch (how) {
2992     case QIO_CHANNEL_SHUTDOWN_READ:
2993         if (rdmain) {
2994             rdmain->errored = true;
2995         }
2996         break;
2997     case QIO_CHANNEL_SHUTDOWN_WRITE:
2998         if (rdmaout) {
2999             rdmaout->errored = true;
3000         }
3001         break;
3002     case QIO_CHANNEL_SHUTDOWN_BOTH:
3003     default:
3004         if (rdmain) {
3005             rdmain->errored = true;
3006         }
3007         if (rdmaout) {
3008             rdmaout->errored = true;
3009         }
3010         break;
3011     }
3012 
3013     return 0;
3014 }
3015 
3016 /*
3017  * Parameters:
3018  *    @offset == 0 :
3019  *        This means that 'block_offset' is a full virtual address that does not
3020  *        belong to a RAMBlock of the virtual machine and instead
3021  *        represents a private malloc'd memory area that the caller wishes to
3022  *        transfer.
3023  *
3024  *    @offset != 0 :
3025  *        Offset is an offset to be added to block_offset and used
3026  *        to also lookup the corresponding RAMBlock.
3027  *
3028  *    @size : Number of bytes to transfer
3029  *
3030  *    @pages_sent : User-specificed pointer to indicate how many pages were
3031  *                  sent. Usually, this will not be more than a few bytes of
3032  *                  the protocol because most transfers are sent asynchronously.
3033  */
qemu_rdma_save_page(QEMUFile * f,ram_addr_t block_offset,ram_addr_t offset,size_t size)3034 static int qemu_rdma_save_page(QEMUFile *f, ram_addr_t block_offset,
3035                                ram_addr_t offset, size_t size)
3036 {
3037     QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(qemu_file_get_ioc(f));
3038     Error *err = NULL;
3039     RDMAContext *rdma;
3040     int ret;
3041 
3042     RCU_READ_LOCK_GUARD();
3043     rdma = qatomic_rcu_read(&rioc->rdmaout);
3044 
3045     if (!rdma) {
3046         return -1;
3047     }
3048 
3049     if (rdma_errored(rdma)) {
3050         return -1;
3051     }
3052 
3053     qemu_fflush(f);
3054 
3055     /*
3056      * Add this page to the current 'chunk'. If the chunk
3057      * is full, or the page doesn't belong to the current chunk,
3058      * an actual RDMA write will occur and a new chunk will be formed.
3059      */
3060     ret = qemu_rdma_write(rdma, block_offset, offset, size, &err);
3061     if (ret < 0) {
3062         error_report_err(err);
3063         goto err;
3064     }
3065 
3066     /*
3067      * Drain the Completion Queue if possible, but do not block,
3068      * just poll.
3069      *
3070      * If nothing to poll, the end of the iteration will do this
3071      * again to make sure we don't overflow the request queue.
3072      */
3073     while (1) {
3074         uint64_t wr_id, wr_id_in;
3075         ret = qemu_rdma_poll(rdma, rdma->recv_cq, &wr_id_in, NULL);
3076 
3077         if (ret < 0) {
3078             error_report("rdma migration: polling error");
3079             goto err;
3080         }
3081 
3082         wr_id = wr_id_in & RDMA_WRID_TYPE_MASK;
3083 
3084         if (wr_id == RDMA_WRID_NONE) {
3085             break;
3086         }
3087     }
3088 
3089     while (1) {
3090         uint64_t wr_id, wr_id_in;
3091         ret = qemu_rdma_poll(rdma, rdma->send_cq, &wr_id_in, NULL);
3092 
3093         if (ret < 0) {
3094             error_report("rdma migration: polling error");
3095             goto err;
3096         }
3097 
3098         wr_id = wr_id_in & RDMA_WRID_TYPE_MASK;
3099 
3100         if (wr_id == RDMA_WRID_NONE) {
3101             break;
3102         }
3103     }
3104 
3105     return RAM_SAVE_CONTROL_DELAYED;
3106 
3107 err:
3108     rdma->errored = true;
3109     return -1;
3110 }
3111 
/*
 * Entry point for saving a page over RDMA; must only be called while RDMA
 * migration is in use (asserted).
 *
 * Forwards to qemu_rdma_save_page() and records a negative result as the
 * error of @f.  Returns whatever qemu_rdma_save_page() returned.
 */
int rdma_control_save_page(QEMUFile *f, ram_addr_t block_offset,
                           ram_addr_t offset, size_t size)
{
    int ret;

    assert(migrate_rdma());

    ret = qemu_rdma_save_page(f, block_offset, offset, size);
    if (ret < 0 && ret != RAM_SAVE_CONTROL_DELAYED) {
        qemu_file_set_error(f, ret);
    }

    return ret;
}
3126 
3127 static void rdma_accept_incoming_migration(void *opaque);
3128 
rdma_cm_poll_handler(void * opaque)3129 static void rdma_cm_poll_handler(void *opaque)
3130 {
3131     RDMAContext *rdma = opaque;
3132     struct rdma_cm_event *cm_event;
3133     MigrationIncomingState *mis = migration_incoming_get_current();
3134 
3135     if (rdma_get_cm_event(rdma->channel, &cm_event) < 0) {
3136         error_report("get_cm_event failed %d", errno);
3137         return;
3138     }
3139 
3140     if (cm_event->event == RDMA_CM_EVENT_DISCONNECTED ||
3141         cm_event->event == RDMA_CM_EVENT_DEVICE_REMOVAL) {
3142         if (!rdma->errored &&
3143             migration_incoming_get_current()->state !=
3144               MIGRATION_STATUS_COMPLETED) {
3145             error_report("receive cm event, cm event is %d", cm_event->event);
3146             rdma->errored = true;
3147             if (rdma->return_path) {
3148                 rdma->return_path->errored = true;
3149             }
3150         }
3151         rdma_ack_cm_event(cm_event);
3152         if (mis->loadvm_co) {
3153             qemu_coroutine_enter(mis->loadvm_co);
3154         }
3155         return;
3156     }
3157     rdma_ack_cm_event(cm_event);
3158 }
3159 
/*
 * Accept one incoming connection request on the destination side.
 *
 * Waits for a CONNECT_REQUEST event on the CM channel, negotiates the
 * capabilities carried in its private data, allocates the verbs resources
 * (PD/CQ, QP, RAM block list, control buffers), installs the fd handler
 * for the CM channel, accepts the connection, waits for ESTABLISHED and
 * finally posts the receive for the READY control message.
 *
 * When postcopy or the return path is enabled and @rdma is not itself a
 * return path, a second context for the return path is created and linked
 * to @rdma, and the fd handler is set up to accept its connection next.
 *
 * Returns 0 on success.  On failure returns -1 after marking @rdma as
 * errored and cleaning it up; a return-path context created by this call
 * is unlinked from @rdma and released as well.
 */
static int qemu_rdma_accept(RDMAContext *rdma)
{
    Error *err = NULL;
    RDMACapabilities cap;
    struct rdma_conn_param conn_param = {
                                            .responder_resources = 2,
                                            .private_data = &cap,
                                            .private_data_len = sizeof(cap),
                                         };
    RDMAContext *rdma_return_path = NULL;
    g_autoptr(InetSocketAddress) isock = g_new0(InetSocketAddress, 1);
    struct rdma_cm_event *cm_event;
    struct ibv_context *verbs;
    int ret;

    ret = rdma_get_cm_event(rdma->channel, &cm_event);
    if (ret < 0) {
        goto err_rdma_dest_wait;
    }

    if (cm_event->event != RDMA_CM_EVENT_CONNECT_REQUEST) {
        rdma_ack_cm_event(cm_event);
        goto err_rdma_dest_wait;
    }

    /*
     * The capabilities come from the peer: make sure the request really
     * carries enough private data before copying it out below.
     */
    if (!cm_event->param.conn.private_data ||
        cm_event->param.conn.private_data_len < sizeof(cap)) {
        error_report("rdma: connection request without capabilities "
                     "(%d bytes of private data), bailing...",
                     cm_event->param.conn.private_data_len);
        rdma_ack_cm_event(cm_event);
        goto err_rdma_dest_wait;
    }

    isock->host = g_strdup(rdma->host);
    isock->port = g_strdup_printf("%d", rdma->port);

    /*
     * initialize the RDMAContext for return path for postcopy after first
     * connection request reached.
     */
    if ((migrate_postcopy() || migrate_return_path())
        && !rdma->is_return_path) {
        rdma_return_path = qemu_rdma_data_init(isock, NULL);
        if (rdma_return_path == NULL) {
            rdma_ack_cm_event(cm_event);
            goto err_rdma_dest_wait;
        }

        qemu_rdma_return_path_dest_init(rdma_return_path, rdma);
    }

    memcpy(&cap, cm_event->param.conn.private_data, sizeof(cap));

    network_to_caps(&cap);

    if (cap.version < 1 || cap.version > RDMA_CONTROL_VERSION_CURRENT) {
        error_report("Unknown source RDMA version: %d, bailing...",
                     cap.version);
        rdma_ack_cm_event(cm_event);
        goto err_rdma_dest_wait;
    }

    /*
     * Respond with only the capabilities this version of QEMU knows about.
     */
    cap.flags &= known_capabilities;

    /*
     * Enable the ones that we do know about.
     * Add other checks here as new ones are introduced.
     */
    if (cap.flags & RDMA_CAPABILITY_PIN_ALL) {
        rdma->pin_all = true;
    }

    /* Keep what is needed from the event before it is acknowledged */
    rdma->cm_id = cm_event->id;
    verbs = cm_event->id->verbs;

    rdma_ack_cm_event(cm_event);

    trace_qemu_rdma_accept_pin_state(rdma->pin_all);

    /* cap is sent back to the source through conn_param.private_data */
    caps_to_network(&cap);

    trace_qemu_rdma_accept_pin_verbsc(verbs);

    if (!rdma->verbs) {
        rdma->verbs = verbs;
    } else if (rdma->verbs != verbs) {
        error_report("ibv context not matching %p, %p!", rdma->verbs,
                     verbs);
        goto err_rdma_dest_wait;
    }

    qemu_rdma_dump_id("dest_init", verbs);

    ret = qemu_rdma_alloc_pd_cq(rdma, &err);
    if (ret < 0) {
        error_report_err(err);
        goto err_rdma_dest_wait;
    }

    ret = qemu_rdma_alloc_qp(rdma);
    if (ret < 0) {
        error_report("rdma migration: error allocating qp!");
        goto err_rdma_dest_wait;
    }

    qemu_rdma_init_ram_blocks(rdma);

    for (int i = 0; i < RDMA_WRID_MAX; i++) {
        ret = qemu_rdma_reg_control(rdma, i);
        if (ret < 0) {
            error_report("rdma: error registering %d control", i);
            goto err_rdma_dest_wait;
        }
    }

    /* Accept the second connection request for return path */
    if ((migrate_postcopy() || migrate_return_path())
        && !rdma->is_return_path) {
        qemu_set_fd_handler(rdma->channel->fd, rdma_accept_incoming_migration,
                            NULL,
                            (void *)(intptr_t)rdma->return_path);
    } else {
        qemu_set_fd_handler(rdma->channel->fd, rdma_cm_poll_handler,
                            NULL, rdma);
    }

    ret = rdma_accept(rdma->cm_id, &conn_param);
    if (ret < 0) {
        error_report("rdma_accept failed");
        goto err_rdma_dest_wait;
    }

    ret = rdma_get_cm_event(rdma->channel, &cm_event);
    if (ret < 0) {
        error_report("rdma_accept get_cm_event failed");
        goto err_rdma_dest_wait;
    }

    if (cm_event->event != RDMA_CM_EVENT_ESTABLISHED) {
        error_report("rdma_accept not event established");
        rdma_ack_cm_event(cm_event);
        goto err_rdma_dest_wait;
    }

    rdma_ack_cm_event(cm_event);
    rdma->connected = true;

    ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY, &err);
    if (ret < 0) {
        error_report_err(err);
        goto err_rdma_dest_wait;
    }

    qemu_rdma_dump_gid("dest_connect", rdma->cm_id);

    return 0;

err_rdma_dest_wait:
    rdma->errored = true;
    qemu_rdma_cleanup(rdma);
    if (rdma_return_path) {
        /*
         * The return-path context was allocated by this call.  Unlink it
         * so that @rdma is not left with a dangling pointer, and release
         * the host string that qemu_rdma_data_init() duplicated for it.
         */
        rdma->return_path = NULL;
        g_free(rdma_return_path->host);
        g_free(rdma_return_path);
    }
    return -1;
}
3318 
dest_ram_sort_func(const void * a,const void * b)3319 static int dest_ram_sort_func(const void *a, const void *b)
3320 {
3321     unsigned int a_index = ((const RDMALocalBlock *)a)->src_index;
3322     unsigned int b_index = ((const RDMALocalBlock *)b)->src_index;
3323 
3324     return (a_index < b_index) ? -1 : (a_index != b_index);
3325 }
3326 
3327 /*
3328  * During each iteration of the migration, we listen for instructions
3329  * by the source VM to perform dynamic page registrations before they
3330  * can perform RDMA operations.
3331  *
3332  * We respond with the 'rkey'.
3333  *
3334  * Keep doing this until the source tells us to stop.
3335  */
rdma_registration_handle(QEMUFile * f)3336 int rdma_registration_handle(QEMUFile *f)
3337 {
3338     RDMAControlHeader reg_resp = { .len = sizeof(RDMARegisterResult),
3339                                .type = RDMA_CONTROL_REGISTER_RESULT,
3340                                .repeat = 0,
3341                              };
3342     RDMAControlHeader unreg_resp = { .len = 0,
3343                                .type = RDMA_CONTROL_UNREGISTER_FINISHED,
3344                                .repeat = 0,
3345                              };
3346     RDMAControlHeader blocks = { .type = RDMA_CONTROL_RAM_BLOCKS_RESULT,
3347                                  .repeat = 1 };
3348     QIOChannelRDMA *rioc;
3349     Error *err = NULL;
3350     RDMAContext *rdma;
3351     RDMALocalBlocks *local;
3352     RDMAControlHeader head;
3353     RDMARegister *reg, *registers;
3354     RDMACompress *comp;
3355     RDMARegisterResult *reg_result;
3356     static RDMARegisterResult results[RDMA_CONTROL_MAX_COMMANDS_PER_MESSAGE];
3357     RDMALocalBlock *block;
3358     void *host_addr;
3359     int ret;
3360     int idx = 0;
3361 
3362     if (!migrate_rdma()) {
3363         return 0;
3364     }
3365 
3366     RCU_READ_LOCK_GUARD();
3367     rioc = QIO_CHANNEL_RDMA(qemu_file_get_ioc(f));
3368     rdma = qatomic_rcu_read(&rioc->rdmain);
3369 
3370     if (!rdma) {
3371         return -1;
3372     }
3373 
3374     if (rdma_errored(rdma)) {
3375         return -1;
3376     }
3377 
3378     local = &rdma->local_ram_blocks;
3379     do {
3380         trace_rdma_registration_handle_wait();
3381 
3382         ret = qemu_rdma_exchange_recv(rdma, &head, RDMA_CONTROL_NONE, &err);
3383 
3384         if (ret < 0) {
3385             error_report_err(err);
3386             break;
3387         }
3388 
3389         if (head.repeat > RDMA_CONTROL_MAX_COMMANDS_PER_MESSAGE) {
3390             error_report("rdma: Too many requests in this message (%d)."
3391                             "Bailing.", head.repeat);
3392             break;
3393         }
3394 
3395         switch (head.type) {
3396         case RDMA_CONTROL_COMPRESS:
3397             comp = (RDMACompress *) rdma->wr_data[idx].control_curr;
3398             network_to_compress(comp);
3399 
3400             trace_rdma_registration_handle_compress(comp->length,
3401                                                     comp->block_idx,
3402                                                     comp->offset);
3403             if (comp->block_idx >= rdma->local_ram_blocks.nb_blocks) {
3404                 error_report("rdma: 'compress' bad block index %u (vs %d)",
3405                              (unsigned int)comp->block_idx,
3406                              rdma->local_ram_blocks.nb_blocks);
3407                 goto err;
3408             }
3409             block = &(rdma->local_ram_blocks.block[comp->block_idx]);
3410 
3411             host_addr = block->local_host_addr +
3412                             (comp->offset - block->offset);
3413             if (comp->value) {
3414                 error_report("rdma: Zero page with non-zero (%d) value",
3415                              comp->value);
3416                 goto err;
3417             }
3418             ram_handle_zero(host_addr, comp->length);
3419             break;
3420 
3421         case RDMA_CONTROL_REGISTER_FINISHED:
3422             trace_rdma_registration_handle_finished();
3423             return 0;
3424 
3425         case RDMA_CONTROL_RAM_BLOCKS_REQUEST:
3426             trace_rdma_registration_handle_ram_blocks();
3427 
3428             /* Sort our local RAM Block list so it's the same as the source,
3429              * we can do this since we've filled in a src_index in the list
3430              * as we received the RAMBlock list earlier.
3431              */
3432             qsort(rdma->local_ram_blocks.block,
3433                   rdma->local_ram_blocks.nb_blocks,
3434                   sizeof(RDMALocalBlock), dest_ram_sort_func);
3435             for (int i = 0; i < local->nb_blocks; i++) {
3436                 local->block[i].index = i;
3437             }
3438 
3439             if (rdma->pin_all) {
3440                 ret = qemu_rdma_reg_whole_ram_blocks(rdma, &err);
3441                 if (ret < 0) {
3442                     error_report_err(err);
3443                     goto err;
3444                 }
3445             }
3446 
3447             /*
3448              * Dest uses this to prepare to transmit the RAMBlock descriptions
3449              * to the source VM after connection setup.
3450              * Both sides use the "remote" structure to communicate and update
3451              * their "local" descriptions with what was sent.
3452              */
3453             for (int i = 0; i < local->nb_blocks; i++) {
3454                 rdma->dest_blocks[i].remote_host_addr =
3455                     (uintptr_t)(local->block[i].local_host_addr);
3456 
3457                 if (rdma->pin_all) {
3458                     rdma->dest_blocks[i].remote_rkey = local->block[i].mr->rkey;
3459                 }
3460 
3461                 rdma->dest_blocks[i].offset = local->block[i].offset;
3462                 rdma->dest_blocks[i].length = local->block[i].length;
3463 
3464                 dest_block_to_network(&rdma->dest_blocks[i]);
3465                 trace_rdma_registration_handle_ram_blocks_loop(
3466                     local->block[i].block_name,
3467                     local->block[i].offset,
3468                     local->block[i].length,
3469                     local->block[i].local_host_addr,
3470                     local->block[i].src_index);
3471             }
3472 
3473             blocks.len = rdma->local_ram_blocks.nb_blocks
3474                                                 * sizeof(RDMADestBlock);
3475 
3476 
3477             ret = qemu_rdma_post_send_control(rdma,
3478                                     (uint8_t *) rdma->dest_blocks, &blocks,
3479                                     &err);
3480 
3481             if (ret < 0) {
3482                 error_report_err(err);
3483                 goto err;
3484             }
3485 
3486             break;
3487         case RDMA_CONTROL_REGISTER_REQUEST:
3488             trace_rdma_registration_handle_register(head.repeat);
3489 
3490             reg_resp.repeat = head.repeat;
3491             registers = (RDMARegister *) rdma->wr_data[idx].control_curr;
3492 
3493             for (int count = 0; count < head.repeat; count++) {
3494                 uint64_t chunk;
3495                 uint8_t *chunk_start, *chunk_end;
3496 
3497                 reg = &registers[count];
3498                 network_to_register(reg);
3499 
3500                 reg_result = &results[count];
3501 
3502                 trace_rdma_registration_handle_register_loop(count,
3503                          reg->current_index, reg->key.current_addr, reg->chunks);
3504 
3505                 if (reg->current_index >= rdma->local_ram_blocks.nb_blocks) {
3506                     error_report("rdma: 'register' bad block index %u (vs %d)",
3507                                  (unsigned int)reg->current_index,
3508                                  rdma->local_ram_blocks.nb_blocks);
3509                     goto err;
3510                 }
3511                 block = &(rdma->local_ram_blocks.block[reg->current_index]);
3512                 if (block->is_ram_block) {
3513                     if (block->offset > reg->key.current_addr) {
3514                         error_report("rdma: bad register address for block %s"
3515                             " offset: %" PRIx64 " current_addr: %" PRIx64,
3516                             block->block_name, block->offset,
3517                             reg->key.current_addr);
3518                         goto err;
3519                     }
3520                     host_addr = (block->local_host_addr +
3521                                 (reg->key.current_addr - block->offset));
3522                     chunk = ram_chunk_index(block->local_host_addr,
3523                                             (uint8_t *) host_addr);
3524                 } else {
3525                     chunk = reg->key.chunk;
3526                     host_addr = block->local_host_addr +
3527                         (reg->key.chunk * (1UL << RDMA_REG_CHUNK_SHIFT));
3528                     /* Check for particularly bad chunk value */
3529                     if (host_addr < (void *)block->local_host_addr) {
3530                         error_report("rdma: bad chunk for block %s"
3531                             " chunk: %" PRIx64,
3532                             block->block_name, reg->key.chunk);
3533                         goto err;
3534                     }
3535                 }
3536                 chunk_start = ram_chunk_start(block, chunk);
3537                 chunk_end = ram_chunk_end(block, chunk + reg->chunks);
3538                 /* avoid "-Waddress-of-packed-member" warning */
3539                 uint32_t tmp_rkey = 0;
3540                 if (qemu_rdma_register_and_get_keys(rdma, block,
3541                             (uintptr_t)host_addr, NULL, &tmp_rkey,
3542                             chunk, chunk_start, chunk_end)) {
3543                     error_report("cannot get rkey");
3544                     goto err;
3545                 }
3546                 reg_result->rkey = tmp_rkey;
3547 
3548                 reg_result->host_addr = (uintptr_t)block->local_host_addr;
3549 
3550                 trace_rdma_registration_handle_register_rkey(reg_result->rkey);
3551 
3552                 result_to_network(reg_result);
3553             }
3554 
3555             ret = qemu_rdma_post_send_control(rdma,
3556                             (uint8_t *) results, &reg_resp, &err);
3557 
3558             if (ret < 0) {
3559                 error_report_err(err);
3560                 goto err;
3561             }
3562             break;
3563         case RDMA_CONTROL_UNREGISTER_REQUEST:
3564             trace_rdma_registration_handle_unregister(head.repeat);
3565             unreg_resp.repeat = head.repeat;
3566             registers = (RDMARegister *) rdma->wr_data[idx].control_curr;
3567 
3568             for (int count = 0; count < head.repeat; count++) {
3569                 reg = &registers[count];
3570                 network_to_register(reg);
3571 
3572                 trace_rdma_registration_handle_unregister_loop(count,
3573                            reg->current_index, reg->key.chunk);
3574 
3575                 block = &(rdma->local_ram_blocks.block[reg->current_index]);
3576 
3577                 ret = ibv_dereg_mr(block->pmr[reg->key.chunk]);
3578                 block->pmr[reg->key.chunk] = NULL;
3579 
3580                 if (ret != 0) {
3581                     error_report("rdma unregistration chunk failed: %s",
3582                                  strerror(errno));
3583                     goto err;
3584                 }
3585 
3586                 rdma->total_registrations--;
3587 
3588                 trace_rdma_registration_handle_unregister_success(reg->key.chunk);
3589             }
3590 
3591             ret = qemu_rdma_post_send_control(rdma, NULL, &unreg_resp, &err);
3592 
3593             if (ret < 0) {
3594                 error_report_err(err);
3595                 goto err;
3596             }
3597             break;
3598         case RDMA_CONTROL_REGISTER_RESULT:
3599             error_report("Invalid RESULT message at dest.");
3600             goto err;
3601         default:
3602             error_report("Unknown control message %s", control_desc(head.type));
3603             goto err;
3604         }
3605     } while (1);
3606 
3607 err:
3608     rdma->errored = true;
3609     return -1;
3610 }
3611 
3612 /* Destination:
3613  * Called during the initial RAM load section which lists the
3614  * RAMBlocks by name.  This lets us know the order of the RAMBlocks on
3615  * the source.  We've already built our local RAMBlock list, but not
3616  * yet sent the list to the source.
3617  */
rdma_block_notification_handle(QEMUFile * f,const char * name)3618 int rdma_block_notification_handle(QEMUFile *f, const char *name)
3619 {
3620     int curr;
3621     int found = -1;
3622 
3623     if (!migrate_rdma()) {
3624         return 0;
3625     }
3626 
3627     RCU_READ_LOCK_GUARD();
3628     QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(qemu_file_get_ioc(f));
3629     RDMAContext *rdma = qatomic_rcu_read(&rioc->rdmain);
3630 
3631     if (!rdma) {
3632         return -1;
3633     }
3634 
3635     /* Find the matching RAMBlock in our local list */
3636     for (curr = 0; curr < rdma->local_ram_blocks.nb_blocks; curr++) {
3637         if (!strcmp(rdma->local_ram_blocks.block[curr].block_name, name)) {
3638             found = curr;
3639             break;
3640         }
3641     }
3642 
3643     if (found == -1) {
3644         error_report("RAMBlock '%s' not found on destination", name);
3645         return -1;
3646     }
3647 
3648     rdma->local_ram_blocks.block[curr].src_index = rdma->next_src_index;
3649     trace_rdma_block_notification_handle(name, rdma->next_src_index);
3650     rdma->next_src_index++;
3651 
3652     return 0;
3653 }
3654 
/*
 * Emit the RAM_SAVE_FLAG_HOOK marker on @f and flush it.
 *
 * Returns 0 when RDMA migration is not in use, -1 when the outgoing RDMA
 * context is missing or already in the errored state, and otherwise the
 * result of flushing @f.
 */
int rdma_registration_start(QEMUFile *f, uint64_t flags)
{
    if (!migrate_rdma()) {
        return 0;
    }

    QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(qemu_file_get_ioc(f));
    RCU_READ_LOCK_GUARD();
    RDMAContext *rdma = qatomic_rcu_read(&rioc->rdmaout);

    /* Nothing can be sent without a healthy outgoing context. */
    if (rdma == NULL || rdma_errored(rdma)) {
        return -1;
    }

    trace_rdma_registration_start(flags);
    qemu_put_be64(f, RAM_SAVE_FLAG_HOOK);

    return qemu_fflush(f);
}
3676 
3677 /*
3678  * Inform dest that dynamic registrations are done for now.
3679  * First, flush writes, if any.
3680  */
rdma_registration_stop(QEMUFile * f,uint64_t flags)3681 int rdma_registration_stop(QEMUFile *f, uint64_t flags)
3682 {
3683     QIOChannelRDMA *rioc;
3684     Error *err = NULL;
3685     RDMAContext *rdma;
3686     RDMAControlHeader head = { .len = 0, .repeat = 1 };
3687     int ret;
3688 
3689     if (!migrate_rdma()) {
3690         return 0;
3691     }
3692 
3693     RCU_READ_LOCK_GUARD();
3694     rioc = QIO_CHANNEL_RDMA(qemu_file_get_ioc(f));
3695     rdma = qatomic_rcu_read(&rioc->rdmaout);
3696     if (!rdma) {
3697         return -1;
3698     }
3699 
3700     if (rdma_errored(rdma)) {
3701         return -1;
3702     }
3703 
3704     qemu_fflush(f);
3705     ret = qemu_rdma_drain_cq(rdma);
3706 
3707     if (ret < 0) {
3708         goto err;
3709     }
3710 
3711     if (flags == RAM_CONTROL_SETUP) {
3712         RDMAControlHeader resp = {.type = RDMA_CONTROL_RAM_BLOCKS_RESULT };
3713         RDMALocalBlocks *local = &rdma->local_ram_blocks;
3714         int reg_result_idx, nb_dest_blocks;
3715 
3716         head.type = RDMA_CONTROL_RAM_BLOCKS_REQUEST;
3717         trace_rdma_registration_stop_ram();
3718 
3719         /*
3720          * Make sure that we parallelize the pinning on both sides.
3721          * For very large guests, doing this serially takes a really
3722          * long time, so we have to 'interleave' the pinning locally
3723          * with the control messages by performing the pinning on this
3724          * side before we receive the control response from the other
3725          * side that the pinning has completed.
3726          */
3727         ret = qemu_rdma_exchange_send(rdma, &head, NULL, &resp,
3728                     &reg_result_idx, rdma->pin_all ?
3729                     qemu_rdma_reg_whole_ram_blocks : NULL,
3730                     &err);
3731         if (ret < 0) {
3732             error_report_err(err);
3733             return -1;
3734         }
3735 
3736         nb_dest_blocks = resp.len / sizeof(RDMADestBlock);
3737 
3738         /*
3739          * The protocol uses two different sets of rkeys (mutually exclusive):
3740          * 1. One key to represent the virtual address of the entire ram block.
3741          *    (dynamic chunk registration disabled - pin everything with one rkey.)
3742          * 2. One to represent individual chunks within a ram block.
3743          *    (dynamic chunk registration enabled - pin individual chunks.)
3744          *
3745          * Once the capability is successfully negotiated, the destination transmits
3746          * the keys to use (or sends them later) including the virtual addresses
3747          * and then propagates the remote ram block descriptions to his local copy.
3748          */
3749 
3750         if (local->nb_blocks != nb_dest_blocks) {
3751             error_report("ram blocks mismatch (Number of blocks %d vs %d)",
3752                          local->nb_blocks, nb_dest_blocks);
3753             error_printf("Your QEMU command line parameters are probably "
3754                          "not identical on both the source and destination.");
3755             rdma->errored = true;
3756             return -1;
3757         }
3758 
3759         qemu_rdma_move_header(rdma, reg_result_idx, &resp);
3760         memcpy(rdma->dest_blocks,
3761             rdma->wr_data[reg_result_idx].control_curr, resp.len);
3762         for (int i = 0; i < nb_dest_blocks; i++) {
3763             network_to_dest_block(&rdma->dest_blocks[i]);
3764 
3765             /* We require that the blocks are in the same order */
3766             if (rdma->dest_blocks[i].length != local->block[i].length) {
3767                 error_report("Block %s/%d has a different length %" PRIu64
3768                              "vs %" PRIu64,
3769                              local->block[i].block_name, i,
3770                              local->block[i].length,
3771                              rdma->dest_blocks[i].length);
3772                 rdma->errored = true;
3773                 return -1;
3774             }
3775             local->block[i].remote_host_addr =
3776                     rdma->dest_blocks[i].remote_host_addr;
3777             local->block[i].remote_rkey = rdma->dest_blocks[i].remote_rkey;
3778         }
3779     }
3780 
3781     trace_rdma_registration_stop(flags);
3782 
3783     head.type = RDMA_CONTROL_REGISTER_FINISHED;
3784     ret = qemu_rdma_exchange_send(rdma, &head, NULL, NULL, NULL, NULL, &err);
3785 
3786     if (ret < 0) {
3787         error_report_err(err);
3788         goto err;
3789     }
3790 
3791     return 0;
3792 err:
3793     rdma->errored = true;
3794     return -1;
3795 }
3796 
/*
 * Instance finalizer for the RDMA channel type: cleans up and frees
 * whichever of the incoming and outgoing contexts are still attached,
 * incoming first, and clears both pointers.
 */
static void qio_channel_rdma_finalize(Object *obj)
{
    QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(obj);

    /* Incoming context. */
    if (rioc->rdmain != NULL) {
        qemu_rdma_cleanup(rioc->rdmain);
        g_free(rioc->rdmain);
        rioc->rdmain = NULL;
    }

    /* Outgoing context. */
    if (rioc->rdmaout != NULL) {
        qemu_rdma_cleanup(rioc->rdmaout);
        g_free(rioc->rdmaout);
        rioc->rdmaout = NULL;
    }
}
3811 
/*
 * Class initializer: installs the RDMA implementations of the channel
 * callbacks on the class.  @class_data is unused.
 */
static void qio_channel_rdma_class_init(ObjectClass *klass,
                                        const void *class_data G_GNUC_UNUSED)
{
    QIOChannelClass *ioc_klass = QIO_CHANNEL_CLASS(klass);

    /* Data path. */
    ioc_klass->io_readv = qio_channel_rdma_readv;
    ioc_klass->io_writev = qio_channel_rdma_writev;

    /* Event and blocking-mode plumbing. */
    ioc_klass->io_set_blocking = qio_channel_rdma_set_blocking;
    ioc_klass->io_create_watch = qio_channel_rdma_create_watch;
    ioc_klass->io_set_aio_fd_handler = qio_channel_rdma_set_aio_fd_handler;

    /* Teardown. */
    ioc_klass->io_shutdown = qio_channel_rdma_shutdown;
    ioc_klass->io_close = qio_channel_rdma_close;
}
3825 
3826 static const TypeInfo qio_channel_rdma_info = {
3827     .parent = TYPE_QIO_CHANNEL,
3828     .name = TYPE_QIO_CHANNEL_RDMA,
3829     .instance_size = sizeof(QIOChannelRDMA),
3830     .instance_finalize = qio_channel_rdma_finalize,
3831     .class_init = qio_channel_rdma_class_init,
3832 };
3833 
qio_channel_rdma_register_types(void)3834 static void qio_channel_rdma_register_types(void)
3835 {
3836     type_register_static(&qio_channel_rdma_info);
3837 }
3838 
3839 type_init(qio_channel_rdma_register_types);
3840 
rdma_new_input(RDMAContext * rdma)3841 static QEMUFile *rdma_new_input(RDMAContext *rdma)
3842 {
3843     QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(object_new(TYPE_QIO_CHANNEL_RDMA));
3844 
3845     rioc->file = qemu_file_new_input(QIO_CHANNEL(rioc));
3846     rioc->rdmain = rdma;
3847     rioc->rdmaout = rdma->return_path;
3848 
3849     return rioc->file;
3850 }
3851 
rdma_new_output(RDMAContext * rdma)3852 static QEMUFile *rdma_new_output(RDMAContext *rdma)
3853 {
3854     QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(object_new(TYPE_QIO_CHANNEL_RDMA));
3855 
3856     rioc->file = qemu_file_new_output(QIO_CHANNEL(rioc));
3857     rioc->rdmaout = rdma;
3858     rioc->rdmain = rdma->return_path;
3859 
3860     return rioc->file;
3861 }
3862 
rdma_accept_incoming_migration(void * opaque)3863 static void rdma_accept_incoming_migration(void *opaque)
3864 {
3865     RDMAContext *rdma = opaque;
3866     QEMUFile *f;
3867 
3868     trace_qemu_rdma_accept_incoming_migration();
3869     if (qemu_rdma_accept(rdma) < 0) {
3870         error_report("RDMA ERROR: Migration initialization failed");
3871         return;
3872     }
3873 
3874     trace_qemu_rdma_accept_incoming_migration_accepted();
3875 
3876     if (rdma->is_return_path) {
3877         return;
3878     }
3879 
3880     f = rdma_new_input(rdma);
3881     if (f == NULL) {
3882         error_report("RDMA ERROR: could not open RDMA for input");
3883         qemu_rdma_cleanup(rdma);
3884         return;
3885     }
3886 
3887     rdma->migration_started_on_destination = 1;
3888     migration_fd_process_incoming(f);
3889 }
3890 
/*
 * Set up the destination side of an RDMA migration for @host_port.
 *
 * Builds the RDMA context, initializes it for the destination, starts
 * listening and installs rdma_accept_incoming_migration() as the handler
 * for the channel fd.  On failure the context is freed; the failures
 * handled in this function itself also set @errp.
 */
void rdma_start_incoming_migration(InetSocketAddress *host_port,
                                   Error **errp)
{
    MigrationState *s = migrate_get_current();
    RDMAContext *rdma;

    trace_rdma_start_incoming_migration();

    /* Avoid ram_block_discard_disable(), cannot change during migration. */
    if (ram_block_discard_is_required()) {
        error_setg(errp, "RDMA: cannot disable RAM discard");
        return;
    }

    rdma = qemu_rdma_data_init(host_port, errp);
    if (!rdma) {
        goto err;
    }

    if (qemu_rdma_dest_init(rdma, errp) < 0) {
        goto err;
    }

    trace_rdma_start_incoming_migration_after_dest_init();

    if (rdma_listen(rdma->listen_id, 5) < 0) {
        error_setg(errp, "RDMA ERROR: listening on socket!");
        goto cleanup_rdma;
    }

    trace_rdma_start_incoming_migration_after_rdma_listen();
    s->rdma_migration = true;
    qemu_set_fd_handler(rdma->channel->fd, rdma_accept_incoming_migration,
                        NULL, rdma);
    return;

cleanup_rdma:
    qemu_rdma_cleanup(rdma);
err:
    /* rdma is NULL here only when the context could not be created. */
    if (rdma != NULL) {
        g_free(rdma->host);
        g_free(rdma);
    }
}
3939 
/*
 * Set up the source side of an RDMA migration to @host_port.
 *
 * @opaque is the MigrationState.  The main context is created, initialized
 * and connected; when postcopy or the return path is enabled a second
 * context is set up the same way and the two are cross-linked.  On success
 * the output QEMUFile is stored in the migration state and the migration
 * is started.  On failure both context allocations are freed.
 */
void rdma_start_outgoing_migration(void *opaque,
                            InetSocketAddress *host_port, Error **errp)
{
    MigrationState *s = opaque;
    RDMAContext *rdma_return_path = NULL;
    RDMAContext *rdma;

    /* Avoid ram_block_discard_disable(), cannot change during migration. */
    if (ram_block_discard_is_required()) {
        error_setg(errp, "RDMA: cannot disable RAM discard");
        return;
    }

    rdma = qemu_rdma_data_init(host_port, errp);
    if (!rdma) {
        goto err;
    }

    if (qemu_rdma_source_init(rdma, migrate_rdma_pin_all(), errp) < 0) {
        goto err;
    }

    trace_rdma_start_outgoing_migration_after_rdma_source_init();

    if (qemu_rdma_connect(rdma, false, errp) < 0) {
        goto err;
    }

    /* RDMA postcopy needs a separate queue pair for the return path. */
    if (migrate_postcopy() || migrate_return_path()) {
        rdma_return_path = qemu_rdma_data_init(host_port, errp);
        if (!rdma_return_path) {
            goto return_path_err;
        }

        if (qemu_rdma_source_init(rdma_return_path,
                                  migrate_rdma_pin_all(), errp) < 0) {
            goto return_path_err;
        }

        if (qemu_rdma_connect(rdma_return_path, true, errp) < 0) {
            goto return_path_err;
        }

        /* Cross-link the two contexts and tag the secondary one. */
        rdma->return_path = rdma_return_path;
        rdma_return_path->return_path = rdma;
        rdma_return_path->is_return_path = true;
    }

    trace_rdma_start_outgoing_migration_after_rdma_connect();

    s->to_dst_file = rdma_new_output(rdma);
    s->rdma_migration = true;
    migration_connect(s, NULL);
    return;

return_path_err:
    /* The main context was already connected by now; tear it down. */
    qemu_rdma_cleanup(rdma);
err:
    g_free(rdma);
    g_free(rdma_return_path);
}
4010