/*
 * Multifd common code
 *
 * Copyright (c) 2019-2020 Red Hat Inc
 *
 * Authors:
 *  Juan Quintela <quintela@redhat.com>
 *
 * This work is licensed under the terms of the GNU GPL, version 2 or later.
 * See the COPYING file in the top-level directory.
 */

#include "qemu/osdep.h"
#include "qemu/cutils.h"
#include "qemu/iov.h"
#include "qemu/rcu.h"
#include "exec/target_page.h"
#include "system/system.h"
#include "system/ramblock.h"
#include "qemu/error-report.h"
#include "qapi/error.h"
#include "file.h"
#include "migration/misc.h"
#include "migration.h"
#include "migration-stats.h"
#include "savevm.h"
#include "socket.h"
#include "tls.h"
#include "qemu-file.h"
#include "trace.h"
#include "multifd.h"
#include "threadinfo.h"
#include "options.h"
#include "qemu/yank.h"
#include "io/channel-file.h"
#include "io/channel-socket.h"
#include "yank_functions.h"

typedef struct {
    uint32_t magic;
    uint32_t version;
    unsigned char uuid[16]; /* QemuUUID */
    uint8_t id;
    uint8_t unused1[7];     /* Reserved for future use */
    uint64_t unused2[4];    /* Reserved for future use */
} __attribute__((packed)) MultiFDInit_t;
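
/*
 * Illustrative sketch (not part of the struct definition above): one
 * MultiFDInit_t is exchanged per channel before any data packets flow.
 * A rough trace of the exchange:
 *
 *   source (per channel)                 destination (per channel)
 *   ---------------------------          ------------------------------
 *   fill msg: magic, version,
 *             uuid, channel id
 *   qio_channel_write_all(&msg)  ----->  qio_channel_read_all(&msg)
 *                                        check magic == MULTIFD_MAGIC
 *                                        check version == MULTIFD_VERSION
 *                                        check uuid matches the local VM
 *                                        use msg.id to pick the channel
 *
 * All multi-byte fields are in big-endian byte order; see
 * multifd_send_initial_packet() / multifd_recv_initial_packet() below.
 */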

struct {
    MultiFDSendParams *params;

    /* multifd_send() body is not thread safe, needs serialization */
    QemuMutex multifd_send_mutex;

    /*
     * Global number of generated multifd packets.
     *
     * Note that we use 'uintptr_t' because it naturally supports atomic
     * operations on both 32-bit and 64-bit hosts.  It means that on
     * 32-bit systems multifd will overflow packet_num more easily, but
     * that should be fine.
     *
     * Another option is to use QEMU's Stat64, which would be 64 bits on
     * all hosts; however, so far it does not support atomic fetch_add().
     * Keep it simple for now.
     */
    uintptr_t packet_num;
    /*
     * Synchronization point past which no more channels will be
     * created.
     */
    QemuSemaphore channels_created;
    /* send channels ready */
    QemuSemaphore channels_ready;
    /*
     * Have we already terminated the threads?  There is a race when an
     * error arrives while we are exiting, so this is accessed with
     * atomic operations.  The only valid values are 0 and 1.
     */
    int exiting;
    /* multifd ops */
    const MultiFDMethods *ops;
} *multifd_send_state;

struct {
    MultiFDRecvParams *params;
    MultiFDRecvData *data;
    /* number of created threads */
    int count;
    /*
     * This is always posted by the recv threads, the migration thread
     * uses it to wait for recv threads to finish assigned tasks.
     */
    QemuSemaphore sem_sync;
    /* global number of generated multifd packets */
    uint64_t packet_num;
    int exiting;
    /* multifd ops */
    const MultiFDMethods *ops;
} *multifd_recv_state;

MultiFDSendData *multifd_send_data_alloc(void)
{
    MultiFDSendData *new = g_new0(MultiFDSendData, 1);

    multifd_ram_payload_alloc(&new->u.ram);
    /* Device state allocates its payload on-demand */

    return new;
}

void multifd_send_data_clear(MultiFDSendData *data)
{
    if (multifd_payload_empty(data)) {
        return;
    }

    switch (data->type) {
    case MULTIFD_PAYLOAD_DEVICE_STATE:
        multifd_send_data_clear_device_state(&data->u.device_state);
        break;
    default:
        /* Nothing to do */
        break;
    }

    data->type = MULTIFD_PAYLOAD_NONE;
}

void multifd_send_data_free(MultiFDSendData *data)
{
    if (!data) {
        return;
    }

    /* This also frees the device state payload */
    multifd_send_data_clear(data);

    multifd_ram_payload_free(&data->u.ram);

    g_free(data);
}

static bool multifd_use_packets(void)
{
    return !migrate_mapped_ram();
}

void multifd_send_channel_created(void)
{
    qemu_sem_post(&multifd_send_state->channels_created);
}

static const MultiFDMethods *multifd_ops[MULTIFD_COMPRESSION__MAX] = {};

void multifd_register_ops(int method, const MultiFDMethods *ops)
{
    assert(0 <= method && method < MULTIFD_COMPRESSION__MAX);
    assert(!multifd_ops[method]);
    multifd_ops[method] = ops;
}
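
/*
 * Usage sketch (illustrative, names hypothetical): a compression backend
 * registers its vtable once at startup, e.g.:
 *
 *   static const MultiFDMethods multifd_example_ops = {
 *       .send_setup   = example_send_setup,
 *       .send_cleanup = example_send_cleanup,
 *       .send_prepare = example_send_prepare,
 *       .recv_setup   = example_recv_setup,
 *       .recv_cleanup = example_recv_cleanup,
 *       .recv         = example_recv,
 *   };
 *
 *   multifd_register_ops(MULTIFD_COMPRESSION_NONE, &multifd_example_ops);
 *
 * The table is then looked up via migrate_multifd_compression() in
 * multifd_send_setup() / multifd_recv_setup() below.
 */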

static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp)
{
    MultiFDInit_t msg = {};
    size_t size = sizeof(msg);
    int ret;

    msg.magic = cpu_to_be32(MULTIFD_MAGIC);
    msg.version = cpu_to_be32(MULTIFD_VERSION);
    msg.id = p->id;
    memcpy(msg.uuid, &qemu_uuid.data, sizeof(msg.uuid));

    ret = qio_channel_write_all(p->c, (char *)&msg, size, errp);
    if (ret != 0) {
        return -1;
    }
    stat64_add(&mig_stats.multifd_bytes, size);
    return 0;
}

static int multifd_recv_initial_packet(QIOChannel *c, Error **errp)
{
    MultiFDInit_t msg;
    int ret;

    ret = qio_channel_read_all(c, (char *)&msg, sizeof(msg), errp);
    if (ret != 0) {
        return -1;
    }

    msg.magic = be32_to_cpu(msg.magic);
    msg.version = be32_to_cpu(msg.version);

    if (msg.magic != MULTIFD_MAGIC) {
        error_setg(errp, "multifd: received packet magic %x "
                   "expected %x", msg.magic, MULTIFD_MAGIC);
        return -1;
    }

    if (msg.version != MULTIFD_VERSION) {
        error_setg(errp, "multifd: received packet version %u "
                   "expected %u", msg.version, MULTIFD_VERSION);
        return -1;
    }

    if (memcmp(msg.uuid, &qemu_uuid, sizeof(qemu_uuid))) {
        char *uuid = qemu_uuid_unparse_strdup(&qemu_uuid);
        char *msg_uuid = qemu_uuid_unparse_strdup((const QemuUUID *)msg.uuid);

        error_setg(errp, "multifd: received uuid '%s' and expected "
                   "uuid '%s' for channel %hhd", msg_uuid, uuid, msg.id);
        g_free(uuid);
        g_free(msg_uuid);
        return -1;
    }

    if (msg.id > migrate_multifd_channels()) {
        error_setg(errp, "multifd: received channel id %u is greater than "
                   "number of channels %u", msg.id, migrate_multifd_channels());
        return -1;
    }

    return msg.id;
}

/* Fills a RAM multifd packet */
void multifd_send_fill_packet(MultiFDSendParams *p)
{
    MultiFDPacket_t *packet = p->packet;
    uint64_t packet_num;
    bool sync_packet = p->flags & MULTIFD_FLAG_SYNC;

    memset(packet, 0, p->packet_len);

    packet->hdr.magic = cpu_to_be32(MULTIFD_MAGIC);
    packet->hdr.version = cpu_to_be32(MULTIFD_VERSION);

    packet->hdr.flags = cpu_to_be32(p->flags);
    packet->next_packet_size = cpu_to_be32(p->next_packet_size);

    packet_num = qatomic_fetch_inc(&multifd_send_state->packet_num);
    packet->packet_num = cpu_to_be64(packet_num);

    p->packets_sent++;

    if (!sync_packet) {
        multifd_ram_fill_packet(p);
    }

    trace_multifd_send_fill(p->id, packet_num,
                            p->flags, p->next_packet_size);
}

static int multifd_recv_unfill_packet_header(MultiFDRecvParams *p,
                                             const MultiFDPacketHdr_t *hdr,
                                             Error **errp)
{
    uint32_t magic = be32_to_cpu(hdr->magic);
    uint32_t version = be32_to_cpu(hdr->version);

    if (magic != MULTIFD_MAGIC) {
        error_setg(errp, "multifd: received packet magic %x, expected %x",
                   magic, MULTIFD_MAGIC);
        return -1;
    }

    if (version != MULTIFD_VERSION) {
        error_setg(errp, "multifd: received packet version %u, expected %u",
                   version, MULTIFD_VERSION);
        return -1;
    }

    p->flags = be32_to_cpu(hdr->flags);

    return 0;
}

static int multifd_recv_unfill_packet_device_state(MultiFDRecvParams *p,
                                                   Error **errp)
{
    MultiFDPacketDeviceState_t *packet = p->packet_dev_state;

    packet->instance_id = be32_to_cpu(packet->instance_id);
    p->next_packet_size = be32_to_cpu(packet->next_packet_size);

    return 0;
}

static int multifd_recv_unfill_packet_ram(MultiFDRecvParams *p, Error **errp)
{
    const MultiFDPacket_t *packet = p->packet;
    int ret = 0;

    p->next_packet_size = be32_to_cpu(packet->next_packet_size);
    p->packet_num = be64_to_cpu(packet->packet_num);

    /* Always unfill, old QEMUs (<9.0) send data along with SYNC */
    ret = multifd_ram_unfill_packet(p, errp);

    trace_multifd_recv_unfill(p->id, p->packet_num, p->flags,
                              p->next_packet_size);

    return ret;
}

static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error **errp)
{
    p->packets_recved++;

    if (p->flags & MULTIFD_FLAG_DEVICE_STATE) {
        return multifd_recv_unfill_packet_device_state(p, errp);
    }

    return multifd_recv_unfill_packet_ram(p, errp);
}
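
/*
 * On-wire framing sketch (derived from the fill/unfill helpers above;
 * illustrative only).  Every message on a packeted channel starts with
 * the common MultiFDPacketHdr_t, which selects how the rest is parsed:
 *
 *   +-----------------------------+
 *   | hdr: magic, version, flags  |  <- always first, big-endian
 *   +-----------------------------+
 *   | MultiFDPacket_t body        |  <- RAM packet (default), or
 *   | or                          |
 *   | MultiFDPacketDeviceState_t  |  <- if MULTIFD_FLAG_DEVICE_STATE
 *   +-----------------------------+
 *   | next_packet_size bytes of   |  <- payload, e.g. page data or a
 *   | payload data                |     device state buffer
 *   +-----------------------------+
 */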

static bool multifd_send_should_exit(void)
{
    return qatomic_read(&multifd_send_state->exiting);
}

static bool multifd_recv_should_exit(void)
{
    return qatomic_read(&multifd_recv_state->exiting);
}

/*
 * The migration thread can wait on either of the two semaphores. This
 * function can be used to kick the main thread out of waiting on either of
 * them. Should mostly only be called when something goes wrong with the
 * current multifd send thread.
 */
static void multifd_send_kick_main(MultiFDSendParams *p)
{
    qemu_sem_post(&p->sem_sync);
    qemu_sem_post(&multifd_send_state->channels_ready);
}

/*
 * multifd_send() works by exchanging the MultiFDSendData object
 * provided by the caller with an unused MultiFDSendData object from
 * the next channel that is found to be idle.
 *
 * The channel owns the data until it finishes transmitting and the
 * caller owns the empty object until it fills it with data and calls
 * this function again. No locking necessary.
 *
 * Switching is safe because both the migration thread and the channel
 * thread have barriers in place to serialize access.
 *
 * Returns true on success, false otherwise.
 */
bool multifd_send(MultiFDSendData **send_data)
{
    int i;
    static int next_channel;
    MultiFDSendParams *p = NULL; /* make gcc happy */
    MultiFDSendData *tmp;

    if (multifd_send_should_exit()) {
        return false;
    }

    QEMU_LOCK_GUARD(&multifd_send_state->multifd_send_mutex);

    /* We wait here, until at least one channel is ready */
    qemu_sem_wait(&multifd_send_state->channels_ready);

    /*
     * next_channel can remain from a previous migration that was
     * using more channels, so ensure it doesn't overflow if the
     * limit is lower now.
     */
    next_channel %= migrate_multifd_channels();
    for (i = next_channel;; i = (i + 1) % migrate_multifd_channels()) {
        if (multifd_send_should_exit()) {
            return false;
        }
        p = &multifd_send_state->params[i];
        /*
         * Lockless read to p->pending_job is safe, because only multifd
         * sender thread can clear it.
         */
        if (qatomic_read(&p->pending_job) == false) {
            next_channel = (i + 1) % migrate_multifd_channels();
            break;
        }
    }

    /*
     * Make sure we read p->pending_job before all the rest. Pairs with
     * qatomic_store_release() in multifd_send_thread().
     */
    smp_mb_acquire();

    assert(multifd_payload_empty(p->data));

    /*
     * Swap the pointers. The channel gets the client data for
     * transferring and the client gets back an unused data slot.
     */
    tmp = *send_data;
    *send_data = p->data;
    p->data = tmp;

    /*
     * Making sure p->data is setup before marking pending_job=true. Pairs
     * with the qatomic_load_acquire() in multifd_send_thread().
     */
    qatomic_store_release(&p->pending_job, true);
    qemu_sem_post(&p->sem);

    return true;
}
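
/*
 * Caller-side usage sketch (illustrative; fill_ram_payload() and
 * have_pages_to_send() are hypothetical helpers, not part of this file):
 *
 *   MultiFDSendData *data = multifd_send_data_alloc();
 *
 *   while (have_pages_to_send()) {
 *       fill_ram_payload(&data->u.ram);
 *       if (!multifd_send(&data)) {
 *           break;                       // multifd is exiting
 *       }
 *       // 'data' now points to an *empty* slot handed back by the
 *       // channel; the filled object is owned by the channel until it
 *       // finishes transmitting it.
 *   }
 *
 *   multifd_send_data_free(data);
 */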

/* Multifd send side hit an error; remember it and prepare to quit */
static void multifd_send_set_error(Error *err)
{
    /*
     * We don't want to exit the threads twice. Depending on where we
     * get the error, or if there are two independent errors in two
     * threads at the same time, we can end up calling this function
     * twice.
     */
    if (qatomic_xchg(&multifd_send_state->exiting, 1)) {
        return;
    }

    if (err) {
        MigrationState *s = migrate_get_current();
        migrate_set_error(s, err);
        if (s->state == MIGRATION_STATUS_SETUP ||
            s->state == MIGRATION_STATUS_PRE_SWITCHOVER ||
            s->state == MIGRATION_STATUS_DEVICE ||
            s->state == MIGRATION_STATUS_ACTIVE) {
            migrate_set_state(&s->state, s->state,
                              MIGRATION_STATUS_FAILED);
        }
    }
}

static void multifd_send_terminate_threads(void)
{
    int i;

    trace_multifd_send_terminate_threads();

    /*
     * Tell everyone we're quitting. No xchg() needed here; we simply
     * always set it.
     */
    qatomic_set(&multifd_send_state->exiting, 1);

    /*
     * Firstly, kick all threads out; no matter whether they are just idle,
     * or blocked in an IO system call.
     */
    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];

        qemu_sem_post(&p->sem);
        if (p->c) {
            qio_channel_shutdown(p->c, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
        }
    }

    /*
     * Finally recycle all the threads.
     */
    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];

        if (p->tls_thread_created) {
            qemu_thread_join(&p->tls_thread);
        }

        if (p->thread_created) {
            qemu_thread_join(&p->thread);
        }
    }
}

static bool multifd_send_cleanup_channel(MultiFDSendParams *p, Error **errp)
{
    if (p->c) {
        migration_ioc_unregister_yank(p->c);
        /*
         * The object_unref() cannot guarantee the fd will always be
         * released because finalize() of the iochannel is only
         * triggered on the last reference and it's not guaranteed
         * that we always hold the last refcount when reaching here.
         *
         * Closing the fd explicitly has the benefit that if there is any
         * registered I/O handler callbacks on such fd, that will get a
         * POLLNVAL event and will further trigger the cleanup to finally
         * release the IOC.
         *
         * FIXME: It should logically be guaranteed that all multifd
         * channels have no I/O handler callback registered when reaching
         * here, because the migration thread will wait for all multifd
         * channel establishments to complete during setup.  Since
         * migration_cleanup() will be scheduled in the main thread too,
         * all previous callbacks should be guaranteed to have completed
         * by then.  See multifd_send_state.channels_created and its
         * usage.  In the future, we could replace this with an assert
         * making sure we're the last reference, or simply drop it if the
         * above can be clearly justified.
         */
        qio_channel_close(p->c, &error_abort);
        object_unref(OBJECT(p->c));
        p->c = NULL;
    }
    qemu_sem_destroy(&p->sem);
    qemu_sem_destroy(&p->sem_sync);
    g_free(p->name);
    p->name = NULL;
    g_clear_pointer(&p->data, multifd_send_data_free);
    p->packet_len = 0;
    g_clear_pointer(&p->packet_device_state, g_free);
    g_free(p->packet);
    p->packet = NULL;
    multifd_send_state->ops->send_cleanup(p, errp);
    assert(!p->iov);

    return *errp == NULL;
}

static void multifd_send_cleanup_state(void)
{
    file_cleanup_outgoing_migration();
    socket_cleanup_outgoing_migration();
    multifd_device_state_send_cleanup();
    qemu_sem_destroy(&multifd_send_state->channels_created);
    qemu_sem_destroy(&multifd_send_state->channels_ready);
    qemu_mutex_destroy(&multifd_send_state->multifd_send_mutex);
    g_free(multifd_send_state->params);
    multifd_send_state->params = NULL;
    g_free(multifd_send_state);
    multifd_send_state = NULL;
}

void multifd_send_shutdown(void)
{
    int i;

    if (!migrate_multifd()) {
        return;
    }

    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];

        /* thread_created implies the TLS handshake has succeeded */
        if (p->tls_thread_created && p->thread_created) {
            Error *local_err = NULL;
            /*
             * The destination expects the TLS session to always be
             * properly terminated. This helps to detect a premature
             * termination in the middle of the stream. Note that
             * older QEMUs always break the connection on the source
             * and the destination always sees
             * GNUTLS_E_PREMATURE_TERMINATION.
             */
            migration_tls_channel_end(p->c, &local_err);

            /*
             * The above can return an error in case the migration has
             * already failed. If the migration succeeded, errors are
             * not expected but there's no need to kill the source.
             */
            if (local_err && !migration_has_failed(migrate_get_current())) {
                warn_report(
                    "multifd_send_%d: Failed to terminate TLS connection: %s",
                    p->id, error_get_pretty(local_err));
                break;
            }
        }
    }

    multifd_send_terminate_threads();

    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];
        Error *local_err = NULL;

        if (!multifd_send_cleanup_channel(p, &local_err)) {
            migrate_set_error(migrate_get_current(), local_err);
            error_free(local_err);
        }
    }

    multifd_send_cleanup_state();
}

static int multifd_zero_copy_flush(QIOChannel *c)
{
    int ret;
    Error *err = NULL;

    ret = qio_channel_flush(c, &err);
    if (ret < 0) {
        error_report_err(err);
        return -1;
    }
    if (ret == 1) {
        stat64_add(&mig_stats.dirty_sync_missed_zero_copy, 1);
    }

    return ret;
}

int multifd_send_sync_main(MultiFDSyncReq req)
{
    int i;
    bool flush_zero_copy;

    assert(req != MULTIFD_SYNC_NONE);

    flush_zero_copy = migrate_zero_copy_send();

    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];

        if (multifd_send_should_exit()) {
            return -1;
        }

        trace_multifd_send_sync_main_signal(p->id);

        /*
         * We should be the only user so far, so it's not possible for
         * it to be set by others concurrently.
         */
        assert(qatomic_read(&p->pending_sync) == MULTIFD_SYNC_NONE);
        qatomic_set(&p->pending_sync, req);
        qemu_sem_post(&p->sem);
    }
    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];

        if (multifd_send_should_exit()) {
            return -1;
        }

        qemu_sem_wait(&multifd_send_state->channels_ready);
        trace_multifd_send_sync_main_wait(p->id);
        qemu_sem_wait(&p->sem_sync);

        if (flush_zero_copy && p->c && (multifd_zero_copy_flush(p->c) < 0)) {
            return -1;
        }
    }
    trace_multifd_send_sync_main(multifd_send_state->packet_num);

    return 0;
}
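
/*
 * Sync handshake sketch (illustrative; a rough summary of the code above
 * and of multifd_send_thread() below):
 *
 *   migration thread                     sender thread (each channel)
 *   ------------------------------       ------------------------------
 *   pending_sync = req
 *   post(p->sem)                 ----->  wake up, see pending_sync set
 *                                        if req == MULTIFD_SYNC_ALL:
 *                                            send MULTIFD_FLAG_SYNC packet
 *                                        pending_sync = MULTIFD_SYNC_NONE
 *   wait(channels_ready)         <-----  post(channels_ready)
 *   wait(p->sem_sync)            <-----  post(p->sem_sync)
 *
 * Once this completes for every channel, all data queued before the sync
 * request has been pushed to the wire.
 */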

static void *multifd_send_thread(void *opaque)
{
    MultiFDSendParams *p = opaque;
    MigrationThread *thread = NULL;
    Error *local_err = NULL;
    int ret = 0;
    bool use_packets = multifd_use_packets();

    thread = migration_threads_add(p->name, qemu_get_thread_id());

    trace_multifd_send_thread_start(p->id);
    rcu_register_thread();

    if (use_packets) {
        if (multifd_send_initial_packet(p, &local_err) < 0) {
            ret = -1;
            goto out;
        }
    }

    while (true) {
        qemu_sem_post(&multifd_send_state->channels_ready);
        qemu_sem_wait(&p->sem);

        if (multifd_send_should_exit()) {
            break;
        }

        /*
         * Read pending_job flag before p->data. Pairs with the
         * qatomic_store_release() in multifd_send().
         */
        if (qatomic_load_acquire(&p->pending_job)) {
            bool is_device_state = multifd_payload_device_state(p->data);
            size_t total_size;
            int write_flags_masked = 0;

            p->flags = 0;
            p->iovs_num = 0;
            assert(!multifd_payload_empty(p->data));

            if (is_device_state) {
                multifd_device_state_send_prepare(p);

                /* Device state packets cannot be sent via zerocopy */
                write_flags_masked |= QIO_CHANNEL_WRITE_FLAG_ZERO_COPY;
            } else {
                ret = multifd_send_state->ops->send_prepare(p, &local_err);
                if (ret != 0) {
                    break;
                }
            }

            /*
             * The packet header in the zerocopy RAM case is accounted for
             * in multifd_nocomp_send_prepare() - where it is actually
             * being sent.
             */
            total_size = iov_size(p->iov, p->iovs_num);

            if (migrate_mapped_ram()) {
                assert(!is_device_state);

                ret = file_write_ramblock_iov(p->c, p->iov, p->iovs_num,
                                              &p->data->u.ram, &local_err);
            } else {
                ret = qio_channel_writev_full_all(p->c, p->iov, p->iovs_num,
                                                  NULL, 0,
                                                  p->write_flags & ~write_flags_masked,
                                                  &local_err);
            }

            if (ret != 0) {
                break;
            }

            stat64_add(&mig_stats.multifd_bytes, total_size);

            p->next_packet_size = 0;
            multifd_send_data_clear(p->data);

            /*
             * Making sure p->data is published before saying "we're
             * free".  Pairs with the smp_mb_acquire() in
             * multifd_send().
             */
            qatomic_store_release(&p->pending_job, false);
        } else {
            MultiFDSyncReq req = qatomic_read(&p->pending_sync);

            /*
             * If not a normal job, must be a sync request. Note that
             * pending_sync is a standalone flag (unlike pending_job), so
             * it doesn't require explicit memory barriers.
             */
            assert(req != MULTIFD_SYNC_NONE);

            /* Only push the SYNC message if it involves a remote sync */
            if (req == MULTIFD_SYNC_ALL) {
                p->flags = MULTIFD_FLAG_SYNC;
                multifd_send_fill_packet(p);
                ret = qio_channel_write_all(p->c, (void *)p->packet,
                                            p->packet_len, &local_err);
                if (ret != 0) {
                    break;
                }
                /* p->next_packet_size will always be zero for a SYNC packet */
                stat64_add(&mig_stats.multifd_bytes, p->packet_len);
            }

            qatomic_set(&p->pending_sync, MULTIFD_SYNC_NONE);
            qemu_sem_post(&p->sem_sync);
        }
    }

out:
    if (ret) {
        assert(local_err);
        trace_multifd_send_error(p->id);
        multifd_send_set_error(local_err);
        multifd_send_kick_main(p);
        error_free(local_err);
    }

    rcu_unregister_thread();
    migration_threads_remove(thread);
    trace_multifd_send_thread_end(p->id, p->packets_sent);

    return NULL;
}

static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque);

typedef struct {
    MultiFDSendParams *p;
    QIOChannelTLS *tioc;
} MultiFDTLSThreadArgs;

static void *multifd_tls_handshake_thread(void *opaque)
{
    MultiFDTLSThreadArgs *args = opaque;

    qio_channel_tls_handshake(args->tioc,
                              multifd_new_send_channel_async,
                              args->p,
                              NULL,
                              NULL);
    g_free(args);

    return NULL;
}

static bool multifd_tls_channel_connect(MultiFDSendParams *p,
                                        QIOChannel *ioc,
                                        Error **errp)
{
    MigrationState *s = migrate_get_current();
    const char *hostname = s->hostname;
    MultiFDTLSThreadArgs *args;
    QIOChannelTLS *tioc;

    tioc = migration_tls_client_create(ioc, hostname, errp);
    if (!tioc) {
        return false;
    }

    /*
     * Ownership of the socket channel now transfers
     * to the newly created TLS channel, which has already taken a
     * reference.
     */
    object_unref(OBJECT(ioc));
    trace_multifd_tls_outgoing_handshake_start(ioc, tioc, hostname);
    qio_channel_set_name(QIO_CHANNEL(tioc), "multifd-tls-outgoing");

    args = g_new0(MultiFDTLSThreadArgs, 1);
    args->tioc = tioc;
    args->p = p;

    p->tls_thread_created = true;
    qemu_thread_create(&p->tls_thread, MIGRATION_THREAD_SRC_TLS,
                       multifd_tls_handshake_thread, args,
                       QEMU_THREAD_JOINABLE);
    return true;
}

void multifd_channel_connect(MultiFDSendParams *p, QIOChannel *ioc)
{
    qio_channel_set_delay(ioc, false);

    migration_ioc_register_yank(ioc);
    /* Set up p->c only when the channel is completely set up */
    p->c = ioc;

    p->thread_created = true;
    qemu_thread_create(&p->thread, p->name, multifd_send_thread, p,
                       QEMU_THREAD_JOINABLE);
}
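
/*
 * Channel establishment sketch (illustrative).  Without TLS the async
 * callback below runs once; with TLS it runs twice:
 *
 *   socket connect
 *        |
 *        v
 *   multifd_new_send_channel_async()          (1st call, socket ready)
 *        |                    \
 *        | no TLS              \ TLS upgrade needed
 *        v                      v
 *   multifd_channel_connect()   multifd_tls_channel_connect()
 *        |                          | handshake runs in its own thread
 *        |                          v
 *        |                      multifd_new_send_channel_async() (2nd call)
 *        |                          |
 *        v                          v
 *   multifd_send_thread         multifd_channel_connect() -> send thread
 */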

/*
 * When TLS is enabled this function is called once to establish the
 * TLS connection and a second time after the TLS handshake to create
 * the multifd channel. Without TLS it goes straight into the channel
 * creation.
 */
static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque)
{
    MultiFDSendParams *p = opaque;
    QIOChannel *ioc = QIO_CHANNEL(qio_task_get_source(task));
    Error *local_err = NULL;
    bool ret;

    trace_multifd_new_send_channel_async(p->id);

    if (qio_task_propagate_error(task, &local_err)) {
        ret = false;
        goto out;
    }

    trace_multifd_set_outgoing_channel(ioc, object_get_typename(OBJECT(ioc)),
                                       migrate_get_current()->hostname);

    if (migrate_channel_requires_tls_upgrade(ioc)) {
        ret = multifd_tls_channel_connect(p, ioc, &local_err);
        if (ret) {
            return;
        }
    } else {
        multifd_channel_connect(p, ioc);
        ret = true;
    }

out:
    /*
     * Here we're not interested in whether creation succeeded, only that
     * it happened at all.
     */
    multifd_send_channel_created();

    if (ret) {
        return;
    }

    trace_multifd_new_send_channel_async_error(p->id, local_err);
    multifd_send_set_error(local_err);
    /*
     * For error cases (TLS or non-TLS), the IO channel is always freed
     * here rather than during multifd cleanup: since p->c is not set, the
     * multifd cleanup code doesn't even know of its existence.
     */
    object_unref(OBJECT(ioc));
    error_free(local_err);
}

static bool multifd_new_send_channel_create(gpointer opaque, Error **errp)
{
    if (!multifd_use_packets()) {
        return file_send_channel_create(opaque, errp);
    }

    socket_send_channel_create(multifd_new_send_channel_async, opaque);
    return true;
}

bool multifd_send_setup(void)
{
    MigrationState *s = migrate_get_current();
    int thread_count, ret = 0;
    uint32_t page_count = multifd_ram_page_count();
    bool use_packets = multifd_use_packets();
    uint8_t i;

    if (!migrate_multifd()) {
        return true;
    }

    thread_count = migrate_multifd_channels();
    multifd_send_state = g_malloc0(sizeof(*multifd_send_state));
    multifd_send_state->params = g_new0(MultiFDSendParams, thread_count);
    qemu_mutex_init(&multifd_send_state->multifd_send_mutex);
    qemu_sem_init(&multifd_send_state->channels_created, 0);
    qemu_sem_init(&multifd_send_state->channels_ready, 0);
    qatomic_set(&multifd_send_state->exiting, 0);
    multifd_send_state->ops = multifd_ops[migrate_multifd_compression()];

    for (i = 0; i < thread_count; i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];
        Error *local_err = NULL;

        qemu_sem_init(&p->sem, 0);
        qemu_sem_init(&p->sem_sync, 0);
        p->id = i;
        p->data = multifd_send_data_alloc();

        if (use_packets) {
            p->packet_len = sizeof(MultiFDPacket_t)
                          + sizeof(uint64_t) * page_count;
            p->packet = g_malloc0(p->packet_len);
            p->packet_device_state = g_malloc0(sizeof(*p->packet_device_state));
            p->packet_device_state->hdr.magic = cpu_to_be32(MULTIFD_MAGIC);
            p->packet_device_state->hdr.version = cpu_to_be32(MULTIFD_VERSION);
        }
        p->name = g_strdup_printf(MIGRATION_THREAD_SRC_MULTIFD, i);
        p->write_flags = 0;

        if (!multifd_new_send_channel_create(p, &local_err)) {
            migrate_set_error(s, local_err);
            ret = -1;
        }
    }

    /*
     * Wait until channel creation has started for all channels. The
     * creation can still fail, but no more channels will be created
     * past this point.
     */
    for (i = 0; i < thread_count; i++) {
        qemu_sem_wait(&multifd_send_state->channels_created);
    }

    if (ret) {
        goto err;
    }

    for (i = 0; i < thread_count; i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];
        Error *local_err = NULL;

        ret = multifd_send_state->ops->send_setup(p, &local_err);
        if (ret) {
            migrate_set_error(s, local_err);
            goto err;
        }
        assert(p->iov);
    }

    multifd_device_state_send_setup();

    return true;

err:
    migrate_set_state(&s->state, MIGRATION_STATUS_SETUP,
                      MIGRATION_STATUS_FAILED);
    return false;
}
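
/*
 * Setup ordering sketch (an illustrative summary of multifd_send_setup()
 * above):
 *
 *   1. allocate global state and per-channel params
 *   2. kick off async channel creation for every channel
 *   3. wait on channels_created N times -- after this, creation of every
 *      channel has at least been attempted, so it is safe to inspect the
 *      results and to tear things down on failure
 *   4. run ops->send_setup() for each channel (e.g. compression init)
 *
 * Note the send threads themselves are only spawned once their channel
 * is fully established, from multifd_channel_connect().
 */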

bool multifd_recv(void)
{
    int i;
    static int next_recv_channel;
    MultiFDRecvParams *p = NULL;
    MultiFDRecvData *data = multifd_recv_state->data;

    /*
     * next_recv_channel can remain from a previous migration that was
     * using more channels, so ensure it doesn't overflow if the
     * limit is lower now.
     */
    next_recv_channel %= migrate_multifd_channels();
    for (i = next_recv_channel;; i = (i + 1) % migrate_multifd_channels()) {
        if (multifd_recv_should_exit()) {
            return false;
        }

        p = &multifd_recv_state->params[i];

        if (qatomic_read(&p->pending_job) == false) {
            next_recv_channel = (i + 1) % migrate_multifd_channels();
            break;
        }
    }

    /*
     * Order pending_job read before manipulating p->data below. Pairs
     * with qatomic_store_release() at multifd_recv_thread().
     */
    smp_mb_acquire();

    assert(!p->data->size);
    multifd_recv_state->data = p->data;
    p->data = data;

    /*
     * Order p->data update before setting pending_job. Pairs with
     * qatomic_load_acquire() at multifd_recv_thread().
     */
    qatomic_store_release(&p->pending_job, true);
    qemu_sem_post(&p->sem);

    return true;
}

MultiFDRecvData *multifd_get_recv_data(void)
{
    return multifd_recv_state->data;
}
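
/*
 * Recv-side handoff sketch (illustrative).  This mirrors multifd_send():
 * in mapped-ram (no-packets) mode the migration thread produces work and
 * the channel threads consume it:
 *
 *   migration thread                     recv thread (idle channel)
 *   ------------------------------       ------------------------------
 *   fill multifd_recv_state->data
 *   multifd_recv():
 *     swap data with p->data
 *     pending_job = true (release)
 *     post(p->sem)                ----->  wake up, load_acquire(pending_job)
 *                                         read pages described by p->data
 *                                         p->data->size = 0
 *                                         pending_job = false (release)
 *
 * With packets (socket migration) the channels instead read work
 * directly off the wire and this function is not used.
 */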

static void multifd_recv_terminate_threads(Error *err)
{
    int i;

    trace_multifd_recv_terminate_threads(err != NULL);

    if (qatomic_xchg(&multifd_recv_state->exiting, 1)) {
        return;
    }

    if (err) {
        MigrationState *s = migrate_get_current();
        migrate_set_error(s, err);
        if (s->state == MIGRATION_STATUS_SETUP ||
            s->state == MIGRATION_STATUS_ACTIVE) {
            migrate_set_state(&s->state, s->state,
                              MIGRATION_STATUS_FAILED);
        }
    }

    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDRecvParams *p = &multifd_recv_state->params[i];

        /*
         * The migration thread and channels interact differently
         * depending on the presence of packets.
         */
        if (multifd_use_packets()) {
            /*
             * The channel receives as long as there are packets. When
             * packets end (i.e. MULTIFD_FLAG_SYNC is reached), the
             * channel waits for the migration thread to sync. If the
             * sync never happens, do it here.
             */
            qemu_sem_post(&p->sem_sync);
        } else {
            /*
             * The channel waits for the migration thread to give it
             * work. When the migration thread runs out of work, it
             * releases the channel and waits for any pending work to
             * finish. If we reach here (e.g. due to error) before the
             * work runs out, release the channel.
             */
            qemu_sem_post(&p->sem);
        }

        /*
         * We could arrive here for two reasons:
         *  - normal quit, i.e. everything went fine, just finished
         *  - error quit: We close the channels so the channel threads
         *    finish the qio_channel_read_all_eof()
         */
        if (p->c) {
            qio_channel_shutdown(p->c, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
        }
    }
}

void multifd_recv_shutdown(void)
{
    if (migrate_multifd()) {
        multifd_recv_terminate_threads(NULL);
    }
}

static void multifd_recv_cleanup_channel(MultiFDRecvParams *p)
{
    migration_ioc_unregister_yank(p->c);
    object_unref(OBJECT(p->c));
    p->c = NULL;
    qemu_mutex_destroy(&p->mutex);
    qemu_sem_destroy(&p->sem_sync);
    qemu_sem_destroy(&p->sem);
    g_free(p->data);
    p->data = NULL;
    g_free(p->name);
    p->name = NULL;
    p->packet_len = 0;
    g_free(p->packet);
    p->packet = NULL;
    g_clear_pointer(&p->packet_dev_state, g_free);
    g_free(p->normal);
    p->normal = NULL;
    g_free(p->zero);
    p->zero = NULL;
    multifd_recv_state->ops->recv_cleanup(p);
}

static void multifd_recv_cleanup_state(void)
{
    qemu_sem_destroy(&multifd_recv_state->sem_sync);
    g_free(multifd_recv_state->params);
    multifd_recv_state->params = NULL;
    g_free(multifd_recv_state->data);
    multifd_recv_state->data = NULL;
    g_free(multifd_recv_state);
    multifd_recv_state = NULL;
}

void multifd_recv_cleanup(void)
{
    int i;

    if (!migrate_multifd()) {
        return;
    }
    multifd_recv_terminate_threads(NULL);
    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDRecvParams *p = &multifd_recv_state->params[i];

        if (p->thread_created) {
            qemu_thread_join(&p->thread);
        }
    }
    for (i = 0; i < migrate_multifd_channels(); i++) {
        multifd_recv_cleanup_channel(&multifd_recv_state->params[i]);
    }
    multifd_recv_cleanup_state();
}

void multifd_recv_sync_main(void)
{
    int thread_count = migrate_multifd_channels();
    bool file_based = !multifd_use_packets();
    int i;

    if (!migrate_multifd()) {
        return;
    }

    /*
     * File-based channels don't use packets and therefore need to
     * wait for more work. Release them to start the sync.
     */
    if (file_based) {
        for (i = 0; i < thread_count; i++) {
            MultiFDRecvParams *p = &multifd_recv_state->params[i];

            trace_multifd_recv_sync_main_signal(p->id);
            qemu_sem_post(&p->sem);
        }
    }

    /*
     * Initiate the synchronization by waiting for all channels.
     *
     * For socket-based migration this means each channel has received
     * the SYNC packet on the stream.
     *
     * For file-based migration this means each channel is done with
     * the work (pending_job=false).
     */
    for (i = 0; i < thread_count; i++) {
        trace_multifd_recv_sync_main_wait(i);
        qemu_sem_wait(&multifd_recv_state->sem_sync);
    }

    if (file_based) {
        /*
         * For file-based migration, loading is done in one iteration.
         * We're done.
         */
        return;
    }

    /*
     * Sync done. Release the channels for the next iteration.
     */
    for (i = 0; i < thread_count; i++) {
        MultiFDRecvParams *p = &multifd_recv_state->params[i];

        WITH_QEMU_LOCK_GUARD(&p->mutex) {
            if (multifd_recv_state->packet_num < p->packet_num) {
                multifd_recv_state->packet_num = p->packet_num;
            }
        }
        trace_multifd_recv_sync_main_signal(p->id);
        qemu_sem_post(&p->sem_sync);
    }
    trace_multifd_recv_sync_main(multifd_recv_state->packet_num);
}
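
/*
 * Recv sync sketch (illustrative, socket migration with packets):
 *
 *   recv thread (each channel)           migration thread
 *   ------------------------------       ------------------------------
 *   receive MULTIFD_FLAG_SYNC packet
 *   post(multifd_recv_state->sem_sync)
 *   wait(p->sem_sync)             .....  multifd_recv_sync_main():
 *                                          wait(sem_sync) x N channels
 *                                          fold per-channel packet_num
 *                                          post(p->sem_sync) x N
 *   ...resume receiving...        <-----
 *
 * This guarantees every channel has drained its stream up to the sync
 * point before the migration thread continues.
 */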

static int multifd_device_state_recv(MultiFDRecvParams *p, Error **errp)
{
    g_autofree char *dev_state_buf = NULL;
    int ret;

    dev_state_buf = g_malloc(p->next_packet_size);

    ret = qio_channel_read_all(p->c, dev_state_buf, p->next_packet_size, errp);
    if (ret != 0) {
        return ret;
    }

    if (p->packet_dev_state->idstr[sizeof(p->packet_dev_state->idstr) - 1]
        != 0) {
        error_setg(errp, "unterminated multifd device state idstr");
        return -1;
    }

    if (!qemu_loadvm_load_state_buffer(p->packet_dev_state->idstr,
                                       p->packet_dev_state->instance_id,
                                       dev_state_buf, p->next_packet_size,
                                       errp)) {
        ret = -1;
    }

    return ret;
}
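
/*
 * Recv thread loop sketch (an illustrative summary of the packeted path
 * in multifd_recv_thread() below):
 *
 *   loop:
 *     read MultiFDPacketHdr_t            (EOF here ends the thread)
 *     validate magic/version, read flags
 *     read rest of packet: MultiFDPacket_t or MultiFDPacketDeviceState_t
 *     unfill packet fields (byte-swap, sizes, page list)
 *     if there is payload:
 *       device state -> multifd_device_state_recv()
 *       RAM          -> ops->recv()      (possibly decompressing)
 *     if MULTIFD_FLAG_SYNC:
 *       post(sem_sync); wait(p->sem_sync)  (rendezvous with migration
 *                                           thread)
 */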

static void *multifd_recv_thread(void *opaque)
{
    MigrationState *s = migrate_get_current();
    MultiFDRecvParams *p = opaque;
    Error *local_err = NULL;
    bool use_packets = multifd_use_packets();
    int ret;

    trace_multifd_recv_thread_start(p->id);
    rcu_register_thread();

    if (!s->multifd_clean_tls_termination) {
        p->read_flags = QIO_CHANNEL_READ_FLAG_RELAXED_EOF;
    }

    while (true) {
        MultiFDPacketHdr_t hdr;
        uint32_t flags = 0;
        bool is_device_state = false;
        bool has_data = false;
        uint8_t *pkt_buf;
        size_t pkt_len;

        p->normal_num = 0;

        if (use_packets) {
            struct iovec iov = {
                .iov_base = (void *)&hdr,
                .iov_len = sizeof(hdr)
            };

            if (multifd_recv_should_exit()) {
                break;
            }

            ret = qio_channel_readv_full_all_eof(p->c, &iov, 1, NULL, NULL,
                                                 p->read_flags, &local_err);
            if (!ret) {
                /* EOF */
                assert(!local_err);
                break;
            }

            if (ret == -1) {
                break;
            }

            ret = multifd_recv_unfill_packet_header(p, &hdr, &local_err);
            if (ret) {
                break;
            }

            is_device_state = p->flags & MULTIFD_FLAG_DEVICE_STATE;
            if (is_device_state) {
                pkt_buf = (uint8_t *)p->packet_dev_state + sizeof(hdr);
                pkt_len = sizeof(*p->packet_dev_state) - sizeof(hdr);
            } else {
                pkt_buf = (uint8_t *)p->packet + sizeof(hdr);
                pkt_len = p->packet_len - sizeof(hdr);
            }

            ret = qio_channel_read_all_eof(p->c, (char *)pkt_buf, pkt_len,
                                           &local_err);
            if (!ret) {
                /* EOF */
                error_setg(&local_err,
                           "multifd: unexpected EOF after packet header");
                break;
            }

            if (ret == -1) {
                break;
            }

            qemu_mutex_lock(&p->mutex);
            ret = multifd_recv_unfill_packet(p, &local_err);
            if (ret) {
                qemu_mutex_unlock(&p->mutex);
                break;
            }

            flags = p->flags;
            /* recv methods don't know how to handle the SYNC flag */
            p->flags &= ~MULTIFD_FLAG_SYNC;

            if (is_device_state) {
                has_data = p->next_packet_size > 0;
            } else {
                /*
                 * Even if it's a SYNC packet, this needs to be set
                 * because older QEMUs (<9.0) still send data along with
                 * the SYNC packet.
                 */
                has_data = p->normal_num || p->zero_num;
            }

            qemu_mutex_unlock(&p->mutex);
        } else {
            /*
             * No packets, so we need to wait for the vmstate code to
             * give us work.
             */
            qemu_sem_wait(&p->sem);

            if (multifd_recv_should_exit()) {
                break;
            }

            /* pairs with qatomic_store_release() at multifd_recv() */
            if (!qatomic_load_acquire(&p->pending_job)) {
                /*
                 * Migration thread did not send work, this is
                 * equivalent to pending_sync on the sending
                 * side. Post sem_sync to notify we reached this
                 * point.
                 */
                qemu_sem_post(&multifd_recv_state->sem_sync);
                continue;
            }

            has_data = !!p->data->size;
        }

        if (has_data) {
            /*
             * A multifd thread should not be active and receiving data
             * while migration is in the postcopy phase. Two threads
             * writing the same memory area could easily corrupt
             * the guest state.
             */
            assert(!migration_in_postcopy());
            if (is_device_state) {
                assert(use_packets);
                ret = multifd_device_state_recv(p, &local_err);
            } else {
                ret = multifd_recv_state->ops->recv(p, &local_err);
            }
            if (ret != 0) {
                break;
            }
        } else if (is_device_state) {
            error_setg(&local_err,
                       "multifd: received empty device state packet");
            break;
        }

        if (use_packets) {
            if (flags & MULTIFD_FLAG_SYNC) {
                if (is_device_state) {
                    error_setg(&local_err,
                               "multifd: received SYNC device state packet");
                    break;
                }

                qemu_sem_post(&multifd_recv_state->sem_sync);
                qemu_sem_wait(&p->sem_sync);
            }
        } else {
            p->data->size = 0;
            /*
             * Order data->size update before clearing
             * pending_job. Pairs with smp_mb_acquire() at
             * multifd_recv().
             */
            qatomic_store_release(&p->pending_job, false);
        }
    }

    if (local_err) {
        multifd_recv_terminate_threads(local_err);
        error_free(local_err);
    }

    rcu_unregister_thread();
    trace_multifd_recv_thread_end(p->id, p->packets_recved);

    return NULL;
}

int multifd_recv_setup(Error **errp)
{
    int thread_count;
    uint32_t page_count = multifd_ram_page_count();
    bool use_packets = multifd_use_packets();
    uint8_t i;

    /*
     * Return successfully if the multifd recv state is already
     * initialised or multifd is not enabled.
     */
    if (multifd_recv_state || !migrate_multifd()) {
        return 0;
    }

    thread_count = migrate_multifd_channels();
    multifd_recv_state = g_malloc0(sizeof(*multifd_recv_state));
    multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count);

    multifd_recv_state->data = g_new0(MultiFDRecvData, 1);
    multifd_recv_state->data->size = 0;

    qatomic_set(&multifd_recv_state->count, 0);
    qatomic_set(&multifd_recv_state->exiting, 0);
    qemu_sem_init(&multifd_recv_state->sem_sync, 0);
    multifd_recv_state->ops = multifd_ops[migrate_multifd_compression()];

    for (i = 0; i < thread_count; i++) {
        MultiFDRecvParams *p = &multifd_recv_state->params[i];

        qemu_mutex_init(&p->mutex);
        qemu_sem_init(&p->sem_sync, 0);
        qemu_sem_init(&p->sem, 0);
        p->pending_job = false;
        p->id = i;

        p->data = g_new0(MultiFDRecvData, 1);
        p->data->size = 0;

        if (use_packets) {
            p->packet_len = sizeof(MultiFDPacket_t)
                          + sizeof(uint64_t) * page_count;
            p->packet = g_malloc0(p->packet_len);
            p->packet_dev_state = g_malloc0(sizeof(*p->packet_dev_state));
        }
        p->name = g_strdup_printf(MIGRATION_THREAD_DST_MULTIFD, i);
        p->normal = g_new0(ram_addr_t, page_count);
        p->zero = g_new0(ram_addr_t, page_count);
    }

    for (i = 0; i < thread_count; i++) {
        MultiFDRecvParams *p = &multifd_recv_state->params[i];
        int ret;

        ret = multifd_recv_state->ops->recv_setup(p, errp);
        if (ret) {
            return ret;
        }
    }
    return 0;
}

bool multifd_recv_all_channels_created(void)
{
    int thread_count = migrate_multifd_channels();

    if (!migrate_multifd()) {
        return true;
    }

    if (!multifd_recv_state) {
        /* Called before any connections created */
        return false;
    }

    return thread_count == qatomic_read(&multifd_recv_state->count);
}

/*
 * Try to receive all multifd channels to get ready for the migration.
 * Sets @errp when failing to receive the current channel.
 */
void multifd_recv_new_channel(QIOChannel *ioc, Error **errp)
{
    MultiFDRecvParams *p;
    Error *local_err = NULL;
    bool use_packets = multifd_use_packets();
    int id;

    if (use_packets) {
        id = multifd_recv_initial_packet(ioc, &local_err);
        if (id < 0) {
            multifd_recv_terminate_threads(local_err);
            error_propagate_prepend(errp, local_err,
                                    "failed to receive packet"
                                    " via multifd channel %d: ",
                                    qatomic_read(&multifd_recv_state->count));
            return;
        }
        trace_multifd_recv_new_channel(id);
    } else {
        id = qatomic_read(&multifd_recv_state->count);
    }

    p = &multifd_recv_state->params[id];
    if (p->c != NULL) {
        error_setg(&local_err, "multifd: received id '%d' already setup",
                   id);
        multifd_recv_terminate_threads(local_err);
        error_propagate(errp, local_err);
        return;
    }
    p->c = ioc;
    object_ref(OBJECT(ioc));

    p->thread_created = true;
    qemu_thread_create(&p->thread, p->name, multifd_recv_thread, p,
                       QEMU_THREAD_JOINABLE);
    qatomic_inc(&multifd_recv_state->count);
}