Lines Matching +full:data +full:- +full:channel
4 * Copyright (c) 2019-2020 Red Hat Inc
10 * See the COPYING file in the top-level directory.
20 #include "qemu/error-report.h"
25 #include "migration-stats.h"
29 #include "qemu-file.h"
35 #include "io/channel-file.h"
36 #include "io/channel-socket.h"
86 MultiFDRecvData *data; member
105 multifd_ram_payload_alloc(&new->u.ram); in multifd_send_data_alloc()
106 /* Device state allocates its payload on-demand */ in multifd_send_data_alloc()
111 void multifd_send_data_clear(MultiFDSendData *data) in multifd_send_data_clear() argument
113 if (multifd_payload_empty(data)) { in multifd_send_data_clear()
117 switch (data->type) { in multifd_send_data_clear()
119 multifd_send_data_clear_device_state(&data->u.device_state); in multifd_send_data_clear()
126 data->type = MULTIFD_PAYLOAD_NONE; in multifd_send_data_clear()
129 void multifd_send_data_free(MultiFDSendData *data) in multifd_send_data_free() argument
131 if (!data) { in multifd_send_data_free()
136 multifd_send_data_clear(data); in multifd_send_data_free()
138 multifd_ram_payload_free(&data->u.ram); in multifd_send_data_free()
140 g_free(data); in multifd_send_data_free()
150 qemu_sem_post(&multifd_send_state->channels_created); in multifd_send_channel_created()
170 msg.id = p->id; in multifd_send_initial_packet()
171 memcpy(msg.uuid, &qemu_uuid.data, sizeof(msg.uuid)); in multifd_send_initial_packet()
173 ret = qio_channel_write_all(p->c, (char *)&msg, size, errp); in multifd_send_initial_packet()
175 return -1; in multifd_send_initial_packet()
188 return -1; in multifd_recv_initial_packet()
197 return -1; in multifd_recv_initial_packet()
203 return -1; in multifd_recv_initial_packet()
211 "uuid '%s' for channel %hhd", msg_uuid, uuid, msg.id); in multifd_recv_initial_packet()
214 return -1; in multifd_recv_initial_packet()
218 error_setg(errp, "multifd: received channel id %u is greater than " in multifd_recv_initial_packet()
220 return -1; in multifd_recv_initial_packet()
229 MultiFDPacket_t *packet = p->packet; in multifd_send_fill_packet()
231 bool sync_packet = p->flags & MULTIFD_FLAG_SYNC; in multifd_send_fill_packet()
233 memset(packet, 0, p->packet_len); in multifd_send_fill_packet()
235 packet->hdr.magic = cpu_to_be32(MULTIFD_MAGIC); in multifd_send_fill_packet()
236 packet->hdr.version = cpu_to_be32(MULTIFD_VERSION); in multifd_send_fill_packet()
238 packet->hdr.flags = cpu_to_be32(p->flags); in multifd_send_fill_packet()
239 packet->next_packet_size = cpu_to_be32(p->next_packet_size); in multifd_send_fill_packet()
241 packet_num = qatomic_fetch_inc(&multifd_send_state->packet_num); in multifd_send_fill_packet()
242 packet->packet_num = cpu_to_be64(packet_num); in multifd_send_fill_packet()
244 p->packets_sent++; in multifd_send_fill_packet()
250 trace_multifd_send_fill(p->id, packet_num, in multifd_send_fill_packet()
251 p->flags, p->next_packet_size); in multifd_send_fill_packet()
258 uint32_t magic = be32_to_cpu(hdr->magic); in multifd_recv_unfill_packet_header()
259 uint32_t version = be32_to_cpu(hdr->version); in multifd_recv_unfill_packet_header()
264 return -1; in multifd_recv_unfill_packet_header()
270 return -1; in multifd_recv_unfill_packet_header()
273 p->flags = be32_to_cpu(hdr->flags); in multifd_recv_unfill_packet_header()
281 MultiFDPacketDeviceState_t *packet = p->packet_dev_state; in multifd_recv_unfill_packet_device_state()
283 packet->instance_id = be32_to_cpu(packet->instance_id); in multifd_recv_unfill_packet_device_state()
284 p->next_packet_size = be32_to_cpu(packet->next_packet_size); in multifd_recv_unfill_packet_device_state()
291 const MultiFDPacket_t *packet = p->packet; in multifd_recv_unfill_packet_ram()
294 p->next_packet_size = be32_to_cpu(packet->next_packet_size); in multifd_recv_unfill_packet_ram()
295 p->packet_num = be64_to_cpu(packet->packet_num); in multifd_recv_unfill_packet_ram()
297 /* Always unfill, old QEMUs (<9.0) send data along with SYNC */ in multifd_recv_unfill_packet_ram()
300 trace_multifd_recv_unfill(p->id, p->packet_num, p->flags, in multifd_recv_unfill_packet_ram()
301 p->next_packet_size); in multifd_recv_unfill_packet_ram()
308 p->packets_recved++; in multifd_recv_unfill_packet()
310 if (p->flags & MULTIFD_FLAG_DEVICE_STATE) { in multifd_recv_unfill_packet()
319 return qatomic_read(&multifd_send_state->exiting); in multifd_send_should_exit()
324 return qatomic_read(&multifd_recv_state->exiting); in multifd_recv_should_exit()
335 qemu_sem_post(&p->sem_sync); in multifd_send_kick_main()
336 qemu_sem_post(&multifd_send_state->channels_ready); in multifd_send_kick_main()
342 * the next channel that is found to be idle.
344 * The channel owns the data until it finishes transmitting and the
345 * caller owns the empty object until it fills it with data and calls
348 * Switching is safe because both the migration thread and the channel
364 QEMU_LOCK_GUARD(&multifd_send_state->multifd_send_mutex); in multifd_send()
366 /* We wait here, until at least one channel is ready */ in multifd_send()
367 qemu_sem_wait(&multifd_send_state->channels_ready); in multifd_send()
379 p = &multifd_send_state->params[i]; in multifd_send()
381 * Lockless read to p->pending_job is safe, because only multifd in multifd_send()
384 if (qatomic_read(&p->pending_job) == false) { in multifd_send()
391 * Make sure we read p->pending_job before all the rest. Pairs with in multifd_send()
396 assert(multifd_payload_empty(p->data)); in multifd_send()
399 * Swap the pointers. The channel gets the client data for in multifd_send()
400 * transferring and the client gets back an unused data slot. in multifd_send()
403 *send_data = p->data; in multifd_send()
404 p->data = tmp; in multifd_send()
407 * Making sure p->data is setup before marking pending_job=true. Pairs in multifd_send()
410 qatomic_store_release(&p->pending_job, true); in multifd_send()
411 qemu_sem_post(&p->sem); in multifd_send()
425 if (qatomic_xchg(&multifd_send_state->exiting, 1)) { in multifd_send_set_error()
432 if (s->state == MIGRATION_STATUS_SETUP || in multifd_send_set_error()
433 s->state == MIGRATION_STATUS_PRE_SWITCHOVER || in multifd_send_set_error()
434 s->state == MIGRATION_STATUS_DEVICE || in multifd_send_set_error()
435 s->state == MIGRATION_STATUS_ACTIVE) { in multifd_send_set_error()
436 migrate_set_state(&s->state, s->state, in multifd_send_set_error()
452 qatomic_set(&multifd_send_state->exiting, 1); in multifd_send_terminate_threads()
459 MultiFDSendParams *p = &multifd_send_state->params[i]; in multifd_send_terminate_threads()
461 qemu_sem_post(&p->sem); in multifd_send_terminate_threads()
462 if (p->c) { in multifd_send_terminate_threads()
463 qio_channel_shutdown(p->c, QIO_CHANNEL_SHUTDOWN_BOTH, NULL); in multifd_send_terminate_threads()
471 MultiFDSendParams *p = &multifd_send_state->params[i]; in multifd_send_terminate_threads()
473 if (p->tls_thread_created) { in multifd_send_terminate_threads()
474 qemu_thread_join(&p->tls_thread); in multifd_send_terminate_threads()
477 if (p->thread_created) { in multifd_send_terminate_threads()
478 qemu_thread_join(&p->thread); in multifd_send_terminate_threads()
485 if (p->c) { in multifd_send_cleanup_channel()
486 migration_ioc_unregister_yank(p->c); in multifd_send_cleanup_channel()
500 * here, because migration thread will wait for all multifd channel in multifd_send_cleanup_channel()
509 qio_channel_close(p->c, &error_abort); in multifd_send_cleanup_channel()
510 object_unref(OBJECT(p->c)); in multifd_send_cleanup_channel()
511 p->c = NULL; in multifd_send_cleanup_channel()
513 qemu_sem_destroy(&p->sem); in multifd_send_cleanup_channel()
514 qemu_sem_destroy(&p->sem_sync); in multifd_send_cleanup_channel()
515 g_free(p->name); in multifd_send_cleanup_channel()
516 p->name = NULL; in multifd_send_cleanup_channel()
517 g_clear_pointer(&p->data, multifd_send_data_free); in multifd_send_cleanup_channel()
518 p->packet_len = 0; in multifd_send_cleanup_channel()
519 g_clear_pointer(&p->packet_device_state, g_free); in multifd_send_cleanup_channel()
520 g_free(p->packet); in multifd_send_cleanup_channel()
521 p->packet = NULL; in multifd_send_cleanup_channel()
522 multifd_send_state->ops->send_cleanup(p, errp); in multifd_send_cleanup_channel()
523 assert(!p->iov); in multifd_send_cleanup_channel()
533 qemu_sem_destroy(&multifd_send_state->channels_created); in multifd_send_cleanup_state()
534 qemu_sem_destroy(&multifd_send_state->channels_ready); in multifd_send_cleanup_state()
535 qemu_mutex_destroy(&multifd_send_state->multifd_send_mutex); in multifd_send_cleanup_state()
536 g_free(multifd_send_state->params); in multifd_send_cleanup_state()
537 multifd_send_state->params = NULL; in multifd_send_cleanup_state()
551 MultiFDSendParams *p = &multifd_send_state->params[i]; in multifd_send_shutdown()
554 if (p->tls_thread_created && p->thread_created) { in multifd_send_shutdown()
564 migration_tls_channel_end(p->c, &local_err); in multifd_send_shutdown()
574 p->id, error_get_pretty(local_err)); in multifd_send_shutdown()
583 MultiFDSendParams *p = &multifd_send_state->params[i]; in multifd_send_shutdown()
603 return -1; in multifd_zero_copy_flush()
622 MultiFDSendParams *p = &multifd_send_state->params[i]; in multifd_send_sync_main()
625 return -1; in multifd_send_sync_main()
628 trace_multifd_send_sync_main_signal(p->id); in multifd_send_sync_main()
634 assert(qatomic_read(&p->pending_sync) == MULTIFD_SYNC_NONE); in multifd_send_sync_main()
635 qatomic_set(&p->pending_sync, req); in multifd_send_sync_main()
636 qemu_sem_post(&p->sem); in multifd_send_sync_main()
639 MultiFDSendParams *p = &multifd_send_state->params[i]; in multifd_send_sync_main()
642 return -1; in multifd_send_sync_main()
645 qemu_sem_wait(&multifd_send_state->channels_ready); in multifd_send_sync_main()
646 trace_multifd_send_sync_main_wait(p->id); in multifd_send_sync_main()
647 qemu_sem_wait(&p->sem_sync); in multifd_send_sync_main()
649 if (flush_zero_copy && p->c && (multifd_zero_copy_flush(p->c) < 0)) { in multifd_send_sync_main()
650 return -1; in multifd_send_sync_main()
653 trace_multifd_send_sync_main(multifd_send_state->packet_num); in multifd_send_sync_main()
666 thread = migration_threads_add(p->name, qemu_get_thread_id()); in multifd_send_thread()
668 trace_multifd_send_thread_start(p->id); in multifd_send_thread()
673 ret = -1; in multifd_send_thread()
679 qemu_sem_post(&multifd_send_state->channels_ready); in multifd_send_thread()
680 qemu_sem_wait(&p->sem); in multifd_send_thread()
687 * Read pending_job flag before p->data. Pairs with the in multifd_send_thread()
690 if (qatomic_load_acquire(&p->pending_job)) { in multifd_send_thread()
691 bool is_device_state = multifd_payload_device_state(p->data); in multifd_send_thread()
695 p->flags = 0; in multifd_send_thread()
696 p->iovs_num = 0; in multifd_send_thread()
697 assert(!multifd_payload_empty(p->data)); in multifd_send_thread()
705 ret = multifd_send_state->ops->send_prepare(p, &local_err); in multifd_send_thread()
713 * in multifd_nocomp_send_prepare() - where it is actually in multifd_send_thread()
716 total_size = iov_size(p->iov, p->iovs_num); in multifd_send_thread()
721 ret = file_write_ramblock_iov(p->c, p->iov, p->iovs_num, in multifd_send_thread()
722 &p->data->u.ram, &local_err); in multifd_send_thread()
724 ret = qio_channel_writev_full_all(p->c, p->iov, p->iovs_num, in multifd_send_thread()
726 p->write_flags & ~write_flags_masked, in multifd_send_thread()
736 p->next_packet_size = 0; in multifd_send_thread()
737 multifd_send_data_clear(p->data); in multifd_send_thread()
740 * Making sure p->data is published before saying "we're in multifd_send_thread()
744 qatomic_store_release(&p->pending_job, false); in multifd_send_thread()
746 MultiFDSyncReq req = qatomic_read(&p->pending_sync); in multifd_send_thread()
757 p->flags = MULTIFD_FLAG_SYNC; in multifd_send_thread()
759 ret = qio_channel_write_all(p->c, (void *)p->packet, in multifd_send_thread()
760 p->packet_len, &local_err); in multifd_send_thread()
764 /* p->next_packet_size will always be zero for a SYNC packet */ in multifd_send_thread()
765 stat64_add(&mig_stats.multifd_bytes, p->packet_len); in multifd_send_thread()
768 qatomic_set(&p->pending_sync, MULTIFD_SYNC_NONE); in multifd_send_thread()
769 qemu_sem_post(&p->sem_sync); in multifd_send_thread()
776 trace_multifd_send_error(p->id); in multifd_send_thread()
784 trace_multifd_send_thread_end(p->id, p->packets_sent); in multifd_send_thread()
800 qio_channel_tls_handshake(args->tioc, in multifd_tls_handshake_thread()
802 args->p, in multifd_tls_handshake_thread()
815 const char *hostname = s->hostname; in multifd_tls_channel_connect()
825 * Ownership of the socket channel now transfers to the newly in multifd_tls_channel_connect()
826 * created TLS channel, which has already taken a reference. in multifd_tls_channel_connect()
830 qio_channel_set_name(QIO_CHANNEL(tioc), "multifd-tls-outgoing"); in multifd_tls_channel_connect()
833 args->tioc = tioc; in multifd_tls_channel_connect()
834 args->p = p; in multifd_tls_channel_connect()
836 p->tls_thread_created = true; in multifd_tls_channel_connect()
837 qemu_thread_create(&p->tls_thread, MIGRATION_THREAD_SRC_TLS, in multifd_tls_channel_connect()
848 /* Setup p->c only if the channel is completely setup */ in multifd_channel_connect()
849 p->c = ioc; in multifd_channel_connect()
851 p->thread_created = true; in multifd_channel_connect()
852 qemu_thread_create(&p->thread, p->name, multifd_send_thread, p, in multifd_channel_connect()
859 * the multifd channel. Without TLS it goes straight into the channel
869 trace_multifd_new_send_channel_async(p->id); in multifd_new_send_channel_async()
877 migrate_get_current()->hostname); in multifd_new_send_channel_async()
900 trace_multifd_new_send_channel_async_error(p->id, local_err); in multifd_new_send_channel_async()
903 * For error cases (TLS or non-TLS), IO channel is always freed here in multifd_new_send_channel_async()
904 * rather than when cleanup multifd: since p->c is not set, multifd in multifd_new_send_channel_async()
935 multifd_send_state->params = g_new0(MultiFDSendParams, thread_count); in multifd_send_setup()
936 qemu_mutex_init(&multifd_send_state->multifd_send_mutex); in multifd_send_setup()
937 qemu_sem_init(&multifd_send_state->channels_created, 0); in multifd_send_setup()
938 qemu_sem_init(&multifd_send_state->channels_ready, 0); in multifd_send_setup()
939 qatomic_set(&multifd_send_state->exiting, 0); in multifd_send_setup()
940 multifd_send_state->ops = multifd_ops[migrate_multifd_compression()]; in multifd_send_setup()
943 MultiFDSendParams *p = &multifd_send_state->params[i]; in multifd_send_setup()
946 qemu_sem_init(&p->sem, 0); in multifd_send_setup()
947 qemu_sem_init(&p->sem_sync, 0); in multifd_send_setup()
948 p->id = i; in multifd_send_setup()
949 p->data = multifd_send_data_alloc(); in multifd_send_setup()
952 p->packet_len = sizeof(MultiFDPacket_t) in multifd_send_setup()
954 p->packet = g_malloc0(p->packet_len); in multifd_send_setup()
955 p->packet_device_state = g_malloc0(sizeof(*p->packet_device_state)); in multifd_send_setup()
956 p->packet_device_state->hdr.magic = cpu_to_be32(MULTIFD_MAGIC); in multifd_send_setup()
957 p->packet_device_state->hdr.version = cpu_to_be32(MULTIFD_VERSION); in multifd_send_setup()
959 p->name = g_strdup_printf(MIGRATION_THREAD_SRC_MULTIFD, i); in multifd_send_setup()
960 p->write_flags = 0; in multifd_send_setup()
964 ret = -1; in multifd_send_setup()
969 * Wait until channel creation has started for all channels. The in multifd_send_setup()
974 qemu_sem_wait(&multifd_send_state->channels_created); in multifd_send_setup()
982 MultiFDSendParams *p = &multifd_send_state->params[i]; in multifd_send_setup()
985 ret = multifd_send_state->ops->send_setup(p, &local_err); in multifd_send_setup()
990 assert(p->iov); in multifd_send_setup()
998 migrate_set_state(&s->state, MIGRATION_STATUS_SETUP, in multifd_send_setup()
1008 MultiFDRecvData *data = multifd_recv_state->data; in multifd_recv() local
1021 p = &multifd_recv_state->params[i]; in multifd_recv()
1023 if (qatomic_read(&p->pending_job) == false) { in multifd_recv()
1030 * Order pending_job read before manipulating p->data below. Pairs in multifd_recv()
1035 assert(!p->data->size); in multifd_recv()
1036 multifd_recv_state->data = p->data; in multifd_recv()
1037 p->data = data; in multifd_recv()
1040 * Order p->data update before setting pending_job. Pairs with in multifd_recv()
1043 qatomic_store_release(&p->pending_job, true); in multifd_recv()
1044 qemu_sem_post(&p->sem); in multifd_recv()
1051 return multifd_recv_state->data; in multifd_get_recv_data()
1060 if (qatomic_xchg(&multifd_recv_state->exiting, 1)) { in multifd_recv_terminate_threads()
1067 if (s->state == MIGRATION_STATUS_SETUP || in multifd_recv_terminate_threads()
1068 s->state == MIGRATION_STATUS_ACTIVE) { in multifd_recv_terminate_threads()
1069 migrate_set_state(&s->state, s->state, in multifd_recv_terminate_threads()
1075 MultiFDRecvParams *p = &multifd_recv_state->params[i]; in multifd_recv_terminate_threads()
1083 * The channel receives as long as there are packets. When in multifd_recv_terminate_threads()
1085 * channel waits for the migration thread to sync. If the in multifd_recv_terminate_threads()
1088 qemu_sem_post(&p->sem_sync); in multifd_recv_terminate_threads()
1091 * The channel waits for the migration thread to give it in multifd_recv_terminate_threads()
1093 * releases the channel and waits for any pending work to in multifd_recv_terminate_threads()
1095 * work runs out, release the channel. in multifd_recv_terminate_threads()
1097 qemu_sem_post(&p->sem); in multifd_recv_terminate_threads()
1102 * - normal quit, i.e. everything went fine, just finished in multifd_recv_terminate_threads()
1103 * - error quit: We close the channels so the channel threads in multifd_recv_terminate_threads()
1106 if (p->c) { in multifd_recv_terminate_threads()
1107 qio_channel_shutdown(p->c, QIO_CHANNEL_SHUTDOWN_BOTH, NULL); in multifd_recv_terminate_threads()
1121 migration_ioc_unregister_yank(p->c); in multifd_recv_cleanup_channel()
1122 object_unref(OBJECT(p->c)); in multifd_recv_cleanup_channel()
1123 p->c = NULL; in multifd_recv_cleanup_channel()
1124 qemu_mutex_destroy(&p->mutex); in multifd_recv_cleanup_channel()
1125 qemu_sem_destroy(&p->sem_sync); in multifd_recv_cleanup_channel()
1126 qemu_sem_destroy(&p->sem); in multifd_recv_cleanup_channel()
1127 g_free(p->data); in multifd_recv_cleanup_channel()
1128 p->data = NULL; in multifd_recv_cleanup_channel()
1129 g_free(p->name); in multifd_recv_cleanup_channel()
1130 p->name = NULL; in multifd_recv_cleanup_channel()
1131 p->packet_len = 0; in multifd_recv_cleanup_channel()
1132 g_free(p->packet); in multifd_recv_cleanup_channel()
1133 p->packet = NULL; in multifd_recv_cleanup_channel()
1134 g_clear_pointer(&p->packet_dev_state, g_free); in multifd_recv_cleanup_channel()
1135 g_free(p->normal); in multifd_recv_cleanup_channel()
1136 p->normal = NULL; in multifd_recv_cleanup_channel()
1137 g_free(p->zero); in multifd_recv_cleanup_channel()
1138 p->zero = NULL; in multifd_recv_cleanup_channel()
1139 multifd_recv_state->ops->recv_cleanup(p); in multifd_recv_cleanup_channel()
1144 qemu_sem_destroy(&multifd_recv_state->sem_sync); in multifd_recv_cleanup_state()
1145 g_free(multifd_recv_state->params); in multifd_recv_cleanup_state()
1146 multifd_recv_state->params = NULL; in multifd_recv_cleanup_state()
1147 g_free(multifd_recv_state->data); in multifd_recv_cleanup_state()
1148 multifd_recv_state->data = NULL; in multifd_recv_cleanup_state()
1162 MultiFDRecvParams *p = &multifd_recv_state->params[i]; in multifd_recv_cleanup()
1164 if (p->thread_created) { in multifd_recv_cleanup()
1165 qemu_thread_join(&p->thread); in multifd_recv_cleanup()
1169 multifd_recv_cleanup_channel(&multifd_recv_state->params[i]); in multifd_recv_cleanup()
1185 * File-based channels don't use packets and therefore need to in multifd_recv_sync_main()
1190 MultiFDRecvParams *p = &multifd_recv_state->params[i]; in multifd_recv_sync_main()
1192 trace_multifd_recv_sync_main_signal(p->id); in multifd_recv_sync_main()
1193 qemu_sem_post(&p->sem); in multifd_recv_sync_main()
1200 * For socket-based migration this means each channel has received in multifd_recv_sync_main()
1203 * For file-based migration this means each channel is done with in multifd_recv_sync_main()
1208 qemu_sem_wait(&multifd_recv_state->sem_sync); in multifd_recv_sync_main()
1213 * For file-based loading is done in one iteration. We're in multifd_recv_sync_main()
1223 MultiFDRecvParams *p = &multifd_recv_state->params[i]; in multifd_recv_sync_main()
1225 WITH_QEMU_LOCK_GUARD(&p->mutex) { in multifd_recv_sync_main()
1226 if (multifd_recv_state->packet_num < p->packet_num) { in multifd_recv_sync_main()
1227 multifd_recv_state->packet_num = p->packet_num; in multifd_recv_sync_main()
1230 trace_multifd_recv_sync_main_signal(p->id); in multifd_recv_sync_main()
1231 qemu_sem_post(&p->sem_sync); in multifd_recv_sync_main()
1233 trace_multifd_recv_sync_main(multifd_recv_state->packet_num); in multifd_recv_sync_main()
1241 dev_state_buf = g_malloc(p->next_packet_size); in multifd_device_state_recv()
1243 ret = qio_channel_read_all(p->c, dev_state_buf, p->next_packet_size, errp); in multifd_device_state_recv()
1248 if (p->packet_dev_state->idstr[sizeof(p->packet_dev_state->idstr) - 1] in multifd_device_state_recv()
1251 return -1; in multifd_device_state_recv()
1254 if (!qemu_loadvm_load_state_buffer(p->packet_dev_state->idstr, in multifd_device_state_recv()
1255 p->packet_dev_state->instance_id, in multifd_device_state_recv()
1256 dev_state_buf, p->next_packet_size, in multifd_device_state_recv()
1258 ret = -1; in multifd_device_state_recv()
1272 trace_multifd_recv_thread_start(p->id); in multifd_recv_thread()
1275 if (!s->multifd_clean_tls_termination) { in multifd_recv_thread()
1276 p->read_flags = QIO_CHANNEL_READ_FLAG_RELAXED_EOF; in multifd_recv_thread()
1287 p->normal_num = 0; in multifd_recv_thread()
1299 ret = qio_channel_readv_full_all_eof(p->c, &iov, 1, NULL, NULL, in multifd_recv_thread()
1300 p->read_flags, &local_err); in multifd_recv_thread()
1307 if (ret == -1) { in multifd_recv_thread()
1316 is_device_state = p->flags & MULTIFD_FLAG_DEVICE_STATE; in multifd_recv_thread()
1318 pkt_buf = (uint8_t *)p->packet_dev_state + sizeof(hdr); in multifd_recv_thread()
1319 pkt_len = sizeof(*p->packet_dev_state) - sizeof(hdr); in multifd_recv_thread()
1321 pkt_buf = (uint8_t *)p->packet + sizeof(hdr); in multifd_recv_thread()
1322 pkt_len = p->packet_len - sizeof(hdr); in multifd_recv_thread()
1325 ret = qio_channel_read_all_eof(p->c, (char *)pkt_buf, pkt_len, in multifd_recv_thread()
1333 if (ret == -1) { in multifd_recv_thread()
1337 qemu_mutex_lock(&p->mutex); in multifd_recv_thread()
1340 qemu_mutex_unlock(&p->mutex); in multifd_recv_thread()
1344 flags = p->flags; in multifd_recv_thread()
1346 p->flags &= ~MULTIFD_FLAG_SYNC; in multifd_recv_thread()
1349 has_data = p->next_packet_size > 0; in multifd_recv_thread()
1353 * because older QEMUs (<9.0) still send data along with in multifd_recv_thread()
1356 has_data = p->normal_num || p->zero_num; in multifd_recv_thread()
1359 qemu_mutex_unlock(&p->mutex); in multifd_recv_thread()
1365 qemu_sem_wait(&p->sem); in multifd_recv_thread()
1372 if (!qatomic_load_acquire(&p->pending_job)) { in multifd_recv_thread()
1379 qemu_sem_post(&multifd_recv_state->sem_sync); in multifd_recv_thread()
1383 has_data = !!p->data->size; in multifd_recv_thread()
1388 * multifd thread should not be active and receive data in multifd_recv_thread()
1398 ret = multifd_recv_state->ops->recv(p, &local_err); in multifd_recv_thread()
1417 qemu_sem_post(&multifd_recv_state->sem_sync); in multifd_recv_thread()
1418 qemu_sem_wait(&p->sem_sync); in multifd_recv_thread()
1421 p->data->size = 0; in multifd_recv_thread()
1423 * Order data->size update before clearing in multifd_recv_thread()
1427 qatomic_store_release(&p->pending_job, false); in multifd_recv_thread()
1437 trace_multifd_recv_thread_end(p->id, p->packets_recved); in multifd_recv_thread()
1459 multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count); in multifd_recv_setup()
1461 multifd_recv_state->data = g_new0(MultiFDRecvData, 1); in multifd_recv_setup()
1462 multifd_recv_state->data->size = 0; in multifd_recv_setup()
1464 qatomic_set(&multifd_recv_state->count, 0); in multifd_recv_setup()
1465 qatomic_set(&multifd_recv_state->exiting, 0); in multifd_recv_setup()
1466 qemu_sem_init(&multifd_recv_state->sem_sync, 0); in multifd_recv_setup()
1467 multifd_recv_state->ops = multifd_ops[migrate_multifd_compression()]; in multifd_recv_setup()
1470 MultiFDRecvParams *p = &multifd_recv_state->params[i]; in multifd_recv_setup()
1472 qemu_mutex_init(&p->mutex); in multifd_recv_setup()
1473 qemu_sem_init(&p->sem_sync, 0); in multifd_recv_setup()
1474 qemu_sem_init(&p->sem, 0); in multifd_recv_setup()
1475 p->pending_job = false; in multifd_recv_setup()
1476 p->id = i; in multifd_recv_setup()
1478 p->data = g_new0(MultiFDRecvData, 1); in multifd_recv_setup()
1479 p->data->size = 0; in multifd_recv_setup()
1482 p->packet_len = sizeof(MultiFDPacket_t) in multifd_recv_setup()
1484 p->packet = g_malloc0(p->packet_len); in multifd_recv_setup()
1485 p->packet_dev_state = g_malloc0(sizeof(*p->packet_dev_state)); in multifd_recv_setup()
1487 p->name = g_strdup_printf(MIGRATION_THREAD_DST_MULTIFD, i); in multifd_recv_setup()
1488 p->normal = g_new0(ram_addr_t, page_count); in multifd_recv_setup()
1489 p->zero = g_new0(ram_addr_t, page_count); in multifd_recv_setup()
1493 MultiFDRecvParams *p = &multifd_recv_state->params[i]; in multifd_recv_setup()
1496 ret = multifd_recv_state->ops->recv_setup(p, errp); in multifd_recv_setup()
1517 return thread_count == qatomic_read(&multifd_recv_state->count); in multifd_recv_all_channels_created()
1522 * Sets @errp when failing to receive the current channel.
1537 " via multifd channel %d: ", in multifd_recv_new_channel()
1538 qatomic_read(&multifd_recv_state->count)); in multifd_recv_new_channel()
1543 id = qatomic_read(&multifd_recv_state->count); in multifd_recv_new_channel()
1546 p = &multifd_recv_state->params[id]; in multifd_recv_new_channel()
1547 if (p->c != NULL) { in multifd_recv_new_channel()
1554 p->c = ioc; in multifd_recv_new_channel()
1557 p->thread_created = true; in multifd_recv_new_channel()
1558 qemu_thread_create(&p->thread, p->name, multifd_recv_thread, p, in multifd_recv_new_channel()
1560 qatomic_inc(&multifd_recv_state->count); in multifd_recv_new_channel()