/*
 * QEMU aio implementation
 *
 * Copyright IBM, Corp. 2008
 *
 * Authors:
 *  Anthony Liguori   <aliguori@us.ibm.com>
 *
 * This work is licensed under the terms of the GNU GPL, version 2.  See
 * the COPYING file in the top-level directory.
 *
 * Contributions after 2012-01-13 are licensed under the terms of the
 * GNU GPL, version 2 or (at your option) any later version.
 */

#include "qemu/osdep.h"
#include "block/block.h"
#include "block/thread-pool.h"
#include "qemu/main-loop.h"
#include "qemu/lockcnt.h"
#include "qemu/rcu.h"
#include "qemu/rcu_queue.h"
#include "qemu/sockets.h"
#include "qemu/cutils.h"
#include "trace.h"
#include "aio-posix.h"

/* Stop userspace polling on a handler if it isn't active for some time */
#define POLL_IDLE_INTERVAL_NS (7 * NANOSECONDS_PER_SECOND)

static void adjust_polling_time(AioContext *ctx, AioPolledEvent *poll,
                                int64_t block_ns);

bool aio_poll_disabled(AioContext *ctx)
{
    return qatomic_read(&ctx->poll_disable_cnt);
}

void aio_add_ready_handler(AioHandlerList *ready_list,
                           AioHandler *node,
                           int revents)
{
    QLIST_SAFE_REMOVE(node, node_ready); /* remove from nested parent's list */
    node->pfd.revents = revents;
    QLIST_INSERT_HEAD(ready_list, node, node_ready);
}

static void aio_add_poll_ready_handler(AioHandlerList *ready_list,
                                       AioHandler *node)
{
    QLIST_SAFE_REMOVE(node, node_ready); /* remove from nested parent's list */
    node->poll_ready = true;
    QLIST_INSERT_HEAD(ready_list, node, node_ready);
}

static AioHandler *find_aio_handler(AioContext *ctx, int fd)
{
    AioHandler *node;

    QLIST_FOREACH(node, &ctx->aio_handlers, node) {
        if (node->pfd.fd == fd) {
            if (!QLIST_IS_INSERTED(node, node_deleted)) {
                return node;
            }
        }
    }

    return NULL;
}

static bool aio_remove_fd_handler(AioContext *ctx, AioHandler *node)
{
    /* If the GSource is in the process of being destroyed then
     * g_source_remove_poll() causes an assertion failure.  Skip
     * removal in that case, because glib cleans up its state during
     * destruction anyway.
     */
    if (!g_source_is_destroyed(&ctx->source)) {
        g_source_remove_poll(&ctx->source, &node->pfd);
    }

    node->pfd.revents = 0;
    node->poll_ready = false;

    /* If the fd monitor has already marked it deleted, leave it alone */
    if (QLIST_IS_INSERTED(node, node_deleted)) {
        return false;
    }

    /* If a read is in progress, just mark the node as deleted */
    if (qemu_lockcnt_count(&ctx->list_lock)) {
        QLIST_INSERT_HEAD_RCU(&ctx->deleted_aio_handlers, node, node_deleted);
        return false;
    }
    /* Otherwise, delete it for real.  We can't just mark it as
     * deleted because deleted nodes are only cleaned up while
     * no one is walking the handlers list.
     */
    QLIST_SAFE_REMOVE(node, node_poll);
    QLIST_REMOVE(node, node);
    return true;
}

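/*
 * Illustrative usage (not taken from this file): "my_read_cb" and "my_state"
 * are hypothetical names.  Registering a read handler and later unregistering
 * the fd could look like:
 *
 *     aio_set_fd_handler(ctx, fd, my_read_cb, NULL, NULL, NULL, my_state);
 *     ...
 *     aio_set_fd_handler(ctx, fd, NULL, NULL, NULL, NULL, NULL);
 *
 * Passing all-NULL callbacks removes the handler.
 */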
void aio_set_fd_handler(AioContext *ctx,
                        int fd,
                        IOHandler *io_read,
                        IOHandler *io_write,
                        AioPollFn *io_poll,
                        IOHandler *io_poll_ready,
                        void *opaque)
{
    AioHandler *node;
    AioHandler *new_node = NULL;
    bool is_new = false;
    bool deleted = false;
    int poll_disable_change;

    if (io_poll && !io_poll_ready) {
        io_poll = NULL; /* polling only makes sense if there is a handler */
    }

    qemu_lockcnt_lock(&ctx->list_lock);

    node = find_aio_handler(ctx, fd);

    /* Are we deleting the fd handler? */
    if (!io_read && !io_write && !io_poll) {
        if (node == NULL) {
            qemu_lockcnt_unlock(&ctx->list_lock);
            return;
        }
        /* Clean events in order to unregister fd from the ctx epoll. */
        node->pfd.events = 0;

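        /*
         * Removing a handler that has no ->io_poll callback drops one
         * reference from ctx->poll_disable_cnt.
         */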
        poll_disable_change = -!node->io_poll;
    } else {
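        /*
         * The new registration adds one to ctx->poll_disable_cnt if it lacks
         * ->io_poll; replacing an existing handler that lacked ->io_poll
         * subtracts the reference that handler held.
         */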
        poll_disable_change = !io_poll - (node && !node->io_poll);
        if (node == NULL) {
            is_new = true;
        }
        /* Alloc and insert if it's not already there */
        new_node = g_new0(AioHandler, 1);

        /* Update handler with latest information */
        new_node->io_read = io_read;
        new_node->io_write = io_write;
        new_node->io_poll = io_poll;
        new_node->io_poll_ready = io_poll_ready;
        new_node->opaque = opaque;

        if (is_new) {
            new_node->pfd.fd = fd;
        } else {
            new_node->pfd = node->pfd;
        }
        g_source_add_poll(&ctx->source, &new_node->pfd);

        new_node->pfd.events = (io_read ? G_IO_IN | G_IO_HUP | G_IO_ERR : 0);
        new_node->pfd.events |= (io_write ? G_IO_OUT | G_IO_ERR : 0);

        QLIST_INSERT_HEAD_RCU(&ctx->aio_handlers, new_node, node);
    }

    /* No need to order poll_disable_cnt writes against other updates;
     * the counter is only used to avoid wasting time and latency on
     * iterated polling when the system call will be ultimately necessary.
     * Changing handlers is a rare event, and a little wasted polling until
     * the aio_notify below is not an issue.
     */
    qatomic_set(&ctx->poll_disable_cnt,
                qatomic_read(&ctx->poll_disable_cnt) + poll_disable_change);

    ctx->fdmon_ops->update(ctx, node, new_node);
    if (node) {
        deleted = aio_remove_fd_handler(ctx, node);
    }
    qemu_lockcnt_unlock(&ctx->list_lock);
    aio_notify(ctx);

    if (deleted) {
        g_free(node);
    }
}

static void aio_set_fd_poll(AioContext *ctx, int fd,
                            IOHandler *io_poll_begin,
                            IOHandler *io_poll_end)
{
    AioHandler *node = find_aio_handler(ctx, fd);

    if (!node) {
        return;
    }

    node->io_poll_begin = io_poll_begin;
    node->io_poll_end = io_poll_end;
}

void aio_set_event_notifier(AioContext *ctx,
                            EventNotifier *notifier,
                            EventNotifierHandler *io_read,
                            AioPollFn *io_poll,
                            EventNotifierHandler *io_poll_ready)
{
    aio_set_fd_handler(ctx, event_notifier_get_fd(notifier),
                       (IOHandler *)io_read, NULL, io_poll,
                       (IOHandler *)io_poll_ready, notifier);
}

void aio_set_event_notifier_poll(AioContext *ctx,
                                 EventNotifier *notifier,
                                 EventNotifierHandler *io_poll_begin,
                                 EventNotifierHandler *io_poll_end)
{
    aio_set_fd_poll(ctx, event_notifier_get_fd(notifier),
                    (IOHandler *)io_poll_begin,
                    (IOHandler *)io_poll_end);
}

static bool poll_set_started(AioContext *ctx, AioHandlerList *ready_list,
                             bool started)
{
    AioHandler *node;
    bool progress = false;

    if (started == ctx->poll_started) {
        return false;
    }

    ctx->poll_started = started;

    qemu_lockcnt_inc(&ctx->list_lock);
    QLIST_FOREACH(node, &ctx->poll_aio_handlers, node_poll) {
        IOHandler *fn;

        if (QLIST_IS_INSERTED(node, node_deleted)) {
            continue;
        }

        if (started) {
            fn = node->io_poll_begin;
        } else {
            fn = node->io_poll_end;
        }

        if (fn) {
            fn(node->opaque);
        }

        /* Poll one last time in case ->io_poll_end() raced with the event */
        if (!started && node->io_poll(node->opaque)) {
            aio_add_poll_ready_handler(ready_list, node);
            progress = true;
        }
    }
    qemu_lockcnt_dec(&ctx->list_lock);

    return progress;
}


bool aio_prepare(AioContext *ctx)
{
    AioHandlerList ready_list = QLIST_HEAD_INITIALIZER(ready_list);

    /* Poll mode cannot be used with glib's event loop; disable it. */
    poll_set_started(ctx, &ready_list, false);
    /* TODO what to do with this list? */

    return false;
}

bool aio_pending(AioContext *ctx)
{
    AioHandler *node;
    bool result = false;

    /*
     * We have to walk very carefully in case aio_set_fd_handler is
     * called while we're walking.
     */
    qemu_lockcnt_inc(&ctx->list_lock);

    QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) {
        int revents;

        /* TODO should this check poll ready? */
        revents = node->pfd.revents & node->pfd.events;
        if (revents & (G_IO_IN | G_IO_HUP | G_IO_ERR) && node->io_read) {
            result = true;
            break;
        }
        if (revents & (G_IO_OUT | G_IO_ERR) && node->io_write) {
            result = true;
            break;
        }
    }
    qemu_lockcnt_dec(&ctx->list_lock);

    return result;
}

static void aio_free_deleted_handlers(AioContext *ctx)
{
    AioHandler *node;

    if (QLIST_EMPTY_RCU(&ctx->deleted_aio_handlers)) {
        return;
    }
    if (!qemu_lockcnt_dec_if_lock(&ctx->list_lock)) {
        return; /* we are nested, let the parent do the freeing */
    }

    while ((node = QLIST_FIRST_RCU(&ctx->deleted_aio_handlers))) {
        QLIST_REMOVE(node, node);
        QLIST_REMOVE(node, node_deleted);
        QLIST_SAFE_REMOVE(node, node_poll);
        g_free(node);
    }

    qemu_lockcnt_inc_and_unlock(&ctx->list_lock);
}

static bool aio_dispatch_handler(AioContext *ctx, AioHandler *node)
{
    bool progress = false;
    bool poll_ready;
    int revents;

    revents = node->pfd.revents & node->pfd.events;
    node->pfd.revents = 0;

    poll_ready = node->poll_ready;
    node->poll_ready = false;

    /*
     * Start polling AioHandlers when they become ready because activity is
     * likely to continue.  Note that starvation is theoretically possible when
     * fdmon_supports_polling(), but only until the fd fires for the first
     * time.
     */
    if (!QLIST_IS_INSERTED(node, node_deleted) &&
        !QLIST_IS_INSERTED(node, node_poll) &&
        node->io_poll) {
        trace_poll_add(ctx, node, node->pfd.fd, revents);
        if (ctx->poll_started && node->io_poll_begin) {
            node->io_poll_begin(node->opaque);
        }
        QLIST_INSERT_HEAD(&ctx->poll_aio_handlers, node, node_poll);
    }
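    /*
     * Handlers that became ready via ->io_poll() rather than fd events are
     * dispatched through ->io_poll_ready().
     */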
    if (!QLIST_IS_INSERTED(node, node_deleted) &&
        poll_ready && revents == 0 && node->io_poll_ready) {
        /*
         * Remove temporarily to avoid infinite loops when ->io_poll_ready()
         * calls aio_poll() before clearing the condition that made the poll
         * handler become ready.
         */
        QLIST_SAFE_REMOVE(node, node_poll);

        node->io_poll_ready(node->opaque);

        if (!QLIST_IS_INSERTED(node, node_poll)) {
            QLIST_INSERT_HEAD(&ctx->poll_aio_handlers, node, node_poll);
        }

        /*
         * Return early since revents was zero. aio_notify() does not count as
         * progress.
         */
        return node->opaque != &ctx->notifier;
    }

    if (!QLIST_IS_INSERTED(node, node_deleted) &&
        (revents & (G_IO_IN | G_IO_HUP | G_IO_ERR)) &&
        node->io_read) {
        node->io_read(node->opaque);

        /* aio_notify() does not count as progress */
        if (node->opaque != &ctx->notifier) {
            progress = true;
        }
    }
    if (!QLIST_IS_INSERTED(node, node_deleted) &&
        (revents & (G_IO_OUT | G_IO_ERR)) &&
        node->io_write) {
        node->io_write(node->opaque);
        progress = true;
    }

    return progress;
}

/*
 * If we have a list of ready handlers then this is more efficient than
 * scanning all handlers with aio_dispatch_handlers().
 */
static bool aio_dispatch_ready_handlers(AioContext *ctx,
                                        AioHandlerList *ready_list,
                                        int64_t block_ns)
{
    bool progress = false;
    AioHandler *node;

    while ((node = QLIST_FIRST(ready_list))) {
        QLIST_REMOVE(node, node_ready);
        progress = aio_dispatch_handler(ctx, node) || progress;

        /*
         * Adjust polling time only after aio_dispatch_handler(), which can
         * add the handler to ctx->poll_aio_handlers.
         */
        if (ctx->poll_max_ns && QLIST_IS_INSERTED(node, node_poll)) {
            adjust_polling_time(ctx, &node->poll, block_ns);
        }
    }

    return progress;
}

/* Slower than aio_dispatch_ready_handlers() but only used via glib */
static bool aio_dispatch_handlers(AioContext *ctx)
{
    AioHandler *node, *tmp;
    bool progress = false;

    QLIST_FOREACH_SAFE_RCU(node, &ctx->aio_handlers, node, tmp) {
        progress = aio_dispatch_handler(ctx, node) || progress;
    }

    return progress;
}

void aio_dispatch(AioContext *ctx)
{
    qemu_lockcnt_inc(&ctx->list_lock);
    aio_bh_poll(ctx);
    aio_dispatch_handlers(ctx);
    aio_free_deleted_handlers(ctx);
    qemu_lockcnt_dec(&ctx->list_lock);

    timerlistgroup_run_timers(&ctx->tlg);
}

static bool run_poll_handlers_once(AioContext *ctx,
                                   AioHandlerList *ready_list,
                                   int64_t now,
                                   int64_t *timeout)
{
    bool progress = false;
    AioHandler *node;
    AioHandler *tmp;

    QLIST_FOREACH_SAFE(node, &ctx->poll_aio_handlers, node_poll, tmp) {
        if (node->io_poll(node->opaque)) {
            aio_add_poll_ready_handler(ready_list, node);

            node->poll_idle_timeout = now + POLL_IDLE_INTERVAL_NS;

            /*
             * Polling was successful, exit try_poll_mode immediately
             * to adjust the next polling time.
             */
            *timeout = 0;
            if (node->opaque != &ctx->notifier) {
                progress = true;
            }
        }

        /* Caller handles freeing deleted nodes.  Don't do it here. */
    }

    return progress;
}

static bool fdmon_supports_polling(AioContext *ctx)
{
    return ctx->fdmon_ops->need_wait != aio_poll_disabled;
}

static bool remove_idle_poll_handlers(AioContext *ctx,
                                      AioHandlerList *ready_list,
                                      int64_t now)
{
    AioHandler *node;
    AioHandler *tmp;
    bool progress = false;

    /*
     * File descriptor monitoring implementations without userspace polling
     * support suffer from starvation when a subset of handlers is polled
     * because fds will not be processed in a timely fashion.  Don't remove
     * idle poll handlers.
     */
    if (!fdmon_supports_polling(ctx)) {
        return false;
    }

    QLIST_FOREACH_SAFE(node, &ctx->poll_aio_handlers, node_poll, tmp) {
        if (node->poll_idle_timeout == 0LL) {
            node->poll_idle_timeout = now + POLL_IDLE_INTERVAL_NS;
        } else if (now >= node->poll_idle_timeout) {
            trace_poll_remove(ctx, node, node->pfd.fd);
            node->poll_idle_timeout = 0LL;
            QLIST_SAFE_REMOVE(node, node_poll);
            if (ctx->poll_started && node->io_poll_end) {
                node->io_poll_end(node->opaque);

                /*
                 * Final poll in case ->io_poll_end() races with an event.
                 * Never mind about re-adding the handler in the rare case
                 * where this causes progress.
                 */
                if (node->io_poll(node->opaque)) {
                    aio_add_poll_ready_handler(ready_list, node);
                    progress = true;
                }
            }
        }
    }

    return progress;
}

/* run_poll_handlers:
 * @ctx: the AioContext
 * @ready_list: the list to place ready handlers on
 * @max_ns: maximum time to poll for, in nanoseconds
 *
 * Polls for a given time.
 *
 * Note that the caller must have incremented ctx->list_lock.
 *
 * Returns: true if progress was made, false otherwise
 */
static bool run_poll_handlers(AioContext *ctx, AioHandlerList *ready_list,
                              int64_t max_ns, int64_t *timeout)
{
    bool progress;
    int64_t start_time, elapsed_time;

    assert(qemu_lockcnt_count(&ctx->list_lock) > 0);

    trace_run_poll_handlers_begin(ctx, max_ns, *timeout);

    /*
     * Optimization: ->io_poll() handlers often contain RCU read critical
     * sections and we therefore see many rcu_read_lock() -> rcu_read_unlock()
     * -> rcu_read_lock() -> ... sequences with expensive memory
     * synchronization primitives.  Make the entire polling loop an RCU
     * critical section because nested rcu_read_lock()/rcu_read_unlock() calls
     * are cheap.
     */
    RCU_READ_LOCK_GUARD();

    start_time = qemu_clock_get_ns(QEMU_CLOCK_REALTIME);
    do {
        progress = run_poll_handlers_once(ctx, ready_list,
                                          start_time, timeout);
        elapsed_time = qemu_clock_get_ns(QEMU_CLOCK_REALTIME) - start_time;
        max_ns = qemu_soonest_timeout(*timeout, max_ns);
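        /*
         * If progress was made, run_poll_handlers_once() set *timeout to 0,
         * so max_ns has just become 0 and the loop terminates below.
         */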
        assert(!(max_ns && progress));
    } while (elapsed_time < max_ns && !ctx->fdmon_ops->need_wait(ctx));

    if (remove_idle_poll_handlers(ctx, ready_list,
                                  start_time + elapsed_time)) {
        *timeout = 0;
        progress = true;
    }

    /* If time has passed with no successful polling, adjust *timeout to
     * keep the same ending time.
     */
    if (*timeout != -1) {
        *timeout -= MIN(*timeout, elapsed_time);
    }

    trace_run_poll_handlers_end(ctx, progress, *timeout);
    return progress;
}

/* try_poll_mode:
 * @ctx: the AioContext
 * @ready_list: list to add handlers that need to be run
 * @timeout: timeout for blocking wait, computed by the caller and updated if
 *           polling succeeds.
 *
 * Note that the caller must have incremented ctx->list_lock.
 *
 * Returns: true if progress was made, false otherwise
 */
static bool try_poll_mode(AioContext *ctx, AioHandlerList *ready_list,
                          int64_t *timeout)
{
    AioHandler *node;
    int64_t max_ns;

    if (QLIST_EMPTY_RCU(&ctx->poll_aio_handlers)) {
        return false;
    }

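    /* Poll for as long as the longest per-handler window, but never past the
     * caller's timeout.
     */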
    max_ns = 0;
    QLIST_FOREACH(node, &ctx->poll_aio_handlers, node_poll) {
        max_ns = MAX(max_ns, node->poll.ns);
    }
    max_ns = qemu_soonest_timeout(*timeout, max_ns);

    if (max_ns && !ctx->fdmon_ops->need_wait(ctx)) {
        /*
         * Enable poll mode.  It pairs with the poll_set_started() in
         * aio_poll() which disables poll mode.
         */
        poll_set_started(ctx, ready_list, true);

        if (run_poll_handlers(ctx, ready_list, max_ns, timeout)) {
            return true;
        }
    }
    return false;
}

static void adjust_polling_time(AioContext *ctx, AioPolledEvent *poll,
                                int64_t block_ns)
{
    if (block_ns <= poll->ns) {
        /* This is the sweet spot, no adjustment needed */
    } else if (block_ns > ctx->poll_max_ns) {
        /* We'd have to poll for too long, poll less */
        int64_t old = poll->ns;

        if (ctx->poll_shrink) {
            poll->ns /= ctx->poll_shrink;
        } else {
            poll->ns = 0;
        }

        trace_poll_shrink(ctx, old, poll->ns);
    } else if (poll->ns < ctx->poll_max_ns &&
               block_ns < ctx->poll_max_ns) {
        /* There is room to grow, poll longer */
        int64_t old = poll->ns;
        int64_t grow = ctx->poll_grow;

        if (grow == 0) {
            grow = 2;
        }

        if (poll->ns) {
            poll->ns *= grow;
        } else {
            poll->ns = 4000; /* start polling at 4 microseconds */
        }

        if (poll->ns > ctx->poll_max_ns) {
            poll->ns = ctx->poll_max_ns;
        }

        trace_poll_grow(ctx, old, poll->ns);
    }
}

bool aio_poll(AioContext *ctx, bool blocking)
{
    AioHandlerList ready_list = QLIST_HEAD_INITIALIZER(ready_list);
    bool progress;
    bool use_notify_me;
    int64_t timeout;
    int64_t start = 0;
    int64_t block_ns = 0;

    /*
     * There cannot be two concurrent aio_poll calls for the same AioContext (or
     * an aio_poll concurrent with a GSource prepare/check/dispatch callback).
     * We rely on this below to avoid slow locked accesses to ctx->notify_me.
     *
     * aio_poll() may only be called in the AioContext's thread. iohandler_ctx
     * is special in that it runs in the main thread, but that thread's context
     * is qemu_aio_context.
     */
    assert(in_aio_context_home_thread(ctx == iohandler_get_aio_context() ?
                                      qemu_get_aio_context() : ctx));

    qemu_lockcnt_inc(&ctx->list_lock);

    if (ctx->poll_max_ns) {
        start = qemu_clock_get_ns(QEMU_CLOCK_REALTIME);
    }

    timeout = blocking ? aio_compute_timeout(ctx) : 0;
    progress = try_poll_mode(ctx, &ready_list, &timeout);
    assert(!(timeout && progress));

    /*
     * aio_notify can avoid the expensive event_notifier_set if
     * everything (file descriptors, bottom halves, timers) will
     * be re-evaluated before the next blocking poll().  This is
     * already true when aio_poll is called with blocking == false;
     * if blocking == true, it is only true after poll() returns,
     * so disable the optimization now.
     */
    use_notify_me = timeout != 0;
    if (use_notify_me) {
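        /*
         * notify_me changes in steps of 2 here; bit 0 is assumed to be
         * reserved for the GSource prepare/check callbacks (see
         * aio_ctx_prepare() in util/async.c).
         */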
        qatomic_set(&ctx->notify_me, qatomic_read(&ctx->notify_me) + 2);
        /*
         * Write ctx->notify_me before reading ctx->notified.  Pairs with
         * smp_mb in aio_notify().
         */
        smp_mb();

        /* Don't block if aio_notify() was called */
        if (qatomic_read(&ctx->notified)) {
            timeout = 0;
        }
    }

    /* If polling is allowed, non-blocking aio_poll does not need the
     * system call---a single round of run_poll_handlers_once suffices.
     */
    if (timeout || ctx->fdmon_ops->need_wait(ctx)) {
        /*
         * Disable poll mode.  Poll mode should be disabled before the call
         * to ctx->fdmon_ops->wait() so that the guest's notification can wake
         * up IO threads when some work becomes pending.  It is essential to
         * avoid hangs or unnecessary latency.
         */
        if (poll_set_started(ctx, &ready_list, false)) {
            timeout = 0;
            progress = true;
        }

        ctx->fdmon_ops->wait(ctx, &ready_list, timeout);
    }

    if (use_notify_me) {
        /* Finish the poll before clearing the flag.  */
        qatomic_store_release(&ctx->notify_me,
                              qatomic_read(&ctx->notify_me) - 2);
    }

    aio_notify_accept(ctx);

    /* Calculate blocked time for adaptive polling */
    if (ctx->poll_max_ns) {
        block_ns = qemu_clock_get_ns(QEMU_CLOCK_REALTIME) - start;
    }

    progress |= aio_bh_poll(ctx);
    progress |= aio_dispatch_ready_handlers(ctx, &ready_list, block_ns);

    aio_free_deleted_handlers(ctx);

    qemu_lockcnt_dec(&ctx->list_lock);

    progress |= timerlistgroup_run_timers(&ctx->tlg);

    return progress;
}

void aio_context_setup(AioContext *ctx)
{
    ctx->fdmon_ops = &fdmon_poll_ops;
    ctx->epollfd = -1;

    /* Use the fastest fd monitoring implementation if available */
    if (fdmon_io_uring_setup(ctx)) {
        return;
    }

    fdmon_epoll_setup(ctx);
}

void aio_context_destroy(AioContext *ctx)
{
    fdmon_io_uring_destroy(ctx);
    fdmon_epoll_disable(ctx);
    aio_free_deleted_handlers(ctx);
}

void aio_context_use_g_source(AioContext *ctx)
{
    /*
     * Disable io_uring when the glib main loop is used because it doesn't
     * support mixed glib/aio_poll() usage.  It relies on aio_poll() being
     * called regularly so that changes to the monitored file descriptors are
     * submitted, otherwise a list of pending fd handlers builds up.
     */
    fdmon_io_uring_destroy(ctx);
    aio_free_deleted_handlers(ctx);
}

void aio_context_set_poll_params(AioContext *ctx, int64_t max_ns,
                                 int64_t grow, int64_t shrink, Error **errp)
{
    AioHandler *node;

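    /* Reset each handler's adaptive polling window so the new parameters take
     * effect from scratch.
     */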
    qemu_lockcnt_inc(&ctx->list_lock);
    QLIST_FOREACH(node, &ctx->aio_handlers, node) {
        node->poll.ns = 0;
    }
    qemu_lockcnt_dec(&ctx->list_lock);

    /* No thread synchronization here, it doesn't matter if an incorrect value
     * is used once.
     */
    ctx->poll_max_ns = max_ns;
    ctx->poll_grow = grow;
    ctx->poll_shrink = shrink;

    aio_notify(ctx);
}

void aio_context_set_aio_params(AioContext *ctx, int64_t max_batch)
{
    /*
     * No thread synchronization here, it doesn't matter if an incorrect value
     * is used once.
     */
    ctx->aio_max_batch = max_batch;

    aio_notify(ctx);
}