1 /* 2 * Multifd common code 3 * 4 * Copyright (c) 2019-2020 Red Hat Inc 5 * 6 * Authors: 7 * Juan Quintela <quintela@redhat.com> 8 * 9 * This work is licensed under the terms of the GNU GPL, version 2 or later. 10 * See the COPYING file in the top-level directory. 11 */ 12 13 #include "qemu/osdep.h" 14 #include "qemu/cutils.h" 15 #include "qemu/rcu.h" 16 #include "exec/target_page.h" 17 #include "system/system.h" 18 #include "exec/ramblock.h" 19 #include "qemu/error-report.h" 20 #include "qapi/error.h" 21 #include "file.h" 22 #include "migration.h" 23 #include "migration-stats.h" 24 #include "socket.h" 25 #include "tls.h" 26 #include "qemu-file.h" 27 #include "trace.h" 28 #include "multifd.h" 29 #include "threadinfo.h" 30 #include "options.h" 31 #include "qemu/yank.h" 32 #include "io/channel-file.h" 33 #include "io/channel-socket.h" 34 #include "yank_functions.h" 35 36 /* Multiple fd's */ 37 38 #define MULTIFD_MAGIC 0x11223344U 39 #define MULTIFD_VERSION 1 40 41 typedef struct { 42 uint32_t magic; 43 uint32_t version; 44 unsigned char uuid[16]; /* QemuUUID */ 45 uint8_t id; 46 uint8_t unused1[7]; /* Reserved for future use */ 47 uint64_t unused2[4]; /* Reserved for future use */ 48 } __attribute__((packed)) MultiFDInit_t; 49 50 struct { 51 MultiFDSendParams *params; 52 /* 53 * Global number of generated multifd packets. 54 * 55 * Note that we used 'uintptr_t' because it'll naturally support atomic 56 * operations on both 32bit / 64 bits hosts. It means on 32bit systems 57 * multifd will overflow the packet_num easier, but that should be 58 * fine. 59 * 60 * Another option is to use QEMU's Stat64 then it'll be 64 bits on all 61 * hosts, however so far it does not support atomic fetch_add() yet. 62 * Make it easy for now. 63 */ 64 uintptr_t packet_num; 65 /* 66 * Synchronization point past which no more channels will be 67 * created. 68 */ 69 QemuSemaphore channels_created; 70 /* send channels ready */ 71 QemuSemaphore channels_ready; 72 /* 73 * Have we already run terminate threads. There is a race when it 74 * happens that we got one error while we are exiting. 75 * We will use atomic operations. Only valid values are 0 and 1. 76 */ 77 int exiting; 78 /* multifd ops */ 79 const MultiFDMethods *ops; 80 } *multifd_send_state; 81 82 struct { 83 MultiFDRecvParams *params; 84 MultiFDRecvData *data; 85 /* number of created threads */ 86 int count; 87 /* 88 * This is always posted by the recv threads, the migration thread 89 * uses it to wait for recv threads to finish assigned tasks. 90 */ 91 QemuSemaphore sem_sync; 92 /* global number of generated multifd packets */ 93 uint64_t packet_num; 94 int exiting; 95 /* multifd ops */ 96 const MultiFDMethods *ops; 97 } *multifd_recv_state; 98 99 MultiFDSendData *multifd_send_data_alloc(void) 100 { 101 size_t max_payload_size, size_minus_payload; 102 103 /* 104 * MultiFDPages_t has a flexible array at the end, account for it 105 * when allocating MultiFDSendData. Use max() in case other types 106 * added to the union in the future are larger than 107 * (MultiFDPages_t + flex array). 108 */ 109 max_payload_size = MAX(multifd_ram_payload_size(), sizeof(MultiFDPayload)); 110 111 /* 112 * Account for any holes the compiler might insert. We can't pack 113 * the structure because that misaligns the members and triggers 114 * Waddress-of-packed-member. 115 */ 116 size_minus_payload = sizeof(MultiFDSendData) - sizeof(MultiFDPayload); 117 118 return g_malloc0(size_minus_payload + max_payload_size); 119 } 120 121 static bool multifd_use_packets(void) 122 { 123 return !migrate_mapped_ram(); 124 } 125 126 void multifd_send_channel_created(void) 127 { 128 qemu_sem_post(&multifd_send_state->channels_created); 129 } 130 131 static const MultiFDMethods *multifd_ops[MULTIFD_COMPRESSION__MAX] = {}; 132 133 void multifd_register_ops(int method, const MultiFDMethods *ops) 134 { 135 assert(0 <= method && method < MULTIFD_COMPRESSION__MAX); 136 assert(!multifd_ops[method]); 137 multifd_ops[method] = ops; 138 } 139 140 static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp) 141 { 142 MultiFDInit_t msg = {}; 143 size_t size = sizeof(msg); 144 int ret; 145 146 msg.magic = cpu_to_be32(MULTIFD_MAGIC); 147 msg.version = cpu_to_be32(MULTIFD_VERSION); 148 msg.id = p->id; 149 memcpy(msg.uuid, &qemu_uuid.data, sizeof(msg.uuid)); 150 151 ret = qio_channel_write_all(p->c, (char *)&msg, size, errp); 152 if (ret != 0) { 153 return -1; 154 } 155 stat64_add(&mig_stats.multifd_bytes, size); 156 return 0; 157 } 158 159 static int multifd_recv_initial_packet(QIOChannel *c, Error **errp) 160 { 161 MultiFDInit_t msg; 162 int ret; 163 164 ret = qio_channel_read_all(c, (char *)&msg, sizeof(msg), errp); 165 if (ret != 0) { 166 return -1; 167 } 168 169 msg.magic = be32_to_cpu(msg.magic); 170 msg.version = be32_to_cpu(msg.version); 171 172 if (msg.magic != MULTIFD_MAGIC) { 173 error_setg(errp, "multifd: received packet magic %x " 174 "expected %x", msg.magic, MULTIFD_MAGIC); 175 return -1; 176 } 177 178 if (msg.version != MULTIFD_VERSION) { 179 error_setg(errp, "multifd: received packet version %u " 180 "expected %u", msg.version, MULTIFD_VERSION); 181 return -1; 182 } 183 184 if (memcmp(msg.uuid, &qemu_uuid, sizeof(qemu_uuid))) { 185 char *uuid = qemu_uuid_unparse_strdup(&qemu_uuid); 186 char *msg_uuid = qemu_uuid_unparse_strdup((const QemuUUID *)msg.uuid); 187 188 error_setg(errp, "multifd: received uuid '%s' and expected " 189 "uuid '%s' for channel %hhd", msg_uuid, uuid, msg.id); 190 g_free(uuid); 191 g_free(msg_uuid); 192 return -1; 193 } 194 195 if (msg.id > migrate_multifd_channels()) { 196 error_setg(errp, "multifd: received channel id %u is greater than " 197 "number of channels %u", msg.id, migrate_multifd_channels()); 198 return -1; 199 } 200 201 return msg.id; 202 } 203 204 void multifd_send_fill_packet(MultiFDSendParams *p) 205 { 206 MultiFDPacket_t *packet = p->packet; 207 uint64_t packet_num; 208 bool sync_packet = p->flags & MULTIFD_FLAG_SYNC; 209 210 memset(packet, 0, p->packet_len); 211 212 packet->magic = cpu_to_be32(MULTIFD_MAGIC); 213 packet->version = cpu_to_be32(MULTIFD_VERSION); 214 215 packet->flags = cpu_to_be32(p->flags); 216 packet->next_packet_size = cpu_to_be32(p->next_packet_size); 217 218 packet_num = qatomic_fetch_inc(&multifd_send_state->packet_num); 219 packet->packet_num = cpu_to_be64(packet_num); 220 221 p->packets_sent++; 222 223 if (!sync_packet) { 224 multifd_ram_fill_packet(p); 225 } 226 227 trace_multifd_send_fill(p->id, packet_num, 228 p->flags, p->next_packet_size); 229 } 230 231 static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error **errp) 232 { 233 const MultiFDPacket_t *packet = p->packet; 234 uint32_t magic = be32_to_cpu(packet->magic); 235 uint32_t version = be32_to_cpu(packet->version); 236 int ret = 0; 237 238 if (magic != MULTIFD_MAGIC) { 239 error_setg(errp, "multifd: received packet magic %x, expected %x", 240 magic, MULTIFD_MAGIC); 241 return -1; 242 } 243 244 if (version != MULTIFD_VERSION) { 245 error_setg(errp, "multifd: received packet version %u, expected %u", 246 version, MULTIFD_VERSION); 247 return -1; 248 } 249 250 p->flags = be32_to_cpu(packet->flags); 251 p->next_packet_size = be32_to_cpu(packet->next_packet_size); 252 p->packet_num = be64_to_cpu(packet->packet_num); 253 p->packets_recved++; 254 255 /* Always unfill, old QEMUs (<9.0) send data along with SYNC */ 256 ret = multifd_ram_unfill_packet(p, errp); 257 258 trace_multifd_recv_unfill(p->id, p->packet_num, p->flags, 259 p->next_packet_size); 260 261 return ret; 262 } 263 264 static bool multifd_send_should_exit(void) 265 { 266 return qatomic_read(&multifd_send_state->exiting); 267 } 268 269 static bool multifd_recv_should_exit(void) 270 { 271 return qatomic_read(&multifd_recv_state->exiting); 272 } 273 274 /* 275 * The migration thread can wait on either of the two semaphores. This 276 * function can be used to kick the main thread out of waiting on either of 277 * them. Should mostly only be called when something wrong happened with 278 * the current multifd send thread. 279 */ 280 static void multifd_send_kick_main(MultiFDSendParams *p) 281 { 282 qemu_sem_post(&p->sem_sync); 283 qemu_sem_post(&multifd_send_state->channels_ready); 284 } 285 286 /* 287 * multifd_send() works by exchanging the MultiFDSendData object 288 * provided by the caller with an unused MultiFDSendData object from 289 * the next channel that is found to be idle. 290 * 291 * The channel owns the data until it finishes transmitting and the 292 * caller owns the empty object until it fills it with data and calls 293 * this function again. No locking necessary. 294 * 295 * Switching is safe because both the migration thread and the channel 296 * thread have barriers in place to serialize access. 297 * 298 * Returns true if succeed, false otherwise. 299 */ 300 bool multifd_send(MultiFDSendData **send_data) 301 { 302 int i; 303 static int next_channel; 304 MultiFDSendParams *p = NULL; /* make happy gcc */ 305 MultiFDSendData *tmp; 306 307 if (multifd_send_should_exit()) { 308 return false; 309 } 310 311 /* We wait here, until at least one channel is ready */ 312 qemu_sem_wait(&multifd_send_state->channels_ready); 313 314 /* 315 * next_channel can remain from a previous migration that was 316 * using more channels, so ensure it doesn't overflow if the 317 * limit is lower now. 318 */ 319 next_channel %= migrate_multifd_channels(); 320 for (i = next_channel;; i = (i + 1) % migrate_multifd_channels()) { 321 if (multifd_send_should_exit()) { 322 return false; 323 } 324 p = &multifd_send_state->params[i]; 325 /* 326 * Lockless read to p->pending_job is safe, because only multifd 327 * sender thread can clear it. 328 */ 329 if (qatomic_read(&p->pending_job) == false) { 330 next_channel = (i + 1) % migrate_multifd_channels(); 331 break; 332 } 333 } 334 335 /* 336 * Make sure we read p->pending_job before all the rest. Pairs with 337 * qatomic_store_release() in multifd_send_thread(). 338 */ 339 smp_mb_acquire(); 340 341 assert(multifd_payload_empty(p->data)); 342 343 /* 344 * Swap the pointers. The channel gets the client data for 345 * transferring and the client gets back an unused data slot. 346 */ 347 tmp = *send_data; 348 *send_data = p->data; 349 p->data = tmp; 350 351 /* 352 * Making sure p->data is setup before marking pending_job=true. Pairs 353 * with the qatomic_load_acquire() in multifd_send_thread(). 354 */ 355 qatomic_store_release(&p->pending_job, true); 356 qemu_sem_post(&p->sem); 357 358 return true; 359 } 360 361 /* Multifd send side hit an error; remember it and prepare to quit */ 362 static void multifd_send_set_error(Error *err) 363 { 364 /* 365 * We don't want to exit each threads twice. Depending on where 366 * we get the error, or if there are two independent errors in two 367 * threads at the same time, we can end calling this function 368 * twice. 369 */ 370 if (qatomic_xchg(&multifd_send_state->exiting, 1)) { 371 return; 372 } 373 374 if (err) { 375 MigrationState *s = migrate_get_current(); 376 migrate_set_error(s, err); 377 if (s->state == MIGRATION_STATUS_SETUP || 378 s->state == MIGRATION_STATUS_PRE_SWITCHOVER || 379 s->state == MIGRATION_STATUS_DEVICE || 380 s->state == MIGRATION_STATUS_ACTIVE) { 381 migrate_set_state(&s->state, s->state, 382 MIGRATION_STATUS_FAILED); 383 } 384 } 385 } 386 387 static void multifd_send_terminate_threads(void) 388 { 389 int i; 390 391 trace_multifd_send_terminate_threads(); 392 393 /* 394 * Tell everyone we're quitting. No xchg() needed here; we simply 395 * always set it. 396 */ 397 qatomic_set(&multifd_send_state->exiting, 1); 398 399 /* 400 * Firstly, kick all threads out; no matter whether they are just idle, 401 * or blocked in an IO system call. 402 */ 403 for (i = 0; i < migrate_multifd_channels(); i++) { 404 MultiFDSendParams *p = &multifd_send_state->params[i]; 405 406 qemu_sem_post(&p->sem); 407 if (p->c) { 408 qio_channel_shutdown(p->c, QIO_CHANNEL_SHUTDOWN_BOTH, NULL); 409 } 410 } 411 412 /* 413 * Finally recycle all the threads. 414 */ 415 for (i = 0; i < migrate_multifd_channels(); i++) { 416 MultiFDSendParams *p = &multifd_send_state->params[i]; 417 418 if (p->tls_thread_created) { 419 qemu_thread_join(&p->tls_thread); 420 } 421 422 if (p->thread_created) { 423 qemu_thread_join(&p->thread); 424 } 425 } 426 } 427 428 static bool multifd_send_cleanup_channel(MultiFDSendParams *p, Error **errp) 429 { 430 if (p->c) { 431 migration_ioc_unregister_yank(p->c); 432 /* 433 * The object_unref() cannot guarantee the fd will always be 434 * released because finalize() of the iochannel is only 435 * triggered on the last reference and it's not guaranteed 436 * that we always hold the last refcount when reaching here. 437 * 438 * Closing the fd explicitly has the benefit that if there is any 439 * registered I/O handler callbacks on such fd, that will get a 440 * POLLNVAL event and will further trigger the cleanup to finally 441 * release the IOC. 442 * 443 * FIXME: It should logically be guaranteed that all multifd 444 * channels have no I/O handler callback registered when reaching 445 * here, because migration thread will wait for all multifd channel 446 * establishments to complete during setup. Since 447 * migration_cleanup() will be scheduled in main thread too, all 448 * previous callbacks should guarantee to be completed when 449 * reaching here. See multifd_send_state.channels_created and its 450 * usage. In the future, we could replace this with an assert 451 * making sure we're the last reference, or simply drop it if above 452 * is more clear to be justified. 453 */ 454 qio_channel_close(p->c, &error_abort); 455 object_unref(OBJECT(p->c)); 456 p->c = NULL; 457 } 458 qemu_sem_destroy(&p->sem); 459 qemu_sem_destroy(&p->sem_sync); 460 g_free(p->name); 461 p->name = NULL; 462 g_free(p->data); 463 p->data = NULL; 464 p->packet_len = 0; 465 g_free(p->packet); 466 p->packet = NULL; 467 multifd_send_state->ops->send_cleanup(p, errp); 468 assert(!p->iov); 469 470 return *errp == NULL; 471 } 472 473 static void multifd_send_cleanup_state(void) 474 { 475 file_cleanup_outgoing_migration(); 476 socket_cleanup_outgoing_migration(); 477 qemu_sem_destroy(&multifd_send_state->channels_created); 478 qemu_sem_destroy(&multifd_send_state->channels_ready); 479 g_free(multifd_send_state->params); 480 multifd_send_state->params = NULL; 481 g_free(multifd_send_state); 482 multifd_send_state = NULL; 483 } 484 485 void multifd_send_shutdown(void) 486 { 487 int i; 488 489 if (!migrate_multifd()) { 490 return; 491 } 492 493 for (i = 0; i < migrate_multifd_channels(); i++) { 494 MultiFDSendParams *p = &multifd_send_state->params[i]; 495 496 /* thread_created implies the TLS handshake has succeeded */ 497 if (p->tls_thread_created && p->thread_created) { 498 Error *local_err = NULL; 499 /* 500 * The destination expects the TLS session to always be 501 * properly terminated. This helps to detect a premature 502 * termination in the middle of the stream. Note that 503 * older QEMUs always break the connection on the source 504 * and the destination always sees 505 * GNUTLS_E_PREMATURE_TERMINATION. 506 */ 507 migration_tls_channel_end(p->c, &local_err); 508 509 /* 510 * The above can return an error in case the migration has 511 * already failed. If the migration succeeded, errors are 512 * not expected but there's no need to kill the source. 513 */ 514 if (local_err && !migration_has_failed(migrate_get_current())) { 515 warn_report( 516 "multifd_send_%d: Failed to terminate TLS connection: %s", 517 p->id, error_get_pretty(local_err)); 518 break; 519 } 520 } 521 } 522 523 multifd_send_terminate_threads(); 524 525 for (i = 0; i < migrate_multifd_channels(); i++) { 526 MultiFDSendParams *p = &multifd_send_state->params[i]; 527 Error *local_err = NULL; 528 529 if (!multifd_send_cleanup_channel(p, &local_err)) { 530 migrate_set_error(migrate_get_current(), local_err); 531 error_free(local_err); 532 } 533 } 534 535 multifd_send_cleanup_state(); 536 } 537 538 static int multifd_zero_copy_flush(QIOChannel *c) 539 { 540 int ret; 541 Error *err = NULL; 542 543 ret = qio_channel_flush(c, &err); 544 if (ret < 0) { 545 error_report_err(err); 546 return -1; 547 } 548 if (ret == 1) { 549 stat64_add(&mig_stats.dirty_sync_missed_zero_copy, 1); 550 } 551 552 return ret; 553 } 554 555 int multifd_send_sync_main(MultiFDSyncReq req) 556 { 557 int i; 558 bool flush_zero_copy; 559 560 assert(req != MULTIFD_SYNC_NONE); 561 562 flush_zero_copy = migrate_zero_copy_send(); 563 564 for (i = 0; i < migrate_multifd_channels(); i++) { 565 MultiFDSendParams *p = &multifd_send_state->params[i]; 566 567 if (multifd_send_should_exit()) { 568 return -1; 569 } 570 571 trace_multifd_send_sync_main_signal(p->id); 572 573 /* 574 * We should be the only user so far, so not possible to be set by 575 * others concurrently. 576 */ 577 assert(qatomic_read(&p->pending_sync) == MULTIFD_SYNC_NONE); 578 qatomic_set(&p->pending_sync, req); 579 qemu_sem_post(&p->sem); 580 } 581 for (i = 0; i < migrate_multifd_channels(); i++) { 582 MultiFDSendParams *p = &multifd_send_state->params[i]; 583 584 if (multifd_send_should_exit()) { 585 return -1; 586 } 587 588 qemu_sem_wait(&multifd_send_state->channels_ready); 589 trace_multifd_send_sync_main_wait(p->id); 590 qemu_sem_wait(&p->sem_sync); 591 592 if (flush_zero_copy && p->c && (multifd_zero_copy_flush(p->c) < 0)) { 593 return -1; 594 } 595 } 596 trace_multifd_send_sync_main(multifd_send_state->packet_num); 597 598 return 0; 599 } 600 601 static void *multifd_send_thread(void *opaque) 602 { 603 MultiFDSendParams *p = opaque; 604 MigrationThread *thread = NULL; 605 Error *local_err = NULL; 606 int ret = 0; 607 bool use_packets = multifd_use_packets(); 608 609 thread = migration_threads_add(p->name, qemu_get_thread_id()); 610 611 trace_multifd_send_thread_start(p->id); 612 rcu_register_thread(); 613 614 if (use_packets) { 615 if (multifd_send_initial_packet(p, &local_err) < 0) { 616 ret = -1; 617 goto out; 618 } 619 } 620 621 while (true) { 622 qemu_sem_post(&multifd_send_state->channels_ready); 623 qemu_sem_wait(&p->sem); 624 625 if (multifd_send_should_exit()) { 626 break; 627 } 628 629 /* 630 * Read pending_job flag before p->data. Pairs with the 631 * qatomic_store_release() in multifd_send(). 632 */ 633 if (qatomic_load_acquire(&p->pending_job)) { 634 p->flags = 0; 635 p->iovs_num = 0; 636 assert(!multifd_payload_empty(p->data)); 637 638 ret = multifd_send_state->ops->send_prepare(p, &local_err); 639 if (ret != 0) { 640 break; 641 } 642 643 if (migrate_mapped_ram()) { 644 ret = file_write_ramblock_iov(p->c, p->iov, p->iovs_num, 645 &p->data->u.ram, &local_err); 646 } else { 647 ret = qio_channel_writev_full_all(p->c, p->iov, p->iovs_num, 648 NULL, 0, p->write_flags, 649 &local_err); 650 } 651 652 if (ret != 0) { 653 break; 654 } 655 656 stat64_add(&mig_stats.multifd_bytes, 657 (uint64_t)p->next_packet_size + p->packet_len); 658 659 p->next_packet_size = 0; 660 multifd_set_payload_type(p->data, MULTIFD_PAYLOAD_NONE); 661 662 /* 663 * Making sure p->data is published before saying "we're 664 * free". Pairs with the smp_mb_acquire() in 665 * multifd_send(). 666 */ 667 qatomic_store_release(&p->pending_job, false); 668 } else { 669 MultiFDSyncReq req = qatomic_read(&p->pending_sync); 670 671 /* 672 * If not a normal job, must be a sync request. Note that 673 * pending_sync is a standalone flag (unlike pending_job), so 674 * it doesn't require explicit memory barriers. 675 */ 676 assert(req != MULTIFD_SYNC_NONE); 677 678 /* Only push the SYNC message if it involves a remote sync */ 679 if (req == MULTIFD_SYNC_ALL) { 680 p->flags = MULTIFD_FLAG_SYNC; 681 multifd_send_fill_packet(p); 682 ret = qio_channel_write_all(p->c, (void *)p->packet, 683 p->packet_len, &local_err); 684 if (ret != 0) { 685 break; 686 } 687 /* p->next_packet_size will always be zero for a SYNC packet */ 688 stat64_add(&mig_stats.multifd_bytes, p->packet_len); 689 } 690 691 qatomic_set(&p->pending_sync, MULTIFD_SYNC_NONE); 692 qemu_sem_post(&p->sem_sync); 693 } 694 } 695 696 out: 697 if (ret) { 698 assert(local_err); 699 trace_multifd_send_error(p->id); 700 multifd_send_set_error(local_err); 701 multifd_send_kick_main(p); 702 error_free(local_err); 703 } 704 705 rcu_unregister_thread(); 706 migration_threads_remove(thread); 707 trace_multifd_send_thread_end(p->id, p->packets_sent); 708 709 return NULL; 710 } 711 712 static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque); 713 714 typedef struct { 715 MultiFDSendParams *p; 716 QIOChannelTLS *tioc; 717 } MultiFDTLSThreadArgs; 718 719 static void *multifd_tls_handshake_thread(void *opaque) 720 { 721 MultiFDTLSThreadArgs *args = opaque; 722 723 qio_channel_tls_handshake(args->tioc, 724 multifd_new_send_channel_async, 725 args->p, 726 NULL, 727 NULL); 728 g_free(args); 729 730 return NULL; 731 } 732 733 static bool multifd_tls_channel_connect(MultiFDSendParams *p, 734 QIOChannel *ioc, 735 Error **errp) 736 { 737 MigrationState *s = migrate_get_current(); 738 const char *hostname = s->hostname; 739 MultiFDTLSThreadArgs *args; 740 QIOChannelTLS *tioc; 741 742 tioc = migration_tls_client_create(ioc, hostname, errp); 743 if (!tioc) { 744 return false; 745 } 746 747 /* 748 * Ownership of the socket channel now transfers to the newly 749 * created TLS channel, which has already taken a reference. 750 */ 751 object_unref(OBJECT(ioc)); 752 trace_multifd_tls_outgoing_handshake_start(ioc, tioc, hostname); 753 qio_channel_set_name(QIO_CHANNEL(tioc), "multifd-tls-outgoing"); 754 755 args = g_new0(MultiFDTLSThreadArgs, 1); 756 args->tioc = tioc; 757 args->p = p; 758 759 p->tls_thread_created = true; 760 qemu_thread_create(&p->tls_thread, MIGRATION_THREAD_SRC_TLS, 761 multifd_tls_handshake_thread, args, 762 QEMU_THREAD_JOINABLE); 763 return true; 764 } 765 766 void multifd_channel_connect(MultiFDSendParams *p, QIOChannel *ioc) 767 { 768 qio_channel_set_delay(ioc, false); 769 770 migration_ioc_register_yank(ioc); 771 /* Setup p->c only if the channel is completely setup */ 772 p->c = ioc; 773 774 p->thread_created = true; 775 qemu_thread_create(&p->thread, p->name, multifd_send_thread, p, 776 QEMU_THREAD_JOINABLE); 777 } 778 779 /* 780 * When TLS is enabled this function is called once to establish the 781 * TLS connection and a second time after the TLS handshake to create 782 * the multifd channel. Without TLS it goes straight into the channel 783 * creation. 784 */ 785 static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque) 786 { 787 MultiFDSendParams *p = opaque; 788 QIOChannel *ioc = QIO_CHANNEL(qio_task_get_source(task)); 789 Error *local_err = NULL; 790 bool ret; 791 792 trace_multifd_new_send_channel_async(p->id); 793 794 if (qio_task_propagate_error(task, &local_err)) { 795 ret = false; 796 goto out; 797 } 798 799 trace_multifd_set_outgoing_channel(ioc, object_get_typename(OBJECT(ioc)), 800 migrate_get_current()->hostname); 801 802 if (migrate_channel_requires_tls_upgrade(ioc)) { 803 ret = multifd_tls_channel_connect(p, ioc, &local_err); 804 if (ret) { 805 return; 806 } 807 } else { 808 multifd_channel_connect(p, ioc); 809 ret = true; 810 } 811 812 out: 813 /* 814 * Here we're not interested whether creation succeeded, only that 815 * it happened at all. 816 */ 817 multifd_send_channel_created(); 818 819 if (ret) { 820 return; 821 } 822 823 trace_multifd_new_send_channel_async_error(p->id, local_err); 824 multifd_send_set_error(local_err); 825 /* 826 * For error cases (TLS or non-TLS), IO channel is always freed here 827 * rather than when cleanup multifd: since p->c is not set, multifd 828 * cleanup code doesn't even know its existence. 829 */ 830 object_unref(OBJECT(ioc)); 831 error_free(local_err); 832 } 833 834 static bool multifd_new_send_channel_create(gpointer opaque, Error **errp) 835 { 836 if (!multifd_use_packets()) { 837 return file_send_channel_create(opaque, errp); 838 } 839 840 socket_send_channel_create(multifd_new_send_channel_async, opaque); 841 return true; 842 } 843 844 bool multifd_send_setup(void) 845 { 846 MigrationState *s = migrate_get_current(); 847 int thread_count, ret = 0; 848 uint32_t page_count = multifd_ram_page_count(); 849 bool use_packets = multifd_use_packets(); 850 uint8_t i; 851 852 if (!migrate_multifd()) { 853 return true; 854 } 855 856 thread_count = migrate_multifd_channels(); 857 multifd_send_state = g_malloc0(sizeof(*multifd_send_state)); 858 multifd_send_state->params = g_new0(MultiFDSendParams, thread_count); 859 qemu_sem_init(&multifd_send_state->channels_created, 0); 860 qemu_sem_init(&multifd_send_state->channels_ready, 0); 861 qatomic_set(&multifd_send_state->exiting, 0); 862 multifd_send_state->ops = multifd_ops[migrate_multifd_compression()]; 863 864 for (i = 0; i < thread_count; i++) { 865 MultiFDSendParams *p = &multifd_send_state->params[i]; 866 Error *local_err = NULL; 867 868 qemu_sem_init(&p->sem, 0); 869 qemu_sem_init(&p->sem_sync, 0); 870 p->id = i; 871 p->data = multifd_send_data_alloc(); 872 873 if (use_packets) { 874 p->packet_len = sizeof(MultiFDPacket_t) 875 + sizeof(uint64_t) * page_count; 876 p->packet = g_malloc0(p->packet_len); 877 } 878 p->name = g_strdup_printf(MIGRATION_THREAD_SRC_MULTIFD, i); 879 p->write_flags = 0; 880 881 if (!multifd_new_send_channel_create(p, &local_err)) { 882 migrate_set_error(s, local_err); 883 ret = -1; 884 } 885 } 886 887 /* 888 * Wait until channel creation has started for all channels. The 889 * creation can still fail, but no more channels will be created 890 * past this point. 891 */ 892 for (i = 0; i < thread_count; i++) { 893 qemu_sem_wait(&multifd_send_state->channels_created); 894 } 895 896 if (ret) { 897 goto err; 898 } 899 900 for (i = 0; i < thread_count; i++) { 901 MultiFDSendParams *p = &multifd_send_state->params[i]; 902 Error *local_err = NULL; 903 904 ret = multifd_send_state->ops->send_setup(p, &local_err); 905 if (ret) { 906 migrate_set_error(s, local_err); 907 goto err; 908 } 909 assert(p->iov); 910 } 911 912 return true; 913 914 err: 915 migrate_set_state(&s->state, MIGRATION_STATUS_SETUP, 916 MIGRATION_STATUS_FAILED); 917 return false; 918 } 919 920 bool multifd_recv(void) 921 { 922 int i; 923 static int next_recv_channel; 924 MultiFDRecvParams *p = NULL; 925 MultiFDRecvData *data = multifd_recv_state->data; 926 927 /* 928 * next_channel can remain from a previous migration that was 929 * using more channels, so ensure it doesn't overflow if the 930 * limit is lower now. 931 */ 932 next_recv_channel %= migrate_multifd_channels(); 933 for (i = next_recv_channel;; i = (i + 1) % migrate_multifd_channels()) { 934 if (multifd_recv_should_exit()) { 935 return false; 936 } 937 938 p = &multifd_recv_state->params[i]; 939 940 if (qatomic_read(&p->pending_job) == false) { 941 next_recv_channel = (i + 1) % migrate_multifd_channels(); 942 break; 943 } 944 } 945 946 /* 947 * Order pending_job read before manipulating p->data below. Pairs 948 * with qatomic_store_release() at multifd_recv_thread(). 949 */ 950 smp_mb_acquire(); 951 952 assert(!p->data->size); 953 multifd_recv_state->data = p->data; 954 p->data = data; 955 956 /* 957 * Order p->data update before setting pending_job. Pairs with 958 * qatomic_load_acquire() at multifd_recv_thread(). 959 */ 960 qatomic_store_release(&p->pending_job, true); 961 qemu_sem_post(&p->sem); 962 963 return true; 964 } 965 966 MultiFDRecvData *multifd_get_recv_data(void) 967 { 968 return multifd_recv_state->data; 969 } 970 971 static void multifd_recv_terminate_threads(Error *err) 972 { 973 int i; 974 975 trace_multifd_recv_terminate_threads(err != NULL); 976 977 if (qatomic_xchg(&multifd_recv_state->exiting, 1)) { 978 return; 979 } 980 981 if (err) { 982 MigrationState *s = migrate_get_current(); 983 migrate_set_error(s, err); 984 if (s->state == MIGRATION_STATUS_SETUP || 985 s->state == MIGRATION_STATUS_ACTIVE) { 986 migrate_set_state(&s->state, s->state, 987 MIGRATION_STATUS_FAILED); 988 } 989 } 990 991 for (i = 0; i < migrate_multifd_channels(); i++) { 992 MultiFDRecvParams *p = &multifd_recv_state->params[i]; 993 994 /* 995 * The migration thread and channels interact differently 996 * depending on the presence of packets. 997 */ 998 if (multifd_use_packets()) { 999 /* 1000 * The channel receives as long as there are packets. When 1001 * packets end (i.e. MULTIFD_FLAG_SYNC is reached), the 1002 * channel waits for the migration thread to sync. If the 1003 * sync never happens, do it here. 1004 */ 1005 qemu_sem_post(&p->sem_sync); 1006 } else { 1007 /* 1008 * The channel waits for the migration thread to give it 1009 * work. When the migration thread runs out of work, it 1010 * releases the channel and waits for any pending work to 1011 * finish. If we reach here (e.g. due to error) before the 1012 * work runs out, release the channel. 1013 */ 1014 qemu_sem_post(&p->sem); 1015 } 1016 1017 /* 1018 * We could arrive here for two reasons: 1019 * - normal quit, i.e. everything went fine, just finished 1020 * - error quit: We close the channels so the channel threads 1021 * finish the qio_channel_read_all_eof() 1022 */ 1023 if (p->c) { 1024 qio_channel_shutdown(p->c, QIO_CHANNEL_SHUTDOWN_BOTH, NULL); 1025 } 1026 } 1027 } 1028 1029 void multifd_recv_shutdown(void) 1030 { 1031 if (migrate_multifd()) { 1032 multifd_recv_terminate_threads(NULL); 1033 } 1034 } 1035 1036 static void multifd_recv_cleanup_channel(MultiFDRecvParams *p) 1037 { 1038 migration_ioc_unregister_yank(p->c); 1039 object_unref(OBJECT(p->c)); 1040 p->c = NULL; 1041 qemu_mutex_destroy(&p->mutex); 1042 qemu_sem_destroy(&p->sem_sync); 1043 qemu_sem_destroy(&p->sem); 1044 g_free(p->data); 1045 p->data = NULL; 1046 g_free(p->name); 1047 p->name = NULL; 1048 p->packet_len = 0; 1049 g_free(p->packet); 1050 p->packet = NULL; 1051 g_free(p->normal); 1052 p->normal = NULL; 1053 g_free(p->zero); 1054 p->zero = NULL; 1055 multifd_recv_state->ops->recv_cleanup(p); 1056 } 1057 1058 static void multifd_recv_cleanup_state(void) 1059 { 1060 qemu_sem_destroy(&multifd_recv_state->sem_sync); 1061 g_free(multifd_recv_state->params); 1062 multifd_recv_state->params = NULL; 1063 g_free(multifd_recv_state->data); 1064 multifd_recv_state->data = NULL; 1065 g_free(multifd_recv_state); 1066 multifd_recv_state = NULL; 1067 } 1068 1069 void multifd_recv_cleanup(void) 1070 { 1071 int i; 1072 1073 if (!migrate_multifd()) { 1074 return; 1075 } 1076 multifd_recv_terminate_threads(NULL); 1077 for (i = 0; i < migrate_multifd_channels(); i++) { 1078 MultiFDRecvParams *p = &multifd_recv_state->params[i]; 1079 1080 if (p->thread_created) { 1081 qemu_thread_join(&p->thread); 1082 } 1083 } 1084 for (i = 0; i < migrate_multifd_channels(); i++) { 1085 multifd_recv_cleanup_channel(&multifd_recv_state->params[i]); 1086 } 1087 multifd_recv_cleanup_state(); 1088 } 1089 1090 void multifd_recv_sync_main(void) 1091 { 1092 int thread_count = migrate_multifd_channels(); 1093 bool file_based = !multifd_use_packets(); 1094 int i; 1095 1096 if (!migrate_multifd()) { 1097 return; 1098 } 1099 1100 /* 1101 * File-based channels don't use packets and therefore need to 1102 * wait for more work. Release them to start the sync. 1103 */ 1104 if (file_based) { 1105 for (i = 0; i < thread_count; i++) { 1106 MultiFDRecvParams *p = &multifd_recv_state->params[i]; 1107 1108 trace_multifd_recv_sync_main_signal(p->id); 1109 qemu_sem_post(&p->sem); 1110 } 1111 } 1112 1113 /* 1114 * Initiate the synchronization by waiting for all channels. 1115 * 1116 * For socket-based migration this means each channel has received 1117 * the SYNC packet on the stream. 1118 * 1119 * For file-based migration this means each channel is done with 1120 * the work (pending_job=false). 1121 */ 1122 for (i = 0; i < thread_count; i++) { 1123 trace_multifd_recv_sync_main_wait(i); 1124 qemu_sem_wait(&multifd_recv_state->sem_sync); 1125 } 1126 1127 if (file_based) { 1128 /* 1129 * For file-based loading is done in one iteration. We're 1130 * done. 1131 */ 1132 return; 1133 } 1134 1135 /* 1136 * Sync done. Release the channels for the next iteration. 1137 */ 1138 for (i = 0; i < thread_count; i++) { 1139 MultiFDRecvParams *p = &multifd_recv_state->params[i]; 1140 1141 WITH_QEMU_LOCK_GUARD(&p->mutex) { 1142 if (multifd_recv_state->packet_num < p->packet_num) { 1143 multifd_recv_state->packet_num = p->packet_num; 1144 } 1145 } 1146 trace_multifd_recv_sync_main_signal(p->id); 1147 qemu_sem_post(&p->sem_sync); 1148 } 1149 trace_multifd_recv_sync_main(multifd_recv_state->packet_num); 1150 } 1151 1152 static void *multifd_recv_thread(void *opaque) 1153 { 1154 MigrationState *s = migrate_get_current(); 1155 MultiFDRecvParams *p = opaque; 1156 Error *local_err = NULL; 1157 bool use_packets = multifd_use_packets(); 1158 int ret; 1159 1160 trace_multifd_recv_thread_start(p->id); 1161 rcu_register_thread(); 1162 1163 if (!s->multifd_clean_tls_termination) { 1164 p->read_flags = QIO_CHANNEL_READ_FLAG_RELAXED_EOF; 1165 } 1166 1167 while (true) { 1168 uint32_t flags = 0; 1169 bool has_data = false; 1170 p->normal_num = 0; 1171 1172 if (use_packets) { 1173 struct iovec iov = { 1174 .iov_base = (void *)p->packet, 1175 .iov_len = p->packet_len 1176 }; 1177 1178 if (multifd_recv_should_exit()) { 1179 break; 1180 } 1181 1182 ret = qio_channel_readv_full_all_eof(p->c, &iov, 1, NULL, NULL, 1183 p->read_flags, &local_err); 1184 if (!ret) { 1185 /* EOF */ 1186 assert(!local_err); 1187 break; 1188 } 1189 1190 if (ret == -1) { 1191 break; 1192 } 1193 1194 qemu_mutex_lock(&p->mutex); 1195 ret = multifd_recv_unfill_packet(p, &local_err); 1196 if (ret) { 1197 qemu_mutex_unlock(&p->mutex); 1198 break; 1199 } 1200 1201 flags = p->flags; 1202 /* recv methods don't know how to handle the SYNC flag */ 1203 p->flags &= ~MULTIFD_FLAG_SYNC; 1204 1205 /* 1206 * Even if it's a SYNC packet, this needs to be set 1207 * because older QEMUs (<9.0) still send data along with 1208 * the SYNC packet. 1209 */ 1210 has_data = p->normal_num || p->zero_num; 1211 qemu_mutex_unlock(&p->mutex); 1212 } else { 1213 /* 1214 * No packets, so we need to wait for the vmstate code to 1215 * give us work. 1216 */ 1217 qemu_sem_wait(&p->sem); 1218 1219 if (multifd_recv_should_exit()) { 1220 break; 1221 } 1222 1223 /* pairs with qatomic_store_release() at multifd_recv() */ 1224 if (!qatomic_load_acquire(&p->pending_job)) { 1225 /* 1226 * Migration thread did not send work, this is 1227 * equivalent to pending_sync on the sending 1228 * side. Post sem_sync to notify we reached this 1229 * point. 1230 */ 1231 qemu_sem_post(&multifd_recv_state->sem_sync); 1232 continue; 1233 } 1234 1235 has_data = !!p->data->size; 1236 } 1237 1238 if (has_data) { 1239 ret = multifd_recv_state->ops->recv(p, &local_err); 1240 if (ret != 0) { 1241 break; 1242 } 1243 } 1244 1245 if (use_packets) { 1246 if (flags & MULTIFD_FLAG_SYNC) { 1247 qemu_sem_post(&multifd_recv_state->sem_sync); 1248 qemu_sem_wait(&p->sem_sync); 1249 } 1250 } else { 1251 p->data->size = 0; 1252 /* 1253 * Order data->size update before clearing 1254 * pending_job. Pairs with smp_mb_acquire() at 1255 * multifd_recv(). 1256 */ 1257 qatomic_store_release(&p->pending_job, false); 1258 } 1259 } 1260 1261 if (local_err) { 1262 multifd_recv_terminate_threads(local_err); 1263 error_free(local_err); 1264 } 1265 1266 rcu_unregister_thread(); 1267 trace_multifd_recv_thread_end(p->id, p->packets_recved); 1268 1269 return NULL; 1270 } 1271 1272 int multifd_recv_setup(Error **errp) 1273 { 1274 int thread_count; 1275 uint32_t page_count = multifd_ram_page_count(); 1276 bool use_packets = multifd_use_packets(); 1277 uint8_t i; 1278 1279 /* 1280 * Return successfully if multiFD recv state is already initialised 1281 * or multiFD is not enabled. 1282 */ 1283 if (multifd_recv_state || !migrate_multifd()) { 1284 return 0; 1285 } 1286 1287 thread_count = migrate_multifd_channels(); 1288 multifd_recv_state = g_malloc0(sizeof(*multifd_recv_state)); 1289 multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count); 1290 1291 multifd_recv_state->data = g_new0(MultiFDRecvData, 1); 1292 multifd_recv_state->data->size = 0; 1293 1294 qatomic_set(&multifd_recv_state->count, 0); 1295 qatomic_set(&multifd_recv_state->exiting, 0); 1296 qemu_sem_init(&multifd_recv_state->sem_sync, 0); 1297 multifd_recv_state->ops = multifd_ops[migrate_multifd_compression()]; 1298 1299 for (i = 0; i < thread_count; i++) { 1300 MultiFDRecvParams *p = &multifd_recv_state->params[i]; 1301 1302 qemu_mutex_init(&p->mutex); 1303 qemu_sem_init(&p->sem_sync, 0); 1304 qemu_sem_init(&p->sem, 0); 1305 p->pending_job = false; 1306 p->id = i; 1307 1308 p->data = g_new0(MultiFDRecvData, 1); 1309 p->data->size = 0; 1310 1311 if (use_packets) { 1312 p->packet_len = sizeof(MultiFDPacket_t) 1313 + sizeof(uint64_t) * page_count; 1314 p->packet = g_malloc0(p->packet_len); 1315 } 1316 p->name = g_strdup_printf(MIGRATION_THREAD_DST_MULTIFD, i); 1317 p->normal = g_new0(ram_addr_t, page_count); 1318 p->zero = g_new0(ram_addr_t, page_count); 1319 } 1320 1321 for (i = 0; i < thread_count; i++) { 1322 MultiFDRecvParams *p = &multifd_recv_state->params[i]; 1323 int ret; 1324 1325 ret = multifd_recv_state->ops->recv_setup(p, errp); 1326 if (ret) { 1327 return ret; 1328 } 1329 } 1330 return 0; 1331 } 1332 1333 bool multifd_recv_all_channels_created(void) 1334 { 1335 int thread_count = migrate_multifd_channels(); 1336 1337 if (!migrate_multifd()) { 1338 return true; 1339 } 1340 1341 if (!multifd_recv_state) { 1342 /* Called before any connections created */ 1343 return false; 1344 } 1345 1346 return thread_count == qatomic_read(&multifd_recv_state->count); 1347 } 1348 1349 /* 1350 * Try to receive all multifd channels to get ready for the migration. 1351 * Sets @errp when failing to receive the current channel. 1352 */ 1353 void multifd_recv_new_channel(QIOChannel *ioc, Error **errp) 1354 { 1355 MultiFDRecvParams *p; 1356 Error *local_err = NULL; 1357 bool use_packets = multifd_use_packets(); 1358 int id; 1359 1360 if (use_packets) { 1361 id = multifd_recv_initial_packet(ioc, &local_err); 1362 if (id < 0) { 1363 multifd_recv_terminate_threads(local_err); 1364 error_propagate_prepend(errp, local_err, 1365 "failed to receive packet" 1366 " via multifd channel %d: ", 1367 qatomic_read(&multifd_recv_state->count)); 1368 return; 1369 } 1370 trace_multifd_recv_new_channel(id); 1371 } else { 1372 id = qatomic_read(&multifd_recv_state->count); 1373 } 1374 1375 p = &multifd_recv_state->params[id]; 1376 if (p->c != NULL) { 1377 error_setg(&local_err, "multifd: received id '%d' already setup'", 1378 id); 1379 multifd_recv_terminate_threads(local_err); 1380 error_propagate(errp, local_err); 1381 return; 1382 } 1383 p->c = ioc; 1384 object_ref(OBJECT(ioc)); 1385 1386 p->thread_created = true; 1387 qemu_thread_create(&p->thread, p->name, multifd_recv_thread, p, 1388 QEMU_THREAD_JOINABLE); 1389 qatomic_inc(&multifd_recv_state->count); 1390 } 1391