1 /* 2 * RDMA protocol and interfaces 3 * 4 * Copyright IBM, Corp. 2010-2013 5 * Copyright Red Hat, Inc. 2015-2016 6 * 7 * Authors: 8 * Michael R. Hines <mrhines@us.ibm.com> 9 * Jiuxing Liu <jl@us.ibm.com> 10 * Daniel P. Berrange <berrange@redhat.com> 11 * 12 * This work is licensed under the terms of the GNU GPL, version 2 or 13 * later. See the COPYING file in the top-level directory. 14 * 15 */ 16 17 #include "qemu/osdep.h" 18 #include "qapi/error.h" 19 #include "qemu/cutils.h" 20 #include "exec/target_page.h" 21 #include "rdma.h" 22 #include "migration.h" 23 #include "migration-stats.h" 24 #include "qemu-file.h" 25 #include "ram.h" 26 #include "qemu/error-report.h" 27 #include "qemu/main-loop.h" 28 #include "qemu/module.h" 29 #include "qemu/rcu.h" 30 #include "qemu/sockets.h" 31 #include "qemu/bitmap.h" 32 #include "qemu/coroutine.h" 33 #include "system/memory.h" 34 #include <sys/socket.h> 35 #include <netdb.h> 36 #include <arpa/inet.h> 37 #include <rdma/rdma_cma.h> 38 #include "trace.h" 39 #include "qom/object.h" 40 #include "options.h" 41 #include <poll.h> 42 43 #define RDMA_RESOLVE_TIMEOUT_MS 10000 44 45 /* Do not merge data if larger than this. */ 46 #define RDMA_MERGE_MAX (2 * 1024 * 1024) 47 #define RDMA_SIGNALED_SEND_MAX (RDMA_MERGE_MAX / 4096) 48 49 #define RDMA_REG_CHUNK_SHIFT 20 /* 1 MB */ 50 51 /* 52 * This is only for non-live state being migrated. 53 * Instead of RDMA_WRITE messages, we use RDMA_SEND 54 * messages for that state, which requires a different 55 * delivery design than main memory. 56 */ 57 #define RDMA_SEND_INCREMENT 32768 58 59 /* 60 * Maximum size infiniband SEND message 61 */ 62 #define RDMA_CONTROL_MAX_BUFFER (512 * 1024) 63 #define RDMA_CONTROL_MAX_COMMANDS_PER_MESSAGE 4096 64 65 #define RDMA_CONTROL_VERSION_CURRENT 1 66 /* 67 * Capabilities for negotiation. 68 */ 69 #define RDMA_CAPABILITY_PIN_ALL 0x01 70 71 /* 72 * Add the other flags above to this list of known capabilities 73 * as they are introduced. 74 */ 75 static uint32_t known_capabilities = RDMA_CAPABILITY_PIN_ALL; 76 77 /* 78 * A work request ID is 64-bits and we split up these bits 79 * into 3 parts: 80 * 81 * bits 0-15 : type of control message, 2^16 82 * bits 16-29: ram block index, 2^14 83 * bits 30-63: ram block chunk number, 2^34 84 * 85 * The last two bit ranges are only used for RDMA writes, 86 * in order to track their completion and potentially 87 * also track unregistration status of the message. 88 */ 89 #define RDMA_WRID_TYPE_SHIFT 0UL 90 #define RDMA_WRID_BLOCK_SHIFT 16UL 91 #define RDMA_WRID_CHUNK_SHIFT 30UL 92 93 #define RDMA_WRID_TYPE_MASK \ 94 ((1UL << RDMA_WRID_BLOCK_SHIFT) - 1UL) 95 96 #define RDMA_WRID_BLOCK_MASK \ 97 (~RDMA_WRID_TYPE_MASK & ((1UL << RDMA_WRID_CHUNK_SHIFT) - 1UL)) 98 99 #define RDMA_WRID_CHUNK_MASK (~RDMA_WRID_BLOCK_MASK & ~RDMA_WRID_TYPE_MASK) 100 101 /* 102 * RDMA migration protocol: 103 * 1. RDMA Writes (data messages, i.e. RAM) 104 * 2. IB Send/Recv (control channel messages) 105 */ 106 enum { 107 RDMA_WRID_NONE = 0, 108 RDMA_WRID_RDMA_WRITE = 1, 109 RDMA_WRID_SEND_CONTROL = 2000, 110 RDMA_WRID_RECV_CONTROL = 4000, 111 }; 112 113 /* 114 * Work request IDs for IB SEND messages only (not RDMA writes). 115 * This is used by the migration protocol to transmit 116 * control messages (such as device state and registration commands) 117 * 118 * We could use more WRs, but we have enough for now. 119 */ 120 enum { 121 RDMA_WRID_READY = 0, 122 RDMA_WRID_DATA, 123 RDMA_WRID_CONTROL, 124 RDMA_WRID_MAX, 125 }; 126 127 /* 128 * SEND/RECV IB Control Messages. 129 */ 130 enum { 131 RDMA_CONTROL_NONE = 0, 132 RDMA_CONTROL_ERROR, 133 RDMA_CONTROL_READY, /* ready to receive */ 134 RDMA_CONTROL_QEMU_FILE, /* QEMUFile-transmitted bytes */ 135 RDMA_CONTROL_RAM_BLOCKS_REQUEST, /* RAMBlock synchronization */ 136 RDMA_CONTROL_RAM_BLOCKS_RESULT, /* RAMBlock synchronization */ 137 RDMA_CONTROL_COMPRESS, /* page contains repeat values */ 138 RDMA_CONTROL_REGISTER_REQUEST, /* dynamic page registration */ 139 RDMA_CONTROL_REGISTER_RESULT, /* key to use after registration */ 140 RDMA_CONTROL_REGISTER_FINISHED, /* current iteration finished */ 141 RDMA_CONTROL_UNREGISTER_REQUEST, /* dynamic UN-registration */ 142 RDMA_CONTROL_UNREGISTER_FINISHED, /* unpinning finished */ 143 }; 144 145 146 /* 147 * Memory and MR structures used to represent an IB Send/Recv work request. 148 * This is *not* used for RDMA writes, only IB Send/Recv. 149 */ 150 typedef struct { 151 uint8_t control[RDMA_CONTROL_MAX_BUFFER]; /* actual buffer to register */ 152 struct ibv_mr *control_mr; /* registration metadata */ 153 size_t control_len; /* length of the message */ 154 uint8_t *control_curr; /* start of unconsumed bytes */ 155 } RDMAWorkRequestData; 156 157 /* 158 * Negotiate RDMA capabilities during connection-setup time. 159 */ 160 typedef struct { 161 uint32_t version; 162 uint32_t flags; 163 } RDMACapabilities; 164 165 static void caps_to_network(RDMACapabilities *cap) 166 { 167 cap->version = htonl(cap->version); 168 cap->flags = htonl(cap->flags); 169 } 170 171 static void network_to_caps(RDMACapabilities *cap) 172 { 173 cap->version = ntohl(cap->version); 174 cap->flags = ntohl(cap->flags); 175 } 176 177 /* 178 * Representation of a RAMBlock from an RDMA perspective. 179 * This is not transmitted, only local. 180 * This and subsequent structures cannot be linked lists 181 * because we're using a single IB message to transmit 182 * the information. It's small anyway, so a list is overkill. 183 */ 184 typedef struct RDMALocalBlock { 185 char *block_name; 186 uint8_t *local_host_addr; /* local virtual address */ 187 uint64_t remote_host_addr; /* remote virtual address */ 188 uint64_t offset; 189 uint64_t length; 190 struct ibv_mr **pmr; /* MRs for chunk-level registration */ 191 struct ibv_mr *mr; /* MR for non-chunk-level registration */ 192 uint32_t *remote_keys; /* rkeys for chunk-level registration */ 193 uint32_t remote_rkey; /* rkeys for non-chunk-level registration */ 194 int index; /* which block are we */ 195 unsigned int src_index; /* (Only used on dest) */ 196 bool is_ram_block; 197 int nb_chunks; 198 unsigned long *transit_bitmap; 199 unsigned long *unregister_bitmap; 200 } RDMALocalBlock; 201 202 /* 203 * Also represents a RAMblock, but only on the dest. 204 * This gets transmitted by the dest during connection-time 205 * to the source VM and then is used to populate the 206 * corresponding RDMALocalBlock with 207 * the information needed to perform the actual RDMA. 208 */ 209 typedef struct QEMU_PACKED RDMADestBlock { 210 uint64_t remote_host_addr; 211 uint64_t offset; 212 uint64_t length; 213 uint32_t remote_rkey; 214 uint32_t padding; 215 } RDMADestBlock; 216 217 static const char *control_desc(unsigned int rdma_control) 218 { 219 static const char *strs[] = { 220 [RDMA_CONTROL_NONE] = "NONE", 221 [RDMA_CONTROL_ERROR] = "ERROR", 222 [RDMA_CONTROL_READY] = "READY", 223 [RDMA_CONTROL_QEMU_FILE] = "QEMU FILE", 224 [RDMA_CONTROL_RAM_BLOCKS_REQUEST] = "RAM BLOCKS REQUEST", 225 [RDMA_CONTROL_RAM_BLOCKS_RESULT] = "RAM BLOCKS RESULT", 226 [RDMA_CONTROL_COMPRESS] = "COMPRESS", 227 [RDMA_CONTROL_REGISTER_REQUEST] = "REGISTER REQUEST", 228 [RDMA_CONTROL_REGISTER_RESULT] = "REGISTER RESULT", 229 [RDMA_CONTROL_REGISTER_FINISHED] = "REGISTER FINISHED", 230 [RDMA_CONTROL_UNREGISTER_REQUEST] = "UNREGISTER REQUEST", 231 [RDMA_CONTROL_UNREGISTER_FINISHED] = "UNREGISTER FINISHED", 232 }; 233 234 if (rdma_control > RDMA_CONTROL_UNREGISTER_FINISHED) { 235 return "??BAD CONTROL VALUE??"; 236 } 237 238 return strs[rdma_control]; 239 } 240 241 #if !defined(htonll) 242 static uint64_t htonll(uint64_t v) 243 { 244 union { uint32_t lv[2]; uint64_t llv; } u; 245 u.lv[0] = htonl(v >> 32); 246 u.lv[1] = htonl(v & 0xFFFFFFFFULL); 247 return u.llv; 248 } 249 #endif 250 251 #if !defined(ntohll) 252 static uint64_t ntohll(uint64_t v) 253 { 254 union { uint32_t lv[2]; uint64_t llv; } u; 255 u.llv = v; 256 return ((uint64_t)ntohl(u.lv[0]) << 32) | (uint64_t) ntohl(u.lv[1]); 257 } 258 #endif 259 260 static void dest_block_to_network(RDMADestBlock *db) 261 { 262 db->remote_host_addr = htonll(db->remote_host_addr); 263 db->offset = htonll(db->offset); 264 db->length = htonll(db->length); 265 db->remote_rkey = htonl(db->remote_rkey); 266 } 267 268 static void network_to_dest_block(RDMADestBlock *db) 269 { 270 db->remote_host_addr = ntohll(db->remote_host_addr); 271 db->offset = ntohll(db->offset); 272 db->length = ntohll(db->length); 273 db->remote_rkey = ntohl(db->remote_rkey); 274 } 275 276 /* 277 * Virtual address of the above structures used for transmitting 278 * the RAMBlock descriptions at connection-time. 279 * This structure is *not* transmitted. 280 */ 281 typedef struct RDMALocalBlocks { 282 int nb_blocks; 283 bool init; /* main memory init complete */ 284 RDMALocalBlock *block; 285 } RDMALocalBlocks; 286 287 /* 288 * Main data structure for RDMA state. 289 * While there is only one copy of this structure being allocated right now, 290 * this is the place where one would start if you wanted to consider 291 * having more than one RDMA connection open at the same time. 292 */ 293 typedef struct RDMAContext { 294 char *host; 295 int port; 296 297 RDMAWorkRequestData wr_data[RDMA_WRID_MAX]; 298 299 /* 300 * This is used by *_exchange_send() to figure out whether or not 301 * the initial "READY" message has already been received or not. 302 * This is because other functions may potentially poll() and detect 303 * the READY message before send() does, in which case we need to 304 * know if it completed. 305 */ 306 int control_ready_expected; 307 308 /* number of outstanding writes */ 309 int nb_sent; 310 311 /* store info about current buffer so that we can 312 merge it with future sends */ 313 uint64_t current_addr; 314 uint64_t current_length; 315 /* index of ram block the current buffer belongs to */ 316 int current_index; 317 /* index of the chunk in the current ram block */ 318 int current_chunk; 319 320 bool pin_all; 321 322 /* 323 * infiniband-specific variables for opening the device 324 * and maintaining connection state and so forth. 325 * 326 * cm_id also has ibv_context, rdma_event_channel, and ibv_qp in 327 * cm_id->verbs, cm_id->channel, and cm_id->qp. 328 */ 329 struct rdma_cm_id *cm_id; /* connection manager ID */ 330 struct rdma_cm_id *listen_id; 331 bool connected; 332 333 struct ibv_context *verbs; 334 struct rdma_event_channel *channel; 335 struct ibv_qp *qp; /* queue pair */ 336 struct ibv_comp_channel *recv_comp_channel; /* recv completion channel */ 337 struct ibv_comp_channel *send_comp_channel; /* send completion channel */ 338 struct ibv_pd *pd; /* protection domain */ 339 struct ibv_cq *recv_cq; /* recvieve completion queue */ 340 struct ibv_cq *send_cq; /* send completion queue */ 341 342 /* 343 * If a previous write failed (perhaps because of a failed 344 * memory registration, then do not attempt any future work 345 * and remember the error state. 346 */ 347 bool errored; 348 bool error_reported; 349 bool received_error; 350 351 /* 352 * Description of ram blocks used throughout the code. 353 */ 354 RDMALocalBlocks local_ram_blocks; 355 RDMADestBlock *dest_blocks; 356 357 /* Index of the next RAMBlock received during block registration */ 358 unsigned int next_src_index; 359 360 /* 361 * Migration on *destination* started. 362 * Then use coroutine yield function. 363 * Source runs in a thread, so we don't care. 364 */ 365 int migration_started_on_destination; 366 367 int total_registrations; 368 int total_writes; 369 370 int unregister_current, unregister_next; 371 uint64_t unregistrations[RDMA_SIGNALED_SEND_MAX]; 372 373 GHashTable *blockmap; 374 375 /* the RDMAContext for return path */ 376 struct RDMAContext *return_path; 377 bool is_return_path; 378 } RDMAContext; 379 380 #define TYPE_QIO_CHANNEL_RDMA "qio-channel-rdma" 381 OBJECT_DECLARE_SIMPLE_TYPE(QIOChannelRDMA, QIO_CHANNEL_RDMA) 382 383 384 385 struct QIOChannelRDMA { 386 QIOChannel parent; 387 RDMAContext *rdmain; 388 RDMAContext *rdmaout; 389 QEMUFile *file; 390 bool blocking; /* XXX we don't actually honour this yet */ 391 }; 392 393 /* 394 * Main structure for IB Send/Recv control messages. 395 * This gets prepended at the beginning of every Send/Recv. 396 */ 397 typedef struct QEMU_PACKED { 398 uint32_t len; /* Total length of data portion */ 399 uint32_t type; /* which control command to perform */ 400 uint32_t repeat; /* number of commands in data portion of same type */ 401 uint32_t padding; 402 } RDMAControlHeader; 403 404 static void control_to_network(RDMAControlHeader *control) 405 { 406 control->type = htonl(control->type); 407 control->len = htonl(control->len); 408 control->repeat = htonl(control->repeat); 409 } 410 411 static void network_to_control(RDMAControlHeader *control) 412 { 413 control->type = ntohl(control->type); 414 control->len = ntohl(control->len); 415 control->repeat = ntohl(control->repeat); 416 } 417 418 /* 419 * Register a single Chunk. 420 * Information sent by the source VM to inform the dest 421 * to register an single chunk of memory before we can perform 422 * the actual RDMA operation. 423 */ 424 typedef struct QEMU_PACKED { 425 union QEMU_PACKED { 426 uint64_t current_addr; /* offset into the ram_addr_t space */ 427 uint64_t chunk; /* chunk to lookup if unregistering */ 428 } key; 429 uint32_t current_index; /* which ramblock the chunk belongs to */ 430 uint32_t padding; 431 uint64_t chunks; /* how many sequential chunks to register */ 432 } RDMARegister; 433 434 static bool rdma_errored(RDMAContext *rdma) 435 { 436 if (rdma->errored && !rdma->error_reported) { 437 error_report("RDMA is in an error state waiting migration" 438 " to abort!"); 439 rdma->error_reported = true; 440 } 441 return rdma->errored; 442 } 443 444 static void register_to_network(RDMAContext *rdma, RDMARegister *reg) 445 { 446 RDMALocalBlock *local_block; 447 local_block = &rdma->local_ram_blocks.block[reg->current_index]; 448 449 if (local_block->is_ram_block) { 450 /* 451 * current_addr as passed in is an address in the local ram_addr_t 452 * space, we need to translate this for the destination 453 */ 454 reg->key.current_addr -= local_block->offset; 455 reg->key.current_addr += rdma->dest_blocks[reg->current_index].offset; 456 } 457 reg->key.current_addr = htonll(reg->key.current_addr); 458 reg->current_index = htonl(reg->current_index); 459 reg->chunks = htonll(reg->chunks); 460 } 461 462 static void network_to_register(RDMARegister *reg) 463 { 464 reg->key.current_addr = ntohll(reg->key.current_addr); 465 reg->current_index = ntohl(reg->current_index); 466 reg->chunks = ntohll(reg->chunks); 467 } 468 469 typedef struct QEMU_PACKED { 470 uint32_t value; /* if zero, we will madvise() */ 471 uint32_t block_idx; /* which ram block index */ 472 uint64_t offset; /* Address in remote ram_addr_t space */ 473 uint64_t length; /* length of the chunk */ 474 } RDMACompress; 475 476 static void compress_to_network(RDMAContext *rdma, RDMACompress *comp) 477 { 478 comp->value = htonl(comp->value); 479 /* 480 * comp->offset as passed in is an address in the local ram_addr_t 481 * space, we need to translate this for the destination 482 */ 483 comp->offset -= rdma->local_ram_blocks.block[comp->block_idx].offset; 484 comp->offset += rdma->dest_blocks[comp->block_idx].offset; 485 comp->block_idx = htonl(comp->block_idx); 486 comp->offset = htonll(comp->offset); 487 comp->length = htonll(comp->length); 488 } 489 490 static void network_to_compress(RDMACompress *comp) 491 { 492 comp->value = ntohl(comp->value); 493 comp->block_idx = ntohl(comp->block_idx); 494 comp->offset = ntohll(comp->offset); 495 comp->length = ntohll(comp->length); 496 } 497 498 /* 499 * The result of the dest's memory registration produces an "rkey" 500 * which the source VM must reference in order to perform 501 * the RDMA operation. 502 */ 503 typedef struct QEMU_PACKED { 504 uint32_t rkey; 505 uint32_t padding; 506 uint64_t host_addr; 507 } RDMARegisterResult; 508 509 static void result_to_network(RDMARegisterResult *result) 510 { 511 result->rkey = htonl(result->rkey); 512 result->host_addr = htonll(result->host_addr); 513 }; 514 515 static void network_to_result(RDMARegisterResult *result) 516 { 517 result->rkey = ntohl(result->rkey); 518 result->host_addr = ntohll(result->host_addr); 519 }; 520 521 static int qemu_rdma_exchange_send(RDMAContext *rdma, RDMAControlHeader *head, 522 uint8_t *data, RDMAControlHeader *resp, 523 int *resp_idx, 524 int (*callback)(RDMAContext *rdma, 525 Error **errp), 526 Error **errp); 527 528 static inline uint64_t ram_chunk_index(const uint8_t *start, 529 const uint8_t *host) 530 { 531 return ((uintptr_t) host - (uintptr_t) start) >> RDMA_REG_CHUNK_SHIFT; 532 } 533 534 static inline uint8_t *ram_chunk_start(const RDMALocalBlock *rdma_ram_block, 535 uint64_t i) 536 { 537 return (uint8_t *)(uintptr_t)(rdma_ram_block->local_host_addr + 538 (i << RDMA_REG_CHUNK_SHIFT)); 539 } 540 541 static inline uint8_t *ram_chunk_end(const RDMALocalBlock *rdma_ram_block, 542 uint64_t i) 543 { 544 uint8_t *result = ram_chunk_start(rdma_ram_block, i) + 545 (1UL << RDMA_REG_CHUNK_SHIFT); 546 547 if (result > (rdma_ram_block->local_host_addr + rdma_ram_block->length)) { 548 result = rdma_ram_block->local_host_addr + rdma_ram_block->length; 549 } 550 551 return result; 552 } 553 554 static void rdma_add_block(RDMAContext *rdma, const char *block_name, 555 void *host_addr, 556 ram_addr_t block_offset, uint64_t length) 557 { 558 RDMALocalBlocks *local = &rdma->local_ram_blocks; 559 RDMALocalBlock *block; 560 RDMALocalBlock *old = local->block; 561 562 local->block = g_new0(RDMALocalBlock, local->nb_blocks + 1); 563 564 if (local->nb_blocks) { 565 if (rdma->blockmap) { 566 for (int x = 0; x < local->nb_blocks; x++) { 567 g_hash_table_remove(rdma->blockmap, 568 (void *)(uintptr_t)old[x].offset); 569 g_hash_table_insert(rdma->blockmap, 570 (void *)(uintptr_t)old[x].offset, 571 &local->block[x]); 572 } 573 } 574 memcpy(local->block, old, sizeof(RDMALocalBlock) * local->nb_blocks); 575 g_free(old); 576 } 577 578 block = &local->block[local->nb_blocks]; 579 580 block->block_name = g_strdup(block_name); 581 block->local_host_addr = host_addr; 582 block->offset = block_offset; 583 block->length = length; 584 block->index = local->nb_blocks; 585 block->src_index = ~0U; /* Filled in by the receipt of the block list */ 586 block->nb_chunks = ram_chunk_index(host_addr, host_addr + length) + 1UL; 587 block->transit_bitmap = bitmap_new(block->nb_chunks); 588 bitmap_clear(block->transit_bitmap, 0, block->nb_chunks); 589 block->unregister_bitmap = bitmap_new(block->nb_chunks); 590 bitmap_clear(block->unregister_bitmap, 0, block->nb_chunks); 591 block->remote_keys = g_new0(uint32_t, block->nb_chunks); 592 593 block->is_ram_block = local->init ? false : true; 594 595 if (rdma->blockmap) { 596 g_hash_table_insert(rdma->blockmap, (void *)(uintptr_t)block_offset, block); 597 } 598 599 trace_rdma_add_block(block_name, local->nb_blocks, 600 (uintptr_t) block->local_host_addr, 601 block->offset, block->length, 602 (uintptr_t) (block->local_host_addr + block->length), 603 BITS_TO_LONGS(block->nb_chunks) * 604 sizeof(unsigned long) * 8, 605 block->nb_chunks); 606 607 local->nb_blocks++; 608 } 609 610 /* 611 * Memory regions need to be registered with the device and queue pairs setup 612 * in advanced before the migration starts. This tells us where the RAM blocks 613 * are so that we can register them individually. 614 */ 615 static int qemu_rdma_init_one_block(RAMBlock *rb, void *opaque) 616 { 617 const char *block_name = qemu_ram_get_idstr(rb); 618 void *host_addr = qemu_ram_get_host_addr(rb); 619 ram_addr_t block_offset = qemu_ram_get_offset(rb); 620 ram_addr_t length = qemu_ram_get_used_length(rb); 621 rdma_add_block(opaque, block_name, host_addr, block_offset, length); 622 return 0; 623 } 624 625 /* 626 * Identify the RAMBlocks and their quantity. They will be references to 627 * identify chunk boundaries inside each RAMBlock and also be referenced 628 * during dynamic page registration. 629 */ 630 static void qemu_rdma_init_ram_blocks(RDMAContext *rdma) 631 { 632 RDMALocalBlocks *local = &rdma->local_ram_blocks; 633 int ret; 634 635 assert(rdma->blockmap == NULL); 636 memset(local, 0, sizeof *local); 637 ret = foreach_not_ignored_block(qemu_rdma_init_one_block, rdma); 638 assert(!ret); 639 trace_qemu_rdma_init_ram_blocks(local->nb_blocks); 640 rdma->dest_blocks = g_new0(RDMADestBlock, 641 rdma->local_ram_blocks.nb_blocks); 642 local->init = true; 643 } 644 645 /* 646 * Note: If used outside of cleanup, the caller must ensure that the destination 647 * block structures are also updated 648 */ 649 static void rdma_delete_block(RDMAContext *rdma, RDMALocalBlock *block) 650 { 651 RDMALocalBlocks *local = &rdma->local_ram_blocks; 652 RDMALocalBlock *old = local->block; 653 654 if (rdma->blockmap) { 655 g_hash_table_remove(rdma->blockmap, (void *)(uintptr_t)block->offset); 656 } 657 if (block->pmr) { 658 for (int j = 0; j < block->nb_chunks; j++) { 659 if (!block->pmr[j]) { 660 continue; 661 } 662 ibv_dereg_mr(block->pmr[j]); 663 rdma->total_registrations--; 664 } 665 g_free(block->pmr); 666 block->pmr = NULL; 667 } 668 669 if (block->mr) { 670 ibv_dereg_mr(block->mr); 671 rdma->total_registrations--; 672 block->mr = NULL; 673 } 674 675 g_free(block->transit_bitmap); 676 block->transit_bitmap = NULL; 677 678 g_free(block->unregister_bitmap); 679 block->unregister_bitmap = NULL; 680 681 g_free(block->remote_keys); 682 block->remote_keys = NULL; 683 684 g_free(block->block_name); 685 block->block_name = NULL; 686 687 if (rdma->blockmap) { 688 for (int x = 0; x < local->nb_blocks; x++) { 689 g_hash_table_remove(rdma->blockmap, 690 (void *)(uintptr_t)old[x].offset); 691 } 692 } 693 694 if (local->nb_blocks > 1) { 695 696 local->block = g_new0(RDMALocalBlock, local->nb_blocks - 1); 697 698 if (block->index) { 699 memcpy(local->block, old, sizeof(RDMALocalBlock) * block->index); 700 } 701 702 if (block->index < (local->nb_blocks - 1)) { 703 memcpy(local->block + block->index, old + (block->index + 1), 704 sizeof(RDMALocalBlock) * 705 (local->nb_blocks - (block->index + 1))); 706 for (int x = block->index; x < local->nb_blocks - 1; x++) { 707 local->block[x].index--; 708 } 709 } 710 } else { 711 assert(block == local->block); 712 local->block = NULL; 713 } 714 715 trace_rdma_delete_block(block, (uintptr_t)block->local_host_addr, 716 block->offset, block->length, 717 (uintptr_t)(block->local_host_addr + block->length), 718 BITS_TO_LONGS(block->nb_chunks) * 719 sizeof(unsigned long) * 8, block->nb_chunks); 720 721 g_free(old); 722 723 local->nb_blocks--; 724 725 if (local->nb_blocks && rdma->blockmap) { 726 for (int x = 0; x < local->nb_blocks; x++) { 727 g_hash_table_insert(rdma->blockmap, 728 (void *)(uintptr_t)local->block[x].offset, 729 &local->block[x]); 730 } 731 } 732 } 733 734 /* 735 * Trace RDMA device open, with device details. 736 */ 737 static void qemu_rdma_dump_id(const char *who, struct ibv_context *verbs) 738 { 739 struct ibv_port_attr port; 740 741 if (ibv_query_port(verbs, 1, &port)) { 742 trace_qemu_rdma_dump_id_failed(who); 743 return; 744 } 745 746 trace_qemu_rdma_dump_id(who, 747 verbs->device->name, 748 verbs->device->dev_name, 749 verbs->device->dev_path, 750 verbs->device->ibdev_path, 751 port.link_layer, 752 port.link_layer == IBV_LINK_LAYER_INFINIBAND ? "Infiniband" 753 : port.link_layer == IBV_LINK_LAYER_ETHERNET ? "Ethernet" 754 : "Unknown"); 755 } 756 757 /* 758 * Trace RDMA gid addressing information. 759 * Useful for understanding the RDMA device hierarchy in the kernel. 760 */ 761 static void qemu_rdma_dump_gid(const char *who, struct rdma_cm_id *id) 762 { 763 char sgid[33]; 764 char dgid[33]; 765 inet_ntop(AF_INET6, &id->route.addr.addr.ibaddr.sgid, sgid, sizeof sgid); 766 inet_ntop(AF_INET6, &id->route.addr.addr.ibaddr.dgid, dgid, sizeof dgid); 767 trace_qemu_rdma_dump_gid(who, sgid, dgid); 768 } 769 770 /* 771 * Figure out which RDMA device corresponds to the requested IP hostname 772 * Also create the initial connection manager identifiers for opening 773 * the connection. 774 */ 775 static int qemu_rdma_resolve_host(RDMAContext *rdma, Error **errp) 776 { 777 int ret; 778 struct rdma_addrinfo *res; 779 char port_str[16]; 780 struct rdma_cm_event *cm_event; 781 char ip[40] = "unknown"; 782 783 if (rdma->host == NULL || !strcmp(rdma->host, "")) { 784 error_setg(errp, "RDMA ERROR: RDMA hostname has not been set"); 785 return -1; 786 } 787 788 /* create CM channel */ 789 rdma->channel = rdma_create_event_channel(); 790 if (!rdma->channel) { 791 error_setg(errp, "RDMA ERROR: could not create CM channel"); 792 return -1; 793 } 794 795 /* create CM id */ 796 ret = rdma_create_id(rdma->channel, &rdma->cm_id, NULL, RDMA_PS_TCP); 797 if (ret < 0) { 798 error_setg(errp, "RDMA ERROR: could not create channel id"); 799 goto err_resolve_create_id; 800 } 801 802 snprintf(port_str, 16, "%d", rdma->port); 803 port_str[15] = '\0'; 804 805 ret = rdma_getaddrinfo(rdma->host, port_str, NULL, &res); 806 if (ret) { 807 error_setg(errp, "RDMA ERROR: could not rdma_getaddrinfo address %s", 808 rdma->host); 809 goto err_resolve_get_addr; 810 } 811 812 /* Try all addresses, exit loop on first success of resolving address */ 813 for (struct rdma_addrinfo *e = res; e != NULL; e = e->ai_next) { 814 815 inet_ntop(e->ai_family, 816 &((struct sockaddr_in *) e->ai_dst_addr)->sin_addr, ip, sizeof ip); 817 trace_qemu_rdma_resolve_host_trying(rdma->host, ip); 818 819 ret = rdma_resolve_addr(rdma->cm_id, NULL, e->ai_dst_addr, 820 RDMA_RESOLVE_TIMEOUT_MS); 821 if (ret >= 0) { 822 goto route; 823 } 824 } 825 826 rdma_freeaddrinfo(res); 827 error_setg(errp, "RDMA ERROR: could not resolve address %s", rdma->host); 828 goto err_resolve_get_addr; 829 830 route: 831 rdma_freeaddrinfo(res); 832 qemu_rdma_dump_gid("source_resolve_addr", rdma->cm_id); 833 834 ret = rdma_get_cm_event(rdma->channel, &cm_event); 835 if (ret < 0) { 836 error_setg(errp, "RDMA ERROR: could not perform event_addr_resolved"); 837 goto err_resolve_get_addr; 838 } 839 840 if (cm_event->event != RDMA_CM_EVENT_ADDR_RESOLVED) { 841 error_setg(errp, 842 "RDMA ERROR: result not equal to event_addr_resolved %s", 843 rdma_event_str(cm_event->event)); 844 rdma_ack_cm_event(cm_event); 845 goto err_resolve_get_addr; 846 } 847 rdma_ack_cm_event(cm_event); 848 849 /* resolve route */ 850 ret = rdma_resolve_route(rdma->cm_id, RDMA_RESOLVE_TIMEOUT_MS); 851 if (ret < 0) { 852 error_setg(errp, "RDMA ERROR: could not resolve rdma route"); 853 goto err_resolve_get_addr; 854 } 855 856 ret = rdma_get_cm_event(rdma->channel, &cm_event); 857 if (ret < 0) { 858 error_setg(errp, "RDMA ERROR: could not perform event_route_resolved"); 859 goto err_resolve_get_addr; 860 } 861 if (cm_event->event != RDMA_CM_EVENT_ROUTE_RESOLVED) { 862 error_setg(errp, "RDMA ERROR: " 863 "result not equal to event_route_resolved: %s", 864 rdma_event_str(cm_event->event)); 865 rdma_ack_cm_event(cm_event); 866 goto err_resolve_get_addr; 867 } 868 rdma_ack_cm_event(cm_event); 869 rdma->verbs = rdma->cm_id->verbs; 870 qemu_rdma_dump_id("source_resolve_host", rdma->cm_id->verbs); 871 qemu_rdma_dump_gid("source_resolve_host", rdma->cm_id); 872 return 0; 873 874 err_resolve_get_addr: 875 rdma_destroy_id(rdma->cm_id); 876 rdma->cm_id = NULL; 877 err_resolve_create_id: 878 rdma_destroy_event_channel(rdma->channel); 879 rdma->channel = NULL; 880 return -1; 881 } 882 883 /* 884 * Create protection domain and completion queues 885 */ 886 static int qemu_rdma_alloc_pd_cq(RDMAContext *rdma, Error **errp) 887 { 888 /* allocate pd */ 889 rdma->pd = ibv_alloc_pd(rdma->verbs); 890 if (!rdma->pd) { 891 error_setg(errp, "failed to allocate protection domain"); 892 return -1; 893 } 894 895 /* create receive completion channel */ 896 rdma->recv_comp_channel = ibv_create_comp_channel(rdma->verbs); 897 if (!rdma->recv_comp_channel) { 898 error_setg(errp, "failed to allocate receive completion channel"); 899 goto err_alloc_pd_cq; 900 } 901 902 /* 903 * Completion queue can be filled by read work requests. 904 */ 905 rdma->recv_cq = ibv_create_cq(rdma->verbs, (RDMA_SIGNALED_SEND_MAX * 3), 906 NULL, rdma->recv_comp_channel, 0); 907 if (!rdma->recv_cq) { 908 error_setg(errp, "failed to allocate receive completion queue"); 909 goto err_alloc_pd_cq; 910 } 911 912 /* create send completion channel */ 913 rdma->send_comp_channel = ibv_create_comp_channel(rdma->verbs); 914 if (!rdma->send_comp_channel) { 915 error_setg(errp, "failed to allocate send completion channel"); 916 goto err_alloc_pd_cq; 917 } 918 919 rdma->send_cq = ibv_create_cq(rdma->verbs, (RDMA_SIGNALED_SEND_MAX * 3), 920 NULL, rdma->send_comp_channel, 0); 921 if (!rdma->send_cq) { 922 error_setg(errp, "failed to allocate send completion queue"); 923 goto err_alloc_pd_cq; 924 } 925 926 return 0; 927 928 err_alloc_pd_cq: 929 if (rdma->pd) { 930 ibv_dealloc_pd(rdma->pd); 931 } 932 if (rdma->recv_comp_channel) { 933 ibv_destroy_comp_channel(rdma->recv_comp_channel); 934 } 935 if (rdma->send_comp_channel) { 936 ibv_destroy_comp_channel(rdma->send_comp_channel); 937 } 938 if (rdma->recv_cq) { 939 ibv_destroy_cq(rdma->recv_cq); 940 rdma->recv_cq = NULL; 941 } 942 rdma->pd = NULL; 943 rdma->recv_comp_channel = NULL; 944 rdma->send_comp_channel = NULL; 945 return -1; 946 947 } 948 949 /* 950 * Create queue pairs. 951 */ 952 static int qemu_rdma_alloc_qp(RDMAContext *rdma) 953 { 954 struct ibv_qp_init_attr attr = { 0 }; 955 956 attr.cap.max_send_wr = RDMA_SIGNALED_SEND_MAX; 957 attr.cap.max_recv_wr = 3; 958 attr.cap.max_send_sge = 1; 959 attr.cap.max_recv_sge = 1; 960 attr.send_cq = rdma->send_cq; 961 attr.recv_cq = rdma->recv_cq; 962 attr.qp_type = IBV_QPT_RC; 963 964 if (rdma_create_qp(rdma->cm_id, rdma->pd, &attr) < 0) { 965 return -1; 966 } 967 968 rdma->qp = rdma->cm_id->qp; 969 return 0; 970 } 971 972 /* Check whether On-Demand Paging is supported by RDAM device */ 973 static bool rdma_support_odp(struct ibv_context *dev) 974 { 975 struct ibv_device_attr_ex attr = {0}; 976 977 if (ibv_query_device_ex(dev, NULL, &attr)) { 978 return false; 979 } 980 981 if (attr.odp_caps.general_caps & IBV_ODP_SUPPORT) { 982 return true; 983 } 984 985 return false; 986 } 987 988 /* 989 * ibv_advise_mr to avoid RNR NAK error as far as possible. 990 * The responder mr registering with ODP will sent RNR NAK back to 991 * the requester in the face of the page fault. 992 */ 993 static void qemu_rdma_advise_prefetch_mr(struct ibv_pd *pd, uint64_t addr, 994 uint32_t len, uint32_t lkey, 995 const char *name, bool wr) 996 { 997 #ifdef HAVE_IBV_ADVISE_MR 998 int ret; 999 int advice = wr ? IBV_ADVISE_MR_ADVICE_PREFETCH_WRITE : 1000 IBV_ADVISE_MR_ADVICE_PREFETCH; 1001 struct ibv_sge sg_list = {.lkey = lkey, .addr = addr, .length = len}; 1002 1003 ret = ibv_advise_mr(pd, advice, 1004 IBV_ADVISE_MR_FLAG_FLUSH, &sg_list, 1); 1005 /* ignore the error */ 1006 trace_qemu_rdma_advise_mr(name, len, addr, strerror(ret)); 1007 #endif 1008 } 1009 1010 static int qemu_rdma_reg_whole_ram_blocks(RDMAContext *rdma, Error **errp) 1011 { 1012 int i; 1013 RDMALocalBlocks *local = &rdma->local_ram_blocks; 1014 1015 for (i = 0; i < local->nb_blocks; i++) { 1016 int access = IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE; 1017 1018 local->block[i].mr = 1019 ibv_reg_mr(rdma->pd, 1020 local->block[i].local_host_addr, 1021 local->block[i].length, access 1022 ); 1023 /* 1024 * ibv_reg_mr() is not documented to set errno. If it does, 1025 * it's somebody else's doc bug. If it doesn't, the use of 1026 * errno below is wrong. 1027 * TODO Find out whether ibv_reg_mr() sets errno. 1028 */ 1029 if (!local->block[i].mr && 1030 errno == ENOTSUP && rdma_support_odp(rdma->verbs)) { 1031 access |= IBV_ACCESS_ON_DEMAND; 1032 /* register ODP mr */ 1033 local->block[i].mr = 1034 ibv_reg_mr(rdma->pd, 1035 local->block[i].local_host_addr, 1036 local->block[i].length, access); 1037 trace_qemu_rdma_register_odp_mr(local->block[i].block_name); 1038 1039 if (local->block[i].mr) { 1040 qemu_rdma_advise_prefetch_mr(rdma->pd, 1041 (uintptr_t)local->block[i].local_host_addr, 1042 local->block[i].length, 1043 local->block[i].mr->lkey, 1044 local->block[i].block_name, 1045 true); 1046 } 1047 } 1048 1049 if (!local->block[i].mr) { 1050 error_setg_errno(errp, errno, 1051 "Failed to register local dest ram block!"); 1052 goto err; 1053 } 1054 rdma->total_registrations++; 1055 } 1056 1057 return 0; 1058 1059 err: 1060 for (i--; i >= 0; i--) { 1061 ibv_dereg_mr(local->block[i].mr); 1062 local->block[i].mr = NULL; 1063 rdma->total_registrations--; 1064 } 1065 1066 return -1; 1067 1068 } 1069 1070 /* 1071 * Find the ram block that corresponds to the page requested to be 1072 * transmitted by QEMU. 1073 * 1074 * Once the block is found, also identify which 'chunk' within that 1075 * block that the page belongs to. 1076 */ 1077 static void qemu_rdma_search_ram_block(RDMAContext *rdma, 1078 uintptr_t block_offset, 1079 uint64_t offset, 1080 uint64_t length, 1081 uint64_t *block_index, 1082 uint64_t *chunk_index) 1083 { 1084 uint64_t current_addr = block_offset + offset; 1085 RDMALocalBlock *block = g_hash_table_lookup(rdma->blockmap, 1086 (void *) block_offset); 1087 assert(block); 1088 assert(current_addr >= block->offset); 1089 assert((current_addr + length) <= (block->offset + block->length)); 1090 1091 *block_index = block->index; 1092 *chunk_index = ram_chunk_index(block->local_host_addr, 1093 block->local_host_addr + (current_addr - block->offset)); 1094 } 1095 1096 /* 1097 * Register a chunk with IB. If the chunk was already registered 1098 * previously, then skip. 1099 * 1100 * Also return the keys associated with the registration needed 1101 * to perform the actual RDMA operation. 1102 */ 1103 static int qemu_rdma_register_and_get_keys(RDMAContext *rdma, 1104 RDMALocalBlock *block, uintptr_t host_addr, 1105 uint32_t *lkey, uint32_t *rkey, int chunk, 1106 uint8_t *chunk_start, uint8_t *chunk_end) 1107 { 1108 if (block->mr) { 1109 if (lkey) { 1110 *lkey = block->mr->lkey; 1111 } 1112 if (rkey) { 1113 *rkey = block->mr->rkey; 1114 } 1115 return 0; 1116 } 1117 1118 /* allocate memory to store chunk MRs */ 1119 if (!block->pmr) { 1120 block->pmr = g_new0(struct ibv_mr *, block->nb_chunks); 1121 } 1122 1123 /* 1124 * If 'rkey', then we're the destination, so grant access to the source. 1125 * 1126 * If 'lkey', then we're the source VM, so grant access only to ourselves. 1127 */ 1128 if (!block->pmr[chunk]) { 1129 uint64_t len = chunk_end - chunk_start; 1130 int access = rkey ? IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE : 1131 0; 1132 1133 trace_qemu_rdma_register_and_get_keys(len, chunk_start); 1134 1135 block->pmr[chunk] = ibv_reg_mr(rdma->pd, chunk_start, len, access); 1136 /* 1137 * ibv_reg_mr() is not documented to set errno. If it does, 1138 * it's somebody else's doc bug. If it doesn't, the use of 1139 * errno below is wrong. 1140 * TODO Find out whether ibv_reg_mr() sets errno. 1141 */ 1142 if (!block->pmr[chunk] && 1143 errno == ENOTSUP && rdma_support_odp(rdma->verbs)) { 1144 access |= IBV_ACCESS_ON_DEMAND; 1145 /* register ODP mr */ 1146 block->pmr[chunk] = ibv_reg_mr(rdma->pd, chunk_start, len, access); 1147 trace_qemu_rdma_register_odp_mr(block->block_name); 1148 1149 if (block->pmr[chunk]) { 1150 qemu_rdma_advise_prefetch_mr(rdma->pd, (uintptr_t)chunk_start, 1151 len, block->pmr[chunk]->lkey, 1152 block->block_name, rkey); 1153 1154 } 1155 } 1156 } 1157 if (!block->pmr[chunk]) { 1158 return -1; 1159 } 1160 rdma->total_registrations++; 1161 1162 if (lkey) { 1163 *lkey = block->pmr[chunk]->lkey; 1164 } 1165 if (rkey) { 1166 *rkey = block->pmr[chunk]->rkey; 1167 } 1168 return 0; 1169 } 1170 1171 /* 1172 * Register (at connection time) the memory used for control 1173 * channel messages. 1174 */ 1175 static int qemu_rdma_reg_control(RDMAContext *rdma, int idx) 1176 { 1177 rdma->wr_data[idx].control_mr = ibv_reg_mr(rdma->pd, 1178 rdma->wr_data[idx].control, RDMA_CONTROL_MAX_BUFFER, 1179 IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE); 1180 if (rdma->wr_data[idx].control_mr) { 1181 rdma->total_registrations++; 1182 return 0; 1183 } 1184 return -1; 1185 } 1186 1187 /* 1188 * Perform a non-optimized memory unregistration after every transfer 1189 * for demonstration purposes, only if pin-all is not requested. 1190 * 1191 * Potential optimizations: 1192 * 1. Start a new thread to run this function continuously 1193 - for bit clearing 1194 - and for receipt of unregister messages 1195 * 2. Use an LRU. 1196 * 3. Use workload hints. 1197 */ 1198 static int qemu_rdma_unregister_waiting(RDMAContext *rdma) 1199 { 1200 Error *err = NULL; 1201 1202 while (rdma->unregistrations[rdma->unregister_current]) { 1203 int ret; 1204 uint64_t wr_id = rdma->unregistrations[rdma->unregister_current]; 1205 uint64_t chunk = 1206 (wr_id & RDMA_WRID_CHUNK_MASK) >> RDMA_WRID_CHUNK_SHIFT; 1207 uint64_t index = 1208 (wr_id & RDMA_WRID_BLOCK_MASK) >> RDMA_WRID_BLOCK_SHIFT; 1209 RDMALocalBlock *block = 1210 &(rdma->local_ram_blocks.block[index]); 1211 RDMARegister reg = { .current_index = index }; 1212 RDMAControlHeader resp = { .type = RDMA_CONTROL_UNREGISTER_FINISHED, 1213 }; 1214 RDMAControlHeader head = { .len = sizeof(RDMARegister), 1215 .type = RDMA_CONTROL_UNREGISTER_REQUEST, 1216 .repeat = 1, 1217 }; 1218 1219 trace_qemu_rdma_unregister_waiting_proc(chunk, 1220 rdma->unregister_current); 1221 1222 rdma->unregistrations[rdma->unregister_current] = 0; 1223 rdma->unregister_current++; 1224 1225 if (rdma->unregister_current == RDMA_SIGNALED_SEND_MAX) { 1226 rdma->unregister_current = 0; 1227 } 1228 1229 1230 /* 1231 * Unregistration is speculative (because migration is single-threaded 1232 * and we cannot break the protocol's inifinband message ordering). 1233 * Thus, if the memory is currently being used for transmission, 1234 * then abort the attempt to unregister and try again 1235 * later the next time a completion is received for this memory. 1236 */ 1237 clear_bit(chunk, block->unregister_bitmap); 1238 1239 if (test_bit(chunk, block->transit_bitmap)) { 1240 trace_qemu_rdma_unregister_waiting_inflight(chunk); 1241 continue; 1242 } 1243 1244 trace_qemu_rdma_unregister_waiting_send(chunk); 1245 1246 ret = ibv_dereg_mr(block->pmr[chunk]); 1247 block->pmr[chunk] = NULL; 1248 block->remote_keys[chunk] = 0; 1249 1250 if (ret != 0) { 1251 error_report("unregistration chunk failed: %s", 1252 strerror(ret)); 1253 return -1; 1254 } 1255 rdma->total_registrations--; 1256 1257 reg.key.chunk = chunk; 1258 register_to_network(rdma, ®); 1259 ret = qemu_rdma_exchange_send(rdma, &head, (uint8_t *) ®, 1260 &resp, NULL, NULL, &err); 1261 if (ret < 0) { 1262 error_report_err(err); 1263 return -1; 1264 } 1265 1266 trace_qemu_rdma_unregister_waiting_complete(chunk); 1267 } 1268 1269 return 0; 1270 } 1271 1272 static uint64_t qemu_rdma_make_wrid(uint64_t wr_id, uint64_t index, 1273 uint64_t chunk) 1274 { 1275 uint64_t result = wr_id & RDMA_WRID_TYPE_MASK; 1276 1277 result |= (index << RDMA_WRID_BLOCK_SHIFT); 1278 result |= (chunk << RDMA_WRID_CHUNK_SHIFT); 1279 1280 return result; 1281 } 1282 1283 /* 1284 * Consult the connection manager to see a work request 1285 * (of any kind) has completed. 1286 * Return the work request ID that completed. 1287 */ 1288 static int qemu_rdma_poll(RDMAContext *rdma, struct ibv_cq *cq, 1289 uint64_t *wr_id_out, uint32_t *byte_len) 1290 { 1291 int ret; 1292 struct ibv_wc wc; 1293 uint64_t wr_id; 1294 1295 ret = ibv_poll_cq(cq, 1, &wc); 1296 1297 if (!ret) { 1298 *wr_id_out = RDMA_WRID_NONE; 1299 return 0; 1300 } 1301 1302 if (ret < 0) { 1303 return -1; 1304 } 1305 1306 wr_id = wc.wr_id & RDMA_WRID_TYPE_MASK; 1307 1308 if (wc.status != IBV_WC_SUCCESS) { 1309 return -1; 1310 } 1311 1312 if (rdma->control_ready_expected && 1313 (wr_id >= RDMA_WRID_RECV_CONTROL)) { 1314 trace_qemu_rdma_poll_recv(wr_id - RDMA_WRID_RECV_CONTROL, wr_id, 1315 rdma->nb_sent); 1316 rdma->control_ready_expected = 0; 1317 } 1318 1319 if (wr_id == RDMA_WRID_RDMA_WRITE) { 1320 uint64_t chunk = 1321 (wc.wr_id & RDMA_WRID_CHUNK_MASK) >> RDMA_WRID_CHUNK_SHIFT; 1322 uint64_t index = 1323 (wc.wr_id & RDMA_WRID_BLOCK_MASK) >> RDMA_WRID_BLOCK_SHIFT; 1324 RDMALocalBlock *block = &(rdma->local_ram_blocks.block[index]); 1325 1326 trace_qemu_rdma_poll_write(wr_id, rdma->nb_sent, 1327 index, chunk, block->local_host_addr, 1328 (void *)(uintptr_t)block->remote_host_addr); 1329 1330 clear_bit(chunk, block->transit_bitmap); 1331 1332 if (rdma->nb_sent > 0) { 1333 rdma->nb_sent--; 1334 } 1335 } else { 1336 trace_qemu_rdma_poll_other(wr_id, rdma->nb_sent); 1337 } 1338 1339 *wr_id_out = wc.wr_id; 1340 if (byte_len) { 1341 *byte_len = wc.byte_len; 1342 } 1343 1344 return 0; 1345 } 1346 1347 /* Wait for activity on the completion channel. 1348 * Returns 0 on success, none-0 on error. 1349 */ 1350 static int qemu_rdma_wait_comp_channel(RDMAContext *rdma, 1351 struct ibv_comp_channel *comp_channel) 1352 { 1353 struct rdma_cm_event *cm_event; 1354 1355 /* 1356 * Coroutine doesn't start until migration_fd_process_incoming() 1357 * so don't yield unless we know we're running inside of a coroutine. 1358 */ 1359 if (rdma->migration_started_on_destination && 1360 migration_incoming_get_current()->state == MIGRATION_STATUS_ACTIVE) { 1361 yield_until_fd_readable(comp_channel->fd); 1362 } else { 1363 /* This is the source side, we're in a separate thread 1364 * or destination prior to migration_fd_process_incoming() 1365 * after postcopy, the destination also in a separate thread. 1366 * we can't yield; so we have to poll the fd. 1367 * But we need to be able to handle 'cancel' or an error 1368 * without hanging forever. 1369 */ 1370 while (!rdma->errored && !rdma->received_error) { 1371 GPollFD pfds[2]; 1372 pfds[0].fd = comp_channel->fd; 1373 pfds[0].events = G_IO_IN | G_IO_HUP | G_IO_ERR; 1374 pfds[0].revents = 0; 1375 1376 pfds[1].fd = rdma->channel->fd; 1377 pfds[1].events = G_IO_IN | G_IO_HUP | G_IO_ERR; 1378 pfds[1].revents = 0; 1379 1380 /* 0.1s timeout, should be fine for a 'cancel' */ 1381 switch (qemu_poll_ns(pfds, 2, 100 * 1000 * 1000)) { 1382 case 2: 1383 case 1: /* fd active */ 1384 if (pfds[0].revents) { 1385 return 0; 1386 } 1387 1388 if (pfds[1].revents) { 1389 if (rdma_get_cm_event(rdma->channel, &cm_event) < 0) { 1390 return -1; 1391 } 1392 1393 if (cm_event->event == RDMA_CM_EVENT_DISCONNECTED || 1394 cm_event->event == RDMA_CM_EVENT_DEVICE_REMOVAL) { 1395 rdma_ack_cm_event(cm_event); 1396 return -1; 1397 } 1398 rdma_ack_cm_event(cm_event); 1399 } 1400 break; 1401 1402 case 0: /* Timeout, go around again */ 1403 break; 1404 1405 default: /* Error of some type - 1406 * I don't trust errno from qemu_poll_ns 1407 */ 1408 return -1; 1409 } 1410 1411 if (migrate_get_current()->state == MIGRATION_STATUS_CANCELLING) { 1412 /* Bail out and let the cancellation happen */ 1413 return -1; 1414 } 1415 } 1416 } 1417 1418 if (rdma->received_error) { 1419 return -1; 1420 } 1421 return -rdma->errored; 1422 } 1423 1424 static struct ibv_comp_channel *to_channel(RDMAContext *rdma, uint64_t wrid) 1425 { 1426 return wrid < RDMA_WRID_RECV_CONTROL ? rdma->send_comp_channel : 1427 rdma->recv_comp_channel; 1428 } 1429 1430 static struct ibv_cq *to_cq(RDMAContext *rdma, uint64_t wrid) 1431 { 1432 return wrid < RDMA_WRID_RECV_CONTROL ? rdma->send_cq : rdma->recv_cq; 1433 } 1434 1435 /* 1436 * Block until the next work request has completed. 1437 * 1438 * First poll to see if a work request has already completed, 1439 * otherwise block. 1440 * 1441 * If we encounter completed work requests for IDs other than 1442 * the one we're interested in, then that's generally an error. 1443 * 1444 * The only exception is actual RDMA Write completions. These 1445 * completions only need to be recorded, but do not actually 1446 * need further processing. 1447 */ 1448 static int qemu_rdma_block_for_wrid(RDMAContext *rdma, 1449 uint64_t wrid_requested, 1450 uint32_t *byte_len) 1451 { 1452 int num_cq_events = 0, ret; 1453 struct ibv_cq *cq; 1454 void *cq_ctx; 1455 uint64_t wr_id = RDMA_WRID_NONE, wr_id_in; 1456 struct ibv_comp_channel *ch = to_channel(rdma, wrid_requested); 1457 struct ibv_cq *poll_cq = to_cq(rdma, wrid_requested); 1458 1459 if (ibv_req_notify_cq(poll_cq, 0)) { 1460 return -1; 1461 } 1462 /* poll cq first */ 1463 while (wr_id != wrid_requested) { 1464 ret = qemu_rdma_poll(rdma, poll_cq, &wr_id_in, byte_len); 1465 if (ret < 0) { 1466 return -1; 1467 } 1468 1469 wr_id = wr_id_in & RDMA_WRID_TYPE_MASK; 1470 1471 if (wr_id == RDMA_WRID_NONE) { 1472 break; 1473 } 1474 if (wr_id != wrid_requested) { 1475 trace_qemu_rdma_block_for_wrid_miss(wrid_requested, wr_id); 1476 } 1477 } 1478 1479 if (wr_id == wrid_requested) { 1480 return 0; 1481 } 1482 1483 while (1) { 1484 ret = qemu_rdma_wait_comp_channel(rdma, ch); 1485 if (ret < 0) { 1486 goto err_block_for_wrid; 1487 } 1488 1489 ret = ibv_get_cq_event(ch, &cq, &cq_ctx); 1490 if (ret < 0) { 1491 goto err_block_for_wrid; 1492 } 1493 1494 num_cq_events++; 1495 1496 if (ibv_req_notify_cq(cq, 0)) { 1497 goto err_block_for_wrid; 1498 } 1499 1500 while (wr_id != wrid_requested) { 1501 ret = qemu_rdma_poll(rdma, poll_cq, &wr_id_in, byte_len); 1502 if (ret < 0) { 1503 goto err_block_for_wrid; 1504 } 1505 1506 wr_id = wr_id_in & RDMA_WRID_TYPE_MASK; 1507 1508 if (wr_id == RDMA_WRID_NONE) { 1509 break; 1510 } 1511 if (wr_id != wrid_requested) { 1512 trace_qemu_rdma_block_for_wrid_miss(wrid_requested, wr_id); 1513 } 1514 } 1515 1516 if (wr_id == wrid_requested) { 1517 goto success_block_for_wrid; 1518 } 1519 } 1520 1521 success_block_for_wrid: 1522 if (num_cq_events) { 1523 ibv_ack_cq_events(cq, num_cq_events); 1524 } 1525 return 0; 1526 1527 err_block_for_wrid: 1528 if (num_cq_events) { 1529 ibv_ack_cq_events(cq, num_cq_events); 1530 } 1531 1532 rdma->errored = true; 1533 return -1; 1534 } 1535 1536 /* 1537 * Post a SEND message work request for the control channel 1538 * containing some data and block until the post completes. 1539 */ 1540 static int qemu_rdma_post_send_control(RDMAContext *rdma, uint8_t *buf, 1541 RDMAControlHeader *head, 1542 Error **errp) 1543 { 1544 int ret; 1545 RDMAWorkRequestData *wr = &rdma->wr_data[RDMA_WRID_CONTROL]; 1546 struct ibv_send_wr *bad_wr; 1547 struct ibv_sge sge = { 1548 .addr = (uintptr_t)(wr->control), 1549 .length = head->len + sizeof(RDMAControlHeader), 1550 .lkey = wr->control_mr->lkey, 1551 }; 1552 struct ibv_send_wr send_wr = { 1553 .wr_id = RDMA_WRID_SEND_CONTROL, 1554 .opcode = IBV_WR_SEND, 1555 .send_flags = IBV_SEND_SIGNALED, 1556 .sg_list = &sge, 1557 .num_sge = 1, 1558 }; 1559 1560 trace_qemu_rdma_post_send_control(control_desc(head->type)); 1561 1562 /* 1563 * We don't actually need to do a memcpy() in here if we used 1564 * the "sge" properly, but since we're only sending control messages 1565 * (not RAM in a performance-critical path), then its OK for now. 1566 * 1567 * The copy makes the RDMAControlHeader simpler to manipulate 1568 * for the time being. 1569 */ 1570 assert(head->len <= RDMA_CONTROL_MAX_BUFFER - sizeof(*head)); 1571 memcpy(wr->control, head, sizeof(RDMAControlHeader)); 1572 control_to_network((void *) wr->control); 1573 1574 if (buf) { 1575 memcpy(wr->control + sizeof(RDMAControlHeader), buf, head->len); 1576 } 1577 1578 1579 ret = ibv_post_send(rdma->qp, &send_wr, &bad_wr); 1580 1581 if (ret > 0) { 1582 error_setg(errp, "Failed to use post IB SEND for control"); 1583 return -1; 1584 } 1585 1586 ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_SEND_CONTROL, NULL); 1587 if (ret < 0) { 1588 error_setg(errp, "rdma migration: send polling control error"); 1589 return -1; 1590 } 1591 1592 return 0; 1593 } 1594 1595 /* 1596 * Post a RECV work request in anticipation of some future receipt 1597 * of data on the control channel. 1598 */ 1599 static int qemu_rdma_post_recv_control(RDMAContext *rdma, int idx, 1600 Error **errp) 1601 { 1602 struct ibv_recv_wr *bad_wr; 1603 struct ibv_sge sge = { 1604 .addr = (uintptr_t)(rdma->wr_data[idx].control), 1605 .length = RDMA_CONTROL_MAX_BUFFER, 1606 .lkey = rdma->wr_data[idx].control_mr->lkey, 1607 }; 1608 1609 struct ibv_recv_wr recv_wr = { 1610 .wr_id = RDMA_WRID_RECV_CONTROL + idx, 1611 .sg_list = &sge, 1612 .num_sge = 1, 1613 }; 1614 1615 1616 if (ibv_post_recv(rdma->qp, &recv_wr, &bad_wr)) { 1617 error_setg(errp, "error posting control recv"); 1618 return -1; 1619 } 1620 1621 return 0; 1622 } 1623 1624 /* 1625 * Block and wait for a RECV control channel message to arrive. 1626 */ 1627 static int qemu_rdma_exchange_get_response(RDMAContext *rdma, 1628 RDMAControlHeader *head, uint32_t expecting, int idx, 1629 Error **errp) 1630 { 1631 uint32_t byte_len; 1632 int ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RECV_CONTROL + idx, 1633 &byte_len); 1634 1635 if (ret < 0) { 1636 error_setg(errp, "rdma migration: recv polling control error!"); 1637 return -1; 1638 } 1639 1640 network_to_control((void *) rdma->wr_data[idx].control); 1641 memcpy(head, rdma->wr_data[idx].control, sizeof(RDMAControlHeader)); 1642 1643 trace_qemu_rdma_exchange_get_response_start(control_desc(expecting)); 1644 1645 if (expecting == RDMA_CONTROL_NONE) { 1646 trace_qemu_rdma_exchange_get_response_none(control_desc(head->type), 1647 head->type); 1648 } else if (head->type != expecting || head->type == RDMA_CONTROL_ERROR) { 1649 error_setg(errp, "Was expecting a %s (%d) control message" 1650 ", but got: %s (%d), length: %d", 1651 control_desc(expecting), expecting, 1652 control_desc(head->type), head->type, head->len); 1653 if (head->type == RDMA_CONTROL_ERROR) { 1654 rdma->received_error = true; 1655 } 1656 return -1; 1657 } 1658 if (head->len > RDMA_CONTROL_MAX_BUFFER - sizeof(*head)) { 1659 error_setg(errp, "too long length: %d", head->len); 1660 return -1; 1661 } 1662 if (sizeof(*head) + head->len != byte_len) { 1663 error_setg(errp, "Malformed length: %d byte_len %d", 1664 head->len, byte_len); 1665 return -1; 1666 } 1667 1668 return 0; 1669 } 1670 1671 /* 1672 * When a RECV work request has completed, the work request's 1673 * buffer is pointed at the header. 1674 * 1675 * This will advance the pointer to the data portion 1676 * of the control message of the work request's buffer that 1677 * was populated after the work request finished. 1678 */ 1679 static void qemu_rdma_move_header(RDMAContext *rdma, int idx, 1680 RDMAControlHeader *head) 1681 { 1682 rdma->wr_data[idx].control_len = head->len; 1683 rdma->wr_data[idx].control_curr = 1684 rdma->wr_data[idx].control + sizeof(RDMAControlHeader); 1685 } 1686 1687 /* 1688 * This is an 'atomic' high-level operation to deliver a single, unified 1689 * control-channel message. 1690 * 1691 * Additionally, if the user is expecting some kind of reply to this message, 1692 * they can request a 'resp' response message be filled in by posting an 1693 * additional work request on behalf of the user and waiting for an additional 1694 * completion. 1695 * 1696 * The extra (optional) response is used during registration to us from having 1697 * to perform an *additional* exchange of message just to provide a response by 1698 * instead piggy-backing on the acknowledgement. 1699 */ 1700 static int qemu_rdma_exchange_send(RDMAContext *rdma, RDMAControlHeader *head, 1701 uint8_t *data, RDMAControlHeader *resp, 1702 int *resp_idx, 1703 int (*callback)(RDMAContext *rdma, 1704 Error **errp), 1705 Error **errp) 1706 { 1707 int ret; 1708 1709 /* 1710 * Wait until the dest is ready before attempting to deliver the message 1711 * by waiting for a READY message. 1712 */ 1713 if (rdma->control_ready_expected) { 1714 RDMAControlHeader resp_ignored; 1715 1716 ret = qemu_rdma_exchange_get_response(rdma, &resp_ignored, 1717 RDMA_CONTROL_READY, 1718 RDMA_WRID_READY, errp); 1719 if (ret < 0) { 1720 return -1; 1721 } 1722 } 1723 1724 /* 1725 * If the user is expecting a response, post a WR in anticipation of it. 1726 */ 1727 if (resp) { 1728 ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_DATA, errp); 1729 if (ret < 0) { 1730 return -1; 1731 } 1732 } 1733 1734 /* 1735 * Post a WR to replace the one we just consumed for the READY message. 1736 */ 1737 ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY, errp); 1738 if (ret < 0) { 1739 return -1; 1740 } 1741 1742 /* 1743 * Deliver the control message that was requested. 1744 */ 1745 ret = qemu_rdma_post_send_control(rdma, data, head, errp); 1746 1747 if (ret < 0) { 1748 return -1; 1749 } 1750 1751 /* 1752 * If we're expecting a response, block and wait for it. 1753 */ 1754 if (resp) { 1755 if (callback) { 1756 trace_qemu_rdma_exchange_send_issue_callback(); 1757 ret = callback(rdma, errp); 1758 if (ret < 0) { 1759 return -1; 1760 } 1761 } 1762 1763 trace_qemu_rdma_exchange_send_waiting(control_desc(resp->type)); 1764 ret = qemu_rdma_exchange_get_response(rdma, resp, 1765 resp->type, RDMA_WRID_DATA, 1766 errp); 1767 1768 if (ret < 0) { 1769 return -1; 1770 } 1771 1772 qemu_rdma_move_header(rdma, RDMA_WRID_DATA, resp); 1773 if (resp_idx) { 1774 *resp_idx = RDMA_WRID_DATA; 1775 } 1776 trace_qemu_rdma_exchange_send_received(control_desc(resp->type)); 1777 } 1778 1779 rdma->control_ready_expected = 1; 1780 1781 return 0; 1782 } 1783 1784 /* 1785 * This is an 'atomic' high-level operation to receive a single, unified 1786 * control-channel message. 1787 */ 1788 static int qemu_rdma_exchange_recv(RDMAContext *rdma, RDMAControlHeader *head, 1789 uint32_t expecting, Error **errp) 1790 { 1791 RDMAControlHeader ready = { 1792 .len = 0, 1793 .type = RDMA_CONTROL_READY, 1794 .repeat = 1, 1795 }; 1796 int ret; 1797 1798 /* 1799 * Inform the source that we're ready to receive a message. 1800 */ 1801 ret = qemu_rdma_post_send_control(rdma, NULL, &ready, errp); 1802 1803 if (ret < 0) { 1804 return -1; 1805 } 1806 1807 /* 1808 * Block and wait for the message. 1809 */ 1810 ret = qemu_rdma_exchange_get_response(rdma, head, 1811 expecting, RDMA_WRID_READY, errp); 1812 1813 if (ret < 0) { 1814 return -1; 1815 } 1816 1817 qemu_rdma_move_header(rdma, RDMA_WRID_READY, head); 1818 1819 /* 1820 * Post a new RECV work request to replace the one we just consumed. 1821 */ 1822 ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY, errp); 1823 if (ret < 0) { 1824 return -1; 1825 } 1826 1827 return 0; 1828 } 1829 1830 /* 1831 * Write an actual chunk of memory using RDMA. 1832 * 1833 * If we're using dynamic registration on the dest-side, we have to 1834 * send a registration command first. 1835 */ 1836 static int qemu_rdma_write_one(RDMAContext *rdma, 1837 int current_index, uint64_t current_addr, 1838 uint64_t length, Error **errp) 1839 { 1840 struct ibv_sge sge; 1841 struct ibv_send_wr send_wr = { 0 }; 1842 struct ibv_send_wr *bad_wr; 1843 int reg_result_idx, ret, count = 0; 1844 uint64_t chunk, chunks; 1845 uint8_t *chunk_start, *chunk_end; 1846 RDMALocalBlock *block = &(rdma->local_ram_blocks.block[current_index]); 1847 RDMARegister reg; 1848 RDMARegisterResult *reg_result; 1849 RDMAControlHeader resp = { .type = RDMA_CONTROL_REGISTER_RESULT }; 1850 RDMAControlHeader head = { .len = sizeof(RDMARegister), 1851 .type = RDMA_CONTROL_REGISTER_REQUEST, 1852 .repeat = 1, 1853 }; 1854 1855 retry: 1856 sge.addr = (uintptr_t)(block->local_host_addr + 1857 (current_addr - block->offset)); 1858 sge.length = length; 1859 1860 chunk = ram_chunk_index(block->local_host_addr, 1861 (uint8_t *)(uintptr_t)sge.addr); 1862 chunk_start = ram_chunk_start(block, chunk); 1863 1864 if (block->is_ram_block) { 1865 chunks = length / (1UL << RDMA_REG_CHUNK_SHIFT); 1866 1867 if (chunks && ((length % (1UL << RDMA_REG_CHUNK_SHIFT)) == 0)) { 1868 chunks--; 1869 } 1870 } else { 1871 chunks = block->length / (1UL << RDMA_REG_CHUNK_SHIFT); 1872 1873 if (chunks && ((block->length % (1UL << RDMA_REG_CHUNK_SHIFT)) == 0)) { 1874 chunks--; 1875 } 1876 } 1877 1878 trace_qemu_rdma_write_one_top(chunks + 1, 1879 (chunks + 1) * 1880 (1UL << RDMA_REG_CHUNK_SHIFT) / 1024 / 1024); 1881 1882 chunk_end = ram_chunk_end(block, chunk + chunks); 1883 1884 1885 while (test_bit(chunk, block->transit_bitmap)) { 1886 (void)count; 1887 trace_qemu_rdma_write_one_block(count++, current_index, chunk, 1888 sge.addr, length, rdma->nb_sent, block->nb_chunks); 1889 1890 ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RDMA_WRITE, NULL); 1891 1892 if (ret < 0) { 1893 error_setg(errp, "Failed to Wait for previous write to complete " 1894 "block %d chunk %" PRIu64 1895 " current %" PRIu64 " len %" PRIu64 " %d", 1896 current_index, chunk, sge.addr, length, rdma->nb_sent); 1897 return -1; 1898 } 1899 } 1900 1901 if (!rdma->pin_all || !block->is_ram_block) { 1902 if (!block->remote_keys[chunk]) { 1903 /* 1904 * This chunk has not yet been registered, so first check to see 1905 * if the entire chunk is zero. If so, tell the other size to 1906 * memset() + madvise() the entire chunk without RDMA. 1907 */ 1908 1909 if (buffer_is_zero((void *)(uintptr_t)sge.addr, length)) { 1910 RDMACompress comp = { 1911 .offset = current_addr, 1912 .value = 0, 1913 .block_idx = current_index, 1914 .length = length, 1915 }; 1916 1917 head.len = sizeof(comp); 1918 head.type = RDMA_CONTROL_COMPRESS; 1919 1920 trace_qemu_rdma_write_one_zero(chunk, sge.length, 1921 current_index, current_addr); 1922 1923 compress_to_network(rdma, &comp); 1924 ret = qemu_rdma_exchange_send(rdma, &head, 1925 (uint8_t *) &comp, NULL, NULL, NULL, errp); 1926 1927 if (ret < 0) { 1928 return -1; 1929 } 1930 1931 /* 1932 * TODO: Here we are sending something, but we are not 1933 * accounting for anything transferred. The following is wrong: 1934 * 1935 * stat64_add(&mig_stats.rdma_bytes, sge.length); 1936 * 1937 * because we are using some kind of compression. I 1938 * would think that head.len would be the more similar 1939 * thing to a correct value. 1940 */ 1941 stat64_add(&mig_stats.zero_pages, 1942 sge.length / qemu_target_page_size()); 1943 return 1; 1944 } 1945 1946 /* 1947 * Otherwise, tell other side to register. 1948 */ 1949 reg.current_index = current_index; 1950 if (block->is_ram_block) { 1951 reg.key.current_addr = current_addr; 1952 } else { 1953 reg.key.chunk = chunk; 1954 } 1955 reg.chunks = chunks; 1956 1957 trace_qemu_rdma_write_one_sendreg(chunk, sge.length, current_index, 1958 current_addr); 1959 1960 register_to_network(rdma, ®); 1961 ret = qemu_rdma_exchange_send(rdma, &head, (uint8_t *) ®, 1962 &resp, ®_result_idx, NULL, errp); 1963 if (ret < 0) { 1964 return -1; 1965 } 1966 1967 /* try to overlap this single registration with the one we sent. */ 1968 if (qemu_rdma_register_and_get_keys(rdma, block, sge.addr, 1969 &sge.lkey, NULL, chunk, 1970 chunk_start, chunk_end)) { 1971 error_setg(errp, "cannot get lkey"); 1972 return -1; 1973 } 1974 1975 reg_result = (RDMARegisterResult *) 1976 rdma->wr_data[reg_result_idx].control_curr; 1977 1978 network_to_result(reg_result); 1979 1980 trace_qemu_rdma_write_one_recvregres(block->remote_keys[chunk], 1981 reg_result->rkey, chunk); 1982 1983 block->remote_keys[chunk] = reg_result->rkey; 1984 block->remote_host_addr = reg_result->host_addr; 1985 } else { 1986 /* already registered before */ 1987 if (qemu_rdma_register_and_get_keys(rdma, block, sge.addr, 1988 &sge.lkey, NULL, chunk, 1989 chunk_start, chunk_end)) { 1990 error_setg(errp, "cannot get lkey!"); 1991 return -1; 1992 } 1993 } 1994 1995 send_wr.wr.rdma.rkey = block->remote_keys[chunk]; 1996 } else { 1997 send_wr.wr.rdma.rkey = block->remote_rkey; 1998 1999 if (qemu_rdma_register_and_get_keys(rdma, block, sge.addr, 2000 &sge.lkey, NULL, chunk, 2001 chunk_start, chunk_end)) { 2002 error_setg(errp, "cannot get lkey!"); 2003 return -1; 2004 } 2005 } 2006 2007 /* 2008 * Encode the ram block index and chunk within this wrid. 2009 * We will use this information at the time of completion 2010 * to figure out which bitmap to check against and then which 2011 * chunk in the bitmap to look for. 2012 */ 2013 send_wr.wr_id = qemu_rdma_make_wrid(RDMA_WRID_RDMA_WRITE, 2014 current_index, chunk); 2015 2016 send_wr.opcode = IBV_WR_RDMA_WRITE; 2017 send_wr.send_flags = IBV_SEND_SIGNALED; 2018 send_wr.sg_list = &sge; 2019 send_wr.num_sge = 1; 2020 send_wr.wr.rdma.remote_addr = block->remote_host_addr + 2021 (current_addr - block->offset); 2022 2023 trace_qemu_rdma_write_one_post(chunk, sge.addr, send_wr.wr.rdma.remote_addr, 2024 sge.length); 2025 2026 /* 2027 * ibv_post_send() does not return negative error numbers, 2028 * per the specification they are positive - no idea why. 2029 */ 2030 ret = ibv_post_send(rdma->qp, &send_wr, &bad_wr); 2031 2032 if (ret == ENOMEM) { 2033 trace_qemu_rdma_write_one_queue_full(); 2034 ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RDMA_WRITE, NULL); 2035 if (ret < 0) { 2036 error_setg(errp, "rdma migration: failed to make " 2037 "room in full send queue!"); 2038 return -1; 2039 } 2040 2041 goto retry; 2042 2043 } else if (ret > 0) { 2044 error_setg_errno(errp, ret, 2045 "rdma migration: post rdma write failed"); 2046 return -1; 2047 } 2048 2049 set_bit(chunk, block->transit_bitmap); 2050 stat64_add(&mig_stats.normal_pages, sge.length / qemu_target_page_size()); 2051 /* 2052 * We are adding to transferred the amount of data written, but no 2053 * overhead at all. I will assume that RDMA is magicaly and don't 2054 * need to transfer (at least) the addresses where it wants to 2055 * write the pages. Here it looks like it should be something 2056 * like: 2057 * sizeof(send_wr) + sge.length 2058 * but this being RDMA, who knows. 2059 */ 2060 stat64_add(&mig_stats.rdma_bytes, sge.length); 2061 ram_transferred_add(sge.length); 2062 rdma->total_writes++; 2063 2064 return 0; 2065 } 2066 2067 /* 2068 * Push out any unwritten RDMA operations. 2069 * 2070 * We support sending out multiple chunks at the same time. 2071 * Not all of them need to get signaled in the completion queue. 2072 */ 2073 static int qemu_rdma_write_flush(RDMAContext *rdma, Error **errp) 2074 { 2075 int ret; 2076 2077 if (!rdma->current_length) { 2078 return 0; 2079 } 2080 2081 ret = qemu_rdma_write_one(rdma, rdma->current_index, rdma->current_addr, 2082 rdma->current_length, errp); 2083 2084 if (ret < 0) { 2085 return -1; 2086 } 2087 2088 if (ret == 0) { 2089 rdma->nb_sent++; 2090 trace_qemu_rdma_write_flush(rdma->nb_sent); 2091 } 2092 2093 rdma->current_length = 0; 2094 rdma->current_addr = 0; 2095 2096 return 0; 2097 } 2098 2099 static inline bool qemu_rdma_buffer_mergeable(RDMAContext *rdma, 2100 uint64_t offset, uint64_t len) 2101 { 2102 RDMALocalBlock *block; 2103 uint8_t *host_addr; 2104 uint8_t *chunk_end; 2105 2106 if (rdma->current_index < 0) { 2107 return false; 2108 } 2109 2110 if (rdma->current_chunk < 0) { 2111 return false; 2112 } 2113 2114 block = &(rdma->local_ram_blocks.block[rdma->current_index]); 2115 host_addr = block->local_host_addr + (offset - block->offset); 2116 chunk_end = ram_chunk_end(block, rdma->current_chunk); 2117 2118 if (rdma->current_length == 0) { 2119 return false; 2120 } 2121 2122 /* 2123 * Only merge into chunk sequentially. 2124 */ 2125 if (offset != (rdma->current_addr + rdma->current_length)) { 2126 return false; 2127 } 2128 2129 if (offset < block->offset) { 2130 return false; 2131 } 2132 2133 if ((offset + len) > (block->offset + block->length)) { 2134 return false; 2135 } 2136 2137 if ((host_addr + len) > chunk_end) { 2138 return false; 2139 } 2140 2141 return true; 2142 } 2143 2144 /* 2145 * We're not actually writing here, but doing three things: 2146 * 2147 * 1. Identify the chunk the buffer belongs to. 2148 * 2. If the chunk is full or the buffer doesn't belong to the current 2149 * chunk, then start a new chunk and flush() the old chunk. 2150 * 3. To keep the hardware busy, we also group chunks into batches 2151 * and only require that a batch gets acknowledged in the completion 2152 * queue instead of each individual chunk. 2153 */ 2154 static int qemu_rdma_write(RDMAContext *rdma, 2155 uint64_t block_offset, uint64_t offset, 2156 uint64_t len, Error **errp) 2157 { 2158 uint64_t current_addr = block_offset + offset; 2159 uint64_t index = rdma->current_index; 2160 uint64_t chunk = rdma->current_chunk; 2161 2162 /* If we cannot merge it, we flush the current buffer first. */ 2163 if (!qemu_rdma_buffer_mergeable(rdma, current_addr, len)) { 2164 if (qemu_rdma_write_flush(rdma, errp) < 0) { 2165 return -1; 2166 } 2167 rdma->current_length = 0; 2168 rdma->current_addr = current_addr; 2169 2170 qemu_rdma_search_ram_block(rdma, block_offset, 2171 offset, len, &index, &chunk); 2172 rdma->current_index = index; 2173 rdma->current_chunk = chunk; 2174 } 2175 2176 /* merge it */ 2177 rdma->current_length += len; 2178 2179 /* flush it if buffer is too large */ 2180 if (rdma->current_length >= RDMA_MERGE_MAX) { 2181 return qemu_rdma_write_flush(rdma, errp); 2182 } 2183 2184 return 0; 2185 } 2186 2187 static void qemu_rdma_cleanup(RDMAContext *rdma) 2188 { 2189 Error *err = NULL; 2190 2191 if (rdma->cm_id && rdma->connected) { 2192 if ((rdma->errored || 2193 migrate_get_current()->state == MIGRATION_STATUS_CANCELLING) && 2194 !rdma->received_error) { 2195 RDMAControlHeader head = { .len = 0, 2196 .type = RDMA_CONTROL_ERROR, 2197 .repeat = 1, 2198 }; 2199 warn_report("Early error. Sending error."); 2200 if (qemu_rdma_post_send_control(rdma, NULL, &head, &err) < 0) { 2201 warn_report_err(err); 2202 } 2203 } 2204 2205 rdma_disconnect(rdma->cm_id); 2206 trace_qemu_rdma_cleanup_disconnect(); 2207 rdma->connected = false; 2208 } 2209 2210 if (rdma->channel) { 2211 qemu_set_fd_handler(rdma->channel->fd, NULL, NULL, NULL); 2212 } 2213 g_free(rdma->dest_blocks); 2214 rdma->dest_blocks = NULL; 2215 2216 for (int i = 0; i < RDMA_WRID_MAX; i++) { 2217 if (rdma->wr_data[i].control_mr) { 2218 rdma->total_registrations--; 2219 ibv_dereg_mr(rdma->wr_data[i].control_mr); 2220 } 2221 rdma->wr_data[i].control_mr = NULL; 2222 } 2223 2224 if (rdma->local_ram_blocks.block) { 2225 while (rdma->local_ram_blocks.nb_blocks) { 2226 rdma_delete_block(rdma, &rdma->local_ram_blocks.block[0]); 2227 } 2228 } 2229 2230 if (rdma->qp) { 2231 rdma_destroy_qp(rdma->cm_id); 2232 rdma->qp = NULL; 2233 } 2234 if (rdma->recv_cq) { 2235 ibv_destroy_cq(rdma->recv_cq); 2236 rdma->recv_cq = NULL; 2237 } 2238 if (rdma->send_cq) { 2239 ibv_destroy_cq(rdma->send_cq); 2240 rdma->send_cq = NULL; 2241 } 2242 if (rdma->recv_comp_channel) { 2243 ibv_destroy_comp_channel(rdma->recv_comp_channel); 2244 rdma->recv_comp_channel = NULL; 2245 } 2246 if (rdma->send_comp_channel) { 2247 ibv_destroy_comp_channel(rdma->send_comp_channel); 2248 rdma->send_comp_channel = NULL; 2249 } 2250 if (rdma->pd) { 2251 ibv_dealloc_pd(rdma->pd); 2252 rdma->pd = NULL; 2253 } 2254 if (rdma->cm_id) { 2255 rdma_destroy_id(rdma->cm_id); 2256 rdma->cm_id = NULL; 2257 } 2258 2259 /* the destination side, listen_id and channel is shared */ 2260 if (rdma->listen_id) { 2261 if (!rdma->is_return_path) { 2262 rdma_destroy_id(rdma->listen_id); 2263 } 2264 rdma->listen_id = NULL; 2265 2266 if (rdma->channel) { 2267 if (!rdma->is_return_path) { 2268 rdma_destroy_event_channel(rdma->channel); 2269 } 2270 rdma->channel = NULL; 2271 } 2272 } 2273 2274 if (rdma->channel) { 2275 rdma_destroy_event_channel(rdma->channel); 2276 rdma->channel = NULL; 2277 } 2278 g_free(rdma->host); 2279 rdma->host = NULL; 2280 } 2281 2282 2283 static int qemu_rdma_source_init(RDMAContext *rdma, bool pin_all, Error **errp) 2284 { 2285 int ret; 2286 2287 /* 2288 * Will be validated against destination's actual capabilities 2289 * after the connect() completes. 2290 */ 2291 rdma->pin_all = pin_all; 2292 2293 ret = qemu_rdma_resolve_host(rdma, errp); 2294 if (ret < 0) { 2295 goto err_rdma_source_init; 2296 } 2297 2298 ret = qemu_rdma_alloc_pd_cq(rdma, errp); 2299 if (ret < 0) { 2300 goto err_rdma_source_init; 2301 } 2302 2303 ret = qemu_rdma_alloc_qp(rdma); 2304 if (ret < 0) { 2305 error_setg(errp, "RDMA ERROR: rdma migration: error allocating qp!"); 2306 goto err_rdma_source_init; 2307 } 2308 2309 qemu_rdma_init_ram_blocks(rdma); 2310 2311 /* Build the hash that maps from offset to RAMBlock */ 2312 rdma->blockmap = g_hash_table_new(g_direct_hash, g_direct_equal); 2313 for (int i = 0; i < rdma->local_ram_blocks.nb_blocks; i++) { 2314 g_hash_table_insert(rdma->blockmap, 2315 (void *)(uintptr_t)rdma->local_ram_blocks.block[i].offset, 2316 &rdma->local_ram_blocks.block[i]); 2317 } 2318 2319 for (int i = 0; i < RDMA_WRID_MAX; i++) { 2320 ret = qemu_rdma_reg_control(rdma, i); 2321 if (ret < 0) { 2322 error_setg(errp, "RDMA ERROR: rdma migration: error " 2323 "registering %d control!", i); 2324 goto err_rdma_source_init; 2325 } 2326 } 2327 2328 return 0; 2329 2330 err_rdma_source_init: 2331 qemu_rdma_cleanup(rdma); 2332 return -1; 2333 } 2334 2335 static int qemu_get_cm_event_timeout(RDMAContext *rdma, 2336 struct rdma_cm_event **cm_event, 2337 long msec, Error **errp) 2338 { 2339 int ret; 2340 struct pollfd poll_fd = { 2341 .fd = rdma->channel->fd, 2342 .events = POLLIN, 2343 .revents = 0 2344 }; 2345 2346 do { 2347 ret = poll(&poll_fd, 1, msec); 2348 } while (ret < 0 && errno == EINTR); 2349 2350 if (ret == 0) { 2351 error_setg(errp, "RDMA ERROR: poll cm event timeout"); 2352 return -1; 2353 } else if (ret < 0) { 2354 error_setg(errp, "RDMA ERROR: failed to poll cm event, errno=%i", 2355 errno); 2356 return -1; 2357 } else if (poll_fd.revents & POLLIN) { 2358 if (rdma_get_cm_event(rdma->channel, cm_event) < 0) { 2359 error_setg(errp, "RDMA ERROR: failed to get cm event"); 2360 return -1; 2361 } 2362 return 0; 2363 } else { 2364 error_setg(errp, "RDMA ERROR: no POLLIN event, revent=%x", 2365 poll_fd.revents); 2366 return -1; 2367 } 2368 } 2369 2370 static int qemu_rdma_connect(RDMAContext *rdma, bool return_path, 2371 Error **errp) 2372 { 2373 RDMACapabilities cap = { 2374 .version = RDMA_CONTROL_VERSION_CURRENT, 2375 .flags = 0, 2376 }; 2377 struct rdma_conn_param conn_param = { .initiator_depth = 2, 2378 .retry_count = 5, 2379 .private_data = &cap, 2380 .private_data_len = sizeof(cap), 2381 }; 2382 struct rdma_cm_event *cm_event; 2383 int ret; 2384 2385 /* 2386 * Only negotiate the capability with destination if the user 2387 * on the source first requested the capability. 2388 */ 2389 if (rdma->pin_all) { 2390 trace_qemu_rdma_connect_pin_all_requested(); 2391 cap.flags |= RDMA_CAPABILITY_PIN_ALL; 2392 } 2393 2394 caps_to_network(&cap); 2395 2396 ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY, errp); 2397 if (ret < 0) { 2398 goto err_rdma_source_connect; 2399 } 2400 2401 ret = rdma_connect(rdma->cm_id, &conn_param); 2402 if (ret < 0) { 2403 error_setg_errno(errp, errno, 2404 "RDMA ERROR: connecting to destination!"); 2405 goto err_rdma_source_connect; 2406 } 2407 2408 if (return_path) { 2409 ret = qemu_get_cm_event_timeout(rdma, &cm_event, 5000, errp); 2410 } else { 2411 ret = rdma_get_cm_event(rdma->channel, &cm_event); 2412 if (ret < 0) { 2413 error_setg_errno(errp, errno, 2414 "RDMA ERROR: failed to get cm event"); 2415 } 2416 } 2417 if (ret < 0) { 2418 goto err_rdma_source_connect; 2419 } 2420 2421 if (cm_event->event != RDMA_CM_EVENT_ESTABLISHED) { 2422 error_setg(errp, "RDMA ERROR: connecting to destination!"); 2423 rdma_ack_cm_event(cm_event); 2424 goto err_rdma_source_connect; 2425 } 2426 rdma->connected = true; 2427 2428 memcpy(&cap, cm_event->param.conn.private_data, sizeof(cap)); 2429 network_to_caps(&cap); 2430 2431 /* 2432 * Verify that the *requested* capabilities are supported by the destination 2433 * and disable them otherwise. 2434 */ 2435 if (rdma->pin_all && !(cap.flags & RDMA_CAPABILITY_PIN_ALL)) { 2436 warn_report("RDMA: Server cannot support pinning all memory. " 2437 "Will register memory dynamically."); 2438 rdma->pin_all = false; 2439 } 2440 2441 trace_qemu_rdma_connect_pin_all_outcome(rdma->pin_all); 2442 2443 rdma_ack_cm_event(cm_event); 2444 2445 rdma->control_ready_expected = 1; 2446 rdma->nb_sent = 0; 2447 return 0; 2448 2449 err_rdma_source_connect: 2450 qemu_rdma_cleanup(rdma); 2451 return -1; 2452 } 2453 2454 static int qemu_rdma_dest_init(RDMAContext *rdma, Error **errp) 2455 { 2456 int ret; 2457 struct rdma_cm_id *listen_id; 2458 char ip[40] = "unknown"; 2459 struct rdma_addrinfo *res, *e; 2460 char port_str[16]; 2461 int reuse = 1; 2462 2463 for (int i = 0; i < RDMA_WRID_MAX; i++) { 2464 rdma->wr_data[i].control_len = 0; 2465 rdma->wr_data[i].control_curr = NULL; 2466 } 2467 2468 if (!rdma->host || !rdma->host[0]) { 2469 error_setg(errp, "RDMA ERROR: RDMA host is not set!"); 2470 rdma->errored = true; 2471 return -1; 2472 } 2473 /* create CM channel */ 2474 rdma->channel = rdma_create_event_channel(); 2475 if (!rdma->channel) { 2476 error_setg(errp, "RDMA ERROR: could not create rdma event channel"); 2477 rdma->errored = true; 2478 return -1; 2479 } 2480 2481 /* create CM id */ 2482 ret = rdma_create_id(rdma->channel, &listen_id, NULL, RDMA_PS_TCP); 2483 if (ret < 0) { 2484 error_setg(errp, "RDMA ERROR: could not create cm_id!"); 2485 goto err_dest_init_create_listen_id; 2486 } 2487 2488 snprintf(port_str, 16, "%d", rdma->port); 2489 port_str[15] = '\0'; 2490 2491 ret = rdma_getaddrinfo(rdma->host, port_str, NULL, &res); 2492 if (ret) { 2493 error_setg(errp, "RDMA ERROR: could not rdma_getaddrinfo address %s", 2494 rdma->host); 2495 goto err_dest_init_bind_addr; 2496 } 2497 2498 ret = rdma_set_option(listen_id, RDMA_OPTION_ID, RDMA_OPTION_ID_REUSEADDR, 2499 &reuse, sizeof reuse); 2500 if (ret < 0) { 2501 error_setg(errp, "RDMA ERROR: Error: could not set REUSEADDR option"); 2502 goto err_dest_init_bind_addr; 2503 } 2504 2505 /* Try all addresses */ 2506 for (e = res; e != NULL; e = e->ai_next) { 2507 2508 inet_ntop(e->ai_family, 2509 &((struct sockaddr_in *) e->ai_dst_addr)->sin_addr, ip, sizeof ip); 2510 trace_qemu_rdma_dest_init_trying(rdma->host, ip); 2511 ret = rdma_bind_addr(listen_id, e->ai_dst_addr); 2512 if (ret < 0) { 2513 continue; 2514 } 2515 break; 2516 } 2517 2518 rdma_freeaddrinfo(res); 2519 if (!e) { 2520 error_setg(errp, "RDMA ERROR: Error: could not rdma_bind_addr!"); 2521 goto err_dest_init_bind_addr; 2522 } 2523 2524 rdma->listen_id = listen_id; 2525 qemu_rdma_dump_gid("dest_init", listen_id); 2526 return 0; 2527 2528 err_dest_init_bind_addr: 2529 rdma_destroy_id(listen_id); 2530 err_dest_init_create_listen_id: 2531 rdma_destroy_event_channel(rdma->channel); 2532 rdma->channel = NULL; 2533 rdma->errored = true; 2534 return -1; 2535 2536 } 2537 2538 static void qemu_rdma_return_path_dest_init(RDMAContext *rdma_return_path, 2539 RDMAContext *rdma) 2540 { 2541 for (int i = 0; i < RDMA_WRID_MAX; i++) { 2542 rdma_return_path->wr_data[i].control_len = 0; 2543 rdma_return_path->wr_data[i].control_curr = NULL; 2544 } 2545 2546 /*the CM channel and CM id is shared*/ 2547 rdma_return_path->channel = rdma->channel; 2548 rdma_return_path->listen_id = rdma->listen_id; 2549 2550 rdma->return_path = rdma_return_path; 2551 rdma_return_path->return_path = rdma; 2552 rdma_return_path->is_return_path = true; 2553 } 2554 2555 static RDMAContext *qemu_rdma_data_init(InetSocketAddress *saddr, Error **errp) 2556 { 2557 RDMAContext *rdma = NULL; 2558 2559 rdma = g_new0(RDMAContext, 1); 2560 rdma->current_index = -1; 2561 rdma->current_chunk = -1; 2562 2563 rdma->host = g_strdup(saddr->host); 2564 rdma->port = atoi(saddr->port); 2565 return rdma; 2566 } 2567 2568 /* 2569 * QEMUFile interface to the control channel. 2570 * SEND messages for control only. 2571 * VM's ram is handled with regular RDMA messages. 2572 */ 2573 static ssize_t qio_channel_rdma_writev(QIOChannel *ioc, 2574 const struct iovec *iov, 2575 size_t niov, 2576 int *fds, 2577 size_t nfds, 2578 int flags, 2579 Error **errp) 2580 { 2581 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc); 2582 RDMAContext *rdma; 2583 int ret; 2584 ssize_t done = 0; 2585 size_t len; 2586 2587 RCU_READ_LOCK_GUARD(); 2588 rdma = qatomic_rcu_read(&rioc->rdmaout); 2589 2590 if (!rdma) { 2591 error_setg(errp, "RDMA control channel output is not set"); 2592 return -1; 2593 } 2594 2595 if (rdma->errored) { 2596 error_setg(errp, 2597 "RDMA is in an error state waiting migration to abort!"); 2598 return -1; 2599 } 2600 2601 /* 2602 * Push out any writes that 2603 * we're queued up for VM's ram. 2604 */ 2605 ret = qemu_rdma_write_flush(rdma, errp); 2606 if (ret < 0) { 2607 rdma->errored = true; 2608 return -1; 2609 } 2610 2611 for (int i = 0; i < niov; i++) { 2612 size_t remaining = iov[i].iov_len; 2613 uint8_t * data = (void *)iov[i].iov_base; 2614 while (remaining) { 2615 RDMAControlHeader head = {}; 2616 2617 len = MIN(remaining, RDMA_SEND_INCREMENT); 2618 remaining -= len; 2619 2620 head.len = len; 2621 head.type = RDMA_CONTROL_QEMU_FILE; 2622 2623 ret = qemu_rdma_exchange_send(rdma, &head, 2624 data, NULL, NULL, NULL, errp); 2625 2626 if (ret < 0) { 2627 rdma->errored = true; 2628 return -1; 2629 } 2630 2631 data += len; 2632 done += len; 2633 } 2634 } 2635 2636 return done; 2637 } 2638 2639 static size_t qemu_rdma_fill(RDMAContext *rdma, uint8_t *buf, 2640 size_t size, int idx) 2641 { 2642 size_t len = 0; 2643 2644 if (rdma->wr_data[idx].control_len) { 2645 trace_qemu_rdma_fill(rdma->wr_data[idx].control_len, size); 2646 2647 len = MIN(size, rdma->wr_data[idx].control_len); 2648 memcpy(buf, rdma->wr_data[idx].control_curr, len); 2649 rdma->wr_data[idx].control_curr += len; 2650 rdma->wr_data[idx].control_len -= len; 2651 } 2652 2653 return len; 2654 } 2655 2656 /* 2657 * QEMUFile interface to the control channel. 2658 * RDMA links don't use bytestreams, so we have to 2659 * return bytes to QEMUFile opportunistically. 2660 */ 2661 static ssize_t qio_channel_rdma_readv(QIOChannel *ioc, 2662 const struct iovec *iov, 2663 size_t niov, 2664 int **fds, 2665 size_t *nfds, 2666 int flags, 2667 Error **errp) 2668 { 2669 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc); 2670 RDMAContext *rdma; 2671 RDMAControlHeader head; 2672 int ret; 2673 ssize_t done = 0; 2674 size_t len; 2675 2676 RCU_READ_LOCK_GUARD(); 2677 rdma = qatomic_rcu_read(&rioc->rdmain); 2678 2679 if (!rdma) { 2680 error_setg(errp, "RDMA control channel input is not set"); 2681 return -1; 2682 } 2683 2684 if (rdma->errored) { 2685 error_setg(errp, 2686 "RDMA is in an error state waiting migration to abort!"); 2687 return -1; 2688 } 2689 2690 for (int i = 0; i < niov; i++) { 2691 size_t want = iov[i].iov_len; 2692 uint8_t *data = (void *)iov[i].iov_base; 2693 2694 /* 2695 * First, we hold on to the last SEND message we 2696 * were given and dish out the bytes until we run 2697 * out of bytes. 2698 */ 2699 len = qemu_rdma_fill(rdma, data, want, 0); 2700 done += len; 2701 want -= len; 2702 /* Got what we needed, so go to next iovec */ 2703 if (want == 0) { 2704 continue; 2705 } 2706 2707 /* If we got any data so far, then don't wait 2708 * for more, just return what we have */ 2709 if (done > 0) { 2710 break; 2711 } 2712 2713 2714 /* We've got nothing at all, so lets wait for 2715 * more to arrive 2716 */ 2717 ret = qemu_rdma_exchange_recv(rdma, &head, RDMA_CONTROL_QEMU_FILE, 2718 errp); 2719 2720 if (ret < 0) { 2721 rdma->errored = true; 2722 return -1; 2723 } 2724 2725 /* 2726 * SEND was received with new bytes, now try again. 2727 */ 2728 len = qemu_rdma_fill(rdma, data, want, 0); 2729 done += len; 2730 want -= len; 2731 2732 /* Still didn't get enough, so lets just return */ 2733 if (want) { 2734 if (done == 0) { 2735 return QIO_CHANNEL_ERR_BLOCK; 2736 } else { 2737 break; 2738 } 2739 } 2740 } 2741 return done; 2742 } 2743 2744 /* 2745 * Block until all the outstanding chunks have been delivered by the hardware. 2746 */ 2747 static int qemu_rdma_drain_cq(RDMAContext *rdma) 2748 { 2749 Error *err = NULL; 2750 2751 if (qemu_rdma_write_flush(rdma, &err) < 0) { 2752 error_report_err(err); 2753 return -1; 2754 } 2755 2756 while (rdma->nb_sent) { 2757 if (qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RDMA_WRITE, NULL) < 0) { 2758 error_report("rdma migration: complete polling error!"); 2759 return -1; 2760 } 2761 } 2762 2763 qemu_rdma_unregister_waiting(rdma); 2764 2765 return 0; 2766 } 2767 2768 2769 static int qio_channel_rdma_set_blocking(QIOChannel *ioc, 2770 bool blocking, 2771 Error **errp) 2772 { 2773 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc); 2774 /* XXX we should make readv/writev actually honour this :-) */ 2775 rioc->blocking = blocking; 2776 return 0; 2777 } 2778 2779 2780 typedef struct QIOChannelRDMASource QIOChannelRDMASource; 2781 struct QIOChannelRDMASource { 2782 GSource parent; 2783 QIOChannelRDMA *rioc; 2784 GIOCondition condition; 2785 }; 2786 2787 static gboolean 2788 qio_channel_rdma_source_prepare(GSource *source, 2789 gint *timeout) 2790 { 2791 QIOChannelRDMASource *rsource = (QIOChannelRDMASource *)source; 2792 RDMAContext *rdma; 2793 GIOCondition cond = 0; 2794 *timeout = -1; 2795 2796 RCU_READ_LOCK_GUARD(); 2797 if (rsource->condition == G_IO_IN) { 2798 rdma = qatomic_rcu_read(&rsource->rioc->rdmain); 2799 } else { 2800 rdma = qatomic_rcu_read(&rsource->rioc->rdmaout); 2801 } 2802 2803 if (!rdma) { 2804 error_report("RDMAContext is NULL when prepare Gsource"); 2805 return FALSE; 2806 } 2807 2808 if (rdma->wr_data[0].control_len) { 2809 cond |= G_IO_IN; 2810 } 2811 cond |= G_IO_OUT; 2812 2813 return cond & rsource->condition; 2814 } 2815 2816 static gboolean 2817 qio_channel_rdma_source_check(GSource *source) 2818 { 2819 QIOChannelRDMASource *rsource = (QIOChannelRDMASource *)source; 2820 RDMAContext *rdma; 2821 GIOCondition cond = 0; 2822 2823 RCU_READ_LOCK_GUARD(); 2824 if (rsource->condition == G_IO_IN) { 2825 rdma = qatomic_rcu_read(&rsource->rioc->rdmain); 2826 } else { 2827 rdma = qatomic_rcu_read(&rsource->rioc->rdmaout); 2828 } 2829 2830 if (!rdma) { 2831 error_report("RDMAContext is NULL when check Gsource"); 2832 return FALSE; 2833 } 2834 2835 if (rdma->wr_data[0].control_len) { 2836 cond |= G_IO_IN; 2837 } 2838 cond |= G_IO_OUT; 2839 2840 return cond & rsource->condition; 2841 } 2842 2843 static gboolean 2844 qio_channel_rdma_source_dispatch(GSource *source, 2845 GSourceFunc callback, 2846 gpointer user_data) 2847 { 2848 QIOChannelFunc func = (QIOChannelFunc)callback; 2849 QIOChannelRDMASource *rsource = (QIOChannelRDMASource *)source; 2850 RDMAContext *rdma; 2851 GIOCondition cond = 0; 2852 2853 RCU_READ_LOCK_GUARD(); 2854 if (rsource->condition == G_IO_IN) { 2855 rdma = qatomic_rcu_read(&rsource->rioc->rdmain); 2856 } else { 2857 rdma = qatomic_rcu_read(&rsource->rioc->rdmaout); 2858 } 2859 2860 if (!rdma) { 2861 error_report("RDMAContext is NULL when dispatch Gsource"); 2862 return FALSE; 2863 } 2864 2865 if (rdma->wr_data[0].control_len) { 2866 cond |= G_IO_IN; 2867 } 2868 cond |= G_IO_OUT; 2869 2870 return (*func)(QIO_CHANNEL(rsource->rioc), 2871 (cond & rsource->condition), 2872 user_data); 2873 } 2874 2875 static void 2876 qio_channel_rdma_source_finalize(GSource *source) 2877 { 2878 QIOChannelRDMASource *ssource = (QIOChannelRDMASource *)source; 2879 2880 object_unref(OBJECT(ssource->rioc)); 2881 } 2882 2883 static GSourceFuncs qio_channel_rdma_source_funcs = { 2884 qio_channel_rdma_source_prepare, 2885 qio_channel_rdma_source_check, 2886 qio_channel_rdma_source_dispatch, 2887 qio_channel_rdma_source_finalize 2888 }; 2889 2890 static GSource *qio_channel_rdma_create_watch(QIOChannel *ioc, 2891 GIOCondition condition) 2892 { 2893 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc); 2894 QIOChannelRDMASource *ssource; 2895 GSource *source; 2896 2897 source = g_source_new(&qio_channel_rdma_source_funcs, 2898 sizeof(QIOChannelRDMASource)); 2899 ssource = (QIOChannelRDMASource *)source; 2900 2901 ssource->rioc = rioc; 2902 object_ref(OBJECT(rioc)); 2903 2904 ssource->condition = condition; 2905 2906 return source; 2907 } 2908 2909 static void qio_channel_rdma_set_aio_fd_handler(QIOChannel *ioc, 2910 AioContext *read_ctx, 2911 IOHandler *io_read, 2912 AioContext *write_ctx, 2913 IOHandler *io_write, 2914 void *opaque) 2915 { 2916 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc); 2917 if (io_read) { 2918 aio_set_fd_handler(read_ctx, rioc->rdmain->recv_comp_channel->fd, 2919 io_read, io_write, NULL, NULL, opaque); 2920 aio_set_fd_handler(read_ctx, rioc->rdmain->send_comp_channel->fd, 2921 io_read, io_write, NULL, NULL, opaque); 2922 } else { 2923 aio_set_fd_handler(write_ctx, rioc->rdmaout->recv_comp_channel->fd, 2924 io_read, io_write, NULL, NULL, opaque); 2925 aio_set_fd_handler(write_ctx, rioc->rdmaout->send_comp_channel->fd, 2926 io_read, io_write, NULL, NULL, opaque); 2927 } 2928 } 2929 2930 struct rdma_close_rcu { 2931 struct rcu_head rcu; 2932 RDMAContext *rdmain; 2933 RDMAContext *rdmaout; 2934 }; 2935 2936 /* callback from qio_channel_rdma_close via call_rcu */ 2937 static void qio_channel_rdma_close_rcu(struct rdma_close_rcu *rcu) 2938 { 2939 if (rcu->rdmain) { 2940 qemu_rdma_cleanup(rcu->rdmain); 2941 } 2942 2943 if (rcu->rdmaout) { 2944 qemu_rdma_cleanup(rcu->rdmaout); 2945 } 2946 2947 g_free(rcu->rdmain); 2948 g_free(rcu->rdmaout); 2949 g_free(rcu); 2950 } 2951 2952 static int qio_channel_rdma_close(QIOChannel *ioc, 2953 Error **errp) 2954 { 2955 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc); 2956 RDMAContext *rdmain, *rdmaout; 2957 struct rdma_close_rcu *rcu = g_new(struct rdma_close_rcu, 1); 2958 2959 trace_qemu_rdma_close(); 2960 2961 rdmain = rioc->rdmain; 2962 if (rdmain) { 2963 qatomic_rcu_set(&rioc->rdmain, NULL); 2964 } 2965 2966 rdmaout = rioc->rdmaout; 2967 if (rdmaout) { 2968 qatomic_rcu_set(&rioc->rdmaout, NULL); 2969 } 2970 2971 rcu->rdmain = rdmain; 2972 rcu->rdmaout = rdmaout; 2973 call_rcu(rcu, qio_channel_rdma_close_rcu, rcu); 2974 2975 return 0; 2976 } 2977 2978 static int 2979 qio_channel_rdma_shutdown(QIOChannel *ioc, 2980 QIOChannelShutdown how, 2981 Error **errp) 2982 { 2983 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc); 2984 RDMAContext *rdmain, *rdmaout; 2985 2986 RCU_READ_LOCK_GUARD(); 2987 2988 rdmain = qatomic_rcu_read(&rioc->rdmain); 2989 rdmaout = qatomic_rcu_read(&rioc->rdmain); 2990 2991 switch (how) { 2992 case QIO_CHANNEL_SHUTDOWN_READ: 2993 if (rdmain) { 2994 rdmain->errored = true; 2995 } 2996 break; 2997 case QIO_CHANNEL_SHUTDOWN_WRITE: 2998 if (rdmaout) { 2999 rdmaout->errored = true; 3000 } 3001 break; 3002 case QIO_CHANNEL_SHUTDOWN_BOTH: 3003 default: 3004 if (rdmain) { 3005 rdmain->errored = true; 3006 } 3007 if (rdmaout) { 3008 rdmaout->errored = true; 3009 } 3010 break; 3011 } 3012 3013 return 0; 3014 } 3015 3016 /* 3017 * Parameters: 3018 * @offset == 0 : 3019 * This means that 'block_offset' is a full virtual address that does not 3020 * belong to a RAMBlock of the virtual machine and instead 3021 * represents a private malloc'd memory area that the caller wishes to 3022 * transfer. 3023 * 3024 * @offset != 0 : 3025 * Offset is an offset to be added to block_offset and used 3026 * to also lookup the corresponding RAMBlock. 3027 * 3028 * @size : Number of bytes to transfer 3029 * 3030 * @pages_sent : User-specificed pointer to indicate how many pages were 3031 * sent. Usually, this will not be more than a few bytes of 3032 * the protocol because most transfers are sent asynchronously. 3033 */ 3034 static int qemu_rdma_save_page(QEMUFile *f, ram_addr_t block_offset, 3035 ram_addr_t offset, size_t size) 3036 { 3037 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(qemu_file_get_ioc(f)); 3038 Error *err = NULL; 3039 RDMAContext *rdma; 3040 int ret; 3041 3042 RCU_READ_LOCK_GUARD(); 3043 rdma = qatomic_rcu_read(&rioc->rdmaout); 3044 3045 if (!rdma) { 3046 return -1; 3047 } 3048 3049 if (rdma_errored(rdma)) { 3050 return -1; 3051 } 3052 3053 qemu_fflush(f); 3054 3055 /* 3056 * Add this page to the current 'chunk'. If the chunk 3057 * is full, or the page doesn't belong to the current chunk, 3058 * an actual RDMA write will occur and a new chunk will be formed. 3059 */ 3060 ret = qemu_rdma_write(rdma, block_offset, offset, size, &err); 3061 if (ret < 0) { 3062 error_report_err(err); 3063 goto err; 3064 } 3065 3066 /* 3067 * Drain the Completion Queue if possible, but do not block, 3068 * just poll. 3069 * 3070 * If nothing to poll, the end of the iteration will do this 3071 * again to make sure we don't overflow the request queue. 3072 */ 3073 while (1) { 3074 uint64_t wr_id, wr_id_in; 3075 ret = qemu_rdma_poll(rdma, rdma->recv_cq, &wr_id_in, NULL); 3076 3077 if (ret < 0) { 3078 error_report("rdma migration: polling error"); 3079 goto err; 3080 } 3081 3082 wr_id = wr_id_in & RDMA_WRID_TYPE_MASK; 3083 3084 if (wr_id == RDMA_WRID_NONE) { 3085 break; 3086 } 3087 } 3088 3089 while (1) { 3090 uint64_t wr_id, wr_id_in; 3091 ret = qemu_rdma_poll(rdma, rdma->send_cq, &wr_id_in, NULL); 3092 3093 if (ret < 0) { 3094 error_report("rdma migration: polling error"); 3095 goto err; 3096 } 3097 3098 wr_id = wr_id_in & RDMA_WRID_TYPE_MASK; 3099 3100 if (wr_id == RDMA_WRID_NONE) { 3101 break; 3102 } 3103 } 3104 3105 return RAM_SAVE_CONTROL_DELAYED; 3106 3107 err: 3108 rdma->errored = true; 3109 return -1; 3110 } 3111 3112 int rdma_control_save_page(QEMUFile *f, ram_addr_t block_offset, 3113 ram_addr_t offset, size_t size) 3114 { 3115 assert(migrate_rdma()); 3116 3117 int ret = qemu_rdma_save_page(f, block_offset, offset, size); 3118 3119 if (ret != RAM_SAVE_CONTROL_DELAYED) { 3120 if (ret < 0) { 3121 qemu_file_set_error(f, ret); 3122 } 3123 } 3124 return ret; 3125 } 3126 3127 static void rdma_accept_incoming_migration(void *opaque); 3128 3129 static void rdma_cm_poll_handler(void *opaque) 3130 { 3131 RDMAContext *rdma = opaque; 3132 struct rdma_cm_event *cm_event; 3133 MigrationIncomingState *mis = migration_incoming_get_current(); 3134 3135 if (rdma_get_cm_event(rdma->channel, &cm_event) < 0) { 3136 error_report("get_cm_event failed %d", errno); 3137 return; 3138 } 3139 3140 if (cm_event->event == RDMA_CM_EVENT_DISCONNECTED || 3141 cm_event->event == RDMA_CM_EVENT_DEVICE_REMOVAL) { 3142 if (!rdma->errored && 3143 migration_incoming_get_current()->state != 3144 MIGRATION_STATUS_COMPLETED) { 3145 error_report("receive cm event, cm event is %d", cm_event->event); 3146 rdma->errored = true; 3147 if (rdma->return_path) { 3148 rdma->return_path->errored = true; 3149 } 3150 } 3151 rdma_ack_cm_event(cm_event); 3152 if (mis->loadvm_co) { 3153 qemu_coroutine_enter(mis->loadvm_co); 3154 } 3155 return; 3156 } 3157 rdma_ack_cm_event(cm_event); 3158 } 3159 3160 static int qemu_rdma_accept(RDMAContext *rdma) 3161 { 3162 Error *err = NULL; 3163 RDMACapabilities cap; 3164 struct rdma_conn_param conn_param = { 3165 .responder_resources = 2, 3166 .private_data = &cap, 3167 .private_data_len = sizeof(cap), 3168 }; 3169 RDMAContext *rdma_return_path = NULL; 3170 g_autoptr(InetSocketAddress) isock = g_new0(InetSocketAddress, 1); 3171 struct rdma_cm_event *cm_event; 3172 struct ibv_context *verbs; 3173 int ret; 3174 3175 ret = rdma_get_cm_event(rdma->channel, &cm_event); 3176 if (ret < 0) { 3177 goto err_rdma_dest_wait; 3178 } 3179 3180 if (cm_event->event != RDMA_CM_EVENT_CONNECT_REQUEST) { 3181 rdma_ack_cm_event(cm_event); 3182 goto err_rdma_dest_wait; 3183 } 3184 3185 isock->host = g_strdup(rdma->host); 3186 isock->port = g_strdup_printf("%d", rdma->port); 3187 3188 /* 3189 * initialize the RDMAContext for return path for postcopy after first 3190 * connection request reached. 3191 */ 3192 if ((migrate_postcopy() || migrate_return_path()) 3193 && !rdma->is_return_path) { 3194 rdma_return_path = qemu_rdma_data_init(isock, NULL); 3195 if (rdma_return_path == NULL) { 3196 rdma_ack_cm_event(cm_event); 3197 goto err_rdma_dest_wait; 3198 } 3199 3200 qemu_rdma_return_path_dest_init(rdma_return_path, rdma); 3201 } 3202 3203 memcpy(&cap, cm_event->param.conn.private_data, sizeof(cap)); 3204 3205 network_to_caps(&cap); 3206 3207 if (cap.version < 1 || cap.version > RDMA_CONTROL_VERSION_CURRENT) { 3208 error_report("Unknown source RDMA version: %d, bailing...", 3209 cap.version); 3210 rdma_ack_cm_event(cm_event); 3211 goto err_rdma_dest_wait; 3212 } 3213 3214 /* 3215 * Respond with only the capabilities this version of QEMU knows about. 3216 */ 3217 cap.flags &= known_capabilities; 3218 3219 /* 3220 * Enable the ones that we do know about. 3221 * Add other checks here as new ones are introduced. 3222 */ 3223 if (cap.flags & RDMA_CAPABILITY_PIN_ALL) { 3224 rdma->pin_all = true; 3225 } 3226 3227 rdma->cm_id = cm_event->id; 3228 verbs = cm_event->id->verbs; 3229 3230 rdma_ack_cm_event(cm_event); 3231 3232 trace_qemu_rdma_accept_pin_state(rdma->pin_all); 3233 3234 caps_to_network(&cap); 3235 3236 trace_qemu_rdma_accept_pin_verbsc(verbs); 3237 3238 if (!rdma->verbs) { 3239 rdma->verbs = verbs; 3240 } else if (rdma->verbs != verbs) { 3241 error_report("ibv context not matching %p, %p!", rdma->verbs, 3242 verbs); 3243 goto err_rdma_dest_wait; 3244 } 3245 3246 qemu_rdma_dump_id("dest_init", verbs); 3247 3248 ret = qemu_rdma_alloc_pd_cq(rdma, &err); 3249 if (ret < 0) { 3250 error_report_err(err); 3251 goto err_rdma_dest_wait; 3252 } 3253 3254 ret = qemu_rdma_alloc_qp(rdma); 3255 if (ret < 0) { 3256 error_report("rdma migration: error allocating qp!"); 3257 goto err_rdma_dest_wait; 3258 } 3259 3260 qemu_rdma_init_ram_blocks(rdma); 3261 3262 for (int i = 0; i < RDMA_WRID_MAX; i++) { 3263 ret = qemu_rdma_reg_control(rdma, i); 3264 if (ret < 0) { 3265 error_report("rdma: error registering %d control", i); 3266 goto err_rdma_dest_wait; 3267 } 3268 } 3269 3270 /* Accept the second connection request for return path */ 3271 if ((migrate_postcopy() || migrate_return_path()) 3272 && !rdma->is_return_path) { 3273 qemu_set_fd_handler(rdma->channel->fd, rdma_accept_incoming_migration, 3274 NULL, 3275 (void *)(intptr_t)rdma->return_path); 3276 } else { 3277 qemu_set_fd_handler(rdma->channel->fd, rdma_cm_poll_handler, 3278 NULL, rdma); 3279 } 3280 3281 ret = rdma_accept(rdma->cm_id, &conn_param); 3282 if (ret < 0) { 3283 error_report("rdma_accept failed"); 3284 goto err_rdma_dest_wait; 3285 } 3286 3287 ret = rdma_get_cm_event(rdma->channel, &cm_event); 3288 if (ret < 0) { 3289 error_report("rdma_accept get_cm_event failed"); 3290 goto err_rdma_dest_wait; 3291 } 3292 3293 if (cm_event->event != RDMA_CM_EVENT_ESTABLISHED) { 3294 error_report("rdma_accept not event established"); 3295 rdma_ack_cm_event(cm_event); 3296 goto err_rdma_dest_wait; 3297 } 3298 3299 rdma_ack_cm_event(cm_event); 3300 rdma->connected = true; 3301 3302 ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY, &err); 3303 if (ret < 0) { 3304 error_report_err(err); 3305 goto err_rdma_dest_wait; 3306 } 3307 3308 qemu_rdma_dump_gid("dest_connect", rdma->cm_id); 3309 3310 return 0; 3311 3312 err_rdma_dest_wait: 3313 rdma->errored = true; 3314 qemu_rdma_cleanup(rdma); 3315 g_free(rdma_return_path); 3316 return -1; 3317 } 3318 3319 static int dest_ram_sort_func(const void *a, const void *b) 3320 { 3321 unsigned int a_index = ((const RDMALocalBlock *)a)->src_index; 3322 unsigned int b_index = ((const RDMALocalBlock *)b)->src_index; 3323 3324 return (a_index < b_index) ? -1 : (a_index != b_index); 3325 } 3326 3327 /* 3328 * During each iteration of the migration, we listen for instructions 3329 * by the source VM to perform dynamic page registrations before they 3330 * can perform RDMA operations. 3331 * 3332 * We respond with the 'rkey'. 3333 * 3334 * Keep doing this until the source tells us to stop. 3335 */ 3336 int rdma_registration_handle(QEMUFile *f) 3337 { 3338 RDMAControlHeader reg_resp = { .len = sizeof(RDMARegisterResult), 3339 .type = RDMA_CONTROL_REGISTER_RESULT, 3340 .repeat = 0, 3341 }; 3342 RDMAControlHeader unreg_resp = { .len = 0, 3343 .type = RDMA_CONTROL_UNREGISTER_FINISHED, 3344 .repeat = 0, 3345 }; 3346 RDMAControlHeader blocks = { .type = RDMA_CONTROL_RAM_BLOCKS_RESULT, 3347 .repeat = 1 }; 3348 QIOChannelRDMA *rioc; 3349 Error *err = NULL; 3350 RDMAContext *rdma; 3351 RDMALocalBlocks *local; 3352 RDMAControlHeader head; 3353 RDMARegister *reg, *registers; 3354 RDMACompress *comp; 3355 RDMARegisterResult *reg_result; 3356 static RDMARegisterResult results[RDMA_CONTROL_MAX_COMMANDS_PER_MESSAGE]; 3357 RDMALocalBlock *block; 3358 void *host_addr; 3359 int ret; 3360 int idx = 0; 3361 3362 if (!migrate_rdma()) { 3363 return 0; 3364 } 3365 3366 RCU_READ_LOCK_GUARD(); 3367 rioc = QIO_CHANNEL_RDMA(qemu_file_get_ioc(f)); 3368 rdma = qatomic_rcu_read(&rioc->rdmain); 3369 3370 if (!rdma) { 3371 return -1; 3372 } 3373 3374 if (rdma_errored(rdma)) { 3375 return -1; 3376 } 3377 3378 local = &rdma->local_ram_blocks; 3379 do { 3380 trace_rdma_registration_handle_wait(); 3381 3382 ret = qemu_rdma_exchange_recv(rdma, &head, RDMA_CONTROL_NONE, &err); 3383 3384 if (ret < 0) { 3385 error_report_err(err); 3386 break; 3387 } 3388 3389 if (head.repeat > RDMA_CONTROL_MAX_COMMANDS_PER_MESSAGE) { 3390 error_report("rdma: Too many requests in this message (%d)." 3391 "Bailing.", head.repeat); 3392 break; 3393 } 3394 3395 switch (head.type) { 3396 case RDMA_CONTROL_COMPRESS: 3397 comp = (RDMACompress *) rdma->wr_data[idx].control_curr; 3398 network_to_compress(comp); 3399 3400 trace_rdma_registration_handle_compress(comp->length, 3401 comp->block_idx, 3402 comp->offset); 3403 if (comp->block_idx >= rdma->local_ram_blocks.nb_blocks) { 3404 error_report("rdma: 'compress' bad block index %u (vs %d)", 3405 (unsigned int)comp->block_idx, 3406 rdma->local_ram_blocks.nb_blocks); 3407 goto err; 3408 } 3409 block = &(rdma->local_ram_blocks.block[comp->block_idx]); 3410 3411 host_addr = block->local_host_addr + 3412 (comp->offset - block->offset); 3413 if (comp->value) { 3414 error_report("rdma: Zero page with non-zero (%d) value", 3415 comp->value); 3416 goto err; 3417 } 3418 ram_handle_zero(host_addr, comp->length); 3419 break; 3420 3421 case RDMA_CONTROL_REGISTER_FINISHED: 3422 trace_rdma_registration_handle_finished(); 3423 return 0; 3424 3425 case RDMA_CONTROL_RAM_BLOCKS_REQUEST: 3426 trace_rdma_registration_handle_ram_blocks(); 3427 3428 /* Sort our local RAM Block list so it's the same as the source, 3429 * we can do this since we've filled in a src_index in the list 3430 * as we received the RAMBlock list earlier. 3431 */ 3432 qsort(rdma->local_ram_blocks.block, 3433 rdma->local_ram_blocks.nb_blocks, 3434 sizeof(RDMALocalBlock), dest_ram_sort_func); 3435 for (int i = 0; i < local->nb_blocks; i++) { 3436 local->block[i].index = i; 3437 } 3438 3439 if (rdma->pin_all) { 3440 ret = qemu_rdma_reg_whole_ram_blocks(rdma, &err); 3441 if (ret < 0) { 3442 error_report_err(err); 3443 goto err; 3444 } 3445 } 3446 3447 /* 3448 * Dest uses this to prepare to transmit the RAMBlock descriptions 3449 * to the source VM after connection setup. 3450 * Both sides use the "remote" structure to communicate and update 3451 * their "local" descriptions with what was sent. 3452 */ 3453 for (int i = 0; i < local->nb_blocks; i++) { 3454 rdma->dest_blocks[i].remote_host_addr = 3455 (uintptr_t)(local->block[i].local_host_addr); 3456 3457 if (rdma->pin_all) { 3458 rdma->dest_blocks[i].remote_rkey = local->block[i].mr->rkey; 3459 } 3460 3461 rdma->dest_blocks[i].offset = local->block[i].offset; 3462 rdma->dest_blocks[i].length = local->block[i].length; 3463 3464 dest_block_to_network(&rdma->dest_blocks[i]); 3465 trace_rdma_registration_handle_ram_blocks_loop( 3466 local->block[i].block_name, 3467 local->block[i].offset, 3468 local->block[i].length, 3469 local->block[i].local_host_addr, 3470 local->block[i].src_index); 3471 } 3472 3473 blocks.len = rdma->local_ram_blocks.nb_blocks 3474 * sizeof(RDMADestBlock); 3475 3476 3477 ret = qemu_rdma_post_send_control(rdma, 3478 (uint8_t *) rdma->dest_blocks, &blocks, 3479 &err); 3480 3481 if (ret < 0) { 3482 error_report_err(err); 3483 goto err; 3484 } 3485 3486 break; 3487 case RDMA_CONTROL_REGISTER_REQUEST: 3488 trace_rdma_registration_handle_register(head.repeat); 3489 3490 reg_resp.repeat = head.repeat; 3491 registers = (RDMARegister *) rdma->wr_data[idx].control_curr; 3492 3493 for (int count = 0; count < head.repeat; count++) { 3494 uint64_t chunk; 3495 uint8_t *chunk_start, *chunk_end; 3496 3497 reg = ®isters[count]; 3498 network_to_register(reg); 3499 3500 reg_result = &results[count]; 3501 3502 trace_rdma_registration_handle_register_loop(count, 3503 reg->current_index, reg->key.current_addr, reg->chunks); 3504 3505 if (reg->current_index >= rdma->local_ram_blocks.nb_blocks) { 3506 error_report("rdma: 'register' bad block index %u (vs %d)", 3507 (unsigned int)reg->current_index, 3508 rdma->local_ram_blocks.nb_blocks); 3509 goto err; 3510 } 3511 block = &(rdma->local_ram_blocks.block[reg->current_index]); 3512 if (block->is_ram_block) { 3513 if (block->offset > reg->key.current_addr) { 3514 error_report("rdma: bad register address for block %s" 3515 " offset: %" PRIx64 " current_addr: %" PRIx64, 3516 block->block_name, block->offset, 3517 reg->key.current_addr); 3518 goto err; 3519 } 3520 host_addr = (block->local_host_addr + 3521 (reg->key.current_addr - block->offset)); 3522 chunk = ram_chunk_index(block->local_host_addr, 3523 (uint8_t *) host_addr); 3524 } else { 3525 chunk = reg->key.chunk; 3526 host_addr = block->local_host_addr + 3527 (reg->key.chunk * (1UL << RDMA_REG_CHUNK_SHIFT)); 3528 /* Check for particularly bad chunk value */ 3529 if (host_addr < (void *)block->local_host_addr) { 3530 error_report("rdma: bad chunk for block %s" 3531 " chunk: %" PRIx64, 3532 block->block_name, reg->key.chunk); 3533 goto err; 3534 } 3535 } 3536 chunk_start = ram_chunk_start(block, chunk); 3537 chunk_end = ram_chunk_end(block, chunk + reg->chunks); 3538 /* avoid "-Waddress-of-packed-member" warning */ 3539 uint32_t tmp_rkey = 0; 3540 if (qemu_rdma_register_and_get_keys(rdma, block, 3541 (uintptr_t)host_addr, NULL, &tmp_rkey, 3542 chunk, chunk_start, chunk_end)) { 3543 error_report("cannot get rkey"); 3544 goto err; 3545 } 3546 reg_result->rkey = tmp_rkey; 3547 3548 reg_result->host_addr = (uintptr_t)block->local_host_addr; 3549 3550 trace_rdma_registration_handle_register_rkey(reg_result->rkey); 3551 3552 result_to_network(reg_result); 3553 } 3554 3555 ret = qemu_rdma_post_send_control(rdma, 3556 (uint8_t *) results, ®_resp, &err); 3557 3558 if (ret < 0) { 3559 error_report_err(err); 3560 goto err; 3561 } 3562 break; 3563 case RDMA_CONTROL_UNREGISTER_REQUEST: 3564 trace_rdma_registration_handle_unregister(head.repeat); 3565 unreg_resp.repeat = head.repeat; 3566 registers = (RDMARegister *) rdma->wr_data[idx].control_curr; 3567 3568 for (int count = 0; count < head.repeat; count++) { 3569 reg = ®isters[count]; 3570 network_to_register(reg); 3571 3572 trace_rdma_registration_handle_unregister_loop(count, 3573 reg->current_index, reg->key.chunk); 3574 3575 block = &(rdma->local_ram_blocks.block[reg->current_index]); 3576 3577 ret = ibv_dereg_mr(block->pmr[reg->key.chunk]); 3578 block->pmr[reg->key.chunk] = NULL; 3579 3580 if (ret != 0) { 3581 error_report("rdma unregistration chunk failed: %s", 3582 strerror(errno)); 3583 goto err; 3584 } 3585 3586 rdma->total_registrations--; 3587 3588 trace_rdma_registration_handle_unregister_success(reg->key.chunk); 3589 } 3590 3591 ret = qemu_rdma_post_send_control(rdma, NULL, &unreg_resp, &err); 3592 3593 if (ret < 0) { 3594 error_report_err(err); 3595 goto err; 3596 } 3597 break; 3598 case RDMA_CONTROL_REGISTER_RESULT: 3599 error_report("Invalid RESULT message at dest."); 3600 goto err; 3601 default: 3602 error_report("Unknown control message %s", control_desc(head.type)); 3603 goto err; 3604 } 3605 } while (1); 3606 3607 err: 3608 rdma->errored = true; 3609 return -1; 3610 } 3611 3612 /* Destination: 3613 * Called during the initial RAM load section which lists the 3614 * RAMBlocks by name. This lets us know the order of the RAMBlocks on 3615 * the source. We've already built our local RAMBlock list, but not 3616 * yet sent the list to the source. 3617 */ 3618 int rdma_block_notification_handle(QEMUFile *f, const char *name) 3619 { 3620 int curr; 3621 int found = -1; 3622 3623 if (!migrate_rdma()) { 3624 return 0; 3625 } 3626 3627 RCU_READ_LOCK_GUARD(); 3628 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(qemu_file_get_ioc(f)); 3629 RDMAContext *rdma = qatomic_rcu_read(&rioc->rdmain); 3630 3631 if (!rdma) { 3632 return -1; 3633 } 3634 3635 /* Find the matching RAMBlock in our local list */ 3636 for (curr = 0; curr < rdma->local_ram_blocks.nb_blocks; curr++) { 3637 if (!strcmp(rdma->local_ram_blocks.block[curr].block_name, name)) { 3638 found = curr; 3639 break; 3640 } 3641 } 3642 3643 if (found == -1) { 3644 error_report("RAMBlock '%s' not found on destination", name); 3645 return -1; 3646 } 3647 3648 rdma->local_ram_blocks.block[curr].src_index = rdma->next_src_index; 3649 trace_rdma_block_notification_handle(name, rdma->next_src_index); 3650 rdma->next_src_index++; 3651 3652 return 0; 3653 } 3654 3655 int rdma_registration_start(QEMUFile *f, uint64_t flags) 3656 { 3657 if (!migrate_rdma()) { 3658 return 0; 3659 } 3660 3661 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(qemu_file_get_ioc(f)); 3662 RCU_READ_LOCK_GUARD(); 3663 RDMAContext *rdma = qatomic_rcu_read(&rioc->rdmaout); 3664 if (!rdma) { 3665 return -1; 3666 } 3667 3668 if (rdma_errored(rdma)) { 3669 return -1; 3670 } 3671 3672 trace_rdma_registration_start(flags); 3673 qemu_put_be64(f, RAM_SAVE_FLAG_HOOK); 3674 return qemu_fflush(f); 3675 } 3676 3677 /* 3678 * Inform dest that dynamic registrations are done for now. 3679 * First, flush writes, if any. 3680 */ 3681 int rdma_registration_stop(QEMUFile *f, uint64_t flags) 3682 { 3683 QIOChannelRDMA *rioc; 3684 Error *err = NULL; 3685 RDMAContext *rdma; 3686 RDMAControlHeader head = { .len = 0, .repeat = 1 }; 3687 int ret; 3688 3689 if (!migrate_rdma()) { 3690 return 0; 3691 } 3692 3693 RCU_READ_LOCK_GUARD(); 3694 rioc = QIO_CHANNEL_RDMA(qemu_file_get_ioc(f)); 3695 rdma = qatomic_rcu_read(&rioc->rdmaout); 3696 if (!rdma) { 3697 return -1; 3698 } 3699 3700 if (rdma_errored(rdma)) { 3701 return -1; 3702 } 3703 3704 qemu_fflush(f); 3705 ret = qemu_rdma_drain_cq(rdma); 3706 3707 if (ret < 0) { 3708 goto err; 3709 } 3710 3711 if (flags == RAM_CONTROL_SETUP) { 3712 RDMAControlHeader resp = {.type = RDMA_CONTROL_RAM_BLOCKS_RESULT }; 3713 RDMALocalBlocks *local = &rdma->local_ram_blocks; 3714 int reg_result_idx, nb_dest_blocks; 3715 3716 head.type = RDMA_CONTROL_RAM_BLOCKS_REQUEST; 3717 trace_rdma_registration_stop_ram(); 3718 3719 /* 3720 * Make sure that we parallelize the pinning on both sides. 3721 * For very large guests, doing this serially takes a really 3722 * long time, so we have to 'interleave' the pinning locally 3723 * with the control messages by performing the pinning on this 3724 * side before we receive the control response from the other 3725 * side that the pinning has completed. 3726 */ 3727 ret = qemu_rdma_exchange_send(rdma, &head, NULL, &resp, 3728 ®_result_idx, rdma->pin_all ? 3729 qemu_rdma_reg_whole_ram_blocks : NULL, 3730 &err); 3731 if (ret < 0) { 3732 error_report_err(err); 3733 return -1; 3734 } 3735 3736 nb_dest_blocks = resp.len / sizeof(RDMADestBlock); 3737 3738 /* 3739 * The protocol uses two different sets of rkeys (mutually exclusive): 3740 * 1. One key to represent the virtual address of the entire ram block. 3741 * (dynamic chunk registration disabled - pin everything with one rkey.) 3742 * 2. One to represent individual chunks within a ram block. 3743 * (dynamic chunk registration enabled - pin individual chunks.) 3744 * 3745 * Once the capability is successfully negotiated, the destination transmits 3746 * the keys to use (or sends them later) including the virtual addresses 3747 * and then propagates the remote ram block descriptions to his local copy. 3748 */ 3749 3750 if (local->nb_blocks != nb_dest_blocks) { 3751 error_report("ram blocks mismatch (Number of blocks %d vs %d)", 3752 local->nb_blocks, nb_dest_blocks); 3753 error_printf("Your QEMU command line parameters are probably " 3754 "not identical on both the source and destination."); 3755 rdma->errored = true; 3756 return -1; 3757 } 3758 3759 qemu_rdma_move_header(rdma, reg_result_idx, &resp); 3760 memcpy(rdma->dest_blocks, 3761 rdma->wr_data[reg_result_idx].control_curr, resp.len); 3762 for (int i = 0; i < nb_dest_blocks; i++) { 3763 network_to_dest_block(&rdma->dest_blocks[i]); 3764 3765 /* We require that the blocks are in the same order */ 3766 if (rdma->dest_blocks[i].length != local->block[i].length) { 3767 error_report("Block %s/%d has a different length %" PRIu64 3768 "vs %" PRIu64, 3769 local->block[i].block_name, i, 3770 local->block[i].length, 3771 rdma->dest_blocks[i].length); 3772 rdma->errored = true; 3773 return -1; 3774 } 3775 local->block[i].remote_host_addr = 3776 rdma->dest_blocks[i].remote_host_addr; 3777 local->block[i].remote_rkey = rdma->dest_blocks[i].remote_rkey; 3778 } 3779 } 3780 3781 trace_rdma_registration_stop(flags); 3782 3783 head.type = RDMA_CONTROL_REGISTER_FINISHED; 3784 ret = qemu_rdma_exchange_send(rdma, &head, NULL, NULL, NULL, NULL, &err); 3785 3786 if (ret < 0) { 3787 error_report_err(err); 3788 goto err; 3789 } 3790 3791 return 0; 3792 err: 3793 rdma->errored = true; 3794 return -1; 3795 } 3796 3797 static void qio_channel_rdma_finalize(Object *obj) 3798 { 3799 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(obj); 3800 if (rioc->rdmain) { 3801 qemu_rdma_cleanup(rioc->rdmain); 3802 g_free(rioc->rdmain); 3803 rioc->rdmain = NULL; 3804 } 3805 if (rioc->rdmaout) { 3806 qemu_rdma_cleanup(rioc->rdmaout); 3807 g_free(rioc->rdmaout); 3808 rioc->rdmaout = NULL; 3809 } 3810 } 3811 3812 static void qio_channel_rdma_class_init(ObjectClass *klass, 3813 const void *class_data G_GNUC_UNUSED) 3814 { 3815 QIOChannelClass *ioc_klass = QIO_CHANNEL_CLASS(klass); 3816 3817 ioc_klass->io_writev = qio_channel_rdma_writev; 3818 ioc_klass->io_readv = qio_channel_rdma_readv; 3819 ioc_klass->io_set_blocking = qio_channel_rdma_set_blocking; 3820 ioc_klass->io_close = qio_channel_rdma_close; 3821 ioc_klass->io_create_watch = qio_channel_rdma_create_watch; 3822 ioc_klass->io_set_aio_fd_handler = qio_channel_rdma_set_aio_fd_handler; 3823 ioc_klass->io_shutdown = qio_channel_rdma_shutdown; 3824 } 3825 3826 static const TypeInfo qio_channel_rdma_info = { 3827 .parent = TYPE_QIO_CHANNEL, 3828 .name = TYPE_QIO_CHANNEL_RDMA, 3829 .instance_size = sizeof(QIOChannelRDMA), 3830 .instance_finalize = qio_channel_rdma_finalize, 3831 .class_init = qio_channel_rdma_class_init, 3832 }; 3833 3834 static void qio_channel_rdma_register_types(void) 3835 { 3836 type_register_static(&qio_channel_rdma_info); 3837 } 3838 3839 type_init(qio_channel_rdma_register_types); 3840 3841 static QEMUFile *rdma_new_input(RDMAContext *rdma) 3842 { 3843 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(object_new(TYPE_QIO_CHANNEL_RDMA)); 3844 3845 rioc->file = qemu_file_new_input(QIO_CHANNEL(rioc)); 3846 rioc->rdmain = rdma; 3847 rioc->rdmaout = rdma->return_path; 3848 3849 return rioc->file; 3850 } 3851 3852 static QEMUFile *rdma_new_output(RDMAContext *rdma) 3853 { 3854 QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(object_new(TYPE_QIO_CHANNEL_RDMA)); 3855 3856 rioc->file = qemu_file_new_output(QIO_CHANNEL(rioc)); 3857 rioc->rdmaout = rdma; 3858 rioc->rdmain = rdma->return_path; 3859 3860 return rioc->file; 3861 } 3862 3863 static void rdma_accept_incoming_migration(void *opaque) 3864 { 3865 RDMAContext *rdma = opaque; 3866 QEMUFile *f; 3867 3868 trace_qemu_rdma_accept_incoming_migration(); 3869 if (qemu_rdma_accept(rdma) < 0) { 3870 error_report("RDMA ERROR: Migration initialization failed"); 3871 return; 3872 } 3873 3874 trace_qemu_rdma_accept_incoming_migration_accepted(); 3875 3876 if (rdma->is_return_path) { 3877 return; 3878 } 3879 3880 f = rdma_new_input(rdma); 3881 if (f == NULL) { 3882 error_report("RDMA ERROR: could not open RDMA for input"); 3883 qemu_rdma_cleanup(rdma); 3884 return; 3885 } 3886 3887 rdma->migration_started_on_destination = 1; 3888 migration_fd_process_incoming(f); 3889 } 3890 3891 void rdma_start_incoming_migration(InetSocketAddress *host_port, 3892 Error **errp) 3893 { 3894 MigrationState *s = migrate_get_current(); 3895 int ret; 3896 RDMAContext *rdma; 3897 3898 trace_rdma_start_incoming_migration(); 3899 3900 /* Avoid ram_block_discard_disable(), cannot change during migration. */ 3901 if (ram_block_discard_is_required()) { 3902 error_setg(errp, "RDMA: cannot disable RAM discard"); 3903 return; 3904 } 3905 3906 rdma = qemu_rdma_data_init(host_port, errp); 3907 if (rdma == NULL) { 3908 goto err; 3909 } 3910 3911 ret = qemu_rdma_dest_init(rdma, errp); 3912 if (ret < 0) { 3913 goto err; 3914 } 3915 3916 trace_rdma_start_incoming_migration_after_dest_init(); 3917 3918 ret = rdma_listen(rdma->listen_id, 5); 3919 3920 if (ret < 0) { 3921 error_setg(errp, "RDMA ERROR: listening on socket!"); 3922 goto cleanup_rdma; 3923 } 3924 3925 trace_rdma_start_incoming_migration_after_rdma_listen(); 3926 s->rdma_migration = true; 3927 qemu_set_fd_handler(rdma->channel->fd, rdma_accept_incoming_migration, 3928 NULL, (void *)(intptr_t)rdma); 3929 return; 3930 3931 cleanup_rdma: 3932 qemu_rdma_cleanup(rdma); 3933 err: 3934 if (rdma) { 3935 g_free(rdma->host); 3936 } 3937 g_free(rdma); 3938 } 3939 3940 void rdma_start_outgoing_migration(void *opaque, 3941 InetSocketAddress *host_port, Error **errp) 3942 { 3943 MigrationState *s = opaque; 3944 RDMAContext *rdma_return_path = NULL; 3945 RDMAContext *rdma; 3946 int ret; 3947 3948 /* Avoid ram_block_discard_disable(), cannot change during migration. */ 3949 if (ram_block_discard_is_required()) { 3950 error_setg(errp, "RDMA: cannot disable RAM discard"); 3951 return; 3952 } 3953 3954 rdma = qemu_rdma_data_init(host_port, errp); 3955 if (rdma == NULL) { 3956 goto err; 3957 } 3958 3959 ret = qemu_rdma_source_init(rdma, migrate_rdma_pin_all(), errp); 3960 3961 if (ret < 0) { 3962 goto err; 3963 } 3964 3965 trace_rdma_start_outgoing_migration_after_rdma_source_init(); 3966 ret = qemu_rdma_connect(rdma, false, errp); 3967 3968 if (ret < 0) { 3969 goto err; 3970 } 3971 3972 /* RDMA postcopy need a separate queue pair for return path */ 3973 if (migrate_postcopy() || migrate_return_path()) { 3974 rdma_return_path = qemu_rdma_data_init(host_port, errp); 3975 3976 if (rdma_return_path == NULL) { 3977 goto return_path_err; 3978 } 3979 3980 ret = qemu_rdma_source_init(rdma_return_path, 3981 migrate_rdma_pin_all(), errp); 3982 3983 if (ret < 0) { 3984 goto return_path_err; 3985 } 3986 3987 ret = qemu_rdma_connect(rdma_return_path, true, errp); 3988 3989 if (ret < 0) { 3990 goto return_path_err; 3991 } 3992 3993 rdma->return_path = rdma_return_path; 3994 rdma_return_path->return_path = rdma; 3995 rdma_return_path->is_return_path = true; 3996 } 3997 3998 trace_rdma_start_outgoing_migration_after_rdma_connect(); 3999 4000 s->to_dst_file = rdma_new_output(rdma); 4001 s->rdma_migration = true; 4002 migration_connect(s, NULL); 4003 return; 4004 return_path_err: 4005 qemu_rdma_cleanup(rdma); 4006 err: 4007 g_free(rdma); 4008 g_free(rdma_return_path); 4009 } 4010