1 /* 2 * Multifd common code 3 * 4 * Copyright (c) 2019-2020 Red Hat Inc 5 * 6 * Authors: 7 * Juan Quintela <quintela@redhat.com> 8 * 9 * This work is licensed under the terms of the GNU GPL, version 2 or later. 10 * See the COPYING file in the top-level directory. 11 */ 12 13 #include "qemu/osdep.h" 14 #include "qemu/cutils.h" 15 #include "qemu/iov.h" 16 #include "qemu/rcu.h" 17 #include "exec/target_page.h" 18 #include "system/system.h" 19 #include "exec/ramblock.h" 20 #include "qemu/error-report.h" 21 #include "qapi/error.h" 22 #include "file.h" 23 #include "migration/misc.h" 24 #include "migration.h" 25 #include "migration-stats.h" 26 #include "savevm.h" 27 #include "socket.h" 28 #include "tls.h" 29 #include "qemu-file.h" 30 #include "trace.h" 31 #include "multifd.h" 32 #include "threadinfo.h" 33 #include "options.h" 34 #include "qemu/yank.h" 35 #include "io/channel-file.h" 36 #include "io/channel-socket.h" 37 #include "yank_functions.h" 38 39 /* Multiple fd's */ 40 41 #define MULTIFD_MAGIC 0x11223344U 42 #define MULTIFD_VERSION 1 43 44 typedef struct { 45 uint32_t magic; 46 uint32_t version; 47 unsigned char uuid[16]; /* QemuUUID */ 48 uint8_t id; 49 uint8_t unused1[7]; /* Reserved for future use */ 50 uint64_t unused2[4]; /* Reserved for future use */ 51 } __attribute__((packed)) MultiFDInit_t; 52 53 struct { 54 MultiFDSendParams *params; 55 56 /* multifd_send() body is not thread safe, needs serialization */ 57 QemuMutex multifd_send_mutex; 58 59 /* 60 * Global number of generated multifd packets. 61 * 62 * Note that we used 'uintptr_t' because it'll naturally support atomic 63 * operations on both 32bit / 64 bits hosts. It means on 32bit systems 64 * multifd will overflow the packet_num easier, but that should be 65 * fine. 66 * 67 * Another option is to use QEMU's Stat64 then it'll be 64 bits on all 68 * hosts, however so far it does not support atomic fetch_add() yet. 69 * Make it easy for now. 70 */ 71 uintptr_t packet_num; 72 /* 73 * Synchronization point past which no more channels will be 74 * created. 75 */ 76 QemuSemaphore channels_created; 77 /* send channels ready */ 78 QemuSemaphore channels_ready; 79 /* 80 * Have we already run terminate threads. There is a race when it 81 * happens that we got one error while we are exiting. 82 * We will use atomic operations. Only valid values are 0 and 1. 83 */ 84 int exiting; 85 /* multifd ops */ 86 const MultiFDMethods *ops; 87 } *multifd_send_state; 88 89 struct { 90 MultiFDRecvParams *params; 91 MultiFDRecvData *data; 92 /* number of created threads */ 93 int count; 94 /* 95 * This is always posted by the recv threads, the migration thread 96 * uses it to wait for recv threads to finish assigned tasks. 97 */ 98 QemuSemaphore sem_sync; 99 /* global number of generated multifd packets */ 100 uint64_t packet_num; 101 int exiting; 102 /* multifd ops */ 103 const MultiFDMethods *ops; 104 } *multifd_recv_state; 105 106 MultiFDSendData *multifd_send_data_alloc(void) 107 { 108 MultiFDSendData *new = g_new0(MultiFDSendData, 1); 109 110 multifd_ram_payload_alloc(&new->u.ram); 111 /* Device state allocates its payload on-demand */ 112 113 return new; 114 } 115 116 void multifd_send_data_clear(MultiFDSendData *data) 117 { 118 if (multifd_payload_empty(data)) { 119 return; 120 } 121 122 switch (data->type) { 123 case MULTIFD_PAYLOAD_DEVICE_STATE: 124 multifd_send_data_clear_device_state(&data->u.device_state); 125 break; 126 default: 127 /* Nothing to do */ 128 break; 129 } 130 131 data->type = MULTIFD_PAYLOAD_NONE; 132 } 133 134 void multifd_send_data_free(MultiFDSendData *data) 135 { 136 if (!data) { 137 return; 138 } 139 140 /* This also free's device state payload */ 141 multifd_send_data_clear(data); 142 143 multifd_ram_payload_free(&data->u.ram); 144 145 g_free(data); 146 } 147 148 static bool multifd_use_packets(void) 149 { 150 return !migrate_mapped_ram(); 151 } 152 153 void multifd_send_channel_created(void) 154 { 155 qemu_sem_post(&multifd_send_state->channels_created); 156 } 157 158 static const MultiFDMethods *multifd_ops[MULTIFD_COMPRESSION__MAX] = {}; 159 160 void multifd_register_ops(int method, const MultiFDMethods *ops) 161 { 162 assert(0 <= method && method < MULTIFD_COMPRESSION__MAX); 163 assert(!multifd_ops[method]); 164 multifd_ops[method] = ops; 165 } 166 167 static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp) 168 { 169 MultiFDInit_t msg = {}; 170 size_t size = sizeof(msg); 171 int ret; 172 173 msg.magic = cpu_to_be32(MULTIFD_MAGIC); 174 msg.version = cpu_to_be32(MULTIFD_VERSION); 175 msg.id = p->id; 176 memcpy(msg.uuid, &qemu_uuid.data, sizeof(msg.uuid)); 177 178 ret = qio_channel_write_all(p->c, (char *)&msg, size, errp); 179 if (ret != 0) { 180 return -1; 181 } 182 stat64_add(&mig_stats.multifd_bytes, size); 183 return 0; 184 } 185 186 static int multifd_recv_initial_packet(QIOChannel *c, Error **errp) 187 { 188 MultiFDInit_t msg; 189 int ret; 190 191 ret = qio_channel_read_all(c, (char *)&msg, sizeof(msg), errp); 192 if (ret != 0) { 193 return -1; 194 } 195 196 msg.magic = be32_to_cpu(msg.magic); 197 msg.version = be32_to_cpu(msg.version); 198 199 if (msg.magic != MULTIFD_MAGIC) { 200 error_setg(errp, "multifd: received packet magic %x " 201 "expected %x", msg.magic, MULTIFD_MAGIC); 202 return -1; 203 } 204 205 if (msg.version != MULTIFD_VERSION) { 206 error_setg(errp, "multifd: received packet version %u " 207 "expected %u", msg.version, MULTIFD_VERSION); 208 return -1; 209 } 210 211 if (memcmp(msg.uuid, &qemu_uuid, sizeof(qemu_uuid))) { 212 char *uuid = qemu_uuid_unparse_strdup(&qemu_uuid); 213 char *msg_uuid = qemu_uuid_unparse_strdup((const QemuUUID *)msg.uuid); 214 215 error_setg(errp, "multifd: received uuid '%s' and expected " 216 "uuid '%s' for channel %hhd", msg_uuid, uuid, msg.id); 217 g_free(uuid); 218 g_free(msg_uuid); 219 return -1; 220 } 221 222 if (msg.id > migrate_multifd_channels()) { 223 error_setg(errp, "multifd: received channel id %u is greater than " 224 "number of channels %u", msg.id, migrate_multifd_channels()); 225 return -1; 226 } 227 228 return msg.id; 229 } 230 231 /* Fills a RAM multifd packet */ 232 void multifd_send_fill_packet(MultiFDSendParams *p) 233 { 234 MultiFDPacket_t *packet = p->packet; 235 uint64_t packet_num; 236 bool sync_packet = p->flags & MULTIFD_FLAG_SYNC; 237 238 memset(packet, 0, p->packet_len); 239 240 packet->hdr.magic = cpu_to_be32(MULTIFD_MAGIC); 241 packet->hdr.version = cpu_to_be32(MULTIFD_VERSION); 242 243 packet->hdr.flags = cpu_to_be32(p->flags); 244 packet->next_packet_size = cpu_to_be32(p->next_packet_size); 245 246 packet_num = qatomic_fetch_inc(&multifd_send_state->packet_num); 247 packet->packet_num = cpu_to_be64(packet_num); 248 249 p->packets_sent++; 250 251 if (!sync_packet) { 252 multifd_ram_fill_packet(p); 253 } 254 255 trace_multifd_send_fill(p->id, packet_num, 256 p->flags, p->next_packet_size); 257 } 258 259 static int multifd_recv_unfill_packet_header(MultiFDRecvParams *p, 260 const MultiFDPacketHdr_t *hdr, 261 Error **errp) 262 { 263 uint32_t magic = be32_to_cpu(hdr->magic); 264 uint32_t version = be32_to_cpu(hdr->version); 265 266 if (magic != MULTIFD_MAGIC) { 267 error_setg(errp, "multifd: received packet magic %x, expected %x", 268 magic, MULTIFD_MAGIC); 269 return -1; 270 } 271 272 if (version != MULTIFD_VERSION) { 273 error_setg(errp, "multifd: received packet version %u, expected %u", 274 version, MULTIFD_VERSION); 275 return -1; 276 } 277 278 p->flags = be32_to_cpu(hdr->flags); 279 280 return 0; 281 } 282 283 static int multifd_recv_unfill_packet_device_state(MultiFDRecvParams *p, 284 Error **errp) 285 { 286 MultiFDPacketDeviceState_t *packet = p->packet_dev_state; 287 288 packet->instance_id = be32_to_cpu(packet->instance_id); 289 p->next_packet_size = be32_to_cpu(packet->next_packet_size); 290 291 return 0; 292 } 293 294 static int multifd_recv_unfill_packet_ram(MultiFDRecvParams *p, Error **errp) 295 { 296 const MultiFDPacket_t *packet = p->packet; 297 int ret = 0; 298 299 p->next_packet_size = be32_to_cpu(packet->next_packet_size); 300 p->packet_num = be64_to_cpu(packet->packet_num); 301 302 /* Always unfill, old QEMUs (<9.0) send data along with SYNC */ 303 ret = multifd_ram_unfill_packet(p, errp); 304 305 trace_multifd_recv_unfill(p->id, p->packet_num, p->flags, 306 p->next_packet_size); 307 308 return ret; 309 } 310 311 static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error **errp) 312 { 313 p->packets_recved++; 314 315 if (p->flags & MULTIFD_FLAG_DEVICE_STATE) { 316 return multifd_recv_unfill_packet_device_state(p, errp); 317 } 318 319 return multifd_recv_unfill_packet_ram(p, errp); 320 } 321 322 static bool multifd_send_should_exit(void) 323 { 324 return qatomic_read(&multifd_send_state->exiting); 325 } 326 327 static bool multifd_recv_should_exit(void) 328 { 329 return qatomic_read(&multifd_recv_state->exiting); 330 } 331 332 /* 333 * The migration thread can wait on either of the two semaphores. This 334 * function can be used to kick the main thread out of waiting on either of 335 * them. Should mostly only be called when something wrong happened with 336 * the current multifd send thread. 337 */ 338 static void multifd_send_kick_main(MultiFDSendParams *p) 339 { 340 qemu_sem_post(&p->sem_sync); 341 qemu_sem_post(&multifd_send_state->channels_ready); 342 } 343 344 /* 345 * multifd_send() works by exchanging the MultiFDSendData object 346 * provided by the caller with an unused MultiFDSendData object from 347 * the next channel that is found to be idle. 348 * 349 * The channel owns the data until it finishes transmitting and the 350 * caller owns the empty object until it fills it with data and calls 351 * this function again. No locking necessary. 352 * 353 * Switching is safe because both the migration thread and the channel 354 * thread have barriers in place to serialize access. 355 * 356 * Returns true if succeed, false otherwise. 357 */ 358 bool multifd_send(MultiFDSendData **send_data) 359 { 360 int i; 361 static int next_channel; 362 MultiFDSendParams *p = NULL; /* make happy gcc */ 363 MultiFDSendData *tmp; 364 365 if (multifd_send_should_exit()) { 366 return false; 367 } 368 369 QEMU_LOCK_GUARD(&multifd_send_state->multifd_send_mutex); 370 371 /* We wait here, until at least one channel is ready */ 372 qemu_sem_wait(&multifd_send_state->channels_ready); 373 374 /* 375 * next_channel can remain from a previous migration that was 376 * using more channels, so ensure it doesn't overflow if the 377 * limit is lower now. 378 */ 379 next_channel %= migrate_multifd_channels(); 380 for (i = next_channel;; i = (i + 1) % migrate_multifd_channels()) { 381 if (multifd_send_should_exit()) { 382 return false; 383 } 384 p = &multifd_send_state->params[i]; 385 /* 386 * Lockless read to p->pending_job is safe, because only multifd 387 * sender thread can clear it. 388 */ 389 if (qatomic_read(&p->pending_job) == false) { 390 next_channel = (i + 1) % migrate_multifd_channels(); 391 break; 392 } 393 } 394 395 /* 396 * Make sure we read p->pending_job before all the rest. Pairs with 397 * qatomic_store_release() in multifd_send_thread(). 398 */ 399 smp_mb_acquire(); 400 401 assert(multifd_payload_empty(p->data)); 402 403 /* 404 * Swap the pointers. The channel gets the client data for 405 * transferring and the client gets back an unused data slot. 406 */ 407 tmp = *send_data; 408 *send_data = p->data; 409 p->data = tmp; 410 411 /* 412 * Making sure p->data is setup before marking pending_job=true. Pairs 413 * with the qatomic_load_acquire() in multifd_send_thread(). 414 */ 415 qatomic_store_release(&p->pending_job, true); 416 qemu_sem_post(&p->sem); 417 418 return true; 419 } 420 421 /* Multifd send side hit an error; remember it and prepare to quit */ 422 static void multifd_send_set_error(Error *err) 423 { 424 /* 425 * We don't want to exit each threads twice. Depending on where 426 * we get the error, or if there are two independent errors in two 427 * threads at the same time, we can end calling this function 428 * twice. 429 */ 430 if (qatomic_xchg(&multifd_send_state->exiting, 1)) { 431 return; 432 } 433 434 if (err) { 435 MigrationState *s = migrate_get_current(); 436 migrate_set_error(s, err); 437 if (s->state == MIGRATION_STATUS_SETUP || 438 s->state == MIGRATION_STATUS_PRE_SWITCHOVER || 439 s->state == MIGRATION_STATUS_DEVICE || 440 s->state == MIGRATION_STATUS_ACTIVE) { 441 migrate_set_state(&s->state, s->state, 442 MIGRATION_STATUS_FAILED); 443 } 444 } 445 } 446 447 static void multifd_send_terminate_threads(void) 448 { 449 int i; 450 451 trace_multifd_send_terminate_threads(); 452 453 /* 454 * Tell everyone we're quitting. No xchg() needed here; we simply 455 * always set it. 456 */ 457 qatomic_set(&multifd_send_state->exiting, 1); 458 459 /* 460 * Firstly, kick all threads out; no matter whether they are just idle, 461 * or blocked in an IO system call. 462 */ 463 for (i = 0; i < migrate_multifd_channels(); i++) { 464 MultiFDSendParams *p = &multifd_send_state->params[i]; 465 466 qemu_sem_post(&p->sem); 467 if (p->c) { 468 qio_channel_shutdown(p->c, QIO_CHANNEL_SHUTDOWN_BOTH, NULL); 469 } 470 } 471 472 /* 473 * Finally recycle all the threads. 474 */ 475 for (i = 0; i < migrate_multifd_channels(); i++) { 476 MultiFDSendParams *p = &multifd_send_state->params[i]; 477 478 if (p->tls_thread_created) { 479 qemu_thread_join(&p->tls_thread); 480 } 481 482 if (p->thread_created) { 483 qemu_thread_join(&p->thread); 484 } 485 } 486 } 487 488 static bool multifd_send_cleanup_channel(MultiFDSendParams *p, Error **errp) 489 { 490 if (p->c) { 491 migration_ioc_unregister_yank(p->c); 492 /* 493 * The object_unref() cannot guarantee the fd will always be 494 * released because finalize() of the iochannel is only 495 * triggered on the last reference and it's not guaranteed 496 * that we always hold the last refcount when reaching here. 497 * 498 * Closing the fd explicitly has the benefit that if there is any 499 * registered I/O handler callbacks on such fd, that will get a 500 * POLLNVAL event and will further trigger the cleanup to finally 501 * release the IOC. 502 * 503 * FIXME: It should logically be guaranteed that all multifd 504 * channels have no I/O handler callback registered when reaching 505 * here, because migration thread will wait for all multifd channel 506 * establishments to complete during setup. Since 507 * migration_cleanup() will be scheduled in main thread too, all 508 * previous callbacks should guarantee to be completed when 509 * reaching here. See multifd_send_state.channels_created and its 510 * usage. In the future, we could replace this with an assert 511 * making sure we're the last reference, or simply drop it if above 512 * is more clear to be justified. 513 */ 514 qio_channel_close(p->c, &error_abort); 515 object_unref(OBJECT(p->c)); 516 p->c = NULL; 517 } 518 qemu_sem_destroy(&p->sem); 519 qemu_sem_destroy(&p->sem_sync); 520 g_free(p->name); 521 p->name = NULL; 522 g_clear_pointer(&p->data, multifd_send_data_free); 523 p->packet_len = 0; 524 g_clear_pointer(&p->packet_device_state, g_free); 525 g_free(p->packet); 526 p->packet = NULL; 527 multifd_send_state->ops->send_cleanup(p, errp); 528 assert(!p->iov); 529 530 return *errp == NULL; 531 } 532 533 static void multifd_send_cleanup_state(void) 534 { 535 file_cleanup_outgoing_migration(); 536 socket_cleanup_outgoing_migration(); 537 multifd_device_state_send_cleanup(); 538 qemu_sem_destroy(&multifd_send_state->channels_created); 539 qemu_sem_destroy(&multifd_send_state->channels_ready); 540 qemu_mutex_destroy(&multifd_send_state->multifd_send_mutex); 541 g_free(multifd_send_state->params); 542 multifd_send_state->params = NULL; 543 g_free(multifd_send_state); 544 multifd_send_state = NULL; 545 } 546 547 void multifd_send_shutdown(void) 548 { 549 int i; 550 551 if (!migrate_multifd()) { 552 return; 553 } 554 555 for (i = 0; i < migrate_multifd_channels(); i++) { 556 MultiFDSendParams *p = &multifd_send_state->params[i]; 557 558 /* thread_created implies the TLS handshake has succeeded */ 559 if (p->tls_thread_created && p->thread_created) { 560 Error *local_err = NULL; 561 /* 562 * The destination expects the TLS session to always be 563 * properly terminated. This helps to detect a premature 564 * termination in the middle of the stream. Note that 565 * older QEMUs always break the connection on the source 566 * and the destination always sees 567 * GNUTLS_E_PREMATURE_TERMINATION. 568 */ 569 migration_tls_channel_end(p->c, &local_err); 570 571 /* 572 * The above can return an error in case the migration has 573 * already failed. If the migration succeeded, errors are 574 * not expected but there's no need to kill the source. 575 */ 576 if (local_err && !migration_has_failed(migrate_get_current())) { 577 warn_report( 578 "multifd_send_%d: Failed to terminate TLS connection: %s", 579 p->id, error_get_pretty(local_err)); 580 break; 581 } 582 } 583 } 584 585 multifd_send_terminate_threads(); 586 587 for (i = 0; i < migrate_multifd_channels(); i++) { 588 MultiFDSendParams *p = &multifd_send_state->params[i]; 589 Error *local_err = NULL; 590 591 if (!multifd_send_cleanup_channel(p, &local_err)) { 592 migrate_set_error(migrate_get_current(), local_err); 593 error_free(local_err); 594 } 595 } 596 597 multifd_send_cleanup_state(); 598 } 599 600 static int multifd_zero_copy_flush(QIOChannel *c) 601 { 602 int ret; 603 Error *err = NULL; 604 605 ret = qio_channel_flush(c, &err); 606 if (ret < 0) { 607 error_report_err(err); 608 return -1; 609 } 610 if (ret == 1) { 611 stat64_add(&mig_stats.dirty_sync_missed_zero_copy, 1); 612 } 613 614 return ret; 615 } 616 617 int multifd_send_sync_main(MultiFDSyncReq req) 618 { 619 int i; 620 bool flush_zero_copy; 621 622 assert(req != MULTIFD_SYNC_NONE); 623 624 flush_zero_copy = migrate_zero_copy_send(); 625 626 for (i = 0; i < migrate_multifd_channels(); i++) { 627 MultiFDSendParams *p = &multifd_send_state->params[i]; 628 629 if (multifd_send_should_exit()) { 630 return -1; 631 } 632 633 trace_multifd_send_sync_main_signal(p->id); 634 635 /* 636 * We should be the only user so far, so not possible to be set by 637 * others concurrently. 638 */ 639 assert(qatomic_read(&p->pending_sync) == MULTIFD_SYNC_NONE); 640 qatomic_set(&p->pending_sync, req); 641 qemu_sem_post(&p->sem); 642 } 643 for (i = 0; i < migrate_multifd_channels(); i++) { 644 MultiFDSendParams *p = &multifd_send_state->params[i]; 645 646 if (multifd_send_should_exit()) { 647 return -1; 648 } 649 650 qemu_sem_wait(&multifd_send_state->channels_ready); 651 trace_multifd_send_sync_main_wait(p->id); 652 qemu_sem_wait(&p->sem_sync); 653 654 if (flush_zero_copy && p->c && (multifd_zero_copy_flush(p->c) < 0)) { 655 return -1; 656 } 657 } 658 trace_multifd_send_sync_main(multifd_send_state->packet_num); 659 660 return 0; 661 } 662 663 static void *multifd_send_thread(void *opaque) 664 { 665 MultiFDSendParams *p = opaque; 666 MigrationThread *thread = NULL; 667 Error *local_err = NULL; 668 int ret = 0; 669 bool use_packets = multifd_use_packets(); 670 671 thread = migration_threads_add(p->name, qemu_get_thread_id()); 672 673 trace_multifd_send_thread_start(p->id); 674 rcu_register_thread(); 675 676 if (use_packets) { 677 if (multifd_send_initial_packet(p, &local_err) < 0) { 678 ret = -1; 679 goto out; 680 } 681 } 682 683 while (true) { 684 qemu_sem_post(&multifd_send_state->channels_ready); 685 qemu_sem_wait(&p->sem); 686 687 if (multifd_send_should_exit()) { 688 break; 689 } 690 691 /* 692 * Read pending_job flag before p->data. Pairs with the 693 * qatomic_store_release() in multifd_send(). 694 */ 695 if (qatomic_load_acquire(&p->pending_job)) { 696 bool is_device_state = multifd_payload_device_state(p->data); 697 size_t total_size; 698 699 p->flags = 0; 700 p->iovs_num = 0; 701 assert(!multifd_payload_empty(p->data)); 702 703 if (is_device_state) { 704 multifd_device_state_send_prepare(p); 705 } else { 706 ret = multifd_send_state->ops->send_prepare(p, &local_err); 707 if (ret != 0) { 708 break; 709 } 710 } 711 712 /* 713 * The packet header in the zerocopy RAM case is accounted for 714 * in multifd_nocomp_send_prepare() - where it is actually 715 * being sent. 716 */ 717 total_size = iov_size(p->iov, p->iovs_num); 718 719 if (migrate_mapped_ram()) { 720 assert(!is_device_state); 721 722 ret = file_write_ramblock_iov(p->c, p->iov, p->iovs_num, 723 &p->data->u.ram, &local_err); 724 } else { 725 ret = qio_channel_writev_full_all(p->c, p->iov, p->iovs_num, 726 NULL, 0, p->write_flags, 727 &local_err); 728 } 729 730 if (ret != 0) { 731 break; 732 } 733 734 stat64_add(&mig_stats.multifd_bytes, total_size); 735 736 p->next_packet_size = 0; 737 multifd_send_data_clear(p->data); 738 739 /* 740 * Making sure p->data is published before saying "we're 741 * free". Pairs with the smp_mb_acquire() in 742 * multifd_send(). 743 */ 744 qatomic_store_release(&p->pending_job, false); 745 } else { 746 MultiFDSyncReq req = qatomic_read(&p->pending_sync); 747 748 /* 749 * If not a normal job, must be a sync request. Note that 750 * pending_sync is a standalone flag (unlike pending_job), so 751 * it doesn't require explicit memory barriers. 752 */ 753 assert(req != MULTIFD_SYNC_NONE); 754 755 /* Only push the SYNC message if it involves a remote sync */ 756 if (req == MULTIFD_SYNC_ALL) { 757 p->flags = MULTIFD_FLAG_SYNC; 758 multifd_send_fill_packet(p); 759 ret = qio_channel_write_all(p->c, (void *)p->packet, 760 p->packet_len, &local_err); 761 if (ret != 0) { 762 break; 763 } 764 /* p->next_packet_size will always be zero for a SYNC packet */ 765 stat64_add(&mig_stats.multifd_bytes, p->packet_len); 766 } 767 768 qatomic_set(&p->pending_sync, MULTIFD_SYNC_NONE); 769 qemu_sem_post(&p->sem_sync); 770 } 771 } 772 773 out: 774 if (ret) { 775 assert(local_err); 776 trace_multifd_send_error(p->id); 777 multifd_send_set_error(local_err); 778 multifd_send_kick_main(p); 779 error_free(local_err); 780 } 781 782 rcu_unregister_thread(); 783 migration_threads_remove(thread); 784 trace_multifd_send_thread_end(p->id, p->packets_sent); 785 786 return NULL; 787 } 788 789 static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque); 790 791 typedef struct { 792 MultiFDSendParams *p; 793 QIOChannelTLS *tioc; 794 } MultiFDTLSThreadArgs; 795 796 static void *multifd_tls_handshake_thread(void *opaque) 797 { 798 MultiFDTLSThreadArgs *args = opaque; 799 800 qio_channel_tls_handshake(args->tioc, 801 multifd_new_send_channel_async, 802 args->p, 803 NULL, 804 NULL); 805 g_free(args); 806 807 return NULL; 808 } 809 810 static bool multifd_tls_channel_connect(MultiFDSendParams *p, 811 QIOChannel *ioc, 812 Error **errp) 813 { 814 MigrationState *s = migrate_get_current(); 815 const char *hostname = s->hostname; 816 MultiFDTLSThreadArgs *args; 817 QIOChannelTLS *tioc; 818 819 tioc = migration_tls_client_create(ioc, hostname, errp); 820 if (!tioc) { 821 return false; 822 } 823 824 /* 825 * Ownership of the socket channel now transfers to the newly 826 * created TLS channel, which has already taken a reference. 827 */ 828 object_unref(OBJECT(ioc)); 829 trace_multifd_tls_outgoing_handshake_start(ioc, tioc, hostname); 830 qio_channel_set_name(QIO_CHANNEL(tioc), "multifd-tls-outgoing"); 831 832 args = g_new0(MultiFDTLSThreadArgs, 1); 833 args->tioc = tioc; 834 args->p = p; 835 836 p->tls_thread_created = true; 837 qemu_thread_create(&p->tls_thread, MIGRATION_THREAD_SRC_TLS, 838 multifd_tls_handshake_thread, args, 839 QEMU_THREAD_JOINABLE); 840 return true; 841 } 842 843 void multifd_channel_connect(MultiFDSendParams *p, QIOChannel *ioc) 844 { 845 qio_channel_set_delay(ioc, false); 846 847 migration_ioc_register_yank(ioc); 848 /* Setup p->c only if the channel is completely setup */ 849 p->c = ioc; 850 851 p->thread_created = true; 852 qemu_thread_create(&p->thread, p->name, multifd_send_thread, p, 853 QEMU_THREAD_JOINABLE); 854 } 855 856 /* 857 * When TLS is enabled this function is called once to establish the 858 * TLS connection and a second time after the TLS handshake to create 859 * the multifd channel. Without TLS it goes straight into the channel 860 * creation. 861 */ 862 static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque) 863 { 864 MultiFDSendParams *p = opaque; 865 QIOChannel *ioc = QIO_CHANNEL(qio_task_get_source(task)); 866 Error *local_err = NULL; 867 bool ret; 868 869 trace_multifd_new_send_channel_async(p->id); 870 871 if (qio_task_propagate_error(task, &local_err)) { 872 ret = false; 873 goto out; 874 } 875 876 trace_multifd_set_outgoing_channel(ioc, object_get_typename(OBJECT(ioc)), 877 migrate_get_current()->hostname); 878 879 if (migrate_channel_requires_tls_upgrade(ioc)) { 880 ret = multifd_tls_channel_connect(p, ioc, &local_err); 881 if (ret) { 882 return; 883 } 884 } else { 885 multifd_channel_connect(p, ioc); 886 ret = true; 887 } 888 889 out: 890 /* 891 * Here we're not interested whether creation succeeded, only that 892 * it happened at all. 893 */ 894 multifd_send_channel_created(); 895 896 if (ret) { 897 return; 898 } 899 900 trace_multifd_new_send_channel_async_error(p->id, local_err); 901 multifd_send_set_error(local_err); 902 /* 903 * For error cases (TLS or non-TLS), IO channel is always freed here 904 * rather than when cleanup multifd: since p->c is not set, multifd 905 * cleanup code doesn't even know its existence. 906 */ 907 object_unref(OBJECT(ioc)); 908 error_free(local_err); 909 } 910 911 static bool multifd_new_send_channel_create(gpointer opaque, Error **errp) 912 { 913 if (!multifd_use_packets()) { 914 return file_send_channel_create(opaque, errp); 915 } 916 917 socket_send_channel_create(multifd_new_send_channel_async, opaque); 918 return true; 919 } 920 921 bool multifd_send_setup(void) 922 { 923 MigrationState *s = migrate_get_current(); 924 int thread_count, ret = 0; 925 uint32_t page_count = multifd_ram_page_count(); 926 bool use_packets = multifd_use_packets(); 927 uint8_t i; 928 929 if (!migrate_multifd()) { 930 return true; 931 } 932 933 thread_count = migrate_multifd_channels(); 934 multifd_send_state = g_malloc0(sizeof(*multifd_send_state)); 935 multifd_send_state->params = g_new0(MultiFDSendParams, thread_count); 936 qemu_mutex_init(&multifd_send_state->multifd_send_mutex); 937 qemu_sem_init(&multifd_send_state->channels_created, 0); 938 qemu_sem_init(&multifd_send_state->channels_ready, 0); 939 qatomic_set(&multifd_send_state->exiting, 0); 940 multifd_send_state->ops = multifd_ops[migrate_multifd_compression()]; 941 942 for (i = 0; i < thread_count; i++) { 943 MultiFDSendParams *p = &multifd_send_state->params[i]; 944 Error *local_err = NULL; 945 946 qemu_sem_init(&p->sem, 0); 947 qemu_sem_init(&p->sem_sync, 0); 948 p->id = i; 949 p->data = multifd_send_data_alloc(); 950 951 if (use_packets) { 952 p->packet_len = sizeof(MultiFDPacket_t) 953 + sizeof(uint64_t) * page_count; 954 p->packet = g_malloc0(p->packet_len); 955 p->packet_device_state = g_malloc0(sizeof(*p->packet_device_state)); 956 p->packet_device_state->hdr.magic = cpu_to_be32(MULTIFD_MAGIC); 957 p->packet_device_state->hdr.version = cpu_to_be32(MULTIFD_VERSION); 958 } 959 p->name = g_strdup_printf(MIGRATION_THREAD_SRC_MULTIFD, i); 960 p->write_flags = 0; 961 962 if (!multifd_new_send_channel_create(p, &local_err)) { 963 migrate_set_error(s, local_err); 964 ret = -1; 965 } 966 } 967 968 /* 969 * Wait until channel creation has started for all channels. The 970 * creation can still fail, but no more channels will be created 971 * past this point. 972 */ 973 for (i = 0; i < thread_count; i++) { 974 qemu_sem_wait(&multifd_send_state->channels_created); 975 } 976 977 if (ret) { 978 goto err; 979 } 980 981 for (i = 0; i < thread_count; i++) { 982 MultiFDSendParams *p = &multifd_send_state->params[i]; 983 Error *local_err = NULL; 984 985 ret = multifd_send_state->ops->send_setup(p, &local_err); 986 if (ret) { 987 migrate_set_error(s, local_err); 988 goto err; 989 } 990 assert(p->iov); 991 } 992 993 multifd_device_state_send_setup(); 994 995 return true; 996 997 err: 998 migrate_set_state(&s->state, MIGRATION_STATUS_SETUP, 999 MIGRATION_STATUS_FAILED); 1000 return false; 1001 } 1002 1003 bool multifd_recv(void) 1004 { 1005 int i; 1006 static int next_recv_channel; 1007 MultiFDRecvParams *p = NULL; 1008 MultiFDRecvData *data = multifd_recv_state->data; 1009 1010 /* 1011 * next_channel can remain from a previous migration that was 1012 * using more channels, so ensure it doesn't overflow if the 1013 * limit is lower now. 1014 */ 1015 next_recv_channel %= migrate_multifd_channels(); 1016 for (i = next_recv_channel;; i = (i + 1) % migrate_multifd_channels()) { 1017 if (multifd_recv_should_exit()) { 1018 return false; 1019 } 1020 1021 p = &multifd_recv_state->params[i]; 1022 1023 if (qatomic_read(&p->pending_job) == false) { 1024 next_recv_channel = (i + 1) % migrate_multifd_channels(); 1025 break; 1026 } 1027 } 1028 1029 /* 1030 * Order pending_job read before manipulating p->data below. Pairs 1031 * with qatomic_store_release() at multifd_recv_thread(). 1032 */ 1033 smp_mb_acquire(); 1034 1035 assert(!p->data->size); 1036 multifd_recv_state->data = p->data; 1037 p->data = data; 1038 1039 /* 1040 * Order p->data update before setting pending_job. Pairs with 1041 * qatomic_load_acquire() at multifd_recv_thread(). 1042 */ 1043 qatomic_store_release(&p->pending_job, true); 1044 qemu_sem_post(&p->sem); 1045 1046 return true; 1047 } 1048 1049 MultiFDRecvData *multifd_get_recv_data(void) 1050 { 1051 return multifd_recv_state->data; 1052 } 1053 1054 static void multifd_recv_terminate_threads(Error *err) 1055 { 1056 int i; 1057 1058 trace_multifd_recv_terminate_threads(err != NULL); 1059 1060 if (qatomic_xchg(&multifd_recv_state->exiting, 1)) { 1061 return; 1062 } 1063 1064 if (err) { 1065 MigrationState *s = migrate_get_current(); 1066 migrate_set_error(s, err); 1067 if (s->state == MIGRATION_STATUS_SETUP || 1068 s->state == MIGRATION_STATUS_ACTIVE) { 1069 migrate_set_state(&s->state, s->state, 1070 MIGRATION_STATUS_FAILED); 1071 } 1072 } 1073 1074 for (i = 0; i < migrate_multifd_channels(); i++) { 1075 MultiFDRecvParams *p = &multifd_recv_state->params[i]; 1076 1077 /* 1078 * The migration thread and channels interact differently 1079 * depending on the presence of packets. 1080 */ 1081 if (multifd_use_packets()) { 1082 /* 1083 * The channel receives as long as there are packets. When 1084 * packets end (i.e. MULTIFD_FLAG_SYNC is reached), the 1085 * channel waits for the migration thread to sync. If the 1086 * sync never happens, do it here. 1087 */ 1088 qemu_sem_post(&p->sem_sync); 1089 } else { 1090 /* 1091 * The channel waits for the migration thread to give it 1092 * work. When the migration thread runs out of work, it 1093 * releases the channel and waits for any pending work to 1094 * finish. If we reach here (e.g. due to error) before the 1095 * work runs out, release the channel. 1096 */ 1097 qemu_sem_post(&p->sem); 1098 } 1099 1100 /* 1101 * We could arrive here for two reasons: 1102 * - normal quit, i.e. everything went fine, just finished 1103 * - error quit: We close the channels so the channel threads 1104 * finish the qio_channel_read_all_eof() 1105 */ 1106 if (p->c) { 1107 qio_channel_shutdown(p->c, QIO_CHANNEL_SHUTDOWN_BOTH, NULL); 1108 } 1109 } 1110 } 1111 1112 void multifd_recv_shutdown(void) 1113 { 1114 if (migrate_multifd()) { 1115 multifd_recv_terminate_threads(NULL); 1116 } 1117 } 1118 1119 static void multifd_recv_cleanup_channel(MultiFDRecvParams *p) 1120 { 1121 migration_ioc_unregister_yank(p->c); 1122 object_unref(OBJECT(p->c)); 1123 p->c = NULL; 1124 qemu_mutex_destroy(&p->mutex); 1125 qemu_sem_destroy(&p->sem_sync); 1126 qemu_sem_destroy(&p->sem); 1127 g_free(p->data); 1128 p->data = NULL; 1129 g_free(p->name); 1130 p->name = NULL; 1131 p->packet_len = 0; 1132 g_free(p->packet); 1133 p->packet = NULL; 1134 g_clear_pointer(&p->packet_dev_state, g_free); 1135 g_free(p->normal); 1136 p->normal = NULL; 1137 g_free(p->zero); 1138 p->zero = NULL; 1139 multifd_recv_state->ops->recv_cleanup(p); 1140 } 1141 1142 static void multifd_recv_cleanup_state(void) 1143 { 1144 qemu_sem_destroy(&multifd_recv_state->sem_sync); 1145 g_free(multifd_recv_state->params); 1146 multifd_recv_state->params = NULL; 1147 g_free(multifd_recv_state->data); 1148 multifd_recv_state->data = NULL; 1149 g_free(multifd_recv_state); 1150 multifd_recv_state = NULL; 1151 } 1152 1153 void multifd_recv_cleanup(void) 1154 { 1155 int i; 1156 1157 if (!migrate_multifd()) { 1158 return; 1159 } 1160 multifd_recv_terminate_threads(NULL); 1161 for (i = 0; i < migrate_multifd_channels(); i++) { 1162 MultiFDRecvParams *p = &multifd_recv_state->params[i]; 1163 1164 if (p->thread_created) { 1165 qemu_thread_join(&p->thread); 1166 } 1167 } 1168 for (i = 0; i < migrate_multifd_channels(); i++) { 1169 multifd_recv_cleanup_channel(&multifd_recv_state->params[i]); 1170 } 1171 multifd_recv_cleanup_state(); 1172 } 1173 1174 void multifd_recv_sync_main(void) 1175 { 1176 int thread_count = migrate_multifd_channels(); 1177 bool file_based = !multifd_use_packets(); 1178 int i; 1179 1180 if (!migrate_multifd()) { 1181 return; 1182 } 1183 1184 /* 1185 * File-based channels don't use packets and therefore need to 1186 * wait for more work. Release them to start the sync. 1187 */ 1188 if (file_based) { 1189 for (i = 0; i < thread_count; i++) { 1190 MultiFDRecvParams *p = &multifd_recv_state->params[i]; 1191 1192 trace_multifd_recv_sync_main_signal(p->id); 1193 qemu_sem_post(&p->sem); 1194 } 1195 } 1196 1197 /* 1198 * Initiate the synchronization by waiting for all channels. 1199 * 1200 * For socket-based migration this means each channel has received 1201 * the SYNC packet on the stream. 1202 * 1203 * For file-based migration this means each channel is done with 1204 * the work (pending_job=false). 1205 */ 1206 for (i = 0; i < thread_count; i++) { 1207 trace_multifd_recv_sync_main_wait(i); 1208 qemu_sem_wait(&multifd_recv_state->sem_sync); 1209 } 1210 1211 if (file_based) { 1212 /* 1213 * For file-based loading is done in one iteration. We're 1214 * done. 1215 */ 1216 return; 1217 } 1218 1219 /* 1220 * Sync done. Release the channels for the next iteration. 1221 */ 1222 for (i = 0; i < thread_count; i++) { 1223 MultiFDRecvParams *p = &multifd_recv_state->params[i]; 1224 1225 WITH_QEMU_LOCK_GUARD(&p->mutex) { 1226 if (multifd_recv_state->packet_num < p->packet_num) { 1227 multifd_recv_state->packet_num = p->packet_num; 1228 } 1229 } 1230 trace_multifd_recv_sync_main_signal(p->id); 1231 qemu_sem_post(&p->sem_sync); 1232 } 1233 trace_multifd_recv_sync_main(multifd_recv_state->packet_num); 1234 } 1235 1236 static int multifd_device_state_recv(MultiFDRecvParams *p, Error **errp) 1237 { 1238 g_autofree char *dev_state_buf = NULL; 1239 int ret; 1240 1241 dev_state_buf = g_malloc(p->next_packet_size); 1242 1243 ret = qio_channel_read_all(p->c, dev_state_buf, p->next_packet_size, errp); 1244 if (ret != 0) { 1245 return ret; 1246 } 1247 1248 if (p->packet_dev_state->idstr[sizeof(p->packet_dev_state->idstr) - 1] 1249 != 0) { 1250 error_setg(errp, "unterminated multifd device state idstr"); 1251 return -1; 1252 } 1253 1254 if (!qemu_loadvm_load_state_buffer(p->packet_dev_state->idstr, 1255 p->packet_dev_state->instance_id, 1256 dev_state_buf, p->next_packet_size, 1257 errp)) { 1258 ret = -1; 1259 } 1260 1261 return ret; 1262 } 1263 1264 static void *multifd_recv_thread(void *opaque) 1265 { 1266 MigrationState *s = migrate_get_current(); 1267 MultiFDRecvParams *p = opaque; 1268 Error *local_err = NULL; 1269 bool use_packets = multifd_use_packets(); 1270 int ret; 1271 1272 trace_multifd_recv_thread_start(p->id); 1273 rcu_register_thread(); 1274 1275 if (!s->multifd_clean_tls_termination) { 1276 p->read_flags = QIO_CHANNEL_READ_FLAG_RELAXED_EOF; 1277 } 1278 1279 while (true) { 1280 MultiFDPacketHdr_t hdr; 1281 uint32_t flags = 0; 1282 bool is_device_state = false; 1283 bool has_data = false; 1284 uint8_t *pkt_buf; 1285 size_t pkt_len; 1286 1287 p->normal_num = 0; 1288 1289 if (use_packets) { 1290 struct iovec iov = { 1291 .iov_base = (void *)&hdr, 1292 .iov_len = sizeof(hdr) 1293 }; 1294 1295 if (multifd_recv_should_exit()) { 1296 break; 1297 } 1298 1299 ret = qio_channel_readv_full_all_eof(p->c, &iov, 1, NULL, NULL, 1300 p->read_flags, &local_err); 1301 if (!ret) { 1302 /* EOF */ 1303 assert(!local_err); 1304 break; 1305 } 1306 1307 if (ret == -1) { 1308 break; 1309 } 1310 1311 ret = multifd_recv_unfill_packet_header(p, &hdr, &local_err); 1312 if (ret) { 1313 break; 1314 } 1315 1316 is_device_state = p->flags & MULTIFD_FLAG_DEVICE_STATE; 1317 if (is_device_state) { 1318 pkt_buf = (uint8_t *)p->packet_dev_state + sizeof(hdr); 1319 pkt_len = sizeof(*p->packet_dev_state) - sizeof(hdr); 1320 } else { 1321 pkt_buf = (uint8_t *)p->packet + sizeof(hdr); 1322 pkt_len = p->packet_len - sizeof(hdr); 1323 } 1324 1325 ret = qio_channel_read_all_eof(p->c, (char *)pkt_buf, pkt_len, 1326 &local_err); 1327 if (!ret) { 1328 /* EOF */ 1329 error_setg(&local_err, "multifd: unexpected EOF after packet header"); 1330 break; 1331 } 1332 1333 if (ret == -1) { 1334 break; 1335 } 1336 1337 qemu_mutex_lock(&p->mutex); 1338 ret = multifd_recv_unfill_packet(p, &local_err); 1339 if (ret) { 1340 qemu_mutex_unlock(&p->mutex); 1341 break; 1342 } 1343 1344 flags = p->flags; 1345 /* recv methods don't know how to handle the SYNC flag */ 1346 p->flags &= ~MULTIFD_FLAG_SYNC; 1347 1348 if (is_device_state) { 1349 has_data = p->next_packet_size > 0; 1350 } else { 1351 /* 1352 * Even if it's a SYNC packet, this needs to be set 1353 * because older QEMUs (<9.0) still send data along with 1354 * the SYNC packet. 1355 */ 1356 has_data = p->normal_num || p->zero_num; 1357 } 1358 1359 qemu_mutex_unlock(&p->mutex); 1360 } else { 1361 /* 1362 * No packets, so we need to wait for the vmstate code to 1363 * give us work. 1364 */ 1365 qemu_sem_wait(&p->sem); 1366 1367 if (multifd_recv_should_exit()) { 1368 break; 1369 } 1370 1371 /* pairs with qatomic_store_release() at multifd_recv() */ 1372 if (!qatomic_load_acquire(&p->pending_job)) { 1373 /* 1374 * Migration thread did not send work, this is 1375 * equivalent to pending_sync on the sending 1376 * side. Post sem_sync to notify we reached this 1377 * point. 1378 */ 1379 qemu_sem_post(&multifd_recv_state->sem_sync); 1380 continue; 1381 } 1382 1383 has_data = !!p->data->size; 1384 } 1385 1386 if (has_data) { 1387 if (is_device_state) { 1388 assert(use_packets); 1389 ret = multifd_device_state_recv(p, &local_err); 1390 } else { 1391 ret = multifd_recv_state->ops->recv(p, &local_err); 1392 } 1393 if (ret != 0) { 1394 break; 1395 } 1396 } else if (is_device_state) { 1397 error_setg(&local_err, 1398 "multifd: received empty device state packet"); 1399 break; 1400 } 1401 1402 if (use_packets) { 1403 if (flags & MULTIFD_FLAG_SYNC) { 1404 if (is_device_state) { 1405 error_setg(&local_err, 1406 "multifd: received SYNC device state packet"); 1407 break; 1408 } 1409 1410 qemu_sem_post(&multifd_recv_state->sem_sync); 1411 qemu_sem_wait(&p->sem_sync); 1412 } 1413 } else { 1414 p->data->size = 0; 1415 /* 1416 * Order data->size update before clearing 1417 * pending_job. Pairs with smp_mb_acquire() at 1418 * multifd_recv(). 1419 */ 1420 qatomic_store_release(&p->pending_job, false); 1421 } 1422 } 1423 1424 if (local_err) { 1425 multifd_recv_terminate_threads(local_err); 1426 error_free(local_err); 1427 } 1428 1429 rcu_unregister_thread(); 1430 trace_multifd_recv_thread_end(p->id, p->packets_recved); 1431 1432 return NULL; 1433 } 1434 1435 int multifd_recv_setup(Error **errp) 1436 { 1437 int thread_count; 1438 uint32_t page_count = multifd_ram_page_count(); 1439 bool use_packets = multifd_use_packets(); 1440 uint8_t i; 1441 1442 /* 1443 * Return successfully if multiFD recv state is already initialised 1444 * or multiFD is not enabled. 1445 */ 1446 if (multifd_recv_state || !migrate_multifd()) { 1447 return 0; 1448 } 1449 1450 thread_count = migrate_multifd_channels(); 1451 multifd_recv_state = g_malloc0(sizeof(*multifd_recv_state)); 1452 multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count); 1453 1454 multifd_recv_state->data = g_new0(MultiFDRecvData, 1); 1455 multifd_recv_state->data->size = 0; 1456 1457 qatomic_set(&multifd_recv_state->count, 0); 1458 qatomic_set(&multifd_recv_state->exiting, 0); 1459 qemu_sem_init(&multifd_recv_state->sem_sync, 0); 1460 multifd_recv_state->ops = multifd_ops[migrate_multifd_compression()]; 1461 1462 for (i = 0; i < thread_count; i++) { 1463 MultiFDRecvParams *p = &multifd_recv_state->params[i]; 1464 1465 qemu_mutex_init(&p->mutex); 1466 qemu_sem_init(&p->sem_sync, 0); 1467 qemu_sem_init(&p->sem, 0); 1468 p->pending_job = false; 1469 p->id = i; 1470 1471 p->data = g_new0(MultiFDRecvData, 1); 1472 p->data->size = 0; 1473 1474 if (use_packets) { 1475 p->packet_len = sizeof(MultiFDPacket_t) 1476 + sizeof(uint64_t) * page_count; 1477 p->packet = g_malloc0(p->packet_len); 1478 p->packet_dev_state = g_malloc0(sizeof(*p->packet_dev_state)); 1479 } 1480 p->name = g_strdup_printf(MIGRATION_THREAD_DST_MULTIFD, i); 1481 p->normal = g_new0(ram_addr_t, page_count); 1482 p->zero = g_new0(ram_addr_t, page_count); 1483 } 1484 1485 for (i = 0; i < thread_count; i++) { 1486 MultiFDRecvParams *p = &multifd_recv_state->params[i]; 1487 int ret; 1488 1489 ret = multifd_recv_state->ops->recv_setup(p, errp); 1490 if (ret) { 1491 return ret; 1492 } 1493 } 1494 return 0; 1495 } 1496 1497 bool multifd_recv_all_channels_created(void) 1498 { 1499 int thread_count = migrate_multifd_channels(); 1500 1501 if (!migrate_multifd()) { 1502 return true; 1503 } 1504 1505 if (!multifd_recv_state) { 1506 /* Called before any connections created */ 1507 return false; 1508 } 1509 1510 return thread_count == qatomic_read(&multifd_recv_state->count); 1511 } 1512 1513 /* 1514 * Try to receive all multifd channels to get ready for the migration. 1515 * Sets @errp when failing to receive the current channel. 1516 */ 1517 void multifd_recv_new_channel(QIOChannel *ioc, Error **errp) 1518 { 1519 MultiFDRecvParams *p; 1520 Error *local_err = NULL; 1521 bool use_packets = multifd_use_packets(); 1522 int id; 1523 1524 if (use_packets) { 1525 id = multifd_recv_initial_packet(ioc, &local_err); 1526 if (id < 0) { 1527 multifd_recv_terminate_threads(local_err); 1528 error_propagate_prepend(errp, local_err, 1529 "failed to receive packet" 1530 " via multifd channel %d: ", 1531 qatomic_read(&multifd_recv_state->count)); 1532 return; 1533 } 1534 trace_multifd_recv_new_channel(id); 1535 } else { 1536 id = qatomic_read(&multifd_recv_state->count); 1537 } 1538 1539 p = &multifd_recv_state->params[id]; 1540 if (p->c != NULL) { 1541 error_setg(&local_err, "multifd: received id '%d' already setup'", 1542 id); 1543 multifd_recv_terminate_threads(local_err); 1544 error_propagate(errp, local_err); 1545 return; 1546 } 1547 p->c = ioc; 1548 object_ref(OBJECT(ioc)); 1549 1550 p->thread_created = true; 1551 qemu_thread_create(&p->thread, p->name, multifd_recv_thread, p, 1552 QEMU_THREAD_JOINABLE); 1553 qatomic_inc(&multifd_recv_state->count); 1554 } 1555