xref: /qemu/migration/multifd.c (revision c0e6b8b798bee5d8772ca8db19638ec89b47c946)
1 /*
2  * Multifd common code
3  *
4  * Copyright (c) 2019-2020 Red Hat Inc
5  *
6  * Authors:
7  *  Juan Quintela <quintela@redhat.com>
8  *
9  * This work is licensed under the terms of the GNU GPL, version 2 or later.
10  * See the COPYING file in the top-level directory.
11  */
12 
13 #include "qemu/osdep.h"
14 #include "qemu/cutils.h"
15 #include "qemu/rcu.h"
16 #include "exec/target_page.h"
17 #include "system/system.h"
18 #include "exec/ramblock.h"
19 #include "qemu/error-report.h"
20 #include "qapi/error.h"
21 #include "file.h"
22 #include "migration.h"
23 #include "migration-stats.h"
24 #include "socket.h"
25 #include "tls.h"
26 #include "qemu-file.h"
27 #include "trace.h"
28 #include "multifd.h"
29 #include "threadinfo.h"
30 #include "options.h"
31 #include "qemu/yank.h"
32 #include "io/channel-file.h"
33 #include "io/channel-socket.h"
34 #include "yank_functions.h"
35 
/* Multiple fd's */

/* Magic value at the start of every multifd handshake and packet header. */
#define MULTIFD_MAGIC 0x11223344U
/* Wire-format version; the receiving side rejects any other value. */
#define MULTIFD_VERSION 1

/*
 * Per-channel handshake message, sent once before any packet by
 * multifd_send_initial_packet() and checked by
 * multifd_recv_initial_packet().  It is written to the channel as raw
 * bytes, hence packed (64 bytes total).  magic and version are stored
 * big-endian; uuid carries the sender's qemu_uuid; id is the channel
 * index.
 */
typedef struct {
    uint32_t magic;
    uint32_t version;
    unsigned char uuid[16]; /* QemuUUID */
    uint8_t id;
    uint8_t unused1[7];     /* Reserved for future use */
    uint64_t unused2[4];    /* Reserved for future use */
} __attribute__((packed)) MultiFDInit_t;
49 
/*
 * Global state of the multifd send side.  Allocated by
 * multifd_send_setup() and freed (and reset to NULL) by
 * multifd_send_cleanup_state().
 */
struct {
    /* Per-channel send state, migrate_multifd_channels() entries. */
    MultiFDSendParams *params;
    /*
     * Global number of generated multifd packets.
     *
     * Note that we used 'uintptr_t' because it'll naturally support atomic
     * operations on both 32bit / 64 bits hosts.  It means on 32bit systems
     * multifd will overflow the packet_num easier, but that should be
     * fine.
     *
     * Another option is to use QEMU's Stat64 then it'll be 64 bits on all
     * hosts, however so far it does not support atomic fetch_add() yet.
     * Make it easy for now.
     */
    uintptr_t packet_num;
    /*
     * Synchronization point past which no more channels will be
     * created.  Posted once per channel creation attempt, whether it
     * succeeded or not (see multifd_send_channel_created()).
     */
    QemuSemaphore channels_created;
    /*
     * send channels ready: every send thread posts this each time it
     * goes idle; multifd_send() and multifd_send_sync_main() wait on it.
     */
    QemuSemaphore channels_ready;
    /*
     * Have we already run terminate threads.  There is a race when it
     * happens that we got one error while we are exiting.
     * We will use atomic operations.  Only valid values are 0 and 1.
     */
    int exiting;
    /* multifd ops: multifd_ops[] entry of the configured compression */
    const MultiFDMethods *ops;
} *multifd_send_state;
81 
/*
 * Global state of the multifd receive side.
 */
struct {
    /* Per-channel receive state. */
    MultiFDRecvParams *params;
    /*
     * Data object currently owned by the migration thread;
     * multifd_recv() swaps it with an idle channel's p->data.
     */
    MultiFDRecvData *data;
    /* number of created threads */
    int count;
    /*
     * This is always posted by the recv threads, the migration thread
     * uses it to wait for recv threads to finish assigned tasks.
     */
    QemuSemaphore sem_sync;
    /* global number of generated multifd packets */
    uint64_t packet_num;
    /*
     * Set to 1 exactly once by multifd_recv_terminate_threads() (via
     * qatomic_xchg()); read through multifd_recv_should_exit().
     */
    int exiting;
    /* multifd ops */
    const MultiFDMethods *ops;
} *multifd_recv_state;
98 
99 MultiFDSendData *multifd_send_data_alloc(void)
100 {
101     size_t max_payload_size, size_minus_payload;
102 
103     /*
104      * MultiFDPages_t has a flexible array at the end, account for it
105      * when allocating MultiFDSendData. Use max() in case other types
106      * added to the union in the future are larger than
107      * (MultiFDPages_t + flex array).
108      */
109     max_payload_size = MAX(multifd_ram_payload_size(), sizeof(MultiFDPayload));
110 
111     /*
112      * Account for any holes the compiler might insert. We can't pack
113      * the structure because that misaligns the members and triggers
114      * Waddress-of-packed-member.
115      */
116     size_minus_payload = sizeof(MultiFDSendData) - sizeof(MultiFDPayload);
117 
118     return g_malloc0(size_minus_payload + max_payload_size);
119 }
120 
/* Packet-framed channels are used in every mode except mapped-ram. */
static bool multifd_use_packets(void)
{
    bool mapped_ram = migrate_mapped_ram();

    return mapped_ram ? false : true;
}
125 
126 void multifd_send_channel_created(void)
127 {
128     qemu_sem_post(&multifd_send_state->channels_created);
129 }
130 
131 static const MultiFDMethods *multifd_ops[MULTIFD_COMPRESSION__MAX] = {};
132 
133 void multifd_register_ops(int method, const MultiFDMethods *ops)
134 {
135     assert(0 <= method && method < MULTIFD_COMPRESSION__MAX);
136     assert(!multifd_ops[method]);
137     multifd_ops[method] = ops;
138 }
139 
140 static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp)
141 {
142     MultiFDInit_t msg = {};
143     size_t size = sizeof(msg);
144     int ret;
145 
146     msg.magic = cpu_to_be32(MULTIFD_MAGIC);
147     msg.version = cpu_to_be32(MULTIFD_VERSION);
148     msg.id = p->id;
149     memcpy(msg.uuid, &qemu_uuid.data, sizeof(msg.uuid));
150 
151     ret = qio_channel_write_all(p->c, (char *)&msg, size, errp);
152     if (ret != 0) {
153         return -1;
154     }
155     stat64_add(&mig_stats.multifd_bytes, size);
156     return 0;
157 }
158 
159 static int multifd_recv_initial_packet(QIOChannel *c, Error **errp)
160 {
161     MultiFDInit_t msg;
162     int ret;
163 
164     ret = qio_channel_read_all(c, (char *)&msg, sizeof(msg), errp);
165     if (ret != 0) {
166         return -1;
167     }
168 
169     msg.magic = be32_to_cpu(msg.magic);
170     msg.version = be32_to_cpu(msg.version);
171 
172     if (msg.magic != MULTIFD_MAGIC) {
173         error_setg(errp, "multifd: received packet magic %x "
174                    "expected %x", msg.magic, MULTIFD_MAGIC);
175         return -1;
176     }
177 
178     if (msg.version != MULTIFD_VERSION) {
179         error_setg(errp, "multifd: received packet version %u "
180                    "expected %u", msg.version, MULTIFD_VERSION);
181         return -1;
182     }
183 
184     if (memcmp(msg.uuid, &qemu_uuid, sizeof(qemu_uuid))) {
185         char *uuid = qemu_uuid_unparse_strdup(&qemu_uuid);
186         char *msg_uuid = qemu_uuid_unparse_strdup((const QemuUUID *)msg.uuid);
187 
188         error_setg(errp, "multifd: received uuid '%s' and expected "
189                    "uuid '%s' for channel %hhd", msg_uuid, uuid, msg.id);
190         g_free(uuid);
191         g_free(msg_uuid);
192         return -1;
193     }
194 
195     if (msg.id > migrate_multifd_channels()) {
196         error_setg(errp, "multifd: received channel id %u is greater than "
197                    "number of channels %u", msg.id, migrate_multifd_channels());
198         return -1;
199     }
200 
201     return msg.id;
202 }
203 
204 void multifd_send_fill_packet(MultiFDSendParams *p)
205 {
206     MultiFDPacket_t *packet = p->packet;
207     uint64_t packet_num;
208     bool sync_packet = p->flags & MULTIFD_FLAG_SYNC;
209 
210     memset(packet, 0, p->packet_len);
211 
212     packet->magic = cpu_to_be32(MULTIFD_MAGIC);
213     packet->version = cpu_to_be32(MULTIFD_VERSION);
214 
215     packet->flags = cpu_to_be32(p->flags);
216     packet->next_packet_size = cpu_to_be32(p->next_packet_size);
217 
218     packet_num = qatomic_fetch_inc(&multifd_send_state->packet_num);
219     packet->packet_num = cpu_to_be64(packet_num);
220 
221     p->packets_sent++;
222 
223     if (!sync_packet) {
224         multifd_ram_fill_packet(p);
225     }
226 
227     trace_multifd_send_fill(p->id, packet_num,
228                             p->flags, p->next_packet_size);
229 }
230 
231 static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error **errp)
232 {
233     const MultiFDPacket_t *packet = p->packet;
234     uint32_t magic = be32_to_cpu(packet->magic);
235     uint32_t version = be32_to_cpu(packet->version);
236     int ret = 0;
237 
238     if (magic != MULTIFD_MAGIC) {
239         error_setg(errp, "multifd: received packet magic %x, expected %x",
240                    magic, MULTIFD_MAGIC);
241         return -1;
242     }
243 
244     if (version != MULTIFD_VERSION) {
245         error_setg(errp, "multifd: received packet version %u, expected %u",
246                    version, MULTIFD_VERSION);
247         return -1;
248     }
249 
250     p->flags = be32_to_cpu(packet->flags);
251     p->next_packet_size = be32_to_cpu(packet->next_packet_size);
252     p->packet_num = be64_to_cpu(packet->packet_num);
253     p->packets_recved++;
254 
255     /* Always unfill, old QEMUs (<9.0) send data along with SYNC */
256     ret = multifd_ram_unfill_packet(p, errp);
257 
258     trace_multifd_recv_unfill(p->id, p->packet_num, p->flags,
259                               p->next_packet_size);
260 
261     return ret;
262 }
263 
264 static bool multifd_send_should_exit(void)
265 {
266     return qatomic_read(&multifd_send_state->exiting);
267 }
268 
269 static bool multifd_recv_should_exit(void)
270 {
271     return qatomic_read(&multifd_recv_state->exiting);
272 }
273 
274 /*
275  * The migration thread can wait on either of the two semaphores.  This
276  * function can be used to kick the main thread out of waiting on either of
277  * them.  Should mostly only be called when something wrong happened with
278  * the current multifd send thread.
279  */
280 static void multifd_send_kick_main(MultiFDSendParams *p)
281 {
282     qemu_sem_post(&p->sem_sync);
283     qemu_sem_post(&multifd_send_state->channels_ready);
284 }
285 
/*
 * multifd_send() works by exchanging the MultiFDSendData object
 * provided by the caller with an unused MultiFDSendData object from
 * the next channel that is found to be idle.
 *
 * The channel owns the data until it finishes transmitting and the
 * caller owns the empty object until it fills it with data and calls
 * this function again. No locking necessary.
 *
 * Switching is safe because both the migration thread and the channel
 * thread have barriers in place to serialize access.
 *
 * Channels are scanned round-robin, starting after the channel chosen by
 * the previous call (kept in the function-local static next_channel).
 *
 * Returns true on success, false otherwise.  False is only returned when
 * the send side is exiting (multifd_send_should_exit()); in that case
 * *send_data has not been swapped.
 */
bool multifd_send(MultiFDSendData **send_data)
{
    int i;
    /* Persists across calls (and across migrations, see below). */
    static int next_channel;
    MultiFDSendParams *p = NULL; /* make happy gcc */
    MultiFDSendData *tmp;

    if (multifd_send_should_exit()) {
        return false;
    }

    /*
     * We wait here, until at least one channel is ready.  Send threads
     * post channels_ready each time they go idle; multifd_send_kick_main()
     * also posts it on errors, which is why the loop below re-checks
     * multifd_send_should_exit().
     */
    qemu_sem_wait(&multifd_send_state->channels_ready);

    /*
     * next_channel can remain from a previous migration that was
     * using more channels, so ensure it doesn't overflow if the
     * limit is lower now.
     */
    next_channel %= migrate_multifd_channels();
    /* Spin over the channels until an idle one is found or we must exit. */
    for (i = next_channel;; i = (i + 1) % migrate_multifd_channels()) {
        if (multifd_send_should_exit()) {
            return false;
        }
        p = &multifd_send_state->params[i];
        /*
         * Lockless read to p->pending_job is safe, because only multifd
         * sender thread can clear it.
         */
        if (qatomic_read(&p->pending_job) == false) {
            next_channel = (i + 1) % migrate_multifd_channels();
            break;
        }
    }

    /*
     * Make sure we read p->pending_job before all the rest.  Pairs with
     * qatomic_store_release() in multifd_send_thread().
     */
    smp_mb_acquire();

    /* An idle channel must hold an empty data slot. */
    assert(multifd_payload_empty(p->data));

    /*
     * Swap the pointers. The channel gets the client data for
     * transferring and the client gets back an unused data slot.
     */
    tmp = *send_data;
    *send_data = p->data;
    p->data = tmp;

    /*
     * Making sure p->data is setup before marking pending_job=true. Pairs
     * with the qatomic_load_acquire() in multifd_send_thread().
     */
    qatomic_store_release(&p->pending_job, true);
    /* Wake the channel's send thread to transmit the new job. */
    qemu_sem_post(&p->sem);

    return true;
}
360 
361 /* Multifd send side hit an error; remember it and prepare to quit */
362 static void multifd_send_set_error(Error *err)
363 {
364     /*
365      * We don't want to exit each threads twice.  Depending on where
366      * we get the error, or if there are two independent errors in two
367      * threads at the same time, we can end calling this function
368      * twice.
369      */
370     if (qatomic_xchg(&multifd_send_state->exiting, 1)) {
371         return;
372     }
373 
374     if (err) {
375         MigrationState *s = migrate_get_current();
376         migrate_set_error(s, err);
377         if (s->state == MIGRATION_STATUS_SETUP ||
378             s->state == MIGRATION_STATUS_PRE_SWITCHOVER ||
379             s->state == MIGRATION_STATUS_DEVICE ||
380             s->state == MIGRATION_STATUS_ACTIVE) {
381             migrate_set_state(&s->state, s->state,
382                               MIGRATION_STATUS_FAILED);
383         }
384     }
385 }
386 
387 static void multifd_send_terminate_threads(void)
388 {
389     int i;
390 
391     trace_multifd_send_terminate_threads();
392 
393     /*
394      * Tell everyone we're quitting.  No xchg() needed here; we simply
395      * always set it.
396      */
397     qatomic_set(&multifd_send_state->exiting, 1);
398 
399     /*
400      * Firstly, kick all threads out; no matter whether they are just idle,
401      * or blocked in an IO system call.
402      */
403     for (i = 0; i < migrate_multifd_channels(); i++) {
404         MultiFDSendParams *p = &multifd_send_state->params[i];
405 
406         qemu_sem_post(&p->sem);
407         if (p->c) {
408             qio_channel_shutdown(p->c, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
409         }
410     }
411 
412     /*
413      * Finally recycle all the threads.
414      */
415     for (i = 0; i < migrate_multifd_channels(); i++) {
416         MultiFDSendParams *p = &multifd_send_state->params[i];
417 
418         if (p->tls_thread_created) {
419             qemu_thread_join(&p->tls_thread);
420         }
421 
422         if (p->thread_created) {
423             qemu_thread_join(&p->thread);
424         }
425     }
426 }
427 
/*
 * Release everything owned by one send channel.
 *
 * This covers:
 *  - the IO channel: yank unregistered, explicitly closed, then unreferenced;
 *  - the semaphores, the name, the data object and the packet buffer;
 *  - the compression method's per-channel state, via ops->send_cleanup(),
 *    which must leave p->iov released (asserted).
 *
 * multifd_send_shutdown() calls this after
 * multifd_send_terminate_threads() has joined the channel's threads.
 *
 * @errp must be non-NULL: it is dereferenced to compute the result.
 * Returns true if *errp is still NULL afterwards, i.e. no error was
 * reported.
 */
static bool multifd_send_cleanup_channel(MultiFDSendParams *p, Error **errp)
{
    if (p->c) {
        migration_ioc_unregister_yank(p->c);
        /*
         * The object_unref() cannot guarantee the fd will always be
         * released because finalize() of the iochannel is only
         * triggered on the last reference and it's not guaranteed
         * that we always hold the last refcount when reaching here.
         *
         * Closing the fd explicitly has the benefit that if there is any
         * registered I/O handler callbacks on such fd, that will get a
         * POLLNVAL event and will further trigger the cleanup to finally
         * release the IOC.
         *
         * FIXME: It should logically be guaranteed that all multifd
         * channels have no I/O handler callback registered when reaching
         * here, because migration thread will wait for all multifd channel
         * establishments to complete during setup.  Since
         * migrate_fd_cleanup() will be scheduled in main thread too, all
         * previous callbacks should guarantee to be completed when
         * reaching here.  See multifd_send_state.channels_created and its
         * usage.  In the future, we could replace this with an assert
         * making sure we're the last reference, or simply drop it if above
         * is more clear to be justified.
         */
        qio_channel_close(p->c, &error_abort);
        object_unref(OBJECT(p->c));
        p->c = NULL;
    }
    qemu_sem_destroy(&p->sem);
    qemu_sem_destroy(&p->sem_sync);
    g_free(p->name);
    p->name = NULL;
    g_free(p->data);
    p->data = NULL;
    p->packet_len = 0;
    g_free(p->packet);
    p->packet = NULL;
    /* Compression-specific teardown; expected to free p->iov. */
    multifd_send_state->ops->send_cleanup(p, errp);
    assert(!p->iov);

    return *errp == NULL;
}
472 
473 static void multifd_send_cleanup_state(void)
474 {
475     file_cleanup_outgoing_migration();
476     socket_cleanup_outgoing_migration();
477     qemu_sem_destroy(&multifd_send_state->channels_created);
478     qemu_sem_destroy(&multifd_send_state->channels_ready);
479     g_free(multifd_send_state->params);
480     multifd_send_state->params = NULL;
481     g_free(multifd_send_state);
482     multifd_send_state = NULL;
483 }
484 
485 void multifd_send_shutdown(void)
486 {
487     int i;
488 
489     if (!migrate_multifd()) {
490         return;
491     }
492 
493     multifd_send_terminate_threads();
494 
495     for (i = 0; i < migrate_multifd_channels(); i++) {
496         MultiFDSendParams *p = &multifd_send_state->params[i];
497         Error *local_err = NULL;
498 
499         if (!multifd_send_cleanup_channel(p, &local_err)) {
500             migrate_set_error(migrate_get_current(), local_err);
501             error_free(local_err);
502         }
503     }
504 
505     multifd_send_cleanup_state();
506 }
507 
508 static int multifd_zero_copy_flush(QIOChannel *c)
509 {
510     int ret;
511     Error *err = NULL;
512 
513     ret = qio_channel_flush(c, &err);
514     if (ret < 0) {
515         error_report_err(err);
516         return -1;
517     }
518     if (ret == 1) {
519         stat64_add(&mig_stats.dirty_sync_missed_zero_copy, 1);
520     }
521 
522     return ret;
523 }
524 
/*
 * Ask every send channel to handle a sync request of kind @req
 * (anything but MULTIFD_SYNC_NONE) and wait until all of them have
 * acknowledged it.
 *
 * Phase 1 stores @req in each p->pending_sync and wakes the channel on
 * p->sem.  Phase 2 waits, for every channel, for one channels_ready
 * token and then for the channel's p->sem_sync, which its send thread
 * posts once the request has been handled.  With zero-copy send enabled,
 * each channel is also flushed after acknowledging.
 *
 * Returns 0 on success, or -1 if the send side is exiting or a
 * zero-copy flush failed.
 */
int multifd_send_sync_main(MultiFDSyncReq req)
{
    int i;
    bool flush_zero_copy;

    assert(req != MULTIFD_SYNC_NONE);

    flush_zero_copy = migrate_zero_copy_send();

    /* Phase 1: post the request to every channel. */
    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];

        if (multifd_send_should_exit()) {
            return -1;
        }

        trace_multifd_send_sync_main_signal(p->id);

        /*
         * We should be the only user so far, so not possible to be set by
         * others concurrently.
         */
        assert(qatomic_read(&p->pending_sync) == MULTIFD_SYNC_NONE);
        qatomic_set(&p->pending_sync, req);
        qemu_sem_post(&p->sem);
    }
    /* Phase 2: wait for every channel to acknowledge. */
    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];

        if (multifd_send_should_exit()) {
            return -1;
        }

        qemu_sem_wait(&multifd_send_state->channels_ready);
        trace_multifd_send_sync_main_wait(p->id);
        qemu_sem_wait(&p->sem_sync);

        if (flush_zero_copy && p->c && (multifd_zero_copy_flush(p->c) < 0)) {
            return -1;
        }
    }
    trace_multifd_send_sync_main(multifd_send_state->packet_num);

    return 0;
}
570 
/*
 * Body of each multifd send thread; @opaque is the channel's
 * MultiFDSendParams.
 *
 * When packets are in use, the MultiFDInit_t handshake is sent first.
 * The thread then loops:
 *  - announce itself idle on channels_ready and sleep on p->sem;
 *  - on wakeup, stop if the send side is exiting;
 *  - otherwise either transmit the job handed over in p->data
 *    (pending_job) or serve a sync request (pending_sync).
 *
 * On any error the error is recorded via multifd_send_set_error(), the
 * migration thread is kicked, and the thread exits.
 */
static void *multifd_send_thread(void *opaque)
{
    MultiFDSendParams *p = opaque;
    MigrationThread *thread = NULL;
    Error *local_err = NULL;
    int ret = 0;
    bool use_packets = multifd_use_packets();

    thread = migration_threads_add(p->name, qemu_get_thread_id());

    trace_multifd_send_thread_start(p->id);
    rcu_register_thread();

    if (use_packets) {
        if (multifd_send_initial_packet(p, &local_err) < 0) {
            ret = -1;
            goto out;
        }
    }

    while (true) {
        /* Hand out one "idle channel" token, then wait for work. */
        qemu_sem_post(&multifd_send_state->channels_ready);
        qemu_sem_wait(&p->sem);

        if (multifd_send_should_exit()) {
            break;
        }

        /*
         * Read pending_job flag before p->data.  Pairs with the
         * qatomic_store_release() in multifd_send().
         */
        if (qatomic_load_acquire(&p->pending_job)) {
            p->flags = 0;
            p->iovs_num = 0;
            assert(!multifd_payload_empty(p->data));

            /* Compression method builds p->iov / p->iovs_num. */
            ret = multifd_send_state->ops->send_prepare(p, &local_err);
            if (ret != 0) {
                break;
            }

            if (migrate_mapped_ram()) {
                ret = file_write_ramblock_iov(p->c, p->iov, p->iovs_num,
                                              &p->data->u.ram, &local_err);
            } else {
                ret = qio_channel_writev_full_all(p->c, p->iov, p->iovs_num,
                                                  NULL, 0, p->write_flags,
                                                  &local_err);
            }

            if (ret != 0) {
                break;
            }

            stat64_add(&mig_stats.multifd_bytes,
                       (uint64_t)p->next_packet_size + p->packet_len);

            /* Mark the data slot empty before handing it back. */
            p->next_packet_size = 0;
            multifd_set_payload_type(p->data, MULTIFD_PAYLOAD_NONE);

            /*
             * Making sure p->data is published before saying "we're
             * free".  Pairs with the smp_mb_acquire() in
             * multifd_send().
             */
            qatomic_store_release(&p->pending_job, false);
        } else {
            MultiFDSyncReq req = qatomic_read(&p->pending_sync);

            /*
             * If not a normal job, must be a sync request.  Note that
             * pending_sync is a standalone flag (unlike pending_job), so
             * it doesn't require explicit memory barriers.
             */
            assert(req != MULTIFD_SYNC_NONE);

            /* Only push the SYNC message if it involves a remote sync */
            if (req == MULTIFD_SYNC_ALL) {
                p->flags = MULTIFD_FLAG_SYNC;
                multifd_send_fill_packet(p);
                ret = qio_channel_write_all(p->c, (void *)p->packet,
                                            p->packet_len, &local_err);
                if (ret != 0) {
                    break;
                }
                /* p->next_packet_size will always be zero for a SYNC packet */
                stat64_add(&mig_stats.multifd_bytes, p->packet_len);
            }

            /* Acknowledge; multifd_send_sync_main() waits on sem_sync. */
            qatomic_set(&p->pending_sync, MULTIFD_SYNC_NONE);
            qemu_sem_post(&p->sem_sync);
        }
    }

out:
    /* ret != 0 only on the error paths above, which all set local_err. */
    if (ret) {
        assert(local_err);
        trace_multifd_send_error(p->id);
        multifd_send_set_error(local_err);
        multifd_send_kick_main(p);
        error_free(local_err);
    }

    rcu_unregister_thread();
    migration_threads_remove(thread);
    trace_multifd_send_thread_end(p->id, p->packets_sent);

    return NULL;
}
681 
/* Forward declaration: also used as the TLS handshake completion callback. */
static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque);

/*
 * Arguments for multifd_tls_handshake_thread().  Allocated by
 * multifd_tls_channel_connect() and freed by the thread itself.
 */
typedef struct {
    MultiFDSendParams *p;
    QIOChannelTLS *tioc;
} MultiFDTLSThreadArgs;
688 
689 static void *multifd_tls_handshake_thread(void *opaque)
690 {
691     MultiFDTLSThreadArgs *args = opaque;
692 
693     qio_channel_tls_handshake(args->tioc,
694                               multifd_new_send_channel_async,
695                               args->p,
696                               NULL,
697                               NULL);
698     g_free(args);
699 
700     return NULL;
701 }
702 
703 static bool multifd_tls_channel_connect(MultiFDSendParams *p,
704                                         QIOChannel *ioc,
705                                         Error **errp)
706 {
707     MigrationState *s = migrate_get_current();
708     const char *hostname = s->hostname;
709     MultiFDTLSThreadArgs *args;
710     QIOChannelTLS *tioc;
711 
712     tioc = migration_tls_client_create(ioc, hostname, errp);
713     if (!tioc) {
714         return false;
715     }
716 
717     /*
718      * Ownership of the socket channel now transfers to the newly
719      * created TLS channel, which has already taken a reference.
720      */
721     object_unref(OBJECT(ioc));
722     trace_multifd_tls_outgoing_handshake_start(ioc, tioc, hostname);
723     qio_channel_set_name(QIO_CHANNEL(tioc), "multifd-tls-outgoing");
724 
725     args = g_new0(MultiFDTLSThreadArgs, 1);
726     args->tioc = tioc;
727     args->p = p;
728 
729     p->tls_thread_created = true;
730     qemu_thread_create(&p->tls_thread, MIGRATION_THREAD_SRC_TLS,
731                        multifd_tls_handshake_thread, args,
732                        QEMU_THREAD_JOINABLE);
733     return true;
734 }
735 
736 void multifd_channel_connect(MultiFDSendParams *p, QIOChannel *ioc)
737 {
738     qio_channel_set_delay(ioc, false);
739 
740     migration_ioc_register_yank(ioc);
741     /* Setup p->c only if the channel is completely setup */
742     p->c = ioc;
743 
744     p->thread_created = true;
745     qemu_thread_create(&p->thread, p->name, multifd_send_thread, p,
746                        QEMU_THREAD_JOINABLE);
747 }
748 
/*
 * When TLS is enabled this function is called once to establish the
 * TLS connection and a second time after the TLS handshake to create
 * the multifd channel. Without TLS it goes straight into the channel
 * creation.
 *
 * @opaque is the channel's MultiFDSendParams.  Each final outcome
 * (channel connected, or any failure) posts channels_created exactly
 * once.  The first pass of a successful TLS upgrade returns early
 * without posting, because the second pass will post.  On failure the
 * error is recorded through multifd_send_set_error() and the IO channel
 * reference is dropped here.
 */
static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque)
{
    MultiFDSendParams *p = opaque;
    QIOChannel *ioc = QIO_CHANNEL(qio_task_get_source(task));
    Error *local_err = NULL;
    bool ret;

    trace_multifd_new_send_channel_async(p->id);

    if (qio_task_propagate_error(task, &local_err)) {
        ret = false;
        goto out;
    }

    trace_multifd_set_outgoing_channel(ioc, object_get_typename(OBJECT(ioc)),
                                       migrate_get_current()->hostname);

    if (migrate_channel_requires_tls_upgrade(ioc)) {
        ret = multifd_tls_channel_connect(p, ioc, &local_err);
        if (ret) {
            /* Handshake started; we are re-entered when it completes. */
            return;
        }
    } else {
        multifd_channel_connect(p, ioc);
        ret = true;
    }

out:
    /*
     * Here we're not interested whether creation succeeded, only that
     * it happened at all.
     */
    multifd_send_channel_created();

    if (ret) {
        return;
    }

    trace_multifd_new_send_channel_async_error(p->id, local_err);
    multifd_send_set_error(local_err);
    /*
     * For error cases (TLS or non-TLS), IO channel is always freed here
     * rather than when cleanup multifd: since p->c is not set, multifd
     * cleanup code doesn't even know its existence.
     */
    object_unref(OBJECT(ioc));
    error_free(local_err);
}
803 
804 static bool multifd_new_send_channel_create(gpointer opaque, Error **errp)
805 {
806     if (!multifd_use_packets()) {
807         return file_send_channel_create(opaque, errp);
808     }
809 
810     socket_send_channel_create(multifd_new_send_channel_async, opaque);
811     return true;
812 }
813 
814 bool multifd_send_setup(void)
815 {
816     MigrationState *s = migrate_get_current();
817     int thread_count, ret = 0;
818     uint32_t page_count = multifd_ram_page_count();
819     bool use_packets = multifd_use_packets();
820     uint8_t i;
821 
822     if (!migrate_multifd()) {
823         return true;
824     }
825 
826     thread_count = migrate_multifd_channels();
827     multifd_send_state = g_malloc0(sizeof(*multifd_send_state));
828     multifd_send_state->params = g_new0(MultiFDSendParams, thread_count);
829     qemu_sem_init(&multifd_send_state->channels_created, 0);
830     qemu_sem_init(&multifd_send_state->channels_ready, 0);
831     qatomic_set(&multifd_send_state->exiting, 0);
832     multifd_send_state->ops = multifd_ops[migrate_multifd_compression()];
833 
834     for (i = 0; i < thread_count; i++) {
835         MultiFDSendParams *p = &multifd_send_state->params[i];
836         Error *local_err = NULL;
837 
838         qemu_sem_init(&p->sem, 0);
839         qemu_sem_init(&p->sem_sync, 0);
840         p->id = i;
841         p->data = multifd_send_data_alloc();
842 
843         if (use_packets) {
844             p->packet_len = sizeof(MultiFDPacket_t)
845                           + sizeof(uint64_t) * page_count;
846             p->packet = g_malloc0(p->packet_len);
847         }
848         p->name = g_strdup_printf(MIGRATION_THREAD_SRC_MULTIFD, i);
849         p->write_flags = 0;
850 
851         if (!multifd_new_send_channel_create(p, &local_err)) {
852             migrate_set_error(s, local_err);
853             ret = -1;
854         }
855     }
856 
857     /*
858      * Wait until channel creation has started for all channels. The
859      * creation can still fail, but no more channels will be created
860      * past this point.
861      */
862     for (i = 0; i < thread_count; i++) {
863         qemu_sem_wait(&multifd_send_state->channels_created);
864     }
865 
866     if (ret) {
867         goto err;
868     }
869 
870     for (i = 0; i < thread_count; i++) {
871         MultiFDSendParams *p = &multifd_send_state->params[i];
872         Error *local_err = NULL;
873 
874         ret = multifd_send_state->ops->send_setup(p, &local_err);
875         if (ret) {
876             migrate_set_error(s, local_err);
877             goto err;
878         }
879         assert(p->iov);
880     }
881 
882     return true;
883 
884 err:
885     migrate_set_state(&s->state, MIGRATION_STATUS_SETUP,
886                       MIGRATION_STATUS_FAILED);
887     return false;
888 }
889 
/*
 * Hand the migration thread's current MultiFDRecvData
 * (multifd_recv_state->data) to the next idle receive channel.  In
 * exchange, the migration thread takes that channel's empty data object.
 *
 * Channels are scanned round-robin from the one after the previously
 * chosen channel.  The scan busy-polls (it does not block on a
 * semaphore) until an idle channel is found.
 *
 * Returns true once the job has been handed over, or false if the
 * receive side is exiting.
 */
bool multifd_recv(void)
{
    int i;
    /* Persists across calls (and across migrations, see below). */
    static int next_recv_channel;
    MultiFDRecvParams *p = NULL;
    MultiFDRecvData *data = multifd_recv_state->data;

    /*
     * next_channel can remain from a previous migration that was
     * using more channels, so ensure it doesn't overflow if the
     * limit is lower now.
     */
    next_recv_channel %= migrate_multifd_channels();
    for (i = next_recv_channel;; i = (i + 1) % migrate_multifd_channels()) {
        if (multifd_recv_should_exit()) {
            return false;
        }

        p = &multifd_recv_state->params[i];

        if (qatomic_read(&p->pending_job) == false) {
            next_recv_channel = (i + 1) % migrate_multifd_channels();
            break;
        }
    }

    /*
     * Order pending_job read before manipulating p->data below. Pairs
     * with qatomic_store_release() at multifd_recv_thread().
     */
    smp_mb_acquire();

    /* An idle channel must hold an empty data object. */
    assert(!p->data->size);
    multifd_recv_state->data = p->data;
    p->data = data;

    /*
     * Order p->data update before setting pending_job. Pairs with
     * qatomic_load_acquire() at multifd_recv_thread().
     */
    qatomic_store_release(&p->pending_job, true);
    /* Wake the channel's receive thread to process the job. */
    qemu_sem_post(&p->sem);

    return true;
}
935 
936 MultiFDRecvData *multifd_get_recv_data(void)
937 {
938     return multifd_recv_state->data;
939 }
940 
/*
 * Tell the multifd receive side to stop.
 *
 * Only the first caller does any work: exiting is flipped 0 -> 1 with
 * qatomic_xchg().  If @err is non-NULL it is recorded on the current
 * migration, and a migration still in SETUP or ACTIVE is moved to
 * FAILED.  Every channel is then released from whichever semaphore it
 * may be waiting on, and its IO channel (if any) is shut down.
 * Threads are not joined here, and @err is not freed.
 */
static void multifd_recv_terminate_threads(Error *err)
{
    int i;

    trace_multifd_recv_terminate_threads(err != NULL);

    if (qatomic_xchg(&multifd_recv_state->exiting, 1)) {
        return;
    }

    if (err) {
        MigrationState *s = migrate_get_current();
        migrate_set_error(s, err);
        if (s->state == MIGRATION_STATUS_SETUP ||
            s->state == MIGRATION_STATUS_ACTIVE) {
            migrate_set_state(&s->state, s->state,
                              MIGRATION_STATUS_FAILED);
        }
    }

    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDRecvParams *p = &multifd_recv_state->params[i];

        /*
         * The migration thread and channels interact differently
         * depending on the presence of packets.
         */
        if (multifd_use_packets()) {
            /*
             * The channel receives as long as there are packets. When
             * packets end (i.e. MULTIFD_FLAG_SYNC is reached), the
             * channel waits for the migration thread to sync. If the
             * sync never happens, do it here.
             */
            qemu_sem_post(&p->sem_sync);
        } else {
            /*
             * The channel waits for the migration thread to give it
             * work. When the migration thread runs out of work, it
             * releases the channel and waits for any pending work to
             * finish. If we reach here (e.g. due to error) before the
             * work runs out, release the channel.
             */
            qemu_sem_post(&p->sem);
        }

        /*
         * We could arrive here for two reasons:
         *  - normal quit, i.e. everything went fine, just finished
         *  - error quit: We close the channels so the channel threads
         *    finish the qio_channel_read_all_eof()
         */
        if (p->c) {
            qio_channel_shutdown(p->c, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
        }
    }
}
998 
999 void multifd_recv_shutdown(void)
1000 {
1001     if (migrate_multifd()) {
1002         multifd_recv_terminate_threads(NULL);
1003     }
1004 }
1005 
1006 static void multifd_recv_cleanup_channel(MultiFDRecvParams *p)
1007 {
1008     migration_ioc_unregister_yank(p->c);
1009     object_unref(OBJECT(p->c));
1010     p->c = NULL;
1011     qemu_mutex_destroy(&p->mutex);
1012     qemu_sem_destroy(&p->sem_sync);
1013     qemu_sem_destroy(&p->sem);
1014     g_free(p->data);
1015     p->data = NULL;
1016     g_free(p->name);
1017     p->name = NULL;
1018     p->packet_len = 0;
1019     g_free(p->packet);
1020     p->packet = NULL;
1021     g_free(p->normal);
1022     p->normal = NULL;
1023     g_free(p->zero);
1024     p->zero = NULL;
1025     multifd_recv_state->ops->recv_cleanup(p);
1026 }
1027 
1028 static void multifd_recv_cleanup_state(void)
1029 {
1030     qemu_sem_destroy(&multifd_recv_state->sem_sync);
1031     g_free(multifd_recv_state->params);
1032     multifd_recv_state->params = NULL;
1033     g_free(multifd_recv_state->data);
1034     multifd_recv_state->data = NULL;
1035     g_free(multifd_recv_state);
1036     multifd_recv_state = NULL;
1037 }
1038 
1039 void multifd_recv_cleanup(void)
1040 {
1041     int i;
1042 
1043     if (!migrate_multifd()) {
1044         return;
1045     }
1046     multifd_recv_terminate_threads(NULL);
1047     for (i = 0; i < migrate_multifd_channels(); i++) {
1048         MultiFDRecvParams *p = &multifd_recv_state->params[i];
1049 
1050         if (p->thread_created) {
1051             qemu_thread_join(&p->thread);
1052         }
1053     }
1054     for (i = 0; i < migrate_multifd_channels(); i++) {
1055         multifd_recv_cleanup_channel(&multifd_recv_state->params[i]);
1056     }
1057     multifd_recv_cleanup_state();
1058 }
1059 
/*
 * Synchronise the migration thread with every multifd receive channel.
 *
 * No-op when multifd is disabled.
 *
 * File-based (no packets): each channel is released once through p->sem
 * without pending_job set.  Then this waits until every channel has
 * posted multifd_recv_state->sem_sync, and returns.
 *
 * Packet-based: waits until every channel has posted
 * multifd_recv_state->sem_sync (each does so after receiving a packet
 * with MULTIFD_FLAG_SYNC).  It then records the highest per-channel
 * packet_num and releases each channel through its own p->sem_sync.
 */
void multifd_recv_sync_main(void)
{
    int thread_count = migrate_multifd_channels();
    bool file_based = !multifd_use_packets();
    int i;

    if (!migrate_multifd()) {
        return;
    }

    /*
     * File-based channels don't use packets and therefore need to
     * wait for more work. Release them to start the sync.
     */
    if (file_based) {
        for (i = 0; i < thread_count; i++) {
            MultiFDRecvParams *p = &multifd_recv_state->params[i];

            trace_multifd_recv_sync_main_signal(p->id);
            /*
             * Waking the channel without setting pending_job means "no
             * work": multifd_recv_thread() answers by posting
             * multifd_recv_state->sem_sync.
             */
            qemu_sem_post(&p->sem);
        }
    }

    /*
     * Initiate the synchronization by waiting for all channels.
     *
     * For socket-based migration this means each channel has received
     * the SYNC packet on the stream.
     *
     * For file-based migration this means each channel is done with
     * the work (pending_job=false).
     *
     * Each channel posts sem_sync once per sync point, so thread_count
     * waits collect one post from every channel.
     */
    for (i = 0; i < thread_count; i++) {
        trace_multifd_recv_sync_main_wait(i);
        qemu_sem_wait(&multifd_recv_state->sem_sync);
    }

    if (file_based) {
        /*
         * For file-based migration, loading is done in a single
         * iteration, so there is nothing left to release.
         */
        return;
    }

    /*
     * Sync done. Release the channels for the next iteration.
     */
    for (i = 0; i < thread_count; i++) {
        MultiFDRecvParams *p = &multifd_recv_state->params[i];

        /* Track the highest packet number any channel has seen. */
        WITH_QEMU_LOCK_GUARD(&p->mutex) {
            if (multifd_recv_state->packet_num < p->packet_num) {
                multifd_recv_state->packet_num = p->packet_num;
            }
        }
        trace_multifd_recv_sync_main_signal(p->id);
        /* Unblocks the channel waiting on p->sem_sync after its SYNC packet. */
        qemu_sem_post(&p->sem_sync);
    }
    trace_multifd_recv_sync_main(multifd_recv_state->packet_num);
}
1121 
/*
 * Body of one multifd receive channel thread; @opaque is the channel's
 * MultiFDRecvParams.
 *
 * With packets: repeatedly read p->packet_len bytes from p->c and decode
 * them under p->mutex.  Any pages are handed to the compression backend's
 * recv() hook.  On a MULTIFD_FLAG_SYNC packet the thread meets
 * multifd_recv_sync_main() through the two sem_sync semaphores.
 *
 * Without packets: wait on p->sem until the migration thread queues a
 * MultiFDRecvData via multifd_recv(), process it with recv(), then clear
 * pending_job.  A wake-up with pending_job unset is a sync request and is
 * acknowledged on multifd_recv_state->sem_sync.
 *
 * The loop ends on EOF, on any error, or when multifd_recv_should_exit()
 * returns true.  Errors are then reported through
 * multifd_recv_terminate_threads().
 */
static void *multifd_recv_thread(void *opaque)
{
    MultiFDRecvParams *p = opaque;
    Error *local_err = NULL;
    bool use_packets = multifd_use_packets();
    int ret;

    trace_multifd_recv_thread_start(p->id);
    rcu_register_thread();

    while (true) {
        uint32_t flags = 0;
        bool has_data = false;
        p->normal_num = 0;

        if (use_packets) {
            if (multifd_recv_should_exit()) {
                break;
            }

            ret = qio_channel_read_all_eof(p->c, (void *)p->packet,
                                           p->packet_len, &local_err);
            if (ret == 0 || ret == -1) {   /* 0: EOF  -1: Error */
                break;
            }

            /* Packet decoding and the flag/page fields it fills are
             * accessed under p->mutex. */
            qemu_mutex_lock(&p->mutex);
            ret = multifd_recv_unfill_packet(p, &local_err);
            if (ret) {
                qemu_mutex_unlock(&p->mutex);
                break;
            }

            flags = p->flags;
            /* recv methods don't know how to handle the SYNC flag */
            p->flags &= ~MULTIFD_FLAG_SYNC;

            /*
             * Even if it's a SYNC packet, this needs to be set
             * because older QEMUs (<9.0) still send data along with
             * the SYNC packet.
             */
            has_data = p->normal_num || p->zero_num;
            qemu_mutex_unlock(&p->mutex);
        } else {
            /*
             * No packets, so we need to wait for the vmstate code to
             * give us work.
             */
            qemu_sem_wait(&p->sem);

            if (multifd_recv_should_exit()) {
                break;
            }

            /* pairs with qatomic_store_release() at multifd_recv() */
            if (!qatomic_load_acquire(&p->pending_job)) {
                /*
                 * Migration thread did not send work, this is
                 * equivalent to pending_sync on the sending
                 * side. Post sem_sync to notify we reached this
                 * point.
                 */
                qemu_sem_post(&multifd_recv_state->sem_sync);
                continue;
            }

            has_data = !!p->data->size;
        }

        /* Hand the received pages/data to the compression backend. */
        if (has_data) {
            ret = multifd_recv_state->ops->recv(p, &local_err);
            if (ret != 0) {
                break;
            }
        }

        if (use_packets) {
            /*
             * Tell multifd_recv_sync_main() this channel reached the sync
             * point, then block until it releases us via p->sem_sync.
             */
            if (flags & MULTIFD_FLAG_SYNC) {
                qemu_sem_post(&multifd_recv_state->sem_sync);
                qemu_sem_wait(&p->sem_sync);
            }
        } else {
            /* Mark the buffer empty so multifd_recv() can reuse it. */
            p->data->size = 0;
            /*
             * Order data->size update before clearing
             * pending_job. Pairs with smp_mb_acquire() at
             * multifd_recv().
             */
            qatomic_store_release(&p->pending_job, false);
        }
    }

    /* local_err is only set on the failure paths above. */
    if (local_err) {
        multifd_recv_terminate_threads(local_err);
        error_free(local_err);
    }

    rcu_unregister_thread();
    trace_multifd_recv_thread_end(p->id, p->packets_recved);

    return NULL;
}
1225 
1226 int multifd_recv_setup(Error **errp)
1227 {
1228     int thread_count;
1229     uint32_t page_count = multifd_ram_page_count();
1230     bool use_packets = multifd_use_packets();
1231     uint8_t i;
1232 
1233     /*
1234      * Return successfully if multiFD recv state is already initialised
1235      * or multiFD is not enabled.
1236      */
1237     if (multifd_recv_state || !migrate_multifd()) {
1238         return 0;
1239     }
1240 
1241     thread_count = migrate_multifd_channels();
1242     multifd_recv_state = g_malloc0(sizeof(*multifd_recv_state));
1243     multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count);
1244 
1245     multifd_recv_state->data = g_new0(MultiFDRecvData, 1);
1246     multifd_recv_state->data->size = 0;
1247 
1248     qatomic_set(&multifd_recv_state->count, 0);
1249     qatomic_set(&multifd_recv_state->exiting, 0);
1250     qemu_sem_init(&multifd_recv_state->sem_sync, 0);
1251     multifd_recv_state->ops = multifd_ops[migrate_multifd_compression()];
1252 
1253     for (i = 0; i < thread_count; i++) {
1254         MultiFDRecvParams *p = &multifd_recv_state->params[i];
1255 
1256         qemu_mutex_init(&p->mutex);
1257         qemu_sem_init(&p->sem_sync, 0);
1258         qemu_sem_init(&p->sem, 0);
1259         p->pending_job = false;
1260         p->id = i;
1261 
1262         p->data = g_new0(MultiFDRecvData, 1);
1263         p->data->size = 0;
1264 
1265         if (use_packets) {
1266             p->packet_len = sizeof(MultiFDPacket_t)
1267                 + sizeof(uint64_t) * page_count;
1268             p->packet = g_malloc0(p->packet_len);
1269         }
1270         p->name = g_strdup_printf(MIGRATION_THREAD_DST_MULTIFD, i);
1271         p->normal = g_new0(ram_addr_t, page_count);
1272         p->zero = g_new0(ram_addr_t, page_count);
1273     }
1274 
1275     for (i = 0; i < thread_count; i++) {
1276         MultiFDRecvParams *p = &multifd_recv_state->params[i];
1277         int ret;
1278 
1279         ret = multifd_recv_state->ops->recv_setup(p, errp);
1280         if (ret) {
1281             return ret;
1282         }
1283     }
1284     return 0;
1285 }
1286 
1287 bool multifd_recv_all_channels_created(void)
1288 {
1289     int thread_count = migrate_multifd_channels();
1290 
1291     if (!migrate_multifd()) {
1292         return true;
1293     }
1294 
1295     if (!multifd_recv_state) {
1296         /* Called before any connections created */
1297         return false;
1298     }
1299 
1300     return thread_count == qatomic_read(&multifd_recv_state->count);
1301 }
1302 
1303 /*
1304  * Try to receive all multifd channels to get ready for the migration.
1305  * Sets @errp when failing to receive the current channel.
1306  */
1307 void multifd_recv_new_channel(QIOChannel *ioc, Error **errp)
1308 {
1309     MultiFDRecvParams *p;
1310     Error *local_err = NULL;
1311     bool use_packets = multifd_use_packets();
1312     int id;
1313 
1314     if (use_packets) {
1315         id = multifd_recv_initial_packet(ioc, &local_err);
1316         if (id < 0) {
1317             multifd_recv_terminate_threads(local_err);
1318             error_propagate_prepend(errp, local_err,
1319                                     "failed to receive packet"
1320                                     " via multifd channel %d: ",
1321                                     qatomic_read(&multifd_recv_state->count));
1322             return;
1323         }
1324         trace_multifd_recv_new_channel(id);
1325     } else {
1326         id = qatomic_read(&multifd_recv_state->count);
1327     }
1328 
1329     p = &multifd_recv_state->params[id];
1330     if (p->c != NULL) {
1331         error_setg(&local_err, "multifd: received id '%d' already setup'",
1332                    id);
1333         multifd_recv_terminate_threads(local_err);
1334         error_propagate(errp, local_err);
1335         return;
1336     }
1337     p->c = ioc;
1338     object_ref(OBJECT(ioc));
1339 
1340     p->thread_created = true;
1341     qemu_thread_create(&p->thread, p->name, multifd_recv_thread, p,
1342                        QEMU_THREAD_JOINABLE);
1343     qatomic_inc(&multifd_recv_state->count);
1344 }
1345