/*
 * Multifd common code
 *
 * Copyright (c) 2019-2020 Red Hat Inc
 *
 * Authors:
 *  Juan Quintela <quintela@redhat.com>
 *
 * This work is licensed under the terms of the GNU GPL, version 2 or later.
 * See the COPYING file in the top-level directory.
 */

#include "qemu/osdep.h"
#include "qemu/cutils.h"
#include "qemu/iov.h"
#include "qemu/rcu.h"
#include "exec/target_page.h"
#include "system/system.h"
#include "system/ramblock.h"
#include "qemu/error-report.h"
#include "qapi/error.h"
#include "file.h"
#include "migration/misc.h"
#include "migration.h"
#include "migration-stats.h"
#include "savevm.h"
#include "socket.h"
#include "tls.h"
#include "qemu-file.h"
#include "trace.h"
#include "multifd.h"
#include "threadinfo.h"
#include "options.h"
#include "qemu/yank.h"
#include "io/channel-file.h"
#include "io/channel-socket.h"
#include "yank_functions.h"

typedef struct {
    uint32_t magic;
    uint32_t version;
    unsigned char uuid[16]; /* QemuUUID */
    uint8_t id;
    uint8_t unused1[7];  /* Reserved for future use */
    uint64_t unused2[4]; /* Reserved for future use */
} __attribute__((packed)) MultiFDInit_t;

struct {
    MultiFDSendParams *params;

    /* multifd_send() body is not thread safe, needs serialization */
    QemuMutex multifd_send_mutex;

    /*
     * Global number of generated multifd packets.
     *
     * Note that we use 'uintptr_t' because it naturally supports atomic
     * operations on both 32-bit and 64-bit hosts. It means packet_num
     * will overflow sooner on 32-bit systems, but that should be fine.
     *
     * Another option would be QEMU's Stat64, which is 64 bits on all
     * hosts; however, it does not support atomic fetch_add() yet. Keep
     * it simple for now.
     */
    uintptr_t packet_num;
    /*
     * Synchronization point past which no more channels will be
     * created.
     */
    QemuSemaphore channels_created;
    /* send channels ready */
    QemuSemaphore channels_ready;
    /*
     * Whether we have already terminated the threads. There is a race
     * when an error arrives while we are exiting, so this is accessed
     * with atomic operations. The only valid values are 0 and 1.
     */
    int exiting;
    /* multifd ops */
    const MultiFDMethods *ops;
} *multifd_send_state;

struct {
    MultiFDRecvParams *params;
    MultiFDRecvData *data;
    /* number of created threads */
    int count;
    /*
     * This is always posted by the recv threads, the migration thread
     * uses it to wait for recv threads to finish assigned tasks.
     */
    QemuSemaphore sem_sync;
    /* global number of generated multifd packets */
    uint64_t packet_num;
    int exiting;
    /* multifd ops */
    const MultiFDMethods *ops;
} *multifd_recv_state;

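/*
 * Allocate a MultiFDSendData with the RAM payload pre-allocated; the
 * result is eventually released with multifd_send_data_free().
 */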
MultiFDSendData *multifd_send_data_alloc(void)
{
    MultiFDSendData *new = g_new0(MultiFDSendData, 1);

    multifd_ram_payload_alloc(&new->u.ram);
    /* Device state allocates its payload on-demand */

    return new;
}

void multifd_send_data_clear(MultiFDSendData *data)
{
    if (multifd_payload_empty(data)) {
        return;
    }

    switch (data->type) {
    case MULTIFD_PAYLOAD_DEVICE_STATE:
        multifd_send_data_clear_device_state(&data->u.device_state);
        break;
    default:
        /* Nothing to do */
        break;
    }

    data->type = MULTIFD_PAYLOAD_NONE;
}

void multifd_send_data_free(MultiFDSendData *data)
{
    if (!data) {
        return;
    }

    /* This also frees the device state payload */
    multifd_send_data_clear(data);

    multifd_ram_payload_free(&data->u.ram);

    g_free(data);
}

static bool multifd_use_packets(void)
{
    return !migrate_mapped_ram();
}

void multifd_send_channel_created(void)
{
    qemu_sem_post(&multifd_send_state->channels_created);
}

static const MultiFDMethods *multifd_ops[MULTIFD_COMPRESSION__MAX] = {};

void multifd_register_ops(int method, const MultiFDMethods *ops)
{
    assert(0 <= method && method < MULTIFD_COMPRESSION__MAX);
    assert(!multifd_ops[method]);
    multifd_ops[method] = ops;
}

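/*
 * Send the per-channel handshake message carrying the magic, version,
 * source UUID and channel id, so the destination can match this
 * connection to a channel (see multifd_recv_initial_packet()).
 */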
static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp)
{
    MultiFDInit_t msg = {};
    size_t size = sizeof(msg);
    int ret;

    msg.magic = cpu_to_be32(MULTIFD_MAGIC);
    msg.version = cpu_to_be32(MULTIFD_VERSION);
    msg.id = p->id;
    memcpy(msg.uuid, &qemu_uuid.data, sizeof(msg.uuid));

    ret = qio_channel_write_all(p->c, (char *)&msg, size, errp);
    if (ret != 0) {
        return -1;
    }
    stat64_add(&mig_stats.multifd_bytes, size);
    return 0;
}

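/*
 * Read and validate the handshake message on a freshly accepted
 * channel. Returns the channel id on success, -1 on error.
 */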
static int multifd_recv_initial_packet(QIOChannel *c, Error **errp)
{
    MultiFDInit_t msg;
    int ret;

    ret = qio_channel_read_all(c, (char *)&msg, sizeof(msg), errp);
    if (ret != 0) {
        return -1;
    }

    msg.magic = be32_to_cpu(msg.magic);
    msg.version = be32_to_cpu(msg.version);

    if (msg.magic != MULTIFD_MAGIC) {
        error_setg(errp, "multifd: received packet magic %x "
                   "expected %x", msg.magic, MULTIFD_MAGIC);
        return -1;
    }

    if (msg.version != MULTIFD_VERSION) {
        error_setg(errp, "multifd: received packet version %u "
                   "expected %u", msg.version, MULTIFD_VERSION);
        return -1;
    }

    if (memcmp(msg.uuid, &qemu_uuid, sizeof(qemu_uuid))) {
        char *uuid = qemu_uuid_unparse_strdup(&qemu_uuid);
        char *msg_uuid = qemu_uuid_unparse_strdup((const QemuUUID *)msg.uuid);

        error_setg(errp, "multifd: received uuid '%s' and expected "
                   "uuid '%s' for channel %hhd", msg_uuid, uuid, msg.id);
        g_free(uuid);
        g_free(msg_uuid);
        return -1;
    }

    /*
     * Channel ids are zero-based, so an id equal to the channel count
     * is also out of range.
     */
    if (msg.id >= migrate_multifd_channels()) {
        error_setg(errp, "multifd: received channel id %u is out of range "
                   "(number of channels %u)", msg.id,
                   migrate_multifd_channels());
        return -1;
    }

    return msg.id;
}

/* Fills a RAM multifd packet */
void multifd_send_fill_packet(MultiFDSendParams *p)
{
    MultiFDPacket_t *packet = p->packet;
    uint64_t packet_num;
    bool sync_packet = p->flags & MULTIFD_FLAG_SYNC;

    memset(packet, 0, p->packet_len);

    packet->hdr.magic = cpu_to_be32(MULTIFD_MAGIC);
    packet->hdr.version = cpu_to_be32(MULTIFD_VERSION);

    packet->hdr.flags = cpu_to_be32(p->flags);
    packet->next_packet_size = cpu_to_be32(p->next_packet_size);

    packet_num = qatomic_fetch_inc(&multifd_send_state->packet_num);
    packet->packet_num = cpu_to_be64(packet_num);

    p->packets_sent++;

    if (!sync_packet) {
        multifd_ram_fill_packet(p);
    }

    trace_multifd_send_fill(p->id, packet_num,
                            p->flags, p->next_packet_size);
}

static int multifd_recv_unfill_packet_header(MultiFDRecvParams *p,
                                             const MultiFDPacketHdr_t *hdr,
                                             Error **errp)
{
    uint32_t magic = be32_to_cpu(hdr->magic);
    uint32_t version = be32_to_cpu(hdr->version);

    if (magic != MULTIFD_MAGIC) {
        error_setg(errp, "multifd: received packet magic %x, expected %x",
                   magic, MULTIFD_MAGIC);
        return -1;
    }

    if (version != MULTIFD_VERSION) {
        error_setg(errp, "multifd: received packet version %u, expected %u",
                   version, MULTIFD_VERSION);
        return -1;
    }

    p->flags = be32_to_cpu(hdr->flags);

    return 0;
}

static int multifd_recv_unfill_packet_device_state(MultiFDRecvParams *p,
                                                   Error **errp)
{
    MultiFDPacketDeviceState_t *packet = p->packet_dev_state;

    packet->instance_id = be32_to_cpu(packet->instance_id);
    p->next_packet_size = be32_to_cpu(packet->next_packet_size);

    return 0;
}

static int multifd_recv_unfill_packet_ram(MultiFDRecvParams *p, Error **errp)
{
    const MultiFDPacket_t *packet = p->packet;
    int ret = 0;

    p->next_packet_size = be32_to_cpu(packet->next_packet_size);
    p->packet_num = be64_to_cpu(packet->packet_num);

    /* Always unfill, old QEMUs (<9.0) send data along with SYNC */
    ret = multifd_ram_unfill_packet(p, errp);

    trace_multifd_recv_unfill(p->id, p->packet_num, p->flags,
                              p->next_packet_size);

    return ret;
}

static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error **errp)
{
    p->packets_recved++;

    if (p->flags & MULTIFD_FLAG_DEVICE_STATE) {
        return multifd_recv_unfill_packet_device_state(p, errp);
    }

    return multifd_recv_unfill_packet_ram(p, errp);
}

static bool multifd_send_should_exit(void)
{
    return qatomic_read(&multifd_send_state->exiting);
}

static bool multifd_recv_should_exit(void)
{
    return qatomic_read(&multifd_recv_state->exiting);
}

/*
 * The migration thread can wait on either of the two semaphores. This
 * function can be used to kick the main thread out of waiting on either of
 * them. Should mostly only be called when something went wrong with the
 * current multifd send thread.
 */
static void multifd_send_kick_main(MultiFDSendParams *p)
{
    qemu_sem_post(&p->sem_sync);
    qemu_sem_post(&multifd_send_state->channels_ready);
}

/*
 * multifd_send() works by exchanging the MultiFDSendData object
 * provided by the caller with an unused MultiFDSendData object from
 * the next channel that is found to be idle.
 *
 * The channel owns the data until it finishes transmitting and the
 * caller owns the empty object until it fills it with data and calls
 * this function again. No locking necessary.
 *
 * Switching is safe because both the migration thread and the channel
 * thread have barriers in place to serialize access.
 *
 * Returns true on success, false otherwise.
 */
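/*
 * A minimal sketch of the caller side, assuming a hypothetical
 * fill_payload() helper (not an actual API) that populates the object:
 *
 *     MultiFDSendData *data = multifd_send_data_alloc();
 *
 *     fill_payload(data);
 *     if (!multifd_send(&data)) {
 *         // multifd is exiting, handle the error
 *     }
 *     // 'data' now points to a different, empty object that can be
 *     // refilled for the next call
 */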
bool multifd_send(MultiFDSendData **send_data)
{
    int i;
    static int next_channel;
    MultiFDSendParams *p = NULL; /* make gcc happy */
    MultiFDSendData *tmp;

    if (multifd_send_should_exit()) {
        return false;
    }

    QEMU_LOCK_GUARD(&multifd_send_state->multifd_send_mutex);

    /* We wait here, until at least one channel is ready */
    qemu_sem_wait(&multifd_send_state->channels_ready);

    /*
     * next_channel can remain from a previous migration that was
     * using more channels, so ensure it doesn't overflow if the
     * limit is lower now.
     */
    next_channel %= migrate_multifd_channels();
    for (i = next_channel;; i = (i + 1) % migrate_multifd_channels()) {
        if (multifd_send_should_exit()) {
            return false;
        }
        p = &multifd_send_state->params[i];
        /*
         * Lockless read to p->pending_job is safe, because only multifd
         * sender thread can clear it.
         */
        if (qatomic_read(&p->pending_job) == false) {
            next_channel = (i + 1) % migrate_multifd_channels();
            break;
        }
    }

    /*
     * Make sure we read p->pending_job before all the rest. Pairs with
     * qatomic_store_release() in multifd_send_thread().
     */
    smp_mb_acquire();

    assert(multifd_payload_empty(p->data));

    /*
     * Swap the pointers. The channel gets the client data for
     * transferring and the client gets back an unused data slot.
     */
    tmp = *send_data;
    *send_data = p->data;
    p->data = tmp;

    /*
     * Making sure p->data is setup before marking pending_job=true. Pairs
     * with the qatomic_load_acquire() in multifd_send_thread().
     */
    qatomic_store_release(&p->pending_job, true);
    qemu_sem_post(&p->sem);

    return true;
}

/* Multifd send side hit an error; remember it and prepare to quit */
static void multifd_send_set_error(Error *err)
{
    /*
     * We don't want to exit the threads twice. Depending on where we
     * get the error, or if two independent errors happen in two
     * threads at the same time, we can end up calling this function
     * twice.
     */
    if (qatomic_xchg(&multifd_send_state->exiting, 1)) {
        return;
    }

    if (err) {
        MigrationState *s = migrate_get_current();
        migrate_set_error(s, err);
        if (s->state == MIGRATION_STATUS_SETUP ||
            s->state == MIGRATION_STATUS_PRE_SWITCHOVER ||
            s->state == MIGRATION_STATUS_DEVICE ||
            s->state == MIGRATION_STATUS_ACTIVE) {
            migrate_set_state(&s->state, s->state,
                              MIGRATION_STATUS_FAILED);
        }
    }
}

static void multifd_send_terminate_threads(void)
{
    int i;

    trace_multifd_send_terminate_threads();

    /*
     * Tell everyone we're quitting. No xchg() needed here; we simply
     * always set it.
     */
    qatomic_set(&multifd_send_state->exiting, 1);

    /*
     * Firstly, kick all threads out; no matter whether they are just idle,
     * or blocked in an IO system call.
     */
    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];

        qemu_sem_post(&p->sem);
        if (p->c) {
            qio_channel_shutdown(p->c, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
        }
    }

    /*
     * Finally recycle all the threads.
     */
    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];

        if (p->tls_thread_created) {
            qemu_thread_join(&p->tls_thread);
        }

        if (p->thread_created) {
            qemu_thread_join(&p->thread);
        }
    }
}

static bool multifd_send_cleanup_channel(MultiFDSendParams *p, Error **errp)
{
    if (p->c) {
        migration_ioc_unregister_yank(p->c);
        /*
         * The object_unref() cannot guarantee the fd will always be
         * released because finalize() of the iochannel is only
         * triggered on the last reference and it's not guaranteed
         * that we always hold the last refcount when reaching here.
         *
         * Closing the fd explicitly has the benefit that if there is any
         * registered I/O handler callbacks on such fd, that will get a
         * POLLNVAL event and will further trigger the cleanup to finally
         * release the IOC.
         *
         * FIXME: It should logically be guaranteed that all multifd
         * channels have no I/O handler callback registered when reaching
         * here, because migration thread will wait for all multifd channel
         * establishments to complete during setup. Since
         * migration_cleanup() will be scheduled in main thread too, all
         * previous callbacks should guarantee to be completed when
         * reaching here. See multifd_send_state.channels_created and its
         * usage. In the future, we could replace this with an assert
         * making sure we're the last reference, or simply drop it if above
         * is more clear to be justified.
         */
        qio_channel_close(p->c, &error_abort);
        object_unref(OBJECT(p->c));
        p->c = NULL;
    }
    qemu_sem_destroy(&p->sem);
    qemu_sem_destroy(&p->sem_sync);
    g_free(p->name);
    p->name = NULL;
    g_clear_pointer(&p->data, multifd_send_data_free);
    p->packet_len = 0;
    g_clear_pointer(&p->packet_device_state, g_free);
    g_free(p->packet);
    p->packet = NULL;
    multifd_send_state->ops->send_cleanup(p, errp);
    assert(!p->iov);

    return *errp == NULL;
}

static void multifd_send_cleanup_state(void)
{
    file_cleanup_outgoing_migration();
    socket_cleanup_outgoing_migration();
    multifd_device_state_send_cleanup();
    qemu_sem_destroy(&multifd_send_state->channels_created);
    qemu_sem_destroy(&multifd_send_state->channels_ready);
    qemu_mutex_destroy(&multifd_send_state->multifd_send_mutex);
    g_free(multifd_send_state->params);
    multifd_send_state->params = NULL;
    g_free(multifd_send_state);
    multifd_send_state = NULL;
}

void multifd_send_shutdown(void)
{
    int i;

    if (!migrate_multifd()) {
        return;
    }

    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];

        /* thread_created implies the TLS handshake has succeeded */
        if (p->tls_thread_created && p->thread_created) {
            Error *local_err = NULL;
            /*
             * The destination expects the TLS session to always be
             * properly terminated. This helps to detect a premature
             * termination in the middle of the stream. Note that
             * older QEMUs always break the connection on the source
             * and the destination always sees
             * GNUTLS_E_PREMATURE_TERMINATION.
             */
            migration_tls_channel_end(p->c, &local_err);

            /*
             * The above can return an error in case the migration has
             * already failed. If the migration succeeded, errors are
             * not expected but there's no need to kill the source.
             */
            if (local_err && !migration_has_failed(migrate_get_current())) {
                warn_report(
                    "multifd_send_%d: Failed to terminate TLS connection: %s",
                    p->id, error_get_pretty(local_err));
                break;
            }
        }
    }

    multifd_send_terminate_threads();

    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];
        Error *local_err = NULL;

        if (!multifd_send_cleanup_channel(p, &local_err)) {
            migrate_set_error(migrate_get_current(), local_err);
            error_free(local_err);
        }
    }

    multifd_send_cleanup_state();
}

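/*
 * Flush any outstanding zero-copy writes on @c. Returns a negative
 * value on error; a return of 1 means some writes missed zero-copy,
 * which is accounted in mig_stats.dirty_sync_missed_zero_copy.
 */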
static int multifd_zero_copy_flush(QIOChannel *c)
{
    int ret;
    Error *err = NULL;

    ret = qio_channel_flush(c, &err);
    if (ret < 0) {
        error_report_err(err);
        return -1;
    }
    if (ret == 1) {
        stat64_add(&mig_stats.dirty_sync_missed_zero_copy, 1);
    }

    return ret;
}

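/*
 * Post a sync request to every send channel and wait until each one
 * has acknowledged it. Returns 0 on success, -1 if multifd is exiting.
 */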
int multifd_send_sync_main(MultiFDSyncReq req)
{
    int i;
    bool flush_zero_copy;

    assert(req != MULTIFD_SYNC_NONE);

    flush_zero_copy = migrate_zero_copy_send();

    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];

        if (multifd_send_should_exit()) {
            return -1;
        }

        trace_multifd_send_sync_main_signal(p->id);

        /*
         * We should be the only user so far, so it is not possible for
         * pending_sync to be set concurrently by others.
         */
        assert(qatomic_read(&p->pending_sync) == MULTIFD_SYNC_NONE);
        qatomic_set(&p->pending_sync, req);
        qemu_sem_post(&p->sem);
    }
    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];

        if (multifd_send_should_exit()) {
            return -1;
        }

        qemu_sem_wait(&multifd_send_state->channels_ready);
        trace_multifd_send_sync_main_wait(p->id);
        qemu_sem_wait(&p->sem_sync);

        if (flush_zero_copy && p->c && (multifd_zero_copy_flush(p->c) < 0)) {
            return -1;
        }
    }
    trace_multifd_send_sync_main(multifd_send_state->packet_num);

    return 0;
}

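/*
 * Per-channel sender loop: wait on p->sem, then either transmit the
 * queued payload (pending_job) or handle a sync request (pending_sync).
 */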
static void *multifd_send_thread(void *opaque)
{
    MultiFDSendParams *p = opaque;
    MigrationThread *thread = NULL;
    Error *local_err = NULL;
    int ret = 0;
    bool use_packets = multifd_use_packets();

    thread = migration_threads_add(p->name, qemu_get_thread_id());

    trace_multifd_send_thread_start(p->id);
    rcu_register_thread();

    if (use_packets) {
        if (multifd_send_initial_packet(p, &local_err) < 0) {
            ret = -1;
            goto out;
        }
    }

    while (true) {
        qemu_sem_post(&multifd_send_state->channels_ready);
        qemu_sem_wait(&p->sem);

        if (multifd_send_should_exit()) {
            break;
        }

        /*
         * Read pending_job flag before p->data. Pairs with the
         * qatomic_store_release() in multifd_send().
         */
        if (qatomic_load_acquire(&p->pending_job)) {
            bool is_device_state = multifd_payload_device_state(p->data);
            size_t total_size;
            int write_flags_masked = 0;

            p->flags = 0;
            p->iovs_num = 0;
            assert(!multifd_payload_empty(p->data));

            if (is_device_state) {
                multifd_device_state_send_prepare(p);

                /* Device state packets cannot be sent via zerocopy */
                write_flags_masked |= QIO_CHANNEL_WRITE_FLAG_ZERO_COPY;
            } else {
                ret = multifd_send_state->ops->send_prepare(p, &local_err);
                if (ret != 0) {
                    break;
                }
            }

            /*
             * The packet header in the zerocopy RAM case is accounted for
             * in multifd_nocomp_send_prepare() - where it is actually
             * being sent.
             */
            total_size = iov_size(p->iov, p->iovs_num);

            if (migrate_mapped_ram()) {
                assert(!is_device_state);

                ret = file_write_ramblock_iov(p->c, p->iov, p->iovs_num,
                                              &p->data->u.ram, &local_err);
            } else {
                ret = qio_channel_writev_full_all(p->c, p->iov, p->iovs_num,
                                                  NULL, 0,
                                                  p->write_flags & ~write_flags_masked,
                                                  &local_err);
            }

            if (ret != 0) {
                break;
            }

            stat64_add(&mig_stats.multifd_bytes, total_size);

            p->next_packet_size = 0;
            multifd_send_data_clear(p->data);

            /*
             * Making sure p->data is published before saying "we're
             * free". Pairs with the smp_mb_acquire() in
             * multifd_send().
             */
            qatomic_store_release(&p->pending_job, false);
        } else {
            MultiFDSyncReq req = qatomic_read(&p->pending_sync);

            /*
             * If not a normal job, must be a sync request. Note that
             * pending_sync is a standalone flag (unlike pending_job), so
             * it doesn't require explicit memory barriers.
             */
            assert(req != MULTIFD_SYNC_NONE);

            /* Only push the SYNC message if it involves a remote sync */
            if (req == MULTIFD_SYNC_ALL) {
                p->flags = MULTIFD_FLAG_SYNC;
                multifd_send_fill_packet(p);
                ret = qio_channel_write_all(p->c, (void *)p->packet,
                                            p->packet_len, &local_err);
                if (ret != 0) {
                    break;
                }
                /* p->next_packet_size will always be zero for a SYNC packet */
                stat64_add(&mig_stats.multifd_bytes, p->packet_len);
            }

            qatomic_set(&p->pending_sync, MULTIFD_SYNC_NONE);
            qemu_sem_post(&p->sem_sync);
        }
    }

out:
    if (ret) {
        assert(local_err);
        trace_multifd_send_error(p->id);
        multifd_send_set_error(local_err);
        multifd_send_kick_main(p);
        error_free(local_err);
    }

    rcu_unregister_thread();
    migration_threads_remove(thread);
    trace_multifd_send_thread_end(p->id, p->packets_sent);

    return NULL;
}

static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque);

typedef struct {
    MultiFDSendParams *p;
    QIOChannelTLS *tioc;
} MultiFDTLSThreadArgs;

static void *multifd_tls_handshake_thread(void *opaque)
{
    MultiFDTLSThreadArgs *args = opaque;

    qio_channel_tls_handshake(args->tioc,
                              multifd_new_send_channel_async,
                              args->p,
                              NULL,
                              NULL);
    g_free(args);

    return NULL;
}

static bool multifd_tls_channel_connect(MultiFDSendParams *p,
                                        QIOChannel *ioc,
                                        Error **errp)
{
    MigrationState *s = migrate_get_current();
    const char *hostname = s->hostname;
    MultiFDTLSThreadArgs *args;
    QIOChannelTLS *tioc;

    tioc = migration_tls_client_create(ioc, hostname, errp);
    if (!tioc) {
        return false;
    }

    /*
     * Ownership of the socket channel now transfers to the newly
     * created TLS channel, which has already taken a reference.
     */
    object_unref(OBJECT(ioc));
    trace_multifd_tls_outgoing_handshake_start(ioc, tioc, hostname);
    qio_channel_set_name(QIO_CHANNEL(tioc), "multifd-tls-outgoing");

    args = g_new0(MultiFDTLSThreadArgs, 1);
    args->tioc = tioc;
    args->p = p;

    p->tls_thread_created = true;
    qemu_thread_create(&p->tls_thread, MIGRATION_THREAD_SRC_TLS,
                       multifd_tls_handshake_thread, args,
                       QEMU_THREAD_JOINABLE);
    return true;
}

void multifd_channel_connect(MultiFDSendParams *p, QIOChannel *ioc)
{
    qio_channel_set_delay(ioc, false);

    migration_ioc_register_yank(ioc);
    /* Set p->c only once the channel is completely set up */
    p->c = ioc;

    p->thread_created = true;
    qemu_thread_create(&p->thread, p->name, multifd_send_thread, p,
                       QEMU_THREAD_JOINABLE);
}

/*
 * When TLS is enabled this function is called once to establish the
 * TLS connection and a second time after the TLS handshake to create
 * the multifd channel. Without TLS it goes straight into the channel
 * creation.
 */
static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque)
{
    MultiFDSendParams *p = opaque;
    QIOChannel *ioc = QIO_CHANNEL(qio_task_get_source(task));
    Error *local_err = NULL;
    bool ret;

    trace_multifd_new_send_channel_async(p->id);

    if (qio_task_propagate_error(task, &local_err)) {
        ret = false;
        goto out;
    }

    trace_multifd_set_outgoing_channel(ioc, object_get_typename(OBJECT(ioc)),
                                       migrate_get_current()->hostname);

    if (migrate_channel_requires_tls_upgrade(ioc)) {
        ret = multifd_tls_channel_connect(p, ioc, &local_err);
        if (ret) {
            return;
        }
    } else {
        multifd_channel_connect(p, ioc);
        ret = true;
    }

out:
    /*
     * Here we're not interested in whether creation succeeded, only
     * that it was attempted at all.
     */
    multifd_send_channel_created();

    if (ret) {
        return;
    }

    trace_multifd_new_send_channel_async_error(p->id, local_err);
    multifd_send_set_error(local_err);
    /*
     * For error cases (TLS or non-TLS), the IO channel is always freed
     * here rather than during multifd cleanup: since p->c is not set,
     * the multifd cleanup code doesn't even know of its existence.
     */
    object_unref(OBJECT(ioc));
    error_free(local_err);
}

static bool multifd_new_send_channel_create(gpointer opaque, Error **errp)
{
    if (!multifd_use_packets()) {
        return file_send_channel_create(opaque, errp);
    }

    socket_send_channel_create(multifd_new_send_channel_async, opaque);
    return true;
}

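/*
 * Allocate the global send state and start the creation of all send
 * channels. Returns true on success; on failure the migration state
 * is set to FAILED.
 */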
bool multifd_send_setup(void)
{
    MigrationState *s = migrate_get_current();
    int thread_count, ret = 0;
    uint32_t page_count = multifd_ram_page_count();
    bool use_packets = multifd_use_packets();
    uint8_t i;

    if (!migrate_multifd()) {
        return true;
    }

    thread_count = migrate_multifd_channels();
    multifd_send_state = g_malloc0(sizeof(*multifd_send_state));
    multifd_send_state->params = g_new0(MultiFDSendParams, thread_count);
    qemu_mutex_init(&multifd_send_state->multifd_send_mutex);
    qemu_sem_init(&multifd_send_state->channels_created, 0);
    qemu_sem_init(&multifd_send_state->channels_ready, 0);
    qatomic_set(&multifd_send_state->exiting, 0);
    multifd_send_state->ops = multifd_ops[migrate_multifd_compression()];

    for (i = 0; i < thread_count; i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];
        Error *local_err = NULL;

        qemu_sem_init(&p->sem, 0);
        qemu_sem_init(&p->sem_sync, 0);
        p->id = i;
        p->data = multifd_send_data_alloc();

        if (use_packets) {
            p->packet_len = sizeof(MultiFDPacket_t)
                          + sizeof(uint64_t) * page_count;
            p->packet = g_malloc0(p->packet_len);
            p->packet_device_state = g_malloc0(sizeof(*p->packet_device_state));
            p->packet_device_state->hdr.magic = cpu_to_be32(MULTIFD_MAGIC);
            p->packet_device_state->hdr.version = cpu_to_be32(MULTIFD_VERSION);
        }
        p->name = g_strdup_printf(MIGRATION_THREAD_SRC_MULTIFD, i);
        p->write_flags = 0;

        if (!multifd_new_send_channel_create(p, &local_err)) {
            migrate_set_error(s, local_err);
            ret = -1;
        }
    }

    /*
     * Wait until channel creation has started for all channels. The
     * creation can still fail, but no more channels will be created
     * past this point.
     */
    for (i = 0; i < thread_count; i++) {
        qemu_sem_wait(&multifd_send_state->channels_created);
    }

    if (ret) {
        goto err;
    }

    for (i = 0; i < thread_count; i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];
        Error *local_err = NULL;

        ret = multifd_send_state->ops->send_setup(p, &local_err);
        if (ret) {
            migrate_set_error(s, local_err);
            goto err;
        }
        assert(p->iov);
    }

    multifd_device_state_send_setup();

    return true;

err:
    migrate_set_state(&s->state, MIGRATION_STATUS_SETUP,
                      MIGRATION_STATUS_FAILED);
    return false;
}

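/*
 * Hand the current MultiFDRecvData object to an idle recv channel,
 * taking back an empty object in exchange; the receive-side mirror of
 * multifd_send().
 */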
bool multifd_recv(void)
{
    int i;
    static int next_recv_channel;
    MultiFDRecvParams *p = NULL;
    MultiFDRecvData *data = multifd_recv_state->data;

    /*
     * next_recv_channel can remain from a previous migration that was
     * using more channels, so ensure it doesn't overflow if the
     * limit is lower now.
     */
    next_recv_channel %= migrate_multifd_channels();
    for (i = next_recv_channel;; i = (i + 1) % migrate_multifd_channels()) {
        if (multifd_recv_should_exit()) {
            return false;
        }

        p = &multifd_recv_state->params[i];

        if (qatomic_read(&p->pending_job) == false) {
            next_recv_channel = (i + 1) % migrate_multifd_channels();
            break;
        }
    }

    /*
     * Order pending_job read before manipulating p->data below. Pairs
     * with qatomic_store_release() at multifd_recv_thread().
     */
    smp_mb_acquire();

    assert(!p->data->size);
    multifd_recv_state->data = p->data;
    p->data = data;

    /*
     * Order p->data update before setting pending_job. Pairs with
     * qatomic_load_acquire() at multifd_recv_thread().
     */
    qatomic_store_release(&p->pending_job, true);
    qemu_sem_post(&p->sem);

    return true;
}

MultiFDRecvData *multifd_get_recv_data(void)
{
    return multifd_recv_state->data;
}

static void multifd_recv_terminate_threads(Error *err)
{
    int i;

    trace_multifd_recv_terminate_threads(err != NULL);

    if (qatomic_xchg(&multifd_recv_state->exiting, 1)) {
        return;
    }

    if (err) {
        MigrationState *s = migrate_get_current();
        migrate_set_error(s, err);
        if (s->state == MIGRATION_STATUS_SETUP ||
            s->state == MIGRATION_STATUS_ACTIVE) {
            migrate_set_state(&s->state, s->state,
                              MIGRATION_STATUS_FAILED);
        }
    }

    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDRecvParams *p = &multifd_recv_state->params[i];

        /*
         * The migration thread and channels interact differently
         * depending on the presence of packets.
         */
        if (multifd_use_packets()) {
            /*
             * The channel receives as long as there are packets. When
             * packets end (i.e. MULTIFD_FLAG_SYNC is reached), the
             * channel waits for the migration thread to sync. If the
             * sync never happens, do it here.
             */
            qemu_sem_post(&p->sem_sync);
        } else {
            /*
             * The channel waits for the migration thread to give it
             * work. When the migration thread runs out of work, it
             * releases the channel and waits for any pending work to
             * finish. If we reach here (e.g. due to error) before the
             * work runs out, release the channel.
             */
            qemu_sem_post(&p->sem);
        }

        /*
         * We could arrive here for two reasons:
         *  - normal quit, i.e. everything went fine, just finished
         *  - error quit: We close the channels so the channel threads
         *    finish the qio_channel_read_all_eof()
         */
        if (p->c) {
            qio_channel_shutdown(p->c, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
        }
    }
}

void multifd_recv_shutdown(void)
{
    if (migrate_multifd()) {
        multifd_recv_terminate_threads(NULL);
    }
}

static void multifd_recv_cleanup_channel(MultiFDRecvParams *p)
{
    migration_ioc_unregister_yank(p->c);
    object_unref(OBJECT(p->c));
    p->c = NULL;
    qemu_mutex_destroy(&p->mutex);
    qemu_sem_destroy(&p->sem_sync);
    qemu_sem_destroy(&p->sem);
    g_free(p->data);
    p->data = NULL;
    g_free(p->name);
    p->name = NULL;
    p->packet_len = 0;
    g_free(p->packet);
    p->packet = NULL;
    g_clear_pointer(&p->packet_dev_state, g_free);
    g_free(p->normal);
    p->normal = NULL;
    g_free(p->zero);
    p->zero = NULL;
    multifd_recv_state->ops->recv_cleanup(p);
}

static void multifd_recv_cleanup_state(void)
{
    qemu_sem_destroy(&multifd_recv_state->sem_sync);
    g_free(multifd_recv_state->params);
    multifd_recv_state->params = NULL;
    g_free(multifd_recv_state->data);
    multifd_recv_state->data = NULL;
    g_free(multifd_recv_state);
    multifd_recv_state = NULL;
}

void multifd_recv_cleanup(void)
{
    int i;

    if (!migrate_multifd()) {
        return;
    }
    multifd_recv_terminate_threads(NULL);
    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDRecvParams *p = &multifd_recv_state->params[i];

        if (p->thread_created) {
            qemu_thread_join(&p->thread);
        }
    }
    for (i = 0; i < migrate_multifd_channels(); i++) {
        multifd_recv_cleanup_channel(&multifd_recv_state->params[i]);
    }
    multifd_recv_cleanup_state();
}

void multifd_recv_sync_main(void)
{
    int thread_count = migrate_multifd_channels();
    bool file_based = !multifd_use_packets();
    int i;

    if (!migrate_multifd()) {
        return;
    }

    /*
     * File-based channels don't use packets and therefore need to
     * wait for more work. Release them to start the sync.
     */
    if (file_based) {
        for (i = 0; i < thread_count; i++) {
            MultiFDRecvParams *p = &multifd_recv_state->params[i];

            trace_multifd_recv_sync_main_signal(p->id);
            qemu_sem_post(&p->sem);
        }
    }

    /*
     * Initiate the synchronization by waiting for all channels.
     *
     * For socket-based migration this means each channel has received
     * the SYNC packet on the stream.
     *
     * For file-based migration this means each channel is done with
     * the work (pending_job=false).
     */
    for (i = 0; i < thread_count; i++) {
        trace_multifd_recv_sync_main_wait(i);
        qemu_sem_wait(&multifd_recv_state->sem_sync);
    }

    if (file_based) {
        /*
         * For file-based migration, loading is done in one iteration.
         * We're done.
         */
        return;
    }

    /*
     * Sync done. Release the channels for the next iteration.
     */
    for (i = 0; i < thread_count; i++) {
        MultiFDRecvParams *p = &multifd_recv_state->params[i];

        WITH_QEMU_LOCK_GUARD(&p->mutex) {
            if (multifd_recv_state->packet_num < p->packet_num) {
                multifd_recv_state->packet_num = p->packet_num;
            }
        }
        trace_multifd_recv_sync_main_signal(p->id);
        qemu_sem_post(&p->sem_sync);
    }
    trace_multifd_recv_sync_main(multifd_recv_state->packet_num);
}

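/*
 * Read a device state payload of p->next_packet_size bytes from the
 * channel and hand it to qemu_loadvm_load_state_buffer() for the
 * device identified by the packet's idstr/instance_id.
 */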
static int multifd_device_state_recv(MultiFDRecvParams *p, Error **errp)
{
    g_autofree char *dev_state_buf = NULL;
    int ret;

    dev_state_buf = g_malloc(p->next_packet_size);

    ret = qio_channel_read_all(p->c, dev_state_buf, p->next_packet_size, errp);
    if (ret != 0) {
        return ret;
    }

    if (p->packet_dev_state->idstr[sizeof(p->packet_dev_state->idstr) - 1]
        != 0) {
        error_setg(errp, "unterminated multifd device state idstr");
        return -1;
    }

    if (!qemu_loadvm_load_state_buffer(p->packet_dev_state->idstr,
                                       p->packet_dev_state->instance_id,
                                       dev_state_buf, p->next_packet_size,
                                       errp)) {
        ret = -1;
    }

    return ret;
}

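/*
 * Per-channel receiver loop. With packets (socket migration) it reads
 * and dispatches packets until EOF; without packets (file migration)
 * it waits for the migration thread to assign work via multifd_recv().
 */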
static void *multifd_recv_thread(void *opaque)
{
    MigrationState *s = migrate_get_current();
    MultiFDRecvParams *p = opaque;
    Error *local_err = NULL;
    bool use_packets = multifd_use_packets();
    int ret;

    trace_multifd_recv_thread_start(p->id);
    rcu_register_thread();

    if (!s->multifd_clean_tls_termination) {
        p->read_flags = QIO_CHANNEL_READ_FLAG_RELAXED_EOF;
    }

    while (true) {
        MultiFDPacketHdr_t hdr;
        uint32_t flags = 0;
        bool is_device_state = false;
        bool has_data = false;
        uint8_t *pkt_buf;
        size_t pkt_len;

        p->normal_num = 0;

        if (use_packets) {
            struct iovec iov = {
                .iov_base = (void *)&hdr,
                .iov_len = sizeof(hdr)
            };

            if (multifd_recv_should_exit()) {
                break;
            }

            ret = qio_channel_readv_full_all_eof(p->c, &iov, 1, NULL, NULL,
                                                 p->read_flags, &local_err);
            if (!ret) {
                /* EOF */
                assert(!local_err);
                break;
            }

            if (ret == -1) {
                break;
            }

            ret = multifd_recv_unfill_packet_header(p, &hdr, &local_err);
            if (ret) {
                break;
            }

            is_device_state = p->flags & MULTIFD_FLAG_DEVICE_STATE;
            if (is_device_state) {
                pkt_buf = (uint8_t *)p->packet_dev_state + sizeof(hdr);
                pkt_len = sizeof(*p->packet_dev_state) - sizeof(hdr);
            } else {
                pkt_buf = (uint8_t *)p->packet + sizeof(hdr);
                pkt_len = p->packet_len - sizeof(hdr);
            }

            ret = qio_channel_read_all_eof(p->c, (char *)pkt_buf, pkt_len,
                                           &local_err);
            if (!ret) {
                /* EOF */
                error_setg(&local_err, "multifd: unexpected EOF after "
                           "packet header");
                break;
            }

            if (ret == -1) {
                break;
            }

            qemu_mutex_lock(&p->mutex);
            ret = multifd_recv_unfill_packet(p, &local_err);
            if (ret) {
                qemu_mutex_unlock(&p->mutex);
                break;
            }

            flags = p->flags;
            /* recv methods don't know how to handle the SYNC flag */
            p->flags &= ~MULTIFD_FLAG_SYNC;

            if (is_device_state) {
                has_data = p->next_packet_size > 0;
            } else {
                /*
                 * Even if it's a SYNC packet, this needs to be set
                 * because older QEMUs (<9.0) still send data along with
                 * the SYNC packet.
                 */
                has_data = p->normal_num || p->zero_num;
            }

            qemu_mutex_unlock(&p->mutex);
        } else {
            /*
             * No packets, so we need to wait for the vmstate code to
             * give us work.
             */
            qemu_sem_wait(&p->sem);

            if (multifd_recv_should_exit()) {
                break;
            }

            /* pairs with qatomic_store_release() at multifd_recv() */
            if (!qatomic_load_acquire(&p->pending_job)) {
                /*
                 * Migration thread did not send work, this is
                 * equivalent to pending_sync on the sending
                 * side. Post sem_sync to notify we reached this
                 * point.
                 */
                qemu_sem_post(&multifd_recv_state->sem_sync);
                continue;
            }

            has_data = !!p->data->size;
        }

        if (has_data) {
            /*
             * multifd thread should not be active and receive data
             * when migration is in the Postcopy phase. Two threads
             * writing the same memory area could easily corrupt
             * the guest state.
             */
            assert(!migration_in_postcopy());
            if (is_device_state) {
                assert(use_packets);
                ret = multifd_device_state_recv(p, &local_err);
            } else {
                ret = multifd_recv_state->ops->recv(p, &local_err);
            }
            if (ret != 0) {
                break;
            }
        } else if (is_device_state) {
            error_setg(&local_err,
                       "multifd: received empty device state packet");
            break;
        }

        if (use_packets) {
            if (flags & MULTIFD_FLAG_SYNC) {
                if (is_device_state) {
                    error_setg(&local_err,
                               "multifd: received SYNC device state packet");
                    break;
                }

                qemu_sem_post(&multifd_recv_state->sem_sync);
                qemu_sem_wait(&p->sem_sync);
            }
        } else {
            p->data->size = 0;
            /*
             * Order data->size update before clearing
             * pending_job. Pairs with smp_mb_acquire() at
             * multifd_recv().
             */
            qatomic_store_release(&p->pending_job, false);
        }
    }

    if (local_err) {
        multifd_recv_terminate_threads(local_err);
        error_free(local_err);
    }

    rcu_unregister_thread();
    trace_multifd_recv_thread_end(p->id, p->packets_recved);

    return NULL;
}

int multifd_recv_setup(Error **errp)
{
    int thread_count;
    uint32_t page_count = multifd_ram_page_count();
    bool use_packets = multifd_use_packets();
    uint8_t i;

    /*
     * Return successfully if multiFD recv state is already initialised
     * or multiFD is not enabled.
     */
    if (multifd_recv_state || !migrate_multifd()) {
        return 0;
    }

    thread_count = migrate_multifd_channels();
    multifd_recv_state = g_malloc0(sizeof(*multifd_recv_state));
    multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count);

    multifd_recv_state->data = g_new0(MultiFDRecvData, 1);
    multifd_recv_state->data->size = 0;

    qatomic_set(&multifd_recv_state->count, 0);
    qatomic_set(&multifd_recv_state->exiting, 0);
    qemu_sem_init(&multifd_recv_state->sem_sync, 0);
    multifd_recv_state->ops = multifd_ops[migrate_multifd_compression()];

    for (i = 0; i < thread_count; i++) {
        MultiFDRecvParams *p = &multifd_recv_state->params[i];

        qemu_mutex_init(&p->mutex);
        qemu_sem_init(&p->sem_sync, 0);
        qemu_sem_init(&p->sem, 0);
        p->pending_job = false;
        p->id = i;

        p->data = g_new0(MultiFDRecvData, 1);
        p->data->size = 0;

        if (use_packets) {
            p->packet_len = sizeof(MultiFDPacket_t)
                          + sizeof(uint64_t) * page_count;
            p->packet = g_malloc0(p->packet_len);
            p->packet_dev_state = g_malloc0(sizeof(*p->packet_dev_state));
        }
        p->name = g_strdup_printf(MIGRATION_THREAD_DST_MULTIFD, i);
        p->normal = g_new0(ram_addr_t, page_count);
        p->zero = g_new0(ram_addr_t, page_count);
    }

    for (i = 0; i < thread_count; i++) {
        MultiFDRecvParams *p = &multifd_recv_state->params[i];
        int ret;

        ret = multifd_recv_state->ops->recv_setup(p, errp);
        if (ret) {
            return ret;
        }
    }
    return 0;
}

bool multifd_recv_all_channels_created(void)
{
    int thread_count = migrate_multifd_channels();

    if (!migrate_multifd()) {
        return true;
    }

    if (!multifd_recv_state) {
        /* Called before any connections created */
        return false;
    }

    return thread_count == qatomic_read(&multifd_recv_state->count);
}

/*
 * Try to receive all multifd channels to get ready for the migration.
 * Sets @errp when failing to receive the current channel.
 */
void multifd_recv_new_channel(QIOChannel *ioc, Error **errp)
{
    MultiFDRecvParams *p;
    Error *local_err = NULL;
    bool use_packets = multifd_use_packets();
    int id;

    if (use_packets) {
        id = multifd_recv_initial_packet(ioc, &local_err);
        if (id < 0) {
            multifd_recv_terminate_threads(local_err);
            error_propagate_prepend(errp, local_err,
                                    "failed to receive packet"
                                    " via multifd channel %d: ",
                                    qatomic_read(&multifd_recv_state->count));
            return;
        }
        trace_multifd_recv_new_channel(id);
    } else {
        id = qatomic_read(&multifd_recv_state->count);
    }

    p = &multifd_recv_state->params[id];
    if (p->c != NULL) {
        error_setg(&local_err, "multifd: received id '%d' already setup",
                   id);
        multifd_recv_terminate_threads(local_err);
        error_propagate(errp, local_err);
        return;
    }
    p->c = ioc;
    object_ref(OBJECT(ioc));

    p->thread_created = true;
    qemu_thread_create(&p->thread, p->name, multifd_recv_thread, p,
                       QEMU_THREAD_JOINABLE);
    qatomic_inc(&multifd_recv_state->count);
}