1 /* 2 * Multifd common code 3 * 4 * Copyright (c) 2019-2020 Red Hat Inc 5 * 6 * Authors: 7 * Juan Quintela <quintela@redhat.com> 8 * 9 * This work is licensed under the terms of the GNU GPL, version 2 or later. 10 * See the COPYING file in the top-level directory. 11 */ 12 13 #include "qemu/osdep.h" 14 #include "qemu/cutils.h" 15 #include "qemu/iov.h" 16 #include "qemu/rcu.h" 17 #include "exec/target_page.h" 18 #include "system/system.h" 19 #include "system/ramblock.h" 20 #include "qemu/error-report.h" 21 #include "qapi/error.h" 22 #include "file.h" 23 #include "migration/misc.h" 24 #include "migration.h" 25 #include "migration-stats.h" 26 #include "savevm.h" 27 #include "socket.h" 28 #include "tls.h" 29 #include "qemu-file.h" 30 #include "trace.h" 31 #include "multifd.h" 32 #include "threadinfo.h" 33 #include "options.h" 34 #include "qemu/yank.h" 35 #include "io/channel-file.h" 36 #include "io/channel-socket.h" 37 #include "yank_functions.h" 38 39 typedef struct { 40 uint32_t magic; 41 uint32_t version; 42 unsigned char uuid[16]; /* QemuUUID */ 43 uint8_t id; 44 uint8_t unused1[7]; /* Reserved for future use */ 45 uint64_t unused2[4]; /* Reserved for future use */ 46 } __attribute__((packed)) MultiFDInit_t; 47 48 struct { 49 MultiFDSendParams *params; 50 51 /* multifd_send() body is not thread safe, needs serialization */ 52 QemuMutex multifd_send_mutex; 53 54 /* 55 * Global number of generated multifd packets. 56 * 57 * Note that we used 'uintptr_t' because it'll naturally support atomic 58 * operations on both 32bit / 64 bits hosts. It means on 32bit systems 59 * multifd will overflow the packet_num easier, but that should be 60 * fine. 61 * 62 * Another option is to use QEMU's Stat64 then it'll be 64 bits on all 63 * hosts, however so far it does not support atomic fetch_add() yet. 64 * Make it easy for now. 65 */ 66 uintptr_t packet_num; 67 /* 68 * Synchronization point past which no more channels will be 69 * created. 
70 */ 71 QemuSemaphore channels_created; 72 /* send channels ready */ 73 QemuSemaphore channels_ready; 74 /* 75 * Have we already run terminate threads. There is a race when it 76 * happens that we got one error while we are exiting. 77 * We will use atomic operations. Only valid values are 0 and 1. 78 */ 79 int exiting; 80 /* multifd ops */ 81 const MultiFDMethods *ops; 82 } *multifd_send_state; 83 84 struct { 85 MultiFDRecvParams *params; 86 MultiFDRecvData *data; 87 /* number of created threads */ 88 int count; 89 /* 90 * This is always posted by the recv threads, the migration thread 91 * uses it to wait for recv threads to finish assigned tasks. 92 */ 93 QemuSemaphore sem_sync; 94 /* global number of generated multifd packets */ 95 uint64_t packet_num; 96 int exiting; 97 /* multifd ops */ 98 const MultiFDMethods *ops; 99 } *multifd_recv_state; 100 101 MultiFDSendData *multifd_send_data_alloc(void) 102 { 103 MultiFDSendData *new = g_new0(MultiFDSendData, 1); 104 105 multifd_ram_payload_alloc(&new->u.ram); 106 /* Device state allocates its payload on-demand */ 107 108 return new; 109 } 110 111 void multifd_send_data_clear(MultiFDSendData *data) 112 { 113 if (multifd_payload_empty(data)) { 114 return; 115 } 116 117 switch (data->type) { 118 case MULTIFD_PAYLOAD_DEVICE_STATE: 119 multifd_send_data_clear_device_state(&data->u.device_state); 120 break; 121 default: 122 /* Nothing to do */ 123 break; 124 } 125 126 data->type = MULTIFD_PAYLOAD_NONE; 127 } 128 129 void multifd_send_data_free(MultiFDSendData *data) 130 { 131 if (!data) { 132 return; 133 } 134 135 /* This also free's device state payload */ 136 multifd_send_data_clear(data); 137 138 multifd_ram_payload_free(&data->u.ram); 139 140 g_free(data); 141 } 142 143 static bool multifd_use_packets(void) 144 { 145 return !migrate_mapped_ram(); 146 } 147 148 void multifd_send_channel_created(void) 149 { 150 qemu_sem_post(&multifd_send_state->channels_created); 151 } 152 153 static const MultiFDMethods 
*multifd_ops[MULTIFD_COMPRESSION__MAX] = {}; 154 155 void multifd_register_ops(int method, const MultiFDMethods *ops) 156 { 157 assert(0 <= method && method < MULTIFD_COMPRESSION__MAX); 158 assert(!multifd_ops[method]); 159 multifd_ops[method] = ops; 160 } 161 162 static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp) 163 { 164 MultiFDInit_t msg = {}; 165 size_t size = sizeof(msg); 166 int ret; 167 168 msg.magic = cpu_to_be32(MULTIFD_MAGIC); 169 msg.version = cpu_to_be32(MULTIFD_VERSION); 170 msg.id = p->id; 171 memcpy(msg.uuid, &qemu_uuid.data, sizeof(msg.uuid)); 172 173 ret = qio_channel_write_all(p->c, (char *)&msg, size, errp); 174 if (ret != 0) { 175 return -1; 176 } 177 stat64_add(&mig_stats.multifd_bytes, size); 178 return 0; 179 } 180 181 static int multifd_recv_initial_packet(QIOChannel *c, Error **errp) 182 { 183 MultiFDInit_t msg; 184 int ret; 185 186 ret = qio_channel_read_all(c, (char *)&msg, sizeof(msg), errp); 187 if (ret != 0) { 188 return -1; 189 } 190 191 msg.magic = be32_to_cpu(msg.magic); 192 msg.version = be32_to_cpu(msg.version); 193 194 if (msg.magic != MULTIFD_MAGIC) { 195 error_setg(errp, "multifd: received packet magic %x " 196 "expected %x", msg.magic, MULTIFD_MAGIC); 197 return -1; 198 } 199 200 if (msg.version != MULTIFD_VERSION) { 201 error_setg(errp, "multifd: received packet version %u " 202 "expected %u", msg.version, MULTIFD_VERSION); 203 return -1; 204 } 205 206 if (memcmp(msg.uuid, &qemu_uuid, sizeof(qemu_uuid))) { 207 char *uuid = qemu_uuid_unparse_strdup(&qemu_uuid); 208 char *msg_uuid = qemu_uuid_unparse_strdup((const QemuUUID *)msg.uuid); 209 210 error_setg(errp, "multifd: received uuid '%s' and expected " 211 "uuid '%s' for channel %hhd", msg_uuid, uuid, msg.id); 212 g_free(uuid); 213 g_free(msg_uuid); 214 return -1; 215 } 216 217 if (msg.id > migrate_multifd_channels()) { 218 error_setg(errp, "multifd: received channel id %u is greater than " 219 "number of channels %u", msg.id, 
migrate_multifd_channels()); 220 return -1; 221 } 222 223 return msg.id; 224 } 225 226 /* Fills a RAM multifd packet */ 227 void multifd_send_fill_packet(MultiFDSendParams *p) 228 { 229 MultiFDPacket_t *packet = p->packet; 230 uint64_t packet_num; 231 bool sync_packet = p->flags & MULTIFD_FLAG_SYNC; 232 233 memset(packet, 0, p->packet_len); 234 235 packet->hdr.magic = cpu_to_be32(MULTIFD_MAGIC); 236 packet->hdr.version = cpu_to_be32(MULTIFD_VERSION); 237 238 packet->hdr.flags = cpu_to_be32(p->flags); 239 packet->next_packet_size = cpu_to_be32(p->next_packet_size); 240 241 packet_num = qatomic_fetch_inc(&multifd_send_state->packet_num); 242 packet->packet_num = cpu_to_be64(packet_num); 243 244 p->packets_sent++; 245 246 if (!sync_packet) { 247 multifd_ram_fill_packet(p); 248 } 249 250 trace_multifd_send_fill(p->id, packet_num, 251 p->flags, p->next_packet_size); 252 } 253 254 static int multifd_recv_unfill_packet_header(MultiFDRecvParams *p, 255 const MultiFDPacketHdr_t *hdr, 256 Error **errp) 257 { 258 uint32_t magic = be32_to_cpu(hdr->magic); 259 uint32_t version = be32_to_cpu(hdr->version); 260 261 if (magic != MULTIFD_MAGIC) { 262 error_setg(errp, "multifd: received packet magic %x, expected %x", 263 magic, MULTIFD_MAGIC); 264 return -1; 265 } 266 267 if (version != MULTIFD_VERSION) { 268 error_setg(errp, "multifd: received packet version %u, expected %u", 269 version, MULTIFD_VERSION); 270 return -1; 271 } 272 273 p->flags = be32_to_cpu(hdr->flags); 274 275 return 0; 276 } 277 278 static int multifd_recv_unfill_packet_device_state(MultiFDRecvParams *p, 279 Error **errp) 280 { 281 MultiFDPacketDeviceState_t *packet = p->packet_dev_state; 282 283 packet->instance_id = be32_to_cpu(packet->instance_id); 284 p->next_packet_size = be32_to_cpu(packet->next_packet_size); 285 286 return 0; 287 } 288 289 static int multifd_recv_unfill_packet_ram(MultiFDRecvParams *p, Error **errp) 290 { 291 const MultiFDPacket_t *packet = p->packet; 292 int ret = 0; 293 294 
p->next_packet_size = be32_to_cpu(packet->next_packet_size); 295 p->packet_num = be64_to_cpu(packet->packet_num); 296 297 /* Always unfill, old QEMUs (<9.0) send data along with SYNC */ 298 ret = multifd_ram_unfill_packet(p, errp); 299 300 trace_multifd_recv_unfill(p->id, p->packet_num, p->flags, 301 p->next_packet_size); 302 303 return ret; 304 } 305 306 static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error **errp) 307 { 308 p->packets_recved++; 309 310 if (p->flags & MULTIFD_FLAG_DEVICE_STATE) { 311 return multifd_recv_unfill_packet_device_state(p, errp); 312 } 313 314 return multifd_recv_unfill_packet_ram(p, errp); 315 } 316 317 static bool multifd_send_should_exit(void) 318 { 319 return qatomic_read(&multifd_send_state->exiting); 320 } 321 322 static bool multifd_recv_should_exit(void) 323 { 324 return qatomic_read(&multifd_recv_state->exiting); 325 } 326 327 /* 328 * The migration thread can wait on either of the two semaphores. This 329 * function can be used to kick the main thread out of waiting on either of 330 * them. Should mostly only be called when something wrong happened with 331 * the current multifd send thread. 332 */ 333 static void multifd_send_kick_main(MultiFDSendParams *p) 334 { 335 qemu_sem_post(&p->sem_sync); 336 qemu_sem_post(&multifd_send_state->channels_ready); 337 } 338 339 /* 340 * multifd_send() works by exchanging the MultiFDSendData object 341 * provided by the caller with an unused MultiFDSendData object from 342 * the next channel that is found to be idle. 343 * 344 * The channel owns the data until it finishes transmitting and the 345 * caller owns the empty object until it fills it with data and calls 346 * this function again. No locking necessary. 347 * 348 * Switching is safe because both the migration thread and the channel 349 * thread have barriers in place to serialize access. 350 * 351 * Returns true if succeed, false otherwise. 
/*
 * Hand *@send_data over to an idle multifd send channel.
 *
 * @send_data: in: the filled payload to transmit; out: the empty payload
 *             object taken from the channel that accepted the job.
 *
 * Concurrent callers are serialized by multifd_send_mutex.  The handoff
 * to the channel thread itself is lock-free and relies on the
 * acquire/release pairs on p->pending_job documented below.
 *
 * Returns true if the payload was queued, false if multifd is exiting.
 */
bool multifd_send(MultiFDSendData **send_data)
{
    int i;
    /* Round-robin cursor; static, so it persists across calls */
    static int next_channel;
    MultiFDSendParams *p = NULL; /* make happy gcc */
    MultiFDSendData *tmp;

    if (multifd_send_should_exit()) {
        return false;
    }

    /* Held until the function returns, on every return path */
    QEMU_LOCK_GUARD(&multifd_send_state->multifd_send_mutex);

    /* We wait here, until at least one channel is ready */
    qemu_sem_wait(&multifd_send_state->channels_ready);

    /*
     * next_channel can remain from a previous migration that was
     * using more channels, so ensure it doesn't overflow if the
     * limit is lower now.
     */
    next_channel %= migrate_multifd_channels();
    /* Scan the channels circularly until one without a pending job is found */
    for (i = next_channel;; i = (i + 1) % migrate_multifd_channels()) {
        /* Re-checked on every iteration so the scan cannot spin past an exit */
        if (multifd_send_should_exit()) {
            return false;
        }
        p = &multifd_send_state->params[i];
        /*
         * Lockless read to p->pending_job is safe, because only multifd
         * sender thread can clear it.
         */
        if (qatomic_read(&p->pending_job) == false) {
            /* Start the next call's scan right after the channel used now */
            next_channel = (i + 1) % migrate_multifd_channels();
            break;
        }
    }

    /*
     * Make sure we read p->pending_job before all the rest.  Pairs with
     * qatomic_store_release() in multifd_send_thread().
     */
    smp_mb_acquire();

    /* An idle channel must have cleared its payload after sending it */
    assert(multifd_payload_empty(p->data));

    /*
     * Swap the pointers. The channel gets the client data for
     * transferring and the client gets back an unused data slot.
     */
    tmp = *send_data;
    *send_data = p->data;
    p->data = tmp;

    /*
     * Making sure p->data is setup before marking pending_job=true. Pairs
     * with the qatomic_load_acquire() in multifd_send_thread().
     */
    qatomic_store_release(&p->pending_job, true);
    /* Wake the channel thread, which sleeps on p->sem between jobs */
    qemu_sem_post(&p->sem);

    return true;
}
Depending on where 421 * we get the error, or if there are two independent errors in two 422 * threads at the same time, we can end calling this function 423 * twice. 424 */ 425 if (qatomic_xchg(&multifd_send_state->exiting, 1)) { 426 return; 427 } 428 429 if (err) { 430 MigrationState *s = migrate_get_current(); 431 migrate_set_error(s, err); 432 if (s->state == MIGRATION_STATUS_SETUP || 433 s->state == MIGRATION_STATUS_PRE_SWITCHOVER || 434 s->state == MIGRATION_STATUS_DEVICE || 435 s->state == MIGRATION_STATUS_ACTIVE) { 436 migrate_set_state(&s->state, s->state, 437 MIGRATION_STATUS_FAILED); 438 } 439 } 440 } 441 442 static void multifd_send_terminate_threads(void) 443 { 444 int i; 445 446 trace_multifd_send_terminate_threads(); 447 448 /* 449 * Tell everyone we're quitting. No xchg() needed here; we simply 450 * always set it. 451 */ 452 qatomic_set(&multifd_send_state->exiting, 1); 453 454 /* 455 * Firstly, kick all threads out; no matter whether they are just idle, 456 * or blocked in an IO system call. 457 */ 458 for (i = 0; i < migrate_multifd_channels(); i++) { 459 MultiFDSendParams *p = &multifd_send_state->params[i]; 460 461 qemu_sem_post(&p->sem); 462 if (p->c) { 463 qio_channel_shutdown(p->c, QIO_CHANNEL_SHUTDOWN_BOTH, NULL); 464 } 465 } 466 467 /* 468 * Finally recycle all the threads. 
469 */ 470 for (i = 0; i < migrate_multifd_channels(); i++) { 471 MultiFDSendParams *p = &multifd_send_state->params[i]; 472 473 if (p->tls_thread_created) { 474 qemu_thread_join(&p->tls_thread); 475 } 476 477 if (p->thread_created) { 478 qemu_thread_join(&p->thread); 479 } 480 } 481 } 482 483 static bool multifd_send_cleanup_channel(MultiFDSendParams *p, Error **errp) 484 { 485 if (p->c) { 486 migration_ioc_unregister_yank(p->c); 487 /* 488 * The object_unref() cannot guarantee the fd will always be 489 * released because finalize() of the iochannel is only 490 * triggered on the last reference and it's not guaranteed 491 * that we always hold the last refcount when reaching here. 492 * 493 * Closing the fd explicitly has the benefit that if there is any 494 * registered I/O handler callbacks on such fd, that will get a 495 * POLLNVAL event and will further trigger the cleanup to finally 496 * release the IOC. 497 * 498 * FIXME: It should logically be guaranteed that all multifd 499 * channels have no I/O handler callback registered when reaching 500 * here, because migration thread will wait for all multifd channel 501 * establishments to complete during setup. Since 502 * migration_cleanup() will be scheduled in main thread too, all 503 * previous callbacks should guarantee to be completed when 504 * reaching here. See multifd_send_state.channels_created and its 505 * usage. In the future, we could replace this with an assert 506 * making sure we're the last reference, or simply drop it if above 507 * is more clear to be justified. 
508 */ 509 qio_channel_close(p->c, &error_abort); 510 object_unref(OBJECT(p->c)); 511 p->c = NULL; 512 } 513 qemu_sem_destroy(&p->sem); 514 qemu_sem_destroy(&p->sem_sync); 515 g_free(p->name); 516 p->name = NULL; 517 g_clear_pointer(&p->data, multifd_send_data_free); 518 p->packet_len = 0; 519 g_clear_pointer(&p->packet_device_state, g_free); 520 g_free(p->packet); 521 p->packet = NULL; 522 multifd_send_state->ops->send_cleanup(p, errp); 523 assert(!p->iov); 524 525 return *errp == NULL; 526 } 527 528 static void multifd_send_cleanup_state(void) 529 { 530 file_cleanup_outgoing_migration(); 531 socket_cleanup_outgoing_migration(); 532 multifd_device_state_send_cleanup(); 533 qemu_sem_destroy(&multifd_send_state->channels_created); 534 qemu_sem_destroy(&multifd_send_state->channels_ready); 535 qemu_mutex_destroy(&multifd_send_state->multifd_send_mutex); 536 g_free(multifd_send_state->params); 537 multifd_send_state->params = NULL; 538 g_free(multifd_send_state); 539 multifd_send_state = NULL; 540 } 541 542 void multifd_send_shutdown(void) 543 { 544 int i; 545 546 if (!migrate_multifd()) { 547 return; 548 } 549 550 for (i = 0; i < migrate_multifd_channels(); i++) { 551 MultiFDSendParams *p = &multifd_send_state->params[i]; 552 553 /* thread_created implies the TLS handshake has succeeded */ 554 if (p->tls_thread_created && p->thread_created) { 555 Error *local_err = NULL; 556 /* 557 * The destination expects the TLS session to always be 558 * properly terminated. This helps to detect a premature 559 * termination in the middle of the stream. Note that 560 * older QEMUs always break the connection on the source 561 * and the destination always sees 562 * GNUTLS_E_PREMATURE_TERMINATION. 563 */ 564 migration_tls_channel_end(p->c, &local_err); 565 566 /* 567 * The above can return an error in case the migration has 568 * already failed. If the migration succeeded, errors are 569 * not expected but there's no need to kill the source. 
570 */ 571 if (local_err && !migration_has_failed(migrate_get_current())) { 572 warn_report( 573 "multifd_send_%d: Failed to terminate TLS connection: %s", 574 p->id, error_get_pretty(local_err)); 575 break; 576 } 577 } 578 } 579 580 multifd_send_terminate_threads(); 581 582 for (i = 0; i < migrate_multifd_channels(); i++) { 583 MultiFDSendParams *p = &multifd_send_state->params[i]; 584 Error *local_err = NULL; 585 586 if (!multifd_send_cleanup_channel(p, &local_err)) { 587 migrate_set_error(migrate_get_current(), local_err); 588 error_free(local_err); 589 } 590 } 591 592 multifd_send_cleanup_state(); 593 } 594 595 static int multifd_zero_copy_flush(QIOChannel *c) 596 { 597 int ret; 598 Error *err = NULL; 599 600 ret = qio_channel_flush(c, &err); 601 if (ret < 0) { 602 error_report_err(err); 603 return -1; 604 } 605 if (ret == 1) { 606 stat64_add(&mig_stats.dirty_sync_missed_zero_copy, 1); 607 } 608 609 return ret; 610 } 611 612 int multifd_send_sync_main(MultiFDSyncReq req) 613 { 614 int i; 615 bool flush_zero_copy; 616 617 assert(req != MULTIFD_SYNC_NONE); 618 619 flush_zero_copy = migrate_zero_copy_send(); 620 621 for (i = 0; i < migrate_multifd_channels(); i++) { 622 MultiFDSendParams *p = &multifd_send_state->params[i]; 623 624 if (multifd_send_should_exit()) { 625 return -1; 626 } 627 628 trace_multifd_send_sync_main_signal(p->id); 629 630 /* 631 * We should be the only user so far, so not possible to be set by 632 * others concurrently. 
/*
 * Body of one multifd send channel thread.
 *
 * @opaque: the MultiFDSendParams of the channel served by this thread.
 *
 * After optionally sending the initial handshake packet, the thread loops:
 * it announces itself as ready, sleeps on p->sem and, once woken, either
 * transmits the payload queued by multifd_send() (pending_job set) or
 * services a sync request posted by multifd_send_sync_main()
 * (pending_sync set).  The loop ends when multifd is exiting or an error
 * occurs; errors are recorded through multifd_send_set_error().
 *
 * Always returns NULL.
 */
static void *multifd_send_thread(void *opaque)
{
    MultiFDSendParams *p = opaque;
    MigrationThread *thread = NULL;
    Error *local_err = NULL;
    int ret = 0;
    bool use_packets = multifd_use_packets();

    thread = migration_threads_add(p->name, qemu_get_thread_id());

    trace_multifd_send_thread_start(p->id);
    rcu_register_thread();

    /* The handshake packet is only sent when the packet protocol is in use */
    if (use_packets) {
        if (multifd_send_initial_packet(p, &local_err) < 0) {
            ret = -1;
            goto out;
        }
    }

    while (true) {
        /*
         * Tell the migration thread this channel can take work, then
         * sleep until a job, a sync request or termination wakes us.
         */
        qemu_sem_post(&multifd_send_state->channels_ready);
        qemu_sem_wait(&p->sem);

        if (multifd_send_should_exit()) {
            break;
        }

        /*
         * Read pending_job flag before p->data.  Pairs with the
         * qatomic_store_release() in multifd_send().
         */
        if (qatomic_load_acquire(&p->pending_job)) {
            bool is_device_state = multifd_payload_device_state(p->data);
            size_t total_size;

            p->flags = 0;
            p->iovs_num = 0;
            assert(!multifd_payload_empty(p->data));

            /* Build the iovec for this payload */
            if (is_device_state) {
                multifd_device_state_send_prepare(p);
            } else {
                ret = multifd_send_state->ops->send_prepare(p, &local_err);
                if (ret != 0) {
                    break;
                }
            }

            /*
             * The packet header in the zerocopy RAM case is accounted for
             * in multifd_nocomp_send_prepare() - where it is actually
             * being sent.
             */
            total_size = iov_size(p->iov, p->iovs_num);

            /* mapped-ram writes into the file, otherwise write to the channel */
            if (migrate_mapped_ram()) {
                assert(!is_device_state);

                ret = file_write_ramblock_iov(p->c, p->iov, p->iovs_num,
                                              &p->data->u.ram, &local_err);
            } else {
                ret = qio_channel_writev_full_all(p->c, p->iov, p->iovs_num,
                                                  NULL, 0, p->write_flags,
                                                  &local_err);
            }

            if (ret != 0) {
                break;
            }

            stat64_add(&mig_stats.multifd_bytes, total_size);

            /* Leave the payload empty, as multifd_send() asserts it to be */
            p->next_packet_size = 0;
            multifd_send_data_clear(p->data);

            /*
             * Making sure p->data is published before saying "we're
             * free".  Pairs with the smp_mb_acquire() in
             * multifd_send().
             */
            qatomic_store_release(&p->pending_job, false);
        } else {
            MultiFDSyncReq req = qatomic_read(&p->pending_sync);

            /*
             * If not a normal job, must be a sync request.  Note that
             * pending_sync is a standalone flag (unlike pending_job), so
             * it doesn't require explicit memory barriers.
             */
            assert(req != MULTIFD_SYNC_NONE);

            /* Only push the SYNC message if it involves a remote sync */
            if (req == MULTIFD_SYNC_ALL) {
                p->flags = MULTIFD_FLAG_SYNC;
                multifd_send_fill_packet(p);
                ret = qio_channel_write_all(p->c, (void *)p->packet,
                                            p->packet_len, &local_err);
                if (ret != 0) {
                    break;
                }
                /* p->next_packet_size will always be zero for a SYNC packet */
                stat64_add(&mig_stats.multifd_bytes, p->packet_len);
            }

            /* Sync handled: clear the request and release the waiter */
            qatomic_set(&p->pending_sync, MULTIFD_SYNC_NONE);
            qemu_sem_post(&p->sem_sync);
        }
    }

out:
    if (ret) {
        assert(local_err);
        trace_multifd_send_error(p->id);
        multifd_send_set_error(local_err);
        /* Unblock the migration thread in case it is waiting on this channel */
        multifd_send_kick_main(p);
        error_free(local_err);
    }

    rcu_unregister_thread();
    migration_threads_remove(thread);
    trace_multifd_send_thread_end(p->id, p->packets_sent);

    return NULL;
}
/*
 * Attach the established channel @ioc to the send channel @p and start
 * the send thread for it.
 *
 * The statement order matters: p->c and p->thread_created are both set
 * before the thread is created, so the new thread sees its channel and
 * the teardown code (which checks p->c and p->thread_created) sees a
 * consistent state.
 */
void multifd_channel_connect(MultiFDSendParams *p, QIOChannel *ioc)
{
    /* NOTE(review): presumably disables send coalescing on the socket - confirm */
    qio_channel_set_delay(ioc, false);

    migration_ioc_register_yank(ioc);
    /* Setup p->c only if the channel is completely setup */
    p->c = ioc;

    /* Recorded so multifd_send_terminate_threads() knows to join p->thread */
    p->thread_created = true;
    qemu_thread_create(&p->thread, p->name, multifd_send_thread, p,
                       QEMU_THREAD_JOINABLE);
}
888 */ 889 multifd_send_channel_created(); 890 891 if (ret) { 892 return; 893 } 894 895 trace_multifd_new_send_channel_async_error(p->id, local_err); 896 multifd_send_set_error(local_err); 897 /* 898 * For error cases (TLS or non-TLS), IO channel is always freed here 899 * rather than when cleanup multifd: since p->c is not set, multifd 900 * cleanup code doesn't even know its existence. 901 */ 902 object_unref(OBJECT(ioc)); 903 error_free(local_err); 904 } 905 906 static bool multifd_new_send_channel_create(gpointer opaque, Error **errp) 907 { 908 if (!multifd_use_packets()) { 909 return file_send_channel_create(opaque, errp); 910 } 911 912 socket_send_channel_create(multifd_new_send_channel_async, opaque); 913 return true; 914 } 915 916 bool multifd_send_setup(void) 917 { 918 MigrationState *s = migrate_get_current(); 919 int thread_count, ret = 0; 920 uint32_t page_count = multifd_ram_page_count(); 921 bool use_packets = multifd_use_packets(); 922 uint8_t i; 923 924 if (!migrate_multifd()) { 925 return true; 926 } 927 928 thread_count = migrate_multifd_channels(); 929 multifd_send_state = g_malloc0(sizeof(*multifd_send_state)); 930 multifd_send_state->params = g_new0(MultiFDSendParams, thread_count); 931 qemu_mutex_init(&multifd_send_state->multifd_send_mutex); 932 qemu_sem_init(&multifd_send_state->channels_created, 0); 933 qemu_sem_init(&multifd_send_state->channels_ready, 0); 934 qatomic_set(&multifd_send_state->exiting, 0); 935 multifd_send_state->ops = multifd_ops[migrate_multifd_compression()]; 936 937 for (i = 0; i < thread_count; i++) { 938 MultiFDSendParams *p = &multifd_send_state->params[i]; 939 Error *local_err = NULL; 940 941 qemu_sem_init(&p->sem, 0); 942 qemu_sem_init(&p->sem_sync, 0); 943 p->id = i; 944 p->data = multifd_send_data_alloc(); 945 946 if (use_packets) { 947 p->packet_len = sizeof(MultiFDPacket_t) 948 + sizeof(uint64_t) * page_count; 949 p->packet = g_malloc0(p->packet_len); 950 p->packet_device_state = 
g_malloc0(sizeof(*p->packet_device_state)); 951 p->packet_device_state->hdr.magic = cpu_to_be32(MULTIFD_MAGIC); 952 p->packet_device_state->hdr.version = cpu_to_be32(MULTIFD_VERSION); 953 } 954 p->name = g_strdup_printf(MIGRATION_THREAD_SRC_MULTIFD, i); 955 p->write_flags = 0; 956 957 if (!multifd_new_send_channel_create(p, &local_err)) { 958 migrate_set_error(s, local_err); 959 ret = -1; 960 } 961 } 962 963 /* 964 * Wait until channel creation has started for all channels. The 965 * creation can still fail, but no more channels will be created 966 * past this point. 967 */ 968 for (i = 0; i < thread_count; i++) { 969 qemu_sem_wait(&multifd_send_state->channels_created); 970 } 971 972 if (ret) { 973 goto err; 974 } 975 976 for (i = 0; i < thread_count; i++) { 977 MultiFDSendParams *p = &multifd_send_state->params[i]; 978 Error *local_err = NULL; 979 980 ret = multifd_send_state->ops->send_setup(p, &local_err); 981 if (ret) { 982 migrate_set_error(s, local_err); 983 goto err; 984 } 985 assert(p->iov); 986 } 987 988 multifd_device_state_send_setup(); 989 990 return true; 991 992 err: 993 migrate_set_state(&s->state, MIGRATION_STATUS_SETUP, 994 MIGRATION_STATUS_FAILED); 995 return false; 996 } 997 998 bool multifd_recv(void) 999 { 1000 int i; 1001 static int next_recv_channel; 1002 MultiFDRecvParams *p = NULL; 1003 MultiFDRecvData *data = multifd_recv_state->data; 1004 1005 /* 1006 * next_channel can remain from a previous migration that was 1007 * using more channels, so ensure it doesn't overflow if the 1008 * limit is lower now. 
1009 */ 1010 next_recv_channel %= migrate_multifd_channels(); 1011 for (i = next_recv_channel;; i = (i + 1) % migrate_multifd_channels()) { 1012 if (multifd_recv_should_exit()) { 1013 return false; 1014 } 1015 1016 p = &multifd_recv_state->params[i]; 1017 1018 if (qatomic_read(&p->pending_job) == false) { 1019 next_recv_channel = (i + 1) % migrate_multifd_channels(); 1020 break; 1021 } 1022 } 1023 1024 /* 1025 * Order pending_job read before manipulating p->data below. Pairs 1026 * with qatomic_store_release() at multifd_recv_thread(). 1027 */ 1028 smp_mb_acquire(); 1029 1030 assert(!p->data->size); 1031 multifd_recv_state->data = p->data; 1032 p->data = data; 1033 1034 /* 1035 * Order p->data update before setting pending_job. Pairs with 1036 * qatomic_load_acquire() at multifd_recv_thread(). 1037 */ 1038 qatomic_store_release(&p->pending_job, true); 1039 qemu_sem_post(&p->sem); 1040 1041 return true; 1042 } 1043 1044 MultiFDRecvData *multifd_get_recv_data(void) 1045 { 1046 return multifd_recv_state->data; 1047 } 1048 1049 static void multifd_recv_terminate_threads(Error *err) 1050 { 1051 int i; 1052 1053 trace_multifd_recv_terminate_threads(err != NULL); 1054 1055 if (qatomic_xchg(&multifd_recv_state->exiting, 1)) { 1056 return; 1057 } 1058 1059 if (err) { 1060 MigrationState *s = migrate_get_current(); 1061 migrate_set_error(s, err); 1062 if (s->state == MIGRATION_STATUS_SETUP || 1063 s->state == MIGRATION_STATUS_ACTIVE) { 1064 migrate_set_state(&s->state, s->state, 1065 MIGRATION_STATUS_FAILED); 1066 } 1067 } 1068 1069 for (i = 0; i < migrate_multifd_channels(); i++) { 1070 MultiFDRecvParams *p = &multifd_recv_state->params[i]; 1071 1072 /* 1073 * The migration thread and channels interact differently 1074 * depending on the presence of packets. 1075 */ 1076 if (multifd_use_packets()) { 1077 /* 1078 * The channel receives as long as there are packets. When 1079 * packets end (i.e. 
MULTIFD_FLAG_SYNC is reached), the 1080 * channel waits for the migration thread to sync. If the 1081 * sync never happens, do it here. 1082 */ 1083 qemu_sem_post(&p->sem_sync); 1084 } else { 1085 /* 1086 * The channel waits for the migration thread to give it 1087 * work. When the migration thread runs out of work, it 1088 * releases the channel and waits for any pending work to 1089 * finish. If we reach here (e.g. due to error) before the 1090 * work runs out, release the channel. 1091 */ 1092 qemu_sem_post(&p->sem); 1093 } 1094 1095 /* 1096 * We could arrive here for two reasons: 1097 * - normal quit, i.e. everything went fine, just finished 1098 * - error quit: We close the channels so the channel threads 1099 * finish the qio_channel_read_all_eof() 1100 */ 1101 if (p->c) { 1102 qio_channel_shutdown(p->c, QIO_CHANNEL_SHUTDOWN_BOTH, NULL); 1103 } 1104 } 1105 } 1106 1107 void multifd_recv_shutdown(void) 1108 { 1109 if (migrate_multifd()) { 1110 multifd_recv_terminate_threads(NULL); 1111 } 1112 } 1113 1114 static void multifd_recv_cleanup_channel(MultiFDRecvParams *p) 1115 { 1116 migration_ioc_unregister_yank(p->c); 1117 object_unref(OBJECT(p->c)); 1118 p->c = NULL; 1119 qemu_mutex_destroy(&p->mutex); 1120 qemu_sem_destroy(&p->sem_sync); 1121 qemu_sem_destroy(&p->sem); 1122 g_free(p->data); 1123 p->data = NULL; 1124 g_free(p->name); 1125 p->name = NULL; 1126 p->packet_len = 0; 1127 g_free(p->packet); 1128 p->packet = NULL; 1129 g_clear_pointer(&p->packet_dev_state, g_free); 1130 g_free(p->normal); 1131 p->normal = NULL; 1132 g_free(p->zero); 1133 p->zero = NULL; 1134 multifd_recv_state->ops->recv_cleanup(p); 1135 } 1136 1137 static void multifd_recv_cleanup_state(void) 1138 { 1139 qemu_sem_destroy(&multifd_recv_state->sem_sync); 1140 g_free(multifd_recv_state->params); 1141 multifd_recv_state->params = NULL; 1142 g_free(multifd_recv_state->data); 1143 multifd_recv_state->data = NULL; 1144 g_free(multifd_recv_state); 1145 multifd_recv_state = NULL; 1146 } 1147 
1148 void multifd_recv_cleanup(void) 1149 { 1150 int i; 1151 1152 if (!migrate_multifd()) { 1153 return; 1154 } 1155 multifd_recv_terminate_threads(NULL); 1156 for (i = 0; i < migrate_multifd_channels(); i++) { 1157 MultiFDRecvParams *p = &multifd_recv_state->params[i]; 1158 1159 if (p->thread_created) { 1160 qemu_thread_join(&p->thread); 1161 } 1162 } 1163 for (i = 0; i < migrate_multifd_channels(); i++) { 1164 multifd_recv_cleanup_channel(&multifd_recv_state->params[i]); 1165 } 1166 multifd_recv_cleanup_state(); 1167 } 1168 1169 void multifd_recv_sync_main(void) 1170 { 1171 int thread_count = migrate_multifd_channels(); 1172 bool file_based = !multifd_use_packets(); 1173 int i; 1174 1175 if (!migrate_multifd()) { 1176 return; 1177 } 1178 1179 /* 1180 * File-based channels don't use packets and therefore need to 1181 * wait for more work. Release them to start the sync. 1182 */ 1183 if (file_based) { 1184 for (i = 0; i < thread_count; i++) { 1185 MultiFDRecvParams *p = &multifd_recv_state->params[i]; 1186 1187 trace_multifd_recv_sync_main_signal(p->id); 1188 qemu_sem_post(&p->sem); 1189 } 1190 } 1191 1192 /* 1193 * Initiate the synchronization by waiting for all channels. 1194 * 1195 * For socket-based migration this means each channel has received 1196 * the SYNC packet on the stream. 1197 * 1198 * For file-based migration this means each channel is done with 1199 * the work (pending_job=false). 1200 */ 1201 for (i = 0; i < thread_count; i++) { 1202 trace_multifd_recv_sync_main_wait(i); 1203 qemu_sem_wait(&multifd_recv_state->sem_sync); 1204 } 1205 1206 if (file_based) { 1207 /* 1208 * For file-based loading is done in one iteration. We're 1209 * done. 1210 */ 1211 return; 1212 } 1213 1214 /* 1215 * Sync done. Release the channels for the next iteration. 
1216 */ 1217 for (i = 0; i < thread_count; i++) { 1218 MultiFDRecvParams *p = &multifd_recv_state->params[i]; 1219 1220 WITH_QEMU_LOCK_GUARD(&p->mutex) { 1221 if (multifd_recv_state->packet_num < p->packet_num) { 1222 multifd_recv_state->packet_num = p->packet_num; 1223 } 1224 } 1225 trace_multifd_recv_sync_main_signal(p->id); 1226 qemu_sem_post(&p->sem_sync); 1227 } 1228 trace_multifd_recv_sync_main(multifd_recv_state->packet_num); 1229 } 1230 1231 static int multifd_device_state_recv(MultiFDRecvParams *p, Error **errp) 1232 { 1233 g_autofree char *dev_state_buf = NULL; 1234 int ret; 1235 1236 dev_state_buf = g_malloc(p->next_packet_size); 1237 1238 ret = qio_channel_read_all(p->c, dev_state_buf, p->next_packet_size, errp); 1239 if (ret != 0) { 1240 return ret; 1241 } 1242 1243 if (p->packet_dev_state->idstr[sizeof(p->packet_dev_state->idstr) - 1] 1244 != 0) { 1245 error_setg(errp, "unterminated multifd device state idstr"); 1246 return -1; 1247 } 1248 1249 if (!qemu_loadvm_load_state_buffer(p->packet_dev_state->idstr, 1250 p->packet_dev_state->instance_id, 1251 dev_state_buf, p->next_packet_size, 1252 errp)) { 1253 ret = -1; 1254 } 1255 1256 return ret; 1257 } 1258 1259 static void *multifd_recv_thread(void *opaque) 1260 { 1261 MigrationState *s = migrate_get_current(); 1262 MultiFDRecvParams *p = opaque; 1263 Error *local_err = NULL; 1264 bool use_packets = multifd_use_packets(); 1265 int ret; 1266 1267 trace_multifd_recv_thread_start(p->id); 1268 rcu_register_thread(); 1269 1270 if (!s->multifd_clean_tls_termination) { 1271 p->read_flags = QIO_CHANNEL_READ_FLAG_RELAXED_EOF; 1272 } 1273 1274 while (true) { 1275 MultiFDPacketHdr_t hdr; 1276 uint32_t flags = 0; 1277 bool is_device_state = false; 1278 bool has_data = false; 1279 uint8_t *pkt_buf; 1280 size_t pkt_len; 1281 1282 p->normal_num = 0; 1283 1284 if (use_packets) { 1285 struct iovec iov = { 1286 .iov_base = (void *)&hdr, 1287 .iov_len = sizeof(hdr) 1288 }; 1289 1290 if (multifd_recv_should_exit()) { 1291 
break; 1292 } 1293 1294 ret = qio_channel_readv_full_all_eof(p->c, &iov, 1, NULL, NULL, 1295 p->read_flags, &local_err); 1296 if (!ret) { 1297 /* EOF */ 1298 assert(!local_err); 1299 break; 1300 } 1301 1302 if (ret == -1) { 1303 break; 1304 } 1305 1306 ret = multifd_recv_unfill_packet_header(p, &hdr, &local_err); 1307 if (ret) { 1308 break; 1309 } 1310 1311 is_device_state = p->flags & MULTIFD_FLAG_DEVICE_STATE; 1312 if (is_device_state) { 1313 pkt_buf = (uint8_t *)p->packet_dev_state + sizeof(hdr); 1314 pkt_len = sizeof(*p->packet_dev_state) - sizeof(hdr); 1315 } else { 1316 pkt_buf = (uint8_t *)p->packet + sizeof(hdr); 1317 pkt_len = p->packet_len - sizeof(hdr); 1318 } 1319 1320 ret = qio_channel_read_all_eof(p->c, (char *)pkt_buf, pkt_len, 1321 &local_err); 1322 if (!ret) { 1323 /* EOF */ 1324 error_setg(&local_err, "multifd: unexpected EOF after packet header"); 1325 break; 1326 } 1327 1328 if (ret == -1) { 1329 break; 1330 } 1331 1332 qemu_mutex_lock(&p->mutex); 1333 ret = multifd_recv_unfill_packet(p, &local_err); 1334 if (ret) { 1335 qemu_mutex_unlock(&p->mutex); 1336 break; 1337 } 1338 1339 flags = p->flags; 1340 /* recv methods don't know how to handle the SYNC flag */ 1341 p->flags &= ~MULTIFD_FLAG_SYNC; 1342 1343 if (is_device_state) { 1344 has_data = p->next_packet_size > 0; 1345 } else { 1346 /* 1347 * Even if it's a SYNC packet, this needs to be set 1348 * because older QEMUs (<9.0) still send data along with 1349 * the SYNC packet. 1350 */ 1351 has_data = p->normal_num || p->zero_num; 1352 } 1353 1354 qemu_mutex_unlock(&p->mutex); 1355 } else { 1356 /* 1357 * No packets, so we need to wait for the vmstate code to 1358 * give us work. 
1359 */ 1360 qemu_sem_wait(&p->sem); 1361 1362 if (multifd_recv_should_exit()) { 1363 break; 1364 } 1365 1366 /* pairs with qatomic_store_release() at multifd_recv() */ 1367 if (!qatomic_load_acquire(&p->pending_job)) { 1368 /* 1369 * Migration thread did not send work, this is 1370 * equivalent to pending_sync on the sending 1371 * side. Post sem_sync to notify we reached this 1372 * point. 1373 */ 1374 qemu_sem_post(&multifd_recv_state->sem_sync); 1375 continue; 1376 } 1377 1378 has_data = !!p->data->size; 1379 } 1380 1381 if (has_data) { 1382 if (is_device_state) { 1383 assert(use_packets); 1384 ret = multifd_device_state_recv(p, &local_err); 1385 } else { 1386 ret = multifd_recv_state->ops->recv(p, &local_err); 1387 } 1388 if (ret != 0) { 1389 break; 1390 } 1391 } else if (is_device_state) { 1392 error_setg(&local_err, 1393 "multifd: received empty device state packet"); 1394 break; 1395 } 1396 1397 if (use_packets) { 1398 if (flags & MULTIFD_FLAG_SYNC) { 1399 if (is_device_state) { 1400 error_setg(&local_err, 1401 "multifd: received SYNC device state packet"); 1402 break; 1403 } 1404 1405 qemu_sem_post(&multifd_recv_state->sem_sync); 1406 qemu_sem_wait(&p->sem_sync); 1407 } 1408 } else { 1409 p->data->size = 0; 1410 /* 1411 * Order data->size update before clearing 1412 * pending_job. Pairs with smp_mb_acquire() at 1413 * multifd_recv(). 1414 */ 1415 qatomic_store_release(&p->pending_job, false); 1416 } 1417 } 1418 1419 if (local_err) { 1420 multifd_recv_terminate_threads(local_err); 1421 error_free(local_err); 1422 } 1423 1424 rcu_unregister_thread(); 1425 trace_multifd_recv_thread_end(p->id, p->packets_recved); 1426 1427 return NULL; 1428 } 1429 1430 int multifd_recv_setup(Error **errp) 1431 { 1432 int thread_count; 1433 uint32_t page_count = multifd_ram_page_count(); 1434 bool use_packets = multifd_use_packets(); 1435 uint8_t i; 1436 1437 /* 1438 * Return successfully if multiFD recv state is already initialised 1439 * or multiFD is not enabled. 
1440 */ 1441 if (multifd_recv_state || !migrate_multifd()) { 1442 return 0; 1443 } 1444 1445 thread_count = migrate_multifd_channels(); 1446 multifd_recv_state = g_malloc0(sizeof(*multifd_recv_state)); 1447 multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count); 1448 1449 multifd_recv_state->data = g_new0(MultiFDRecvData, 1); 1450 multifd_recv_state->data->size = 0; 1451 1452 qatomic_set(&multifd_recv_state->count, 0); 1453 qatomic_set(&multifd_recv_state->exiting, 0); 1454 qemu_sem_init(&multifd_recv_state->sem_sync, 0); 1455 multifd_recv_state->ops = multifd_ops[migrate_multifd_compression()]; 1456 1457 for (i = 0; i < thread_count; i++) { 1458 MultiFDRecvParams *p = &multifd_recv_state->params[i]; 1459 1460 qemu_mutex_init(&p->mutex); 1461 qemu_sem_init(&p->sem_sync, 0); 1462 qemu_sem_init(&p->sem, 0); 1463 p->pending_job = false; 1464 p->id = i; 1465 1466 p->data = g_new0(MultiFDRecvData, 1); 1467 p->data->size = 0; 1468 1469 if (use_packets) { 1470 p->packet_len = sizeof(MultiFDPacket_t) 1471 + sizeof(uint64_t) * page_count; 1472 p->packet = g_malloc0(p->packet_len); 1473 p->packet_dev_state = g_malloc0(sizeof(*p->packet_dev_state)); 1474 } 1475 p->name = g_strdup_printf(MIGRATION_THREAD_DST_MULTIFD, i); 1476 p->normal = g_new0(ram_addr_t, page_count); 1477 p->zero = g_new0(ram_addr_t, page_count); 1478 } 1479 1480 for (i = 0; i < thread_count; i++) { 1481 MultiFDRecvParams *p = &multifd_recv_state->params[i]; 1482 int ret; 1483 1484 ret = multifd_recv_state->ops->recv_setup(p, errp); 1485 if (ret) { 1486 return ret; 1487 } 1488 } 1489 return 0; 1490 } 1491 1492 bool multifd_recv_all_channels_created(void) 1493 { 1494 int thread_count = migrate_multifd_channels(); 1495 1496 if (!migrate_multifd()) { 1497 return true; 1498 } 1499 1500 if (!multifd_recv_state) { 1501 /* Called before any connections created */ 1502 return false; 1503 } 1504 1505 return thread_count == qatomic_read(&multifd_recv_state->count); 1506 } 1507 1508 /* 1509 * Try to 
receive all multifd channels to get ready for the migration. 1510 * Sets @errp when failing to receive the current channel. 1511 */ 1512 void multifd_recv_new_channel(QIOChannel *ioc, Error **errp) 1513 { 1514 MultiFDRecvParams *p; 1515 Error *local_err = NULL; 1516 bool use_packets = multifd_use_packets(); 1517 int id; 1518 1519 if (use_packets) { 1520 id = multifd_recv_initial_packet(ioc, &local_err); 1521 if (id < 0) { 1522 multifd_recv_terminate_threads(local_err); 1523 error_propagate_prepend(errp, local_err, 1524 "failed to receive packet" 1525 " via multifd channel %d: ", 1526 qatomic_read(&multifd_recv_state->count)); 1527 return; 1528 } 1529 trace_multifd_recv_new_channel(id); 1530 } else { 1531 id = qatomic_read(&multifd_recv_state->count); 1532 } 1533 1534 p = &multifd_recv_state->params[id]; 1535 if (p->c != NULL) { 1536 error_setg(&local_err, "multifd: received id '%d' already setup'", 1537 id); 1538 multifd_recv_terminate_threads(local_err); 1539 error_propagate(errp, local_err); 1540 return; 1541 } 1542 p->c = ioc; 1543 object_ref(OBJECT(ioc)); 1544 1545 p->thread_created = true; 1546 qemu_thread_create(&p->thread, p->name, multifd_recv_thread, p, 1547 QEMU_THREAD_JOINABLE); 1548 qatomic_inc(&multifd_recv_state->count); 1549 } 1550