xref: /qemu/tests/unit/test-aio-multithread.c (revision 96215036f47403438c7c7869b7cd419bd7a11f82)
1 /*
2  * AioContext multithreading tests
3  *
4  * Copyright Red Hat, Inc. 2016
5  *
6  * Authors:
7  *  Paolo Bonzini    <pbonzini@redhat.com>
8  *
9  * This work is licensed under the terms of the GNU LGPL, version 2 or later.
10  * See the COPYING.LIB file in the top-level directory.
11  */
12 
13 #include "qemu/osdep.h"
14 #include "block/aio.h"
15 #include "qemu/coroutine.h"
16 #include "qemu/thread.h"
17 #include "qemu/error-report.h"
18 #include "iothread.h"
19 
20 /* AioContext management */
21 
/* Number of iothreads, and therefore AioContexts, created for each test. */
#define NUM_CONTEXTS 5

/* threads[i] is the iothread whose AioContext is stored in ctx[i]. */
static IOThread *threads[NUM_CONTEXTS];
static AioContext *ctx[NUM_CONTEXTS];
/* Per-thread index; stays -1 until set_id_cb() assigns it in that thread. */
static __thread int id = -1;

/* Set by ctx_run_bh_cb() to tell ctx_run() that its request has completed. */
static QemuEvent done_event;
29 
30 /* Run a function synchronously on a remote iothread. */
31 
32 typedef struct CtxRunData {
33     QEMUBHFunc *cb;
34     void *arg;
35 } CtxRunData;
36 
ctx_run_bh_cb(void * opaque)37 static void ctx_run_bh_cb(void *opaque)
38 {
39     CtxRunData *data = opaque;
40 
41     data->cb(data->arg);
42     qemu_event_set(&done_event);
43 }
44 
/*
 * Run cb(opaque) as a one-shot bottom half on ctx[i] and block the caller
 * until ctx_run_bh_cb() has set done_event.
 *
 * The request lives on this function's stack; we only return after the
 * event has been signalled, i.e. after the callback has been invoked.
 */
static void ctx_run(int i, QEMUBHFunc *cb, void *opaque)
{
    CtxRunData req;

    req.cb = cb;
    req.arg = opaque;

    /* Reset before scheduling so a completion cannot be missed. */
    qemu_event_reset(&done_event);
    aio_bh_schedule_oneshot(ctx[i], ctx_run_bh_cb, &req);
    qemu_event_wait(&done_event);
}
56 
57 /* Starting the iothreads. */
58 
set_id_cb(void * opaque)59 static void set_id_cb(void *opaque)
60 {
61     int *i = opaque;
62 
63     id = *i;
64 }
65 
create_aio_contexts(void)66 static void create_aio_contexts(void)
67 {
68     int i;
69 
70     for (i = 0; i < NUM_CONTEXTS; i++) {
71         threads[i] = iothread_new();
72         ctx[i] = iothread_get_aio_context(threads[i]);
73     }
74 
75     qemu_event_init(&done_event, false);
76     for (i = 0; i < NUM_CONTEXTS; i++) {
77         ctx_run(i, set_id_cb, &i);
78     }
79 }
80 
81 /* Stopping the iothreads. */
82 
join_aio_contexts(void)83 static void join_aio_contexts(void)
84 {
85     int i;
86 
87     for (i = 0; i < NUM_CONTEXTS; i++) {
88         aio_context_ref(ctx[i]);
89     }
90     for (i = 0; i < NUM_CONTEXTS; i++) {
91         iothread_join(threads[i]);
92     }
93     for (i = 0; i < NUM_CONTEXTS; i++) {
94         aio_context_unref(ctx[i]);
95     }
96     qemu_event_destroy(&done_event);
97 }
98 
99 /* Basic test for the stuff above. */
100 
/* Start all iothreads and immediately stop them again. */
static void test_lifecycle(void)
{
    /* Bring the contexts up ... */
    create_aio_contexts();

    /* ... and tear them down without running anything else. */
    join_aio_contexts();
}
106 
107 /* aio_co_schedule test.  */
108 
/* Per context: coroutine waiting to be rescheduled, or NULL if none. */
static Coroutine *to_schedule[NUM_CONTEXTS];
/* Per context: set by finish_cb() to make the test coroutine's loop exit. */
static bool stop[NUM_CONTEXTS];

/* Statistics maintained by schedule_next(). */
static int count_retry;   /* the selected slot was empty */
static int count_here;    /* rescheduled the coroutine of the caller's context */
static int count_other;   /* rescheduled the coroutine of another context */
115 
schedule_next(int n)116 static bool schedule_next(int n)
117 {
118     Coroutine *co;
119 
120     co = qatomic_xchg(&to_schedule[n], NULL);
121     if (!co) {
122         qatomic_inc(&count_retry);
123         return false;
124     }
125 
126     if (n == id) {
127         qatomic_inc(&count_here);
128     } else {
129         qatomic_inc(&count_other);
130     }
131 
132     aio_co_schedule(ctx[n], co);
133     return true;
134 }
135 
finish_cb(void * opaque)136 static void finish_cb(void *opaque)
137 {
138     stop[id] = true;
139     schedule_next(id);
140 }
141 
test_multi_co_schedule_entry(void * opaque)142 static coroutine_fn void test_multi_co_schedule_entry(void *opaque)
143 {
144     g_assert(to_schedule[id] == NULL);
145 
146     /*
147      * The next iteration will set to_schedule[id] again, but once finish_cb
148      * is scheduled there is no guarantee that it will actually be woken up,
149      * so at that point it must not go to sleep.
150      */
151     while (!stop[id]) {
152         int n;
153 
154         n = g_test_rand_int_range(0, NUM_CONTEXTS);
155         schedule_next(n);
156 
157         qatomic_set_mb(&to_schedule[id], qemu_coroutine_self());
158         /* finish_cb can run here.  */
159         qemu_coroutine_yield();
160         g_assert(to_schedule[id] == NULL);
161     }
162 }
163 
164 
test_multi_co_schedule(int seconds)165 static void test_multi_co_schedule(int seconds)
166 {
167     int i;
168 
169     count_here = count_other = count_retry = 0;
170 
171     create_aio_contexts();
172     for (i = 0; i < NUM_CONTEXTS; i++) {
173         Coroutine *co1 = qemu_coroutine_create(test_multi_co_schedule_entry, NULL);
174         aio_co_schedule(ctx[i], co1);
175     }
176 
177     g_usleep(seconds * 1000000);
178 
179     /* Guarantee that each AioContext is woken up from its last wait.  */
180     for (i = 0; i < NUM_CONTEXTS; i++) {
181         ctx_run(i, finish_cb, NULL);
182         g_assert(to_schedule[i] == NULL);
183     }
184 
185     join_aio_contexts();
186     g_test_message("scheduled %d, queued %d, retry %d, total %d",
187                   count_other, count_here, count_retry,
188                   count_here + count_other + count_retry);
189 }
190 
/* aio_co_schedule stress test, one-second run. */
static void test_multi_co_schedule_1(void)
{
    const int duration_s = 1;

    test_multi_co_schedule(duration_s);
}
195 
/* aio_co_schedule stress test, ten-second run. */
static void test_multi_co_schedule_10(void)
{
    const int duration_s = 10;

    test_multi_co_schedule(duration_s);
}
200 
201 /* CoMutex thread-safety.  */
202 
/* Iterations completed; incremented atomically outside the lock. */
static uint32_t atomic_counter;
/* Worker coroutines that have not exited yet. */
static uint32_t running;
/* Iterations completed; incremented with a plain ++ under the tested lock. */
static uint32_t counter;
static CoMutex comutex;
/* Set by the test driver to ask the worker coroutines to exit. */
static bool now_stopping;
208 
test_multi_co_mutex_entry(void * opaque)209 static void coroutine_fn test_multi_co_mutex_entry(void *opaque)
210 {
211     while (!qatomic_read(&now_stopping)) {
212         qemu_co_mutex_lock(&comutex);
213         counter++;
214         qemu_co_mutex_unlock(&comutex);
215 
216         /* Increase atomic_counter *after* releasing the mutex.  Otherwise
217          * there is a chance (it happens about 1 in 3 runs) that the iothread
218          * exits before the coroutine is woken up, causing a spurious
219          * assertion failure.
220          */
221         qatomic_inc(&atomic_counter);
222     }
223     qatomic_dec(&running);
224 }
225 
test_multi_co_mutex(int threads,int seconds)226 static void test_multi_co_mutex(int threads, int seconds)
227 {
228     int i;
229 
230     qemu_co_mutex_init(&comutex);
231     counter = 0;
232     atomic_counter = 0;
233     now_stopping = false;
234 
235     create_aio_contexts();
236     assert(threads <= NUM_CONTEXTS);
237     running = threads;
238     for (i = 0; i < threads; i++) {
239         Coroutine *co1 = qemu_coroutine_create(test_multi_co_mutex_entry, NULL);
240         aio_co_schedule(ctx[i], co1);
241     }
242 
243     g_usleep(seconds * 1000000);
244 
245     qatomic_set(&now_stopping, true);
246     while (running > 0) {
247         g_usleep(100000);
248     }
249 
250     join_aio_contexts();
251     g_test_message("%d iterations/second", counter / seconds);
252     g_assert_cmpint(counter, ==, atomic_counter);
253 }
254 
255 /* Testing with NUM_CONTEXTS threads focuses on the queue.  The mutex however
256  * is too contended (and the threads spend too much time in aio_poll)
257  * to actually stress the handoff protocol.
258  */
test_multi_co_mutex_1(void)259 static void test_multi_co_mutex_1(void)
260 {
261     test_multi_co_mutex(NUM_CONTEXTS, 1);
262 }
263 
test_multi_co_mutex_10(void)264 static void test_multi_co_mutex_10(void)
265 {
266     test_multi_co_mutex(NUM_CONTEXTS, 10);
267 }
268 
269 /* Testing with fewer threads stresses the handoff protocol too.  Still, the
270  * case where the locker _can_ pick up a handoff is very rare, happening
271  * about 10 times in 1 million, so increase the runtime a bit compared to
272  * other "quick" testcases that only run for 1 second.
273  */
static void test_multi_co_mutex_2_3(void)
{
    /* Two workers, three-second run. */
    const int workers = 2;
    const int duration_s = 3;

    test_multi_co_mutex(workers, duration_s);
}
278 
/* CoMutex test with two workers, thirty-second run. */
static void test_multi_co_mutex_2_30(void)
{
    const int workers = 2;
    const int duration_s = 30;

    test_multi_co_mutex(workers, duration_s);
}
283 
284 /* Same test with fair mutexes, for performance comparison.  */
285 
286 #ifdef CONFIG_LINUX
287 #include "qemu/futex.h"
288 
289 /* The nodes for the mutex reside in this structure (on which we try to avoid
290  * false sharing).  The head of the mutex is in the "mutex_head" variable.
291  */
static struct {
    int next;           /* id of the next waiter in the queue, or -1 */
    int locked;         /* 1 while the owner of this node has to wait */
    int padding[14];    /* brings the struct up to 16 ints */
} nodes[NUM_CONTEXTS] __attribute__((__aligned__(64)));

/* id of the most recently queued node, or -1 when the mutex is free. */
static int mutex_head = -1;
298 
mcs_mutex_lock(void)299 static void mcs_mutex_lock(void)
300 {
301     int prev;
302 
303     nodes[id].next = -1;
304     nodes[id].locked = 1;
305     prev = qatomic_xchg(&mutex_head, id);
306     if (prev != -1) {
307         qatomic_set(&nodes[prev].next, id);
308         while (qatomic_read(&nodes[id].locked) == 1) {
309             qemu_futex_wait(&nodes[id].locked, 1);
310         }
311     }
312 }
313 
mcs_mutex_unlock(void)314 static void mcs_mutex_unlock(void)
315 {
316     int next;
317     if (qatomic_read(&nodes[id].next) == -1) {
318         if (qatomic_read(&mutex_head) == id &&
319             qatomic_cmpxchg(&mutex_head, id, -1) == id) {
320             /* Last item in the list, exit.  */
321             return;
322         }
323         while (qatomic_read(&nodes[id].next) == -1) {
324             /* mcs_mutex_lock did the xchg, but has not updated
325              * nodes[prev].next yet.
326              */
327         }
328     }
329 
330     /* Wake up the next in line.  */
331     next = qatomic_read(&nodes[id].next);
332     nodes[next].locked = 0;
333     qemu_futex_wake_single(&nodes[next].locked);
334 }
335 
test_multi_fair_mutex_entry(void * opaque)336 static void test_multi_fair_mutex_entry(void *opaque)
337 {
338     while (!qatomic_read(&now_stopping)) {
339         mcs_mutex_lock();
340         counter++;
341         mcs_mutex_unlock();
342         qatomic_inc(&atomic_counter);
343     }
344     qatomic_dec(&running);
345 }
346 
test_multi_fair_mutex(int threads,int seconds)347 static void test_multi_fair_mutex(int threads, int seconds)
348 {
349     int i;
350 
351     assert(mutex_head == -1);
352     counter = 0;
353     atomic_counter = 0;
354     now_stopping = false;
355 
356     create_aio_contexts();
357     assert(threads <= NUM_CONTEXTS);
358     running = threads;
359     for (i = 0; i < threads; i++) {
360         Coroutine *co1 = qemu_coroutine_create(test_multi_fair_mutex_entry, NULL);
361         aio_co_schedule(ctx[i], co1);
362     }
363 
364     g_usleep(seconds * 1000000);
365 
366     qatomic_set(&now_stopping, true);
367     while (running > 0) {
368         g_usleep(100000);
369     }
370 
371     join_aio_contexts();
372     g_test_message("%d iterations/second", counter / seconds);
373     g_assert_cmpint(counter, ==, atomic_counter);
374 }
375 
test_multi_fair_mutex_1(void)376 static void test_multi_fair_mutex_1(void)
377 {
378     test_multi_fair_mutex(NUM_CONTEXTS, 1);
379 }
380 
test_multi_fair_mutex_10(void)381 static void test_multi_fair_mutex_10(void)
382 {
383     test_multi_fair_mutex(NUM_CONTEXTS, 10);
384 }
385 #endif
386 
387 /* Same test with pthread mutexes, for performance comparison and
388  * portability.  */
389 
/* Lock exercised by test_multi_mutex(); initialised at the start of it. */
static QemuMutex mutex;
391 
test_multi_mutex_entry(void * opaque)392 static void test_multi_mutex_entry(void *opaque)
393 {
394     while (!qatomic_read(&now_stopping)) {
395         qemu_mutex_lock(&mutex);
396         counter++;
397         qemu_mutex_unlock(&mutex);
398         qatomic_inc(&atomic_counter);
399     }
400     qatomic_dec(&running);
401 }
402 
test_multi_mutex(int threads,int seconds)403 static void test_multi_mutex(int threads, int seconds)
404 {
405     int i;
406 
407     qemu_mutex_init(&mutex);
408     counter = 0;
409     atomic_counter = 0;
410     now_stopping = false;
411 
412     create_aio_contexts();
413     assert(threads <= NUM_CONTEXTS);
414     running = threads;
415     for (i = 0; i < threads; i++) {
416         Coroutine *co1 = qemu_coroutine_create(test_multi_mutex_entry, NULL);
417         aio_co_schedule(ctx[i], co1);
418     }
419 
420     g_usleep(seconds * 1000000);
421 
422     qatomic_set(&now_stopping, true);
423     while (running > 0) {
424         g_usleep(100000);
425     }
426 
427     join_aio_contexts();
428     g_test_message("%d iterations/second", counter / seconds);
429     g_assert_cmpint(counter, ==, atomic_counter);
430 }
431 
test_multi_mutex_1(void)432 static void test_multi_mutex_1(void)
433 {
434     test_multi_mutex(NUM_CONTEXTS, 1);
435 }
436 
test_multi_mutex_10(void)437 static void test_multi_mutex_10(void)
438 {
439     test_multi_mutex(NUM_CONTEXTS, 10);
440 }
441 
442 /* End of tests.  */
443 
main(int argc,char ** argv)444 int main(int argc, char **argv)
445 {
446     init_clocks(NULL);
447 
448     g_test_init(&argc, &argv, NULL);
449     g_test_add_func("/aio/multi/lifecycle", test_lifecycle);
450     if (g_test_quick()) {
451         g_test_add_func("/aio/multi/schedule", test_multi_co_schedule_1);
452         g_test_add_func("/aio/multi/mutex/contended", test_multi_co_mutex_1);
453         g_test_add_func("/aio/multi/mutex/handoff", test_multi_co_mutex_2_3);
454 #ifdef CONFIG_LINUX
455         g_test_add_func("/aio/multi/mutex/mcs", test_multi_fair_mutex_1);
456 #endif
457         g_test_add_func("/aio/multi/mutex/pthread", test_multi_mutex_1);
458     } else {
459         g_test_add_func("/aio/multi/schedule", test_multi_co_schedule_10);
460         g_test_add_func("/aio/multi/mutex/contended", test_multi_co_mutex_10);
461         g_test_add_func("/aio/multi/mutex/handoff", test_multi_co_mutex_2_30);
462 #ifdef CONFIG_LINUX
463         g_test_add_func("/aio/multi/mutex/mcs", test_multi_fair_mutex_10);
464 #endif
465         g_test_add_func("/aio/multi/mutex/pthread", test_multi_mutex_10);
466     }
467     return g_test_run();
468 }
469