/* xref: /qemu/tests/unit/test-thread-pool.c (revision e2abbeac7a0c19e7757370da4c5164c862016d31) */
#include "qemu/osdep.h"
#include "qemu-common.h"
#include "block/aio.h"
#include "block/thread-pool.h"
#include "block/block.h"
#include "qapi/error.h"
#include "qemu/timer.h"
#include "qemu/error-report.h"
#include "qemu/main-loop.h"

static AioContext *ctx;
static ThreadPool *pool;
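/* Number of in-flight requests whose completion callback has not run yet.
 * Completion callbacks are serialized (see done_cb below), so plain
 * non-atomic accesses are enough.
 */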
static int active;

typedef struct {
    BlockAIOCB *aiocb;
    int n;
    int ret;
} WorkerTestData;

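/* Minimal work function: atomically bumps data->n and returns its previous
 * value, so an item that ran exactly once leaves n == 1.
 */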
static int worker_cb(void *opaque)
{
    WorkerTestData *data = opaque;
    return qatomic_fetch_inc(&data->n);
}

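/* Long-running work function used by the cancellation tests.  The 0 -> 1
 * transition of data->n marks the job as started; the worker then sleeps
 * for two seconds and ORs in 2 (n == 3) to mark completion.  If the test
 * already flagged the job as "cancel before start" (n == 4), the cmpxchg
 * fails and the worker returns immediately.
 */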
static int long_cb(void *opaque)
{
    WorkerTestData *data = opaque;
    if (qatomic_cmpxchg(&data->n, 0, 1) == 0) {
        g_usleep(2000000);
        qatomic_or(&data->n, 2);
    }
    return 0;
}

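/* Completion callback for the AIO-style submissions: records the request's
 * return value, clears the aiocb handle, and decrements the number of
 * in-flight requests.
 */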
static void done_cb(void *opaque, int ret)
{
    WorkerTestData *data = opaque;
    g_assert(data->ret == -EINPROGRESS || data->ret == -ECANCELED);
    data->ret = ret;
    data->aiocb = NULL;

    /* Callbacks are serialized, so no need to use atomic ops.  */
    active--;
}

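/* /thread-pool/submit: fire-and-forget submission with no completion
 * callback; poll the AioContext until the worker has run exactly once.
 */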
static void test_submit(void)
{
    WorkerTestData data = { .n = 0 };
    thread_pool_submit(pool, worker_cb, &data);
    while (data.n == 0) {
        aio_poll(ctx, true);
    }
    g_assert_cmpint(data.n, ==, 1);
}

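/* /thread-pool/submit-aio: submit one request with a completion callback
 * and check that done_cb only fires from aio_poll(), reporting the worker's
 * return value.
 */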
static void test_submit_aio(void)
{
    WorkerTestData data = { .n = 0, .ret = -EINPROGRESS };
    data.aiocb = thread_pool_submit_aio(pool, worker_cb, &data,
                                        done_cb, &data);

    /* The callbacks are not called until after the first wait.  */
    active = 1;
    g_assert_cmpint(data.ret, ==, -EINPROGRESS);
    while (data.ret == -EINPROGRESS) {
        aio_poll(ctx, true);
    }
    g_assert_cmpint(active, ==, 0);
    g_assert_cmpint(data.n, ==, 1);
    g_assert_cmpint(data.ret, ==, 0);
}

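/* Coroutine body for /thread-pool/submit-co.  thread_pool_submit_co()
 * yields until the worker finishes, so execution bounces between this
 * coroutine and test_submit_co(): first up to the submission, then (once
 * aio_poll() re-enters the coroutine) through the final assertions.
 */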
static void co_test_cb(void *opaque)
{
    WorkerTestData *data = opaque;

    active = 1;
    data->n = 0;
    data->ret = -EINPROGRESS;
    thread_pool_submit_co(pool, worker_cb, data);

    /* The test continues in test_submit_co, after qemu_coroutine_enter... */

    g_assert_cmpint(data->n, ==, 1);
    data->ret = 0;
    active--;

    /* The test continues in test_submit_co, after aio_poll... */
}

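/* /thread-pool/submit-co: drive co_test_cb() and verify the hand-off
 * between the coroutine and the main loop described above.
 */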
static void test_submit_co(void)
{
    WorkerTestData data;
    Coroutine *co = qemu_coroutine_create(co_test_cb, &data);

    qemu_coroutine_enter(co);

    /* Back here once the worker has started.  */

    g_assert_cmpint(active, ==, 1);
    g_assert_cmpint(data.ret, ==, -EINPROGRESS);

    /* aio_poll will execute the rest of the coroutine.  */

    while (data.ret == -EINPROGRESS) {
        aio_poll(ctx, true);
    }

    /* Back here after the coroutine has finished.  */

    g_assert_cmpint(active, ==, 0);
    g_assert_cmpint(data.ret, ==, 0);
}

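/* /thread-pool/submit-many: queue 100 requests, more than the pool will
 * run concurrently, and wait until every completion callback has fired.
 */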
static void test_submit_many(void)
{
    WorkerTestData data[100];
    int i;

    /* Start more work items than there will be threads.  */
    for (i = 0; i < 100; i++) {
        data[i].n = 0;
        data[i].ret = -EINPROGRESS;
        thread_pool_submit_aio(pool, worker_cb, &data[i], done_cb, &data[i]);
    }

    active = 100;
    while (active > 0) {
        aio_poll(ctx, true);
    }
    for (i = 0; i < 100; i++) {
        g_assert_cmpint(data[i].n, ==, 1);
        g_assert_cmpint(data[i].ret, ==, 0);
    }
}

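/* Common body for the cancel tests.  Fill the pool, queue 100 long-running
 * jobs, then cancel the ones that have not started yet (marking them with
 * n == 4), either synchronously or asynchronously.  At the end each job
 * must be in one of two states: completed (n == 3, ret == 0) or cancelled
 * before it started (n == 4, ret == -ECANCELED).
 */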
static void do_test_cancel(bool sync)
{
    WorkerTestData data[100];
    int num_canceled;
    int i;

    /* Start more work items than there will be threads, to ensure
     * the pool is full.
     */
    test_submit_many();

    /* Start long running jobs, to ensure we can cancel some.  */
    for (i = 0; i < 100; i++) {
        data[i].n = 0;
        data[i].ret = -EINPROGRESS;
        data[i].aiocb = thread_pool_submit_aio(pool, long_cb, &data[i],
                                               done_cb, &data[i]);
    }

    /* Starting the threads may be left to a bottom half.  Let it
     * run, but do not waste too much time...
     */
    active = 100;
    aio_notify(ctx);
    aio_poll(ctx, false);

    /* Wait some time for the threads to start, with some sanity
     * testing on the behavior of the scheduler...
     */
    g_assert_cmpint(active, ==, 100);
    g_usleep(1000000);
    g_assert_cmpint(active, >, 50);

    /* Cancel the jobs that haven't been started yet.  */
    num_canceled = 0;
    for (i = 0; i < 100; i++) {
        if (qatomic_cmpxchg(&data[i].n, 0, 4) == 0) {
            data[i].ret = -ECANCELED;
            if (sync) {
                bdrv_aio_cancel(data[i].aiocb);
            } else {
                bdrv_aio_cancel_async(data[i].aiocb);
            }
            num_canceled++;
        }
    }
    g_assert_cmpint(active, >, 0);
    g_assert_cmpint(num_canceled, <, 100);

    for (i = 0; i < 100; i++) {
        if (data[i].aiocb && qatomic_read(&data[i].n) < 4) {
            if (sync) {
                /* Canceling the others will be a blocking operation.  */
                bdrv_aio_cancel(data[i].aiocb);
            } else {
                bdrv_aio_cancel_async(data[i].aiocb);
            }
        }
    }

    /* Finish execution and execute any remaining callbacks.  */
    while (active > 0) {
        aio_poll(ctx, true);
    }
    g_assert_cmpint(active, ==, 0);
    for (i = 0; i < 100; i++) {
        g_assert(data[i].aiocb == NULL);
        switch (data[i].n) {
        case 0:
            fprintf(stderr, "Callback not canceled but never started?\n");
            abort();
        case 3:
            /* Couldn't be canceled asynchronously, must have completed.  */
            g_assert_cmpint(data[i].ret, ==, 0);
            break;
        case 4:
            /* Could be canceled asynchronously, never started.  */
            g_assert_cmpint(data[i].ret, ==, -ECANCELED);
            break;
        default:
            fprintf(stderr, "Callback aborted while running?\n");
            abort();
        }
    }
}

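/* The blocking variant cancels with bdrv_aio_cancel(), the asynchronous
 * one with bdrv_aio_cancel_async().
 */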
static void test_cancel(void)
{
    do_test_cancel(true);
}

static void test_cancel_async(void)
{
    do_test_cancel(false);
}

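/* Run every test against the main loop's AioContext and its thread pool. */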
int main(int argc, char **argv)
{
    qemu_init_main_loop(&error_abort);
    ctx = qemu_get_current_aio_context();
    pool = aio_get_thread_pool(ctx);

    g_test_init(&argc, &argv, NULL);
    g_test_add_func("/thread-pool/submit", test_submit);
    g_test_add_func("/thread-pool/submit-aio", test_submit_aio);
    g_test_add_func("/thread-pool/submit-co", test_submit_co);
    g_test_add_func("/thread-pool/submit-many", test_submit_many);
    g_test_add_func("/thread-pool/cancel", test_cancel);
    g_test_add_func("/thread-pool/cancel-async", test_cancel_async);

    return g_test_run();
}