xref: /qemu/migration/multifd-device-state.c (revision d64db833d6e3cbe9ea5f36342480f920f3675cea)
1 /*
2  * Multifd device state migration
3  *
4  * Copyright (C) 2024,2025 Oracle and/or its affiliates.
5  *
6  * This work is licensed under the terms of the GNU GPL, version 2 or later.
7  * See the COPYING file in the top-level directory.
8  *
9  * SPDX-License-Identifier: GPL-2.0-or-later
10  */
11 
12 #include "qemu/osdep.h"
13 #include "qapi/error.h"
14 #include "qemu/lockable.h"
15 #include "block/thread-pool.h"
16 #include "migration.h"
17 #include "migration/misc.h"
18 #include "multifd.h"
19 #include "options.h"
20 
/*
 * Send-side context for multifd device state migration.
 *
 * Allocated by multifd_device_state_send_setup() and released by
 * multifd_device_state_send_cleanup(); NULL outside of that window.
 */
static struct {
    /* Held by multifd_queue_device_state() for the whole submission */
    QemuMutex queue_job_mutex;

    /*
     * Payload container filled in by multifd_queue_device_state() and
     * passed (by address) to multifd_send().
     */
    MultiFDSendData *send_data;

    /* Pool that runs the jobs submitted by the spawn function below */
    ThreadPool *threads;
    /*
     * Abort request flag for the save threads; set by
     * multifd_abort_device_state_save_threads() and always accessed
     * through qatomic_read() / qatomic_set().
     */
    bool threads_abort;
} *multifd_send_device_state;
29 
30 void multifd_device_state_send_setup(void)
31 {
32     assert(!multifd_send_device_state);
33     multifd_send_device_state = g_malloc(sizeof(*multifd_send_device_state));
34 
35     qemu_mutex_init(&multifd_send_device_state->queue_job_mutex);
36 
37     multifd_send_device_state->send_data = multifd_send_data_alloc();
38 
39     multifd_send_device_state->threads = thread_pool_new();
40     multifd_send_device_state->threads_abort = false;
41 }
42 
43 void multifd_device_state_send_cleanup(void)
44 {
45     g_clear_pointer(&multifd_send_device_state->threads, thread_pool_free);
46     g_clear_pointer(&multifd_send_device_state->send_data,
47                     multifd_send_data_free);
48 
49     qemu_mutex_destroy(&multifd_send_device_state->queue_job_mutex);
50 
51     g_clear_pointer(&multifd_send_device_state, g_free);
52 }
53 
54 void multifd_send_data_clear_device_state(MultiFDDeviceState_t *device_state)
55 {
56     g_clear_pointer(&device_state->idstr, g_free);
57     g_clear_pointer(&device_state->buf, g_free);
58 }
59 
60 static void multifd_device_state_fill_packet(MultiFDSendParams *p)
61 {
62     MultiFDDeviceState_t *device_state = &p->data->u.device_state;
63     MultiFDPacketDeviceState_t *packet = p->packet_device_state;
64 
65     packet->hdr.flags = cpu_to_be32(p->flags);
66     strncpy(packet->idstr, device_state->idstr, sizeof(packet->idstr) - 1);
67     packet->idstr[sizeof(packet->idstr) - 1] = 0;
68     packet->instance_id = cpu_to_be32(device_state->instance_id);
69     packet->next_packet_size = cpu_to_be32(p->next_packet_size);
70 }
71 
72 static void multifd_prepare_header_device_state(MultiFDSendParams *p)
73 {
74     p->iov[0].iov_len = sizeof(*p->packet_device_state);
75     p->iov[0].iov_base = p->packet_device_state;
76     p->iovs_num++;
77 }
78 
79 void multifd_device_state_send_prepare(MultiFDSendParams *p)
80 {
81     MultiFDDeviceState_t *device_state = &p->data->u.device_state;
82 
83     assert(multifd_payload_device_state(p->data));
84 
85     multifd_prepare_header_device_state(p);
86 
87     assert(!(p->flags & MULTIFD_FLAG_SYNC));
88 
89     p->next_packet_size = device_state->buf_len;
90     if (p->next_packet_size > 0) {
91         p->iov[p->iovs_num].iov_base = device_state->buf;
92         p->iov[p->iovs_num].iov_len = p->next_packet_size;
93         p->iovs_num++;
94     }
95 
96     p->flags |= MULTIFD_FLAG_NOCOMP | MULTIFD_FLAG_DEVICE_STATE;
97 
98     multifd_device_state_fill_packet(p);
99 }
100 
101 bool multifd_queue_device_state(char *idstr, uint32_t instance_id,
102                                 char *data, size_t len)
103 {
104     /* Device state submissions can come from multiple threads */
105     QEMU_LOCK_GUARD(&multifd_send_device_state->queue_job_mutex);
106     MultiFDDeviceState_t *device_state;
107 
108     assert(multifd_payload_empty(multifd_send_device_state->send_data));
109 
110     multifd_set_payload_type(multifd_send_device_state->send_data,
111                              MULTIFD_PAYLOAD_DEVICE_STATE);
112     device_state = &multifd_send_device_state->send_data->u.device_state;
113     device_state->idstr = g_strdup(idstr);
114     device_state->instance_id = instance_id;
115     device_state->buf = g_memdup2(data, len);
116     device_state->buf_len = len;
117 
118     if (!multifd_send(&multifd_send_device_state->send_data)) {
119         multifd_send_data_clear(multifd_send_device_state->send_data);
120         return false;
121     }
122 
123     return true;
124 }
125 
126 bool multifd_device_state_supported(void)
127 {
128     return migrate_multifd() && !migrate_mapped_ram() &&
129         migrate_multifd_compression() == MULTIFD_COMPRESSION_NONE;
130 }
131 
132 static void multifd_device_state_save_thread_data_free(void *opaque)
133 {
134     SaveLiveCompletePrecopyThreadData *data = opaque;
135 
136     g_clear_pointer(&data->idstr, g_free);
137     g_free(data);
138 }
139 
140 static int multifd_device_state_save_thread(void *opaque)
141 {
142     SaveLiveCompletePrecopyThreadData *data = opaque;
143     g_autoptr(Error) local_err = NULL;
144 
145     if (!data->hdlr(data, &local_err)) {
146         MigrationState *s = migrate_get_current();
147 
148         /*
149          * Can't call abort_device_state_save_threads() here since new
150          * save threads could still be in process of being launched
151          * (if, for example, the very first save thread launched exited
152          * with an error very quickly).
153          */
154 
155         assert(local_err);
156 
157         /*
158          * In case of multiple save threads failing which thread error
159          * return we end setting is purely arbitrary.
160          */
161         migrate_set_error(s, local_err);
162     }
163 
164     return 0;
165 }
166 
167 bool multifd_device_state_save_thread_should_exit(void)
168 {
169     return qatomic_read(&multifd_send_device_state->threads_abort);
170 }
171 
172 void
173 multifd_spawn_device_state_save_thread(SaveLiveCompletePrecopyThreadHandler hdlr,
174                                        char *idstr, uint32_t instance_id,
175                                        void *opaque)
176 {
177     SaveLiveCompletePrecopyThreadData *data;
178 
179     assert(multifd_device_state_supported());
180     assert(multifd_send_device_state);
181 
182     assert(!qatomic_read(&multifd_send_device_state->threads_abort));
183 
184     data = g_new(SaveLiveCompletePrecopyThreadData, 1);
185     data->hdlr = hdlr;
186     data->idstr = g_strdup(idstr);
187     data->instance_id = instance_id;
188     data->handler_opaque = opaque;
189 
190     thread_pool_submit_immediate(multifd_send_device_state->threads,
191                                  multifd_device_state_save_thread,
192                                  data,
193                                  multifd_device_state_save_thread_data_free);
194 }
195 
196 void multifd_abort_device_state_save_threads(void)
197 {
198     assert(multifd_device_state_supported());
199 
200     qatomic_set(&multifd_send_device_state->threads_abort, true);
201 }
202 
203 bool multifd_join_device_state_save_threads(void)
204 {
205     MigrationState *s = migrate_get_current();
206 
207     assert(multifd_device_state_supported());
208 
209     thread_pool_wait(multifd_send_device_state->threads);
210 
211     return !migrate_has_error(s);
212 }
213