/*
 * Multifd common code
 *
 * Copyright (c) 2019-2020 Red Hat Inc
 *
 * Authors:
 *  Juan Quintela <quintela@redhat.com>
 *
 * This work is licensed under the terms of the GNU GPL, version 2 or later.
 * See the COPYING file in the top-level directory.
 */

#include "qemu/osdep.h"
#include "qemu/cutils.h"
#include "qemu/iov.h"
#include "qemu/rcu.h"
#include "exec/target_page.h"
#include "system/system.h"
#include "system/ramblock.h"
#include "qemu/error-report.h"
#include "qapi/error.h"
#include "file.h"
#include "migration/misc.h"
#include "migration.h"
#include "migration-stats.h"
#include "savevm.h"
#include "socket.h"
#include "tls.h"
#include "qemu-file.h"
#include "trace.h"
#include "multifd.h"
#include "threadinfo.h"
#include "options.h"
#include "qemu/yank.h"
#include "io/channel-file.h"
#include "io/channel-socket.h"
#include "yank_functions.h"

typedef struct {
    uint32_t magic;
    uint32_t version;
    unsigned char uuid[16]; /* QemuUUID */
    uint8_t id;
    uint8_t unused1[7];     /* Reserved for future use */
    uint64_t unused2[4];    /* Reserved for future use */
} __attribute__((packed)) MultiFDInit_t;
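
/*
 * Illustrative note (not in the original source): being packed, the
 * handshake message above has a fixed 64-byte wire size
 * (4 + 4 + 16 + 1 + 7 + 32).  A hypothetical compile-time guard for
 * that assumption could be:
 *
 *     QEMU_BUILD_BUG_ON(sizeof(MultiFDInit_t) != 64);
 */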

struct {
    MultiFDSendParams *params;

    /* multifd_send() body is not thread safe, needs serialization */
    QemuMutex multifd_send_mutex;

    /*
     * Global number of generated multifd packets.
     *
     * Note that we use 'uintptr_t' because it naturally supports atomic
     * operations on both 32-bit and 64-bit hosts.  It means that on
     * 32-bit systems multifd will overflow packet_num sooner, but that
     * should be fine.
     *
     * Another option would be QEMU's Stat64, which is 64 bits on all
     * hosts, but so far it does not support atomic fetch_add().
     * Keep it simple for now.
     */
    uintptr_t packet_num;
    /*
     * Synchronization point past which no more channels will be
     * created.
     */
    QemuSemaphore channels_created;
    /* send channels ready */
    QemuSemaphore channels_ready;
    /*
     * Whether we have already run the thread-termination path.  There
     * is a race when an error arrives while we are exiting.
     * Accessed with atomic operations.  Only valid values are 0 and 1.
     */
    int exiting;
    /* multifd ops */
    const MultiFDMethods *ops;
} *multifd_send_state;

struct {
    MultiFDRecvParams *params;
    MultiFDRecvData *data;
    /* number of created threads */
    int count;
    /*
     * This is always posted by the recv threads, the migration thread
     * uses it to wait for recv threads to finish assigned tasks.
     */
    QemuSemaphore sem_sync;
    /* global number of generated multifd packets */
    uint64_t packet_num;
    int exiting;
    /* multifd ops */
    const MultiFDMethods *ops;
} *multifd_recv_state;

MultiFDSendData *multifd_send_data_alloc(void)
{
    MultiFDSendData *new = g_new0(MultiFDSendData, 1);

    multifd_ram_payload_alloc(&new->u.ram);
    /* Device state allocates its payload on-demand */

    return new;
}

void multifd_send_data_clear(MultiFDSendData *data)
{
    if (multifd_payload_empty(data)) {
        return;
    }

    switch (data->type) {
    case MULTIFD_PAYLOAD_DEVICE_STATE:
        multifd_send_data_clear_device_state(&data->u.device_state);
        break;
    default:
        /* Nothing to do */
        break;
    }

    data->type = MULTIFD_PAYLOAD_NONE;
}

void multifd_send_data_free(MultiFDSendData *data)
{
    if (!data) {
        return;
    }

    /* This also frees the device state payload */
    multifd_send_data_clear(data);

    multifd_ram_payload_free(&data->u.ram);

    g_free(data);
}
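
/*
 * Illustrative lifecycle sketch (not in the original source): a payload
 * object cycles between a producer and the helpers above roughly as:
 *
 *     MultiFDSendData *data = multifd_send_data_alloc();
 *     ... fill data->u.ram or data->u.device_state, set data->type ...
 *     ... hand it to an idle channel, get an empty object back ...
 *     multifd_send_data_free(data);    // clears any payload, then frees
 */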

static bool multifd_use_packets(void)
{
    return !migrate_mapped_ram();
}

void multifd_send_channel_created(void)
{
    qemu_sem_post(&multifd_send_state->channels_created);
}

static const MultiFDMethods *multifd_ops[MULTIFD_COMPRESSION__MAX] = {};

void multifd_register_ops(int method, const MultiFDMethods *ops)
{
    assert(0 <= method && method < MULTIFD_COMPRESSION__MAX);
    assert(!multifd_ops[method]);
    multifd_ops[method] = ops;
}

static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp)
{
    MultiFDInit_t msg = {};
    size_t size = sizeof(msg);
    int ret;

    msg.magic = cpu_to_be32(MULTIFD_MAGIC);
    msg.version = cpu_to_be32(MULTIFD_VERSION);
    msg.id = p->id;
    memcpy(msg.uuid, &qemu_uuid.data, sizeof(msg.uuid));

    ret = qio_channel_write_all(p->c, (char *)&msg, size, errp);
    if (ret != 0) {
        return -1;
    }
    stat64_add(&mig_stats.multifd_bytes, size);
    return 0;
}

static int multifd_recv_initial_packet(QIOChannel *c, Error **errp)
{
    MultiFDInit_t msg;
    int ret;

    ret = qio_channel_read_all(c, (char *)&msg, sizeof(msg), errp);
    if (ret != 0) {
        return -1;
    }

    msg.magic = be32_to_cpu(msg.magic);
    msg.version = be32_to_cpu(msg.version);

    if (msg.magic != MULTIFD_MAGIC) {
        error_setg(errp, "multifd: received packet magic %x "
                   "expected %x", msg.magic, MULTIFD_MAGIC);
        return -1;
    }

    if (msg.version != MULTIFD_VERSION) {
        error_setg(errp, "multifd: received packet version %u "
                   "expected %u", msg.version, MULTIFD_VERSION);
        return -1;
    }

    if (memcmp(msg.uuid, &qemu_uuid, sizeof(qemu_uuid))) {
        char *uuid = qemu_uuid_unparse_strdup(&qemu_uuid);
        char *msg_uuid = qemu_uuid_unparse_strdup((const QemuUUID *)msg.uuid);

        error_setg(errp, "multifd: received uuid '%s' and expected "
                   "uuid '%s' for channel %hhd", msg_uuid, uuid, msg.id);
        g_free(uuid);
        g_free(msg_uuid);
        return -1;
    }

    if (msg.id > migrate_multifd_channels()) {
        error_setg(errp, "multifd: received channel id %u is greater than "
                   "number of channels %u", msg.id, migrate_multifd_channels());
        return -1;
    }

    return msg.id;
}
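
/*
 * Illustrative handshake sketch (not in the original source): the two
 * functions above pair up across the connection, once per channel:
 *
 *   source                                 destination
 *   multifd_send_initial_packet()   --->   multifd_recv_initial_packet()
 *     writes magic/version/uuid/id           validates magic, version and
 *                                            uuid, then returns msg.id so
 *                                            the channel binds to params[id]
 */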

/* Fills a RAM multifd packet */
void multifd_send_fill_packet(MultiFDSendParams *p)
{
    MultiFDPacket_t *packet = p->packet;
    uint64_t packet_num;
    bool sync_packet = p->flags & MULTIFD_FLAG_SYNC;

    memset(packet, 0, p->packet_len);

    packet->hdr.magic = cpu_to_be32(MULTIFD_MAGIC);
    packet->hdr.version = cpu_to_be32(MULTIFD_VERSION);

    packet->hdr.flags = cpu_to_be32(p->flags);
    packet->next_packet_size = cpu_to_be32(p->next_packet_size);

    packet_num = qatomic_fetch_inc(&multifd_send_state->packet_num);
    packet->packet_num = cpu_to_be64(packet_num);

    p->packets_sent++;

    if (!sync_packet) {
        multifd_ram_fill_packet(p);
    }

    trace_multifd_send_fill(p->id, packet_num,
                            p->flags, p->next_packet_size);
}

static int multifd_recv_unfill_packet_header(MultiFDRecvParams *p,
                                             const MultiFDPacketHdr_t *hdr,
                                             Error **errp)
{
    uint32_t magic = be32_to_cpu(hdr->magic);
    uint32_t version = be32_to_cpu(hdr->version);

    if (magic != MULTIFD_MAGIC) {
        error_setg(errp, "multifd: received packet magic %x, expected %x",
                   magic, MULTIFD_MAGIC);
        return -1;
    }

    if (version != MULTIFD_VERSION) {
        error_setg(errp, "multifd: received packet version %u, expected %u",
                   version, MULTIFD_VERSION);
        return -1;
    }

    p->flags = be32_to_cpu(hdr->flags);

    return 0;
}

static int multifd_recv_unfill_packet_device_state(MultiFDRecvParams *p,
                                                   Error **errp)
{
    MultiFDPacketDeviceState_t *packet = p->packet_dev_state;

    packet->instance_id = be32_to_cpu(packet->instance_id);
    p->next_packet_size = be32_to_cpu(packet->next_packet_size);

    return 0;
}

static int multifd_recv_unfill_packet_ram(MultiFDRecvParams *p, Error **errp)
{
    const MultiFDPacket_t *packet = p->packet;
    int ret = 0;

    p->next_packet_size = be32_to_cpu(packet->next_packet_size);
    p->packet_num = be64_to_cpu(packet->packet_num);

    /* Always unfill, old QEMUs (<9.0) send data along with SYNC */
    ret = multifd_ram_unfill_packet(p, errp);

    trace_multifd_recv_unfill(p->id, p->packet_num, p->flags,
                              p->next_packet_size);

    return ret;
}

static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error **errp)
{
    p->packets_recved++;

    if (p->flags & MULTIFD_FLAG_DEVICE_STATE) {
        return multifd_recv_unfill_packet_device_state(p, errp);
    }

    return multifd_recv_unfill_packet_ram(p, errp);
}

static bool multifd_send_should_exit(void)
{
    return qatomic_read(&multifd_send_state->exiting);
}

static bool multifd_recv_should_exit(void)
{
    return qatomic_read(&multifd_recv_state->exiting);
}

/*
 * The migration thread can wait on either of the two semaphores.  This
 * function can be used to kick the main thread out of waiting on either of
 * them.  Should mostly only be called when something went wrong with the
 * current multifd send thread.
 */
static void multifd_send_kick_main(MultiFDSendParams *p)
{
    qemu_sem_post(&p->sem_sync);
    qemu_sem_post(&multifd_send_state->channels_ready);
}

/*
 * multifd_send() works by exchanging the MultiFDSendData object
 * provided by the caller with an unused MultiFDSendData object from
 * the next channel that is found to be idle.
 *
 * The channel owns the data until it finishes transmitting and the
 * caller owns the empty object until it fills it with data and calls
 * this function again. No locking necessary.
 *
 * Switching is safe because both the migration thread and the channel
 * thread have barriers in place to serialize access.
 *
 * Returns true on success, false otherwise.
 */
bool multifd_send(MultiFDSendData **send_data)
{
    int i;
    static int next_channel;
    MultiFDSendParams *p = NULL; /* make gcc happy */
    MultiFDSendData *tmp;

    if (multifd_send_should_exit()) {
        return false;
    }

    QEMU_LOCK_GUARD(&multifd_send_state->multifd_send_mutex);

    /* We wait here until at least one channel is ready */
    qemu_sem_wait(&multifd_send_state->channels_ready);

    /*
     * next_channel can remain from a previous migration that was
     * using more channels, so ensure it doesn't overflow if the
     * limit is lower now.
     */
    next_channel %= migrate_multifd_channels();
    for (i = next_channel;; i = (i + 1) % migrate_multifd_channels()) {
        if (multifd_send_should_exit()) {
            return false;
        }
        p = &multifd_send_state->params[i];
        /*
         * Lockless read of p->pending_job is safe, because only the
         * multifd sender thread can clear it.
         */
        if (qatomic_read(&p->pending_job) == false) {
            next_channel = (i + 1) % migrate_multifd_channels();
            break;
        }
    }

    /*
     * Make sure we read p->pending_job before all the rest.  Pairs with
     * qatomic_store_release() in multifd_send_thread().
     */
    smp_mb_acquire();

    assert(multifd_payload_empty(p->data));

    /*
     * Swap the pointers. The channel gets the client data for
     * transferring and the client gets back an unused data slot.
     */
    tmp = *send_data;
    *send_data = p->data;
    p->data = tmp;

    /*
     * Make sure p->data is set up before marking pending_job=true. Pairs
     * with the qatomic_load_acquire() in multifd_send_thread().
     */
    qatomic_store_release(&p->pending_job, true);
    qemu_sem_post(&p->sem);

    return true;
}
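
/*
 * Illustrative caller sketch (not in the original source): a producer
 * such as the RAM save path would drive multifd_send() roughly like
 * this, reusing the empty object it gets back from the swap
 * (have_work() is a stand-in for the caller's own loop condition):
 *
 *     MultiFDSendData *data = multifd_send_data_alloc();
 *
 *     while (have_work()) {
 *         ... fill *data (payload type, pages, ...) ...
 *         if (!multifd_send(&data)) {
 *             break;    // multifd is exiting, report the error
 *         }
 *         // 'data' now points at an unused slot from the channel
 *     }
 *     multifd_send_data_free(data);
 */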

/* Multifd send side hit an error; remember it and prepare to quit */
static void multifd_send_set_error(Error *err)
{
    /*
     * We don't want to exit each thread twice.  Depending on where we
     * get the error, or if there are two independent errors in two
     * threads at the same time, we can end up calling this function
     * twice.
     */
    if (qatomic_xchg(&multifd_send_state->exiting, 1)) {
        return;
    }

    if (err) {
        MigrationState *s = migrate_get_current();
        migrate_set_error(s, err);
        if (s->state == MIGRATION_STATUS_SETUP ||
            s->state == MIGRATION_STATUS_PRE_SWITCHOVER ||
            s->state == MIGRATION_STATUS_DEVICE ||
            s->state == MIGRATION_STATUS_ACTIVE) {
            migrate_set_state(&s->state, s->state,
                              MIGRATION_STATUS_FAILED);
        }
    }
}

static void multifd_send_terminate_threads(void)
{
    int i;

    trace_multifd_send_terminate_threads();

    /*
     * Tell everyone we're quitting.  No xchg() needed here; we simply
     * always set it.
     */
    qatomic_set(&multifd_send_state->exiting, 1);

    /*
     * Firstly, kick all threads out; no matter whether they are just idle,
     * or blocked in an IO system call.
     */
    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];

        qemu_sem_post(&p->sem);
        if (p->c) {
            qio_channel_shutdown(p->c, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
        }
    }

    /*
     * Finally recycle all the threads.
     */
    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];

        if (p->tls_thread_created) {
            qemu_thread_join(&p->tls_thread);
        }

        if (p->thread_created) {
            qemu_thread_join(&p->thread);
        }
    }
}

static bool multifd_send_cleanup_channel(MultiFDSendParams *p, Error **errp)
{
    if (p->c) {
        migration_ioc_unregister_yank(p->c);
        /*
         * The object_unref() cannot guarantee the fd will always be
         * released because finalize() of the iochannel is only
         * triggered on the last reference and it's not guaranteed
         * that we always hold the last refcount when reaching here.
         *
         * Closing the fd explicitly has the benefit that if there are any
         * registered I/O handler callbacks on such an fd, they will get a
         * POLLNVAL event and will further trigger the cleanup to finally
         * release the IOC.
         *
         * FIXME: It should logically be guaranteed that all multifd
         * channels have no I/O handler callback registered when reaching
         * here, because the migration thread will wait for all multifd
         * channel establishments to complete during setup.  Since
         * migration_cleanup() will be scheduled in the main thread too,
         * all previous callbacks should be guaranteed to have completed
         * when reaching here.  See multifd_send_state.channels_created
         * and its usage.  In the future, we could replace this with an
         * assert making sure we're the last reference, or simply drop it
         * if the above justification is clear enough.
         */
        qio_channel_close(p->c, &error_abort);
        object_unref(OBJECT(p->c));
        p->c = NULL;
    }
    qemu_sem_destroy(&p->sem);
    qemu_sem_destroy(&p->sem_sync);
    g_free(p->name);
    p->name = NULL;
    g_clear_pointer(&p->data, multifd_send_data_free);
    p->packet_len = 0;
    g_clear_pointer(&p->packet_device_state, g_free);
    g_free(p->packet);
    p->packet = NULL;
    multifd_send_state->ops->send_cleanup(p, errp);
    assert(!p->iov);

    return *errp == NULL;
}

static void multifd_send_cleanup_state(void)
{
    file_cleanup_outgoing_migration();
    socket_cleanup_outgoing_migration();
    multifd_device_state_send_cleanup();
    qemu_sem_destroy(&multifd_send_state->channels_created);
    qemu_sem_destroy(&multifd_send_state->channels_ready);
    qemu_mutex_destroy(&multifd_send_state->multifd_send_mutex);
    g_free(multifd_send_state->params);
    multifd_send_state->params = NULL;
    g_free(multifd_send_state);
    multifd_send_state = NULL;
}

void multifd_send_shutdown(void)
{
    int i;

    if (!migrate_multifd()) {
        return;
    }

    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];

        /* thread_created implies the TLS handshake has succeeded */
        if (p->tls_thread_created && p->thread_created) {
            Error *local_err = NULL;
            /*
             * The destination expects the TLS session to always be
             * properly terminated. This helps to detect a premature
             * termination in the middle of the stream.  Note that
             * older QEMUs always break the connection on the source
             * and the destination always sees
             * GNUTLS_E_PREMATURE_TERMINATION.
             */
            migration_tls_channel_end(p->c, &local_err);

            /*
             * The above can return an error in case the migration has
             * already failed. If the migration succeeded, errors are
             * not expected but there's no need to kill the source.
             */
            if (local_err && !migration_has_failed(migrate_get_current())) {
                warn_report(
                    "multifd_send_%d: Failed to terminate TLS connection: %s",
                    p->id, error_get_pretty(local_err));
                break;
            }
        }
    }

    multifd_send_terminate_threads();

    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];
        Error *local_err = NULL;

        if (!multifd_send_cleanup_channel(p, &local_err)) {
            migrate_set_error(migrate_get_current(), local_err);
            error_free(local_err);
        }
    }

    multifd_send_cleanup_state();
}

static int multifd_zero_copy_flush(QIOChannel *c)
{
    int ret;
    Error *err = NULL;

    ret = qio_channel_flush(c, &err);
    if (ret < 0) {
        error_report_err(err);
        return -1;
    }
    if (ret == 1) {
        stat64_add(&mig_stats.dirty_sync_missed_zero_copy, 1);
    }

    return ret;
}

int multifd_send_sync_main(MultiFDSyncReq req)
{
    int i;
    bool flush_zero_copy;

    assert(req != MULTIFD_SYNC_NONE);

    flush_zero_copy = migrate_zero_copy_send();

    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];

        if (multifd_send_should_exit()) {
            return -1;
        }

        trace_multifd_send_sync_main_signal(p->id);

        /*
         * We should be the only user so far, so not possible to be set by
         * others concurrently.
         */
        assert(qatomic_read(&p->pending_sync) == MULTIFD_SYNC_NONE);
        qatomic_set(&p->pending_sync, req);
        qemu_sem_post(&p->sem);
    }
    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];

        if (multifd_send_should_exit()) {
            return -1;
        }

        qemu_sem_wait(&multifd_send_state->channels_ready);
        trace_multifd_send_sync_main_wait(p->id);
        qemu_sem_wait(&p->sem_sync);

        if (flush_zero_copy && p->c && (multifd_zero_copy_flush(p->c) < 0)) {
            return -1;
        }
    }
    trace_multifd_send_sync_main(multifd_send_state->packet_num);

    return 0;
}
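
/*
 * Illustrative sync sketch (not in the original source): the two loops
 * above form a signal-all-then-wait-all pattern, pairing with the sync
 * branch of multifd_send_thread() roughly as:
 *
 *   migration thread                      channel thread i
 *   set pending_sync, post(p->sem)   ->   wakes, sends a SYNC packet
 *                                         for MULTIFD_SYNC_ALL
 *   wait(channels_ready)             <-   post(channels_ready)
 *   wait(p->sem_sync)                <-   post(p->sem_sync)
 */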

static void *multifd_send_thread(void *opaque)
{
    MultiFDSendParams *p = opaque;
    MigrationThread *thread = NULL;
    Error *local_err = NULL;
    int ret = 0;
    bool use_packets = multifd_use_packets();

    thread = migration_threads_add(p->name, qemu_get_thread_id());

    trace_multifd_send_thread_start(p->id);
    rcu_register_thread();

    if (use_packets) {
        if (multifd_send_initial_packet(p, &local_err) < 0) {
            ret = -1;
            goto out;
        }
    }

    while (true) {
        qemu_sem_post(&multifd_send_state->channels_ready);
        qemu_sem_wait(&p->sem);

        if (multifd_send_should_exit()) {
            break;
        }

        /*
         * Read the pending_job flag before p->data.  Pairs with the
         * qatomic_store_release() in multifd_send().
         */
        if (qatomic_load_acquire(&p->pending_job)) {
            bool is_device_state = multifd_payload_device_state(p->data);
            size_t total_size;
            int write_flags_masked = 0;

            p->flags = 0;
            p->iovs_num = 0;
            assert(!multifd_payload_empty(p->data));

            if (is_device_state) {
                multifd_device_state_send_prepare(p);

                /* Device state packets cannot be sent via zerocopy */
                write_flags_masked |= QIO_CHANNEL_WRITE_FLAG_ZERO_COPY;
            } else {
                ret = multifd_send_state->ops->send_prepare(p, &local_err);
                if (ret != 0) {
                    break;
                }
            }

            /*
             * The packet header in the zerocopy RAM case is accounted for
             * in multifd_nocomp_send_prepare() - where it is actually
             * being sent.
             */
            total_size = iov_size(p->iov, p->iovs_num);

            if (migrate_mapped_ram()) {
                assert(!is_device_state);

                ret = file_write_ramblock_iov(p->c, p->iov, p->iovs_num,
                                              &p->data->u.ram, &local_err);
            } else {
                ret = qio_channel_writev_full_all(p->c, p->iov, p->iovs_num,
                                                  NULL, 0,
                                                  p->write_flags & ~write_flags_masked,
                                                  &local_err);
            }

            if (ret != 0) {
                break;
            }

            stat64_add(&mig_stats.multifd_bytes, total_size);

            p->next_packet_size = 0;
            multifd_send_data_clear(p->data);

            /*
             * Make sure p->data is published before saying "we're
             * free".  Pairs with the smp_mb_acquire() in
             * multifd_send().
             */
            qatomic_store_release(&p->pending_job, false);
        } else {
            MultiFDSyncReq req = qatomic_read(&p->pending_sync);

            /*
             * If not a normal job, must be a sync request.  Note that
             * pending_sync is a standalone flag (unlike pending_job), so
             * it doesn't require explicit memory barriers.
             */
            assert(req != MULTIFD_SYNC_NONE);

            /* Only push the SYNC message if it involves a remote sync */
            if (req == MULTIFD_SYNC_ALL) {
                p->flags = MULTIFD_FLAG_SYNC;
                multifd_send_fill_packet(p);
                ret = qio_channel_write_all(p->c, (void *)p->packet,
                                            p->packet_len, &local_err);
                if (ret != 0) {
                    break;
                }
                /* p->next_packet_size will always be zero for a SYNC packet */
                stat64_add(&mig_stats.multifd_bytes, p->packet_len);
            }

            qatomic_set(&p->pending_sync, MULTIFD_SYNC_NONE);
            qemu_sem_post(&p->sem_sync);
        }
    }

out:
    if (ret) {
        assert(local_err);
        trace_multifd_send_error(p->id);
        multifd_send_set_error(local_err);
        multifd_send_kick_main(p);
        error_free(local_err);
    }

    rcu_unregister_thread();
    migration_threads_remove(thread);
    trace_multifd_send_thread_end(p->id, p->packets_sent);

    return NULL;
}

static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque);

typedef struct {
    MultiFDSendParams *p;
    QIOChannelTLS *tioc;
} MultiFDTLSThreadArgs;

static void *multifd_tls_handshake_thread(void *opaque)
{
    MultiFDTLSThreadArgs *args = opaque;

    qio_channel_tls_handshake(args->tioc,
                              multifd_new_send_channel_async,
                              args->p,
                              NULL,
                              NULL);
    g_free(args);

    return NULL;
}

static bool multifd_tls_channel_connect(MultiFDSendParams *p,
                                        QIOChannel *ioc,
                                        Error **errp)
{
    MigrationState *s = migrate_get_current();
    const char *hostname = s->hostname;
    MultiFDTLSThreadArgs *args;
    QIOChannelTLS *tioc;

    tioc = migration_tls_client_create(ioc, hostname, errp);
    if (!tioc) {
        return false;
    }

    /*
     * Ownership of the socket channel now transfers to the newly
     * created TLS channel, which has already taken a reference.
     */
    object_unref(OBJECT(ioc));
    trace_multifd_tls_outgoing_handshake_start(ioc, tioc, hostname);
    qio_channel_set_name(QIO_CHANNEL(tioc), "multifd-tls-outgoing");

    args = g_new0(MultiFDTLSThreadArgs, 1);
    args->tioc = tioc;
    args->p = p;

    p->tls_thread_created = true;
    qemu_thread_create(&p->tls_thread, MIGRATION_THREAD_SRC_TLS,
                       multifd_tls_handshake_thread, args,
                       QEMU_THREAD_JOINABLE);
    return true;
}

void multifd_channel_connect(MultiFDSendParams *p, QIOChannel *ioc)
{
    qio_channel_set_delay(ioc, false);

    migration_ioc_register_yank(ioc);
    /* Set up p->c only when the channel is completely set up */
    p->c = ioc;

    p->thread_created = true;
    qemu_thread_create(&p->thread, p->name, multifd_send_thread, p,
                       QEMU_THREAD_JOINABLE);
}

/*
 * When TLS is enabled this function is called once to establish the
 * TLS connection and a second time after the TLS handshake to create
 * the multifd channel. Without TLS it goes straight into the channel
 * creation.
 */
static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque)
{
    MultiFDSendParams *p = opaque;
    QIOChannel *ioc = QIO_CHANNEL(qio_task_get_source(task));
    Error *local_err = NULL;
    bool ret;

    trace_multifd_new_send_channel_async(p->id);

    if (qio_task_propagate_error(task, &local_err)) {
        ret = false;
        goto out;
    }

    trace_multifd_set_outgoing_channel(ioc, object_get_typename(OBJECT(ioc)),
                                       migrate_get_current()->hostname);

    if (migrate_channel_requires_tls_upgrade(ioc)) {
        ret = multifd_tls_channel_connect(p, ioc, &local_err);
        if (ret) {
            return;
        }
    } else {
        multifd_channel_connect(p, ioc);
        ret = true;
    }

out:
    /*
     * Here we're not interested in whether creation succeeded, only that
     * it happened at all.
     */
    multifd_send_channel_created();

    if (ret) {
        return;
    }

    trace_multifd_new_send_channel_async_error(p->id, local_err);
    multifd_send_set_error(local_err);
    /*
     * For error cases (TLS or non-TLS), the IO channel is always freed
     * here rather than during multifd cleanup: since p->c is not set,
     * the multifd cleanup code doesn't even know of its existence.
     */
    object_unref(OBJECT(ioc));
    error_free(local_err);
}

static bool multifd_new_send_channel_create(gpointer opaque, Error **errp)
{
    if (!multifd_use_packets()) {
        return file_send_channel_create(opaque, errp);
    }

    socket_send_channel_create(multifd_new_send_channel_async, opaque);
    return true;
}

bool multifd_send_setup(void)
{
    MigrationState *s = migrate_get_current();
    int thread_count, ret = 0;
    uint32_t page_count = multifd_ram_page_count();
    bool use_packets = multifd_use_packets();
    uint8_t i;

    if (!migrate_multifd()) {
        return true;
    }

    thread_count = migrate_multifd_channels();
    multifd_send_state = g_malloc0(sizeof(*multifd_send_state));
    multifd_send_state->params = g_new0(MultiFDSendParams, thread_count);
    qemu_mutex_init(&multifd_send_state->multifd_send_mutex);
    qemu_sem_init(&multifd_send_state->channels_created, 0);
    qemu_sem_init(&multifd_send_state->channels_ready, 0);
    qatomic_set(&multifd_send_state->exiting, 0);
    multifd_send_state->ops = multifd_ops[migrate_multifd_compression()];

    for (i = 0; i < thread_count; i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];
        Error *local_err = NULL;

        qemu_sem_init(&p->sem, 0);
        qemu_sem_init(&p->sem_sync, 0);
        p->id = i;
        p->data = multifd_send_data_alloc();

        if (use_packets) {
            p->packet_len = sizeof(MultiFDPacket_t)
                          + sizeof(uint64_t) * page_count;
            p->packet = g_malloc0(p->packet_len);
            p->packet_device_state = g_malloc0(sizeof(*p->packet_device_state));
            p->packet_device_state->hdr.magic = cpu_to_be32(MULTIFD_MAGIC);
            p->packet_device_state->hdr.version = cpu_to_be32(MULTIFD_VERSION);
        }
        p->name = g_strdup_printf(MIGRATION_THREAD_SRC_MULTIFD, i);
        p->write_flags = 0;

        if (!multifd_new_send_channel_create(p, &local_err)) {
            migrate_set_error(s, local_err);
            ret = -1;
        }
    }

    /*
     * Wait until channel creation has started for all channels. The
     * creation can still fail, but no more channels will be created
     * past this point.
     */
    for (i = 0; i < thread_count; i++) {
        qemu_sem_wait(&multifd_send_state->channels_created);
    }

    if (ret) {
        goto err;
    }

    for (i = 0; i < thread_count; i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];
        Error *local_err = NULL;

        ret = multifd_send_state->ops->send_setup(p, &local_err);
        if (ret) {
            migrate_set_error(s, local_err);
            goto err;
        }
        assert(p->iov);
    }

    multifd_device_state_send_setup();

    return true;

err:
    migrate_set_state(&s->state, MIGRATION_STATUS_SETUP,
                      MIGRATION_STATUS_FAILED);
    return false;
}

bool multifd_recv(void)
{
    int i;
    static int next_recv_channel;
    MultiFDRecvParams *p = NULL;
    MultiFDRecvData *data = multifd_recv_state->data;

    /*
     * next_recv_channel can remain from a previous migration that was
     * using more channels, so ensure it doesn't overflow if the
     * limit is lower now.
     */
    next_recv_channel %= migrate_multifd_channels();
    for (i = next_recv_channel;; i = (i + 1) % migrate_multifd_channels()) {
        if (multifd_recv_should_exit()) {
            return false;
        }

        p = &multifd_recv_state->params[i];

        if (qatomic_read(&p->pending_job) == false) {
            next_recv_channel = (i + 1) % migrate_multifd_channels();
            break;
        }
    }

    /*
     * Order the pending_job read before manipulating p->data below. Pairs
     * with qatomic_store_release() at multifd_recv_thread().
     */
    smp_mb_acquire();

    assert(!p->data->size);
    multifd_recv_state->data = p->data;
    p->data = data;

    /*
     * Order the p->data update before setting pending_job. Pairs with
     * qatomic_load_acquire() at multifd_recv_thread().
     */
    qatomic_store_release(&p->pending_job, true);
    qemu_sem_post(&p->sem);

    return true;
}
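
/*
 * Illustrative caller sketch (not in the original source): mirroring
 * the send side, a consumer such as the mapped-ram load path would do
 * roughly:
 *
 *     MultiFDRecvData *data = multifd_get_recv_data();
 *     ... describe the read request in *data (buffer, size, ...) ...
 *     if (!multifd_recv()) {
 *         ... multifd is exiting, report the error ...
 *     }
 *     // an idle channel now owns the request; the object handed back
 *     // by the swap is reused for the next call
 */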

MultiFDRecvData *multifd_get_recv_data(void)
{
    return multifd_recv_state->data;
}

static void multifd_recv_terminate_threads(Error *err)
{
    int i;

    trace_multifd_recv_terminate_threads(err != NULL);

    if (qatomic_xchg(&multifd_recv_state->exiting, 1)) {
        return;
    }

    if (err) {
        MigrationState *s = migrate_get_current();
        migrate_set_error(s, err);
        if (s->state == MIGRATION_STATUS_SETUP ||
            s->state == MIGRATION_STATUS_ACTIVE) {
            migrate_set_state(&s->state, s->state,
                              MIGRATION_STATUS_FAILED);
        }
    }

    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDRecvParams *p = &multifd_recv_state->params[i];

        /*
         * The migration thread and channels interact differently
         * depending on the presence of packets.
         */
        if (multifd_use_packets()) {
            /*
             * The channel receives as long as there are packets. When
             * packets end (i.e. MULTIFD_FLAG_SYNC is reached), the
             * channel waits for the migration thread to sync. If the
             * sync never happens, do it here.
             */
            qemu_sem_post(&p->sem_sync);
        } else {
            /*
             * The channel waits for the migration thread to give it
             * work. When the migration thread runs out of work, it
             * releases the channel and waits for any pending work to
             * finish. If we reach here (e.g. due to error) before the
             * work runs out, release the channel.
             */
            qemu_sem_post(&p->sem);
        }

        /*
         * We could arrive here for two reasons:
         *  - normal quit, i.e. everything went fine, just finished
         *  - error quit: We close the channels so the channel threads
         *    finish the qio_channel_read_all_eof()
         */
        if (p->c) {
            qio_channel_shutdown(p->c, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
        }
    }
}

void multifd_recv_shutdown(void)
{
    if (migrate_multifd()) {
        multifd_recv_terminate_threads(NULL);
    }
}

static void multifd_recv_cleanup_channel(MultiFDRecvParams *p)
{
    migration_ioc_unregister_yank(p->c);
    object_unref(OBJECT(p->c));
    p->c = NULL;
    qemu_mutex_destroy(&p->mutex);
    qemu_sem_destroy(&p->sem_sync);
    qemu_sem_destroy(&p->sem);
    g_free(p->data);
    p->data = NULL;
    g_free(p->name);
    p->name = NULL;
    p->packet_len = 0;
    g_free(p->packet);
    p->packet = NULL;
    g_clear_pointer(&p->packet_dev_state, g_free);
    g_free(p->normal);
    p->normal = NULL;
    g_free(p->zero);
    p->zero = NULL;
    multifd_recv_state->ops->recv_cleanup(p);
}

static void multifd_recv_cleanup_state(void)
{
    qemu_sem_destroy(&multifd_recv_state->sem_sync);
    g_free(multifd_recv_state->params);
    multifd_recv_state->params = NULL;
    g_free(multifd_recv_state->data);
    multifd_recv_state->data = NULL;
    g_free(multifd_recv_state);
    multifd_recv_state = NULL;
}

void multifd_recv_cleanup(void)
{
    int i;

    if (!migrate_multifd()) {
        return;
    }
    multifd_recv_terminate_threads(NULL);
    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDRecvParams *p = &multifd_recv_state->params[i];

        if (p->thread_created) {
            qemu_thread_join(&p->thread);
        }
    }
    for (i = 0; i < migrate_multifd_channels(); i++) {
        multifd_recv_cleanup_channel(&multifd_recv_state->params[i]);
    }
    multifd_recv_cleanup_state();
}

void multifd_recv_sync_main(void)
{
    int thread_count = migrate_multifd_channels();
    bool file_based = !multifd_use_packets();
    int i;

    if (!migrate_multifd()) {
        return;
    }

    /*
     * File-based channels don't use packets and therefore need to
     * wait for more work. Release them to start the sync.
     */
    if (file_based) {
        for (i = 0; i < thread_count; i++) {
            MultiFDRecvParams *p = &multifd_recv_state->params[i];

            trace_multifd_recv_sync_main_signal(p->id);
            qemu_sem_post(&p->sem);
        }
    }

    /*
     * Initiate the synchronization by waiting for all channels.
     *
     * For socket-based migration this means each channel has received
     * the SYNC packet on the stream.
     *
     * For file-based migration this means each channel is done with
     * the work (pending_job=false).
     */
    for (i = 0; i < thread_count; i++) {
        trace_multifd_recv_sync_main_wait(i);
        qemu_sem_wait(&multifd_recv_state->sem_sync);
    }

    if (file_based) {
        /*
         * For file-based migration, loading is done in one iteration.
         * We're done.
         */
        return;
    }

    /*
     * Sync done. Release the channels for the next iteration.
     */
    for (i = 0; i < thread_count; i++) {
        MultiFDRecvParams *p = &multifd_recv_state->params[i];

        WITH_QEMU_LOCK_GUARD(&p->mutex) {
            if (multifd_recv_state->packet_num < p->packet_num) {
                multifd_recv_state->packet_num = p->packet_num;
            }
        }
        trace_multifd_recv_sync_main_signal(p->id);
        qemu_sem_post(&p->sem_sync);
    }
    trace_multifd_recv_sync_main(multifd_recv_state->packet_num);
}
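
/*
 * Illustrative sync sketch (not in the original source): in the
 * socket-based (packet) case, each round of multifd_recv_sync_main()
 * pairs with the channel threads roughly as:
 *
 *   channel thread i                           migration thread
 *   sees MULTIFD_FLAG_SYNC
 *   post(multifd_recv_state->sem_sync)  ->     wait x thread_count
 *   wait(p->sem_sync)                   <-     post(p->sem_sync) per channel
 *   keeps receiving packets
 */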

static int multifd_device_state_recv(MultiFDRecvParams *p, Error **errp)
{
    g_autofree char *dev_state_buf = NULL;
    int ret;

    dev_state_buf = g_malloc(p->next_packet_size);

    ret = qio_channel_read_all(p->c, dev_state_buf, p->next_packet_size, errp);
    if (ret != 0) {
        return ret;
    }

    if (p->packet_dev_state->idstr[sizeof(p->packet_dev_state->idstr) - 1]
        != 0) {
        error_setg(errp, "unterminated multifd device state idstr");
        return -1;
    }

    if (!qemu_loadvm_load_state_buffer(p->packet_dev_state->idstr,
                                       p->packet_dev_state->instance_id,
                                       dev_state_buf, p->next_packet_size,
                                       errp)) {
        ret = -1;
    }

    return ret;
}

static void *multifd_recv_thread(void *opaque)
{
    MigrationState *s = migrate_get_current();
    MultiFDRecvParams *p = opaque;
    Error *local_err = NULL;
    bool use_packets = multifd_use_packets();
    int ret;

    trace_multifd_recv_thread_start(p->id);
    rcu_register_thread();

    if (!s->multifd_clean_tls_termination) {
        p->read_flags = QIO_CHANNEL_READ_FLAG_RELAXED_EOF;
    }

    while (true) {
        MultiFDPacketHdr_t hdr;
        uint32_t flags = 0;
        bool is_device_state = false;
        bool has_data = false;
        uint8_t *pkt_buf;
        size_t pkt_len;

        p->normal_num = 0;

        if (use_packets) {
            struct iovec iov = {
                .iov_base = (void *)&hdr,
                .iov_len = sizeof(hdr)
            };

            if (multifd_recv_should_exit()) {
                break;
            }

            ret = qio_channel_readv_full_all_eof(p->c, &iov, 1, NULL, NULL,
                                                 p->read_flags, &local_err);
            if (!ret) {
                /* EOF */
                assert(!local_err);
                break;
            }

            if (ret == -1) {
                break;
            }

            ret = multifd_recv_unfill_packet_header(p, &hdr, &local_err);
            if (ret) {
                break;
            }

            is_device_state = p->flags & MULTIFD_FLAG_DEVICE_STATE;
            if (is_device_state) {
                pkt_buf = (uint8_t *)p->packet_dev_state + sizeof(hdr);
                pkt_len = sizeof(*p->packet_dev_state) - sizeof(hdr);
            } else {
                pkt_buf = (uint8_t *)p->packet + sizeof(hdr);
                pkt_len = p->packet_len - sizeof(hdr);
            }

            ret = qio_channel_read_all_eof(p->c, (char *)pkt_buf, pkt_len,
                                           &local_err);
            if (!ret) {
                /* EOF */
                error_setg(&local_err, "multifd: unexpected EOF after packet header");
                break;
            }

            if (ret == -1) {
                break;
            }

            qemu_mutex_lock(&p->mutex);
            ret = multifd_recv_unfill_packet(p, &local_err);
            if (ret) {
                qemu_mutex_unlock(&p->mutex);
                break;
            }

            flags = p->flags;
            /* recv methods don't know how to handle the SYNC flag */
            p->flags &= ~MULTIFD_FLAG_SYNC;

            if (is_device_state) {
                has_data = p->next_packet_size > 0;
            } else {
                /*
                 * Even if it's a SYNC packet, this needs to be set
                 * because older QEMUs (<9.0) still send data along with
                 * the SYNC packet.
                 */
                has_data = p->normal_num || p->zero_num;
            }

            qemu_mutex_unlock(&p->mutex);
        } else {
            /*
             * No packets, so we need to wait for the vmstate code to
             * give us work.
             */
            qemu_sem_wait(&p->sem);

            if (multifd_recv_should_exit()) {
                break;
            }

            /* pairs with qatomic_store_release() at multifd_recv() */
            if (!qatomic_load_acquire(&p->pending_job)) {
                /*
                 * The migration thread did not send work, this is
                 * equivalent to pending_sync on the sending
                 * side. Post sem_sync to notify we reached this
                 * point.
                 */
                qemu_sem_post(&multifd_recv_state->sem_sync);
                continue;
            }

            has_data = !!p->data->size;
        }

        if (has_data) {
            /*
             * A multifd thread should not be active and receiving data
             * while migration is in the postcopy phase. Two threads
             * writing the same memory area could easily corrupt
             * the guest state.
             */
            assert(!migration_in_postcopy());
            if (is_device_state) {
                assert(use_packets);
                ret = multifd_device_state_recv(p, &local_err);
            } else {
                ret = multifd_recv_state->ops->recv(p, &local_err);
            }
            if (ret != 0) {
                break;
            }
        } else if (is_device_state) {
            error_setg(&local_err,
                       "multifd: received empty device state packet");
            break;
        }

        if (use_packets) {
            if (flags & MULTIFD_FLAG_SYNC) {
                if (is_device_state) {
                    error_setg(&local_err,
                               "multifd: received SYNC device state packet");
                    break;
                }

                qemu_sem_post(&multifd_recv_state->sem_sync);
                qemu_sem_wait(&p->sem_sync);
            }
        } else {
            p->data->size = 0;
            /*
             * Order the data->size update before clearing
             * pending_job. Pairs with smp_mb_acquire() at
             * multifd_recv().
             */
            qatomic_store_release(&p->pending_job, false);
        }
    }

    if (local_err) {
        multifd_recv_terminate_threads(local_err);
        error_free(local_err);
    }

    rcu_unregister_thread();
    trace_multifd_recv_thread_end(p->id, p->packets_recved);

    return NULL;
}

int multifd_recv_setup(Error **errp)
{
    int thread_count;
    uint32_t page_count = multifd_ram_page_count();
    bool use_packets = multifd_use_packets();
    uint8_t i;

    /*
     * Return successfully if multiFD recv state is already initialised
     * or multiFD is not enabled.
     */
    if (multifd_recv_state || !migrate_multifd()) {
        return 0;
    }

    thread_count = migrate_multifd_channels();
    multifd_recv_state = g_malloc0(sizeof(*multifd_recv_state));
    multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count);

    multifd_recv_state->data = g_new0(MultiFDRecvData, 1);
    multifd_recv_state->data->size = 0;

    qatomic_set(&multifd_recv_state->count, 0);
    qatomic_set(&multifd_recv_state->exiting, 0);
    qemu_sem_init(&multifd_recv_state->sem_sync, 0);
    multifd_recv_state->ops = multifd_ops[migrate_multifd_compression()];

    for (i = 0; i < thread_count; i++) {
        MultiFDRecvParams *p = &multifd_recv_state->params[i];

        qemu_mutex_init(&p->mutex);
        qemu_sem_init(&p->sem_sync, 0);
        qemu_sem_init(&p->sem, 0);
        p->pending_job = false;
        p->id = i;

        p->data = g_new0(MultiFDRecvData, 1);
        p->data->size = 0;

        if (use_packets) {
            p->packet_len = sizeof(MultiFDPacket_t)
                + sizeof(uint64_t) * page_count;
            p->packet = g_malloc0(p->packet_len);
            p->packet_dev_state = g_malloc0(sizeof(*p->packet_dev_state));
        }
        p->name = g_strdup_printf(MIGRATION_THREAD_DST_MULTIFD, i);
        p->normal = g_new0(ram_addr_t, page_count);
        p->zero = g_new0(ram_addr_t, page_count);
    }

    for (i = 0; i < thread_count; i++) {
        MultiFDRecvParams *p = &multifd_recv_state->params[i];
        int ret;

        ret = multifd_recv_state->ops->recv_setup(p, errp);
        if (ret) {
            return ret;
        }
    }
    return 0;
}

bool multifd_recv_all_channels_created(void)
{
    int thread_count = migrate_multifd_channels();

    if (!migrate_multifd()) {
        return true;
    }

    if (!multifd_recv_state) {
        /* Called before any connections created */
        return false;
    }

    return thread_count == qatomic_read(&multifd_recv_state->count);
}

/*
 * Try to receive all multifd channels to get ready for the migration.
 * Sets @errp when failing to receive the current channel.
 */
void multifd_recv_new_channel(QIOChannel *ioc, Error **errp)
{
    MultiFDRecvParams *p;
    Error *local_err = NULL;
    bool use_packets = multifd_use_packets();
    int id;

    if (use_packets) {
        id = multifd_recv_initial_packet(ioc, &local_err);
        if (id < 0) {
            multifd_recv_terminate_threads(local_err);
            error_propagate_prepend(errp, local_err,
                                    "failed to receive packet"
                                    " via multifd channel %d: ",
                                    qatomic_read(&multifd_recv_state->count));
            return;
        }
        trace_multifd_recv_new_channel(id);
    } else {
        id = qatomic_read(&multifd_recv_state->count);
    }

    p = &multifd_recv_state->params[id];
    if (p->c != NULL) {
        error_setg(&local_err, "multifd: received id '%d' already setup",
                   id);
        multifd_recv_terminate_threads(local_err);
        error_propagate(errp, local_err);
        return;
    }
    p->c = ioc;
    object_ref(OBJECT(ioc));

    p->thread_created = true;
    qemu_thread_create(&p->thread, p->name, multifd_recv_thread, p,
                       QEMU_THREAD_JOINABLE);
    qatomic_inc(&multifd_recv_state->count);
}