xref: /qemu/migration/multifd.c (revision f07a5674cf97b8473e5d06d7b1df9b51e97d553f)
/*
 * Multifd common code
 *
 * Copyright (c) 2019-2020 Red Hat Inc
 *
 * Authors:
 *  Juan Quintela <quintela@redhat.com>
 *
 * This work is licensed under the terms of the GNU GPL, version 2 or later.
 * See the COPYING file in the top-level directory.
 */

#include "qemu/osdep.h"
#include "qemu/cutils.h"
#include "qemu/rcu.h"
#include "exec/target_page.h"
#include "system/system.h"
#include "exec/ramblock.h"
#include "qemu/error-report.h"
#include "qapi/error.h"
#include "file.h"
#include "migration.h"
#include "migration-stats.h"
#include "socket.h"
#include "tls.h"
#include "qemu-file.h"
#include "trace.h"
#include "multifd.h"
#include "threadinfo.h"
#include "options.h"
#include "qemu/yank.h"
#include "io/channel-file.h"
#include "io/channel-socket.h"
#include "yank_functions.h"

/* Multiple fd's */

#define MULTIFD_MAGIC 0x11223344U
#define MULTIFD_VERSION 1

typedef struct {
    uint32_t magic;
    uint32_t version;
    unsigned char uuid[16]; /* QemuUUID */
    uint8_t id;
    uint8_t unused1[7];     /* Reserved for future use */
    uint64_t unused2[4];    /* Reserved for future use */
} __attribute__((packed)) MultiFDInit_t;
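
/*
 * A sketch of the handshake header as it travels on the wire (the
 * struct is packed, so there is no compiler padding; both 32-bit
 * fields are big-endian, see the cpu_to_be32()/be32_to_cpu()
 * conversions below):
 *
 *   offset  0: magic    (4 bytes, big-endian)
 *   offset  4: version  (4 bytes, big-endian)
 *   offset  8: uuid     (16 raw bytes)
 *   offset 24: id       (1 byte)
 *   offset 25: reserved (7 + 32 zeroed bytes)
 *
 * i.e. a fixed 64 bytes per channel handshake.
 */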

struct {
    MultiFDSendParams *params;
    /*
     * Global number of generated multifd packets.
     *
     * Note that we use 'uintptr_t' because it naturally supports atomic
     * operations on both 32-bit and 64-bit hosts.  It means packet_num
     * overflows sooner on 32-bit systems, but that should be fine.
     *
     * Another option would be QEMU's Stat64, which is 64 bits on all
     * hosts; however, it does not support atomic fetch_add() yet.
     * Keep it simple for now.
     */
    uintptr_t packet_num;
    /*
     * Synchronization point past which no more channels will be
     * created.
     */
    QemuSemaphore channels_created;
    /* send channels ready */
    QemuSemaphore channels_ready;
    /*
     * Have we already terminated the threads?  There is a race when an
     * error arrives while we are already exiting.
     * Accessed with atomic operations; the only valid values are 0 and 1.
     */
    int exiting;
    /* multifd ops */
    const MultiFDMethods *ops;
} *multifd_send_state;

struct {
    MultiFDRecvParams *params;
    MultiFDRecvData *data;
    /* number of created threads */
    int count;
    /*
     * This is always posted by the recv threads; the migration thread
     * uses it to wait for the recv threads to finish assigned tasks.
     */
    QemuSemaphore sem_sync;
    /* global number of generated multifd packets */
    uint64_t packet_num;
    int exiting;
    /* multifd ops */
    const MultiFDMethods *ops;
} *multifd_recv_state;

MultiFDSendData *multifd_send_data_alloc(void)
{
    size_t max_payload_size, size_minus_payload;

    /*
     * MultiFDPages_t has a flexible array at the end, account for it
     * when allocating MultiFDSendData. Use max() in case other types
     * added to the union in the future are larger than
     * (MultiFDPages_t + flex array).
     */
    max_payload_size = MAX(multifd_ram_payload_size(), sizeof(MultiFDPayload));

    /*
     * Account for any holes the compiler might insert. We can't pack
     * the structure because that misaligns the members and triggers
     * Waddress-of-packed-member.
     */
    size_minus_payload = sizeof(MultiFDSendData) - sizeof(MultiFDPayload);

    return g_malloc0(size_minus_payload + max_payload_size);
}
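
/*
 * A sketch of what the allocation above covers, assuming
 * MultiFDSendData is declared along the lines of
 * { MultiFDPayloadType type; MultiFDPayload u; } (the authoritative
 * layout lives in multifd.h):
 *
 *   |<-- size_minus_payload -->|<-------- max_payload_size -------->|
 *   [ type, compiler padding   ][ union u ... pages[] flex array    ]
 *
 * i.e. the union is over-allocated so that the largest payload,
 * including MultiFDPages_t's trailing flexible array, always fits.
 */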

static bool multifd_use_packets(void)
{
    return !migrate_mapped_ram();
}

void multifd_send_channel_created(void)
{
    qemu_sem_post(&multifd_send_state->channels_created);
}

static const MultiFDMethods *multifd_ops[MULTIFD_COMPRESSION__MAX] = {};

void multifd_register_ops(int method, const MultiFDMethods *ops)
{
    assert(0 <= method && method < MULTIFD_COMPRESSION__MAX);
    assert(!multifd_ops[method]);
    multifd_ops[method] = ops;
}
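
/*
 * Compression backends call multifd_register_ops() from an init hook
 * so that their ops are in place before migration starts.  A minimal
 * sketch modeled on the zlib backend (the ops contents here are
 * illustrative only):
 *
 *     static const MultiFDMethods multifd_zlib_ops = {
 *         .send_setup = multifd_zlib_send_setup,
 *         .send_prepare = multifd_zlib_send_prepare,
 *         ...
 *     };
 *
 *     static void multifd_zlib_register(void)
 *     {
 *         multifd_register_ops(MULTIFD_COMPRESSION_ZLIB,
 *                              &multifd_zlib_ops);
 *     }
 *     migration_init(multifd_zlib_register);
 */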

static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp)
{
    MultiFDInit_t msg = {};
    size_t size = sizeof(msg);
    int ret;

    msg.magic = cpu_to_be32(MULTIFD_MAGIC);
    msg.version = cpu_to_be32(MULTIFD_VERSION);
    msg.id = p->id;
    memcpy(msg.uuid, &qemu_uuid.data, sizeof(msg.uuid));

    ret = qio_channel_write_all(p->c, (char *)&msg, size, errp);
    if (ret != 0) {
        return -1;
    }
    stat64_add(&mig_stats.multifd_bytes, size);
    return 0;
}

static int multifd_recv_initial_packet(QIOChannel *c, Error **errp)
{
    MultiFDInit_t msg;
    int ret;

    ret = qio_channel_read_all(c, (char *)&msg, sizeof(msg), errp);
    if (ret != 0) {
        return -1;
    }

    msg.magic = be32_to_cpu(msg.magic);
    msg.version = be32_to_cpu(msg.version);

    if (msg.magic != MULTIFD_MAGIC) {
        error_setg(errp, "multifd: received packet magic %x "
                   "expected %x", msg.magic, MULTIFD_MAGIC);
        return -1;
    }

    if (msg.version != MULTIFD_VERSION) {
        error_setg(errp, "multifd: received packet version %u "
                   "expected %u", msg.version, MULTIFD_VERSION);
        return -1;
    }

    if (memcmp(msg.uuid, &qemu_uuid, sizeof(qemu_uuid))) {
        char *uuid = qemu_uuid_unparse_strdup(&qemu_uuid);
        char *msg_uuid = qemu_uuid_unparse_strdup((const QemuUUID *)msg.uuid);

        error_setg(errp, "multifd: received uuid '%s' and expected "
                   "uuid '%s' for channel %hhd", msg_uuid, uuid, msg.id);
        g_free(uuid);
        g_free(msg_uuid);
        return -1;
    }

    /* Valid channel ids run from 0 to multifd_channels - 1 */
    if (msg.id >= migrate_multifd_channels()) {
        error_setg(errp, "multifd: received channel id %u is out of range "
                   "(number of channels %u)", msg.id,
                   migrate_multifd_channels());
        return -1;
    }

    return msg.id;
}

void multifd_send_fill_packet(MultiFDSendParams *p)
{
    MultiFDPacket_t *packet = p->packet;
    uint64_t packet_num;
    bool sync_packet = p->flags & MULTIFD_FLAG_SYNC;

    memset(packet, 0, p->packet_len);

    packet->magic = cpu_to_be32(MULTIFD_MAGIC);
    packet->version = cpu_to_be32(MULTIFD_VERSION);

    packet->flags = cpu_to_be32(p->flags);
    packet->next_packet_size = cpu_to_be32(p->next_packet_size);

    packet_num = qatomic_fetch_inc(&multifd_send_state->packet_num);
    packet->packet_num = cpu_to_be64(packet_num);

    p->packets_sent++;

    if (!sync_packet) {
        multifd_ram_fill_packet(p);
    }

    trace_multifd_send_fill(p->id, packet_num,
                            p->flags, p->next_packet_size);
}
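
/*
 * Every multi-byte field that multifd_send_fill_packet() stores into
 * the packet header (magic, version, flags, next_packet_size,
 * packet_num) goes out big-endian, and multifd_recv_unfill_packet()
 * undoes the conversion on the receiving side.  The RAM-specific part
 * of the packet (page offsets etc.) is handled by
 * multifd_ram_fill_packet()/multifd_ram_unfill_packet().
 */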

static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error **errp)
{
    const MultiFDPacket_t *packet = p->packet;
    uint32_t magic = be32_to_cpu(packet->magic);
    uint32_t version = be32_to_cpu(packet->version);
    int ret = 0;

    if (magic != MULTIFD_MAGIC) {
        error_setg(errp, "multifd: received packet magic %x, expected %x",
                   magic, MULTIFD_MAGIC);
        return -1;
    }

    if (version != MULTIFD_VERSION) {
        error_setg(errp, "multifd: received packet version %u, expected %u",
                   version, MULTIFD_VERSION);
        return -1;
    }

    p->flags = be32_to_cpu(packet->flags);
    p->next_packet_size = be32_to_cpu(packet->next_packet_size);
    p->packet_num = be64_to_cpu(packet->packet_num);
    p->packets_recved++;

    /* Always unfill, old QEMUs (<9.0) send data along with SYNC */
    ret = multifd_ram_unfill_packet(p, errp);

    trace_multifd_recv_unfill(p->id, p->packet_num, p->flags,
                              p->next_packet_size);

    return ret;
}

static bool multifd_send_should_exit(void)
{
    return qatomic_read(&multifd_send_state->exiting);
}

static bool multifd_recv_should_exit(void)
{
    return qatomic_read(&multifd_recv_state->exiting);
}

/*
 * The migration thread can wait on either of the two semaphores.  This
 * function can be used to kick it out of waiting on either of them.
 * Should mostly only be called when something has gone wrong with the
 * current multifd send thread.
 */
static void multifd_send_kick_main(MultiFDSendParams *p)
{
    qemu_sem_post(&p->sem_sync);
    qemu_sem_post(&multifd_send_state->channels_ready);
}

/*
 * multifd_send() works by exchanging the MultiFDSendData object
 * provided by the caller with an unused MultiFDSendData object from
 * the next channel that is found to be idle.
 *
 * The channel owns the data until it finishes transmitting and the
 * caller owns the empty object until it fills it with data and calls
 * this function again. No locking necessary.
 *
 * Switching is safe because both the migration thread and the channel
 * thread have barriers in place to serialize access.
 *
 * Returns true on success, false otherwise.
 */
bool multifd_send(MultiFDSendData **send_data)
{
    int i;
    static int next_channel;
    MultiFDSendParams *p = NULL; /* make gcc happy */
    MultiFDSendData *tmp;

    if (multifd_send_should_exit()) {
        return false;
    }

    /* We wait here until at least one channel is ready */
    qemu_sem_wait(&multifd_send_state->channels_ready);

    /*
     * next_channel can remain from a previous migration that was
     * using more channels, so ensure it doesn't overflow if the
     * limit is lower now.
     */
    next_channel %= migrate_multifd_channels();
    for (i = next_channel;; i = (i + 1) % migrate_multifd_channels()) {
        if (multifd_send_should_exit()) {
            return false;
        }
        p = &multifd_send_state->params[i];
        /*
         * Lockless read of p->pending_job is safe, because only the
         * multifd sender thread can clear it.
         */
        if (qatomic_read(&p->pending_job) == false) {
            next_channel = (i + 1) % migrate_multifd_channels();
            break;
        }
    }

    /*
     * Make sure we read p->pending_job before all the rest.  Pairs with
     * the qatomic_store_release() in multifd_send_thread().
     */
    smp_mb_acquire();

    assert(multifd_payload_empty(p->data));

    /*
     * Swap the pointers. The channel gets the client data for
     * transferring and the client gets back an unused data slot.
     */
    tmp = *send_data;
    *send_data = p->data;
    p->data = tmp;

    /*
     * Make sure p->data is set up before marking pending_job=true. Pairs
     * with the qatomic_load_acquire() in multifd_send_thread().
     */
    qatomic_store_release(&p->pending_job, true);
    qemu_sem_post(&p->sem);

    return true;
}
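
/*
 * An illustrative producer loop for multifd_send().  This is a sketch
 * only: the real caller is e.g. the RAM code via multifd_queue_page(),
 * and have_work()/fill_payload() are hypothetical helpers:
 *
 *     MultiFDSendData *data = multifd_send_data_alloc();
 *
 *     while (have_work()) {
 *         fill_payload(data);
 *         if (!multifd_send(&data)) {
 *             break;   // multifd is exiting, migration failed
 *         }
 *         // 'data' now points at an idle channel's empty slot and is
 *         // owned by us until the next multifd_send() call
 *     }
 */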

/* Multifd send side hit an error; remember it and prepare to quit */
static void multifd_send_set_error(Error *err)
{
    /*
     * We don't want to terminate the threads twice.  Depending on where
     * we get the error, or if two independent errors occur in two
     * threads at the same time, we can end up calling this function
     * twice.
     */
    if (qatomic_xchg(&multifd_send_state->exiting, 1)) {
        return;
    }

    if (err) {
        MigrationState *s = migrate_get_current();
        migrate_set_error(s, err);
        if (s->state == MIGRATION_STATUS_SETUP ||
            s->state == MIGRATION_STATUS_PRE_SWITCHOVER ||
            s->state == MIGRATION_STATUS_DEVICE ||
            s->state == MIGRATION_STATUS_ACTIVE) {
            migrate_set_state(&s->state, s->state,
                              MIGRATION_STATUS_FAILED);
        }
    }
}

static void multifd_send_terminate_threads(void)
{
    int i;

    trace_multifd_send_terminate_threads();

    /*
     * Tell everyone we're quitting.  No xchg() needed here; we simply
     * always set it.
     */
    qatomic_set(&multifd_send_state->exiting, 1);

    /*
     * First, kick all threads out, no matter whether they are just idle
     * or blocked in an I/O system call.
     */
    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];

        qemu_sem_post(&p->sem);
        if (p->c) {
            qio_channel_shutdown(p->c, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
        }
    }

    /*
     * Finally, recycle all the threads.
     */
    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];

        if (p->tls_thread_created) {
            qemu_thread_join(&p->tls_thread);
        }

        if (p->thread_created) {
            qemu_thread_join(&p->thread);
        }
    }
}

static bool multifd_send_cleanup_channel(MultiFDSendParams *p, Error **errp)
{
    if (p->c) {
        migration_ioc_unregister_yank(p->c);
        /*
         * The object_unref() cannot guarantee the fd will always be
         * released, because finalize() of the iochannel is only
         * triggered on the last reference and it's not guaranteed
         * that we always hold the last refcount when reaching here.
         *
         * Closing the fd explicitly has the benefit that if there are any
         * registered I/O handler callbacks on such an fd, they will get a
         * POLLNVAL event and will further trigger the cleanup to finally
         * release the IOC.
         *
         * FIXME: It should logically be guaranteed that all multifd
         * channels have no I/O handler callback registered when reaching
         * here, because the migration thread will wait for all multifd
         * channel establishments to complete during setup.  Since
         * migration_cleanup() will be scheduled in the main thread too,
         * all previous callbacks should be guaranteed to have completed
         * by the time we reach here.  See
         * multifd_send_state.channels_created and its usage.  In the
         * future, we could replace this with an assert making sure we're
         * the last reference, or simply drop it if the above is clearly
         * justified.
         */
        qio_channel_close(p->c, &error_abort);
        object_unref(OBJECT(p->c));
        p->c = NULL;
    }
    qemu_sem_destroy(&p->sem);
    qemu_sem_destroy(&p->sem_sync);
    g_free(p->name);
    p->name = NULL;
    g_free(p->data);
    p->data = NULL;
    p->packet_len = 0;
    g_free(p->packet);
    p->packet = NULL;
    multifd_send_state->ops->send_cleanup(p, errp);
    assert(!p->iov);

    return *errp == NULL;
}

static void multifd_send_cleanup_state(void)
{
    file_cleanup_outgoing_migration();
    socket_cleanup_outgoing_migration();
    qemu_sem_destroy(&multifd_send_state->channels_created);
    qemu_sem_destroy(&multifd_send_state->channels_ready);
    g_free(multifd_send_state->params);
    multifd_send_state->params = NULL;
    g_free(multifd_send_state);
    multifd_send_state = NULL;
}

void multifd_send_shutdown(void)
{
    int i;

    if (!migrate_multifd()) {
        return;
    }

    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];

        /* thread_created implies the TLS handshake has succeeded */
        if (p->tls_thread_created && p->thread_created) {
            Error *local_err = NULL;
            /*
             * The destination expects the TLS session to always be
             * properly terminated. This helps to detect a premature
             * termination in the middle of the stream.  Note that
             * older QEMUs always break the connection on the source
             * and the destination always sees
             * GNUTLS_E_PREMATURE_TERMINATION.
             */
            migration_tls_channel_end(p->c, &local_err);

            /*
             * The above can return an error in case the migration has
             * already failed. If the migration succeeded, errors are
             * not expected but there's no need to kill the source.
             */
            if (local_err && !migration_has_failed(migrate_get_current())) {
                warn_report(
                    "multifd_send_%d: Failed to terminate TLS connection: %s",
                    p->id, error_get_pretty(local_err));
                break;
            }
        }
    }

    multifd_send_terminate_threads();

    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];
        Error *local_err = NULL;

        if (!multifd_send_cleanup_channel(p, &local_err)) {
            migrate_set_error(migrate_get_current(), local_err);
            error_free(local_err);
        }
    }

    multifd_send_cleanup_state();
}

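/*
 * Flush any outstanding zero-copy sends on @c.  qio_channel_flush()
 * returns 1 when zero copy was not actually used for some of the data
 * (the kernel fell back to copying), so count those events in the
 * stats.  Returns the non-negative flush result on success, -1 on
 * error.
 */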
static int multifd_zero_copy_flush(QIOChannel *c)
{
    int ret;
    Error *err = NULL;

    ret = qio_channel_flush(c, &err);
    if (ret < 0) {
        error_report_err(err);
        return -1;
    }
    if (ret == 1) {
        stat64_add(&mig_stats.dirty_sync_missed_zero_copy, 1);
    }

    return ret;
}

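/*
 * Flush all pending data and bring every send channel to a known
 * synchronization point.  A rough sketch of the per-channel handshake
 * (migration thread on the left, sender thread on the right; the
 * exact interleaving of channels_ready varies):
 *
 *   qatomic_set(pending_sync, req)
 *   post p->sem               ---->  wake up, find no pending_job
 *                                    if req == MULTIFD_SYNC_ALL:
 *                                        send MULTIFD_FLAG_SYNC packet
 *                                    clear pending_sync
 *   wait channels_ready       <----  post channels_ready
 *   wait p->sem_sync          <----  post p->sem_sync
 */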
int multifd_send_sync_main(MultiFDSyncReq req)
{
    int i;
    bool flush_zero_copy;

    assert(req != MULTIFD_SYNC_NONE);

    flush_zero_copy = migrate_zero_copy_send();

    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];

        if (multifd_send_should_exit()) {
            return -1;
        }

        trace_multifd_send_sync_main_signal(p->id);

        /*
         * We should be the only user so far, so it is not possible for
         * it to be set by others concurrently.
         */
        assert(qatomic_read(&p->pending_sync) == MULTIFD_SYNC_NONE);
        qatomic_set(&p->pending_sync, req);
        qemu_sem_post(&p->sem);
    }
    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];

        if (multifd_send_should_exit()) {
            return -1;
        }

        qemu_sem_wait(&multifd_send_state->channels_ready);
        trace_multifd_send_sync_main_wait(p->id);
        qemu_sem_wait(&p->sem_sync);

        if (flush_zero_copy && p->c && (multifd_zero_copy_flush(p->c) < 0)) {
            return -1;
        }
    }
    trace_multifd_send_sync_main(multifd_send_state->packet_num);

    return 0;
}

static void *multifd_send_thread(void *opaque)
{
    MultiFDSendParams *p = opaque;
    MigrationThread *thread = NULL;
    Error *local_err = NULL;
    int ret = 0;
    bool use_packets = multifd_use_packets();

    thread = migration_threads_add(p->name, qemu_get_thread_id());

    trace_multifd_send_thread_start(p->id);
    rcu_register_thread();

    if (use_packets) {
        if (multifd_send_initial_packet(p, &local_err) < 0) {
            ret = -1;
            goto out;
        }
    }

    while (true) {
        qemu_sem_post(&multifd_send_state->channels_ready);
        qemu_sem_wait(&p->sem);

        if (multifd_send_should_exit()) {
            break;
        }

        /*
         * Read the pending_job flag before p->data.  Pairs with the
         * qatomic_store_release() in multifd_send().
         */
        if (qatomic_load_acquire(&p->pending_job)) {
            p->flags = 0;
            p->iovs_num = 0;
            assert(!multifd_payload_empty(p->data));

            ret = multifd_send_state->ops->send_prepare(p, &local_err);
            if (ret != 0) {
                break;
            }

            if (migrate_mapped_ram()) {
                ret = file_write_ramblock_iov(p->c, p->iov, p->iovs_num,
                                              &p->data->u.ram, &local_err);
            } else {
                ret = qio_channel_writev_full_all(p->c, p->iov, p->iovs_num,
                                                  NULL, 0, p->write_flags,
                                                  &local_err);
            }

            if (ret != 0) {
                break;
            }

            stat64_add(&mig_stats.multifd_bytes,
                       (uint64_t)p->next_packet_size + p->packet_len);

            p->next_packet_size = 0;
            multifd_set_payload_type(p->data, MULTIFD_PAYLOAD_NONE);

            /*
             * Make sure p->data is published before saying "we're
             * free".  Pairs with the smp_mb_acquire() in
             * multifd_send().
             */
            qatomic_store_release(&p->pending_job, false);
        } else {
            MultiFDSyncReq req = qatomic_read(&p->pending_sync);

            /*
             * If not a normal job, it must be a sync request.  Note that
             * pending_sync is a standalone flag (unlike pending_job), so
             * it doesn't require explicit memory barriers.
             */
            assert(req != MULTIFD_SYNC_NONE);

            /* Only push the SYNC message if it involves a remote sync */
            if (req == MULTIFD_SYNC_ALL) {
                p->flags = MULTIFD_FLAG_SYNC;
                multifd_send_fill_packet(p);
                ret = qio_channel_write_all(p->c, (void *)p->packet,
                                            p->packet_len, &local_err);
                if (ret != 0) {
                    break;
                }
                /* p->next_packet_size will always be zero for a SYNC packet */
                stat64_add(&mig_stats.multifd_bytes, p->packet_len);
            }

            qatomic_set(&p->pending_sync, MULTIFD_SYNC_NONE);
            qemu_sem_post(&p->sem_sync);
        }
    }

out:
    if (ret) {
        assert(local_err);
        trace_multifd_send_error(p->id);
        multifd_send_set_error(local_err);
        multifd_send_kick_main(p);
        error_free(local_err);
    }

    rcu_unregister_thread();
    migration_threads_remove(thread);
    trace_multifd_send_thread_end(p->id, p->packets_sent);

    return NULL;
}

static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque);

typedef struct {
    MultiFDSendParams *p;
    QIOChannelTLS *tioc;
} MultiFDTLSThreadArgs;

static void *multifd_tls_handshake_thread(void *opaque)
{
    MultiFDTLSThreadArgs *args = opaque;

    qio_channel_tls_handshake(args->tioc,
                              multifd_new_send_channel_async,
                              args->p,
                              NULL,
                              NULL);
    g_free(args);

    return NULL;
}

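/*
 * Wrap @ioc in a TLS channel and start the handshake in a dedicated
 * thread.  When the handshake finishes (successfully or not),
 * multifd_new_send_channel_async() runs again, this time with the TLS
 * channel as the task's source, and proceeds with the regular channel
 * setup.
 */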
static bool multifd_tls_channel_connect(MultiFDSendParams *p,
                                        QIOChannel *ioc,
                                        Error **errp)
{
    MigrationState *s = migrate_get_current();
    const char *hostname = s->hostname;
    MultiFDTLSThreadArgs *args;
    QIOChannelTLS *tioc;

    tioc = migration_tls_client_create(ioc, hostname, errp);
    if (!tioc) {
        return false;
    }

    /*
     * Ownership of the socket channel now transfers to the newly
     * created TLS channel, which has already taken a reference.
     */
    object_unref(OBJECT(ioc));
    trace_multifd_tls_outgoing_handshake_start(ioc, tioc, hostname);
    qio_channel_set_name(QIO_CHANNEL(tioc), "multifd-tls-outgoing");

    args = g_new0(MultiFDTLSThreadArgs, 1);
    args->tioc = tioc;
    args->p = p;

    p->tls_thread_created = true;
    qemu_thread_create(&p->tls_thread, MIGRATION_THREAD_SRC_TLS,
                       multifd_tls_handshake_thread, args,
                       QEMU_THREAD_JOINABLE);
    return true;
}

void multifd_channel_connect(MultiFDSendParams *p, QIOChannel *ioc)
{
    qio_channel_set_delay(ioc, false);

    migration_ioc_register_yank(ioc);
    /* Set up p->c only once the channel is completely set up */
    p->c = ioc;

    p->thread_created = true;
    qemu_thread_create(&p->thread, p->name, multifd_send_thread, p,
                       QEMU_THREAD_JOINABLE);
}

/*
 * When TLS is enabled this function is called once to establish the
 * TLS connection and a second time after the TLS handshake to create
 * the multifd channel. Without TLS it goes straight into the channel
 * creation.
 */
static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque)
{
    MultiFDSendParams *p = opaque;
    QIOChannel *ioc = QIO_CHANNEL(qio_task_get_source(task));
    Error *local_err = NULL;
    bool ret;

    trace_multifd_new_send_channel_async(p->id);

    if (qio_task_propagate_error(task, &local_err)) {
        ret = false;
        goto out;
    }

    trace_multifd_set_outgoing_channel(ioc, object_get_typename(OBJECT(ioc)),
                                       migrate_get_current()->hostname);

    if (migrate_channel_requires_tls_upgrade(ioc)) {
        ret = multifd_tls_channel_connect(p, ioc, &local_err);
        if (ret) {
            return;
        }
    } else {
        multifd_channel_connect(p, ioc);
        ret = true;
    }

out:
    /*
     * Here we're not interested in whether creation succeeded, only
     * that it was attempted at all.
     */
    multifd_send_channel_created();

    if (ret) {
        return;
    }

    trace_multifd_new_send_channel_async_error(p->id, local_err);
    multifd_send_set_error(local_err);
    /*
     * For error cases (TLS or non-TLS), the IO channel is always freed
     * here rather than during multifd cleanup: since p->c is not set, the
     * multifd cleanup code doesn't even know of its existence.
     */
    object_unref(OBJECT(ioc));
    error_free(local_err);
}

static bool multifd_new_send_channel_create(gpointer opaque, Error **errp)
{
    if (!multifd_use_packets()) {
        return file_send_channel_create(opaque, errp);
    }

    socket_send_channel_create(multifd_new_send_channel_async, opaque);
    return true;
}

bool multifd_send_setup(void)
{
    MigrationState *s = migrate_get_current();
    int thread_count, ret = 0;
    uint32_t page_count = multifd_ram_page_count();
    bool use_packets = multifd_use_packets();
    uint8_t i;

    if (!migrate_multifd()) {
        return true;
    }

    thread_count = migrate_multifd_channels();
    multifd_send_state = g_malloc0(sizeof(*multifd_send_state));
    multifd_send_state->params = g_new0(MultiFDSendParams, thread_count);
    qemu_sem_init(&multifd_send_state->channels_created, 0);
    qemu_sem_init(&multifd_send_state->channels_ready, 0);
    qatomic_set(&multifd_send_state->exiting, 0);
    multifd_send_state->ops = multifd_ops[migrate_multifd_compression()];

    for (i = 0; i < thread_count; i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];
        Error *local_err = NULL;

        qemu_sem_init(&p->sem, 0);
        qemu_sem_init(&p->sem_sync, 0);
        p->id = i;
        p->data = multifd_send_data_alloc();

        if (use_packets) {
            p->packet_len = sizeof(MultiFDPacket_t)
                          + sizeof(uint64_t) * page_count;
            p->packet = g_malloc0(p->packet_len);
        }
        p->name = g_strdup_printf(MIGRATION_THREAD_SRC_MULTIFD, i);
        p->write_flags = 0;

        if (!multifd_new_send_channel_create(p, &local_err)) {
            migrate_set_error(s, local_err);
            ret = -1;
        }
    }

    /*
     * Wait until channel creation has started for all channels. The
     * creation can still fail, but no more channels will be created
     * past this point.
     */
    for (i = 0; i < thread_count; i++) {
        qemu_sem_wait(&multifd_send_state->channels_created);
    }

    if (ret) {
        goto err;
    }

    for (i = 0; i < thread_count; i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];
        Error *local_err = NULL;

        ret = multifd_send_state->ops->send_setup(p, &local_err);
        if (ret) {
            migrate_set_error(s, local_err);
            goto err;
        }
        assert(p->iov);
    }

    return true;

err:
    migrate_set_state(&s->state, MIGRATION_STATUS_SETUP,
                      MIGRATION_STATUS_FAILED);
    return false;
}

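/*
 * Mirror image of multifd_send(): hand the current MultiFDRecvData
 * object (see multifd_get_recv_data()) to an idle recv channel and
 * take that channel's empty object in exchange.  Only used when
 * packets are not in use (mapped-ram migration); with packets, the
 * recv threads read their channel's stream on their own.
 *
 * Returns true once a channel has accepted the job, false if multifd
 * is exiting.
 */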
bool multifd_recv(void)
{
    int i;
    static int next_recv_channel;
    MultiFDRecvParams *p = NULL;
    MultiFDRecvData *data = multifd_recv_state->data;

    /*
     * next_recv_channel can remain from a previous migration that was
     * using more channels, so ensure it doesn't overflow if the
     * limit is lower now.
     */
    next_recv_channel %= migrate_multifd_channels();
    for (i = next_recv_channel;; i = (i + 1) % migrate_multifd_channels()) {
        if (multifd_recv_should_exit()) {
            return false;
        }

        p = &multifd_recv_state->params[i];

        if (qatomic_read(&p->pending_job) == false) {
            next_recv_channel = (i + 1) % migrate_multifd_channels();
            break;
        }
    }

    /*
     * Order the pending_job read before manipulating p->data below. Pairs
     * with the qatomic_store_release() in multifd_recv_thread().
     */
    smp_mb_acquire();

    assert(!p->data->size);
    multifd_recv_state->data = p->data;
    p->data = data;

    /*
     * Order the p->data update before setting pending_job. Pairs with
     * the qatomic_load_acquire() in multifd_recv_thread().
     */
    qatomic_store_release(&p->pending_job, true);
    qemu_sem_post(&p->sem);

    return true;
}

MultiFDRecvData *multifd_get_recv_data(void)
{
    return multifd_recv_state->data;
}

static void multifd_recv_terminate_threads(Error *err)
{
    int i;

    trace_multifd_recv_terminate_threads(err != NULL);

    if (qatomic_xchg(&multifd_recv_state->exiting, 1)) {
        return;
    }

    if (err) {
        MigrationState *s = migrate_get_current();
        migrate_set_error(s, err);
        if (s->state == MIGRATION_STATUS_SETUP ||
            s->state == MIGRATION_STATUS_ACTIVE) {
            migrate_set_state(&s->state, s->state,
                              MIGRATION_STATUS_FAILED);
        }
    }

    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDRecvParams *p = &multifd_recv_state->params[i];

        /*
         * The migration thread and channels interact differently
         * depending on the presence of packets.
         */
        if (multifd_use_packets()) {
            /*
             * The channel receives as long as there are packets. When
             * packets end (i.e. MULTIFD_FLAG_SYNC is reached), the
             * channel waits for the migration thread to sync. If the
             * sync never happens, do it here.
             */
            qemu_sem_post(&p->sem_sync);
        } else {
            /*
             * The channel waits for the migration thread to give it
             * work. When the migration thread runs out of work, it
             * releases the channel and waits for any pending work to
             * finish. If we reach here (e.g. due to error) before the
             * work runs out, release the channel.
             */
            qemu_sem_post(&p->sem);
        }

        /*
         * We could arrive here for two reasons:
         *  - normal quit, i.e. everything went fine, just finished
         *  - error quit: we close the channels so the channel threads
         *    finish the qio_channel_read_all_eof()
         */
        if (p->c) {
            qio_channel_shutdown(p->c, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
        }
    }
}

void multifd_recv_shutdown(void)
{
    if (migrate_multifd()) {
        multifd_recv_terminate_threads(NULL);
    }
}

static void multifd_recv_cleanup_channel(MultiFDRecvParams *p)
{
    migration_ioc_unregister_yank(p->c);
    object_unref(OBJECT(p->c));
    p->c = NULL;
    qemu_mutex_destroy(&p->mutex);
    qemu_sem_destroy(&p->sem_sync);
    qemu_sem_destroy(&p->sem);
    g_free(p->data);
    p->data = NULL;
    g_free(p->name);
    p->name = NULL;
    p->packet_len = 0;
    g_free(p->packet);
    p->packet = NULL;
    g_free(p->normal);
    p->normal = NULL;
    g_free(p->zero);
    p->zero = NULL;
    multifd_recv_state->ops->recv_cleanup(p);
}

static void multifd_recv_cleanup_state(void)
{
    qemu_sem_destroy(&multifd_recv_state->sem_sync);
    g_free(multifd_recv_state->params);
    multifd_recv_state->params = NULL;
    g_free(multifd_recv_state->data);
    multifd_recv_state->data = NULL;
    g_free(multifd_recv_state);
    multifd_recv_state = NULL;
}

void multifd_recv_cleanup(void)
{
    int i;

    if (!migrate_multifd()) {
        return;
    }
    multifd_recv_terminate_threads(NULL);
    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDRecvParams *p = &multifd_recv_state->params[i];

        if (p->thread_created) {
            qemu_thread_join(&p->thread);
        }
    }
    for (i = 0; i < migrate_multifd_channels(); i++) {
        multifd_recv_cleanup_channel(&multifd_recv_state->params[i]);
    }
    multifd_recv_cleanup_state();
}

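/*
 * Counterpart of multifd_send_sync_main().  For the packet-based
 * (socket) case, a rough sketch of the per-channel handshake
 * (migration thread on the left, recv thread on the right):
 *
 *                             <----  post ...state->sem_sync
 *   wait ...state->sem_sync          (on seeing MULTIFD_FLAG_SYNC)
 *   fold p->packet_num into
 *     the global packet_num
 *   post p->sem_sync          ---->  wake up, keep receiving
 */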
void multifd_recv_sync_main(void)
{
    int thread_count = migrate_multifd_channels();
    bool file_based = !multifd_use_packets();
    int i;

    if (!migrate_multifd()) {
        return;
    }

    /*
     * File-based channels don't use packets and therefore need to
     * wait for more work. Release them to start the sync.
     */
    if (file_based) {
        for (i = 0; i < thread_count; i++) {
            MultiFDRecvParams *p = &multifd_recv_state->params[i];

            trace_multifd_recv_sync_main_signal(p->id);
            qemu_sem_post(&p->sem);
        }
    }

    /*
     * Initiate the synchronization by waiting for all channels.
     *
     * For socket-based migration this means each channel has received
     * the SYNC packet on the stream.
     *
     * For file-based migration this means each channel is done with
     * the work (pending_job=false).
     */
    for (i = 0; i < thread_count; i++) {
        trace_multifd_recv_sync_main_wait(i);
        qemu_sem_wait(&multifd_recv_state->sem_sync);
    }

    if (file_based) {
        /*
         * For file-based migration, loading is done in one iteration.
         * We're done.
         */
        return;
    }

    /*
     * Sync done. Release the channels for the next iteration.
     */
    for (i = 0; i < thread_count; i++) {
        MultiFDRecvParams *p = &multifd_recv_state->params[i];

        WITH_QEMU_LOCK_GUARD(&p->mutex) {
            if (multifd_recv_state->packet_num < p->packet_num) {
                multifd_recv_state->packet_num = p->packet_num;
            }
        }
        trace_multifd_recv_sync_main_signal(p->id);
        qemu_sem_post(&p->sem_sync);
    }
    trace_multifd_recv_sync_main(multifd_recv_state->packet_num);
}

static void *multifd_recv_thread(void *opaque)
{
    MigrationState *s = migrate_get_current();
    MultiFDRecvParams *p = opaque;
    Error *local_err = NULL;
    bool use_packets = multifd_use_packets();
    int ret;

    trace_multifd_recv_thread_start(p->id);
    rcu_register_thread();

    if (!s->multifd_clean_tls_termination) {
        p->read_flags = QIO_CHANNEL_READ_FLAG_RELAXED_EOF;
    }

    while (true) {
        uint32_t flags = 0;
        bool has_data = false;
        p->normal_num = 0;

        if (use_packets) {
            struct iovec iov = {
                .iov_base = (void *)p->packet,
                .iov_len = p->packet_len
            };

            if (multifd_recv_should_exit()) {
                break;
            }

            ret = qio_channel_readv_full_all_eof(p->c, &iov, 1, NULL, NULL,
                                                 p->read_flags, &local_err);
            if (!ret) {
                /* EOF */
                assert(!local_err);
                break;
            }

            if (ret == -1) {
                break;
            }

            qemu_mutex_lock(&p->mutex);
            ret = multifd_recv_unfill_packet(p, &local_err);
            if (ret) {
                qemu_mutex_unlock(&p->mutex);
                break;
            }

            flags = p->flags;
            /* recv methods don't know how to handle the SYNC flag */
            p->flags &= ~MULTIFD_FLAG_SYNC;

            /*
             * Even if it's a SYNC packet, this needs to be set
             * because older QEMUs (<9.0) still send data along with
             * the SYNC packet.
             */
            has_data = p->normal_num || p->zero_num;
            qemu_mutex_unlock(&p->mutex);
        } else {
            /*
             * No packets, so we need to wait for the vmstate code to
             * give us work.
             */
            qemu_sem_wait(&p->sem);

            if (multifd_recv_should_exit()) {
                break;
            }

            /* pairs with the qatomic_store_release() in multifd_recv() */
            if (!qatomic_load_acquire(&p->pending_job)) {
                /*
                 * The migration thread did not send work; this is
                 * equivalent to pending_sync on the sending
                 * side. Post sem_sync to notify we reached this
                 * point.
                 */
                qemu_sem_post(&multifd_recv_state->sem_sync);
                continue;
            }

            has_data = !!p->data->size;
        }

        if (has_data) {
            ret = multifd_recv_state->ops->recv(p, &local_err);
            if (ret != 0) {
                break;
            }
        }

        if (use_packets) {
            if (flags & MULTIFD_FLAG_SYNC) {
                qemu_sem_post(&multifd_recv_state->sem_sync);
                qemu_sem_wait(&p->sem_sync);
            }
        } else {
            p->data->size = 0;
            /*
             * Order the data->size update before clearing
             * pending_job. Pairs with the smp_mb_acquire() in
             * multifd_recv().
             */
            qatomic_store_release(&p->pending_job, false);
        }
    }

    if (local_err) {
        multifd_recv_terminate_threads(local_err);
        error_free(local_err);
    }

    rcu_unregister_thread();
    trace_multifd_recv_thread_end(p->id, p->packets_recved);

    return NULL;
}

int multifd_recv_setup(Error **errp)
{
    int thread_count;
    uint32_t page_count = multifd_ram_page_count();
    bool use_packets = multifd_use_packets();
    uint8_t i;

    /*
     * Return successfully if multiFD recv state is already initialised
     * or multiFD is not enabled.
     */
    if (multifd_recv_state || !migrate_multifd()) {
        return 0;
    }

    thread_count = migrate_multifd_channels();
    multifd_recv_state = g_malloc0(sizeof(*multifd_recv_state));
    multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count);

    multifd_recv_state->data = g_new0(MultiFDRecvData, 1);
    multifd_recv_state->data->size = 0;

    qatomic_set(&multifd_recv_state->count, 0);
    qatomic_set(&multifd_recv_state->exiting, 0);
    qemu_sem_init(&multifd_recv_state->sem_sync, 0);
    multifd_recv_state->ops = multifd_ops[migrate_multifd_compression()];

    for (i = 0; i < thread_count; i++) {
        MultiFDRecvParams *p = &multifd_recv_state->params[i];

        qemu_mutex_init(&p->mutex);
        qemu_sem_init(&p->sem_sync, 0);
        qemu_sem_init(&p->sem, 0);
        p->pending_job = false;
        p->id = i;

        p->data = g_new0(MultiFDRecvData, 1);
        p->data->size = 0;

        if (use_packets) {
            p->packet_len = sizeof(MultiFDPacket_t)
                + sizeof(uint64_t) * page_count;
            p->packet = g_malloc0(p->packet_len);
        }
        p->name = g_strdup_printf(MIGRATION_THREAD_DST_MULTIFD, i);
        p->normal = g_new0(ram_addr_t, page_count);
        p->zero = g_new0(ram_addr_t, page_count);
    }

    for (i = 0; i < thread_count; i++) {
        MultiFDRecvParams *p = &multifd_recv_state->params[i];
        int ret;

        ret = multifd_recv_state->ops->recv_setup(p, errp);
        if (ret) {
            return ret;
        }
    }
    return 0;
}

bool multifd_recv_all_channels_created(void)
{
    int thread_count = migrate_multifd_channels();

    if (!migrate_multifd()) {
        return true;
    }

    if (!multifd_recv_state) {
        /* Called before any connections are created */
        return false;
    }

    return thread_count == qatomic_read(&multifd_recv_state->count);
}

/*
 * Try to receive all multifd channels to get ready for the migration.
 * Sets @errp when failing to receive the current channel.
 */
void multifd_recv_new_channel(QIOChannel *ioc, Error **errp)
{
    MultiFDRecvParams *p;
    Error *local_err = NULL;
    bool use_packets = multifd_use_packets();
    int id;

    if (use_packets) {
        id = multifd_recv_initial_packet(ioc, &local_err);
        if (id < 0) {
            multifd_recv_terminate_threads(local_err);
            error_propagate_prepend(errp, local_err,
                                    "failed to receive packet"
                                    " via multifd channel %d: ",
                                    qatomic_read(&multifd_recv_state->count));
            return;
        }
        trace_multifd_recv_new_channel(id);
    } else {
        id = qatomic_read(&multifd_recv_state->count);
    }

    p = &multifd_recv_state->params[id];
    if (p->c != NULL) {
        error_setg(&local_err, "multifd: received id '%d' already setup",
                   id);
        multifd_recv_terminate_threads(local_err);
        error_propagate(errp, local_err);
        return;
    }
    p->c = ioc;
    object_ref(OBJECT(ioc));

    p->thread_created = true;
    qemu_thread_create(&p->thread, p->name, multifd_recv_thread, p,
                       QEMU_THREAD_JOINABLE);
    qatomic_inc(&multifd_recv_state->count);
}