xref: /qemu/migration/rdma.c (revision f2b901098e14ad1aaffab82464917b8679499cc5)
1  /*
2   * RDMA protocol and interfaces
3   *
4   * Copyright IBM, Corp. 2010-2013
5   * Copyright Red Hat, Inc. 2015-2016
6   *
7   * Authors:
8   *  Michael R. Hines <mrhines@us.ibm.com>
9   *  Jiuxing Liu <jl@us.ibm.com>
10   *  Daniel P. Berrange <berrange@redhat.com>
11   *
12   * This work is licensed under the terms of the GNU GPL, version 2 or
13   * later.  See the COPYING file in the top-level directory.
14   *
15   */
16  
17  #include "qemu/osdep.h"
18  #include "qapi/error.h"
19  #include "qemu/cutils.h"
20  #include "rdma.h"
21  #include "migration.h"
22  #include "qemu-file.h"
23  #include "ram.h"
24  #include "qemu/error-report.h"
25  #include "qemu/main-loop.h"
26  #include "qemu/module.h"
27  #include "qemu/rcu.h"
28  #include "qemu/sockets.h"
29  #include "qemu/bitmap.h"
30  #include "qemu/coroutine.h"
31  #include "exec/memory.h"
32  #include <sys/socket.h>
33  #include <netdb.h>
34  #include <arpa/inet.h>
35  #include <rdma/rdma_cma.h>
36  #include "trace.h"
37  #include "qom/object.h"
38  #include <poll.h>
39  
40  /*
41   * Print an error on both the Monitor and the Log file.
42   */
43  #define ERROR(errp, fmt, ...) \
44      do { \
45          fprintf(stderr, "RDMA ERROR: " fmt "\n", ## __VA_ARGS__); \
46          if (errp && (*(errp) == NULL)) { \
47              error_setg(errp, "RDMA ERROR: " fmt, ## __VA_ARGS__); \
48          } \
49      } while (0)
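
/*
 * Illustrative usage (mirrors calls later in this file): failure paths do
 * something like
 *
 *     ERROR(errp, "could not create CM channel");
 *
 * which prints "RDMA ERROR: could not create CM channel" to stderr and, if
 * errp is non-NULL and not already set, also fills *errp via error_setg().
 */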
50  
51  #define RDMA_RESOLVE_TIMEOUT_MS 10000
52  
53  /* Do not merge data if larger than this. */
54  #define RDMA_MERGE_MAX (2 * 1024 * 1024)
55  #define RDMA_SIGNALED_SEND_MAX (RDMA_MERGE_MAX / 4096)
56  
57  #define RDMA_REG_CHUNK_SHIFT 20 /* 1 MB */
58  
59  /*
60   * This is only for non-live state being migrated.
61   * Instead of RDMA_WRITE messages, we use RDMA_SEND
62   * messages for that state, which requires a different
63   * delivery design than main memory.
64   */
65  #define RDMA_SEND_INCREMENT 32768
66  
67  /*
68   * Maximum size infiniband SEND message
69   */
70  #define RDMA_CONTROL_MAX_BUFFER (512 * 1024)
71  #define RDMA_CONTROL_MAX_COMMANDS_PER_MESSAGE 4096
72  
73  #define RDMA_CONTROL_VERSION_CURRENT 1
74  /*
75   * Capabilities for negotiation.
76   */
77  #define RDMA_CAPABILITY_PIN_ALL 0x01
78  
79  /*
80   * Add the other flags above to this list of known capabilities
81   * as they are introduced.
82   */
83  static uint32_t known_capabilities = RDMA_CAPABILITY_PIN_ALL;
84  
85  #define CHECK_ERROR_STATE() \
86      do { \
87          if (rdma->error_state) { \
88              if (!rdma->error_reported) { \
89                  error_report("RDMA is in an error state waiting for" \
90                                  " migration to abort!"); \
91                  rdma->error_reported = 1; \
92              } \
93              return rdma->error_state; \
94          } \
95      } while (0)
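
/*
 * Note: CHECK_ERROR_STATE() expects a local variable named 'rdma' to be in
 * scope and must be used inside a function whose return type can carry
 * rdma->error_state (an int), since it returns that value directly when an
 * error has already been recorded.
 */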
96  
97  /*
98   * A work request ID is 64 bits and we split up these bits
99   * into 3 parts:
100   *
101   * bits 0-15 : type of control message, 2^16
102   * bits 16-29: ram block index, 2^14
103   * bits 30-63: ram block chunk number, 2^34
104   *
105   * The last two bit ranges are only used for RDMA writes,
106   * in order to track their completion and potentially
107   * also track unregistration status of the message.
108   */
109  #define RDMA_WRID_TYPE_SHIFT  0UL
110  #define RDMA_WRID_BLOCK_SHIFT 16UL
111  #define RDMA_WRID_CHUNK_SHIFT 30UL
112  
113  #define RDMA_WRID_TYPE_MASK \
114      ((1UL << RDMA_WRID_BLOCK_SHIFT) - 1UL)
115  
116  #define RDMA_WRID_BLOCK_MASK \
117      (~RDMA_WRID_TYPE_MASK & ((1UL << RDMA_WRID_CHUNK_SHIFT) - 1UL))
118  
119  #define RDMA_WRID_CHUNK_MASK (~RDMA_WRID_BLOCK_MASK & ~RDMA_WRID_TYPE_MASK)
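
/*
 * Worked example of the wr_id layout above (values chosen for illustration):
 * an RDMA write to ram block index 3, chunk 5 is tagged as
 *
 *     wr_id = (5ULL << RDMA_WRID_CHUNK_SHIFT) |
 *             (3ULL << RDMA_WRID_BLOCK_SHIFT) |
 *             RDMA_WRID_RDMA_WRITE;
 *
 * and decomposed on completion with
 *
 *     chunk = (wr_id & RDMA_WRID_CHUNK_MASK) >> RDMA_WRID_CHUNK_SHIFT;
 *     index = (wr_id & RDMA_WRID_BLOCK_MASK) >> RDMA_WRID_BLOCK_SHIFT;
 *
 * which is what qemu_rdma_make_wrid() and qemu_rdma_poll() below do.
 */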
120  
121  /*
122   * RDMA migration protocol:
123   * 1. RDMA Writes (data messages, i.e. RAM)
124   * 2. IB Send/Recv (control channel messages)
125   */
126  enum {
127      RDMA_WRID_NONE = 0,
128      RDMA_WRID_RDMA_WRITE = 1,
129      RDMA_WRID_SEND_CONTROL = 2000,
130      RDMA_WRID_RECV_CONTROL = 4000,
131  };
132  
133  static const char *wrid_desc[] = {
134      [RDMA_WRID_NONE] = "NONE",
135      [RDMA_WRID_RDMA_WRITE] = "WRITE RDMA",
136      [RDMA_WRID_SEND_CONTROL] = "CONTROL SEND",
137      [RDMA_WRID_RECV_CONTROL] = "CONTROL RECV",
138  };
139  
140  /*
141   * Work request IDs for IB SEND messages only (not RDMA writes).
142   * This is used by the migration protocol to transmit
143   * control messages (such as device state and registration commands)
144   *
145   * We could use more WRs, but we have enough for now.
146   */
147  enum {
148      RDMA_WRID_READY = 0,
149      RDMA_WRID_DATA,
150      RDMA_WRID_CONTROL,
151      RDMA_WRID_MAX,
152  };
153  
154  /*
155   * SEND/RECV IB Control Messages.
156   */
157  enum {
158      RDMA_CONTROL_NONE = 0,
159      RDMA_CONTROL_ERROR,
160      RDMA_CONTROL_READY,               /* ready to receive */
161      RDMA_CONTROL_QEMU_FILE,           /* QEMUFile-transmitted bytes */
162      RDMA_CONTROL_RAM_BLOCKS_REQUEST,  /* RAMBlock synchronization */
163      RDMA_CONTROL_RAM_BLOCKS_RESULT,   /* RAMBlock synchronization */
164      RDMA_CONTROL_COMPRESS,            /* page contains repeat values */
165      RDMA_CONTROL_REGISTER_REQUEST,    /* dynamic page registration */
166      RDMA_CONTROL_REGISTER_RESULT,     /* key to use after registration */
167      RDMA_CONTROL_REGISTER_FINISHED,   /* current iteration finished */
168      RDMA_CONTROL_UNREGISTER_REQUEST,  /* dynamic UN-registration */
169      RDMA_CONTROL_UNREGISTER_FINISHED, /* unpinning finished */
170  };
171  
172  
173  /*
174   * Memory and MR structures used to represent an IB Send/Recv work request.
175   * This is *not* used for RDMA writes, only IB Send/Recv.
176   */
177  typedef struct {
178      uint8_t  control[RDMA_CONTROL_MAX_BUFFER]; /* actual buffer to register */
179      struct   ibv_mr *control_mr;               /* registration metadata */
180      size_t   control_len;                      /* length of the message */
181      uint8_t *control_curr;                     /* start of unconsumed bytes */
182  } RDMAWorkRequestData;
183  
184  /*
185   * Negotiate RDMA capabilities during connection-setup time.
186   */
187  typedef struct {
188      uint32_t version;
189      uint32_t flags;
190  } RDMACapabilities;
191  
192  static void caps_to_network(RDMACapabilities *cap)
193  {
194      cap->version = htonl(cap->version);
195      cap->flags = htonl(cap->flags);
196  }
197  
198  static void network_to_caps(RDMACapabilities *cap)
199  {
200      cap->version = ntohl(cap->version);
201      cap->flags = ntohl(cap->flags);
202  }
203  
204  /*
205   * Representation of a RAMBlock from an RDMA perspective.
206   * This is not transmitted, only local.
207   * This and subsequent structures cannot be linked lists
208   * because we're using a single IB message to transmit
209   * the information. It's small anyway, so a list is overkill.
210   */
211  typedef struct RDMALocalBlock {
212      char          *block_name;
213      uint8_t       *local_host_addr; /* local virtual address */
214      uint64_t       remote_host_addr; /* remote virtual address */
215      uint64_t       offset;
216      uint64_t       length;
217      struct         ibv_mr **pmr;    /* MRs for chunk-level registration */
218      struct         ibv_mr *mr;      /* MR for non-chunk-level registration */
219      uint32_t      *remote_keys;     /* rkeys for chunk-level registration */
220      uint32_t       remote_rkey;     /* rkeys for non-chunk-level registration */
221      int            index;           /* which block are we */
222      unsigned int   src_index;       /* (Only used on dest) */
223      bool           is_ram_block;
224      int            nb_chunks;
225      unsigned long *transit_bitmap;
226      unsigned long *unregister_bitmap;
227  } RDMALocalBlock;
228  
229  /*
230   * Also represents a RAMBlock, but only on the dest.
231   * This gets transmitted by the dest during connection-time
232   * to the source VM and then is used to populate the
233   * corresponding RDMALocalBlock with
234   * the information needed to perform the actual RDMA.
235   */
236  typedef struct QEMU_PACKED RDMADestBlock {
237      uint64_t remote_host_addr;
238      uint64_t offset;
239      uint64_t length;
240      uint32_t remote_rkey;
241      uint32_t padding;
242  } RDMADestBlock;
243  
244  static const char *control_desc(unsigned int rdma_control)
245  {
246      static const char *strs[] = {
247          [RDMA_CONTROL_NONE] = "NONE",
248          [RDMA_CONTROL_ERROR] = "ERROR",
249          [RDMA_CONTROL_READY] = "READY",
250          [RDMA_CONTROL_QEMU_FILE] = "QEMU FILE",
251          [RDMA_CONTROL_RAM_BLOCKS_REQUEST] = "RAM BLOCKS REQUEST",
252          [RDMA_CONTROL_RAM_BLOCKS_RESULT] = "RAM BLOCKS RESULT",
253          [RDMA_CONTROL_COMPRESS] = "COMPRESS",
254          [RDMA_CONTROL_REGISTER_REQUEST] = "REGISTER REQUEST",
255          [RDMA_CONTROL_REGISTER_RESULT] = "REGISTER RESULT",
256          [RDMA_CONTROL_REGISTER_FINISHED] = "REGISTER FINISHED",
257          [RDMA_CONTROL_UNREGISTER_REQUEST] = "UNREGISTER REQUEST",
258          [RDMA_CONTROL_UNREGISTER_FINISHED] = "UNREGISTER FINISHED",
259      };
260  
261      if (rdma_control > RDMA_CONTROL_UNREGISTER_FINISHED) {
262          return "??BAD CONTROL VALUE??";
263      }
264  
265      return strs[rdma_control];
266  }
267  
268  static uint64_t htonll(uint64_t v)
269  {
270      union { uint32_t lv[2]; uint64_t llv; } u;
271      u.lv[0] = htonl(v >> 32);
272      u.lv[1] = htonl(v & 0xFFFFFFFFULL);
273      return u.llv;
274  }
275  
276  static uint64_t ntohll(uint64_t v)
277  {
278      union { uint32_t lv[2]; uint64_t llv; } u;
279      u.llv = v;
280      return ((uint64_t)ntohl(u.lv[0]) << 32) | (uint64_t) ntohl(u.lv[1]);
281  }
282  
283  static void dest_block_to_network(RDMADestBlock *db)
284  {
285      db->remote_host_addr = htonll(db->remote_host_addr);
286      db->offset = htonll(db->offset);
287      db->length = htonll(db->length);
288      db->remote_rkey = htonl(db->remote_rkey);
289  }
290  
291  static void network_to_dest_block(RDMADestBlock *db)
292  {
293      db->remote_host_addr = ntohll(db->remote_host_addr);
294      db->offset = ntohll(db->offset);
295      db->length = ntohll(db->length);
296      db->remote_rkey = ntohl(db->remote_rkey);
297  }
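
/*
 * RDMADestBlock is the only block descriptor that crosses the wire, so every
 * multi-byte field is converted to network byte order here.  On little-endian
 * hosts htonll()/ntohll() byte-swap; on big-endian hosts they are effectively
 * identity operations, so both sides always agree on the layout.
 */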
298  
299  /*
300   * Virtual address of the above structures used for transmitting
301   * the RAMBlock descriptions at connection-time.
302   * This structure is *not* transmitted.
303   */
304  typedef struct RDMALocalBlocks {
305      int nb_blocks;
306      bool     init;             /* main memory init complete */
307      RDMALocalBlock *block;
308  } RDMALocalBlocks;
309  
310  /*
311   * Main data structure for RDMA state.
312   * While there is only one copy of this structure being allocated right now,
313   * this is the place where one would start if you wanted to consider
314   * having more than one RDMA connection open at the same time.
315   */
316  typedef struct RDMAContext {
317      char *host;
318      int port;
319      char *host_port;
320  
321      RDMAWorkRequestData wr_data[RDMA_WRID_MAX];
322  
323      /*
324       * This is used by *_exchange_send() to figure out whether or not
325       * the initial "READY" message has already been received.
326       * This is because other functions may potentially poll() and detect
327       * the READY message before send() does, in which case we need to
328       * know if it completed.
329       */
330      int control_ready_expected;
331  
332      /* number of outstanding writes */
333      int nb_sent;
334  
335      /* store info about current buffer so that we can
336         merge it with future sends */
337      uint64_t current_addr;
338      uint64_t current_length;
339      /* index of ram block the current buffer belongs to */
340      int current_index;
341      /* index of the chunk in the current ram block */
342      int current_chunk;
343  
344      bool pin_all;
345  
346      /*
347       * infiniband-specific variables for opening the device
348       * and maintaining connection state and so forth.
349       *
350       * cm_id also has ibv_context, rdma_event_channel, and ibv_qp in
351       * cm_id->verbs, cm_id->channel, and cm_id->qp.
352       */
353      struct rdma_cm_id *cm_id;               /* connection manager ID */
354      struct rdma_cm_id *listen_id;
355      bool connected;
356  
357      struct ibv_context          *verbs;
358      struct rdma_event_channel   *channel;
359      struct ibv_qp *qp;                      /* queue pair */
360      struct ibv_comp_channel *recv_comp_channel;  /* recv completion channel */
361      struct ibv_comp_channel *send_comp_channel;  /* send completion channel */
362      struct ibv_pd *pd;                      /* protection domain */
363      struct ibv_cq *recv_cq;                 /* receive completion queue */
364      struct ibv_cq *send_cq;                 /* send completion queue */
365  
366      /*
367       * If a previous write failed (perhaps because of a failed
368       * memory registration), then do not attempt any future work
369       * and remember the error state.
370       */
371      int error_state;
372      int error_reported;
373      int received_error;
374  
375      /*
376       * Description of ram blocks used throughout the code.
377       */
378      RDMALocalBlocks local_ram_blocks;
379      RDMADestBlock  *dest_blocks;
380  
381      /* Index of the next RAMBlock received during block registration */
382      unsigned int    next_src_index;
383  
384      /*
385       * Migration on the *destination* has started.
386       * In that case, use the coroutine yield function.
387       * The source runs in a thread, so we don't care.
388       */
389      int migration_started_on_destination;
390  
391      int total_registrations;
392      int total_writes;
393  
394      int unregister_current, unregister_next;
395      uint64_t unregistrations[RDMA_SIGNALED_SEND_MAX];
396  
397      GHashTable *blockmap;
398  
399      /* the RDMAContext for return path */
400      struct RDMAContext *return_path;
401      bool is_return_path;
402  } RDMAContext;
403  
404  #define TYPE_QIO_CHANNEL_RDMA "qio-channel-rdma"
405  OBJECT_DECLARE_SIMPLE_TYPE(QIOChannelRDMA, QIO_CHANNEL_RDMA)
406  
407  
408  
409  struct QIOChannelRDMA {
410      QIOChannel parent;
411      RDMAContext *rdmain;
412      RDMAContext *rdmaout;
413      QEMUFile *file;
414      bool blocking; /* XXX we don't actually honour this yet */
415  };
416  
417  /*
418   * Main structure for IB Send/Recv control messages.
419   * This gets prepended at the beginning of every Send/Recv.
420   */
421  typedef struct QEMU_PACKED {
422      uint32_t len;     /* Total length of data portion */
423      uint32_t type;    /* which control command to perform */
424      uint32_t repeat;  /* number of commands in data portion of same type */
425      uint32_t padding;
426  } RDMAControlHeader;
427  
428  static void control_to_network(RDMAControlHeader *control)
429  {
430      control->type = htonl(control->type);
431      control->len = htonl(control->len);
432      control->repeat = htonl(control->repeat);
433  }
434  
435  static void network_to_control(RDMAControlHeader *control)
436  {
437      control->type = ntohl(control->type);
438      control->len = ntohl(control->len);
439      control->repeat = ntohl(control->repeat);
440  }
441  
442  /*
443   * Register a single Chunk.
444   * Information sent by the source VM to inform the dest
445   * to register a single chunk of memory before we can perform
446   * the actual RDMA operation.
447   */
448  typedef struct QEMU_PACKED {
449      union QEMU_PACKED {
450          uint64_t current_addr;  /* offset into the ram_addr_t space */
451          uint64_t chunk;         /* chunk to lookup if unregistering */
452      } key;
453      uint32_t current_index; /* which ramblock the chunk belongs to */
454      uint32_t padding;
455      uint64_t chunks;            /* how many sequential chunks to register */
456  } RDMARegister;
457  
458  static void register_to_network(RDMAContext *rdma, RDMARegister *reg)
459  {
460      RDMALocalBlock *local_block;
461      local_block  = &rdma->local_ram_blocks.block[reg->current_index];
462  
463      if (local_block->is_ram_block) {
464          /*
465           * current_addr as passed in is an address in the local ram_addr_t
466           * space, we need to translate this for the destination
467           */
468          reg->key.current_addr -= local_block->offset;
469          reg->key.current_addr += rdma->dest_blocks[reg->current_index].offset;
470      }
471      reg->key.current_addr = htonll(reg->key.current_addr);
472      reg->current_index = htonl(reg->current_index);
473      reg->chunks = htonll(reg->chunks);
474  }
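
/*
 * Purely illustrative example of the translation above: if the chunk lives in
 * a local block whose ram_addr_t offset is 0x40000000 but the destination
 * reported the matching block at offset 0x80000000, a current_addr of
 * 0x40001000 is rewritten to 0x80001000 before being byte-swapped, so the
 * destination can locate the chunk in its own ram_addr_t space.
 */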
475  
476  static void network_to_register(RDMARegister *reg)
477  {
478      reg->key.current_addr = ntohll(reg->key.current_addr);
479      reg->current_index = ntohl(reg->current_index);
480      reg->chunks = ntohll(reg->chunks);
481  }
482  
483  typedef struct QEMU_PACKED {
484      uint32_t value;     /* if zero, we will madvise() */
485      uint32_t block_idx; /* which ram block index */
486      uint64_t offset;    /* Address in remote ram_addr_t space */
487      uint64_t length;    /* length of the chunk */
488  } RDMACompress;
489  
490  static void compress_to_network(RDMAContext *rdma, RDMACompress *comp)
491  {
492      comp->value = htonl(comp->value);
493      /*
494       * comp->offset as passed in is an address in the local ram_addr_t
495       * space, we need to translate this for the destination
496       */
497      comp->offset -= rdma->local_ram_blocks.block[comp->block_idx].offset;
498      comp->offset += rdma->dest_blocks[comp->block_idx].offset;
499      comp->block_idx = htonl(comp->block_idx);
500      comp->offset = htonll(comp->offset);
501      comp->length = htonll(comp->length);
502  }
503  
504  static void network_to_compress(RDMACompress *comp)
505  {
506      comp->value = ntohl(comp->value);
507      comp->block_idx = ntohl(comp->block_idx);
508      comp->offset = ntohll(comp->offset);
509      comp->length = ntohll(comp->length);
510  }
511  
512  /*
513   * The result of the dest's memory registration produces an "rkey"
514   * which the source VM must reference in order to perform
515   * the RDMA operation.
516   */
517  typedef struct QEMU_PACKED {
518      uint32_t rkey;
519      uint32_t padding;
520      uint64_t host_addr;
521  } RDMARegisterResult;
522  
523  static void result_to_network(RDMARegisterResult *result)
524  {
525      result->rkey = htonl(result->rkey);
526      result->host_addr = htonll(result->host_addr);
527  };
528  
529  static void network_to_result(RDMARegisterResult *result)
530  {
531      result->rkey = ntohl(result->rkey);
532      result->host_addr = ntohll(result->host_addr);
533  };
534  
535  const char *print_wrid(int wrid);
536  static int qemu_rdma_exchange_send(RDMAContext *rdma, RDMAControlHeader *head,
537                                     uint8_t *data, RDMAControlHeader *resp,
538                                     int *resp_idx,
539                                     int (*callback)(RDMAContext *rdma));
540  
541  static inline uint64_t ram_chunk_index(const uint8_t *start,
542                                         const uint8_t *host)
543  {
544      return ((uintptr_t) host - (uintptr_t) start) >> RDMA_REG_CHUNK_SHIFT;
545  }
546  
547  static inline uint8_t *ram_chunk_start(const RDMALocalBlock *rdma_ram_block,
548                                         uint64_t i)
549  {
550      return (uint8_t *)(uintptr_t)(rdma_ram_block->local_host_addr +
551                                    (i << RDMA_REG_CHUNK_SHIFT));
552  }
553  
554  static inline uint8_t *ram_chunk_end(const RDMALocalBlock *rdma_ram_block,
555                                       uint64_t i)
556  {
557      uint8_t *result = ram_chunk_start(rdma_ram_block, i) +
558                                           (1UL << RDMA_REG_CHUNK_SHIFT);
559  
560      if (result > (rdma_ram_block->local_host_addr + rdma_ram_block->length)) {
561          result = rdma_ram_block->local_host_addr + rdma_ram_block->length;
562      }
563  
564      return result;
565  }
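
/*
 * Chunk math example (RDMA_REG_CHUNK_SHIFT is 20, i.e. 1 MiB chunks): for a
 * block starting at host address 'start', an address at start + 3 MiB + 100
 * falls into chunk index 3, whose ram_chunk_start() is start + 3 MiB.  For
 * the last chunk of a block whose length is not a multiple of 1 MiB,
 * ram_chunk_end() clamps the end to the end of the block rather than to the
 * full chunk size.
 */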
566  
567  static int rdma_add_block(RDMAContext *rdma, const char *block_name,
568                           void *host_addr,
569                           ram_addr_t block_offset, uint64_t length)
570  {
571      RDMALocalBlocks *local = &rdma->local_ram_blocks;
572      RDMALocalBlock *block;
573      RDMALocalBlock *old = local->block;
574  
575      local->block = g_new0(RDMALocalBlock, local->nb_blocks + 1);
576  
577      if (local->nb_blocks) {
578          int x;
579  
580          if (rdma->blockmap) {
581              for (x = 0; x < local->nb_blocks; x++) {
582                  g_hash_table_remove(rdma->blockmap,
583                                      (void *)(uintptr_t)old[x].offset);
584                  g_hash_table_insert(rdma->blockmap,
585                                      (void *)(uintptr_t)old[x].offset,
586                                      &local->block[x]);
587              }
588          }
589          memcpy(local->block, old, sizeof(RDMALocalBlock) * local->nb_blocks);
590          g_free(old);
591      }
592  
593      block = &local->block[local->nb_blocks];
594  
595      block->block_name = g_strdup(block_name);
596      block->local_host_addr = host_addr;
597      block->offset = block_offset;
598      block->length = length;
599      block->index = local->nb_blocks;
600      block->src_index = ~0U; /* Filled in by the receipt of the block list */
601      block->nb_chunks = ram_chunk_index(host_addr, host_addr + length) + 1UL;
602      block->transit_bitmap = bitmap_new(block->nb_chunks);
603      bitmap_clear(block->transit_bitmap, 0, block->nb_chunks);
604      block->unregister_bitmap = bitmap_new(block->nb_chunks);
605      bitmap_clear(block->unregister_bitmap, 0, block->nb_chunks);
606      block->remote_keys = g_new0(uint32_t, block->nb_chunks);
607  
608      block->is_ram_block = local->init ? false : true;
609  
610      if (rdma->blockmap) {
611          g_hash_table_insert(rdma->blockmap, (void *)(uintptr_t)block_offset, block);
612      }
613  
614      trace_rdma_add_block(block_name, local->nb_blocks,
615                           (uintptr_t) block->local_host_addr,
616                           block->offset, block->length,
617                           (uintptr_t) (block->local_host_addr + block->length),
618                           BITS_TO_LONGS(block->nb_chunks) *
619                               sizeof(unsigned long) * 8,
620                           block->nb_chunks);
621  
622      local->nb_blocks++;
623  
624      return 0;
625  }
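
/*
 * Note on the bookkeeping above: local->block is grown by allocating a new
 * array and copying the old entries, so any blockmap values pointing into the
 * old array would dangle.  That is why each existing offset is removed and
 * re-inserted to point at the corresponding slot in the new array before the
 * old array is freed.
 */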
626  
627  /*
628   * Memory regions need to be registered with the device and queue pairs set up
629   * in advance before the migration starts. This tells us where the RAM blocks
630   * are so that we can register them individually.
631   */
632  static int qemu_rdma_init_one_block(RAMBlock *rb, void *opaque)
633  {
634      const char *block_name = qemu_ram_get_idstr(rb);
635      void *host_addr = qemu_ram_get_host_addr(rb);
636      ram_addr_t block_offset = qemu_ram_get_offset(rb);
637      ram_addr_t length = qemu_ram_get_used_length(rb);
638      return rdma_add_block(opaque, block_name, host_addr, block_offset, length);
639  }
640  
641  /*
642   * Identify the RAMBlocks and their quantity. They will be referenced to
643   * identify chunk boundaries inside each RAMBlock and also be referenced
644   * during dynamic page registration.
645   */
646  static int qemu_rdma_init_ram_blocks(RDMAContext *rdma)
647  {
648      RDMALocalBlocks *local = &rdma->local_ram_blocks;
649      int ret;
650  
651      assert(rdma->blockmap == NULL);
652      memset(local, 0, sizeof *local);
653      ret = foreach_not_ignored_block(qemu_rdma_init_one_block, rdma);
654      if (ret) {
655          return ret;
656      }
657      trace_qemu_rdma_init_ram_blocks(local->nb_blocks);
658      rdma->dest_blocks = g_new0(RDMADestBlock,
659                                 rdma->local_ram_blocks.nb_blocks);
660      local->init = true;
661      return 0;
662  }
663  
664  /*
665   * Note: If used outside of cleanup, the caller must ensure that the destination
666   * block structures are also updated
667   */
668  static int rdma_delete_block(RDMAContext *rdma, RDMALocalBlock *block)
669  {
670      RDMALocalBlocks *local = &rdma->local_ram_blocks;
671      RDMALocalBlock *old = local->block;
672      int x;
673  
674      if (rdma->blockmap) {
675          g_hash_table_remove(rdma->blockmap, (void *)(uintptr_t)block->offset);
676      }
677      if (block->pmr) {
678          int j;
679  
680          for (j = 0; j < block->nb_chunks; j++) {
681              if (!block->pmr[j]) {
682                  continue;
683              }
684              ibv_dereg_mr(block->pmr[j]);
685              rdma->total_registrations--;
686          }
687          g_free(block->pmr);
688          block->pmr = NULL;
689      }
690  
691      if (block->mr) {
692          ibv_dereg_mr(block->mr);
693          rdma->total_registrations--;
694          block->mr = NULL;
695      }
696  
697      g_free(block->transit_bitmap);
698      block->transit_bitmap = NULL;
699  
700      g_free(block->unregister_bitmap);
701      block->unregister_bitmap = NULL;
702  
703      g_free(block->remote_keys);
704      block->remote_keys = NULL;
705  
706      g_free(block->block_name);
707      block->block_name = NULL;
708  
709      if (rdma->blockmap) {
710          for (x = 0; x < local->nb_blocks; x++) {
711              g_hash_table_remove(rdma->blockmap,
712                                  (void *)(uintptr_t)old[x].offset);
713          }
714      }
715  
716      if (local->nb_blocks > 1) {
717  
718          local->block = g_new0(RDMALocalBlock, local->nb_blocks - 1);
719  
720          if (block->index) {
721              memcpy(local->block, old, sizeof(RDMALocalBlock) * block->index);
722          }
723  
724          if (block->index < (local->nb_blocks - 1)) {
725              memcpy(local->block + block->index, old + (block->index + 1),
726                  sizeof(RDMALocalBlock) *
727                      (local->nb_blocks - (block->index + 1)));
728              for (x = block->index; x < local->nb_blocks - 1; x++) {
729                  local->block[x].index--;
730              }
731          }
732      } else {
733          assert(block == local->block);
734          local->block = NULL;
735      }
736  
737      trace_rdma_delete_block(block, (uintptr_t)block->local_host_addr,
738                             block->offset, block->length,
739                              (uintptr_t)(block->local_host_addr + block->length),
740                             BITS_TO_LONGS(block->nb_chunks) *
741                                 sizeof(unsigned long) * 8, block->nb_chunks);
742  
743      g_free(old);
744  
745      local->nb_blocks--;
746  
747      if (local->nb_blocks && rdma->blockmap) {
748          for (x = 0; x < local->nb_blocks; x++) {
749              g_hash_table_insert(rdma->blockmap,
750                                  (void *)(uintptr_t)local->block[x].offset,
751                                  &local->block[x]);
752          }
753      }
754  
755      return 0;
756  }
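
/*
 * rdma_delete_block() is the mirror of rdma_add_block(): it deregisters any
 * MRs for the block, compacts the array around the removed entry (shifting
 * the 'index' field of every later block down by one), and rebuilds the
 * blockmap so its values point into the newly allocated array.
 */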
757  
758  /*
759   * Put in the log file which RDMA device was opened and the details
760   * associated with that device.
761   */
762  static void qemu_rdma_dump_id(const char *who, struct ibv_context *verbs)
763  {
764      struct ibv_port_attr port;
765  
766      if (ibv_query_port(verbs, 1, &port)) {
767          error_report("Failed to query port information");
768          return;
769      }
770  
771      printf("%s RDMA Device opened: kernel name %s "
772             "uverbs device name %s, "
773             "infiniband_verbs class device path %s, "
774             "infiniband class device path %s, "
775             "transport: (%d) %s\n",
776                  who,
777                  verbs->device->name,
778                  verbs->device->dev_name,
779                  verbs->device->dev_path,
780                  verbs->device->ibdev_path,
781                  port.link_layer,
782                  (port.link_layer == IBV_LINK_LAYER_INFINIBAND) ? "Infiniband" :
783                   ((port.link_layer == IBV_LINK_LAYER_ETHERNET)
784                      ? "Ethernet" : "Unknown"));
785  }
786  
787  /*
788   * Put in the log file the RDMA gid addressing information,
789   * useful for folks who have trouble understanding the
790   * RDMA device hierarchy in the kernel.
791   */
792  static void qemu_rdma_dump_gid(const char *who, struct rdma_cm_id *id)
793  {
794      char sgid[33];
795      char dgid[33];
796      inet_ntop(AF_INET6, &id->route.addr.addr.ibaddr.sgid, sgid, sizeof sgid);
797      inet_ntop(AF_INET6, &id->route.addr.addr.ibaddr.dgid, dgid, sizeof dgid);
798      trace_qemu_rdma_dump_gid(who, sgid, dgid);
799  }
800  
801  /*
802   * As of now, IPv6 over RoCE / iWARP is not supported by linux.
803   * We will try the next addrinfo struct, and fail if there are
804   * no other valid addresses to bind against.
805   *
806   * If the user is listening on '[::]', then we will not have opened a device
807   * yet and have no way of verifying if the device is RoCE or not.
808   *
809   * In this case, the source VM will throw an error for ALL types of
810   * connections (both IPv4 and IPv6) if the destination machine does not have
811   * a regular infiniband network available for use.
812   *
813   * The only way to guarantee that an error is thrown for broken kernels is
814   * for the management software to choose a *specific* interface at bind time
815   * and validate what type of hardware it is.
816   *
817   * Unfortunately, this puts the user in a fix:
818   *
819   *  If the source VM connects with an IPv4 address without knowing that the
820   *  destination has bound to '[::]' the migration will unconditionally fail
821   *  unless the management software is explicitly listening on the IPv4
822   *  address while using a RoCE-based device.
823   *
824   *  If the source VM connects with an IPv6 address, then we're OK because we can
825   *  throw an error on the source (and similarly on the destination).
826   *
827   *  But in mixed environments, this will be broken for a while until it is fixed
828   *  inside linux.
829   *
830   * We do provide a *tiny* bit of help in this function: We can list all of the
831   * devices in the system and check to see if all the devices are RoCE or
832   * Infiniband.
833   *
834   * If we detect that we have a *pure* RoCE environment, then we can safely
835   * throw an error even if the management software has specified '[::]' as the
836   * bind address.
837   *
838   * However, if there are multiple heterogeneous devices, then we cannot make
839   * this assumption and the user just has to be sure they know what they are
840   * doing.
841   *
842   * Patches are being reviewed on linux-rdma.
843   */
844  static int qemu_rdma_broken_ipv6_kernel(struct ibv_context *verbs, Error **errp)
845  {
846      /* This bug only exists in linux, to our knowledge. */
847  #ifdef CONFIG_LINUX
848      struct ibv_port_attr port_attr;
849  
850      /*
851       * Verbs are only NULL if management has bound to '[::]'.
852       *
853       * Let's iterate through all the devices and see if there are any pure IB
854       * devices (non-ethernet).
855       *
856       * If not, then we can safely proceed with the migration.
857       * Otherwise, there are no guarantees until the bug is fixed in linux.
858       */
859      if (!verbs) {
860          int num_devices, x;
861          struct ibv_device **dev_list = ibv_get_device_list(&num_devices);
862          bool roce_found = false;
863          bool ib_found = false;
864  
865          for (x = 0; x < num_devices; x++) {
866              verbs = ibv_open_device(dev_list[x]);
867              if (!verbs) {
868                  if (errno == EPERM) {
869                      continue;
870                  } else {
871                      return -EINVAL;
872                  }
873              }
874  
875              if (ibv_query_port(verbs, 1, &port_attr)) {
876                  ibv_close_device(verbs);
877                  ERROR(errp, "Could not query initial IB port");
878                  return -EINVAL;
879              }
880  
881              if (port_attr.link_layer == IBV_LINK_LAYER_INFINIBAND) {
882                  ib_found = true;
883              } else if (port_attr.link_layer == IBV_LINK_LAYER_ETHERNET) {
884                  roce_found = true;
885              }
886  
887              ibv_close_device(verbs);
888  
889          }
890  
891          if (roce_found) {
892              if (ib_found) {
893                  fprintf(stderr, "WARN: migrations may fail:"
894                                  " IPv6 over RoCE / iWARP in linux"
895                                  " is broken. But since you appear to have a"
896                                  " mixed RoCE / IB environment, be sure to only"
897                                  " migrate over the IB fabric until the kernel "
898                                  "fixes the bug.\n");
899              } else {
900                  ERROR(errp, "You only have RoCE / iWARP devices in your systems"
901                              " and your management software has specified '[::]'"
902                              ", but IPv6 over RoCE / iWARP is not supported in Linux.");
903                  return -ENONET;
904              }
905          }
906  
907          return 0;
908      }
909  
910      /*
911       * If we have a verbs context, that means that something other than '[::]' was
912       * used by the management software for binding. In which case we can
913       * actually warn the user about a potentially broken kernel.
914       */
915  
916      /* IB ports start with 1, not 0 */
917      if (ibv_query_port(verbs, 1, &port_attr)) {
918          ERROR(errp, "Could not query initial IB port");
919          return -EINVAL;
920      }
921  
922      if (port_attr.link_layer == IBV_LINK_LAYER_ETHERNET) {
923          ERROR(errp, "Linux kernel's RoCE / iWARP does not support IPv6 "
924                      "(but patches on linux-rdma in progress)");
925          return -ENONET;
926      }
927  
928  #endif
929  
930      return 0;
931  }
932  
933  /*
934   * Figure out which RDMA device corresponds to the requested IP hostname.
935   * Also create the initial connection manager identifiers for opening
936   * the connection.
937   */
938  static int qemu_rdma_resolve_host(RDMAContext *rdma, Error **errp)
939  {
940      int ret;
941      struct rdma_addrinfo *res;
942      char port_str[16];
943      struct rdma_cm_event *cm_event;
944      char ip[40] = "unknown";
945      struct rdma_addrinfo *e;
946  
947      if (rdma->host == NULL || !strcmp(rdma->host, "")) {
948          ERROR(errp, "RDMA hostname has not been set");
949          return -EINVAL;
950      }
951  
952      /* create CM channel */
953      rdma->channel = rdma_create_event_channel();
954      if (!rdma->channel) {
955          ERROR(errp, "could not create CM channel");
956          return -EINVAL;
957      }
958  
959      /* create CM id */
960      ret = rdma_create_id(rdma->channel, &rdma->cm_id, NULL, RDMA_PS_TCP);
961      if (ret) {
962          ERROR(errp, "could not create channel id");
963          goto err_resolve_create_id;
964      }
965  
966      snprintf(port_str, 16, "%d", rdma->port);
967      port_str[15] = '\0';
968  
969      ret = rdma_getaddrinfo(rdma->host, port_str, NULL, &res);
970      if (ret < 0) {
971          ERROR(errp, "could not rdma_getaddrinfo address %s", rdma->host);
972          goto err_resolve_get_addr;
973      }
974  
975      for (e = res; e != NULL; e = e->ai_next) {
976          inet_ntop(e->ai_family,
977              &((struct sockaddr_in *) e->ai_dst_addr)->sin_addr, ip, sizeof ip);
978          trace_qemu_rdma_resolve_host_trying(rdma->host, ip);
979  
980          ret = rdma_resolve_addr(rdma->cm_id, NULL, e->ai_dst_addr,
981                  RDMA_RESOLVE_TIMEOUT_MS);
982          if (!ret) {
983              if (e->ai_family == AF_INET6) {
984                  ret = qemu_rdma_broken_ipv6_kernel(rdma->cm_id->verbs, errp);
985                  if (ret) {
986                      continue;
987                  }
988              }
989              goto route;
990          }
991      }
992  
993      rdma_freeaddrinfo(res);
994      ERROR(errp, "could not resolve address %s", rdma->host);
995      goto err_resolve_get_addr;
996  
997  route:
998      rdma_freeaddrinfo(res);
999      qemu_rdma_dump_gid("source_resolve_addr", rdma->cm_id);
1000  
1001      ret = rdma_get_cm_event(rdma->channel, &cm_event);
1002      if (ret) {
1003          ERROR(errp, "could not perform event_addr_resolved");
1004          goto err_resolve_get_addr;
1005      }
1006  
1007      if (cm_event->event != RDMA_CM_EVENT_ADDR_RESOLVED) {
1008          ERROR(errp, "result not equal to event_addr_resolved %s",
1009                  rdma_event_str(cm_event->event));
1010          error_report("rdma_resolve_addr");
1011          rdma_ack_cm_event(cm_event);
1012          ret = -EINVAL;
1013          goto err_resolve_get_addr;
1014      }
1015      rdma_ack_cm_event(cm_event);
1016  
1017      /* resolve route */
1018      ret = rdma_resolve_route(rdma->cm_id, RDMA_RESOLVE_TIMEOUT_MS);
1019      if (ret) {
1020          ERROR(errp, "could not resolve rdma route");
1021          goto err_resolve_get_addr;
1022      }
1023  
1024      ret = rdma_get_cm_event(rdma->channel, &cm_event);
1025      if (ret) {
1026          ERROR(errp, "could not perform event_route_resolved");
1027          goto err_resolve_get_addr;
1028      }
1029      if (cm_event->event != RDMA_CM_EVENT_ROUTE_RESOLVED) {
1030          ERROR(errp, "result not equal to event_route_resolved: %s",
1031                          rdma_event_str(cm_event->event));
1032          rdma_ack_cm_event(cm_event);
1033          ret = -EINVAL;
1034          goto err_resolve_get_addr;
1035      }
1036      rdma_ack_cm_event(cm_event);
1037      rdma->verbs = rdma->cm_id->verbs;
1038      qemu_rdma_dump_id("source_resolve_host", rdma->cm_id->verbs);
1039      qemu_rdma_dump_gid("source_resolve_host", rdma->cm_id);
1040      return 0;
1041  
1042  err_resolve_get_addr:
1043      rdma_destroy_id(rdma->cm_id);
1044      rdma->cm_id = NULL;
1045  err_resolve_create_id:
1046      rdma_destroy_event_channel(rdma->channel);
1047      rdma->channel = NULL;
1048      return ret;
1049  }
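
/*
 * The function above follows the usual librdmacm client sequence:
 * rdma_create_event_channel() -> rdma_create_id() -> rdma_getaddrinfo() ->
 * rdma_resolve_addr() (wait for RDMA_CM_EVENT_ADDR_RESOLVED) ->
 * rdma_resolve_route() (wait for RDMA_CM_EVENT_ROUTE_RESOLVED), at which
 * point cm_id->verbs identifies the device to use for the migration.
 */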
1050  
1051  /*
1052   * Create protection domain and completion queues
1053   */
1054  static int qemu_rdma_alloc_pd_cq(RDMAContext *rdma)
1055  {
1056      /* allocate pd */
1057      rdma->pd = ibv_alloc_pd(rdma->verbs);
1058      if (!rdma->pd) {
1059          error_report("failed to allocate protection domain");
1060          return -1;
1061      }
1062  
1063      /* create receive completion channel */
1064      rdma->recv_comp_channel = ibv_create_comp_channel(rdma->verbs);
1065      if (!rdma->recv_comp_channel) {
1066          error_report("failed to allocate receive completion channel");
1067          goto err_alloc_pd_cq;
1068      }
1069  
1070      /*
1071       * Completion queue can be filled by read work requests.
1072       */
1073      rdma->recv_cq = ibv_create_cq(rdma->verbs, (RDMA_SIGNALED_SEND_MAX * 3),
1074                                    NULL, rdma->recv_comp_channel, 0);
1075      if (!rdma->recv_cq) {
1076          error_report("failed to allocate receive completion queue");
1077          goto err_alloc_pd_cq;
1078      }
1079  
1080      /* create send completion channel */
1081      rdma->send_comp_channel = ibv_create_comp_channel(rdma->verbs);
1082      if (!rdma->send_comp_channel) {
1083          error_report("failed to allocate send completion channel");
1084          goto err_alloc_pd_cq;
1085      }
1086  
1087      rdma->send_cq = ibv_create_cq(rdma->verbs, (RDMA_SIGNALED_SEND_MAX * 3),
1088                                    NULL, rdma->send_comp_channel, 0);
1089      if (!rdma->send_cq) {
1090          error_report("failed to allocate send completion queue");
1091          goto err_alloc_pd_cq;
1092      }
1093  
1094      return 0;
1095  
1096  err_alloc_pd_cq:
1097      if (rdma->pd) {
1098          ibv_dealloc_pd(rdma->pd);
1099      }
1100      if (rdma->recv_comp_channel) {
1101          ibv_destroy_comp_channel(rdma->recv_comp_channel);
1102      }
1103      if (rdma->send_comp_channel) {
1104          ibv_destroy_comp_channel(rdma->send_comp_channel);
1105      }
1106      if (rdma->recv_cq) {
1107          ibv_destroy_cq(rdma->recv_cq);
1108          rdma->recv_cq = NULL;
1109      }
1110      rdma->pd = NULL;
1111      rdma->recv_comp_channel = NULL;
1112      rdma->send_comp_channel = NULL;
1113      return -1;
1114  
1115  }
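
/*
 * Resources created above: one protection domain plus separate completion
 * channels and completion queues for the send and receive directions, each
 * CQ sized to RDMA_SIGNALED_SEND_MAX * 3 entries.  The error path does not
 * need to destroy send_cq because it is only reached before send_cq has been
 * successfully created.
 */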
1116  
1117  /*
1118   * Create queue pairs.
1119   */
1120  static int qemu_rdma_alloc_qp(RDMAContext *rdma)
1121  {
1122      struct ibv_qp_init_attr attr = { 0 };
1123      int ret;
1124  
1125      attr.cap.max_send_wr = RDMA_SIGNALED_SEND_MAX;
1126      attr.cap.max_recv_wr = 3;
1127      attr.cap.max_send_sge = 1;
1128      attr.cap.max_recv_sge = 1;
1129      attr.send_cq = rdma->send_cq;
1130      attr.recv_cq = rdma->recv_cq;
1131      attr.qp_type = IBV_QPT_RC;
1132  
1133      ret = rdma_create_qp(rdma->cm_id, rdma->pd, &attr);
1134      if (ret) {
1135          return -1;
1136      }
1137  
1138      rdma->qp = rdma->cm_id->qp;
1139      return 0;
1140  }
1141  
1142  /* Check whether On-Demand Paging is supported by the RDMA device */
1143  static bool rdma_support_odp(struct ibv_context *dev)
1144  {
1145      struct ibv_device_attr_ex attr = {0};
1146      int ret = ibv_query_device_ex(dev, NULL, &attr);
1147      if (ret) {
1148          return false;
1149      }
1150  
1151      if (attr.odp_caps.general_caps & IBV_ODP_SUPPORT) {
1152          return true;
1153      }
1154  
1155      return false;
1156  }
1157  
1158  /*
1159   * Use ibv_advise_mr to avoid RNR NAK errors as far as possible.
1160   * A responder MR registered with ODP will send an RNR NAK back to
1161   * the requester when a page fault occurs.
1162   */
1163  static void qemu_rdma_advise_prefetch_mr(struct ibv_pd *pd, uint64_t addr,
1164                                           uint32_t len,  uint32_t lkey,
1165                                           const char *name, bool wr)
1166  {
1167  #ifdef HAVE_IBV_ADVISE_MR
1168      int ret;
1169      int advice = wr ? IBV_ADVISE_MR_ADVICE_PREFETCH_WRITE :
1170                   IBV_ADVISE_MR_ADVICE_PREFETCH;
1171      struct ibv_sge sg_list = {.lkey = lkey, .addr = addr, .length = len};
1172  
1173      ret = ibv_advise_mr(pd, advice,
1174                          IBV_ADVISE_MR_FLAG_FLUSH, &sg_list, 1);
1175      /* ignore the error */
1176      if (ret) {
1177          trace_qemu_rdma_advise_mr(name, len, addr, strerror(errno));
1178      } else {
1179          trace_qemu_rdma_advise_mr(name, len, addr, "succeeded");
1180      }
1181  #endif
1182  }
1183  
1184  static int qemu_rdma_reg_whole_ram_blocks(RDMAContext *rdma)
1185  {
1186      int i;
1187      RDMALocalBlocks *local = &rdma->local_ram_blocks;
1188  
1189      for (i = 0; i < local->nb_blocks; i++) {
1190          int access = IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE;
1191  
1192          local->block[i].mr =
1193              ibv_reg_mr(rdma->pd,
1194                      local->block[i].local_host_addr,
1195                      local->block[i].length, access
1196                      );
1197  
1198          if (!local->block[i].mr &&
1199              errno == ENOTSUP && rdma_support_odp(rdma->verbs)) {
1200                  access |= IBV_ACCESS_ON_DEMAND;
1201                  /* register ODP mr */
1202                  local->block[i].mr =
1203                      ibv_reg_mr(rdma->pd,
1204                                 local->block[i].local_host_addr,
1205                                 local->block[i].length, access);
1206                  trace_qemu_rdma_register_odp_mr(local->block[i].block_name);
1207  
1208                  if (local->block[i].mr) {
1209                      qemu_rdma_advise_prefetch_mr(rdma->pd,
1210                                      (uintptr_t)local->block[i].local_host_addr,
1211                                      local->block[i].length,
1212                                      local->block[i].mr->lkey,
1213                                      local->block[i].block_name,
1214                                      true);
1215                  }
1216          }
1217  
1218          if (!local->block[i].mr) {
1219              perror("Failed to register local dest ram block!");
1220              break;
1221          }
1222          rdma->total_registrations++;
1223      }
1224  
1225      if (i >= local->nb_blocks) {
1226          return 0;
1227      }
1228  
1229      for (i--; i >= 0; i--) {
1230          ibv_dereg_mr(local->block[i].mr);
1231          local->block[i].mr = NULL;
1232          rdma->total_registrations--;
1233      }
1234  
1235      return -1;
1236  
1237  }
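
/*
 * This is the 'pin-all' path: every RAM block is registered up front with
 * local and remote write access.  If ibv_reg_mr() fails with ENOTSUP and the
 * device supports On-Demand Paging, registration is retried with
 * IBV_ACCESS_ON_DEMAND; on any other failure the registrations made so far
 * are torn down and -1 is returned.
 */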
1238  
1239  /*
1240   * Find the ram block that corresponds to the page requested to be
1241   * transmitted by QEMU.
1242   *
1243   * Once the block is found, also identify which 'chunk' within that
1244   * block the page belongs to.
1245   *
1246   * This search cannot fail or the migration will fail.
1247   */
1248  static int qemu_rdma_search_ram_block(RDMAContext *rdma,
1249                                        uintptr_t block_offset,
1250                                        uint64_t offset,
1251                                        uint64_t length,
1252                                        uint64_t *block_index,
1253                                        uint64_t *chunk_index)
1254  {
1255      uint64_t current_addr = block_offset + offset;
1256      RDMALocalBlock *block = g_hash_table_lookup(rdma->blockmap,
1257                                                  (void *) block_offset);
1258      assert(block);
1259      assert(current_addr >= block->offset);
1260      assert((current_addr + length) <= (block->offset + block->length));
1261  
1262      *block_index = block->index;
1263      *chunk_index = ram_chunk_index(block->local_host_addr,
1264                  block->local_host_addr + (current_addr - block->offset));
1265  
1266      return 0;
1267  }
1268  
1269  /*
1270   * Register a chunk with IB. If the chunk was already registered
1271   * previously, then skip.
1272   *
1273   * Also return the keys associated with the registration needed
1274   * to perform the actual RDMA operation.
1275   */
1276  static int qemu_rdma_register_and_get_keys(RDMAContext *rdma,
1277          RDMALocalBlock *block, uintptr_t host_addr,
1278          uint32_t *lkey, uint32_t *rkey, int chunk,
1279          uint8_t *chunk_start, uint8_t *chunk_end)
1280  {
1281      if (block->mr) {
1282          if (lkey) {
1283              *lkey = block->mr->lkey;
1284          }
1285          if (rkey) {
1286              *rkey = block->mr->rkey;
1287          }
1288          return 0;
1289      }
1290  
1291      /* allocate memory to store chunk MRs */
1292      if (!block->pmr) {
1293          block->pmr = g_new0(struct ibv_mr *, block->nb_chunks);
1294      }
1295  
1296      /*
1297       * If 'rkey', then we're the destination, so grant access to the source.
1298       *
1299       * If 'lkey', then we're the source VM, so grant access only to ourselves.
1300       */
1301      if (!block->pmr[chunk]) {
1302          uint64_t len = chunk_end - chunk_start;
1303          int access = rkey ? IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE :
1304                       0;
1305  
1306          trace_qemu_rdma_register_and_get_keys(len, chunk_start);
1307  
1308          block->pmr[chunk] = ibv_reg_mr(rdma->pd, chunk_start, len, access);
1309          if (!block->pmr[chunk] &&
1310              errno == ENOTSUP && rdma_support_odp(rdma->verbs)) {
1311              access |= IBV_ACCESS_ON_DEMAND;
1312              /* register ODP mr */
1313              block->pmr[chunk] = ibv_reg_mr(rdma->pd, chunk_start, len, access);
1314              trace_qemu_rdma_register_odp_mr(block->block_name);
1315  
1316              if (block->pmr[chunk]) {
1317                  qemu_rdma_advise_prefetch_mr(rdma->pd, (uintptr_t)chunk_start,
1318                                              len, block->pmr[chunk]->lkey,
1319                                              block->block_name, rkey);
1320  
1321              }
1322          }
1323      }
1324      if (!block->pmr[chunk]) {
1325          perror("Failed to register chunk!");
1326          fprintf(stderr, "Chunk details: block: %d chunk index %d"
1327                          " start %" PRIuPTR " end %" PRIuPTR
1328                          " host %" PRIuPTR
1329                          " local %" PRIuPTR " registrations: %d\n",
1330                          block->index, chunk, (uintptr_t)chunk_start,
1331                          (uintptr_t)chunk_end, host_addr,
1332                          (uintptr_t)block->local_host_addr,
1333                          rdma->total_registrations);
1334          return -1;
1335      }
1336      rdma->total_registrations++;
1337  
1338      if (lkey) {
1339          *lkey = block->pmr[chunk]->lkey;
1340      }
1341      if (rkey) {
1342          *rkey = block->pmr[chunk]->rkey;
1343      }
1344      return 0;
1345  }
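
/*
 * This is the dynamic-registration counterpart of
 * qemu_rdma_reg_whole_ram_blocks(): if the whole block is already pinned
 * (block->mr is set) its keys are returned directly; otherwise a separate MR
 * is created lazily for each chunk, with the same ENOTSUP / On-Demand Paging
 * fallback as the pin-all path.
 */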
1346  
1347  /*
1348   * Register (at connection time) the memory used for control
1349   * channel messages.
1350   */
1351  static int qemu_rdma_reg_control(RDMAContext *rdma, int idx)
1352  {
1353      rdma->wr_data[idx].control_mr = ibv_reg_mr(rdma->pd,
1354              rdma->wr_data[idx].control, RDMA_CONTROL_MAX_BUFFER,
1355              IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE);
1356      if (rdma->wr_data[idx].control_mr) {
1357          rdma->total_registrations++;
1358          return 0;
1359      }
1360      error_report("qemu_rdma_reg_control failed");
1361      return -1;
1362  }
1363  
1364  const char *print_wrid(int wrid)
1365  {
1366      if (wrid >= RDMA_WRID_RECV_CONTROL) {
1367          return wrid_desc[RDMA_WRID_RECV_CONTROL];
1368      }
1369      return wrid_desc[wrid];
1370  }
1371  
1372  /*
1373   * Perform a non-optimized memory unregistration after every transfer
1374   * for demonstration purposes, only if pin-all is not requested.
1375   *
1376   * Potential optimizations:
1377   * 1. Start a new thread to run this function continuously
1378          - for bit clearing
1379          - and for receipt of unregister messages
1380   * 2. Use an LRU.
1381   * 3. Use workload hints.
1382   */
1383  static int qemu_rdma_unregister_waiting(RDMAContext *rdma)
1384  {
1385      while (rdma->unregistrations[rdma->unregister_current]) {
1386          int ret;
1387          uint64_t wr_id = rdma->unregistrations[rdma->unregister_current];
1388          uint64_t chunk =
1389              (wr_id & RDMA_WRID_CHUNK_MASK) >> RDMA_WRID_CHUNK_SHIFT;
1390          uint64_t index =
1391              (wr_id & RDMA_WRID_BLOCK_MASK) >> RDMA_WRID_BLOCK_SHIFT;
1392          RDMALocalBlock *block =
1393              &(rdma->local_ram_blocks.block[index]);
1394          RDMARegister reg = { .current_index = index };
1395          RDMAControlHeader resp = { .type = RDMA_CONTROL_UNREGISTER_FINISHED,
1396                                   };
1397          RDMAControlHeader head = { .len = sizeof(RDMARegister),
1398                                     .type = RDMA_CONTROL_UNREGISTER_REQUEST,
1399                                     .repeat = 1,
1400                                   };
1401  
1402          trace_qemu_rdma_unregister_waiting_proc(chunk,
1403                                                  rdma->unregister_current);
1404  
1405          rdma->unregistrations[rdma->unregister_current] = 0;
1406          rdma->unregister_current++;
1407  
1408          if (rdma->unregister_current == RDMA_SIGNALED_SEND_MAX) {
1409              rdma->unregister_current = 0;
1410          }
1411  
1412  
1413          /*
1414           * Unregistration is speculative (because migration is single-threaded
1415           * and we cannot break the protocol's infiniband message ordering).
1416           * Thus, if the memory is currently being used for transmission,
1417           * then abort the attempt to unregister and try again
1418           * later the next time a completion is received for this memory.
1419           */
1420          clear_bit(chunk, block->unregister_bitmap);
1421  
1422          if (test_bit(chunk, block->transit_bitmap)) {
1423              trace_qemu_rdma_unregister_waiting_inflight(chunk);
1424              continue;
1425          }
1426  
1427          trace_qemu_rdma_unregister_waiting_send(chunk);
1428  
1429          ret = ibv_dereg_mr(block->pmr[chunk]);
1430          block->pmr[chunk] = NULL;
1431          block->remote_keys[chunk] = 0;
1432  
1433          if (ret != 0) {
1434              perror("chunk unregistration failed");
1435              return -ret;
1436          }
1437          rdma->total_registrations--;
1438  
1439          reg.key.chunk = chunk;
1440          register_to_network(rdma, &reg);
1441          ret = qemu_rdma_exchange_send(rdma, &head, (uint8_t *) &reg,
1442                                  &resp, NULL, NULL);
1443          if (ret < 0) {
1444              return ret;
1445          }
1446  
1447          trace_qemu_rdma_unregister_waiting_complete(chunk);
1448      }
1449  
1450      return 0;
1451  }
1452  
1453  static uint64_t qemu_rdma_make_wrid(uint64_t wr_id, uint64_t index,
1454                                           uint64_t chunk)
1455  {
1456      uint64_t result = wr_id & RDMA_WRID_TYPE_MASK;
1457  
1458      result |= (index << RDMA_WRID_BLOCK_SHIFT);
1459      result |= (chunk << RDMA_WRID_CHUNK_SHIFT);
1460  
1461      return result;
1462  }
1463  
1464  /*
1465   * Poll the completion queue to see if a work request
1466   * (of any kind) has completed.
1467   * Return the work request ID that completed.
1468   */
1469  static uint64_t qemu_rdma_poll(RDMAContext *rdma, struct ibv_cq *cq,
1470                                 uint64_t *wr_id_out, uint32_t *byte_len)
1471  {
1472      int ret;
1473      struct ibv_wc wc;
1474      uint64_t wr_id;
1475  
1476      ret = ibv_poll_cq(cq, 1, &wc);
1477  
1478      if (!ret) {
1479          *wr_id_out = RDMA_WRID_NONE;
1480          return 0;
1481      }
1482  
1483      if (ret < 0) {
1484          error_report("ibv_poll_cq return %d", ret);
1485          return ret;
1486      }
1487  
1488      wr_id = wc.wr_id & RDMA_WRID_TYPE_MASK;
1489  
1490      if (wc.status != IBV_WC_SUCCESS) {
1491          fprintf(stderr, "ibv_poll_cq wc.status=%d %s!\n",
1492                          wc.status, ibv_wc_status_str(wc.status));
1493          fprintf(stderr, "ibv_poll_cq wrid=%s!\n", wrid_desc[wr_id]);
1494  
1495          return -1;
1496      }
1497  
1498      if (rdma->control_ready_expected &&
1499          (wr_id >= RDMA_WRID_RECV_CONTROL)) {
1500          trace_qemu_rdma_poll_recv(wrid_desc[RDMA_WRID_RECV_CONTROL],
1501                    wr_id - RDMA_WRID_RECV_CONTROL, wr_id, rdma->nb_sent);
1502          rdma->control_ready_expected = 0;
1503      }
1504  
1505      if (wr_id == RDMA_WRID_RDMA_WRITE) {
1506          uint64_t chunk =
1507              (wc.wr_id & RDMA_WRID_CHUNK_MASK) >> RDMA_WRID_CHUNK_SHIFT;
1508          uint64_t index =
1509              (wc.wr_id & RDMA_WRID_BLOCK_MASK) >> RDMA_WRID_BLOCK_SHIFT;
1510          RDMALocalBlock *block = &(rdma->local_ram_blocks.block[index]);
1511  
1512          trace_qemu_rdma_poll_write(print_wrid(wr_id), wr_id, rdma->nb_sent,
1513                                     index, chunk, block->local_host_addr,
1514                                     (void *)(uintptr_t)block->remote_host_addr);
1515  
1516          clear_bit(chunk, block->transit_bitmap);
1517  
1518          if (rdma->nb_sent > 0) {
1519              rdma->nb_sent--;
1520          }
1521      } else {
1522          trace_qemu_rdma_poll_other(print_wrid(wr_id), wr_id, rdma->nb_sent);
1523      }
1524  
1525      *wr_id_out = wc.wr_id;
1526      if (byte_len) {
1527          *byte_len = wc.byte_len;
1528      }
1529  
1530      return  0;
1531  }
1532  
1533  /* Wait for activity on the completion channel.
1534   * Returns 0 on success, non-zero on error.
1535   */
1536  static int qemu_rdma_wait_comp_channel(RDMAContext *rdma,
1537                                         struct ibv_comp_channel *comp_channel)
1538  {
1539      struct rdma_cm_event *cm_event;
1540      int ret = -1;
1541  
1542      /*
1543       * The coroutine doesn't start until migration_fd_process_incoming(),
1544       * so don't yield unless we know we're running inside of a coroutine.
1545       */
1546      if (rdma->migration_started_on_destination &&
1547          migration_incoming_get_current()->state == MIGRATION_STATUS_ACTIVE) {
1548          yield_until_fd_readable(comp_channel->fd);
1549      } else {
1550          /* This is the source side (we're in a separate thread), or the
1551           * destination prior to migration_fd_process_incoming(); after
1552           * postcopy the destination is also in a separate thread.
1553           * We can't yield, so we have to poll the fd.
1554           * But we need to be able to handle 'cancel' or an error
1555           * without hanging forever.
1556           */
1557          while (!rdma->error_state  && !rdma->received_error) {
1558              GPollFD pfds[2];
1559              pfds[0].fd = comp_channel->fd;
1560              pfds[0].events = G_IO_IN | G_IO_HUP | G_IO_ERR;
1561              pfds[0].revents = 0;
1562  
1563              pfds[1].fd = rdma->channel->fd;
1564              pfds[1].events = G_IO_IN | G_IO_HUP | G_IO_ERR;
1565              pfds[1].revents = 0;
1566  
1567              /* 0.1s timeout, should be fine for a 'cancel' */
1568              switch (qemu_poll_ns(pfds, 2, 100 * 1000 * 1000)) {
1569              case 2:
1570              case 1: /* fd active */
1571                  if (pfds[0].revents) {
1572                      return 0;
1573                  }
1574  
1575                  if (pfds[1].revents) {
1576                      ret = rdma_get_cm_event(rdma->channel, &cm_event);
1577                      if (ret) {
1578                          error_report("failed to get cm event while waiting "
1579                                       "on completion channel");
1580                          return -EPIPE;
1581                      }
1582  
1583                      error_report("received cm event while waiting on comp "
1584                                   "channel, cm event is %d", cm_event->event);
1585                      if (cm_event->event == RDMA_CM_EVENT_DISCONNECTED ||
1586                          cm_event->event == RDMA_CM_EVENT_DEVICE_REMOVAL) {
1587                          rdma_ack_cm_event(cm_event);
1588                          return -EPIPE;
1589                      }
1590                      rdma_ack_cm_event(cm_event);
1591                  }
1592                  break;
1593  
1594              case 0: /* Timeout, go around again */
1595                  break;
1596  
1597              default: /* Error of some type -
1598                        * I don't trust errno from qemu_poll_ns
1599                        */
1600                  error_report("%s: poll failed", __func__);
1601                  return -EPIPE;
1602              }
1603  
1604              if (migrate_get_current()->state == MIGRATION_STATUS_CANCELLING) {
1605                  /* Bail out and let the cancellation happen */
1606                  return -EPIPE;
1607              }
1608          }
1609      }
1610  
1611      if (rdma->received_error) {
1612          return -EPIPE;
1613      }
1614      return rdma->error_state;
1615  }
1616  
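/*
 * Map a work request ID to the completion channel / completion queue it
 * will be signalled on: control RECVs use the recv side, everything else
 * (RDMA writes and control SENDs) uses the send side.
 */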
1617  static struct ibv_comp_channel *to_channel(RDMAContext *rdma, int wrid)
1618  {
1619      return wrid < RDMA_WRID_RECV_CONTROL ? rdma->send_comp_channel :
1620             rdma->recv_comp_channel;
1621  }
1622  
1623  static struct ibv_cq *to_cq(RDMAContext *rdma, int wrid)
1624  {
1625      return wrid < RDMA_WRID_RECV_CONTROL ? rdma->send_cq : rdma->recv_cq;
1626  }
1627  
1628  /*
1629   * Block until the next work request has completed.
1630   *
1631   * First poll to see if a work request has already completed,
1632   * otherwise block.
1633   *
1634   * If we encounter completed work requests for IDs other than
1635   * the one we're interested in, then that's generally an error.
1636   *
1637   * The only exception is actual RDMA Write completions. These
1638   * completions only need to be recorded, but do not actually
1639   * need further processing.
1640   */
1641  static int qemu_rdma_block_for_wrid(RDMAContext *rdma, int wrid_requested,
1642                                      uint32_t *byte_len)
1643  {
1644      int num_cq_events = 0, ret = 0;
1645      struct ibv_cq *cq;
1646      void *cq_ctx;
1647      uint64_t wr_id = RDMA_WRID_NONE, wr_id_in;
1648      struct ibv_comp_channel *ch = to_channel(rdma, wrid_requested);
1649      struct ibv_cq *poll_cq = to_cq(rdma, wrid_requested);
1650  
1651      if (ibv_req_notify_cq(poll_cq, 0)) {
1652          return -1;
1653      }
1654      /* poll cq first */
1655      while (wr_id != wrid_requested) {
1656          ret = qemu_rdma_poll(rdma, poll_cq, &wr_id_in, byte_len);
1657          if (ret < 0) {
1658              return ret;
1659          }
1660  
1661          wr_id = wr_id_in & RDMA_WRID_TYPE_MASK;
1662  
1663          if (wr_id == RDMA_WRID_NONE) {
1664              break;
1665          }
1666          if (wr_id != wrid_requested) {
1667              trace_qemu_rdma_block_for_wrid_miss(print_wrid(wrid_requested),
1668                         wrid_requested, print_wrid(wr_id), wr_id);
1669          }
1670      }
1671  
1672      if (wr_id == wrid_requested) {
1673          return 0;
1674      }
1675  
1676      while (1) {
1677          ret = qemu_rdma_wait_comp_channel(rdma, ch);
1678          if (ret) {
1679              goto err_block_for_wrid;
1680          }
1681  
1682          ret = ibv_get_cq_event(ch, &cq, &cq_ctx);
1683          if (ret) {
1684              perror("ibv_get_cq_event");
1685              goto err_block_for_wrid;
1686          }
1687  
1688          num_cq_events++;
1689  
1690          ret = -ibv_req_notify_cq(cq, 0);
1691          if (ret) {
1692              goto err_block_for_wrid;
1693          }
1694  
1695          while (wr_id != wrid_requested) {
1696              ret = qemu_rdma_poll(rdma, poll_cq, &wr_id_in, byte_len);
1697              if (ret < 0) {
1698                  goto err_block_for_wrid;
1699              }
1700  
1701              wr_id = wr_id_in & RDMA_WRID_TYPE_MASK;
1702  
1703              if (wr_id == RDMA_WRID_NONE) {
1704                  break;
1705              }
1706              if (wr_id != wrid_requested) {
1707                  trace_qemu_rdma_block_for_wrid_miss(print_wrid(wrid_requested),
1708                                     wrid_requested, print_wrid(wr_id), wr_id);
1709              }
1710          }
1711  
1712          if (wr_id == wrid_requested) {
1713              goto success_block_for_wrid;
1714          }
1715      }
1716  
1717  success_block_for_wrid:
1718      if (num_cq_events) {
1719          ibv_ack_cq_events(cq, num_cq_events);
1720      }
1721      return 0;
1722  
1723  err_block_for_wrid:
1724      if (num_cq_events) {
1725          ibv_ack_cq_events(cq, num_cq_events);
1726      }
1727  
1728      rdma->error_state = ret;
1729      return ret;
1730  }
1731  
1732  /*
1733   * Post a SEND message work request for the control channel
1734   * containing some data and block until the post completes.
1735   */
1736  static int qemu_rdma_post_send_control(RDMAContext *rdma, uint8_t *buf,
1737                                         RDMAControlHeader *head)
1738  {
1739      int ret = 0;
1740      RDMAWorkRequestData *wr = &rdma->wr_data[RDMA_WRID_CONTROL];
1741      struct ibv_send_wr *bad_wr;
1742      struct ibv_sge sge = {
1743                             .addr = (uintptr_t)(wr->control),
1744                             .length = head->len + sizeof(RDMAControlHeader),
1745                             .lkey = wr->control_mr->lkey,
1746                           };
1747      struct ibv_send_wr send_wr = {
1748                                     .wr_id = RDMA_WRID_SEND_CONTROL,
1749                                     .opcode = IBV_WR_SEND,
1750                                     .send_flags = IBV_SEND_SIGNALED,
1751                                     .sg_list = &sge,
1752                                     .num_sge = 1,
1753                                  };
1754  
1755      trace_qemu_rdma_post_send_control(control_desc(head->type));
1756  
1757      /*
1758       * We don't actually need to do a memcpy() in here if we used
1759       * the "sge" properly, but since we're only sending control messages
1760       * (not RAM in a performance-critical path), it's OK for now.
1761       *
1762       * The copy makes the RDMAControlHeader simpler to manipulate
1763       * for the time being.
1764       */
1765      assert(head->len <= RDMA_CONTROL_MAX_BUFFER - sizeof(*head));
1766      memcpy(wr->control, head, sizeof(RDMAControlHeader));
1767      control_to_network((void *) wr->control);
1768  
1769      if (buf) {
1770          memcpy(wr->control + sizeof(RDMAControlHeader), buf, head->len);
1771      }
1772  
1773  
1774      ret = ibv_post_send(rdma->qp, &send_wr, &bad_wr);
1775  
1776      if (ret > 0) {
1777          error_report("Failed to post IB SEND for control");
1778          return -ret;
1779      }
1780  
1781      ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_SEND_CONTROL, NULL);
1782      if (ret < 0) {
1783          error_report("rdma migration: send polling control error");
1784      }
1785  
1786      return ret;
1787  }
1788  
1789  /*
1790   * Post a RECV work request in anticipation of some future receipt
1791   * of data on the control channel.
1792   */
1793  static int qemu_rdma_post_recv_control(RDMAContext *rdma, int idx)
1794  {
1795      struct ibv_recv_wr *bad_wr;
1796      struct ibv_sge sge = {
1797                              .addr = (uintptr_t)(rdma->wr_data[idx].control),
1798                              .length = RDMA_CONTROL_MAX_BUFFER,
1799                              .lkey = rdma->wr_data[idx].control_mr->lkey,
1800                           };
1801  
1802      struct ibv_recv_wr recv_wr = {
1803                                      .wr_id = RDMA_WRID_RECV_CONTROL + idx,
1804                                      .sg_list = &sge,
1805                                      .num_sge = 1,
1806                                   };
1807  
1808  
1809      if (ibv_post_recv(rdma->qp, &recv_wr, &bad_wr)) {
1810          return -1;
1811      }
1812  
1813      return 0;
1814  }
1815  
1816  /*
1817   * Block and wait for a RECV control channel message to arrive.
1818   */
1819  static int qemu_rdma_exchange_get_response(RDMAContext *rdma,
1820                  RDMAControlHeader *head, int expecting, int idx)
1821  {
1822      uint32_t byte_len;
1823      int ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RECV_CONTROL + idx,
1824                                         &byte_len);
1825  
1826      if (ret < 0) {
1827          error_report("rdma migration: recv polling control error!");
1828          return ret;
1829      }
1830  
1831      network_to_control((void *) rdma->wr_data[idx].control);
1832      memcpy(head, rdma->wr_data[idx].control, sizeof(RDMAControlHeader));
1833  
1834      trace_qemu_rdma_exchange_get_response_start(control_desc(expecting));
1835  
1836      if (expecting == RDMA_CONTROL_NONE) {
1837          trace_qemu_rdma_exchange_get_response_none(control_desc(head->type),
1838                                               head->type);
1839      } else if (head->type != expecting || head->type == RDMA_CONTROL_ERROR) {
1840          error_report("Was expecting a %s (%d) control message"
1841                  ", but got: %s (%d), length: %d",
1842                  control_desc(expecting), expecting,
1843                  control_desc(head->type), head->type, head->len);
1844          if (head->type == RDMA_CONTROL_ERROR) {
1845              rdma->received_error = true;
1846          }
1847          return -EIO;
1848      }
1849      if (head->len > RDMA_CONTROL_MAX_BUFFER - sizeof(*head)) {
1850          error_report("too long length: %d", head->len);
1851          return -EINVAL;
1852      }
1853      if (sizeof(*head) + head->len != byte_len) {
1854          error_report("Malformed length: %d byte_len %d", head->len, byte_len);
1855          return -EINVAL;
1856      }
1857  
1858      return 0;
1859  }
1860  
1861  /*
1862   * When a RECV work request has completed, the work request's
1863   * buffer is pointed at the header.
1864   *
1865   * This advances the pointer past the header to the data portion
1866   * of the control message in the work request's buffer, which
1867   * was populated when the work request finished.
1868   */
1869  static void qemu_rdma_move_header(RDMAContext *rdma, int idx,
1870                                    RDMAControlHeader *head)
1871  {
1872      rdma->wr_data[idx].control_len = head->len;
1873      rdma->wr_data[idx].control_curr =
1874          rdma->wr_data[idx].control + sizeof(RDMAControlHeader);
1875  }
1876  
1877  /*
1878   * This is an 'atomic' high-level operation to deliver a single, unified
1879   * control-channel message.
1880   *
1881   * Additionally, if the user is expecting some kind of reply to this message,
1882   * they can request a 'resp' response message be filled in by posting an
1883   * additional work request on behalf of the user and waiting for an additional
1884   * completion.
1885   *
1886   * The extra (optional) response is used during registration to save us from
1887   * having to perform an *additional* exchange of messages just to provide a
1888   * response, by instead piggy-backing on the acknowledgement.
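 *
 * For example, the dynamic-registration path in qemu_rdma_write_one()
 * below sends an RDMA_CONTROL_REGISTER_REQUEST and gets the matching
 * RDMA_CONTROL_REGISTER_RESULT back piggy-backed in 'resp' rather than
 * via a separate message exchange.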
1889   */
1890  static int qemu_rdma_exchange_send(RDMAContext *rdma, RDMAControlHeader *head,
1891                                     uint8_t *data, RDMAControlHeader *resp,
1892                                     int *resp_idx,
1893                                     int (*callback)(RDMAContext *rdma))
1894  {
1895      int ret = 0;
1896  
1897      /*
1898       * Wait until the dest is ready before attempting to deliver the message
1899       * by waiting for a READY message.
1900       */
1901      if (rdma->control_ready_expected) {
1902          RDMAControlHeader resp;
1903          ret = qemu_rdma_exchange_get_response(rdma,
1904                                      &resp, RDMA_CONTROL_READY, RDMA_WRID_READY);
1905          if (ret < 0) {
1906              return ret;
1907          }
1908      }
1909  
1910      /*
1911       * If the user is expecting a response, post a WR in anticipation of it.
1912       */
1913      if (resp) {
1914          ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_DATA);
1915          if (ret) {
1916              error_report("rdma migration: error posting"
1917                      " extra control recv for anticipated result!");
1918              return ret;
1919          }
1920      }
1921  
1922      /*
1923       * Post a WR to replace the one we just consumed for the READY message.
1924       */
1925      ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY);
1926      if (ret) {
1927          error_report("rdma migration: error posting first control recv!");
1928          return ret;
1929      }
1930  
1931      /*
1932       * Deliver the control message that was requested.
1933       */
1934      ret = qemu_rdma_post_send_control(rdma, data, head);
1935  
1936      if (ret < 0) {
1937          error_report("Failed to send control buffer!");
1938          return ret;
1939      }
1940  
1941      /*
1942       * If we're expecting a response, block and wait for it.
1943       */
1944      if (resp) {
1945          if (callback) {
1946              trace_qemu_rdma_exchange_send_issue_callback();
1947              ret = callback(rdma);
1948              if (ret < 0) {
1949                  return ret;
1950              }
1951          }
1952  
1953          trace_qemu_rdma_exchange_send_waiting(control_desc(resp->type));
1954          ret = qemu_rdma_exchange_get_response(rdma, resp,
1955                                                resp->type, RDMA_WRID_DATA);
1956  
1957          if (ret < 0) {
1958              return ret;
1959          }
1960  
1961          qemu_rdma_move_header(rdma, RDMA_WRID_DATA, resp);
1962          if (resp_idx) {
1963              *resp_idx = RDMA_WRID_DATA;
1964          }
1965          trace_qemu_rdma_exchange_send_received(control_desc(resp->type));
1966      }
1967  
1968      rdma->control_ready_expected = 1;
1969  
1970      return 0;
1971  }
1972  
1973  /*
1974   * This is an 'atomic' high-level operation to receive a single, unified
1975   * control-channel message.
1976   */
1977  static int qemu_rdma_exchange_recv(RDMAContext *rdma, RDMAControlHeader *head,
1978                                  int expecting)
1979  {
1980      RDMAControlHeader ready = {
1981                                  .len = 0,
1982                                  .type = RDMA_CONTROL_READY,
1983                                  .repeat = 1,
1984                                };
1985      int ret;
1986  
1987      /*
1988       * Inform the source that we're ready to receive a message.
1989       */
1990      ret = qemu_rdma_post_send_control(rdma, NULL, &ready);
1991  
1992      if (ret < 0) {
1993          error_report("Failed to send control buffer!");
1994          return ret;
1995      }
1996  
1997      /*
1998       * Block and wait for the message.
1999       */
2000      ret = qemu_rdma_exchange_get_response(rdma, head,
2001                                            expecting, RDMA_WRID_READY);
2002  
2003      if (ret < 0) {
2004          return ret;
2005      }
2006  
2007      qemu_rdma_move_header(rdma, RDMA_WRID_READY, head);
2008  
2009      /*
2010       * Post a new RECV work request to replace the one we just consumed.
2011       */
2012      ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY);
2013      if (ret) {
2014          error_report("rdma migration: error posting second control recv!");
2015          return ret;
2016      }
2017  
2018      return 0;
2019  }
2020  
2021  /*
2022   * Write an actual chunk of memory using RDMA.
2023   *
2024   * If we're using dynamic registration on the dest-side, we have to
2025   * send a registration command first.
2026   */
2027  static int qemu_rdma_write_one(QEMUFile *f, RDMAContext *rdma,
2028                                 int current_index, uint64_t current_addr,
2029                                 uint64_t length)
2030  {
2031      struct ibv_sge sge;
2032      struct ibv_send_wr send_wr = { 0 };
2033      struct ibv_send_wr *bad_wr;
2034      int reg_result_idx, ret, count = 0;
2035      uint64_t chunk, chunks;
2036      uint8_t *chunk_start, *chunk_end;
2037      RDMALocalBlock *block = &(rdma->local_ram_blocks.block[current_index]);
2038      RDMARegister reg;
2039      RDMARegisterResult *reg_result;
2040      RDMAControlHeader resp = { .type = RDMA_CONTROL_REGISTER_RESULT };
2041      RDMAControlHeader head = { .len = sizeof(RDMARegister),
2042                                 .type = RDMA_CONTROL_REGISTER_REQUEST,
2043                                 .repeat = 1,
2044                               };
2045  
2046  retry:
2047      sge.addr = (uintptr_t)(block->local_host_addr +
2048                              (current_addr - block->offset));
2049      sge.length = length;
2050  
2051      chunk = ram_chunk_index(block->local_host_addr,
2052                              (uint8_t *)(uintptr_t)sge.addr);
2053      chunk_start = ram_chunk_start(block, chunk);
2054  
2055      if (block->is_ram_block) {
2056          chunks = length / (1UL << RDMA_REG_CHUNK_SHIFT);
2057  
2058          if (chunks && ((length % (1UL << RDMA_REG_CHUNK_SHIFT)) == 0)) {
2059              chunks--;
2060          }
2061      } else {
2062          chunks = block->length / (1UL << RDMA_REG_CHUNK_SHIFT);
2063  
2064          if (chunks && ((block->length % (1UL << RDMA_REG_CHUNK_SHIFT)) == 0)) {
2065              chunks--;
2066          }
2067      }
2068  
2069      trace_qemu_rdma_write_one_top(chunks + 1,
2070                                    (chunks + 1) *
2071                                    (1UL << RDMA_REG_CHUNK_SHIFT) / 1024 / 1024);
2072  
2073      chunk_end = ram_chunk_end(block, chunk + chunks);
2074  
2075  
2076      while (test_bit(chunk, block->transit_bitmap)) {
2077          (void)count;
2078          trace_qemu_rdma_write_one_block(count++, current_index, chunk,
2079                  sge.addr, length, rdma->nb_sent, block->nb_chunks);
2080  
2081          ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RDMA_WRITE, NULL);
2082  
2083          if (ret < 0) {
2084              error_report("Failed to wait for previous write to complete "
2085                      "block %d chunk %" PRIu64
2086                      " current %" PRIu64 " len %" PRIu64 " %d",
2087                      current_index, chunk, sge.addr, length, rdma->nb_sent);
2088              return ret;
2089          }
2090      }
2091  
2092      if (!rdma->pin_all || !block->is_ram_block) {
2093          if (!block->remote_keys[chunk]) {
2094              /*
2095               * This chunk has not yet been registered, so first check to see
2096               * if the entire chunk is zero. If so, tell the other side to
2097               * memset() + madvise() the entire chunk without RDMA.
2098               */
2099  
2100              if (buffer_is_zero((void *)(uintptr_t)sge.addr, length)) {
2101                  RDMACompress comp = {
2102                                          .offset = current_addr,
2103                                          .value = 0,
2104                                          .block_idx = current_index,
2105                                          .length = length,
2106                                      };
2107  
2108                  head.len = sizeof(comp);
2109                  head.type = RDMA_CONTROL_COMPRESS;
2110  
2111                  trace_qemu_rdma_write_one_zero(chunk, sge.length,
2112                                                 current_index, current_addr);
2113  
2114                  compress_to_network(rdma, &comp);
2115                  ret = qemu_rdma_exchange_send(rdma, &head,
2116                                  (uint8_t *) &comp, NULL, NULL, NULL);
2117  
2118                  if (ret < 0) {
2119                      return -EIO;
2120                  }
2121  
2122                  acct_update_position(f, sge.length, true);
2123  
2124                  return 1;
2125              }
2126  
2127              /*
2128               * Otherwise, tell other side to register.
2129               */
2130              reg.current_index = current_index;
2131              if (block->is_ram_block) {
2132                  reg.key.current_addr = current_addr;
2133              } else {
2134                  reg.key.chunk = chunk;
2135              }
2136              reg.chunks = chunks;
2137  
2138              trace_qemu_rdma_write_one_sendreg(chunk, sge.length, current_index,
2139                                                current_addr);
2140  
2141              register_to_network(rdma, &reg);
2142              ret = qemu_rdma_exchange_send(rdma, &head, (uint8_t *) &reg,
2143                                      &resp, &reg_result_idx, NULL);
2144              if (ret < 0) {
2145                  return ret;
2146              }
2147  
2148              /* try to overlap this single registration with the one we sent. */
2149              if (qemu_rdma_register_and_get_keys(rdma, block, sge.addr,
2150                                                  &sge.lkey, NULL, chunk,
2151                                                  chunk_start, chunk_end)) {
2152                  error_report("cannot get lkey");
2153                  return -EINVAL;
2154              }
2155  
2156              reg_result = (RDMARegisterResult *)
2157                      rdma->wr_data[reg_result_idx].control_curr;
2158  
2159              network_to_result(reg_result);
2160  
2161              trace_qemu_rdma_write_one_recvregres(block->remote_keys[chunk],
2162                                                   reg_result->rkey, chunk);
2163  
2164              block->remote_keys[chunk] = reg_result->rkey;
2165              block->remote_host_addr = reg_result->host_addr;
2166          } else {
2167              /* already registered before */
2168              if (qemu_rdma_register_and_get_keys(rdma, block, sge.addr,
2169                                                  &sge.lkey, NULL, chunk,
2170                                                  chunk_start, chunk_end)) {
2171                  error_report("cannot get lkey!");
2172                  return -EINVAL;
2173              }
2174          }
2175  
2176          send_wr.wr.rdma.rkey = block->remote_keys[chunk];
2177      } else {
2178          send_wr.wr.rdma.rkey = block->remote_rkey;
2179  
2180          if (qemu_rdma_register_and_get_keys(rdma, block, sge.addr,
2181                                                       &sge.lkey, NULL, chunk,
2182                                                       chunk_start, chunk_end)) {
2183              error_report("cannot get lkey!");
2184              return -EINVAL;
2185          }
2186      }
2187  
2188      /*
2189       * Encode the ram block index and chunk within this wrid.
2190       * We will use this information at the time of completion
2191       * to figure out which bitmap to check against and then which
2192       * chunk in the bitmap to look for.
2193       */
2194      send_wr.wr_id = qemu_rdma_make_wrid(RDMA_WRID_RDMA_WRITE,
2195                                          current_index, chunk);
2196  
2197      send_wr.opcode = IBV_WR_RDMA_WRITE;
2198      send_wr.send_flags = IBV_SEND_SIGNALED;
2199      send_wr.sg_list = &sge;
2200      send_wr.num_sge = 1;
2201      send_wr.wr.rdma.remote_addr = block->remote_host_addr +
2202                                  (current_addr - block->offset);
2203  
2204      trace_qemu_rdma_write_one_post(chunk, sge.addr, send_wr.wr.rdma.remote_addr,
2205                                     sge.length);
2206  
2207      /*
2208       * ibv_post_send() does not return negative error numbers;
2209       * per the specification they are positive - no idea why.
2210       */
2211      ret = ibv_post_send(rdma->qp, &send_wr, &bad_wr);
2212  
2213      if (ret == ENOMEM) {
2214          trace_qemu_rdma_write_one_queue_full();
2215          ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RDMA_WRITE, NULL);
2216          if (ret < 0) {
2217              error_report("rdma migration: failed to make "
2218                           "room in full send queue! %d", ret);
2219              return ret;
2220          }
2221  
2222          goto retry;
2223  
2224      } else if (ret > 0) {
2225          perror("rdma migration: post rdma write failed");
2226          return -ret;
2227      }
2228  
2229      set_bit(chunk, block->transit_bitmap);
2230      acct_update_position(f, sge.length, false);
2231      rdma->total_writes++;
2232  
2233      return 0;
2234  }
2235  
2236  /*
2237   * Push out any unwritten RDMA operations.
2238   *
2239   * We support sending out multiple chunks at the same time.
2240   * Not all of them need to get signaled in the completion queue.
2241   */
2242  static int qemu_rdma_write_flush(QEMUFile *f, RDMAContext *rdma)
2243  {
2244      int ret;
2245  
2246      if (!rdma->current_length) {
2247          return 0;
2248      }
2249  
2250      ret = qemu_rdma_write_one(f, rdma,
2251              rdma->current_index, rdma->current_addr, rdma->current_length);
2252  
2253      if (ret < 0) {
2254          return ret;
2255      }
2256  
2257      if (ret == 0) {
2258          rdma->nb_sent++;
2259          trace_qemu_rdma_write_flush(rdma->nb_sent);
2260      }
2261  
2262      rdma->current_length = 0;
2263      rdma->current_addr = 0;
2264  
2265      return 0;
2266  }
2267  
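/*
 * Return 1 if a write of @len bytes at @offset can be merged into the
 * current batched RDMA write: it must continue the current address range
 * sequentially, stay within the same RAM block and not run past the end
 * of the current chunk.
 */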
2268  static inline int qemu_rdma_buffer_mergable(RDMAContext *rdma,
2269                      uint64_t offset, uint64_t len)
2270  {
2271      RDMALocalBlock *block;
2272      uint8_t *host_addr;
2273      uint8_t *chunk_end;
2274  
2275      if (rdma->current_index < 0) {
2276          return 0;
2277      }
2278  
2279      if (rdma->current_chunk < 0) {
2280          return 0;
2281      }
2282  
2283      block = &(rdma->local_ram_blocks.block[rdma->current_index]);
2284      host_addr = block->local_host_addr + (offset - block->offset);
2285      chunk_end = ram_chunk_end(block, rdma->current_chunk);
2286  
2287      if (rdma->current_length == 0) {
2288          return 0;
2289      }
2290  
2291      /*
2292       * Only merge into chunk sequentially.
2293       */
2294      if (offset != (rdma->current_addr + rdma->current_length)) {
2295          return 0;
2296      }
2297  
2298      if (offset < block->offset) {
2299          return 0;
2300      }
2301  
2302      if ((offset + len) > (block->offset + block->length)) {
2303          return 0;
2304      }
2305  
2306      if ((host_addr + len) > chunk_end) {
2307          return 0;
2308      }
2309  
2310      return 1;
2311  }
2312  
2313  /*
2314   * We're not actually writing here, but doing three things:
2315   *
2316   * 1. Identify the chunk the buffer belongs to.
2317   * 2. If the chunk is full or the buffer doesn't belong to the current
2318   *    chunk, then start a new chunk and flush() the old chunk.
2319   * 3. To keep the hardware busy, we also group chunks into batches
2320   *    and only require that a batch gets acknowledged in the completion
2321   *    queue instead of each individual chunk.
2322   */
2323  static int qemu_rdma_write(QEMUFile *f, RDMAContext *rdma,
2324                             uint64_t block_offset, uint64_t offset,
2325                             uint64_t len)
2326  {
2327      uint64_t current_addr = block_offset + offset;
2328      uint64_t index = rdma->current_index;
2329      uint64_t chunk = rdma->current_chunk;
2330      int ret;
2331  
2332      /* If we cannot merge it, we flush the current buffer first. */
2333      if (!qemu_rdma_buffer_mergable(rdma, current_addr, len)) {
2334          ret = qemu_rdma_write_flush(f, rdma);
2335          if (ret) {
2336              return ret;
2337          }
2338          rdma->current_length = 0;
2339          rdma->current_addr = current_addr;
2340  
2341          ret = qemu_rdma_search_ram_block(rdma, block_offset,
2342                                           offset, len, &index, &chunk);
2343          if (ret) {
2344              error_report("ram block search failed");
2345              return ret;
2346          }
2347          rdma->current_index = index;
2348          rdma->current_chunk = chunk;
2349      }
2350  
2351      /* merge it */
2352      rdma->current_length += len;
2353  
2354      /* flush it if buffer is too large */
2355      if (rdma->current_length >= RDMA_MERGE_MAX) {
2356          return qemu_rdma_write_flush(f, rdma);
2357      }
2358  
2359      return 0;
2360  }
2361  
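/*
 * Tear down the connection and release all RDMA resources: tell the peer
 * about an early error if needed, disconnect, deregister the control MRs,
 * drop the local RAM block state and destroy the QP, CQs, completion
 * channels, PD and CM ids.
 */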
2362  static void qemu_rdma_cleanup(RDMAContext *rdma)
2363  {
2364      int idx;
2365  
2366      if (rdma->cm_id && rdma->connected) {
2367          if ((rdma->error_state ||
2368               migrate_get_current()->state == MIGRATION_STATUS_CANCELLING) &&
2369              !rdma->received_error) {
2370              RDMAControlHeader head = { .len = 0,
2371                                         .type = RDMA_CONTROL_ERROR,
2372                                         .repeat = 1,
2373                                       };
2374              error_report("Early error. Sending error.");
2375              qemu_rdma_post_send_control(rdma, NULL, &head);
2376          }
2377  
2378          rdma_disconnect(rdma->cm_id);
2379          trace_qemu_rdma_cleanup_disconnect();
2380          rdma->connected = false;
2381      }
2382  
2383      if (rdma->channel) {
2384          qemu_set_fd_handler(rdma->channel->fd, NULL, NULL, NULL);
2385      }
2386      g_free(rdma->dest_blocks);
2387      rdma->dest_blocks = NULL;
2388  
2389      for (idx = 0; idx < RDMA_WRID_MAX; idx++) {
2390          if (rdma->wr_data[idx].control_mr) {
2391              rdma->total_registrations--;
2392              ibv_dereg_mr(rdma->wr_data[idx].control_mr);
2393          }
2394          rdma->wr_data[idx].control_mr = NULL;
2395      }
2396  
2397      if (rdma->local_ram_blocks.block) {
2398          while (rdma->local_ram_blocks.nb_blocks) {
2399              rdma_delete_block(rdma, &rdma->local_ram_blocks.block[0]);
2400          }
2401      }
2402  
2403      if (rdma->qp) {
2404          rdma_destroy_qp(rdma->cm_id);
2405          rdma->qp = NULL;
2406      }
2407      if (rdma->recv_cq) {
2408          ibv_destroy_cq(rdma->recv_cq);
2409          rdma->recv_cq = NULL;
2410      }
2411      if (rdma->send_cq) {
2412          ibv_destroy_cq(rdma->send_cq);
2413          rdma->send_cq = NULL;
2414      }
2415      if (rdma->recv_comp_channel) {
2416          ibv_destroy_comp_channel(rdma->recv_comp_channel);
2417          rdma->recv_comp_channel = NULL;
2418      }
2419      if (rdma->send_comp_channel) {
2420          ibv_destroy_comp_channel(rdma->send_comp_channel);
2421          rdma->send_comp_channel = NULL;
2422      }
2423      if (rdma->pd) {
2424          ibv_dealloc_pd(rdma->pd);
2425          rdma->pd = NULL;
2426      }
2427      if (rdma->cm_id) {
2428          rdma_destroy_id(rdma->cm_id);
2429          rdma->cm_id = NULL;
2430      }
2431  
2432      /* On the destination side, the listen_id and channel are shared */
2433      if (rdma->listen_id) {
2434          if (!rdma->is_return_path) {
2435              rdma_destroy_id(rdma->listen_id);
2436          }
2437          rdma->listen_id = NULL;
2438  
2439          if (rdma->channel) {
2440              if (!rdma->is_return_path) {
2441                  rdma_destroy_event_channel(rdma->channel);
2442              }
2443              rdma->channel = NULL;
2444          }
2445      }
2446  
2447      if (rdma->channel) {
2448          rdma_destroy_event_channel(rdma->channel);
2449          rdma->channel = NULL;
2450      }
2451      g_free(rdma->host);
2452      g_free(rdma->host_port);
2453      rdma->host = NULL;
2454      rdma->host_port = NULL;
2455  }
2456  
2457  
2458  static int qemu_rdma_source_init(RDMAContext *rdma, bool pin_all, Error **errp)
2459  {
2460      int ret, idx;
2461      Error *local_err = NULL, **temp = &local_err;
2462  
2463      /*
2464       * Will be validated against destination's actual capabilities
2465       * after the connect() completes.
2466       */
2467      rdma->pin_all = pin_all;
2468  
2469      ret = qemu_rdma_resolve_host(rdma, temp);
2470      if (ret) {
2471          goto err_rdma_source_init;
2472      }
2473  
2474      ret = qemu_rdma_alloc_pd_cq(rdma);
2475      if (ret) {
2476          ERROR(temp, "rdma migration: error allocating pd and cq! Your mlock()"
2477                      " limits may be too low. Please check $ ulimit -a # and "
2478                      "search for 'ulimit -l' in the output");
2479          goto err_rdma_source_init;
2480      }
2481  
2482      ret = qemu_rdma_alloc_qp(rdma);
2483      if (ret) {
2484          ERROR(temp, "rdma migration: error allocating qp!");
2485          goto err_rdma_source_init;
2486      }
2487  
2488      ret = qemu_rdma_init_ram_blocks(rdma);
2489      if (ret) {
2490          ERROR(temp, "rdma migration: error initializing ram blocks!");
2491          goto err_rdma_source_init;
2492      }
2493  
2494      /* Build the hash that maps from offset to RAMBlock */
2495      rdma->blockmap = g_hash_table_new(g_direct_hash, g_direct_equal);
2496      for (idx = 0; idx < rdma->local_ram_blocks.nb_blocks; idx++) {
2497          g_hash_table_insert(rdma->blockmap,
2498                  (void *)(uintptr_t)rdma->local_ram_blocks.block[idx].offset,
2499                  &rdma->local_ram_blocks.block[idx]);
2500      }
2501  
2502      for (idx = 0; idx < RDMA_WRID_MAX; idx++) {
2503          ret = qemu_rdma_reg_control(rdma, idx);
2504          if (ret) {
2505              ERROR(temp, "rdma migration: error registering %d control!",
2506                                                              idx);
2507              goto err_rdma_source_init;
2508          }
2509      }
2510  
2511      return 0;
2512  
2513  err_rdma_source_init:
2514      error_propagate(errp, local_err);
2515      qemu_rdma_cleanup(rdma);
2516      return -1;
2517  }
2518  
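/*
 * Wait up to @msec milliseconds for a connection manager event on the
 * event channel, retrying poll() on EINTR, and fetch the event with
 * rdma_get_cm_event() once the fd becomes readable.
 */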
2519  static int qemu_get_cm_event_timeout(RDMAContext *rdma,
2520                                       struct rdma_cm_event **cm_event,
2521                                       long msec, Error **errp)
2522  {
2523      int ret;
2524      struct pollfd poll_fd = {
2525                                  .fd = rdma->channel->fd,
2526                                  .events = POLLIN,
2527                                  .revents = 0
2528                              };
2529  
2530      do {
2531          ret = poll(&poll_fd, 1, msec);
2532      } while (ret < 0 && errno == EINTR);
2533  
2534      if (ret == 0) {
2535          ERROR(errp, "poll cm event timeout");
2536          return -1;
2537      } else if (ret < 0) {
2538          ERROR(errp, "failed to poll cm event, errno=%i", errno);
2539          return -1;
2540      } else if (poll_fd.revents & POLLIN) {
2541          return rdma_get_cm_event(rdma->channel, cm_event);
2542      } else {
2543          ERROR(errp, "no POLLIN event, revents=%x", poll_fd.revents);
2544          return -1;
2545      }
2546  }
2547  
2548  static int qemu_rdma_connect(RDMAContext *rdma, Error **errp, bool return_path)
2549  {
2550      RDMACapabilities cap = {
2551                                  .version = RDMA_CONTROL_VERSION_CURRENT,
2552                                  .flags = 0,
2553                             };
2554      struct rdma_conn_param conn_param = { .initiator_depth = 2,
2555                                            .retry_count = 5,
2556                                            .private_data = &cap,
2557                                            .private_data_len = sizeof(cap),
2558                                          };
2559      struct rdma_cm_event *cm_event;
2560      int ret;
2561  
2562      /*
2563       * Only negotiate the capability with the destination if the user
2564       * on the source first requested the capability.
2565       */
2566      if (rdma->pin_all) {
2567          trace_qemu_rdma_connect_pin_all_requested();
2568          cap.flags |= RDMA_CAPABILITY_PIN_ALL;
2569      }
2570  
2571      caps_to_network(&cap);
2572  
2573      ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY);
2574      if (ret) {
2575          ERROR(errp, "posting second control recv");
2576          goto err_rdma_source_connect;
2577      }
2578  
2579      ret = rdma_connect(rdma->cm_id, &conn_param);
2580      if (ret) {
2581          perror("rdma_connect");
2582          ERROR(errp, "connecting to destination!");
2583          goto err_rdma_source_connect;
2584      }
2585  
2586      if (return_path) {
2587          ret = qemu_get_cm_event_timeout(rdma, &cm_event, 5000, errp);
2588      } else {
2589          ret = rdma_get_cm_event(rdma->channel, &cm_event);
2590      }
2591      if (ret) {
2592          perror("rdma_get_cm_event after rdma_connect");
2593          ERROR(errp, "connecting to destination!");
2594          goto err_rdma_source_connect;
2595      }
2596  
2597      if (cm_event->event != RDMA_CM_EVENT_ESTABLISHED) {
2598          error_report("rdma_get_cm_event != EVENT_ESTABLISHED after rdma_connect");
2599          ERROR(errp, "connecting to destination!");
2600          rdma_ack_cm_event(cm_event);
2601          goto err_rdma_source_connect;
2602      }
2603      rdma->connected = true;
2604  
2605      memcpy(&cap, cm_event->param.conn.private_data, sizeof(cap));
2606      network_to_caps(&cap);
2607  
2608      /*
2609       * Verify that the *requested* capabilities are supported by the destination
2610       * and disable them otherwise.
2611       */
2612      if (rdma->pin_all && !(cap.flags & RDMA_CAPABILITY_PIN_ALL)) {
2613          ERROR(errp, "Server cannot support pinning all memory. "
2614                          "Will register memory dynamically.");
2615          rdma->pin_all = false;
2616      }
2617  
2618      trace_qemu_rdma_connect_pin_all_outcome(rdma->pin_all);
2619  
2620      rdma_ack_cm_event(cm_event);
2621  
2622      rdma->control_ready_expected = 1;
2623      rdma->nb_sent = 0;
2624      return 0;
2625  
2626  err_rdma_source_connect:
2627      qemu_rdma_cleanup(rdma);
2628      return -1;
2629  }
2630  
2631  static int qemu_rdma_dest_init(RDMAContext *rdma, Error **errp)
2632  {
2633      int ret, idx;
2634      struct rdma_cm_id *listen_id;
2635      char ip[40] = "unknown";
2636      struct rdma_addrinfo *res, *e;
2637      char port_str[16];
2638      int reuse = 1;
2639  
2640      for (idx = 0; idx < RDMA_WRID_MAX; idx++) {
2641          rdma->wr_data[idx].control_len = 0;
2642          rdma->wr_data[idx].control_curr = NULL;
2643      }
2644  
2645      if (!rdma->host || !rdma->host[0]) {
2646          ERROR(errp, "RDMA host is not set!");
2647          rdma->error_state = -EINVAL;
2648          return -1;
2649      }
2650      /* create CM channel */
2651      rdma->channel = rdma_create_event_channel();
2652      if (!rdma->channel) {
2653          ERROR(errp, "could not create rdma event channel");
2654          rdma->error_state = -EINVAL;
2655          return -1;
2656      }
2657  
2658      /* create CM id */
2659      ret = rdma_create_id(rdma->channel, &listen_id, NULL, RDMA_PS_TCP);
2660      if (ret) {
2661          ERROR(errp, "could not create cm_id!");
2662          goto err_dest_init_create_listen_id;
2663      }
2664  
2665      snprintf(port_str, 16, "%d", rdma->port);
2666      port_str[15] = '\0';
2667  
2668      ret = rdma_getaddrinfo(rdma->host, port_str, NULL, &res);
2669      if (ret < 0) {
2670          ERROR(errp, "could not rdma_getaddrinfo address %s", rdma->host);
2671          goto err_dest_init_bind_addr;
2672      }
2673  
2674      ret = rdma_set_option(listen_id, RDMA_OPTION_ID, RDMA_OPTION_ID_REUSEADDR,
2675                            &reuse, sizeof reuse);
2676      if (ret) {
2677          ERROR(errp, "Error: could not set REUSEADDR option");
2678          goto err_dest_init_bind_addr;
2679      }
2680      for (e = res; e != NULL; e = e->ai_next) {
2681          inet_ntop(e->ai_family,
2682              &((struct sockaddr_in *) e->ai_dst_addr)->sin_addr, ip, sizeof ip);
2683          trace_qemu_rdma_dest_init_trying(rdma->host, ip);
2684          ret = rdma_bind_addr(listen_id, e->ai_dst_addr);
2685          if (ret) {
2686              continue;
2687          }
2688          if (e->ai_family == AF_INET6) {
2689              ret = qemu_rdma_broken_ipv6_kernel(listen_id->verbs, errp);
2690              if (ret) {
2691                  continue;
2692              }
2693          }
2694          break;
2695      }
2696  
2697      rdma_freeaddrinfo(res);
2698      if (!e) {
2699          ERROR(errp, "Error: could not rdma_bind_addr!");
2700          goto err_dest_init_bind_addr;
2701      }
2702  
2703      rdma->listen_id = listen_id;
2704      qemu_rdma_dump_gid("dest_init", listen_id);
2705      return 0;
2706  
2707  err_dest_init_bind_addr:
2708      rdma_destroy_id(listen_id);
2709  err_dest_init_create_listen_id:
2710      rdma_destroy_event_channel(rdma->channel);
2711      rdma->channel = NULL;
2712      rdma->error_state = ret;
2713      return ret;
2714  
2715  }
2716  
2717  static void qemu_rdma_return_path_dest_init(RDMAContext *rdma_return_path,
2718                                              RDMAContext *rdma)
2719  {
2720      int idx;
2721  
2722      for (idx = 0; idx < RDMA_WRID_MAX; idx++) {
2723          rdma_return_path->wr_data[idx].control_len = 0;
2724          rdma_return_path->wr_data[idx].control_curr = NULL;
2725      }
2726  
2727      /* the CM channel and CM id are shared */
2728      rdma_return_path->channel = rdma->channel;
2729      rdma_return_path->listen_id = rdma->listen_id;
2730  
2731      rdma->return_path = rdma_return_path;
2732      rdma_return_path->return_path = rdma;
2733      rdma_return_path->is_return_path = true;
2734  }
2735  
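/*
 * Allocate and minimally initialize an RDMAContext, parsing @host_port
 * ("host:port") into the host, port and host_port fields; returns NULL
 * and sets @errp if the address cannot be parsed.
 */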
2736  static void *qemu_rdma_data_init(const char *host_port, Error **errp)
2737  {
2738      RDMAContext *rdma = NULL;
2739      InetSocketAddress *addr;
2740  
2741      if (host_port) {
2742          rdma = g_new0(RDMAContext, 1);
2743          rdma->current_index = -1;
2744          rdma->current_chunk = -1;
2745  
2746          addr = g_new(InetSocketAddress, 1);
2747          if (!inet_parse(addr, host_port, NULL)) {
2748              rdma->port = atoi(addr->port);
2749              rdma->host = g_strdup(addr->host);
2750              rdma->host_port = g_strdup(host_port);
2751          } else {
2752              ERROR(errp, "bad RDMA migration address '%s'", host_port);
2753              g_free(rdma);
2754              rdma = NULL;
2755          }
2756  
2757          qapi_free_InetSocketAddress(addr);
2758      }
2759  
2760      return rdma;
2761  }
2762  
2763  /*
2764   * QEMUFile interface to the control channel.
2765   * SEND messages for control only.
2766   * VM's ram is handled with regular RDMA messages.
2767   */
2768  static ssize_t qio_channel_rdma_writev(QIOChannel *ioc,
2769                                         const struct iovec *iov,
2770                                         size_t niov,
2771                                         int *fds,
2772                                         size_t nfds,
2773                                         int flags,
2774                                         Error **errp)
2775  {
2776      QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
2777      QEMUFile *f = rioc->file;
2778      RDMAContext *rdma;
2779      int ret;
2780      ssize_t done = 0;
2781      size_t i;
2782      size_t len = 0;
2783  
2784      RCU_READ_LOCK_GUARD();
2785      rdma = qatomic_rcu_read(&rioc->rdmaout);
2786  
2787      if (!rdma) {
2788          error_setg(errp, "RDMA control channel output is not set");
2789          return -1;
2790      }
2791  
2792      CHECK_ERROR_STATE();
2793  
2794      /*
2795       * Push out any writes that
2796       * we've queued up for the VM's RAM.
2797       */
2798      ret = qemu_rdma_write_flush(f, rdma);
2799      if (ret < 0) {
2800          rdma->error_state = ret;
2801          error_setg(errp, "qemu_rdma_write_flush returned %d", ret);
2802          return -1;
2803      }
2804  
2805      for (i = 0; i < niov; i++) {
2806          size_t remaining = iov[i].iov_len;
2807          uint8_t * data = (void *)iov[i].iov_base;
2808          while (remaining) {
2809              RDMAControlHeader head;
2810  
2811              len = MIN(remaining, RDMA_SEND_INCREMENT);
2812              remaining -= len;
2813  
2814              head.len = len;
2815              head.type = RDMA_CONTROL_QEMU_FILE;
2816  
2817              ret = qemu_rdma_exchange_send(rdma, &head, data, NULL, NULL, NULL);
2818  
2819              if (ret < 0) {
2820                  rdma->error_state = ret;
2821                  error_setg(errp, "qemu_rdma_exchange_send returned %d", ret);
2822                  return -1;
2823              }
2824  
2825              data += len;
2826              done += len;
2827          }
2828      }
2829  
2830      return done;
2831  }
2832  
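/*
 * Copy up to @size bytes of buffered control data for slot @idx into @buf,
 * advancing the buffer cursor; returns the number of bytes copied, or 0 if
 * nothing is currently buffered.
 */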
2833  static size_t qemu_rdma_fill(RDMAContext *rdma, uint8_t *buf,
2834                               size_t size, int idx)
2835  {
2836      size_t len = 0;
2837  
2838      if (rdma->wr_data[idx].control_len) {
2839          trace_qemu_rdma_fill(rdma->wr_data[idx].control_len, size);
2840  
2841          len = MIN(size, rdma->wr_data[idx].control_len);
2842          memcpy(buf, rdma->wr_data[idx].control_curr, len);
2843          rdma->wr_data[idx].control_curr += len;
2844          rdma->wr_data[idx].control_len -= len;
2845      }
2846  
2847      return len;
2848  }
2849  
2850  /*
2851   * QEMUFile interface to the control channel.
2852   * RDMA links don't use bytestreams, so we have to
2853   * return bytes to QEMUFile opportunistically.
2854   */
2855  static ssize_t qio_channel_rdma_readv(QIOChannel *ioc,
2856                                        const struct iovec *iov,
2857                                        size_t niov,
2858                                        int **fds,
2859                                        size_t *nfds,
2860                                        int flags,
2861                                        Error **errp)
2862  {
2863      QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
2864      RDMAContext *rdma;
2865      RDMAControlHeader head;
2866      int ret = 0;
2867      ssize_t i;
2868      size_t done = 0;
2869  
2870      RCU_READ_LOCK_GUARD();
2871      rdma = qatomic_rcu_read(&rioc->rdmain);
2872  
2873      if (!rdma) {
2874          error_setg(errp, "RDMA control channel input is not set");
2875          return -1;
2876      }
2877  
2878      CHECK_ERROR_STATE();
2879  
2880      for (i = 0; i < niov; i++) {
2881          size_t want = iov[i].iov_len;
2882          uint8_t *data = (void *)iov[i].iov_base;
2883  
2884          /*
2885           * First, we hold on to the last SEND message we
2886           * were given and dish out the bytes until we run
2887           * out of bytes.
2888           */
2889          ret = qemu_rdma_fill(rdma, data, want, 0);
2890          done += ret;
2891          want -= ret;
2892          /* Got what we needed, so go to next iovec */
2893          if (want == 0) {
2894              continue;
2895          }
2896  
2897          /* If we got any data so far, then don't wait
2898           * for more, just return what we have */
2899          if (done > 0) {
2900              break;
2901          }
2902  
2903  
2904          /* We've got nothing at all, so let's wait for
2905           * more to arrive
2906           */
2907          ret = qemu_rdma_exchange_recv(rdma, &head, RDMA_CONTROL_QEMU_FILE);
2908  
2909          if (ret < 0) {
2910              rdma->error_state = ret;
2911              error_setg(errp, "qemu_rdma_exchange_recv returned %d", ret);
2912              return -1;
2913          }
2914  
2915          /*
2916           * SEND was received with new bytes, now try again.
2917           */
2918          ret = qemu_rdma_fill(rdma, data, want, 0);
2919          done += ret;
2920          want -= ret;
2921  
2922          /* Still didn't get enough, so let's just return */
2923          if (want) {
2924              if (done == 0) {
2925                  return QIO_CHANNEL_ERR_BLOCK;
2926              } else {
2927                  break;
2928              }
2929          }
2930      }
2931      return done;
2932  }
2933  
2934  /*
2935   * Block until all the outstanding chunks have been delivered by the hardware.
2936   */
2937  static int qemu_rdma_drain_cq(QEMUFile *f, RDMAContext *rdma)
2938  {
2939      int ret;
2940  
2941      if (qemu_rdma_write_flush(f, rdma) < 0) {
2942          return -EIO;
2943      }
2944  
2945      while (rdma->nb_sent) {
2946          ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RDMA_WRITE, NULL);
2947          if (ret < 0) {
2948              error_report("rdma migration: complete polling error!");
2949              return -EIO;
2950          }
2951      }
2952  
2953      qemu_rdma_unregister_waiting(rdma);
2954  
2955      return 0;
2956  }
2957  
2958  
2959  static int qio_channel_rdma_set_blocking(QIOChannel *ioc,
2960                                           bool blocking,
2961                                           Error **errp)
2962  {
2963      QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
2964      /* XXX we should make readv/writev actually honour this :-) */
2965      rioc->blocking = blocking;
2966      return 0;
2967  }
2968  
2969  
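/*
 * GSource glue so the RDMA channel can be watched from the main loop:
 * the channel is reported readable whenever buffered control data is
 * pending, and is always considered writable.
 */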
2970  typedef struct QIOChannelRDMASource QIOChannelRDMASource;
2971  struct QIOChannelRDMASource {
2972      GSource parent;
2973      QIOChannelRDMA *rioc;
2974      GIOCondition condition;
2975  };
2976  
2977  static gboolean
2978  qio_channel_rdma_source_prepare(GSource *source,
2979                                  gint *timeout)
2980  {
2981      QIOChannelRDMASource *rsource = (QIOChannelRDMASource *)source;
2982      RDMAContext *rdma;
2983      GIOCondition cond = 0;
2984      *timeout = -1;
2985  
2986      RCU_READ_LOCK_GUARD();
2987      if (rsource->condition == G_IO_IN) {
2988          rdma = qatomic_rcu_read(&rsource->rioc->rdmain);
2989      } else {
2990          rdma = qatomic_rcu_read(&rsource->rioc->rdmaout);
2991      }
2992  
2993      if (!rdma) {
2994          error_report("RDMAContext is NULL when preparing GSource");
2995          return FALSE;
2996      }
2997  
2998      if (rdma->wr_data[0].control_len) {
2999          cond |= G_IO_IN;
3000      }
3001      cond |= G_IO_OUT;
3002  
3003      return cond & rsource->condition;
3004  }
3005  
3006  static gboolean
3007  qio_channel_rdma_source_check(GSource *source)
3008  {
3009      QIOChannelRDMASource *rsource = (QIOChannelRDMASource *)source;
3010      RDMAContext *rdma;
3011      GIOCondition cond = 0;
3012  
3013      RCU_READ_LOCK_GUARD();
3014      if (rsource->condition == G_IO_IN) {
3015          rdma = qatomic_rcu_read(&rsource->rioc->rdmain);
3016      } else {
3017          rdma = qatomic_rcu_read(&rsource->rioc->rdmaout);
3018      }
3019  
3020      if (!rdma) {
3021          error_report("RDMAContext is NULL when checking GSource");
3022          return FALSE;
3023      }
3024  
3025      if (rdma->wr_data[0].control_len) {
3026          cond |= G_IO_IN;
3027      }
3028      cond |= G_IO_OUT;
3029  
3030      return cond & rsource->condition;
3031  }
3032  
3033  static gboolean
3034  qio_channel_rdma_source_dispatch(GSource *source,
3035                                   GSourceFunc callback,
3036                                   gpointer user_data)
3037  {
3038      QIOChannelFunc func = (QIOChannelFunc)callback;
3039      QIOChannelRDMASource *rsource = (QIOChannelRDMASource *)source;
3040      RDMAContext *rdma;
3041      GIOCondition cond = 0;
3042  
3043      RCU_READ_LOCK_GUARD();
3044      if (rsource->condition == G_IO_IN) {
3045          rdma = qatomic_rcu_read(&rsource->rioc->rdmain);
3046      } else {
3047          rdma = qatomic_rcu_read(&rsource->rioc->rdmaout);
3048      }
3049  
3050      if (!rdma) {
3051          error_report("RDMAContext is NULL when dispatching GSource");
3052          return FALSE;
3053      }
3054  
3055      if (rdma->wr_data[0].control_len) {
3056          cond |= G_IO_IN;
3057      }
3058      cond |= G_IO_OUT;
3059  
3060      return (*func)(QIO_CHANNEL(rsource->rioc),
3061                     (cond & rsource->condition),
3062                     user_data);
3063  }
3064  
3065  static void
3066  qio_channel_rdma_source_finalize(GSource *source)
3067  {
3068      QIOChannelRDMASource *ssource = (QIOChannelRDMASource *)source;
3069  
3070      object_unref(OBJECT(ssource->rioc));
3071  }
3072  
3073  GSourceFuncs qio_channel_rdma_source_funcs = {
3074      qio_channel_rdma_source_prepare,
3075      qio_channel_rdma_source_check,
3076      qio_channel_rdma_source_dispatch,
3077      qio_channel_rdma_source_finalize
3078  };
3079  
3080  static GSource *qio_channel_rdma_create_watch(QIOChannel *ioc,
3081                                                GIOCondition condition)
3082  {
3083      QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
3084      QIOChannelRDMASource *ssource;
3085      GSource *source;
3086  
3087      source = g_source_new(&qio_channel_rdma_source_funcs,
3088                            sizeof(QIOChannelRDMASource));
3089      ssource = (QIOChannelRDMASource *)source;
3090  
3091      ssource->rioc = rioc;
3092      object_ref(OBJECT(rioc));
3093  
3094      ssource->condition = condition;
3095  
3096      return source;
3097  }
3098  
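/*
 * Hook the send and recv completion-channel fds into the AioContext:
 * when an io_read handler is given we are on the incoming side and use
 * rdmain's channels, otherwise we use rdmaout's.
 */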
3099  static void qio_channel_rdma_set_aio_fd_handler(QIOChannel *ioc,
3100                                                    AioContext *ctx,
3101                                                    IOHandler *io_read,
3102                                                    IOHandler *io_write,
3103                                                    void *opaque)
3104  {
3105      QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
3106      if (io_read) {
3107          aio_set_fd_handler(ctx, rioc->rdmain->recv_comp_channel->fd,
3108                             false, io_read, io_write, NULL, NULL, opaque);
3109          aio_set_fd_handler(ctx, rioc->rdmain->send_comp_channel->fd,
3110                             false, io_read, io_write, NULL, NULL, opaque);
3111      } else {
3112          aio_set_fd_handler(ctx, rioc->rdmaout->recv_comp_channel->fd,
3113                             false, io_read, io_write, NULL, NULL, opaque);
3114          aio_set_fd_handler(ctx, rioc->rdmaout->send_comp_channel->fd,
3115                             false, io_read, io_write, NULL, NULL, opaque);
3116      }
3117  }
3118  
3119  struct rdma_close_rcu {
3120      struct rcu_head rcu;
3121      RDMAContext *rdmain;
3122      RDMAContext *rdmaout;
3123  };
3124  
3125  /* callback from qio_channel_rdma_close via call_rcu */
3126  static void qio_channel_rdma_close_rcu(struct rdma_close_rcu *rcu)
3127  {
3128      if (rcu->rdmain) {
3129          qemu_rdma_cleanup(rcu->rdmain);
3130      }
3131  
3132      if (rcu->rdmaout) {
3133          qemu_rdma_cleanup(rcu->rdmaout);
3134      }
3135  
3136      g_free(rcu->rdmain);
3137      g_free(rcu->rdmaout);
3138      g_free(rcu);
3139  }
3140  
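/*
 * Closing the channel: detach both RDMAContexts immediately so RCU readers
 * see NULL, then defer the actual teardown and freeing to an RCU callback
 * that runs once all current readers have finished.
 */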
3141  static int qio_channel_rdma_close(QIOChannel *ioc,
3142                                    Error **errp)
3143  {
3144      QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
3145      RDMAContext *rdmain, *rdmaout;
3146      struct rdma_close_rcu *rcu = g_new(struct rdma_close_rcu, 1);
3147  
3148      trace_qemu_rdma_close();
3149  
3150      rdmain = rioc->rdmain;
3151      if (rdmain) {
3152          qatomic_rcu_set(&rioc->rdmain, NULL);
3153      }
3154  
3155      rdmaout = rioc->rdmaout;
3156      if (rdmaout) {
3157          qatomic_rcu_set(&rioc->rdmaout, NULL);
3158      }
3159  
3160      rcu->rdmain = rdmain;
3161      rcu->rdmaout = rdmaout;
3162      call_rcu(rcu, qio_channel_rdma_close_rcu, rcu);
3163  
3164      return 0;
3165  }
3166  
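/*
 * Shutdown only flags the requested direction(s) as being in an error state
 * so that pending and future operations bail out; the connection itself is
 * torn down later by qio_channel_rdma_close().
 */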
3167  static int
3168  qio_channel_rdma_shutdown(QIOChannel *ioc,
3169                              QIOChannelShutdown how,
3170                              Error **errp)
3171  {
3172      QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
3173      RDMAContext *rdmain, *rdmaout;
3174  
3175      RCU_READ_LOCK_GUARD();
3176  
3177      rdmain = qatomic_rcu_read(&rioc->rdmain);
    rdmaout = qatomic_rcu_read(&rioc->rdmaout);
3179  
3180      switch (how) {
3181      case QIO_CHANNEL_SHUTDOWN_READ:
3182          if (rdmain) {
3183              rdmain->error_state = -1;
3184          }
3185          break;
3186      case QIO_CHANNEL_SHUTDOWN_WRITE:
3187          if (rdmaout) {
3188              rdmaout->error_state = -1;
3189          }
3190          break;
3191      case QIO_CHANNEL_SHUTDOWN_BOTH:
3192      default:
3193          if (rdmain) {
3194              rdmain->error_state = -1;
3195          }
3196          if (rdmaout) {
3197              rdmaout->error_state = -1;
3198          }
3199          break;
3200      }
3201  
3202      return 0;
3203  }
3204  
3205  /*
3206   * Parameters:
3207   *    @offset == 0 :
3208   *        This means that 'block_offset' is a full virtual address that does not
3209   *        belong to a RAMBlock of the virtual machine and instead
3210   *        represents a private malloc'd memory area that the caller wishes to
3211   *        transfer.
3212   *
3213   *    @offset != 0 :
3214   *        Offset is an offset to be added to block_offset and used
3215   *        to also lookup the corresponding RAMBlock.
3216   *
3217   *    @size : Number of bytes to transfer
3218   *
 *    @bytes_sent : User-specified pointer to indicate how many bytes were
3220   *                  sent. Usually, this will not be more than a few bytes of
3221   *                  the protocol because most transfers are sent asynchronously.
3222   */
3223  static size_t qemu_rdma_save_page(QEMUFile *f,
3224                                    ram_addr_t block_offset, ram_addr_t offset,
3225                                    size_t size, uint64_t *bytes_sent)
3226  {
3227      QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(qemu_file_get_ioc(f));
3228      RDMAContext *rdma;
3229      int ret;
3230  
3231      RCU_READ_LOCK_GUARD();
3232      rdma = qatomic_rcu_read(&rioc->rdmaout);
3233  
3234      if (!rdma) {
3235          return -EIO;
3236      }
3237  
3238      CHECK_ERROR_STATE();
3239  
3240      if (migration_in_postcopy()) {
3241          return RAM_SAVE_CONTROL_NOT_SUPP;
3242      }
3243  
3244      qemu_fflush(f);
3245  
3246      /*
3247       * Add this page to the current 'chunk'. If the chunk
3248       * is full, or the page doesn't belong to the current chunk,
3249       * an actual RDMA write will occur and a new chunk will be formed.
3250       */
3251      ret = qemu_rdma_write(f, rdma, block_offset, offset, size);
3252      if (ret < 0) {
3253          error_report("rdma migration: write error! %d", ret);
3254          goto err;
3255      }
3256  
3257      /*
     * We always return 1 byte because the RDMA
     * protocol is completely asynchronous. We do not yet know
     * whether an identified chunk is zero or not because we're
3261       * waiting for other pages to potentially be merged with
3262       * the current chunk. So, we have to call qemu_update_position()
3263       * later on when the actual write occurs.
3264       */
3265      if (bytes_sent) {
3266          *bytes_sent = 1;
3267      }
3268  
3269      /*
3270       * Drain the Completion Queue if possible, but do not block,
3271       * just poll.
3272       *
3273       * If nothing to poll, the end of the iteration will do this
3274       * again to make sure we don't overflow the request queue.
3275       */
3276      while (1) {
3277          uint64_t wr_id, wr_id_in;
        ret = qemu_rdma_poll(rdma, rdma->recv_cq, &wr_id_in, NULL);
3279          if (ret < 0) {
3280              error_report("rdma migration: polling error! %d", ret);
3281              goto err;
3282          }
3283  
3284          wr_id = wr_id_in & RDMA_WRID_TYPE_MASK;
3285  
3286          if (wr_id == RDMA_WRID_NONE) {
3287              break;
3288          }
3289      }
3290  
3291      while (1) {
3292          uint64_t wr_id, wr_id_in;
        ret = qemu_rdma_poll(rdma, rdma->send_cq, &wr_id_in, NULL);
3294          if (ret < 0) {
3295              error_report("rdma migration: polling error! %d", ret);
3296              goto err;
3297          }
3298  
3299          wr_id = wr_id_in & RDMA_WRID_TYPE_MASK;
3300  
3301          if (wr_id == RDMA_WRID_NONE) {
3302              break;
3303          }
3304      }
3305  
3306      return RAM_SAVE_CONTROL_DELAYED;
3307  err:
3308      rdma->error_state = ret;
3309      return ret;
3310  }
3311  
3312  static void rdma_accept_incoming_migration(void *opaque);
3313  
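/*
 * CM event handler installed once the destination connection is up.  A
 * DISCONNECTED or DEVICE_REMOVAL event before the migration has completed
 * marks this context (and its return path, if any) as failed and wakes the
 * incoming-migration coroutine so the failure is noticed promptly.
 */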
3314  static void rdma_cm_poll_handler(void *opaque)
3315  {
3316      RDMAContext *rdma = opaque;
3317      int ret;
3318      struct rdma_cm_event *cm_event;
3319      MigrationIncomingState *mis = migration_incoming_get_current();
3320  
3321      ret = rdma_get_cm_event(rdma->channel, &cm_event);
3322      if (ret) {
3323          error_report("get_cm_event failed %d", errno);
3324          return;
3325      }
3326  
3327      if (cm_event->event == RDMA_CM_EVENT_DISCONNECTED ||
3328          cm_event->event == RDMA_CM_EVENT_DEVICE_REMOVAL) {
3329          if (!rdma->error_state &&
3330              migration_incoming_get_current()->state !=
3331                MIGRATION_STATUS_COMPLETED) {
            error_report("rdma migration: unexpected CM event %d",
                         cm_event->event);
3333              rdma->error_state = -EPIPE;
3334              if (rdma->return_path) {
3335                  rdma->return_path->error_state = -EPIPE;
3336              }
3337          }
3338          rdma_ack_cm_event(cm_event);
3339  
3340          if (mis->migration_incoming_co) {
3341              qemu_coroutine_enter(mis->migration_incoming_co);
3342          }
3343          return;
3344      }
3345      rdma_ack_cm_event(cm_event);
3346  }
3347  
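/*
 * Destination side of connection setup: wait for the CONNECT_REQUEST event,
 * negotiate the capabilities carried in its private data, allocate the
 * protection domain, completion queues and queue pair, register the control
 * buffers, accept the connection, wait for ESTABLISHED, and post the first
 * control receive so the source's initial message has somewhere to land.
 */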
3348  static int qemu_rdma_accept(RDMAContext *rdma)
3349  {
3350      RDMACapabilities cap;
    struct rdma_conn_param conn_param = {
        .responder_resources = 2,
        .private_data = &cap,
        .private_data_len = sizeof(cap),
    };
3356      RDMAContext *rdma_return_path = NULL;
3357      struct rdma_cm_event *cm_event;
3358      struct ibv_context *verbs;
3359      int ret = -EINVAL;
3360      int idx;
3361  
3362      ret = rdma_get_cm_event(rdma->channel, &cm_event);
3363      if (ret) {
3364          goto err_rdma_dest_wait;
3365      }
3366  
3367      if (cm_event->event != RDMA_CM_EVENT_CONNECT_REQUEST) {
3368          rdma_ack_cm_event(cm_event);
3369          goto err_rdma_dest_wait;
3370      }
3371  
3372      /*
3373       * initialize the RDMAContext for return path for postcopy after first
3374       * connection request reached.
3375       */
3376      if (migrate_postcopy() && !rdma->is_return_path) {
3377          rdma_return_path = qemu_rdma_data_init(rdma->host_port, NULL);
3378          if (rdma_return_path == NULL) {
3379              rdma_ack_cm_event(cm_event);
3380              goto err_rdma_dest_wait;
3381          }
3382  
3383          qemu_rdma_return_path_dest_init(rdma_return_path, rdma);
3384      }
3385  
3386      memcpy(&cap, cm_event->param.conn.private_data, sizeof(cap));
3387  
3388      network_to_caps(&cap);
3389  
    if (cap.version < 1 || cap.version > RDMA_CONTROL_VERSION_CURRENT) {
        error_report("Unknown source RDMA version: %d, bailing...",
                     cap.version);
        rdma_ack_cm_event(cm_event);
        goto err_rdma_dest_wait;
    }
3396  
3397      /*
3398       * Respond with only the capabilities this version of QEMU knows about.
3399       */
3400      cap.flags &= known_capabilities;
3401  
3402      /*
3403       * Enable the ones that we do know about.
3404       * Add other checks here as new ones are introduced.
3405       */
3406      if (cap.flags & RDMA_CAPABILITY_PIN_ALL) {
3407          rdma->pin_all = true;
3408      }
3409  
3410      rdma->cm_id = cm_event->id;
3411      verbs = cm_event->id->verbs;
3412  
3413      rdma_ack_cm_event(cm_event);
3414  
3415      trace_qemu_rdma_accept_pin_state(rdma->pin_all);
3416  
3417      caps_to_network(&cap);
3418  
3419      trace_qemu_rdma_accept_pin_verbsc(verbs);
3420  
3421      if (!rdma->verbs) {
3422          rdma->verbs = verbs;
3423      } else if (rdma->verbs != verbs) {
        error_report("ibv context not matching %p, %p!", rdma->verbs,
                     verbs);
        goto err_rdma_dest_wait;
3427      }
3428  
3429      qemu_rdma_dump_id("dest_init", verbs);
3430  
3431      ret = qemu_rdma_alloc_pd_cq(rdma);
3432      if (ret) {
3433          error_report("rdma migration: error allocating pd and cq!");
3434          goto err_rdma_dest_wait;
3435      }
3436  
3437      ret = qemu_rdma_alloc_qp(rdma);
3438      if (ret) {
3439          error_report("rdma migration: error allocating qp!");
3440          goto err_rdma_dest_wait;
3441      }
3442  
3443      ret = qemu_rdma_init_ram_blocks(rdma);
3444      if (ret) {
3445          error_report("rdma migration: error initializing ram blocks!");
3446          goto err_rdma_dest_wait;
3447      }
3448  
3449      for (idx = 0; idx < RDMA_WRID_MAX; idx++) {
3450          ret = qemu_rdma_reg_control(rdma, idx);
3451          if (ret) {
3452              error_report("rdma: error registering %d control", idx);
3453              goto err_rdma_dest_wait;
3454          }
3455      }
3456  
3457      /* Accept the second connection request for return path */
3458      if (migrate_postcopy() && !rdma->is_return_path) {
3459          qemu_set_fd_handler(rdma->channel->fd, rdma_accept_incoming_migration,
3460                              NULL,
3461                              (void *)(intptr_t)rdma->return_path);
3462      } else {
3463          qemu_set_fd_handler(rdma->channel->fd, rdma_cm_poll_handler,
3464                              NULL, rdma);
3465      }
3466  
3467      ret = rdma_accept(rdma->cm_id, &conn_param);
3468      if (ret) {
3469          error_report("rdma_accept returns %d", ret);
3470          goto err_rdma_dest_wait;
3471      }
3472  
3473      ret = rdma_get_cm_event(rdma->channel, &cm_event);
3474      if (ret) {
3475          error_report("rdma_accept get_cm_event failed %d", ret);
3476          goto err_rdma_dest_wait;
3477      }
3478  
3479      if (cm_event->event != RDMA_CM_EVENT_ESTABLISHED) {
        error_report("rdma_accept: expected RDMA_CM_EVENT_ESTABLISHED, got %d",
                     cm_event->event);
3481          rdma_ack_cm_event(cm_event);
3482          goto err_rdma_dest_wait;
3483      }
3484  
3485      rdma_ack_cm_event(cm_event);
3486      rdma->connected = true;
3487  
3488      ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY);
3489      if (ret) {
3490          error_report("rdma migration: error posting second control recv");
3491          goto err_rdma_dest_wait;
3492      }
3493  
3494      qemu_rdma_dump_gid("dest_connect", rdma->cm_id);
3495  
3496      return 0;
3497  
3498  err_rdma_dest_wait:
3499      rdma->error_state = ret;
3500      qemu_rdma_cleanup(rdma);
3501      g_free(rdma_return_path);
3502      return ret;
3503  }
3504  
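/*
 * qsort() comparator: order the destination's local RAMBlocks by the index
 * each block has on the source, so both sides number blocks identically.
 */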
3505  static int dest_ram_sort_func(const void *a, const void *b)
3506  {
3507      unsigned int a_index = ((const RDMALocalBlock *)a)->src_index;
3508      unsigned int b_index = ((const RDMALocalBlock *)b)->src_index;
3509  
3510      return (a_index < b_index) ? -1 : (a_index != b_index);
3511  }
3512  
3513  /*
3514   * During each iteration of the migration, we listen for instructions
3515   * by the source VM to perform dynamic page registrations before they
3516   * can perform RDMA operations.
3517   *
3518   * We respond with the 'rkey'.
3519   *
3520   * Keep doing this until the source tells us to stop.
3521   */
3522  static int qemu_rdma_registration_handle(QEMUFile *f, void *opaque)
3523  {
3524      RDMAControlHeader reg_resp = { .len = sizeof(RDMARegisterResult),
3525                                 .type = RDMA_CONTROL_REGISTER_RESULT,
3526                                 .repeat = 0,
3527                               };
3528      RDMAControlHeader unreg_resp = { .len = 0,
3529                                 .type = RDMA_CONTROL_UNREGISTER_FINISHED,
3530                                 .repeat = 0,
3531                               };
3532      RDMAControlHeader blocks = { .type = RDMA_CONTROL_RAM_BLOCKS_RESULT,
3533                                   .repeat = 1 };
3534      QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque);
3535      RDMAContext *rdma;
3536      RDMALocalBlocks *local;
3537      RDMAControlHeader head;
3538      RDMARegister *reg, *registers;
3539      RDMACompress *comp;
3540      RDMARegisterResult *reg_result;
3541      static RDMARegisterResult results[RDMA_CONTROL_MAX_COMMANDS_PER_MESSAGE];
3542      RDMALocalBlock *block;
3543      void *host_addr;
3544      int ret = 0;
3545      int idx = 0;
3546      int count = 0;
3547      int i = 0;
3548  
3549      RCU_READ_LOCK_GUARD();
3550      rdma = qatomic_rcu_read(&rioc->rdmain);
3551  
3552      if (!rdma) {
3553          return -EIO;
3554      }
3555  
3556      CHECK_ERROR_STATE();
3557  
3558      local = &rdma->local_ram_blocks;
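    /*
     * Service control messages from the source: COMPRESS (pages filled with
     * a single byte value), RAM_BLOCKS_REQUEST, REGISTER_REQUEST and
     * UNREGISTER_REQUEST, until REGISTER_FINISHED ends this iteration.
     */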
3559      do {
3560          trace_qemu_rdma_registration_handle_wait();
3561  
3562          ret = qemu_rdma_exchange_recv(rdma, &head, RDMA_CONTROL_NONE);
3563  
3564          if (ret < 0) {
3565              break;
3566          }
3567  
3568          if (head.repeat > RDMA_CONTROL_MAX_COMMANDS_PER_MESSAGE) {
            error_report("rdma: Too many requests in this message (%d). "
                         "Bailing.", head.repeat);
3571              ret = -EIO;
3572              break;
3573          }
3574  
3575          switch (head.type) {
3576          case RDMA_CONTROL_COMPRESS:
3577              comp = (RDMACompress *) rdma->wr_data[idx].control_curr;
3578              network_to_compress(comp);
3579  
3580              trace_qemu_rdma_registration_handle_compress(comp->length,
3581                                                           comp->block_idx,
3582                                                           comp->offset);
3583              if (comp->block_idx >= rdma->local_ram_blocks.nb_blocks) {
3584                  error_report("rdma: 'compress' bad block index %u (vs %d)",
3585                               (unsigned int)comp->block_idx,
3586                               rdma->local_ram_blocks.nb_blocks);
3587                  ret = -EIO;
3588                  goto out;
3589              }
3590              block = &(rdma->local_ram_blocks.block[comp->block_idx]);
3591  
3592              host_addr = block->local_host_addr +
3593                              (comp->offset - block->offset);
3594  
3595              ram_handle_compressed(host_addr, comp->value, comp->length);
3596              break;
3597  
3598          case RDMA_CONTROL_REGISTER_FINISHED:
3599              trace_qemu_rdma_registration_handle_finished();
3600              goto out;
3601  
3602          case RDMA_CONTROL_RAM_BLOCKS_REQUEST:
3603              trace_qemu_rdma_registration_handle_ram_blocks();
3604  
            /*
             * Sort our local RAM Block list so it is in the same order as the
             * source's; we can do this because we filled in a src_index for
             * each entry as we received the RAMBlock list earlier.
             */
3609              qsort(rdma->local_ram_blocks.block,
3610                    rdma->local_ram_blocks.nb_blocks,
3611                    sizeof(RDMALocalBlock), dest_ram_sort_func);
3612              for (i = 0; i < local->nb_blocks; i++) {
3613                  local->block[i].index = i;
3614              }
3615  
3616              if (rdma->pin_all) {
3617                  ret = qemu_rdma_reg_whole_ram_blocks(rdma);
3618                  if (ret) {
3619                      error_report("rdma migration: error dest "
3620                                      "registering ram blocks");
3621                      goto out;
3622                  }
3623              }
3624  
3625              /*
3626               * Dest uses this to prepare to transmit the RAMBlock descriptions
3627               * to the source VM after connection setup.
3628               * Both sides use the "remote" structure to communicate and update
3629               * their "local" descriptions with what was sent.
3630               */
3631              for (i = 0; i < local->nb_blocks; i++) {
3632                  rdma->dest_blocks[i].remote_host_addr =
3633                      (uintptr_t)(local->block[i].local_host_addr);
3634  
3635                  if (rdma->pin_all) {
3636                      rdma->dest_blocks[i].remote_rkey = local->block[i].mr->rkey;
3637                  }
3638  
3639                  rdma->dest_blocks[i].offset = local->block[i].offset;
3640                  rdma->dest_blocks[i].length = local->block[i].length;
3641  
3642                  dest_block_to_network(&rdma->dest_blocks[i]);
3643                  trace_qemu_rdma_registration_handle_ram_blocks_loop(
3644                      local->block[i].block_name,
3645                      local->block[i].offset,
3646                      local->block[i].length,
3647                      local->block[i].local_host_addr,
3648                      local->block[i].src_index);
3649              }
3650  
3651              blocks.len = rdma->local_ram_blocks.nb_blocks
3652                                                  * sizeof(RDMADestBlock);
3653  
3655              ret = qemu_rdma_post_send_control(rdma,
3656                                          (uint8_t *) rdma->dest_blocks, &blocks);
3657  
3658              if (ret < 0) {
3659                  error_report("rdma migration: error sending remote info");
3660                  goto out;
3661              }
3662  
3663              break;
3664          case RDMA_CONTROL_REGISTER_REQUEST:
3665              trace_qemu_rdma_registration_handle_register(head.repeat);
3666  
3667              reg_resp.repeat = head.repeat;
3668              registers = (RDMARegister *) rdma->wr_data[idx].control_curr;
3669  
3670              for (count = 0; count < head.repeat; count++) {
3671                  uint64_t chunk;
3672                  uint8_t *chunk_start, *chunk_end;
3673  
3674                  reg = &registers[count];
3675                  network_to_register(reg);
3676  
3677                  reg_result = &results[count];
3678  
3679                  trace_qemu_rdma_registration_handle_register_loop(count,
3680                           reg->current_index, reg->key.current_addr, reg->chunks);
3681  
3682                  if (reg->current_index >= rdma->local_ram_blocks.nb_blocks) {
3683                      error_report("rdma: 'register' bad block index %u (vs %d)",
3684                                   (unsigned int)reg->current_index,
3685                                   rdma->local_ram_blocks.nb_blocks);
3686                      ret = -ENOENT;
3687                      goto out;
3688                  }
3689                  block = &(rdma->local_ram_blocks.block[reg->current_index]);
3690                  if (block->is_ram_block) {
3691                      if (block->offset > reg->key.current_addr) {
3692                          error_report("rdma: bad register address for block %s"
3693                              " offset: %" PRIx64 " current_addr: %" PRIx64,
3694                              block->block_name, block->offset,
3695                              reg->key.current_addr);
3696                          ret = -ERANGE;
3697                          goto out;
3698                      }
3699                      host_addr = (block->local_host_addr +
3700                                  (reg->key.current_addr - block->offset));
3701                      chunk = ram_chunk_index(block->local_host_addr,
3702                                              (uint8_t *) host_addr);
3703                  } else {
3704                      chunk = reg->key.chunk;
3705                      host_addr = block->local_host_addr +
3706                          (reg->key.chunk * (1UL << RDMA_REG_CHUNK_SHIFT));
3707                      /* Check for particularly bad chunk value */
3708                      if (host_addr < (void *)block->local_host_addr) {
3709                          error_report("rdma: bad chunk for block %s"
3710                              " chunk: %" PRIx64,
3711                              block->block_name, reg->key.chunk);
3712                          ret = -ERANGE;
3713                          goto out;
3714                      }
3715                  }
3716                  chunk_start = ram_chunk_start(block, chunk);
3717                  chunk_end = ram_chunk_end(block, chunk + reg->chunks);
3718                  /* avoid "-Waddress-of-packed-member" warning */
3719                  uint32_t tmp_rkey = 0;
3720                  if (qemu_rdma_register_and_get_keys(rdma, block,
3721                              (uintptr_t)host_addr, NULL, &tmp_rkey,
3722                              chunk, chunk_start, chunk_end)) {
3723                      error_report("cannot get rkey");
3724                      ret = -EINVAL;
3725                      goto out;
3726                  }
3727                  reg_result->rkey = tmp_rkey;
3728  
3729                  reg_result->host_addr = (uintptr_t)block->local_host_addr;
3730  
3731                  trace_qemu_rdma_registration_handle_register_rkey(
3732                                                             reg_result->rkey);
3733  
3734                  result_to_network(reg_result);
3735              }
3736  
3737              ret = qemu_rdma_post_send_control(rdma,
3738                              (uint8_t *) results, &reg_resp);
3739  
3740              if (ret < 0) {
3741                  error_report("Failed to send control buffer");
3742                  goto out;
3743              }
3744              break;
3745          case RDMA_CONTROL_UNREGISTER_REQUEST:
3746              trace_qemu_rdma_registration_handle_unregister(head.repeat);
3747              unreg_resp.repeat = head.repeat;
3748              registers = (RDMARegister *) rdma->wr_data[idx].control_curr;
3749  
3750              for (count = 0; count < head.repeat; count++) {
3751                  reg = &registers[count];
3752                  network_to_register(reg);
3753  
3754                  trace_qemu_rdma_registration_handle_unregister_loop(count,
3755                             reg->current_index, reg->key.chunk);
3756  
3757                  block = &(rdma->local_ram_blocks.block[reg->current_index]);
3758  
3759                  ret = ibv_dereg_mr(block->pmr[reg->key.chunk]);
3760                  block->pmr[reg->key.chunk] = NULL;
3761  
3762                  if (ret != 0) {
3763                      perror("rdma unregistration chunk failed");
3764                      ret = -ret;
3765                      goto out;
3766                  }
3767  
3768                  rdma->total_registrations--;
3769  
3770                  trace_qemu_rdma_registration_handle_unregister_success(
3771                                                         reg->key.chunk);
3772              }
3773  
3774              ret = qemu_rdma_post_send_control(rdma, NULL, &unreg_resp);
3775  
3776              if (ret < 0) {
3777                  error_report("Failed to send control buffer");
3778                  goto out;
3779              }
3780              break;
3781          case RDMA_CONTROL_REGISTER_RESULT:
3782              error_report("Invalid RESULT message at dest.");
3783              ret = -EIO;
3784              goto out;
3785          default:
3786              error_report("Unknown control message %s", control_desc(head.type));
3787              ret = -EIO;
3788              goto out;
3789          }
3790      } while (1);
3791  out:
3792      if (ret < 0) {
3793          rdma->error_state = ret;
3794      }
3795      return ret;
3796  }
3797  
3798  /* Destination:
3799   * Called via a ram_control_load_hook during the initial RAM load section which
3800   * lists the RAMBlocks by name.  This lets us know the order of the RAMBlocks
3801   * on the source.
3802   * We've already built our local RAMBlock list, but not yet sent the list to
3803   * the source.
3804   */
3805  static int
3806  rdma_block_notification_handle(QIOChannelRDMA *rioc, const char *name)
3807  {
3808      RDMAContext *rdma;
3809      int curr;
3810      int found = -1;
3811  
3812      RCU_READ_LOCK_GUARD();
3813      rdma = qatomic_rcu_read(&rioc->rdmain);
3814  
3815      if (!rdma) {
3816          return -EIO;
3817      }
3818  
3819      /* Find the matching RAMBlock in our local list */
3820      for (curr = 0; curr < rdma->local_ram_blocks.nb_blocks; curr++) {
3821          if (!strcmp(rdma->local_ram_blocks.block[curr].block_name, name)) {
3822              found = curr;
3823              break;
3824          }
3825      }
3826  
3827      if (found == -1) {
3828          error_report("RAMBlock '%s' not found on destination", name);
3829          return -ENOENT;
3830      }
3831  
3832      rdma->local_ram_blocks.block[curr].src_index = rdma->next_src_index;
3833      trace_rdma_block_notification_handle(name, rdma->next_src_index);
3834      rdma->next_src_index++;
3835  
3836      return 0;
3837  }
3838  
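/*
 * Dispatch ram_control_load_hook() on the destination: BLOCK_REG records the
 * source's ordering for a RAMBlock given by name, HOOK runs the dynamic
 * registration handler for one iteration.
 */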
3839  static int rdma_load_hook(QEMUFile *f, uint64_t flags, void *data)
3840  {
3841      QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(qemu_file_get_ioc(f));
3842      switch (flags) {
3843      case RAM_CONTROL_BLOCK_REG:
3844          return rdma_block_notification_handle(rioc, data);
3845  
3846      case RAM_CONTROL_HOOK:
3847          return qemu_rdma_registration_handle(f, rioc);
3848  
3849      default:
3850          /* Shouldn't be called with any other values */
3851          abort();
3852      }
3853  }
3854  
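/*
 * Source side: called before each RAM iteration.  Emits RAM_SAVE_FLAG_HOOK
 * into the stream so that the destination's load path enters
 * qemu_rdma_registration_handle() and can serve registration requests while
 * this iteration's pages are written.
 */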
3855  static int qemu_rdma_registration_start(QEMUFile *f,
3856                                          uint64_t flags, void *data)
3857  {
3858      QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(qemu_file_get_ioc(f));
3859      RDMAContext *rdma;
3860  
3861      RCU_READ_LOCK_GUARD();
3862      rdma = qatomic_rcu_read(&rioc->rdmaout);
3863      if (!rdma) {
3864          return -EIO;
3865      }
3866  
3867      CHECK_ERROR_STATE();
3868  
3869      if (migration_in_postcopy()) {
3870          return 0;
3871      }
3872  
3873      trace_qemu_rdma_registration_start(flags);
3874      qemu_put_be64(f, RAM_SAVE_FLAG_HOOK);
3875      qemu_fflush(f);
3876  
3877      return 0;
3878  }
3879  
3880  /*
3881   * Inform dest that dynamic registrations are done for now.
3882   * First, flush writes, if any.
3883   */
3884  static int qemu_rdma_registration_stop(QEMUFile *f,
3885                                         uint64_t flags, void *data)
3886  {
3887      QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(qemu_file_get_ioc(f));
3888      RDMAContext *rdma;
3889      RDMAControlHeader head = { .len = 0, .repeat = 1 };
3890      int ret = 0;
3891  
3892      RCU_READ_LOCK_GUARD();
3893      rdma = qatomic_rcu_read(&rioc->rdmaout);
3894      if (!rdma) {
3895          return -EIO;
3896      }
3897  
3898      CHECK_ERROR_STATE();
3899  
3900      if (migration_in_postcopy()) {
3901          return 0;
3902      }
3903  
3904      qemu_fflush(f);
3905      ret = qemu_rdma_drain_cq(f, rdma);
3906  
3907      if (ret < 0) {
3908          goto err;
3909      }
3910  
3911      if (flags == RAM_CONTROL_SETUP) {
3912          RDMAControlHeader resp = {.type = RDMA_CONTROL_RAM_BLOCKS_RESULT };
3913          RDMALocalBlocks *local = &rdma->local_ram_blocks;
3914          int reg_result_idx, i, nb_dest_blocks;
3915  
3916          head.type = RDMA_CONTROL_RAM_BLOCKS_REQUEST;
3917          trace_qemu_rdma_registration_stop_ram();
3918  
3919          /*
3920           * Make sure that we parallelize the pinning on both sides.
3921           * For very large guests, doing this serially takes a really
3922           * long time, so we have to 'interleave' the pinning locally
3923           * with the control messages by performing the pinning on this
3924           * side before we receive the control response from the other
3925           * side that the pinning has completed.
3926           */
3927          ret = qemu_rdma_exchange_send(rdma, &head, NULL, &resp,
3928                      &reg_result_idx, rdma->pin_all ?
3929                      qemu_rdma_reg_whole_ram_blocks : NULL);
3930          if (ret < 0) {
            fprintf(stderr, "rdma migration: error receiving remote info!\n");
3932              return ret;
3933          }
3934  
3935          nb_dest_blocks = resp.len / sizeof(RDMADestBlock);
3936  
        /*
         * The protocol uses two different sets of rkeys (mutually exclusive):
         * 1. One key to represent the virtual address of the entire ram
         *    block.  (dynamic chunk registration disabled - pin everything
         *    with one rkey.)
         * 2. One to represent individual chunks within a ram block.
         *    (dynamic chunk registration enabled - pin individual chunks.)
         *
         * Once the capability is successfully negotiated, the destination
         * transmits the keys to use (or sends them later), including the
         * virtual addresses, and then propagates the remote ram block
         * descriptions to its local copy.
         */
3948  
3949          if (local->nb_blocks != nb_dest_blocks) {
            fprintf(stderr, "ram blocks mismatch (Number of blocks %d vs %d). "
                    "Your QEMU command line parameters are probably "
                    "not identical on both the source and destination.\n",
                    local->nb_blocks, nb_dest_blocks);
3954              rdma->error_state = -EINVAL;
3955              return -EINVAL;
3956          }
3957  
3958          qemu_rdma_move_header(rdma, reg_result_idx, &resp);
3959          memcpy(rdma->dest_blocks,
3960              rdma->wr_data[reg_result_idx].control_curr, resp.len);
3961          for (i = 0; i < nb_dest_blocks; i++) {
3962              network_to_dest_block(&rdma->dest_blocks[i]);
3963  
3964              /* We require that the blocks are in the same order */
3965              if (rdma->dest_blocks[i].length != local->block[i].length) {
                fprintf(stderr, "Block %s/%d has a different length %" PRIu64
                        " vs %" PRIu64 "\n", local->block[i].block_name, i,
                        local->block[i].length,
                        rdma->dest_blocks[i].length);
3970                  rdma->error_state = -EINVAL;
3971                  return -EINVAL;
3972              }
3973              local->block[i].remote_host_addr =
3974                      rdma->dest_blocks[i].remote_host_addr;
3975              local->block[i].remote_rkey = rdma->dest_blocks[i].remote_rkey;
3976          }
3977      }
3978  
3979      trace_qemu_rdma_registration_stop(flags);
3980  
3981      head.type = RDMA_CONTROL_REGISTER_FINISHED;
3982      ret = qemu_rdma_exchange_send(rdma, &head, NULL, NULL, NULL, NULL);
3983  
3984      if (ret < 0) {
3985          goto err;
3986      }
3987  
3988      return 0;
3989  err:
3990      rdma->error_state = ret;
3991      return ret;
3992  }
3993  
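/*
 * QEMUFile hooks: the load (destination) side only needs the dynamic
 * registration handler; the save (source) side brackets each RAM iteration
 * with registration start/stop and overrides page saving with RDMA writes.
 */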
3994  static const QEMUFileHooks rdma_read_hooks = {
3995      .hook_ram_load = rdma_load_hook,
3996  };
3997  
3998  static const QEMUFileHooks rdma_write_hooks = {
3999      .before_ram_iterate = qemu_rdma_registration_start,
4000      .after_ram_iterate  = qemu_rdma_registration_stop,
4001      .save_page          = qemu_rdma_save_page,
4002  };
4003  
4004  
4005  static void qio_channel_rdma_finalize(Object *obj)
4006  {
4007      QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(obj);
4008      if (rioc->rdmain) {
4009          qemu_rdma_cleanup(rioc->rdmain);
4010          g_free(rioc->rdmain);
4011          rioc->rdmain = NULL;
4012      }
4013      if (rioc->rdmaout) {
4014          qemu_rdma_cleanup(rioc->rdmaout);
4015          g_free(rioc->rdmaout);
4016          rioc->rdmaout = NULL;
4017      }
4018  }
4019  
4020  static void qio_channel_rdma_class_init(ObjectClass *klass,
4021                                          void *class_data G_GNUC_UNUSED)
4022  {
4023      QIOChannelClass *ioc_klass = QIO_CHANNEL_CLASS(klass);
4024  
4025      ioc_klass->io_writev = qio_channel_rdma_writev;
4026      ioc_klass->io_readv = qio_channel_rdma_readv;
4027      ioc_klass->io_set_blocking = qio_channel_rdma_set_blocking;
4028      ioc_klass->io_close = qio_channel_rdma_close;
4029      ioc_klass->io_create_watch = qio_channel_rdma_create_watch;
4030      ioc_klass->io_set_aio_fd_handler = qio_channel_rdma_set_aio_fd_handler;
4031      ioc_klass->io_shutdown = qio_channel_rdma_shutdown;
4032  }
4033  
4034  static const TypeInfo qio_channel_rdma_info = {
4035      .parent = TYPE_QIO_CHANNEL,
4036      .name = TYPE_QIO_CHANNEL_RDMA,
4037      .instance_size = sizeof(QIOChannelRDMA),
4038      .instance_finalize = qio_channel_rdma_finalize,
4039      .class_init = qio_channel_rdma_class_init,
4040  };
4041  
4042  static void qio_channel_rdma_register_types(void)
4043  {
4044      type_register_static(&qio_channel_rdma_info);
4045  }
4046  
4047  type_init(qio_channel_rdma_register_types);
4048  
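/*
 * Wrap an RDMAContext in a QIOChannelRDMA and a QEMUFile.  Mode "w" sets up
 * the outgoing (save) direction, mode "r" the incoming (load) direction,
 * each with the matching hooks; the opposite direction is wired to the
 * postcopy return path, if one exists.
 */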
4049  static QEMUFile *qemu_fopen_rdma(RDMAContext *rdma, const char *mode)
4050  {
4051      QIOChannelRDMA *rioc;
4052  
4053      if (qemu_file_mode_is_not_valid(mode)) {
4054          return NULL;
4055      }
4056  
4057      rioc = QIO_CHANNEL_RDMA(object_new(TYPE_QIO_CHANNEL_RDMA));
4058  
4059      if (mode[0] == 'w') {
4060          rioc->file = qemu_file_new_output(QIO_CHANNEL(rioc));
4061          rioc->rdmaout = rdma;
4062          rioc->rdmain = rdma->return_path;
4063          qemu_file_set_hooks(rioc->file, &rdma_write_hooks);
4064      } else {
4065          rioc->file = qemu_file_new_input(QIO_CHANNEL(rioc));
4066          rioc->rdmain = rdma;
4067          rioc->rdmaout = rdma->return_path;
4068          qemu_file_set_hooks(rioc->file, &rdma_read_hooks);
4069      }
4070  
4071      return rioc->file;
4072  }
4073  
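/*
 * fd handler on the listening CM event channel: complete the accept
 * handshake and, unless this context is the postcopy return path, create the
 * QEMUFile and hand it to the generic incoming-migration machinery.
 */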
4074  static void rdma_accept_incoming_migration(void *opaque)
4075  {
4076      RDMAContext *rdma = opaque;
4077      int ret;
4078      QEMUFile *f;
4079      Error *local_err = NULL;
4080  
4081      trace_qemu_rdma_accept_incoming_migration();
4082      ret = qemu_rdma_accept(rdma);
4083  
4084      if (ret) {
4085          fprintf(stderr, "RDMA ERROR: Migration initialization failed\n");
4086          return;
4087      }
4088  
4089      trace_qemu_rdma_accept_incoming_migration_accepted();
4090  
4091      if (rdma->is_return_path) {
4092          return;
4093      }
4094  
4095      f = qemu_fopen_rdma(rdma, "rb");
4096      if (f == NULL) {
4097          fprintf(stderr, "RDMA ERROR: could not qemu_fopen_rdma\n");
4098          qemu_rdma_cleanup(rdma);
4099          return;
4100      }
4101  
4102      rdma->migration_started_on_destination = 1;
4103      migration_fd_process_incoming(f, &local_err);
4104      if (local_err) {
        error_reportf_err(local_err, "RDMA ERROR: ");
4106      }
4107  }
4108  
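/*
 * Incoming side entry point: set up the destination RDMAContext, start
 * listening on the rdma_cm id and arm rdma_accept_incoming_migration() as
 * the handler for the CM event channel's file descriptor.
 */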
4109  void rdma_start_incoming_migration(const char *host_port, Error **errp)
4110  {
4111      int ret;
4112      RDMAContext *rdma, *rdma_return_path = NULL;
4113      Error *local_err = NULL;
4114  
4115      trace_rdma_start_incoming_migration();
4116  
4117      /* Avoid ram_block_discard_disable(), cannot change during migration. */
4118      if (ram_block_discard_is_required()) {
4119          error_setg(errp, "RDMA: cannot disable RAM discard");
4120          return;
4121      }
4122  
4123      rdma = qemu_rdma_data_init(host_port, &local_err);
4124      if (rdma == NULL) {
4125          goto err;
4126      }
4127  
4128      ret = qemu_rdma_dest_init(rdma, &local_err);
4129  
4130      if (ret) {
4131          goto err;
4132      }
4133  
4134      trace_rdma_start_incoming_migration_after_dest_init();
4135  
4136      ret = rdma_listen(rdma->listen_id, 5);
4137  
4138      if (ret) {
4139          ERROR(errp, "listening on socket!");
4140          goto cleanup_rdma;
4141      }
4142  
4143      trace_rdma_start_incoming_migration_after_rdma_listen();
4144  
4145      qemu_set_fd_handler(rdma->channel->fd, rdma_accept_incoming_migration,
4146                          NULL, (void *)(intptr_t)rdma);
4147      return;
4148  
4149  cleanup_rdma:
4150      qemu_rdma_cleanup(rdma);
4151  err:
4152      error_propagate(errp, local_err);
4153      if (rdma) {
4154          g_free(rdma->host);
4155          g_free(rdma->host_port);
4156      }
4157      g_free(rdma);
4158      g_free(rdma_return_path);
4159  }
4160  
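/*
 * Outgoing side entry point: create and connect the main RDMAContext (plus a
 * second context for the postcopy return path when postcopy is enabled),
 * then wrap it in a QEMUFile and hand it to the migration core.
 */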
4161  void rdma_start_outgoing_migration(void *opaque,
4162                              const char *host_port, Error **errp)
4163  {
4164      MigrationState *s = opaque;
4165      RDMAContext *rdma_return_path = NULL;
4166      RDMAContext *rdma;
4167      int ret = 0;
4168  
4169      /* Avoid ram_block_discard_disable(), cannot change during migration. */
4170      if (ram_block_discard_is_required()) {
4171          error_setg(errp, "RDMA: cannot disable RAM discard");
4172          return;
4173      }
4174  
4175      rdma = qemu_rdma_data_init(host_port, errp);
4176      if (rdma == NULL) {
4177          goto err;
4178      }
4179  
4180      ret = qemu_rdma_source_init(rdma,
4181          s->enabled_capabilities[MIGRATION_CAPABILITY_RDMA_PIN_ALL], errp);
4182  
4183      if (ret) {
4184          goto err;
4185      }
4186  
4187      trace_rdma_start_outgoing_migration_after_rdma_source_init();
4188      ret = qemu_rdma_connect(rdma, errp, false);
4189  
4190      if (ret) {
4191          goto err;
4192      }
4193  
4194      /* RDMA postcopy need a separate queue pair for return path */
4195      if (migrate_postcopy()) {
4196          rdma_return_path = qemu_rdma_data_init(host_port, errp);
4197  
4198          if (rdma_return_path == NULL) {
4199              goto return_path_err;
4200          }
4201  
4202          ret = qemu_rdma_source_init(rdma_return_path,
4203              s->enabled_capabilities[MIGRATION_CAPABILITY_RDMA_PIN_ALL], errp);
4204  
4205          if (ret) {
4206              goto return_path_err;
4207          }
4208  
4209          ret = qemu_rdma_connect(rdma_return_path, errp, true);
4210  
4211          if (ret) {
4212              goto return_path_err;
4213          }
4214  
4215          rdma->return_path = rdma_return_path;
4216          rdma_return_path->return_path = rdma;
4217          rdma_return_path->is_return_path = true;
4218      }
4219  
4220      trace_rdma_start_outgoing_migration_after_rdma_connect();
4221  
4222      s->to_dst_file = qemu_fopen_rdma(rdma, "wb");
4223      migrate_fd_connect(s, NULL);
4224      return;
4225  return_path_err:
4226      qemu_rdma_cleanup(rdma);
4227  err:
4228      g_free(rdma);
4229      g_free(rdma_return_path);
4230  }
4231