1 /*
2  * Multifd common code
3  *
4  * Copyright (c) 2019-2020 Red Hat Inc
5  *
6  * Authors:
7  *  Juan Quintela <quintela@redhat.com>
8  *
9  * This work is licensed under the terms of the GNU GPL, version 2 or later.
10  * See the COPYING file in the top-level directory.
11  */
12 
13 #include "qemu/osdep.h"
14 #include "qemu/cutils.h"
15 #include "qemu/iov.h"
16 #include "qemu/rcu.h"
17 #include "exec/target_page.h"
18 #include "system/system.h"
19 #include "system/ramblock.h"
20 #include "qemu/error-report.h"
21 #include "qapi/error.h"
22 #include "file.h"
23 #include "migration/misc.h"
24 #include "migration.h"
25 #include "migration-stats.h"
26 #include "savevm.h"
27 #include "socket.h"
28 #include "tls.h"
29 #include "qemu-file.h"
30 #include "trace.h"
31 #include "multifd.h"
32 #include "threadinfo.h"
33 #include "options.h"
34 #include "qemu/yank.h"
35 #include "io/channel-file.h"
36 #include "io/channel-socket.h"
37 #include "yank_functions.h"
38 
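/*
 * Handshake message sent once per channel right after it is created,
 * before any data packets.  magic and version are stored big-endian on
 * the wire; the uuid lets the destination reject channels coming from a
 * different QEMU instance.
 */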
39 typedef struct {
40     uint32_t magic;
41     uint32_t version;
42     unsigned char uuid[16]; /* QemuUUID */
43     uint8_t id;
44     uint8_t unused1[7];     /* Reserved for future use */
45     uint64_t unused2[4];    /* Reserved for future use */
46 } __attribute__((packed)) MultiFDInit_t;
47 
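/*
 * Send-side state shared by all channels; allocated by
 * multifd_send_setup() and freed by multifd_send_cleanup_state().
 */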
48 struct {
49     MultiFDSendParams *params;
50 
51     /* multifd_send() body is not thread safe, needs serialization */
52     QemuMutex multifd_send_mutex;
53 
54     /*
55      * Global number of generated multifd packets.
56      *
57      * Note that we use 'uintptr_t' because it naturally supports atomic
58      * operations on both 32-bit and 64-bit hosts.  It means that on 32-bit
59      * systems packet_num will overflow sooner, but that should be
60      * fine.
61      *
62      * Another option would be QEMU's Stat64, which is 64 bits on all
63      * hosts; however, it does not support an atomic fetch_add() yet.
64      * Keep it simple for now.
65      */
66     uintptr_t packet_num;
67     /*
68      * Synchronization point past which no more channels will be
69      * created.
70      */
71     QemuSemaphore channels_created;
72     /* send channels ready */
73     QemuSemaphore channels_ready;
74     /*
75      * Have we already terminated the threads?  There is a race when an
76      * error arrives just as we are exiting.
77      * Accessed with atomic operations.  The only valid values are 0 and 1.
78      */
79     int exiting;
80     /* multifd ops */
81     const MultiFDMethods *ops;
82 } *multifd_send_state;
83 
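/*
 * Receive-side counterpart of the above; allocated by
 * multifd_recv_setup() and freed by multifd_recv_cleanup_state().
 */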
84 struct {
85     MultiFDRecvParams *params;
86     MultiFDRecvData *data;
87     /* number of created threads */
88     int count;
89     /*
90      * This is always posted by the recv threads, the migration thread
91      * uses it to wait for recv threads to finish assigned tasks.
92      */
93     QemuSemaphore sem_sync;
94     /* global number of generated multifd packets */
95     uint64_t packet_num;
96     int exiting;
97     /* multifd ops */
98     const MultiFDMethods *ops;
99 } *multifd_recv_state;
100 
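/*
 * Allocate a MultiFDSendData with the RAM payload pre-allocated.
 * Device state payloads are instead allocated on demand by their users.
 */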
101 MultiFDSendData *multifd_send_data_alloc(void)
102 {
103     MultiFDSendData *new = g_new0(MultiFDSendData, 1);
104 
105     multifd_ram_payload_alloc(&new->u.ram);
106     /* Device state allocates its payload on-demand */
107 
108     return new;
109 }
110 
111 void multifd_send_data_clear(MultiFDSendData *data)
112 {
113     if (multifd_payload_empty(data)) {
114         return;
115     }
116 
117     switch (data->type) {
118     case MULTIFD_PAYLOAD_DEVICE_STATE:
119         multifd_send_data_clear_device_state(&data->u.device_state);
120         break;
121     default:
122         /* Nothing to do */
123         break;
124     }
125 
126     data->type = MULTIFD_PAYLOAD_NONE;
127 }
128 
129 void multifd_send_data_free(MultiFDSendData *data)
130 {
131     if (!data) {
132         return;
133     }
134 
135     /* This also frees the device state payload */
136     multifd_send_data_clear(data);
137 
138     multifd_ram_payload_free(&data->u.ram);
139 
140     g_free(data);
141 }
142 
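/*
 * Packets are only used by the socket-based transport; the mapped-ram
 * feature writes guest pages directly into the migration file instead.
 */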
143 static bool multifd_use_packets(void)
144 {
145     return !migrate_mapped_ram();
146 }
147 
148 void multifd_send_channel_created(void)
149 {
150     qemu_sem_post(&multifd_send_state->channels_created);
151 }
152 
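/* One set of ops per compression method, filled in via multifd_register_ops() */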
153 static const MultiFDMethods *multifd_ops[MULTIFD_COMPRESSION__MAX] = {};
154 
155 void multifd_register_ops(int method, const MultiFDMethods *ops)
156 {
157     assert(0 <= method && method < MULTIFD_COMPRESSION__MAX);
158     assert(!multifd_ops[method]);
159     multifd_ops[method] = ops;
160 }
161 
162 static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp)
163 {
164     MultiFDInit_t msg = {};
165     size_t size = sizeof(msg);
166     int ret;
167 
168     msg.magic = cpu_to_be32(MULTIFD_MAGIC);
169     msg.version = cpu_to_be32(MULTIFD_VERSION);
170     msg.id = p->id;
171     memcpy(msg.uuid, &qemu_uuid.data, sizeof(msg.uuid));
172 
173     ret = qio_channel_write_all(p->c, (char *)&msg, size, errp);
174     if (ret != 0) {
175         return -1;
176     }
177     stat64_add(&mig_stats.multifd_bytes, size);
178     return 0;
179 }
180 
181 static int multifd_recv_initial_packet(QIOChannel *c, Error **errp)
182 {
183     MultiFDInit_t msg;
184     int ret;
185 
186     ret = qio_channel_read_all(c, (char *)&msg, sizeof(msg), errp);
187     if (ret != 0) {
188         return -1;
189     }
190 
191     msg.magic = be32_to_cpu(msg.magic);
192     msg.version = be32_to_cpu(msg.version);
193 
194     if (msg.magic != MULTIFD_MAGIC) {
195         error_setg(errp, "multifd: received packet magic %x, "
196                    "expected %x", msg.magic, MULTIFD_MAGIC);
197         return -1;
198     }
199 
200     if (msg.version != MULTIFD_VERSION) {
201         error_setg(errp, "multifd: received packet version %u, "
202                    "expected %u", msg.version, MULTIFD_VERSION);
203         return -1;
204     }
205 
206     if (memcmp(msg.uuid, &qemu_uuid, sizeof(qemu_uuid))) {
207         char *uuid = qemu_uuid_unparse_strdup(&qemu_uuid);
208         char *msg_uuid = qemu_uuid_unparse_strdup((const QemuUUID *)msg.uuid);
209 
210         error_setg(errp, "multifd: received uuid '%s' and expected "
211                    "uuid '%s' for channel %hhu", msg_uuid, uuid, msg.id);
212         g_free(uuid);
213         g_free(msg_uuid);
214         return -1;
215     }
216 
217     if (msg.id > migrate_multifd_channels()) {
218         error_setg(errp, "multifd: received channel id %u is greater than "
219                    "number of channels %u", msg.id, migrate_multifd_channels());
220         return -1;
221     }
222 
223     return msg.id;
224 }
225 
226 /* Fills a RAM multifd packet */
227 void multifd_send_fill_packet(MultiFDSendParams *p)
228 {
229     MultiFDPacket_t *packet = p->packet;
230     uint64_t packet_num;
231     bool sync_packet = p->flags & MULTIFD_FLAG_SYNC;
232 
233     memset(packet, 0, p->packet_len);
234 
235     packet->hdr.magic = cpu_to_be32(MULTIFD_MAGIC);
236     packet->hdr.version = cpu_to_be32(MULTIFD_VERSION);
237 
238     packet->hdr.flags = cpu_to_be32(p->flags);
239     packet->next_packet_size = cpu_to_be32(p->next_packet_size);
240 
241     packet_num = qatomic_fetch_inc(&multifd_send_state->packet_num);
242     packet->packet_num = cpu_to_be64(packet_num);
243 
244     p->packets_sent++;
245 
246     if (!sync_packet) {
247         multifd_ram_fill_packet(p);
248     }
249 
250     trace_multifd_send_fill(p->id, packet_num,
251                             p->flags, p->next_packet_size);
252 }
253 
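/*
 * Validate the common packet header (magic and version) and extract the
 * flags field.  Shared by the RAM and device state packet types.
 */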
254 static int multifd_recv_unfill_packet_header(MultiFDRecvParams *p,
255                                              const MultiFDPacketHdr_t *hdr,
256                                              Error **errp)
257 {
258     uint32_t magic = be32_to_cpu(hdr->magic);
259     uint32_t version = be32_to_cpu(hdr->version);
260 
261     if (magic != MULTIFD_MAGIC) {
262         error_setg(errp, "multifd: received packet magic %x, expected %x",
263                    magic, MULTIFD_MAGIC);
264         return -1;
265     }
266 
267     if (version != MULTIFD_VERSION) {
268         error_setg(errp, "multifd: received packet version %u, expected %u",
269                    version, MULTIFD_VERSION);
270         return -1;
271     }
272 
273     p->flags = be32_to_cpu(hdr->flags);
274 
275     return 0;
276 }
277 
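/* Byte-swap the device state packet fields that arrived off the wire */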
278 static int multifd_recv_unfill_packet_device_state(MultiFDRecvParams *p,
279                                                    Error **errp)
280 {
281     MultiFDPacketDeviceState_t *packet = p->packet_dev_state;
282 
283     packet->instance_id = be32_to_cpu(packet->instance_id);
284     p->next_packet_size = be32_to_cpu(packet->next_packet_size);
285 
286     return 0;
287 }
288 
289 static int multifd_recv_unfill_packet_ram(MultiFDRecvParams *p, Error **errp)
290 {
291     const MultiFDPacket_t *packet = p->packet;
292     int ret = 0;
293 
294     p->next_packet_size = be32_to_cpu(packet->next_packet_size);
295     p->packet_num = be64_to_cpu(packet->packet_num);
296 
297     /* Always unfill, old QEMUs (<9.0) send data along with SYNC */
298     ret = multifd_ram_unfill_packet(p, errp);
299 
300     trace_multifd_recv_unfill(p->id, p->packet_num, p->flags,
301                               p->next_packet_size);
302 
303     return ret;
304 }
305 
306 static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error **errp)
307 {
308     p->packets_recved++;
309 
310     if (p->flags & MULTIFD_FLAG_DEVICE_STATE) {
311         return multifd_recv_unfill_packet_device_state(p, errp);
312     }
313 
314     return multifd_recv_unfill_packet_ram(p, errp);
315 }
316 
317 static bool multifd_send_should_exit(void)
318 {
319     return qatomic_read(&multifd_send_state->exiting);
320 }
321 
322 static bool multifd_recv_should_exit(void)
323 {
324     return qatomic_read(&multifd_recv_state->exiting);
325 }
326 
327 /*
328  * The migration thread can wait on either of the two semaphores.  This
329  * function can be used to kick it out of waiting on either of
330  * them.  It should mostly only be called when something has gone wrong
331  * with the current multifd send thread.
332  */
333 static void multifd_send_kick_main(MultiFDSendParams *p)
334 {
335     qemu_sem_post(&p->sem_sync);
336     qemu_sem_post(&multifd_send_state->channels_ready);
337 }
338 
339 /*
340  * multifd_send() works by exchanging the MultiFDSendData object
341  * provided by the caller with an unused MultiFDSendData object from
342  * the next channel that is found to be idle.
343  *
344  * The channel owns the data until it finishes transmitting and the
345  * caller owns the empty object until it fills it with data and calls
346  * this function again. No locking necessary.
347  *
348  * Switching is safe because both the migration thread and the channel
349  * thread have barriers in place to serialize access.
350  *
351  * Returns true on success, false otherwise.
352  */
353 bool multifd_send(MultiFDSendData **send_data)
354 {
355     int i;
356     static int next_channel;
357     MultiFDSendParams *p = NULL; /* make gcc happy */
358     MultiFDSendData *tmp;
359 
360     if (multifd_send_should_exit()) {
361         return false;
362     }
363 
364     QEMU_LOCK_GUARD(&multifd_send_state->multifd_send_mutex);
365 
366     /* We wait here, until at least one channel is ready */
367     qemu_sem_wait(&multifd_send_state->channels_ready);
368 
369     /*
370      * next_channel can remain from a previous migration that was
371      * using more channels, so ensure it doesn't overflow if the
372      * limit is lower now.
373      */
374     next_channel %= migrate_multifd_channels();
375     for (i = next_channel;; i = (i + 1) % migrate_multifd_channels()) {
376         if (multifd_send_should_exit()) {
377             return false;
378         }
379         p = &multifd_send_state->params[i];
380         /*
381          * Lockless read to p->pending_job is safe, because only multifd
382          * sender thread can clear it.
383          */
384         if (qatomic_read(&p->pending_job) == false) {
385             next_channel = (i + 1) % migrate_multifd_channels();
386             break;
387         }
388     }
389 
390     /*
391      * Make sure we read p->pending_job before all the rest.  Pairs with
392      * qatomic_store_release() in multifd_send_thread().
393      */
394     smp_mb_acquire();
395 
396     assert(multifd_payload_empty(p->data));
397 
398     /*
399      * Swap the pointers. The channel gets the client data for
400      * transferring and the client gets back an unused data slot.
401      */
402     tmp = *send_data;
403     *send_data = p->data;
404     p->data = tmp;
405 
406     /*
407      * Making sure p->data is setup before marking pending_job=true. Pairs
408      * with the qatomic_load_acquire() in multifd_send_thread().
409      */
410     qatomic_store_release(&p->pending_job, true);
411     qemu_sem_post(&p->sem);
412 
413     return true;
414 }
415 
416 /* Multifd send side hit an error; remember it and prepare to quit */
417 static void multifd_send_set_error(Error *err)
418 {
419     /*
420      * We don't want to exit the threads twice.  Depending on where
421      * we get the error, or if there are two independent errors in two
422      * threads at the same time, we can end up calling this function
423      * twice.
424      */
425     if (qatomic_xchg(&multifd_send_state->exiting, 1)) {
426         return;
427     }
428 
429     if (err) {
430         MigrationState *s = migrate_get_current();
431         migrate_set_error(s, err);
432         if (s->state == MIGRATION_STATUS_SETUP ||
433             s->state == MIGRATION_STATUS_PRE_SWITCHOVER ||
434             s->state == MIGRATION_STATUS_DEVICE ||
435             s->state == MIGRATION_STATUS_ACTIVE) {
436             migrate_set_state(&s->state, s->state,
437                               MIGRATION_STATUS_FAILED);
438         }
439     }
440 }
441 
442 static void multifd_send_terminate_threads(void)
443 {
444     int i;
445 
446     trace_multifd_send_terminate_threads();
447 
448     /*
449      * Tell everyone we're quitting.  No xchg() needed here; we simply
450      * always set it.
451      */
452     qatomic_set(&multifd_send_state->exiting, 1);
453 
454     /*
455      * First, kick all threads out, no matter whether they are idle
456      * or blocked in an I/O system call.
457      */
458     for (i = 0; i < migrate_multifd_channels(); i++) {
459         MultiFDSendParams *p = &multifd_send_state->params[i];
460 
461         qemu_sem_post(&p->sem);
462         if (p->c) {
463             qio_channel_shutdown(p->c, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
464         }
465     }
466 
467     /*
468      * Finally recycle all the threads.
469      */
470     for (i = 0; i < migrate_multifd_channels(); i++) {
471         MultiFDSendParams *p = &multifd_send_state->params[i];
472 
473         if (p->tls_thread_created) {
474             qemu_thread_join(&p->tls_thread);
475         }
476 
477         if (p->thread_created) {
478             qemu_thread_join(&p->thread);
479         }
480     }
481 }
482 
483 static bool multifd_send_cleanup_channel(MultiFDSendParams *p, Error **errp)
484 {
485     if (p->c) {
486         migration_ioc_unregister_yank(p->c);
487         /*
488          * The object_unref() cannot guarantee the fd will always be
489          * released because finalize() of the iochannel is only
490          * triggered on the last reference and it's not guaranteed
491          * that we always hold the last refcount when reaching here.
492          *
493          * Closing the fd explicitly has the benefit that if there is any
494          * registered I/O handler callbacks on such fd, that will get a
495          * POLLNVAL event and will further trigger the cleanup to finally
496          * release the IOC.
497          *
498          * FIXME: It should logically be guaranteed that all multifd
499          * channels have no I/O handler callback registered when reaching
500          * here, because the migration thread waits for all multifd channel
501          * establishments to complete during setup.  Since
502          * migration_cleanup() is also scheduled in the main thread, all
503          * previous callbacks should be guaranteed to have completed by the
504          * time we reach here.  See multifd_send_state.channels_created and
505          * its usage.  In the future, we could replace this with an assert
506          * making sure we hold the last reference, or simply drop it if the
507          * above is clearly justified.
508          */
509         qio_channel_close(p->c, &error_abort);
510         object_unref(OBJECT(p->c));
511         p->c = NULL;
512     }
513     qemu_sem_destroy(&p->sem);
514     qemu_sem_destroy(&p->sem_sync);
515     g_free(p->name);
516     p->name = NULL;
517     g_clear_pointer(&p->data, multifd_send_data_free);
518     p->packet_len = 0;
519     g_clear_pointer(&p->packet_device_state, g_free);
520     g_free(p->packet);
521     p->packet = NULL;
522     multifd_send_state->ops->send_cleanup(p, errp);
523     assert(!p->iov);
524 
525     return *errp == NULL;
526 }
527 
528 static void multifd_send_cleanup_state(void)
529 {
530     file_cleanup_outgoing_migration();
531     socket_cleanup_outgoing_migration();
532     multifd_device_state_send_cleanup();
533     qemu_sem_destroy(&multifd_send_state->channels_created);
534     qemu_sem_destroy(&multifd_send_state->channels_ready);
535     qemu_mutex_destroy(&multifd_send_state->multifd_send_mutex);
536     g_free(multifd_send_state->params);
537     multifd_send_state->params = NULL;
538     g_free(multifd_send_state);
539     multifd_send_state = NULL;
540 }
541 
542 void multifd_send_shutdown(void)
543 {
544     int i;
545 
546     if (!migrate_multifd()) {
547         return;
548     }
549 
550     for (i = 0; i < migrate_multifd_channels(); i++) {
551         MultiFDSendParams *p = &multifd_send_state->params[i];
552 
553         /* thread_created implies the TLS handshake has succeeded */
554         if (p->tls_thread_created && p->thread_created) {
555             Error *local_err = NULL;
556             /*
557              * The destination expects the TLS session to always be
558              * properly terminated. This helps to detect a premature
559              * termination in the middle of the stream.  Note that
560              * older QEMUs always break the connection on the source
561              * and the destination always sees
562              * GNUTLS_E_PREMATURE_TERMINATION.
563              */
564             migration_tls_channel_end(p->c, &local_err);
565 
566             /*
567              * The above can return an error in case the migration has
568              * already failed. If the migration succeeded, errors are
569              * not expected but there's no need to kill the source.
570              */
571             if (local_err && !migration_has_failed(migrate_get_current())) {
572                 warn_report(
573                     "multifd_send_%d: Failed to terminate TLS connection: %s",
574                     p->id, error_get_pretty(local_err));
575                 break;
576             }
577         }
578     }
579 
580     multifd_send_terminate_threads();
581 
582     for (i = 0; i < migrate_multifd_channels(); i++) {
583         MultiFDSendParams *p = &multifd_send_state->params[i];
584         Error *local_err = NULL;
585 
586         if (!multifd_send_cleanup_channel(p, &local_err)) {
587             migrate_set_error(migrate_get_current(), local_err);
588             error_free(local_err);
589         }
590     }
591 
592     multifd_send_cleanup_state();
593 }
594 
595 static int multifd_zero_copy_flush(QIOChannel *c)
596 {
597     int ret;
598     Error *err = NULL;
599 
600     ret = qio_channel_flush(c, &err);
601     if (ret < 0) {
602         error_report_err(err);
603         return -1;
604     }
605     if (ret == 1) {
606         stat64_add(&mig_stats.dirty_sync_missed_zero_copy, 1);
607     }
608 
609     return ret;
610 }
611 
612 int multifd_send_sync_main(MultiFDSyncReq req)
613 {
614     int i;
615     bool flush_zero_copy;
616 
617     assert(req != MULTIFD_SYNC_NONE);
618 
619     flush_zero_copy = migrate_zero_copy_send();
620 
621     for (i = 0; i < migrate_multifd_channels(); i++) {
622         MultiFDSendParams *p = &multifd_send_state->params[i];
623 
624         if (multifd_send_should_exit()) {
625             return -1;
626         }
627 
628         trace_multifd_send_sync_main_signal(p->id);
629 
630         /*
631          * We should be the only user so far, so it is not possible for it
632          * to be set by others concurrently.
633          */
634         assert(qatomic_read(&p->pending_sync) == MULTIFD_SYNC_NONE);
635         qatomic_set(&p->pending_sync, req);
636         qemu_sem_post(&p->sem);
637     }
638     for (i = 0; i < migrate_multifd_channels(); i++) {
639         MultiFDSendParams *p = &multifd_send_state->params[i];
640 
641         if (multifd_send_should_exit()) {
642             return -1;
643         }
644 
645         qemu_sem_wait(&multifd_send_state->channels_ready);
646         trace_multifd_send_sync_main_wait(p->id);
647         qemu_sem_wait(&p->sem_sync);
648 
649         if (flush_zero_copy && p->c && (multifd_zero_copy_flush(p->c) < 0)) {
650             return -1;
651         }
652     }
653     trace_multifd_send_sync_main(multifd_send_state->packet_num);
654 
655     return 0;
656 }
657 
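/*
 * The per-channel send thread.  It loops waiting on p->sem for either a
 * data job (pending_job) or a sync request (pending_sync), transmits the
 * payload over p->c, and posts channels_ready before each wait to signal
 * that it is idle.
 */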
658 static void *multifd_send_thread(void *opaque)
659 {
660     MultiFDSendParams *p = opaque;
661     MigrationThread *thread = NULL;
662     Error *local_err = NULL;
663     int ret = 0;
664     bool use_packets = multifd_use_packets();
665 
666     thread = migration_threads_add(p->name, qemu_get_thread_id());
667 
668     trace_multifd_send_thread_start(p->id);
669     rcu_register_thread();
670 
671     if (use_packets) {
672         if (multifd_send_initial_packet(p, &local_err) < 0) {
673             ret = -1;
674             goto out;
675         }
676     }
677 
678     while (true) {
679         qemu_sem_post(&multifd_send_state->channels_ready);
680         qemu_sem_wait(&p->sem);
681 
682         if (multifd_send_should_exit()) {
683             break;
684         }
685 
686         /*
687          * Read pending_job flag before p->data.  Pairs with the
688          * qatomic_store_release() in multifd_send().
689          */
690         if (qatomic_load_acquire(&p->pending_job)) {
691             bool is_device_state = multifd_payload_device_state(p->data);
692             size_t total_size;
693 
694             p->flags = 0;
695             p->iovs_num = 0;
696             assert(!multifd_payload_empty(p->data));
697 
698             if (is_device_state) {
699                 multifd_device_state_send_prepare(p);
700             } else {
701                 ret = multifd_send_state->ops->send_prepare(p, &local_err);
702                 if (ret != 0) {
703                     break;
704                 }
705             }
706 
707             /*
708              * The packet header in the zerocopy RAM case is accounted for
709              * in multifd_nocomp_send_prepare() - where it is actually
710              * being sent.
711              */
712             total_size = iov_size(p->iov, p->iovs_num);
713 
714             if (migrate_mapped_ram()) {
715                 assert(!is_device_state);
716 
717                 ret = file_write_ramblock_iov(p->c, p->iov, p->iovs_num,
718                                               &p->data->u.ram, &local_err);
719             } else {
720                 ret = qio_channel_writev_full_all(p->c, p->iov, p->iovs_num,
721                                                   NULL, 0, p->write_flags,
722                                                   &local_err);
723             }
724 
725             if (ret != 0) {
726                 break;
727             }
728 
729             stat64_add(&mig_stats.multifd_bytes, total_size);
730 
731             p->next_packet_size = 0;
732             multifd_send_data_clear(p->data);
733 
734             /*
735              * Making sure p->data is published before saying "we're
736              * free".  Pairs with the smp_mb_acquire() in
737              * multifd_send().
738              */
739             qatomic_store_release(&p->pending_job, false);
740         } else {
741             MultiFDSyncReq req = qatomic_read(&p->pending_sync);
742 
743             /*
744              * If not a normal job, must be a sync request.  Note that
745              * pending_sync is a standalone flag (unlike pending_job), so
746              * it doesn't require explicit memory barriers.
747              */
748             assert(req != MULTIFD_SYNC_NONE);
749 
750             /* Only push the SYNC message if it involves a remote sync */
751             if (req == MULTIFD_SYNC_ALL) {
752                 p->flags = MULTIFD_FLAG_SYNC;
753                 multifd_send_fill_packet(p);
754                 ret = qio_channel_write_all(p->c, (void *)p->packet,
755                                             p->packet_len, &local_err);
756                 if (ret != 0) {
757                     break;
758                 }
759                 /* p->next_packet_size will always be zero for a SYNC packet */
760                 stat64_add(&mig_stats.multifd_bytes, p->packet_len);
761             }
762 
763             qatomic_set(&p->pending_sync, MULTIFD_SYNC_NONE);
764             qemu_sem_post(&p->sem_sync);
765         }
766     }
767 
768 out:
769     if (ret) {
770         assert(local_err);
771         trace_multifd_send_error(p->id);
772         multifd_send_set_error(local_err);
773         multifd_send_kick_main(p);
774         error_free(local_err);
775     }
776 
777     rcu_unregister_thread();
778     migration_threads_remove(thread);
779     trace_multifd_send_thread_end(p->id, p->packets_sent);
780 
781     return NULL;
782 }
783 
784 static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque);
785 
786 typedef struct {
787     MultiFDSendParams *p;
788     QIOChannelTLS *tioc;
789 } MultiFDTLSThreadArgs;
790 
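/*
 * Run the TLS handshake from a dedicated thread.  On completion,
 * multifd_new_send_channel_async() is invoked again, this time with the
 * TLS channel as the task source.
 */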
791 static void *multifd_tls_handshake_thread(void *opaque)
792 {
793     MultiFDTLSThreadArgs *args = opaque;
794 
795     qio_channel_tls_handshake(args->tioc,
796                               multifd_new_send_channel_async,
797                               args->p,
798                               NULL,
799                               NULL);
800     g_free(args);
801 
802     return NULL;
803 }
804 
805 static bool multifd_tls_channel_connect(MultiFDSendParams *p,
806                                         QIOChannel *ioc,
807                                         Error **errp)
808 {
809     MigrationState *s = migrate_get_current();
810     const char *hostname = s->hostname;
811     MultiFDTLSThreadArgs *args;
812     QIOChannelTLS *tioc;
813 
814     tioc = migration_tls_client_create(ioc, hostname, errp);
815     if (!tioc) {
816         return false;
817     }
818 
819     /*
820      * Ownership of the socket channel now transfers to the newly
821      * created TLS channel, which has already taken a reference.
822      */
823     object_unref(OBJECT(ioc));
824     trace_multifd_tls_outgoing_handshake_start(ioc, tioc, hostname);
825     qio_channel_set_name(QIO_CHANNEL(tioc), "multifd-tls-outgoing");
826 
827     args = g_new0(MultiFDTLSThreadArgs, 1);
828     args->tioc = tioc;
829     args->p = p;
830 
831     p->tls_thread_created = true;
832     qemu_thread_create(&p->tls_thread, MIGRATION_THREAD_SRC_TLS,
833                        multifd_tls_handshake_thread, args,
834                        QEMU_THREAD_JOINABLE);
835     return true;
836 }
837 
838 void multifd_channel_connect(MultiFDSendParams *p, QIOChannel *ioc)
839 {
840     qio_channel_set_delay(ioc, false);
841 
842     migration_ioc_register_yank(ioc);
843     /* Setup p->c only if the channel is completely setup */
844     p->c = ioc;
845 
846     p->thread_created = true;
847     qemu_thread_create(&p->thread, p->name, multifd_send_thread, p,
848                        QEMU_THREAD_JOINABLE);
849 }
850 
851 /*
852  * When TLS is enabled this function is called once to establish the
853  * TLS connection and a second time after the TLS handshake to create
854  * the multifd channel. Without TLS it goes straight into the channel
855  * creation.
856  */
857 static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque)
858 {
859     MultiFDSendParams *p = opaque;
860     QIOChannel *ioc = QIO_CHANNEL(qio_task_get_source(task));
861     Error *local_err = NULL;
862     bool ret;
863 
864     trace_multifd_new_send_channel_async(p->id);
865 
866     if (qio_task_propagate_error(task, &local_err)) {
867         ret = false;
868         goto out;
869     }
870 
871     trace_multifd_set_outgoing_channel(ioc, object_get_typename(OBJECT(ioc)),
872                                        migrate_get_current()->hostname);
873 
874     if (migrate_channel_requires_tls_upgrade(ioc)) {
875         ret = multifd_tls_channel_connect(p, ioc, &local_err);
876         if (ret) {
877             return;
878         }
879     } else {
880         multifd_channel_connect(p, ioc);
881         ret = true;
882     }
883 
884 out:
885     /*
886      * Here we're not interested in whether creation succeeded, only that
887      * it happened at all.
888      */
889     multifd_send_channel_created();
890 
891     if (ret) {
892         return;
893     }
894 
895     trace_multifd_new_send_channel_async_error(p->id, local_err);
896     multifd_send_set_error(local_err);
897     /*
898      * For error cases (TLS or non-TLS), the IO channel is always freed here
899      * rather than during multifd cleanup: since p->c is not set, the multifd
900      * cleanup code doesn't even know of its existence.
901      */
902     object_unref(OBJECT(ioc));
903     error_free(local_err);
904 }
905 
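/*
 * Create one outgoing channel: a file channel for mapped-ram migration,
 * otherwise a socket channel that completes asynchronously in
 * multifd_new_send_channel_async().
 */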
906 static bool multifd_new_send_channel_create(gpointer opaque, Error **errp)
907 {
908     if (!multifd_use_packets()) {
909         return file_send_channel_create(opaque, errp);
910     }
911 
912     socket_send_channel_create(multifd_new_send_channel_async, opaque);
913     return true;
914 }
915 
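/*
 * Set up the send side: allocate global state and per-channel resources,
 * kick off creation of every channel and run each method's send_setup().
 * Returns true on success, false after moving the migration to FAILED.
 */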
916 bool multifd_send_setup(void)
917 {
918     MigrationState *s = migrate_get_current();
919     int thread_count, ret = 0;
920     uint32_t page_count = multifd_ram_page_count();
921     bool use_packets = multifd_use_packets();
922     uint8_t i;
923 
924     if (!migrate_multifd()) {
925         return true;
926     }
927 
928     thread_count = migrate_multifd_channels();
929     multifd_send_state = g_malloc0(sizeof(*multifd_send_state));
930     multifd_send_state->params = g_new0(MultiFDSendParams, thread_count);
931     qemu_mutex_init(&multifd_send_state->multifd_send_mutex);
932     qemu_sem_init(&multifd_send_state->channels_created, 0);
933     qemu_sem_init(&multifd_send_state->channels_ready, 0);
934     qatomic_set(&multifd_send_state->exiting, 0);
935     multifd_send_state->ops = multifd_ops[migrate_multifd_compression()];
936 
937     for (i = 0; i < thread_count; i++) {
938         MultiFDSendParams *p = &multifd_send_state->params[i];
939         Error *local_err = NULL;
940 
941         qemu_sem_init(&p->sem, 0);
942         qemu_sem_init(&p->sem_sync, 0);
943         p->id = i;
944         p->data = multifd_send_data_alloc();
945 
946         if (use_packets) {
947             p->packet_len = sizeof(MultiFDPacket_t)
948                           + sizeof(uint64_t) * page_count;
949             p->packet = g_malloc0(p->packet_len);
950             p->packet_device_state = g_malloc0(sizeof(*p->packet_device_state));
951             p->packet_device_state->hdr.magic = cpu_to_be32(MULTIFD_MAGIC);
952             p->packet_device_state->hdr.version = cpu_to_be32(MULTIFD_VERSION);
953         }
954         p->name = g_strdup_printf(MIGRATION_THREAD_SRC_MULTIFD, i);
955         p->write_flags = 0;
956 
957         if (!multifd_new_send_channel_create(p, &local_err)) {
958             migrate_set_error(s, local_err);
959             ret = -1;
960         }
961     }
962 
963     /*
964      * Wait until channel creation has started for all channels. The
965      * creation can still fail, but no more channels will be created
966      * past this point.
967      */
968     for (i = 0; i < thread_count; i++) {
969         qemu_sem_wait(&multifd_send_state->channels_created);
970     }
971 
972     if (ret) {
973         goto err;
974     }
975 
976     for (i = 0; i < thread_count; i++) {
977         MultiFDSendParams *p = &multifd_send_state->params[i];
978         Error *local_err = NULL;
979 
980         ret = multifd_send_state->ops->send_setup(p, &local_err);
981         if (ret) {
982             migrate_set_error(s, local_err);
983             goto err;
984         }
985         assert(p->iov);
986     }
987 
988     multifd_device_state_send_setup();
989 
990     return true;
991 
992 err:
993     migrate_set_state(&s->state, MIGRATION_STATUS_SETUP,
994                       MIGRATION_STATUS_FAILED);
995     return false;
996 }
997 
998 bool multifd_recv(void)
999 {
1000     int i;
1001     static int next_recv_channel;
1002     MultiFDRecvParams *p = NULL;
1003     MultiFDRecvData *data = multifd_recv_state->data;
1004 
1005     /*
1006      * next_recv_channel can remain from a previous migration that was
1007      * using more channels, so ensure it doesn't overflow if the
1008      * limit is lower now.
1009      */
1010     next_recv_channel %= migrate_multifd_channels();
1011     for (i = next_recv_channel;; i = (i + 1) % migrate_multifd_channels()) {
1012         if (multifd_recv_should_exit()) {
1013             return false;
1014         }
1015 
1016         p = &multifd_recv_state->params[i];
1017 
1018         if (qatomic_read(&p->pending_job) == false) {
1019             next_recv_channel = (i + 1) % migrate_multifd_channels();
1020             break;
1021         }
1022     }
1023 
1024     /*
1025      * Order pending_job read before manipulating p->data below. Pairs
1026      * with qatomic_store_release() at multifd_recv_thread().
1027      */
1028     smp_mb_acquire();
1029 
1030     assert(!p->data->size);
1031     multifd_recv_state->data = p->data;
1032     p->data = data;
1033 
1034     /*
1035      * Order p->data update before setting pending_job. Pairs with
1036      * qatomic_load_acquire() at multifd_recv_thread().
1037      */
1038     qatomic_store_release(&p->pending_job, true);
1039     qemu_sem_post(&p->sem);
1040 
1041     return true;
1042 }
1043 
1044 MultiFDRecvData *multifd_get_recv_data(void)
1045 {
1046     return multifd_recv_state->data;
1047 }
1048 
1049 static void multifd_recv_terminate_threads(Error *err)
1050 {
1051     int i;
1052 
1053     trace_multifd_recv_terminate_threads(err != NULL);
1054 
1055     if (qatomic_xchg(&multifd_recv_state->exiting, 1)) {
1056         return;
1057     }
1058 
1059     if (err) {
1060         MigrationState *s = migrate_get_current();
1061         migrate_set_error(s, err);
1062         if (s->state == MIGRATION_STATUS_SETUP ||
1063             s->state == MIGRATION_STATUS_ACTIVE) {
1064             migrate_set_state(&s->state, s->state,
1065                               MIGRATION_STATUS_FAILED);
1066         }
1067     }
1068 
1069     for (i = 0; i < migrate_multifd_channels(); i++) {
1070         MultiFDRecvParams *p = &multifd_recv_state->params[i];
1071 
1072         /*
1073          * The migration thread and channels interact differently
1074          * depending on the presence of packets.
1075          */
1076         if (multifd_use_packets()) {
1077             /*
1078              * The channel receives as long as there are packets. When
1079              * packets end (i.e. MULTIFD_FLAG_SYNC is reached), the
1080              * channel waits for the migration thread to sync. If the
1081              * sync never happens, do it here.
1082              */
1083             qemu_sem_post(&p->sem_sync);
1084         } else {
1085             /*
1086              * The channel waits for the migration thread to give it
1087              * work. When the migration thread runs out of work, it
1088              * releases the channel and waits for any pending work to
1089              * finish. If we reach here (e.g. due to error) before the
1090              * work runs out, release the channel.
1091              */
1092             qemu_sem_post(&p->sem);
1093         }
1094 
1095         /*
1096          * We could arrive here for two reasons:
1097          *  - normal quit, i.e. everything went fine, just finished
1098          *  - error quit: We close the channels so the channel threads
1099          *    finish the qio_channel_read_all_eof()
1100          */
1101         if (p->c) {
1102             qio_channel_shutdown(p->c, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
1103         }
1104     }
1105 }
1106 
1107 void multifd_recv_shutdown(void)
1108 {
1109     if (migrate_multifd()) {
1110         multifd_recv_terminate_threads(NULL);
1111     }
1112 }
1113 
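/* Free one channel's resources; only safe after its thread has been joined */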
1114 static void multifd_recv_cleanup_channel(MultiFDRecvParams *p)
1115 {
1116     migration_ioc_unregister_yank(p->c);
1117     object_unref(OBJECT(p->c));
1118     p->c = NULL;
1119     qemu_mutex_destroy(&p->mutex);
1120     qemu_sem_destroy(&p->sem_sync);
1121     qemu_sem_destroy(&p->sem);
1122     g_free(p->data);
1123     p->data = NULL;
1124     g_free(p->name);
1125     p->name = NULL;
1126     p->packet_len = 0;
1127     g_free(p->packet);
1128     p->packet = NULL;
1129     g_clear_pointer(&p->packet_dev_state, g_free);
1130     g_free(p->normal);
1131     p->normal = NULL;
1132     g_free(p->zero);
1133     p->zero = NULL;
1134     multifd_recv_state->ops->recv_cleanup(p);
1135 }
1136 
1137 static void multifd_recv_cleanup_state(void)
1138 {
1139     qemu_sem_destroy(&multifd_recv_state->sem_sync);
1140     g_free(multifd_recv_state->params);
1141     multifd_recv_state->params = NULL;
1142     g_free(multifd_recv_state->data);
1143     multifd_recv_state->data = NULL;
1144     g_free(multifd_recv_state);
1145     multifd_recv_state = NULL;
1146 }
1147 
1148 void multifd_recv_cleanup(void)
1149 {
1150     int i;
1151 
1152     if (!migrate_multifd()) {
1153         return;
1154     }
1155     multifd_recv_terminate_threads(NULL);
1156     for (i = 0; i < migrate_multifd_channels(); i++) {
1157         MultiFDRecvParams *p = &multifd_recv_state->params[i];
1158 
1159         if (p->thread_created) {
1160             qemu_thread_join(&p->thread);
1161         }
1162     }
1163     for (i = 0; i < migrate_multifd_channels(); i++) {
1164         multifd_recv_cleanup_channel(&multifd_recv_state->params[i]);
1165     }
1166     multifd_recv_cleanup_state();
1167 }
1168 
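/*
 * Called by the migration thread to synchronize with all recv channels,
 * either at a SYNC packet (socket migration) or when the channels run
 * out of work (file migration).
 */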
1169 void multifd_recv_sync_main(void)
1170 {
1171     int thread_count = migrate_multifd_channels();
1172     bool file_based = !multifd_use_packets();
1173     int i;
1174 
1175     if (!migrate_multifd()) {
1176         return;
1177     }
1178 
1179     /*
1180      * File-based channels don't use packets and therefore need to
1181      * wait for more work. Release them to start the sync.
1182      */
1183     if (file_based) {
1184         for (i = 0; i < thread_count; i++) {
1185             MultiFDRecvParams *p = &multifd_recv_state->params[i];
1186 
1187             trace_multifd_recv_sync_main_signal(p->id);
1188             qemu_sem_post(&p->sem);
1189         }
1190     }
1191 
1192     /*
1193      * Initiate the synchronization by waiting for all channels.
1194      *
1195      * For socket-based migration this means each channel has received
1196      * the SYNC packet on the stream.
1197      *
1198      * For file-based migration this means each channel is done with
1199      * the work (pending_job=false).
1200      */
1201     for (i = 0; i < thread_count; i++) {
1202         trace_multifd_recv_sync_main_wait(i);
1203         qemu_sem_wait(&multifd_recv_state->sem_sync);
1204     }
1205 
1206     if (file_based) {
1207         /*
1208          * For file-based migration, loading is done in one iteration.
1209          * We're done.
1210          */
1211         return;
1212     }
1213 
1214     /*
1215      * Sync done. Release the channels for the next iteration.
1216      */
1217     for (i = 0; i < thread_count; i++) {
1218         MultiFDRecvParams *p = &multifd_recv_state->params[i];
1219 
1220         WITH_QEMU_LOCK_GUARD(&p->mutex) {
1221             if (multifd_recv_state->packet_num < p->packet_num) {
1222                 multifd_recv_state->packet_num = p->packet_num;
1223             }
1224         }
1225         trace_multifd_recv_sync_main_signal(p->id);
1226         qemu_sem_post(&p->sem_sync);
1227     }
1228     trace_multifd_recv_sync_main(multifd_recv_state->packet_num);
1229 }
1230 
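/*
 * Read a device state buffer off the channel and feed it to the
 * destination device via qemu_loadvm_load_state_buffer().
 */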
1231 static int multifd_device_state_recv(MultiFDRecvParams *p, Error **errp)
1232 {
1233     g_autofree char *dev_state_buf = NULL;
1234     int ret;
1235 
1236     dev_state_buf = g_malloc(p->next_packet_size);
1237 
1238     ret = qio_channel_read_all(p->c, dev_state_buf, p->next_packet_size, errp);
1239     if (ret != 0) {
1240         return ret;
1241     }
1242 
1243     if (p->packet_dev_state->idstr[sizeof(p->packet_dev_state->idstr) - 1]
1244         != 0) {
1245         error_setg(errp, "unterminated multifd device state idstr");
1246         return -1;
1247     }
1248 
1249     if (!qemu_loadvm_load_state_buffer(p->packet_dev_state->idstr,
1250                                        p->packet_dev_state->instance_id,
1251                                        dev_state_buf, p->next_packet_size,
1252                                        errp)) {
1253         ret = -1;
1254     }
1255 
1256     return ret;
1257 }
1258 
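/*
 * The per-channel recv thread.  With packets it reads and parses packets
 * straight off the wire; without them (mapped-ram) it waits on p->sem for
 * the migration thread to hand it work through p->data.
 */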
1259 static void *multifd_recv_thread(void *opaque)
1260 {
1261     MigrationState *s = migrate_get_current();
1262     MultiFDRecvParams *p = opaque;
1263     Error *local_err = NULL;
1264     bool use_packets = multifd_use_packets();
1265     int ret;
1266 
1267     trace_multifd_recv_thread_start(p->id);
1268     rcu_register_thread();
1269 
1270     if (!s->multifd_clean_tls_termination) {
1271         p->read_flags = QIO_CHANNEL_READ_FLAG_RELAXED_EOF;
1272     }
1273 
1274     while (true) {
1275         MultiFDPacketHdr_t hdr;
1276         uint32_t flags = 0;
1277         bool is_device_state = false;
1278         bool has_data = false;
1279         uint8_t *pkt_buf;
1280         size_t pkt_len;
1281 
1282         p->normal_num = 0;
1283 
1284         if (use_packets) {
1285             struct iovec iov = {
1286                 .iov_base = (void *)&hdr,
1287                 .iov_len = sizeof(hdr)
1288             };
1289 
1290             if (multifd_recv_should_exit()) {
1291                 break;
1292             }
1293 
1294             ret = qio_channel_readv_full_all_eof(p->c, &iov, 1, NULL, NULL,
1295                                                  p->read_flags, &local_err);
1296             if (!ret) {
1297                 /* EOF */
1298                 assert(!local_err);
1299                 break;
1300             }
1301 
1302             if (ret == -1) {
1303                 break;
1304             }
1305 
1306             ret = multifd_recv_unfill_packet_header(p, &hdr, &local_err);
1307             if (ret) {
1308                 break;
1309             }
1310 
1311             is_device_state = p->flags & MULTIFD_FLAG_DEVICE_STATE;
1312             if (is_device_state) {
1313                 pkt_buf = (uint8_t *)p->packet_dev_state + sizeof(hdr);
1314                 pkt_len = sizeof(*p->packet_dev_state) - sizeof(hdr);
1315             } else {
1316                 pkt_buf = (uint8_t *)p->packet + sizeof(hdr);
1317                 pkt_len = p->packet_len - sizeof(hdr);
1318             }
1319 
1320             ret = qio_channel_read_all_eof(p->c, (char *)pkt_buf, pkt_len,
1321                                            &local_err);
1322             if (!ret) {
1323                 /* EOF */
1324                 error_setg(&local_err,
                                "multifd: unexpected EOF after packet header");
1325                 break;
1326             }
1327 
1328             if (ret == -1) {
1329                 break;
1330             }
1331 
1332             qemu_mutex_lock(&p->mutex);
1333             ret = multifd_recv_unfill_packet(p, &local_err);
1334             if (ret) {
1335                 qemu_mutex_unlock(&p->mutex);
1336                 break;
1337             }
1338 
1339             flags = p->flags;
1340             /* recv methods don't know how to handle the SYNC flag */
1341             p->flags &= ~MULTIFD_FLAG_SYNC;
1342 
1343             if (is_device_state) {
1344                 has_data = p->next_packet_size > 0;
1345             } else {
1346                 /*
1347                  * Even if it's a SYNC packet, this needs to be set
1348                  * because older QEMUs (<9.0) still send data along with
1349                  * the SYNC packet.
1350                  */
1351                 has_data = p->normal_num || p->zero_num;
1352             }
1353 
1354             qemu_mutex_unlock(&p->mutex);
1355         } else {
1356             /*
1357              * No packets, so we need to wait for the vmstate code to
1358              * give us work.
1359              */
1360             qemu_sem_wait(&p->sem);
1361 
1362             if (multifd_recv_should_exit()) {
1363                 break;
1364             }
1365 
1366             /* pairs with qatomic_store_release() at multifd_recv() */
1367             if (!qatomic_load_acquire(&p->pending_job)) {
1368                 /*
1369                  * The migration thread did not send work; this is
1370                  * equivalent to pending_sync on the sending
1371                  * side.  Post sem_sync to notify that we reached this
1372                  * point.
1373                  */
1374                 qemu_sem_post(&multifd_recv_state->sem_sync);
1375                 continue;
1376             }
1377 
1378             has_data = !!p->data->size;
1379         }
1380 
1381         if (has_data) {
1382             if (is_device_state) {
1383                 assert(use_packets);
1384                 ret = multifd_device_state_recv(p, &local_err);
1385             } else {
1386                 ret = multifd_recv_state->ops->recv(p, &local_err);
1387             }
1388             if (ret != 0) {
1389                 break;
1390             }
1391         } else if (is_device_state) {
1392             error_setg(&local_err,
1393                        "multifd: received empty device state packet");
1394             break;
1395         }
1396 
1397         if (use_packets) {
1398             if (flags & MULTIFD_FLAG_SYNC) {
1399                 if (is_device_state) {
1400                     error_setg(&local_err,
1401                                "multifd: received SYNC device state packet");
1402                     break;
1403                 }
1404 
1405                 qemu_sem_post(&multifd_recv_state->sem_sync);
1406                 qemu_sem_wait(&p->sem_sync);
1407             }
1408         } else {
1409             p->data->size = 0;
1410             /*
1411              * Order data->size update before clearing
1412              * pending_job. Pairs with smp_mb_acquire() at
1413              * multifd_recv().
1414              */
1415             qatomic_store_release(&p->pending_job, false);
1416         }
1417     }
1418 
1419     if (local_err) {
1420         multifd_recv_terminate_threads(local_err);
1421         error_free(local_err);
1422     }
1423 
1424     rcu_unregister_thread();
1425     trace_multifd_recv_thread_end(p->id, p->packets_recved);
1426 
1427     return NULL;
1428 }
1429 
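/*
 * Set up the receive side.  Unlike the send side, channels are created
 * later, as connections come in via multifd_recv_new_channel().
 */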
1430 int multifd_recv_setup(Error **errp)
1431 {
1432     int thread_count;
1433     uint32_t page_count = multifd_ram_page_count();
1434     bool use_packets = multifd_use_packets();
1435     uint8_t i;
1436 
1437     /*
1438      * Return successfully if the multifd recv state is already initialised
1439      * or multifd is not enabled.
1440      */
1441     if (multifd_recv_state || !migrate_multifd()) {
1442         return 0;
1443     }
1444 
1445     thread_count = migrate_multifd_channels();
1446     multifd_recv_state = g_malloc0(sizeof(*multifd_recv_state));
1447     multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count);
1448 
1449     multifd_recv_state->data = g_new0(MultiFDRecvData, 1);
1450     multifd_recv_state->data->size = 0;
1451 
1452     qatomic_set(&multifd_recv_state->count, 0);
1453     qatomic_set(&multifd_recv_state->exiting, 0);
1454     qemu_sem_init(&multifd_recv_state->sem_sync, 0);
1455     multifd_recv_state->ops = multifd_ops[migrate_multifd_compression()];
1456 
1457     for (i = 0; i < thread_count; i++) {
1458         MultiFDRecvParams *p = &multifd_recv_state->params[i];
1459 
1460         qemu_mutex_init(&p->mutex);
1461         qemu_sem_init(&p->sem_sync, 0);
1462         qemu_sem_init(&p->sem, 0);
1463         p->pending_job = false;
1464         p->id = i;
1465 
1466         p->data = g_new0(MultiFDRecvData, 1);
1467         p->data->size = 0;
1468 
1469         if (use_packets) {
1470             p->packet_len = sizeof(MultiFDPacket_t)
1471                 + sizeof(uint64_t) * page_count;
1472             p->packet = g_malloc0(p->packet_len);
1473             p->packet_dev_state = g_malloc0(sizeof(*p->packet_dev_state));
1474         }
1475         p->name = g_strdup_printf(MIGRATION_THREAD_DST_MULTIFD, i);
1476         p->normal = g_new0(ram_addr_t, page_count);
1477         p->zero = g_new0(ram_addr_t, page_count);
1478     }
1479 
1480     for (i = 0; i < thread_count; i++) {
1481         MultiFDRecvParams *p = &multifd_recv_state->params[i];
1482         int ret;
1483 
1484         ret = multifd_recv_state->ops->recv_setup(p, errp);
1485         if (ret) {
1486             return ret;
1487         }
1488     }
1489     return 0;
1490 }
1491 
1492 bool multifd_recv_all_channels_created(void)
1493 {
1494     int thread_count = migrate_multifd_channels();
1495 
1496     if (!migrate_multifd()) {
1497         return true;
1498     }
1499 
1500     if (!multifd_recv_state) {
1501         /* Called before any connections have been created */
1502         return false;
1503     }
1504 
1505     return thread_count == qatomic_read(&multifd_recv_state->count);
1506 }
1507 
1508 /*
1509  * Try to receive all multifd channels to get ready for the migration.
1510  * Sets @errp when failing to receive the current channel.
1511  */
1512 void multifd_recv_new_channel(QIOChannel *ioc, Error **errp)
1513 {
1514     MultiFDRecvParams *p;
1515     Error *local_err = NULL;
1516     bool use_packets = multifd_use_packets();
1517     int id;
1518 
1519     if (use_packets) {
1520         id = multifd_recv_initial_packet(ioc, &local_err);
1521         if (id < 0) {
1522             multifd_recv_terminate_threads(local_err);
1523             error_propagate_prepend(errp, local_err,
1524                                     "failed to receive packet"
1525                                     " via multifd channel %d: ",
1526                                     qatomic_read(&multifd_recv_state->count));
1527             return;
1528         }
1529         trace_multifd_recv_new_channel(id);
1530     } else {
1531         id = qatomic_read(&multifd_recv_state->count);
1532     }
1533 
1534     p = &multifd_recv_state->params[id];
1535     if (p->c != NULL) {
1536         error_setg(&local_err, "multifd: received id '%d' already setup",
1537                    id);
1538         multifd_recv_terminate_threads(local_err);
1539         error_propagate(errp, local_err);
1540         return;
1541     }
1542     p->c = ioc;
1543     object_ref(OBJECT(ioc));
1544 
1545     p->thread_created = true;
1546     qemu_thread_create(&p->thread, p->name, multifd_recv_thread, p,
1547                        QEMU_THREAD_JOINABLE);
1548     qatomic_inc(&multifd_recv_state->count);
1549 }
1550