1 /* 2 * Multifd common code 3 * 4 * Copyright (c) 2019-2020 Red Hat Inc 5 * 6 * Authors: 7 * Juan Quintela <quintela@redhat.com> 8 * 9 * This work is licensed under the terms of the GNU GPL, version 2 or later. 10 * See the COPYING file in the top-level directory. 11 */ 12 13 #include "qemu/osdep.h" 14 #include "qemu/cutils.h" 15 #include "qemu/rcu.h" 16 #include "exec/target_page.h" 17 #include "system/system.h" 18 #include "exec/ramblock.h" 19 #include "qemu/error-report.h" 20 #include "qapi/error.h" 21 #include "file.h" 22 #include "migration.h" 23 #include "migration-stats.h" 24 #include "socket.h" 25 #include "tls.h" 26 #include "qemu-file.h" 27 #include "trace.h" 28 #include "multifd.h" 29 #include "threadinfo.h" 30 #include "options.h" 31 #include "qemu/yank.h" 32 #include "io/channel-file.h" 33 #include "io/channel-socket.h" 34 #include "yank_functions.h" 35 36 /* Multiple fd's */ 37 38 #define MULTIFD_MAGIC 0x11223344U 39 #define MULTIFD_VERSION 1 40 41 typedef struct { 42 uint32_t magic; 43 uint32_t version; 44 unsigned char uuid[16]; /* QemuUUID */ 45 uint8_t id; 46 uint8_t unused1[7]; /* Reserved for future use */ 47 uint64_t unused2[4]; /* Reserved for future use */ 48 } __attribute__((packed)) MultiFDInit_t; 49 50 struct { 51 MultiFDSendParams *params; 52 /* 53 * Global number of generated multifd packets. 54 * 55 * Note that we used 'uintptr_t' because it'll naturally support atomic 56 * operations on both 32bit / 64 bits hosts. It means on 32bit systems 57 * multifd will overflow the packet_num easier, but that should be 58 * fine. 59 * 60 * Another option is to use QEMU's Stat64 then it'll be 64 bits on all 61 * hosts, however so far it does not support atomic fetch_add() yet. 62 * Make it easy for now. 63 */ 64 uintptr_t packet_num; 65 /* 66 * Synchronization point past which no more channels will be 67 * created. 68 */ 69 QemuSemaphore channels_created; 70 /* send channels ready */ 71 QemuSemaphore channels_ready; 72 /* 73 * Have we already run terminate threads. There is a race when it 74 * happens that we got one error while we are exiting. 75 * We will use atomic operations. Only valid values are 0 and 1. 76 */ 77 int exiting; 78 /* multifd ops */ 79 const MultiFDMethods *ops; 80 } *multifd_send_state; 81 82 struct { 83 MultiFDRecvParams *params; 84 MultiFDRecvData *data; 85 /* number of created threads */ 86 int count; 87 /* 88 * This is always posted by the recv threads, the migration thread 89 * uses it to wait for recv threads to finish assigned tasks. 90 */ 91 QemuSemaphore sem_sync; 92 /* global number of generated multifd packets */ 93 uint64_t packet_num; 94 int exiting; 95 /* multifd ops */ 96 const MultiFDMethods *ops; 97 } *multifd_recv_state; 98 99 MultiFDSendData *multifd_send_data_alloc(void) 100 { 101 size_t max_payload_size, size_minus_payload; 102 103 /* 104 * MultiFDPages_t has a flexible array at the end, account for it 105 * when allocating MultiFDSendData. Use max() in case other types 106 * added to the union in the future are larger than 107 * (MultiFDPages_t + flex array). 108 */ 109 max_payload_size = MAX(multifd_ram_payload_size(), sizeof(MultiFDPayload)); 110 111 /* 112 * Account for any holes the compiler might insert. We can't pack 113 * the structure because that misaligns the members and triggers 114 * Waddress-of-packed-member. 115 */ 116 size_minus_payload = sizeof(MultiFDSendData) - sizeof(MultiFDPayload); 117 118 return g_malloc0(size_minus_payload + max_payload_size); 119 } 120 121 static bool multifd_use_packets(void) 122 { 123 return !migrate_mapped_ram(); 124 } 125 126 void multifd_send_channel_created(void) 127 { 128 qemu_sem_post(&multifd_send_state->channels_created); 129 } 130 131 static const MultiFDMethods *multifd_ops[MULTIFD_COMPRESSION__MAX] = {}; 132 133 void multifd_register_ops(int method, const MultiFDMethods *ops) 134 { 135 assert(0 <= method && method < MULTIFD_COMPRESSION__MAX); 136 assert(!multifd_ops[method]); 137 multifd_ops[method] = ops; 138 } 139 140 static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp) 141 { 142 MultiFDInit_t msg = {}; 143 size_t size = sizeof(msg); 144 int ret; 145 146 msg.magic = cpu_to_be32(MULTIFD_MAGIC); 147 msg.version = cpu_to_be32(MULTIFD_VERSION); 148 msg.id = p->id; 149 memcpy(msg.uuid, &qemu_uuid.data, sizeof(msg.uuid)); 150 151 ret = qio_channel_write_all(p->c, (char *)&msg, size, errp); 152 if (ret != 0) { 153 return -1; 154 } 155 stat64_add(&mig_stats.multifd_bytes, size); 156 return 0; 157 } 158 159 static int multifd_recv_initial_packet(QIOChannel *c, Error **errp) 160 { 161 MultiFDInit_t msg; 162 int ret; 163 164 ret = qio_channel_read_all(c, (char *)&msg, sizeof(msg), errp); 165 if (ret != 0) { 166 return -1; 167 } 168 169 msg.magic = be32_to_cpu(msg.magic); 170 msg.version = be32_to_cpu(msg.version); 171 172 if (msg.magic != MULTIFD_MAGIC) { 173 error_setg(errp, "multifd: received packet magic %x " 174 "expected %x", msg.magic, MULTIFD_MAGIC); 175 return -1; 176 } 177 178 if (msg.version != MULTIFD_VERSION) { 179 error_setg(errp, "multifd: received packet version %u " 180 "expected %u", msg.version, MULTIFD_VERSION); 181 return -1; 182 } 183 184 if (memcmp(msg.uuid, &qemu_uuid, sizeof(qemu_uuid))) { 185 char *uuid = qemu_uuid_unparse_strdup(&qemu_uuid); 186 char *msg_uuid = qemu_uuid_unparse_strdup((const QemuUUID *)msg.uuid); 187 188 error_setg(errp, "multifd: received uuid '%s' and expected " 189 "uuid '%s' for channel %hhd", msg_uuid, uuid, msg.id); 190 g_free(uuid); 191 g_free(msg_uuid); 192 return -1; 193 } 194 195 if (msg.id > migrate_multifd_channels()) { 196 error_setg(errp, "multifd: received channel id %u is greater than " 197 "number of channels %u", msg.id, migrate_multifd_channels()); 198 return -1; 199 } 200 201 return msg.id; 202 } 203 204 void multifd_send_fill_packet(MultiFDSendParams *p) 205 { 206 MultiFDPacket_t *packet = p->packet; 207 uint64_t packet_num; 208 bool sync_packet = p->flags & MULTIFD_FLAG_SYNC; 209 210 memset(packet, 0, p->packet_len); 211 212 packet->magic = cpu_to_be32(MULTIFD_MAGIC); 213 packet->version = cpu_to_be32(MULTIFD_VERSION); 214 215 packet->flags = cpu_to_be32(p->flags); 216 packet->next_packet_size = cpu_to_be32(p->next_packet_size); 217 218 packet_num = qatomic_fetch_inc(&multifd_send_state->packet_num); 219 packet->packet_num = cpu_to_be64(packet_num); 220 221 p->packets_sent++; 222 223 if (!sync_packet) { 224 multifd_ram_fill_packet(p); 225 } 226 227 trace_multifd_send_fill(p->id, packet_num, 228 p->flags, p->next_packet_size); 229 } 230 231 static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error **errp) 232 { 233 const MultiFDPacket_t *packet = p->packet; 234 uint32_t magic = be32_to_cpu(packet->magic); 235 uint32_t version = be32_to_cpu(packet->version); 236 int ret = 0; 237 238 if (magic != MULTIFD_MAGIC) { 239 error_setg(errp, "multifd: received packet magic %x, expected %x", 240 magic, MULTIFD_MAGIC); 241 return -1; 242 } 243 244 if (version != MULTIFD_VERSION) { 245 error_setg(errp, "multifd: received packet version %u, expected %u", 246 version, MULTIFD_VERSION); 247 return -1; 248 } 249 250 p->flags = be32_to_cpu(packet->flags); 251 p->next_packet_size = be32_to_cpu(packet->next_packet_size); 252 p->packet_num = be64_to_cpu(packet->packet_num); 253 p->packets_recved++; 254 255 /* Always unfill, old QEMUs (<9.0) send data along with SYNC */ 256 ret = multifd_ram_unfill_packet(p, errp); 257 258 trace_multifd_recv_unfill(p->id, p->packet_num, p->flags, 259 p->next_packet_size); 260 261 return ret; 262 } 263 264 static bool multifd_send_should_exit(void) 265 { 266 return qatomic_read(&multifd_send_state->exiting); 267 } 268 269 static bool multifd_recv_should_exit(void) 270 { 271 return qatomic_read(&multifd_recv_state->exiting); 272 } 273 274 /* 275 * The migration thread can wait on either of the two semaphores. This 276 * function can be used to kick the main thread out of waiting on either of 277 * them. Should mostly only be called when something wrong happened with 278 * the current multifd send thread. 279 */ 280 static void multifd_send_kick_main(MultiFDSendParams *p) 281 { 282 qemu_sem_post(&p->sem_sync); 283 qemu_sem_post(&multifd_send_state->channels_ready); 284 } 285 286 /* 287 * multifd_send() works by exchanging the MultiFDSendData object 288 * provided by the caller with an unused MultiFDSendData object from 289 * the next channel that is found to be idle. 290 * 291 * The channel owns the data until it finishes transmitting and the 292 * caller owns the empty object until it fills it with data and calls 293 * this function again. No locking necessary. 294 * 295 * Switching is safe because both the migration thread and the channel 296 * thread have barriers in place to serialize access. 297 * 298 * Returns true if succeed, false otherwise. 299 */ 300 bool multifd_send(MultiFDSendData **send_data) 301 { 302 int i; 303 static int next_channel; 304 MultiFDSendParams *p = NULL; /* make happy gcc */ 305 MultiFDSendData *tmp; 306 307 if (multifd_send_should_exit()) { 308 return false; 309 } 310 311 /* We wait here, until at least one channel is ready */ 312 qemu_sem_wait(&multifd_send_state->channels_ready); 313 314 /* 315 * next_channel can remain from a previous migration that was 316 * using more channels, so ensure it doesn't overflow if the 317 * limit is lower now. 318 */ 319 next_channel %= migrate_multifd_channels(); 320 for (i = next_channel;; i = (i + 1) % migrate_multifd_channels()) { 321 if (multifd_send_should_exit()) { 322 return false; 323 } 324 p = &multifd_send_state->params[i]; 325 /* 326 * Lockless read to p->pending_job is safe, because only multifd 327 * sender thread can clear it. 328 */ 329 if (qatomic_read(&p->pending_job) == false) { 330 next_channel = (i + 1) % migrate_multifd_channels(); 331 break; 332 } 333 } 334 335 /* 336 * Make sure we read p->pending_job before all the rest. Pairs with 337 * qatomic_store_release() in multifd_send_thread(). 338 */ 339 smp_mb_acquire(); 340 341 assert(multifd_payload_empty(p->data)); 342 343 /* 344 * Swap the pointers. The channel gets the client data for 345 * transferring and the client gets back an unused data slot. 346 */ 347 tmp = *send_data; 348 *send_data = p->data; 349 p->data = tmp; 350 351 /* 352 * Making sure p->data is setup before marking pending_job=true. Pairs 353 * with the qatomic_load_acquire() in multifd_send_thread(). 354 */ 355 qatomic_store_release(&p->pending_job, true); 356 qemu_sem_post(&p->sem); 357 358 return true; 359 } 360 361 /* Multifd send side hit an error; remember it and prepare to quit */ 362 static void multifd_send_set_error(Error *err) 363 { 364 /* 365 * We don't want to exit each threads twice. Depending on where 366 * we get the error, or if there are two independent errors in two 367 * threads at the same time, we can end calling this function 368 * twice. 369 */ 370 if (qatomic_xchg(&multifd_send_state->exiting, 1)) { 371 return; 372 } 373 374 if (err) { 375 MigrationState *s = migrate_get_current(); 376 migrate_set_error(s, err); 377 if (s->state == MIGRATION_STATUS_SETUP || 378 s->state == MIGRATION_STATUS_PRE_SWITCHOVER || 379 s->state == MIGRATION_STATUS_DEVICE || 380 s->state == MIGRATION_STATUS_ACTIVE) { 381 migrate_set_state(&s->state, s->state, 382 MIGRATION_STATUS_FAILED); 383 } 384 } 385 } 386 387 static void multifd_send_terminate_threads(void) 388 { 389 int i; 390 391 trace_multifd_send_terminate_threads(); 392 393 /* 394 * Tell everyone we're quitting. No xchg() needed here; we simply 395 * always set it. 396 */ 397 qatomic_set(&multifd_send_state->exiting, 1); 398 399 /* 400 * Firstly, kick all threads out; no matter whether they are just idle, 401 * or blocked in an IO system call. 402 */ 403 for (i = 0; i < migrate_multifd_channels(); i++) { 404 MultiFDSendParams *p = &multifd_send_state->params[i]; 405 406 qemu_sem_post(&p->sem); 407 if (p->c) { 408 qio_channel_shutdown(p->c, QIO_CHANNEL_SHUTDOWN_BOTH, NULL); 409 } 410 } 411 412 /* 413 * Finally recycle all the threads. 414 */ 415 for (i = 0; i < migrate_multifd_channels(); i++) { 416 MultiFDSendParams *p = &multifd_send_state->params[i]; 417 418 if (p->tls_thread_created) { 419 qemu_thread_join(&p->tls_thread); 420 } 421 422 if (p->thread_created) { 423 qemu_thread_join(&p->thread); 424 } 425 } 426 } 427 428 static bool multifd_send_cleanup_channel(MultiFDSendParams *p, Error **errp) 429 { 430 if (p->c) { 431 migration_ioc_unregister_yank(p->c); 432 /* 433 * The object_unref() cannot guarantee the fd will always be 434 * released because finalize() of the iochannel is only 435 * triggered on the last reference and it's not guaranteed 436 * that we always hold the last refcount when reaching here. 437 * 438 * Closing the fd explicitly has the benefit that if there is any 439 * registered I/O handler callbacks on such fd, that will get a 440 * POLLNVAL event and will further trigger the cleanup to finally 441 * release the IOC. 442 * 443 * FIXME: It should logically be guaranteed that all multifd 444 * channels have no I/O handler callback registered when reaching 445 * here, because migration thread will wait for all multifd channel 446 * establishments to complete during setup. Since 447 * migrate_fd_cleanup() will be scheduled in main thread too, all 448 * previous callbacks should guarantee to be completed when 449 * reaching here. See multifd_send_state.channels_created and its 450 * usage. In the future, we could replace this with an assert 451 * making sure we're the last reference, or simply drop it if above 452 * is more clear to be justified. 453 */ 454 qio_channel_close(p->c, &error_abort); 455 object_unref(OBJECT(p->c)); 456 p->c = NULL; 457 } 458 qemu_sem_destroy(&p->sem); 459 qemu_sem_destroy(&p->sem_sync); 460 g_free(p->name); 461 p->name = NULL; 462 g_free(p->data); 463 p->data = NULL; 464 p->packet_len = 0; 465 g_free(p->packet); 466 p->packet = NULL; 467 multifd_send_state->ops->send_cleanup(p, errp); 468 assert(!p->iov); 469 470 return *errp == NULL; 471 } 472 473 static void multifd_send_cleanup_state(void) 474 { 475 file_cleanup_outgoing_migration(); 476 socket_cleanup_outgoing_migration(); 477 qemu_sem_destroy(&multifd_send_state->channels_created); 478 qemu_sem_destroy(&multifd_send_state->channels_ready); 479 g_free(multifd_send_state->params); 480 multifd_send_state->params = NULL; 481 g_free(multifd_send_state); 482 multifd_send_state = NULL; 483 } 484 485 void multifd_send_shutdown(void) 486 { 487 int i; 488 489 if (!migrate_multifd()) { 490 return; 491 } 492 493 multifd_send_terminate_threads(); 494 495 for (i = 0; i < migrate_multifd_channels(); i++) { 496 MultiFDSendParams *p = &multifd_send_state->params[i]; 497 Error *local_err = NULL; 498 499 if (!multifd_send_cleanup_channel(p, &local_err)) { 500 migrate_set_error(migrate_get_current(), local_err); 501 error_free(local_err); 502 } 503 } 504 505 multifd_send_cleanup_state(); 506 } 507 508 static int multifd_zero_copy_flush(QIOChannel *c) 509 { 510 int ret; 511 Error *err = NULL; 512 513 ret = qio_channel_flush(c, &err); 514 if (ret < 0) { 515 error_report_err(err); 516 return -1; 517 } 518 if (ret == 1) { 519 stat64_add(&mig_stats.dirty_sync_missed_zero_copy, 1); 520 } 521 522 return ret; 523 } 524 525 int multifd_send_sync_main(MultiFDSyncReq req) 526 { 527 int i; 528 bool flush_zero_copy; 529 530 assert(req != MULTIFD_SYNC_NONE); 531 532 flush_zero_copy = migrate_zero_copy_send(); 533 534 for (i = 0; i < migrate_multifd_channels(); i++) { 535 MultiFDSendParams *p = &multifd_send_state->params[i]; 536 537 if (multifd_send_should_exit()) { 538 return -1; 539 } 540 541 trace_multifd_send_sync_main_signal(p->id); 542 543 /* 544 * We should be the only user so far, so not possible to be set by 545 * others concurrently. 546 */ 547 assert(qatomic_read(&p->pending_sync) == MULTIFD_SYNC_NONE); 548 qatomic_set(&p->pending_sync, req); 549 qemu_sem_post(&p->sem); 550 } 551 for (i = 0; i < migrate_multifd_channels(); i++) { 552 MultiFDSendParams *p = &multifd_send_state->params[i]; 553 554 if (multifd_send_should_exit()) { 555 return -1; 556 } 557 558 qemu_sem_wait(&multifd_send_state->channels_ready); 559 trace_multifd_send_sync_main_wait(p->id); 560 qemu_sem_wait(&p->sem_sync); 561 562 if (flush_zero_copy && p->c && (multifd_zero_copy_flush(p->c) < 0)) { 563 return -1; 564 } 565 } 566 trace_multifd_send_sync_main(multifd_send_state->packet_num); 567 568 return 0; 569 } 570 571 static void *multifd_send_thread(void *opaque) 572 { 573 MultiFDSendParams *p = opaque; 574 MigrationThread *thread = NULL; 575 Error *local_err = NULL; 576 int ret = 0; 577 bool use_packets = multifd_use_packets(); 578 579 thread = migration_threads_add(p->name, qemu_get_thread_id()); 580 581 trace_multifd_send_thread_start(p->id); 582 rcu_register_thread(); 583 584 if (use_packets) { 585 if (multifd_send_initial_packet(p, &local_err) < 0) { 586 ret = -1; 587 goto out; 588 } 589 } 590 591 while (true) { 592 qemu_sem_post(&multifd_send_state->channels_ready); 593 qemu_sem_wait(&p->sem); 594 595 if (multifd_send_should_exit()) { 596 break; 597 } 598 599 /* 600 * Read pending_job flag before p->data. Pairs with the 601 * qatomic_store_release() in multifd_send(). 602 */ 603 if (qatomic_load_acquire(&p->pending_job)) { 604 p->flags = 0; 605 p->iovs_num = 0; 606 assert(!multifd_payload_empty(p->data)); 607 608 ret = multifd_send_state->ops->send_prepare(p, &local_err); 609 if (ret != 0) { 610 break; 611 } 612 613 if (migrate_mapped_ram()) { 614 ret = file_write_ramblock_iov(p->c, p->iov, p->iovs_num, 615 &p->data->u.ram, &local_err); 616 } else { 617 ret = qio_channel_writev_full_all(p->c, p->iov, p->iovs_num, 618 NULL, 0, p->write_flags, 619 &local_err); 620 } 621 622 if (ret != 0) { 623 break; 624 } 625 626 stat64_add(&mig_stats.multifd_bytes, 627 (uint64_t)p->next_packet_size + p->packet_len); 628 629 p->next_packet_size = 0; 630 multifd_set_payload_type(p->data, MULTIFD_PAYLOAD_NONE); 631 632 /* 633 * Making sure p->data is published before saying "we're 634 * free". Pairs with the smp_mb_acquire() in 635 * multifd_send(). 636 */ 637 qatomic_store_release(&p->pending_job, false); 638 } else { 639 MultiFDSyncReq req = qatomic_read(&p->pending_sync); 640 641 /* 642 * If not a normal job, must be a sync request. Note that 643 * pending_sync is a standalone flag (unlike pending_job), so 644 * it doesn't require explicit memory barriers. 645 */ 646 assert(req != MULTIFD_SYNC_NONE); 647 648 /* Only push the SYNC message if it involves a remote sync */ 649 if (req == MULTIFD_SYNC_ALL) { 650 p->flags = MULTIFD_FLAG_SYNC; 651 multifd_send_fill_packet(p); 652 ret = qio_channel_write_all(p->c, (void *)p->packet, 653 p->packet_len, &local_err); 654 if (ret != 0) { 655 break; 656 } 657 /* p->next_packet_size will always be zero for a SYNC packet */ 658 stat64_add(&mig_stats.multifd_bytes, p->packet_len); 659 } 660 661 qatomic_set(&p->pending_sync, MULTIFD_SYNC_NONE); 662 qemu_sem_post(&p->sem_sync); 663 } 664 } 665 666 out: 667 if (ret) { 668 assert(local_err); 669 trace_multifd_send_error(p->id); 670 multifd_send_set_error(local_err); 671 multifd_send_kick_main(p); 672 error_free(local_err); 673 } 674 675 rcu_unregister_thread(); 676 migration_threads_remove(thread); 677 trace_multifd_send_thread_end(p->id, p->packets_sent); 678 679 return NULL; 680 } 681 682 static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque); 683 684 typedef struct { 685 MultiFDSendParams *p; 686 QIOChannelTLS *tioc; 687 } MultiFDTLSThreadArgs; 688 689 static void *multifd_tls_handshake_thread(void *opaque) 690 { 691 MultiFDTLSThreadArgs *args = opaque; 692 693 qio_channel_tls_handshake(args->tioc, 694 multifd_new_send_channel_async, 695 args->p, 696 NULL, 697 NULL); 698 g_free(args); 699 700 return NULL; 701 } 702 703 static bool multifd_tls_channel_connect(MultiFDSendParams *p, 704 QIOChannel *ioc, 705 Error **errp) 706 { 707 MigrationState *s = migrate_get_current(); 708 const char *hostname = s->hostname; 709 MultiFDTLSThreadArgs *args; 710 QIOChannelTLS *tioc; 711 712 tioc = migration_tls_client_create(ioc, hostname, errp); 713 if (!tioc) { 714 return false; 715 } 716 717 /* 718 * Ownership of the socket channel now transfers to the newly 719 * created TLS channel, which has already taken a reference. 720 */ 721 object_unref(OBJECT(ioc)); 722 trace_multifd_tls_outgoing_handshake_start(ioc, tioc, hostname); 723 qio_channel_set_name(QIO_CHANNEL(tioc), "multifd-tls-outgoing"); 724 725 args = g_new0(MultiFDTLSThreadArgs, 1); 726 args->tioc = tioc; 727 args->p = p; 728 729 p->tls_thread_created = true; 730 qemu_thread_create(&p->tls_thread, MIGRATION_THREAD_SRC_TLS, 731 multifd_tls_handshake_thread, args, 732 QEMU_THREAD_JOINABLE); 733 return true; 734 } 735 736 void multifd_channel_connect(MultiFDSendParams *p, QIOChannel *ioc) 737 { 738 qio_channel_set_delay(ioc, false); 739 740 migration_ioc_register_yank(ioc); 741 /* Setup p->c only if the channel is completely setup */ 742 p->c = ioc; 743 744 p->thread_created = true; 745 qemu_thread_create(&p->thread, p->name, multifd_send_thread, p, 746 QEMU_THREAD_JOINABLE); 747 } 748 749 /* 750 * When TLS is enabled this function is called once to establish the 751 * TLS connection and a second time after the TLS handshake to create 752 * the multifd channel. Without TLS it goes straight into the channel 753 * creation. 754 */ 755 static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque) 756 { 757 MultiFDSendParams *p = opaque; 758 QIOChannel *ioc = QIO_CHANNEL(qio_task_get_source(task)); 759 Error *local_err = NULL; 760 bool ret; 761 762 trace_multifd_new_send_channel_async(p->id); 763 764 if (qio_task_propagate_error(task, &local_err)) { 765 ret = false; 766 goto out; 767 } 768 769 trace_multifd_set_outgoing_channel(ioc, object_get_typename(OBJECT(ioc)), 770 migrate_get_current()->hostname); 771 772 if (migrate_channel_requires_tls_upgrade(ioc)) { 773 ret = multifd_tls_channel_connect(p, ioc, &local_err); 774 if (ret) { 775 return; 776 } 777 } else { 778 multifd_channel_connect(p, ioc); 779 ret = true; 780 } 781 782 out: 783 /* 784 * Here we're not interested whether creation succeeded, only that 785 * it happened at all. 786 */ 787 multifd_send_channel_created(); 788 789 if (ret) { 790 return; 791 } 792 793 trace_multifd_new_send_channel_async_error(p->id, local_err); 794 multifd_send_set_error(local_err); 795 /* 796 * For error cases (TLS or non-TLS), IO channel is always freed here 797 * rather than when cleanup multifd: since p->c is not set, multifd 798 * cleanup code doesn't even know its existence. 799 */ 800 object_unref(OBJECT(ioc)); 801 error_free(local_err); 802 } 803 804 static bool multifd_new_send_channel_create(gpointer opaque, Error **errp) 805 { 806 if (!multifd_use_packets()) { 807 return file_send_channel_create(opaque, errp); 808 } 809 810 socket_send_channel_create(multifd_new_send_channel_async, opaque); 811 return true; 812 } 813 814 bool multifd_send_setup(void) 815 { 816 MigrationState *s = migrate_get_current(); 817 int thread_count, ret = 0; 818 uint32_t page_count = multifd_ram_page_count(); 819 bool use_packets = multifd_use_packets(); 820 uint8_t i; 821 822 if (!migrate_multifd()) { 823 return true; 824 } 825 826 thread_count = migrate_multifd_channels(); 827 multifd_send_state = g_malloc0(sizeof(*multifd_send_state)); 828 multifd_send_state->params = g_new0(MultiFDSendParams, thread_count); 829 qemu_sem_init(&multifd_send_state->channels_created, 0); 830 qemu_sem_init(&multifd_send_state->channels_ready, 0); 831 qatomic_set(&multifd_send_state->exiting, 0); 832 multifd_send_state->ops = multifd_ops[migrate_multifd_compression()]; 833 834 for (i = 0; i < thread_count; i++) { 835 MultiFDSendParams *p = &multifd_send_state->params[i]; 836 Error *local_err = NULL; 837 838 qemu_sem_init(&p->sem, 0); 839 qemu_sem_init(&p->sem_sync, 0); 840 p->id = i; 841 p->data = multifd_send_data_alloc(); 842 843 if (use_packets) { 844 p->packet_len = sizeof(MultiFDPacket_t) 845 + sizeof(uint64_t) * page_count; 846 p->packet = g_malloc0(p->packet_len); 847 } 848 p->name = g_strdup_printf(MIGRATION_THREAD_SRC_MULTIFD, i); 849 p->write_flags = 0; 850 851 if (!multifd_new_send_channel_create(p, &local_err)) { 852 migrate_set_error(s, local_err); 853 ret = -1; 854 } 855 } 856 857 /* 858 * Wait until channel creation has started for all channels. The 859 * creation can still fail, but no more channels will be created 860 * past this point. 861 */ 862 for (i = 0; i < thread_count; i++) { 863 qemu_sem_wait(&multifd_send_state->channels_created); 864 } 865 866 if (ret) { 867 goto err; 868 } 869 870 for (i = 0; i < thread_count; i++) { 871 MultiFDSendParams *p = &multifd_send_state->params[i]; 872 Error *local_err = NULL; 873 874 ret = multifd_send_state->ops->send_setup(p, &local_err); 875 if (ret) { 876 migrate_set_error(s, local_err); 877 goto err; 878 } 879 assert(p->iov); 880 } 881 882 return true; 883 884 err: 885 migrate_set_state(&s->state, MIGRATION_STATUS_SETUP, 886 MIGRATION_STATUS_FAILED); 887 return false; 888 } 889 890 bool multifd_recv(void) 891 { 892 int i; 893 static int next_recv_channel; 894 MultiFDRecvParams *p = NULL; 895 MultiFDRecvData *data = multifd_recv_state->data; 896 897 /* 898 * next_channel can remain from a previous migration that was 899 * using more channels, so ensure it doesn't overflow if the 900 * limit is lower now. 901 */ 902 next_recv_channel %= migrate_multifd_channels(); 903 for (i = next_recv_channel;; i = (i + 1) % migrate_multifd_channels()) { 904 if (multifd_recv_should_exit()) { 905 return false; 906 } 907 908 p = &multifd_recv_state->params[i]; 909 910 if (qatomic_read(&p->pending_job) == false) { 911 next_recv_channel = (i + 1) % migrate_multifd_channels(); 912 break; 913 } 914 } 915 916 /* 917 * Order pending_job read before manipulating p->data below. Pairs 918 * with qatomic_store_release() at multifd_recv_thread(). 919 */ 920 smp_mb_acquire(); 921 922 assert(!p->data->size); 923 multifd_recv_state->data = p->data; 924 p->data = data; 925 926 /* 927 * Order p->data update before setting pending_job. Pairs with 928 * qatomic_load_acquire() at multifd_recv_thread(). 929 */ 930 qatomic_store_release(&p->pending_job, true); 931 qemu_sem_post(&p->sem); 932 933 return true; 934 } 935 936 MultiFDRecvData *multifd_get_recv_data(void) 937 { 938 return multifd_recv_state->data; 939 } 940 941 static void multifd_recv_terminate_threads(Error *err) 942 { 943 int i; 944 945 trace_multifd_recv_terminate_threads(err != NULL); 946 947 if (qatomic_xchg(&multifd_recv_state->exiting, 1)) { 948 return; 949 } 950 951 if (err) { 952 MigrationState *s = migrate_get_current(); 953 migrate_set_error(s, err); 954 if (s->state == MIGRATION_STATUS_SETUP || 955 s->state == MIGRATION_STATUS_ACTIVE) { 956 migrate_set_state(&s->state, s->state, 957 MIGRATION_STATUS_FAILED); 958 } 959 } 960 961 for (i = 0; i < migrate_multifd_channels(); i++) { 962 MultiFDRecvParams *p = &multifd_recv_state->params[i]; 963 964 /* 965 * The migration thread and channels interact differently 966 * depending on the presence of packets. 967 */ 968 if (multifd_use_packets()) { 969 /* 970 * The channel receives as long as there are packets. When 971 * packets end (i.e. MULTIFD_FLAG_SYNC is reached), the 972 * channel waits for the migration thread to sync. If the 973 * sync never happens, do it here. 974 */ 975 qemu_sem_post(&p->sem_sync); 976 } else { 977 /* 978 * The channel waits for the migration thread to give it 979 * work. When the migration thread runs out of work, it 980 * releases the channel and waits for any pending work to 981 * finish. If we reach here (e.g. due to error) before the 982 * work runs out, release the channel. 983 */ 984 qemu_sem_post(&p->sem); 985 } 986 987 /* 988 * We could arrive here for two reasons: 989 * - normal quit, i.e. everything went fine, just finished 990 * - error quit: We close the channels so the channel threads 991 * finish the qio_channel_read_all_eof() 992 */ 993 if (p->c) { 994 qio_channel_shutdown(p->c, QIO_CHANNEL_SHUTDOWN_BOTH, NULL); 995 } 996 } 997 } 998 999 void multifd_recv_shutdown(void) 1000 { 1001 if (migrate_multifd()) { 1002 multifd_recv_terminate_threads(NULL); 1003 } 1004 } 1005 1006 static void multifd_recv_cleanup_channel(MultiFDRecvParams *p) 1007 { 1008 migration_ioc_unregister_yank(p->c); 1009 object_unref(OBJECT(p->c)); 1010 p->c = NULL; 1011 qemu_mutex_destroy(&p->mutex); 1012 qemu_sem_destroy(&p->sem_sync); 1013 qemu_sem_destroy(&p->sem); 1014 g_free(p->data); 1015 p->data = NULL; 1016 g_free(p->name); 1017 p->name = NULL; 1018 p->packet_len = 0; 1019 g_free(p->packet); 1020 p->packet = NULL; 1021 g_free(p->normal); 1022 p->normal = NULL; 1023 g_free(p->zero); 1024 p->zero = NULL; 1025 multifd_recv_state->ops->recv_cleanup(p); 1026 } 1027 1028 static void multifd_recv_cleanup_state(void) 1029 { 1030 qemu_sem_destroy(&multifd_recv_state->sem_sync); 1031 g_free(multifd_recv_state->params); 1032 multifd_recv_state->params = NULL; 1033 g_free(multifd_recv_state->data); 1034 multifd_recv_state->data = NULL; 1035 g_free(multifd_recv_state); 1036 multifd_recv_state = NULL; 1037 } 1038 1039 void multifd_recv_cleanup(void) 1040 { 1041 int i; 1042 1043 if (!migrate_multifd()) { 1044 return; 1045 } 1046 multifd_recv_terminate_threads(NULL); 1047 for (i = 0; i < migrate_multifd_channels(); i++) { 1048 MultiFDRecvParams *p = &multifd_recv_state->params[i]; 1049 1050 if (p->thread_created) { 1051 qemu_thread_join(&p->thread); 1052 } 1053 } 1054 for (i = 0; i < migrate_multifd_channels(); i++) { 1055 multifd_recv_cleanup_channel(&multifd_recv_state->params[i]); 1056 } 1057 multifd_recv_cleanup_state(); 1058 } 1059 1060 void multifd_recv_sync_main(void) 1061 { 1062 int thread_count = migrate_multifd_channels(); 1063 bool file_based = !multifd_use_packets(); 1064 int i; 1065 1066 if (!migrate_multifd()) { 1067 return; 1068 } 1069 1070 /* 1071 * File-based channels don't use packets and therefore need to 1072 * wait for more work. Release them to start the sync. 1073 */ 1074 if (file_based) { 1075 for (i = 0; i < thread_count; i++) { 1076 MultiFDRecvParams *p = &multifd_recv_state->params[i]; 1077 1078 trace_multifd_recv_sync_main_signal(p->id); 1079 qemu_sem_post(&p->sem); 1080 } 1081 } 1082 1083 /* 1084 * Initiate the synchronization by waiting for all channels. 1085 * 1086 * For socket-based migration this means each channel has received 1087 * the SYNC packet on the stream. 1088 * 1089 * For file-based migration this means each channel is done with 1090 * the work (pending_job=false). 1091 */ 1092 for (i = 0; i < thread_count; i++) { 1093 trace_multifd_recv_sync_main_wait(i); 1094 qemu_sem_wait(&multifd_recv_state->sem_sync); 1095 } 1096 1097 if (file_based) { 1098 /* 1099 * For file-based loading is done in one iteration. We're 1100 * done. 1101 */ 1102 return; 1103 } 1104 1105 /* 1106 * Sync done. Release the channels for the next iteration. 1107 */ 1108 for (i = 0; i < thread_count; i++) { 1109 MultiFDRecvParams *p = &multifd_recv_state->params[i]; 1110 1111 WITH_QEMU_LOCK_GUARD(&p->mutex) { 1112 if (multifd_recv_state->packet_num < p->packet_num) { 1113 multifd_recv_state->packet_num = p->packet_num; 1114 } 1115 } 1116 trace_multifd_recv_sync_main_signal(p->id); 1117 qemu_sem_post(&p->sem_sync); 1118 } 1119 trace_multifd_recv_sync_main(multifd_recv_state->packet_num); 1120 } 1121 1122 static void *multifd_recv_thread(void *opaque) 1123 { 1124 MultiFDRecvParams *p = opaque; 1125 Error *local_err = NULL; 1126 bool use_packets = multifd_use_packets(); 1127 int ret; 1128 1129 trace_multifd_recv_thread_start(p->id); 1130 rcu_register_thread(); 1131 1132 while (true) { 1133 uint32_t flags = 0; 1134 bool has_data = false; 1135 p->normal_num = 0; 1136 1137 if (use_packets) { 1138 if (multifd_recv_should_exit()) { 1139 break; 1140 } 1141 1142 ret = qio_channel_read_all_eof(p->c, (void *)p->packet, 1143 p->packet_len, &local_err); 1144 if (ret == 0 || ret == -1) { /* 0: EOF -1: Error */ 1145 break; 1146 } 1147 1148 qemu_mutex_lock(&p->mutex); 1149 ret = multifd_recv_unfill_packet(p, &local_err); 1150 if (ret) { 1151 qemu_mutex_unlock(&p->mutex); 1152 break; 1153 } 1154 1155 flags = p->flags; 1156 /* recv methods don't know how to handle the SYNC flag */ 1157 p->flags &= ~MULTIFD_FLAG_SYNC; 1158 1159 /* 1160 * Even if it's a SYNC packet, this needs to be set 1161 * because older QEMUs (<9.0) still send data along with 1162 * the SYNC packet. 1163 */ 1164 has_data = p->normal_num || p->zero_num; 1165 qemu_mutex_unlock(&p->mutex); 1166 } else { 1167 /* 1168 * No packets, so we need to wait for the vmstate code to 1169 * give us work. 1170 */ 1171 qemu_sem_wait(&p->sem); 1172 1173 if (multifd_recv_should_exit()) { 1174 break; 1175 } 1176 1177 /* pairs with qatomic_store_release() at multifd_recv() */ 1178 if (!qatomic_load_acquire(&p->pending_job)) { 1179 /* 1180 * Migration thread did not send work, this is 1181 * equivalent to pending_sync on the sending 1182 * side. Post sem_sync to notify we reached this 1183 * point. 1184 */ 1185 qemu_sem_post(&multifd_recv_state->sem_sync); 1186 continue; 1187 } 1188 1189 has_data = !!p->data->size; 1190 } 1191 1192 if (has_data) { 1193 ret = multifd_recv_state->ops->recv(p, &local_err); 1194 if (ret != 0) { 1195 break; 1196 } 1197 } 1198 1199 if (use_packets) { 1200 if (flags & MULTIFD_FLAG_SYNC) { 1201 qemu_sem_post(&multifd_recv_state->sem_sync); 1202 qemu_sem_wait(&p->sem_sync); 1203 } 1204 } else { 1205 p->data->size = 0; 1206 /* 1207 * Order data->size update before clearing 1208 * pending_job. Pairs with smp_mb_acquire() at 1209 * multifd_recv(). 1210 */ 1211 qatomic_store_release(&p->pending_job, false); 1212 } 1213 } 1214 1215 if (local_err) { 1216 multifd_recv_terminate_threads(local_err); 1217 error_free(local_err); 1218 } 1219 1220 rcu_unregister_thread(); 1221 trace_multifd_recv_thread_end(p->id, p->packets_recved); 1222 1223 return NULL; 1224 } 1225 1226 int multifd_recv_setup(Error **errp) 1227 { 1228 int thread_count; 1229 uint32_t page_count = multifd_ram_page_count(); 1230 bool use_packets = multifd_use_packets(); 1231 uint8_t i; 1232 1233 /* 1234 * Return successfully if multiFD recv state is already initialised 1235 * or multiFD is not enabled. 1236 */ 1237 if (multifd_recv_state || !migrate_multifd()) { 1238 return 0; 1239 } 1240 1241 thread_count = migrate_multifd_channels(); 1242 multifd_recv_state = g_malloc0(sizeof(*multifd_recv_state)); 1243 multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count); 1244 1245 multifd_recv_state->data = g_new0(MultiFDRecvData, 1); 1246 multifd_recv_state->data->size = 0; 1247 1248 qatomic_set(&multifd_recv_state->count, 0); 1249 qatomic_set(&multifd_recv_state->exiting, 0); 1250 qemu_sem_init(&multifd_recv_state->sem_sync, 0); 1251 multifd_recv_state->ops = multifd_ops[migrate_multifd_compression()]; 1252 1253 for (i = 0; i < thread_count; i++) { 1254 MultiFDRecvParams *p = &multifd_recv_state->params[i]; 1255 1256 qemu_mutex_init(&p->mutex); 1257 qemu_sem_init(&p->sem_sync, 0); 1258 qemu_sem_init(&p->sem, 0); 1259 p->pending_job = false; 1260 p->id = i; 1261 1262 p->data = g_new0(MultiFDRecvData, 1); 1263 p->data->size = 0; 1264 1265 if (use_packets) { 1266 p->packet_len = sizeof(MultiFDPacket_t) 1267 + sizeof(uint64_t) * page_count; 1268 p->packet = g_malloc0(p->packet_len); 1269 } 1270 p->name = g_strdup_printf(MIGRATION_THREAD_DST_MULTIFD, i); 1271 p->normal = g_new0(ram_addr_t, page_count); 1272 p->zero = g_new0(ram_addr_t, page_count); 1273 } 1274 1275 for (i = 0; i < thread_count; i++) { 1276 MultiFDRecvParams *p = &multifd_recv_state->params[i]; 1277 int ret; 1278 1279 ret = multifd_recv_state->ops->recv_setup(p, errp); 1280 if (ret) { 1281 return ret; 1282 } 1283 } 1284 return 0; 1285 } 1286 1287 bool multifd_recv_all_channels_created(void) 1288 { 1289 int thread_count = migrate_multifd_channels(); 1290 1291 if (!migrate_multifd()) { 1292 return true; 1293 } 1294 1295 if (!multifd_recv_state) { 1296 /* Called before any connections created */ 1297 return false; 1298 } 1299 1300 return thread_count == qatomic_read(&multifd_recv_state->count); 1301 } 1302 1303 /* 1304 * Try to receive all multifd channels to get ready for the migration. 1305 * Sets @errp when failing to receive the current channel. 1306 */ 1307 void multifd_recv_new_channel(QIOChannel *ioc, Error **errp) 1308 { 1309 MultiFDRecvParams *p; 1310 Error *local_err = NULL; 1311 bool use_packets = multifd_use_packets(); 1312 int id; 1313 1314 if (use_packets) { 1315 id = multifd_recv_initial_packet(ioc, &local_err); 1316 if (id < 0) { 1317 multifd_recv_terminate_threads(local_err); 1318 error_propagate_prepend(errp, local_err, 1319 "failed to receive packet" 1320 " via multifd channel %d: ", 1321 qatomic_read(&multifd_recv_state->count)); 1322 return; 1323 } 1324 trace_multifd_recv_new_channel(id); 1325 } else { 1326 id = qatomic_read(&multifd_recv_state->count); 1327 } 1328 1329 p = &multifd_recv_state->params[id]; 1330 if (p->c != NULL) { 1331 error_setg(&local_err, "multifd: received id '%d' already setup'", 1332 id); 1333 multifd_recv_terminate_threads(local_err); 1334 error_propagate(errp, local_err); 1335 return; 1336 } 1337 p->c = ioc; 1338 object_ref(OBJECT(ioc)); 1339 1340 p->thread_created = true; 1341 qemu_thread_create(&p->thread, p->name, multifd_recv_thread, p, 1342 QEMU_THREAD_JOINABLE); 1343 qatomic_inc(&multifd_recv_state->count); 1344 } 1345