/*
 * Multifd common code
 *
 * Copyright (c) 2019-2020 Red Hat Inc
 *
 * Authors:
 *  Juan Quintela <quintela@redhat.com>
 *
 * This work is licensed under the terms of the GNU GPL, version 2 or later.
 * See the COPYING file in the top-level directory.
 */

#include "qemu/osdep.h"
#include "qemu/cutils.h"
#include "qemu/iov.h"
#include "qemu/rcu.h"
#include "exec/target_page.h"
#include "system/system.h"
#include "exec/ramblock.h"
#include "qemu/error-report.h"
#include "qapi/error.h"
#include "file.h"
#include "migration/misc.h"
#include "migration.h"
#include "migration-stats.h"
#include "savevm.h"
#include "socket.h"
#include "tls.h"
#include "qemu-file.h"
#include "trace.h"
#include "multifd.h"
#include "threadinfo.h"
#include "options.h"
#include "qemu/yank.h"
#include "io/channel-file.h"
#include "io/channel-socket.h"
#include "yank_functions.h"

/* Multiple fd's */

#define MULTIFD_MAGIC 0x11223344U
#define MULTIFD_VERSION 1

typedef struct {
    uint32_t magic;
    uint32_t version;
    unsigned char uuid[16]; /* QemuUUID */
    uint8_t id;
    uint8_t unused1[7];     /* Reserved for future use */
    uint64_t unused2[4];    /* Reserved for future use */
} __attribute__((packed)) MultiFDInit_t;
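
/*
 * All multi-byte fields of MultiFDInit_t are big-endian on the wire;
 * see the cpu_to_be32()/be32_to_cpu() conversions in
 * multifd_send_initial_packet() and multifd_recv_initial_packet().
 */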

struct {
    MultiFDSendParams *params;

    /* multifd_send() body is not thread safe, needs serialization */
    QemuMutex multifd_send_mutex;

    /*
     * Global number of generated multifd packets.
     *
     * Note that 'uintptr_t' is used because it naturally supports
     * atomic operations on both 32-bit and 64-bit hosts.  It means
     * packet_num will overflow sooner on 32-bit systems, but that
     * should be fine.
     *
     * Another option would be QEMU's Stat64, which is 64 bits on all
     * hosts, but it does not support atomic fetch_add() yet.  Keep it
     * simple for now.
     */
    uintptr_t packet_num;
    /*
     * Synchronization point past which no more channels will be
     * created.
     */
    QemuSemaphore channels_created;
    /* send channels ready */
    QemuSemaphore channels_ready;
    /*
     * Whether we have already terminated the threads.  There is a
     * race when an error happens while we are exiting.  Accessed
     * with atomic operations; the only valid values are 0 and 1.
     */
    int exiting;
    /* multifd ops */
    const MultiFDMethods *ops;
} *multifd_send_state;

struct {
    MultiFDRecvParams *params;
    MultiFDRecvData *data;
    /* number of created threads */
    int count;
    /*
     * This is always posted by the recv threads, the migration thread
     * uses it to wait for recv threads to finish assigned tasks.
     */
    QemuSemaphore sem_sync;
    /* global number of generated multifd packets */
    uint64_t packet_num;
    int exiting;
    /* multifd ops */
    const MultiFDMethods *ops;
} *multifd_recv_state;

MultiFDSendData *multifd_send_data_alloc(void)
{
    MultiFDSendData *new = g_new0(MultiFDSendData, 1);

    multifd_ram_payload_alloc(&new->u.ram);
    /* Device state allocates its payload on-demand */

    return new;
}

void multifd_send_data_clear(MultiFDSendData *data)
{
    if (multifd_payload_empty(data)) {
        return;
    }

    switch (data->type) {
    case MULTIFD_PAYLOAD_DEVICE_STATE:
        multifd_send_data_clear_device_state(&data->u.device_state);
        break;
    default:
        /* Nothing to do */
        break;
    }

    data->type = MULTIFD_PAYLOAD_NONE;
}

void multifd_send_data_free(MultiFDSendData *data)
{
    if (!data) {
        return;
    }

    /* This also frees the device state payload */
    multifd_send_data_clear(data);

    multifd_ram_payload_free(&data->u.ram);

    g_free(data);
}
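
/*
 * Typical MultiFDSendData lifecycle (a sketch, not actual caller code):
 *
 *     MultiFDSendData *d = multifd_send_data_alloc();
 *     ... fill d->u.ram or d->u.device_state and set d->type ...
 *     multifd_send_data_clear(d);   // payload back to MULTIFD_PAYLOAD_NONE
 *     multifd_send_data_free(d);    // clears first, then frees everything
 */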

static bool multifd_use_packets(void)
{
    return !migrate_mapped_ram();
}

void multifd_send_channel_created(void)
{
    qemu_sem_post(&multifd_send_state->channels_created);
}

static const MultiFDMethods *multifd_ops[MULTIFD_COMPRESSION__MAX] = {};

void multifd_register_ops(int method, const MultiFDMethods *ops)
{
    assert(0 <= method && method < MULTIFD_COMPRESSION__MAX);
    assert(!multifd_ops[method]);
    multifd_ops[method] = ops;
}
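
/*
 * Each compression backend installs its MultiFDMethods once at startup,
 * e.g. (a sketch; backend names may differ):
 *
 *     multifd_register_ops(MULTIFD_COMPRESSION_ZLIB, &multifd_zlib_ops);
 */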

static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp)
{
    MultiFDInit_t msg = {};
    size_t size = sizeof(msg);
    int ret;

    msg.magic = cpu_to_be32(MULTIFD_MAGIC);
    msg.version = cpu_to_be32(MULTIFD_VERSION);
    msg.id = p->id;
    memcpy(msg.uuid, &qemu_uuid.data, sizeof(msg.uuid));

    ret = qio_channel_write_all(p->c, (char *)&msg, size, errp);
    if (ret != 0) {
        return -1;
    }
    stat64_add(&mig_stats.multifd_bytes, size);
    return 0;
}

static int multifd_recv_initial_packet(QIOChannel *c, Error **errp)
{
    MultiFDInit_t msg;
    int ret;

    ret = qio_channel_read_all(c, (char *)&msg, sizeof(msg), errp);
    if (ret != 0) {
        return -1;
    }

    msg.magic = be32_to_cpu(msg.magic);
    msg.version = be32_to_cpu(msg.version);

    if (msg.magic != MULTIFD_MAGIC) {
        error_setg(errp, "multifd: received packet magic %x "
                   "expected %x", msg.magic, MULTIFD_MAGIC);
        return -1;
    }

    if (msg.version != MULTIFD_VERSION) {
        error_setg(errp, "multifd: received packet version %u "
                   "expected %u", msg.version, MULTIFD_VERSION);
        return -1;
    }

    if (memcmp(msg.uuid, &qemu_uuid, sizeof(qemu_uuid))) {
        char *uuid = qemu_uuid_unparse_strdup(&qemu_uuid);
        char *msg_uuid = qemu_uuid_unparse_strdup((const QemuUUID *)msg.uuid);

        error_setg(errp, "multifd: received uuid '%s' and expected "
                   "uuid '%s' for channel %hhd", msg_uuid, uuid, msg.id);
        g_free(uuid);
        g_free(msg_uuid);
        return -1;
    }

    /* ids run from 0 to channels - 1, so id == channels is also invalid */
    if (msg.id >= migrate_multifd_channels()) {
        error_setg(errp, "multifd: received channel id %u is out of range "
                   "(%u channels)", msg.id, migrate_multifd_channels());
        return -1;
    }

    return msg.id;
}

/* Fills a RAM multifd packet */
void multifd_send_fill_packet(MultiFDSendParams *p)
{
    MultiFDPacket_t *packet = p->packet;
    uint64_t packet_num;
    bool sync_packet = p->flags & MULTIFD_FLAG_SYNC;

    memset(packet, 0, p->packet_len);

    packet->hdr.magic = cpu_to_be32(MULTIFD_MAGIC);
    packet->hdr.version = cpu_to_be32(MULTIFD_VERSION);

    packet->hdr.flags = cpu_to_be32(p->flags);
    packet->next_packet_size = cpu_to_be32(p->next_packet_size);

    packet_num = qatomic_fetch_inc(&multifd_send_state->packet_num);
    packet->packet_num = cpu_to_be64(packet_num);

    p->packets_sent++;

    if (!sync_packet) {
        multifd_ram_fill_packet(p);
    }

    trace_multifd_send_fill(p->id, packet_num,
                            p->flags, p->next_packet_size);
}

static int multifd_recv_unfill_packet_header(MultiFDRecvParams *p,
                                             const MultiFDPacketHdr_t *hdr,
                                             Error **errp)
{
    uint32_t magic = be32_to_cpu(hdr->magic);
    uint32_t version = be32_to_cpu(hdr->version);

    if (magic != MULTIFD_MAGIC) {
        error_setg(errp, "multifd: received packet magic %x, expected %x",
                   magic, MULTIFD_MAGIC);
        return -1;
    }

    if (version != MULTIFD_VERSION) {
        error_setg(errp, "multifd: received packet version %u, expected %u",
                   version, MULTIFD_VERSION);
        return -1;
    }

    p->flags = be32_to_cpu(hdr->flags);

    return 0;
}

static int multifd_recv_unfill_packet_device_state(MultiFDRecvParams *p,
                                                   Error **errp)
{
    MultiFDPacketDeviceState_t *packet = p->packet_dev_state;

    packet->instance_id = be32_to_cpu(packet->instance_id);
    p->next_packet_size = be32_to_cpu(packet->next_packet_size);

    return 0;
}

static int multifd_recv_unfill_packet_ram(MultiFDRecvParams *p, Error **errp)
{
    const MultiFDPacket_t *packet = p->packet;
    int ret = 0;

    p->next_packet_size = be32_to_cpu(packet->next_packet_size);
    p->packet_num = be64_to_cpu(packet->packet_num);

    /* Always unfill, old QEMUs (<9.0) send data along with SYNC */
    ret = multifd_ram_unfill_packet(p, errp);

    trace_multifd_recv_unfill(p->id, p->packet_num, p->flags,
                              p->next_packet_size);

    return ret;
}

static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error **errp)
{
    p->packets_recved++;

    if (p->flags & MULTIFD_FLAG_DEVICE_STATE) {
        return multifd_recv_unfill_packet_device_state(p, errp);
    }

    return multifd_recv_unfill_packet_ram(p, errp);
}

static bool multifd_send_should_exit(void)
{
    return qatomic_read(&multifd_send_state->exiting);
}

static bool multifd_recv_should_exit(void)
{
    return qatomic_read(&multifd_recv_state->exiting);
}

/*
 * The migration thread can wait on either of the two semaphores.  This
 * function can be used to kick the migration thread out of waiting on
 * either of them.  Should mostly only be called when something wrong
 * happened with the current multifd send thread.
 */
static void multifd_send_kick_main(MultiFDSendParams *p)
{
    qemu_sem_post(&p->sem_sync);
    qemu_sem_post(&multifd_send_state->channels_ready);
}

/*
 * multifd_send() works by exchanging the MultiFDSendData object
 * provided by the caller with an unused MultiFDSendData object from
 * the next channel that is found to be idle.
 *
 * The channel owns the data until it finishes transmitting and the
 * caller owns the empty object until it fills it with data and calls
 * this function again. No locking necessary.
 *
 * Switching is safe because both the migration thread and the channel
 * thread have barriers in place to serialize access.
 *
 * Returns true on success, false otherwise.
 */
bool multifd_send(MultiFDSendData **send_data)
{
    int i;
    static int next_channel;
    MultiFDSendParams *p = NULL; /* make gcc happy */
    MultiFDSendData *tmp;

    if (multifd_send_should_exit()) {
        return false;
    }

    QEMU_LOCK_GUARD(&multifd_send_state->multifd_send_mutex);

    /* We wait here, until at least one channel is ready */
    qemu_sem_wait(&multifd_send_state->channels_ready);

    /*
     * next_channel can remain from a previous migration that was
     * using more channels, so ensure it doesn't overflow if the
     * limit is lower now.
     */
    next_channel %= migrate_multifd_channels();
    for (i = next_channel;; i = (i + 1) % migrate_multifd_channels()) {
        if (multifd_send_should_exit()) {
            return false;
        }
        p = &multifd_send_state->params[i];
        /*
         * Lockless read to p->pending_job is safe, because only multifd
         * sender thread can clear it.
         */
        if (qatomic_read(&p->pending_job) == false) {
            next_channel = (i + 1) % migrate_multifd_channels();
            break;
        }
    }

    /*
     * Make sure we read p->pending_job before all the rest.  Pairs with
     * qatomic_store_release() in multifd_send_thread().
     */
    smp_mb_acquire();

    assert(multifd_payload_empty(p->data));

    /*
     * Swap the pointers. The channel gets the client data for
     * transferring and the client gets back an unused data slot.
     */
    tmp = *send_data;
    *send_data = p->data;
    p->data = tmp;

    /*
     * Making sure p->data is setup before marking pending_job=true. Pairs
     * with the qatomic_load_acquire() in multifd_send_thread().
     */
    qatomic_store_release(&p->pending_job, true);
    qemu_sem_post(&p->sem);

    return true;
}
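
/*
 * Hypothetical caller pattern for the swap above (a sketch; the real
 * producers live in the RAM and device state code):
 *
 *     MultiFDSendData *d = multifd_send_data_alloc();
 *     while (have_work()) {           // hypothetical helper
 *         fill_payload(d);            // hypothetical helper
 *         if (!multifd_send(&d)) {    // on success, 'd' now points at
 *             break;                  // an idle channel's empty slot
 *         }
 *     }
 *     multifd_send_data_free(d);
 */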

/* Multifd send side hit an error; remember it and prepare to quit */
static void multifd_send_set_error(Error *err)
{
    /*
     * We don't want to terminate the threads twice.  Depending on
     * where we get the error, or if two independent errors happen in
     * two threads at the same time, we can end up calling this
     * function twice.
     */
    if (qatomic_xchg(&multifd_send_state->exiting, 1)) {
        return;
    }

    if (err) {
        MigrationState *s = migrate_get_current();
        migrate_set_error(s, err);
        if (s->state == MIGRATION_STATUS_SETUP ||
            s->state == MIGRATION_STATUS_PRE_SWITCHOVER ||
            s->state == MIGRATION_STATUS_DEVICE ||
            s->state == MIGRATION_STATUS_ACTIVE) {
            migrate_set_state(&s->state, s->state,
                              MIGRATION_STATUS_FAILED);
        }
    }
}

static void multifd_send_terminate_threads(void)
{
    int i;

    trace_multifd_send_terminate_threads();

    /*
     * Tell everyone we're quitting.  No xchg() needed here; we simply
     * always set it.
     */
    qatomic_set(&multifd_send_state->exiting, 1);

    /*
     * First, kick all threads out, no matter whether they are just
     * idle or blocked in an IO system call.
     */
    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];

        qemu_sem_post(&p->sem);
        if (p->c) {
            qio_channel_shutdown(p->c, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
        }
    }

    /*
     * Finally, recycle all the threads.
     */
    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];

        if (p->tls_thread_created) {
            qemu_thread_join(&p->tls_thread);
        }

        if (p->thread_created) {
            qemu_thread_join(&p->thread);
        }
    }
}

static bool multifd_send_cleanup_channel(MultiFDSendParams *p, Error **errp)
{
    if (p->c) {
        migration_ioc_unregister_yank(p->c);
        /*
         * The object_unref() cannot guarantee the fd will always be
         * released because finalize() of the iochannel is only
         * triggered on the last reference and it's not guaranteed
         * that we always hold the last refcount when reaching here.
         *
         * Closing the fd explicitly has the benefit that if there is any
         * registered I/O handler callbacks on such fd, that will get a
         * POLLNVAL event and will further trigger the cleanup to finally
         * release the IOC.
         *
         * FIXME: It should logically be guaranteed that all multifd
         * channels have no I/O handler callback registered when reaching
         * here, because the migration thread will wait for all multifd
         * channel establishments to complete during setup.  Since
         * migration_cleanup() will be scheduled in the main thread too,
         * all previous callbacks should be guaranteed to have completed
         * by the time we reach here.  See multifd_send_state.channels_created
         * and its usage.  In the future, we could replace this with an
         * assert making sure we hold the last reference, or simply drop
         * the explicit close if the above reasoning is justified.
         */
        qio_channel_close(p->c, &error_abort);
        object_unref(OBJECT(p->c));
        p->c = NULL;
    }
    qemu_sem_destroy(&p->sem);
    qemu_sem_destroy(&p->sem_sync);
    g_free(p->name);
    p->name = NULL;
    g_clear_pointer(&p->data, multifd_send_data_free);
    p->packet_len = 0;
    g_clear_pointer(&p->packet_device_state, g_free);
    g_free(p->packet);
    p->packet = NULL;
    multifd_send_state->ops->send_cleanup(p, errp);
    assert(!p->iov);

    return *errp == NULL;
}

static void multifd_send_cleanup_state(void)
{
    file_cleanup_outgoing_migration();
    socket_cleanup_outgoing_migration();
    multifd_device_state_send_cleanup();
    qemu_sem_destroy(&multifd_send_state->channels_created);
    qemu_sem_destroy(&multifd_send_state->channels_ready);
    qemu_mutex_destroy(&multifd_send_state->multifd_send_mutex);
    g_free(multifd_send_state->params);
    multifd_send_state->params = NULL;
    g_free(multifd_send_state);
    multifd_send_state = NULL;
}

void multifd_send_shutdown(void)
{
    int i;

    if (!migrate_multifd()) {
        return;
    }

    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];

        /* thread_created implies the TLS handshake has succeeded */
        if (p->tls_thread_created && p->thread_created) {
            Error *local_err = NULL;
            /*
             * The destination expects the TLS session to always be
             * properly terminated. This helps to detect a premature
             * termination in the middle of the stream.  Note that
             * older QEMUs always break the connection on the source
             * and the destination always sees
             * GNUTLS_E_PREMATURE_TERMINATION.
             */
            migration_tls_channel_end(p->c, &local_err);

            /*
             * The above can return an error in case the migration has
             * already failed. If the migration succeeded, errors are
             * not expected but there's no need to kill the source.
             */
            if (local_err && !migration_has_failed(migrate_get_current())) {
                warn_report(
                    "multifd_send_%d: Failed to terminate TLS connection: %s",
                    p->id, error_get_pretty(local_err));
                break;
            }
        }
    }

    multifd_send_terminate_threads();

    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];
        Error *local_err = NULL;

        if (!multifd_send_cleanup_channel(p, &local_err)) {
            migrate_set_error(migrate_get_current(), local_err);
            error_free(local_err);
        }
    }

    multifd_send_cleanup_state();
}

static int multifd_zero_copy_flush(QIOChannel *c)
{
    int ret;
    Error *err = NULL;

    ret = qio_channel_flush(c, &err);
    if (ret < 0) {
        error_report_err(err);
        return -1;
    }
    if (ret == 1) {
        stat64_add(&mig_stats.dirty_sync_missed_zero_copy, 1);
    }

    return ret;
}

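/*
 * Synchronize all send channels: first signal every channel that a
 * sync is pending (pending_sync plus a sem post), then wait for each
 * channel to acknowledge via sem_sync, flushing zero-copy writes along
 * the way when enabled.
 */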
int multifd_send_sync_main(MultiFDSyncReq req)
{
    int i;
    bool flush_zero_copy;

    assert(req != MULTIFD_SYNC_NONE);

    flush_zero_copy = migrate_zero_copy_send();

    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];

        if (multifd_send_should_exit()) {
            return -1;
        }

        trace_multifd_send_sync_main_signal(p->id);

        /*
         * We should be the only user so far, so it cannot be set
         * concurrently by anyone else.
         */
        assert(qatomic_read(&p->pending_sync) == MULTIFD_SYNC_NONE);
        qatomic_set(&p->pending_sync, req);
        qemu_sem_post(&p->sem);
    }
    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];

        if (multifd_send_should_exit()) {
            return -1;
        }

        qemu_sem_wait(&multifd_send_state->channels_ready);
        trace_multifd_send_sync_main_wait(p->id);
        qemu_sem_wait(&p->sem_sync);

        if (flush_zero_copy && p->c && (multifd_zero_copy_flush(p->c) < 0)) {
            return -1;
        }
    }
    trace_multifd_send_sync_main(multifd_send_state->packet_num);

    return 0;
}

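/*
 * Per-channel sender thread.  It waits on p->sem, then either transmits
 * the payload currently attached to the channel (pending_job) or
 * handles a sync request (pending_sync), posting channels_ready each
 * time around the loop.
 */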
static void *multifd_send_thread(void *opaque)
{
    MultiFDSendParams *p = opaque;
    MigrationThread *thread = NULL;
    Error *local_err = NULL;
    int ret = 0;
    bool use_packets = multifd_use_packets();

    thread = migration_threads_add(p->name, qemu_get_thread_id());

    trace_multifd_send_thread_start(p->id);
    rcu_register_thread();

    if (use_packets) {
        if (multifd_send_initial_packet(p, &local_err) < 0) {
            ret = -1;
            goto out;
        }
    }

    while (true) {
        qemu_sem_post(&multifd_send_state->channels_ready);
        qemu_sem_wait(&p->sem);

        if (multifd_send_should_exit()) {
            break;
        }

        /*
         * Read pending_job flag before p->data.  Pairs with the
         * qatomic_store_release() in multifd_send().
         */
        if (qatomic_load_acquire(&p->pending_job)) {
            bool is_device_state = multifd_payload_device_state(p->data);
            size_t total_size;

            p->flags = 0;
            p->iovs_num = 0;
            assert(!multifd_payload_empty(p->data));

            if (is_device_state) {
                multifd_device_state_send_prepare(p);
            } else {
                ret = multifd_send_state->ops->send_prepare(p, &local_err);
                if (ret != 0) {
                    break;
                }
            }

            /*
             * The packet header in the zerocopy RAM case is accounted for
             * in multifd_nocomp_send_prepare() - where it is actually
             * being sent.
             */
            total_size = iov_size(p->iov, p->iovs_num);

            if (migrate_mapped_ram()) {
                assert(!is_device_state);

                ret = file_write_ramblock_iov(p->c, p->iov, p->iovs_num,
                                              &p->data->u.ram, &local_err);
            } else {
                ret = qio_channel_writev_full_all(p->c, p->iov, p->iovs_num,
                                                  NULL, 0, p->write_flags,
                                                  &local_err);
            }

            if (ret != 0) {
                break;
            }

            stat64_add(&mig_stats.multifd_bytes, total_size);

            p->next_packet_size = 0;
            multifd_send_data_clear(p->data);

            /*
             * Making sure p->data is published before saying "we're
             * free".  Pairs with the smp_mb_acquire() in
             * multifd_send().
             */
            qatomic_store_release(&p->pending_job, false);
        } else {
            MultiFDSyncReq req = qatomic_read(&p->pending_sync);

            /*
             * If not a normal job, must be a sync request.  Note that
             * pending_sync is a standalone flag (unlike pending_job), so
             * it doesn't require explicit memory barriers.
             */
            assert(req != MULTIFD_SYNC_NONE);

            /* Only push the SYNC message if it involves a remote sync */
            if (req == MULTIFD_SYNC_ALL) {
                p->flags = MULTIFD_FLAG_SYNC;
                multifd_send_fill_packet(p);
                ret = qio_channel_write_all(p->c, (void *)p->packet,
                                            p->packet_len, &local_err);
                if (ret != 0) {
                    break;
                }
                /* p->next_packet_size will always be zero for a SYNC packet */
                stat64_add(&mig_stats.multifd_bytes, p->packet_len);
            }

            qatomic_set(&p->pending_sync, MULTIFD_SYNC_NONE);
            qemu_sem_post(&p->sem_sync);
        }
    }

out:
    if (ret) {
        assert(local_err);
        trace_multifd_send_error(p->id);
        multifd_send_set_error(local_err);
        multifd_send_kick_main(p);
        error_free(local_err);
    }

    rcu_unregister_thread();
    migration_threads_remove(thread);
    trace_multifd_send_thread_end(p->id, p->packets_sent);

    return NULL;
}

static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque);

typedef struct {
    MultiFDSendParams *p;
    QIOChannelTLS *tioc;
} MultiFDTLSThreadArgs;

static void *multifd_tls_handshake_thread(void *opaque)
{
    MultiFDTLSThreadArgs *args = opaque;

    qio_channel_tls_handshake(args->tioc,
                              multifd_new_send_channel_async,
                              args->p,
                              NULL,
                              NULL);
    g_free(args);

    return NULL;
}

static bool multifd_tls_channel_connect(MultiFDSendParams *p,
                                        QIOChannel *ioc,
                                        Error **errp)
{
    MigrationState *s = migrate_get_current();
    const char *hostname = s->hostname;
    MultiFDTLSThreadArgs *args;
    QIOChannelTLS *tioc;

    tioc = migration_tls_client_create(ioc, hostname, errp);
    if (!tioc) {
        return false;
    }

    /*
     * Ownership of the socket channel now transfers to the newly
     * created TLS channel, which has already taken a reference.
     */
    object_unref(OBJECT(ioc));
    trace_multifd_tls_outgoing_handshake_start(ioc, tioc, hostname);
    qio_channel_set_name(QIO_CHANNEL(tioc), "multifd-tls-outgoing");

    args = g_new0(MultiFDTLSThreadArgs, 1);
    args->tioc = tioc;
    args->p = p;

    p->tls_thread_created = true;
    qemu_thread_create(&p->tls_thread, MIGRATION_THREAD_SRC_TLS,
                       multifd_tls_handshake_thread, args,
                       QEMU_THREAD_JOINABLE);
    return true;
}

void multifd_channel_connect(MultiFDSendParams *p, QIOChannel *ioc)
{
    qio_channel_set_delay(ioc, false);

    migration_ioc_register_yank(ioc);
    /* Set p->c only once the channel is completely set up */
    p->c = ioc;

    p->thread_created = true;
    qemu_thread_create(&p->thread, p->name, multifd_send_thread, p,
                       QEMU_THREAD_JOINABLE);
}

/*
 * When TLS is enabled this function is called once to establish the
 * TLS connection and a second time after the TLS handshake to create
 * the multifd channel. Without TLS it goes straight into the channel
 * creation.
 */
static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque)
{
    MultiFDSendParams *p = opaque;
    QIOChannel *ioc = QIO_CHANNEL(qio_task_get_source(task));
    Error *local_err = NULL;
    bool ret;

    trace_multifd_new_send_channel_async(p->id);

    if (qio_task_propagate_error(task, &local_err)) {
        ret = false;
        goto out;
    }

    trace_multifd_set_outgoing_channel(ioc, object_get_typename(OBJECT(ioc)),
                                       migrate_get_current()->hostname);

    if (migrate_channel_requires_tls_upgrade(ioc)) {
        ret = multifd_tls_channel_connect(p, ioc, &local_err);
        if (ret) {
            return;
        }
    } else {
        multifd_channel_connect(p, ioc);
        ret = true;
    }

out:
    /*
     * Here we're not interested in whether creation succeeded, only
     * that it was attempted at all.
     */
    multifd_send_channel_created();

    if (ret) {
        return;
    }

    trace_multifd_new_send_channel_async_error(p->id, local_err);
    multifd_send_set_error(local_err);
    /*
     * For error cases (TLS or non-TLS), the IO channel is always freed
     * here rather than during multifd cleanup: since p->c is not set,
     * the multifd cleanup code doesn't even know of its existence.
     */
    object_unref(OBJECT(ioc));
    error_free(local_err);
}

static bool multifd_new_send_channel_create(gpointer opaque, Error **errp)
{
    if (!multifd_use_packets()) {
        return file_send_channel_create(opaque, errp);
    }

    socket_send_channel_create(multifd_new_send_channel_async, opaque);
    return true;
}

bool multifd_send_setup(void)
{
    MigrationState *s = migrate_get_current();
    int thread_count, ret = 0;
    uint32_t page_count = multifd_ram_page_count();
    bool use_packets = multifd_use_packets();
    uint8_t i;

    if (!migrate_multifd()) {
        return true;
    }

    thread_count = migrate_multifd_channels();
    multifd_send_state = g_malloc0(sizeof(*multifd_send_state));
    multifd_send_state->params = g_new0(MultiFDSendParams, thread_count);
    qemu_mutex_init(&multifd_send_state->multifd_send_mutex);
    qemu_sem_init(&multifd_send_state->channels_created, 0);
    qemu_sem_init(&multifd_send_state->channels_ready, 0);
    qatomic_set(&multifd_send_state->exiting, 0);
    multifd_send_state->ops = multifd_ops[migrate_multifd_compression()];

    for (i = 0; i < thread_count; i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];
        Error *local_err = NULL;

        qemu_sem_init(&p->sem, 0);
        qemu_sem_init(&p->sem_sync, 0);
        p->id = i;
        p->data = multifd_send_data_alloc();

        if (use_packets) {
            p->packet_len = sizeof(MultiFDPacket_t)
                          + sizeof(uint64_t) * page_count;
            p->packet = g_malloc0(p->packet_len);
            p->packet_device_state = g_malloc0(sizeof(*p->packet_device_state));
            p->packet_device_state->hdr.magic = cpu_to_be32(MULTIFD_MAGIC);
            p->packet_device_state->hdr.version = cpu_to_be32(MULTIFD_VERSION);
        }
        p->name = g_strdup_printf(MIGRATION_THREAD_SRC_MULTIFD, i);
        p->write_flags = 0;

        if (!multifd_new_send_channel_create(p, &local_err)) {
            migrate_set_error(s, local_err);
            ret = -1;
        }
    }

    /*
     * Wait until channel creation has started for all channels. The
     * creation can still fail, but no more channels will be created
     * past this point.
     */
    for (i = 0; i < thread_count; i++) {
        qemu_sem_wait(&multifd_send_state->channels_created);
    }

    if (ret) {
        goto err;
    }

    for (i = 0; i < thread_count; i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];
        Error *local_err = NULL;

        ret = multifd_send_state->ops->send_setup(p, &local_err);
        if (ret) {
            migrate_set_error(s, local_err);
            goto err;
        }
        assert(p->iov);
    }

    multifd_device_state_send_setup();

    return true;

err:
    migrate_set_state(&s->state, MIGRATION_STATUS_SETUP,
                      MIGRATION_STATUS_FAILED);
    return false;
}

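/*
 * Receive-side counterpart of multifd_send(): hand the shared
 * multifd_recv_state->data slot to an idle channel and take back that
 * channel's empty slot.  Only used on the packet-less (mapped-ram)
 * path, where the migration thread distributes work to the channels.
 */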
bool multifd_recv(void)
{
    int i;
    static int next_recv_channel;
    MultiFDRecvParams *p = NULL;
    MultiFDRecvData *data = multifd_recv_state->data;

    /*
     * next_recv_channel can remain from a previous migration that was
     * using more channels, so ensure it doesn't overflow if the
     * limit is lower now.
     */
    next_recv_channel %= migrate_multifd_channels();
    for (i = next_recv_channel;; i = (i + 1) % migrate_multifd_channels()) {
        if (multifd_recv_should_exit()) {
            return false;
        }

        p = &multifd_recv_state->params[i];

        if (qatomic_read(&p->pending_job) == false) {
            next_recv_channel = (i + 1) % migrate_multifd_channels();
            break;
        }
    }

    /*
     * Order pending_job read before manipulating p->data below. Pairs
     * with qatomic_store_release() at multifd_recv_thread().
     */
    smp_mb_acquire();

    assert(!p->data->size);
    multifd_recv_state->data = p->data;
    p->data = data;

    /*
     * Order p->data update before setting pending_job. Pairs with
     * qatomic_load_acquire() at multifd_recv_thread().
     */
    qatomic_store_release(&p->pending_job, true);
    qemu_sem_post(&p->sem);

    return true;
}

MultiFDRecvData *multifd_get_recv_data(void)
{
    return multifd_recv_state->data;
}

static void multifd_recv_terminate_threads(Error *err)
{
    int i;

    trace_multifd_recv_terminate_threads(err != NULL);

    if (qatomic_xchg(&multifd_recv_state->exiting, 1)) {
        return;
    }

    if (err) {
        MigrationState *s = migrate_get_current();
        migrate_set_error(s, err);
        if (s->state == MIGRATION_STATUS_SETUP ||
            s->state == MIGRATION_STATUS_ACTIVE) {
            migrate_set_state(&s->state, s->state,
                              MIGRATION_STATUS_FAILED);
        }
    }

    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDRecvParams *p = &multifd_recv_state->params[i];

        /*
         * The migration thread and channels interact differently
         * depending on the presence of packets.
         */
        if (multifd_use_packets()) {
            /*
             * The channel receives as long as there are packets. When
             * packets end (i.e. MULTIFD_FLAG_SYNC is reached), the
             * channel waits for the migration thread to sync. If the
             * sync never happens, do it here.
             */
            qemu_sem_post(&p->sem_sync);
        } else {
            /*
             * The channel waits for the migration thread to give it
             * work. When the migration thread runs out of work, it
             * releases the channel and waits for any pending work to
             * finish. If we reach here (e.g. due to error) before the
             * work runs out, release the channel.
             */
            qemu_sem_post(&p->sem);
        }

        /*
         * We could arrive here for two reasons:
         *  - normal quit, i.e. everything went fine, just finished
         *  - error quit: We close the channels so the channel threads
         *    finish the qio_channel_read_all_eof()
         */
        if (p->c) {
            qio_channel_shutdown(p->c, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
        }
    }
}

void multifd_recv_shutdown(void)
{
    if (migrate_multifd()) {
        multifd_recv_terminate_threads(NULL);
    }
}

static void multifd_recv_cleanup_channel(MultiFDRecvParams *p)
{
    migration_ioc_unregister_yank(p->c);
    object_unref(OBJECT(p->c));
    p->c = NULL;
    qemu_mutex_destroy(&p->mutex);
    qemu_sem_destroy(&p->sem_sync);
    qemu_sem_destroy(&p->sem);
    g_free(p->data);
    p->data = NULL;
    g_free(p->name);
    p->name = NULL;
    p->packet_len = 0;
    g_free(p->packet);
    p->packet = NULL;
    g_clear_pointer(&p->packet_dev_state, g_free);
    g_free(p->normal);
    p->normal = NULL;
    g_free(p->zero);
    p->zero = NULL;
    multifd_recv_state->ops->recv_cleanup(p);
}

static void multifd_recv_cleanup_state(void)
{
    qemu_sem_destroy(&multifd_recv_state->sem_sync);
    g_free(multifd_recv_state->params);
    multifd_recv_state->params = NULL;
    g_free(multifd_recv_state->data);
    multifd_recv_state->data = NULL;
    g_free(multifd_recv_state);
    multifd_recv_state = NULL;
}

void multifd_recv_cleanup(void)
{
    int i;

    if (!migrate_multifd()) {
        return;
    }
    multifd_recv_terminate_threads(NULL);
    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDRecvParams *p = &multifd_recv_state->params[i];

        if (p->thread_created) {
            qemu_thread_join(&p->thread);
        }
    }
    for (i = 0; i < migrate_multifd_channels(); i++) {
        multifd_recv_cleanup_channel(&multifd_recv_state->params[i]);
    }
    multifd_recv_cleanup_state();
}

void multifd_recv_sync_main(void)
{
    int thread_count = migrate_multifd_channels();
    bool file_based = !multifd_use_packets();
    int i;

    if (!migrate_multifd()) {
        return;
    }

    /*
     * File-based channels don't use packets and therefore need to
     * wait for more work. Release them to start the sync.
     */
    if (file_based) {
        for (i = 0; i < thread_count; i++) {
            MultiFDRecvParams *p = &multifd_recv_state->params[i];

            trace_multifd_recv_sync_main_signal(p->id);
            qemu_sem_post(&p->sem);
        }
    }

    /*
     * Initiate the synchronization by waiting for all channels.
     *
     * For socket-based migration this means each channel has received
     * the SYNC packet on the stream.
     *
     * For file-based migration this means each channel is done with
     * the work (pending_job=false).
     */
    for (i = 0; i < thread_count; i++) {
        trace_multifd_recv_sync_main_wait(i);
        qemu_sem_wait(&multifd_recv_state->sem_sync);
    }

    if (file_based) {
        /*
         * For file-based migration, loading is done in one iteration.
         * We're done.
         */
        return;
    }

    /*
     * Sync done. Release the channels for the next iteration.
     */
    for (i = 0; i < thread_count; i++) {
        MultiFDRecvParams *p = &multifd_recv_state->params[i];

        WITH_QEMU_LOCK_GUARD(&p->mutex) {
            if (multifd_recv_state->packet_num < p->packet_num) {
                multifd_recv_state->packet_num = p->packet_num;
            }
        }
        trace_multifd_recv_sync_main_signal(p->id);
        qemu_sem_post(&p->sem_sync);
    }
    trace_multifd_recv_sync_main(multifd_recv_state->packet_num);
}

static int multifd_device_state_recv(MultiFDRecvParams *p, Error **errp)
{
    g_autofree char *dev_state_buf = NULL;
    int ret;

    dev_state_buf = g_malloc(p->next_packet_size);

    ret = qio_channel_read_all(p->c, dev_state_buf, p->next_packet_size, errp);
    if (ret != 0) {
        return ret;
    }

    if (p->packet_dev_state->idstr[sizeof(p->packet_dev_state->idstr) - 1]
        != 0) {
        error_setg(errp, "unterminated multifd device state idstr");
        return -1;
    }

    if (!qemu_loadvm_load_state_buffer(p->packet_dev_state->idstr,
                                       p->packet_dev_state->instance_id,
                                       dev_state_buf, p->next_packet_size,
                                       errp)) {
        ret = -1;
    }

    return ret;
}

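/*
 * Per-channel receiver thread.  With packets (socket migration) it
 * reads a packet header, the packet body, then the payload, handling
 * RAM, device state and SYNC packets.  Without packets (mapped-ram)
 * it waits for the migration thread to hand it work via multifd_recv().
 */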
static void *multifd_recv_thread(void *opaque)
{
    MigrationState *s = migrate_get_current();
    MultiFDRecvParams *p = opaque;
    Error *local_err = NULL;
    bool use_packets = multifd_use_packets();
    int ret;

    trace_multifd_recv_thread_start(p->id);
    rcu_register_thread();

    if (!s->multifd_clean_tls_termination) {
        p->read_flags = QIO_CHANNEL_READ_FLAG_RELAXED_EOF;
    }

    while (true) {
        MultiFDPacketHdr_t hdr;
        uint32_t flags = 0;
        bool is_device_state = false;
        bool has_data = false;
        uint8_t *pkt_buf;
        size_t pkt_len;

        p->normal_num = 0;

        if (use_packets) {
            struct iovec iov = {
                .iov_base = (void *)&hdr,
                .iov_len = sizeof(hdr)
            };

            if (multifd_recv_should_exit()) {
                break;
            }

            ret = qio_channel_readv_full_all_eof(p->c, &iov, 1, NULL, NULL,
                                                 p->read_flags, &local_err);
            if (!ret) {
                /* EOF */
                assert(!local_err);
                break;
            }

            if (ret == -1) {
                break;
            }

            ret = multifd_recv_unfill_packet_header(p, &hdr, &local_err);
            if (ret) {
                break;
            }

            is_device_state = p->flags & MULTIFD_FLAG_DEVICE_STATE;
            if (is_device_state) {
                pkt_buf = (uint8_t *)p->packet_dev_state + sizeof(hdr);
                pkt_len = sizeof(*p->packet_dev_state) - sizeof(hdr);
            } else {
                pkt_buf = (uint8_t *)p->packet + sizeof(hdr);
                pkt_len = p->packet_len - sizeof(hdr);
            }

            ret = qio_channel_read_all_eof(p->c, (char *)pkt_buf, pkt_len,
                                           &local_err);
            if (!ret) {
                /* EOF */
                error_setg(&local_err,
                           "multifd: unexpected EOF after packet header");
                break;
            }

            if (ret == -1) {
                break;
            }

            qemu_mutex_lock(&p->mutex);
            ret = multifd_recv_unfill_packet(p, &local_err);
            if (ret) {
                qemu_mutex_unlock(&p->mutex);
                break;
            }

            flags = p->flags;
            /* recv methods don't know how to handle the SYNC flag */
            p->flags &= ~MULTIFD_FLAG_SYNC;

            if (is_device_state) {
                has_data = p->next_packet_size > 0;
            } else {
                /*
                 * Even if it's a SYNC packet, this needs to be set
                 * because older QEMUs (<9.0) still send data along with
                 * the SYNC packet.
                 */
                has_data = p->normal_num || p->zero_num;
            }

            qemu_mutex_unlock(&p->mutex);
        } else {
            /*
             * No packets, so we need to wait for the vmstate code to
             * give us work.
             */
            qemu_sem_wait(&p->sem);

            if (multifd_recv_should_exit()) {
                break;
            }

            /* pairs with qatomic_store_release() at multifd_recv() */
            if (!qatomic_load_acquire(&p->pending_job)) {
                /*
                 * Migration thread did not send work, this is
                 * equivalent to pending_sync on the sending
                 * side. Post sem_sync to notify we reached this
                 * point.
                 */
                qemu_sem_post(&multifd_recv_state->sem_sync);
                continue;
            }

            has_data = !!p->data->size;
        }

        if (has_data) {
            if (is_device_state) {
                assert(use_packets);
                ret = multifd_device_state_recv(p, &local_err);
            } else {
                ret = multifd_recv_state->ops->recv(p, &local_err);
            }
            if (ret != 0) {
                break;
            }
        } else if (is_device_state) {
            error_setg(&local_err,
                       "multifd: received empty device state packet");
            break;
        }

        if (use_packets) {
            if (flags & MULTIFD_FLAG_SYNC) {
                if (is_device_state) {
                    error_setg(&local_err,
                               "multifd: received SYNC device state packet");
                    break;
                }

                qemu_sem_post(&multifd_recv_state->sem_sync);
                qemu_sem_wait(&p->sem_sync);
            }
        } else {
            p->data->size = 0;
            /*
             * Order data->size update before clearing
             * pending_job. Pairs with smp_mb_acquire() at
             * multifd_recv().
             */
            qatomic_store_release(&p->pending_job, false);
        }
    }

    if (local_err) {
        multifd_recv_terminate_threads(local_err);
        error_free(local_err);
    }

    rcu_unregister_thread();
    trace_multifd_recv_thread_end(p->id, p->packets_recved);

    return NULL;
}

int multifd_recv_setup(Error **errp)
{
    int thread_count;
    uint32_t page_count = multifd_ram_page_count();
    bool use_packets = multifd_use_packets();
    uint8_t i;

    /*
     * Return successfully if multiFD recv state is already initialised
     * or multiFD is not enabled.
     */
    if (multifd_recv_state || !migrate_multifd()) {
        return 0;
    }

    thread_count = migrate_multifd_channels();
    multifd_recv_state = g_malloc0(sizeof(*multifd_recv_state));
    multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count);

    multifd_recv_state->data = g_new0(MultiFDRecvData, 1);
    multifd_recv_state->data->size = 0;

    qatomic_set(&multifd_recv_state->count, 0);
    qatomic_set(&multifd_recv_state->exiting, 0);
    qemu_sem_init(&multifd_recv_state->sem_sync, 0);
    multifd_recv_state->ops = multifd_ops[migrate_multifd_compression()];

    for (i = 0; i < thread_count; i++) {
        MultiFDRecvParams *p = &multifd_recv_state->params[i];

        qemu_mutex_init(&p->mutex);
        qemu_sem_init(&p->sem_sync, 0);
        qemu_sem_init(&p->sem, 0);
        p->pending_job = false;
        p->id = i;

        p->data = g_new0(MultiFDRecvData, 1);
        p->data->size = 0;

        if (use_packets) {
            p->packet_len = sizeof(MultiFDPacket_t)
                + sizeof(uint64_t) * page_count;
            p->packet = g_malloc0(p->packet_len);
            p->packet_dev_state = g_malloc0(sizeof(*p->packet_dev_state));
        }
        p->name = g_strdup_printf(MIGRATION_THREAD_DST_MULTIFD, i);
        p->normal = g_new0(ram_addr_t, page_count);
        p->zero = g_new0(ram_addr_t, page_count);
    }

    for (i = 0; i < thread_count; i++) {
        MultiFDRecvParams *p = &multifd_recv_state->params[i];
        int ret;

        ret = multifd_recv_state->ops->recv_setup(p, errp);
        if (ret) {
            return ret;
        }
    }
    return 0;
}

bool multifd_recv_all_channels_created(void)
{
    int thread_count = migrate_multifd_channels();

    if (!migrate_multifd()) {
        return true;
    }

    if (!multifd_recv_state) {
        /* Called before any connections created */
        return false;
    }

    return thread_count == qatomic_read(&multifd_recv_state->count);
}

/*
 * Try to receive all multifd channels to get ready for the migration.
 * Sets @errp when failing to receive the current channel.
 */
void multifd_recv_new_channel(QIOChannel *ioc, Error **errp)
{
    MultiFDRecvParams *p;
    Error *local_err = NULL;
    bool use_packets = multifd_use_packets();
    int id;

    if (use_packets) {
        id = multifd_recv_initial_packet(ioc, &local_err);
        if (id < 0) {
            multifd_recv_terminate_threads(local_err);
            error_propagate_prepend(errp, local_err,
                                    "failed to receive packet"
                                    " via multifd channel %d: ",
                                    qatomic_read(&multifd_recv_state->count));
            return;
        }
        trace_multifd_recv_new_channel(id);
    } else {
        id = qatomic_read(&multifd_recv_state->count);
    }

    p = &multifd_recv_state->params[id];
    if (p->c != NULL) {
        error_setg(&local_err, "multifd: received id '%d' already setup",
                   id);
        multifd_recv_terminate_threads(local_err);
        error_propagate(errp, local_err);
        return;
    }
    p->c = ioc;
    object_ref(OBJECT(ioc));

    p->thread_created = true;
    qemu_thread_create(&p->thread, p->name, multifd_recv_thread, p,
                       QEMU_THREAD_JOINABLE);
    qatomic_inc(&multifd_recv_state->count);
}
1555