xref: /qemu/util/aio-posix.c (revision 0462a32b4f63b2448b4a196381138afd50719dc4)
1 /*
2  * QEMU aio implementation
3  *
4  * Copyright IBM, Corp. 2008
5  *
6  * Authors:
7  *  Anthony Liguori   <aliguori@us.ibm.com>
8  *
9  * This work is licensed under the terms of the GNU GPL, version 2.  See
10  * the COPYING file in the top-level directory.
11  *
12  * Contributions after 2012-01-13 are licensed under the terms of the
13  * GNU GPL, version 2 or (at your option) any later version.
14  */
15 
16 #include "qemu/osdep.h"
17 #include "block/block.h"
18 #include "block/thread-pool.h"
19 #include "qemu/main-loop.h"
20 #include "qemu/lockcnt.h"
21 #include "qemu/rcu.h"
22 #include "qemu/rcu_queue.h"
23 #include "qemu/sockets.h"
24 #include "qemu/cutils.h"
25 #include "trace.h"
26 #include "aio-posix.h"
27 
28 /* Stop userspace polling on a handler if it isn't active for some time */
29 #define POLL_IDLE_INTERVAL_NS (7 * NANOSECONDS_PER_SECOND)
30 
31 static void adjust_polling_time(AioContext *ctx, AioPolledEvent *poll,
32                                 int64_t block_ns);
33 
34 bool aio_poll_disabled(AioContext *ctx)
35 {
36     return qatomic_read(&ctx->poll_disable_cnt);
37 }
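
/*
 * Note: aio_poll_disabled() is also used as the ->need_wait() callback by
 * fd monitoring backends that support userspace polling, which is why
 * fdmon_supports_polling() below simply compares ctx->fdmon_ops->need_wait
 * against it.  poll_disable_cnt counts the handlers registered without an
 * io_poll() callback; while it is non-zero, try_poll_mode() skips polling
 * and aio_poll() falls back to the blocking wait, presumably because
 * busy-polling cannot observe readiness of fds whose handlers lack
 * io_poll().
 */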
38 
39 void aio_add_ready_handler(AioHandlerList *ready_list,
40                            AioHandler *node,
41                            int revents)
42 {
43     QLIST_SAFE_REMOVE(node, node_ready); /* remove from nested parent's list */
44     node->pfd.revents = revents;
45     QLIST_INSERT_HEAD(ready_list, node, node_ready);
46 }
47 
48 static void aio_add_poll_ready_handler(AioHandlerList *ready_list,
49                                        AioHandler *node)
50 {
51     QLIST_SAFE_REMOVE(node, node_ready); /* remove from nested parent's list */
52     node->poll_ready = true;
53     QLIST_INSERT_HEAD(ready_list, node, node_ready);
54 }
55 
56 static AioHandler *find_aio_handler(AioContext *ctx, int fd)
57 {
58     AioHandler *node;
59 
60     QLIST_FOREACH(node, &ctx->aio_handlers, node) {
61         if (node->pfd.fd == fd) {
62             if (!QLIST_IS_INSERTED(node, node_deleted)) {
63                 return node;
64             }
65         }
66     }
67 
68     return NULL;
69 }
70 
71 static bool aio_remove_fd_handler(AioContext *ctx, AioHandler *node)
72 {
73     /* If the GSource is in the process of being destroyed then
74      * g_source_remove_poll() causes an assertion failure.  Skip
75      * removal in that case, because glib cleans up its state during
76      * destruction anyway.
77      */
78     if (!g_source_is_destroyed(&ctx->source)) {
79         g_source_remove_poll(&ctx->source, &node->pfd);
80     }
81 
82     node->pfd.revents = 0;
83     node->poll_ready = false;
84 
85     /* If the fd monitor has already marked it deleted, leave it alone */
86     if (QLIST_IS_INSERTED(node, node_deleted)) {
87         return false;
88     }
89 
90     /* If a read is in progress, just mark the node as deleted */
91     if (qemu_lockcnt_count(&ctx->list_lock)) {
92         QLIST_INSERT_HEAD_RCU(&ctx->deleted_aio_handlers, node, node_deleted);
93         return false;
94     }
95     /* Otherwise, delete it for real.  We can't just mark it as
96      * deleted because deleted nodes are only cleaned up while
97      * no one is walking the handlers list.
98      */
99     QLIST_SAFE_REMOVE(node, node_poll);
100     QLIST_REMOVE(node, node);
101     return true;
102 }
103 
104 void aio_set_fd_handler(AioContext *ctx,
105                         int fd,
106                         IOHandler *io_read,
107                         IOHandler *io_write,
108                         AioPollFn *io_poll,
109                         IOHandler *io_poll_ready,
110                         void *opaque)
111 {
112     AioHandler *node;
113     AioHandler *new_node = NULL;
114     bool is_new = false;
115     bool deleted = false;
116     int poll_disable_change;
117 
118     if (io_poll && !io_poll_ready) {
119         io_poll = NULL; /* polling only makes sense if there is a handler */
120     }
121 
122     qemu_lockcnt_lock(&ctx->list_lock);
123 
124     node = find_aio_handler(ctx, fd);
125 
126     /* Are we deleting the fd handler? */
127     if (!io_read && !io_write && !io_poll) {
128         if (node == NULL) {
129             qemu_lockcnt_unlock(&ctx->list_lock);
130             return;
131         }
132         /* Clean events in order to unregister fd from the ctx epoll. */
133         node->pfd.events = 0;
134 
135         poll_disable_change = -!node->io_poll;
136     } else {
137         poll_disable_change = !io_poll - (node && !node->io_poll);
138         if (node == NULL) {
139             is_new = true;
140         }
141         /* Alloc and insert if it's not already there */
142         new_node = g_new0(AioHandler, 1);
143 
144         /* Update handler with latest information */
145         new_node->io_read = io_read;
146         new_node->io_write = io_write;
147         new_node->io_poll = io_poll;
148         new_node->io_poll_ready = io_poll_ready;
149         new_node->opaque = opaque;
150 
151         if (is_new) {
152             new_node->pfd.fd = fd;
153         } else {
154             new_node->pfd = node->pfd;
155         }
156         g_source_add_poll(&ctx->source, &new_node->pfd);
157 
158         new_node->pfd.events = (io_read ? G_IO_IN | G_IO_HUP | G_IO_ERR : 0);
159         new_node->pfd.events |= (io_write ? G_IO_OUT | G_IO_ERR : 0);
160 
161         QLIST_INSERT_HEAD_RCU(&ctx->aio_handlers, new_node, node);
162     }
163 
164     /* No need to order poll_disable_cnt writes against other updates;
165      * the counter is only used to avoid wasting time and latency on
166      * iterated polling when the system call will be ultimately necessary.
167      * Changing handlers is a rare event, and a little wasted polling before
168      * the aio_notify() below takes effect is not an issue.
169      */
170     qatomic_set(&ctx->poll_disable_cnt,
171                qatomic_read(&ctx->poll_disable_cnt) + poll_disable_change);
172 
173     ctx->fdmon_ops->update(ctx, node, new_node);
174     if (node) {
175         deleted = aio_remove_fd_handler(ctx, node);
176     }
177     qemu_lockcnt_unlock(&ctx->list_lock);
178     aio_notify(ctx);
179 
180     if (deleted) {
181         g_free(node);
182     }
183 }
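
/*
 * Usage sketch (hypothetical names, illustrative only): a typical caller
 * registers a read handler and later removes it by passing NULL for all
 * callbacks, which is the deletion case handled above:
 *
 *     aio_set_fd_handler(ctx, fd, my_read_cb, NULL, NULL, NULL, my_state);
 *     ...
 *     aio_set_fd_handler(ctx, fd, NULL, NULL, NULL, NULL, NULL);
 *
 * Note that an io_poll callback is silently dropped unless an
 * io_poll_ready callback is supplied with it (see the check at the top of
 * this function).
 */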
184 
185 static void aio_set_fd_poll(AioContext *ctx, int fd,
186                             IOHandler *io_poll_begin,
187                             IOHandler *io_poll_end)
188 {
189     AioHandler *node = find_aio_handler(ctx, fd);
190 
191     if (!node) {
192         return;
193     }
194 
195     node->io_poll_begin = io_poll_begin;
196     node->io_poll_end = io_poll_end;
197 }
198 
199 void aio_set_event_notifier(AioContext *ctx,
200                             EventNotifier *notifier,
201                             EventNotifierHandler *io_read,
202                             AioPollFn *io_poll,
203                             EventNotifierHandler *io_poll_ready)
204 {
205     aio_set_fd_handler(ctx, event_notifier_get_fd(notifier),
206                        (IOHandler *)io_read, NULL, io_poll,
207                        (IOHandler *)io_poll_ready, notifier);
208 }
209 
210 void aio_set_event_notifier_poll(AioContext *ctx,
211                                  EventNotifier *notifier,
212                                  EventNotifierHandler *io_poll_begin,
213                                  EventNotifierHandler *io_poll_end)
214 {
215     aio_set_fd_poll(ctx, event_notifier_get_fd(notifier),
216                     (IOHandler *)io_poll_begin,
217                     (IOHandler *)io_poll_end);
218 }
219 
220 static bool poll_set_started(AioContext *ctx, AioHandlerList *ready_list,
221                              bool started)
222 {
223     AioHandler *node;
224     bool progress = false;
225 
226     if (started == ctx->poll_started) {
227         return false;
228     }
229 
230     ctx->poll_started = started;
231 
232     qemu_lockcnt_inc(&ctx->list_lock);
233     QLIST_FOREACH(node, &ctx->poll_aio_handlers, node_poll) {
234         IOHandler *fn;
235 
236         if (QLIST_IS_INSERTED(node, node_deleted)) {
237             continue;
238         }
239 
240         if (started) {
241             fn = node->io_poll_begin;
242         } else {
243             fn = node->io_poll_end;
244         }
245 
246         if (fn) {
247             fn(node->opaque);
248         }
249 
250         /* Poll one last time in case ->io_poll_end() raced with the event */
251         if (!started && node->io_poll(node->opaque)) {
252             aio_add_poll_ready_handler(ready_list, node);
253             progress = true;
254         }
255     }
256     qemu_lockcnt_dec(&ctx->list_lock);
257 
258     return progress;
259 }
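
/*
 * poll_set_started() drives the io_poll_begin()/io_poll_end() protocol:
 * when the event loop enters poll mode, a handler may use io_poll_begin()
 * to suppress its comparatively expensive fd notifications (for example, a
 * virtqueue host notifier disabling guest kicks), and io_poll_end()
 * re-enables them when poll mode stops.  The final io_poll() call above
 * exists because an event may slip in between io_poll_end() re-enabling
 * notifications and the next blocking wait.
 */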
260 
261 
262 bool aio_prepare(AioContext *ctx)
263 {
264     AioHandlerList ready_list = QLIST_HEAD_INITIALIZER(ready_list);
265 
266     /* Poll mode cannot be used with glib's event loop, disable it. */
267     poll_set_started(ctx, &ready_list, false);
268     /* TODO what to do with this list? */
269 
270     return false;
271 }
272 
273 bool aio_pending(AioContext *ctx)
274 {
275     AioHandler *node;
276     bool result = false;
277 
278     /*
279      * We have to walk very carefully in case aio_set_fd_handler is
280      * called while we're walking.
281      */
282     qemu_lockcnt_inc(&ctx->list_lock);
283 
284     QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) {
285         int revents;
286 
287         /* TODO should this check poll ready? */
288         revents = node->pfd.revents & node->pfd.events;
289         if (revents & (G_IO_IN | G_IO_HUP | G_IO_ERR) && node->io_read) {
290             result = true;
291             break;
292         }
293         if (revents & (G_IO_OUT | G_IO_ERR) && node->io_write) {
294             result = true;
295             break;
296         }
297     }
298     qemu_lockcnt_dec(&ctx->list_lock);
299 
300     return result;
301 }
302 
303 static void aio_free_deleted_handlers(AioContext *ctx)
304 {
305     AioHandler *node;
306 
307     if (QLIST_EMPTY_RCU(&ctx->deleted_aio_handlers)) {
308         return;
309     }
310     if (!qemu_lockcnt_dec_if_lock(&ctx->list_lock)) {
311         return; /* we are nested, let the parent do the freeing */
312     }
313 
314     while ((node = QLIST_FIRST_RCU(&ctx->deleted_aio_handlers))) {
315         QLIST_REMOVE(node, node);
316         QLIST_REMOVE(node, node_deleted);
317         QLIST_SAFE_REMOVE(node, node_poll);
318         g_free(node);
319     }
320 
321     qemu_lockcnt_inc_and_unlock(&ctx->list_lock);
322 }
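
/*
 * Deleted handlers are freed here rather than in aio_remove_fd_handler()
 * because readers may be walking ctx->aio_handlers with only the lockcnt
 * held in shared mode.  A minimal sketch of the reader pattern used
 * throughout this file:
 *
 *     qemu_lockcnt_inc(&ctx->list_lock);
 *     QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) {
 *         ... never free nodes here ...
 *     }
 *     qemu_lockcnt_dec(&ctx->list_lock);
 *
 * qemu_lockcnt_dec_if_lock() above only succeeds (and takes the lock) for
 * the last reader, so nested invocations leave the freeing to their parent.
 */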
323 
324 static bool aio_dispatch_handler(AioContext *ctx, AioHandler *node)
325 {
326     bool progress = false;
327     bool poll_ready;
328     int revents;
329 
330     revents = node->pfd.revents & node->pfd.events;
331     node->pfd.revents = 0;
332 
333     poll_ready = node->poll_ready;
334     node->poll_ready = false;
335 
336     /*
337      * Start polling AioHandlers when they become ready because activity is
338      * likely to continue.  Note that starvation is theoretically possible when
339      * fdmon_supports_polling(), but only until the fd fires for the first
340      * time.
341      */
342     if (!QLIST_IS_INSERTED(node, node_deleted) &&
343         !QLIST_IS_INSERTED(node, node_poll) &&
344         node->io_poll) {
345         trace_poll_add(ctx, node, node->pfd.fd, revents);
346         if (ctx->poll_started && node->io_poll_begin) {
347             node->io_poll_begin(node->opaque);
348         }
349         QLIST_INSERT_HEAD(&ctx->poll_aio_handlers, node, node_poll);
350     }
351     if (!QLIST_IS_INSERTED(node, node_deleted) &&
352         poll_ready && revents == 0 && node->io_poll_ready) {
353         /*
354          * Remove temporarily to avoid infinite loops when ->io_poll_ready()
355          * calls aio_poll() before clearing the condition that made the poll
356          * handler become ready.
357          */
358         QLIST_SAFE_REMOVE(node, node_poll);
359 
360         node->io_poll_ready(node->opaque);
361 
362         if (!QLIST_IS_INSERTED(node, node_poll)) {
363             QLIST_INSERT_HEAD(&ctx->poll_aio_handlers, node, node_poll);
364         }
365 
366         /*
367          * Return early since revents was zero. aio_notify() does not count as
368          * progress.
369          */
370         return node->opaque != &ctx->notifier;
371     }
372 
373     if (!QLIST_IS_INSERTED(node, node_deleted) &&
374         (revents & (G_IO_IN | G_IO_HUP | G_IO_ERR)) &&
375         node->io_read) {
376         node->io_read(node->opaque);
377 
378         /* aio_notify() does not count as progress */
379         if (node->opaque != &ctx->notifier) {
380             progress = true;
381         }
382     }
383     if (!QLIST_IS_INSERTED(node, node_deleted) &&
384         (revents & (G_IO_OUT | G_IO_ERR)) &&
385         node->io_write) {
386         node->io_write(node->opaque);
387         progress = true;
388     }
389 
390     return progress;
391 }
392 
393 /*
394  * If we have a list of ready handlers then this is more efficient than
395  * scanning all handlers with aio_dispatch_handlers().
396  */
397 static bool aio_dispatch_ready_handlers(AioContext *ctx,
398                                         AioHandlerList *ready_list,
399                                         int64_t block_ns)
400 {
401     bool progress = false;
402     AioHandler *node;
403 
404     while ((node = QLIST_FIRST(ready_list))) {
405         QLIST_REMOVE(node, node_ready);
406         progress = aio_dispatch_handler(ctx, node) || progress;
407 
408         /*
409          * Adjust polling time only after aio_dispatch_handler(), which can
410          * add the handler to ctx->poll_aio_handlers.
411          */
412         if (ctx->poll_max_ns && QLIST_IS_INSERTED(node, node_poll)) {
413             adjust_polling_time(ctx, &node->poll, block_ns);
414         }
415     }
416 
417     return progress;
418 }
419 
420 /* Slower than aio_dispatch_ready_handlers() but only used via glib */
421 static bool aio_dispatch_handlers(AioContext *ctx)
422 {
423     AioHandler *node, *tmp;
424     bool progress = false;
425 
426     QLIST_FOREACH_SAFE_RCU(node, &ctx->aio_handlers, node, tmp) {
427         progress = aio_dispatch_handler(ctx, node) || progress;
428     }
429 
430     return progress;
431 }
432 
433 void aio_dispatch(AioContext *ctx)
434 {
435     qemu_lockcnt_inc(&ctx->list_lock);
436     aio_bh_poll(ctx);
437     aio_dispatch_handlers(ctx);
438     aio_free_deleted_handlers(ctx);
439     qemu_lockcnt_dec(&ctx->list_lock);
440 
441     timerlistgroup_run_timers(&ctx->tlg);
442 }
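
/*
 * aio_dispatch() is the entry point used by the glib integration (the
 * AioContext GSource dispatch callback), which is why it walks every
 * handler with aio_dispatch_handlers() instead of consuming a ready list:
 * glib has already polled the fds, so no fdmon-provided ready_list is
 * available here.
 */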
443 
444 static bool run_poll_handlers_once(AioContext *ctx,
445                                    AioHandlerList *ready_list,
446                                    int64_t now,
447                                    int64_t *timeout)
448 {
449     bool progress = false;
450     AioHandler *node;
451     AioHandler *tmp;
452 
453     QLIST_FOREACH_SAFE(node, &ctx->poll_aio_handlers, node_poll, tmp) {
454         if (node->io_poll(node->opaque)) {
455             aio_add_poll_ready_handler(ready_list, node);
456 
457             node->poll_idle_timeout = now + POLL_IDLE_INTERVAL_NS;
458 
459             /*
460              * Polling was successful, exit try_poll_mode immediately
461              * to adjust the next polling time.
462              */
463             *timeout = 0;
464             if (node->opaque != &ctx->notifier) {
465                 progress = true;
466             }
467         }
468 
469         /* Caller handles freeing deleted nodes.  Don't do it here. */
470     }
471 
472     return progress;
473 }
474 
475 static bool fdmon_supports_polling(AioContext *ctx)
476 {
477     return ctx->fdmon_ops->need_wait != aio_poll_disabled;
478 }
479 
480 static bool remove_idle_poll_handlers(AioContext *ctx,
481                                       AioHandlerList *ready_list,
482                                       int64_t now)
483 {
484     AioHandler *node;
485     AioHandler *tmp;
486     bool progress = false;
487 
488     /*
489      * File descriptor monitoring implementations without userspace polling
490      * support suffer from starvation when a subset of handlers is polled
491      * because fds will not be processed in a timely fashion.  Don't remove
492      * idle poll handlers.
493      */
494     if (!fdmon_supports_polling(ctx)) {
495         return false;
496     }
497 
498     QLIST_FOREACH_SAFE(node, &ctx->poll_aio_handlers, node_poll, tmp) {
499         if (node->poll_idle_timeout == 0LL) {
500             node->poll_idle_timeout = now + POLL_IDLE_INTERVAL_NS;
501         } else if (now >= node->poll_idle_timeout) {
502             trace_poll_remove(ctx, node, node->pfd.fd);
503             node->poll_idle_timeout = 0LL;
504             QLIST_SAFE_REMOVE(node, node_poll);
505             if (ctx->poll_started && node->io_poll_end) {
506                 node->io_poll_end(node->opaque);
507 
508                 /*
509                  * Final poll in case ->io_poll_end() races with an event.
510                  * Nevermind about re-adding the handler in the rare case where
511                  * this causes progress.
512                  */
513                 if (node->io_poll(node->opaque)) {
514                     aio_add_poll_ready_handler(ready_list, node);
515                     progress = true;
516                 }
517             }
518         }
519     }
520 
521     return progress;
522 }
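
/*
 * In effect a handler is dropped from ctx->poll_aio_handlers once its
 * io_poll() callback has gone POLL_IDLE_INTERVAL_NS (7 seconds) without
 * reporting success; it starts being polled again the next time its fd
 * actually fires and aio_dispatch_handler() re-adds it to the list.
 */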
523 
524 /* run_poll_handlers:
525  * @ctx: the AioContext
526  * @ready_list: the list to place ready handlers on
527  * @max_ns: maximum time to poll for, in nanoseconds
528  *
529  * Polls for a given time.
530  *
531  * Note that the caller must have incremented ctx->list_lock.
532  *
533  * Returns: true if progress was made, false otherwise
534  */
535 static bool run_poll_handlers(AioContext *ctx, AioHandlerList *ready_list,
536                               int64_t max_ns, int64_t *timeout)
537 {
538     bool progress;
539     int64_t start_time, elapsed_time;
540 
541     assert(qemu_lockcnt_count(&ctx->list_lock) > 0);
542 
543     trace_run_poll_handlers_begin(ctx, max_ns, *timeout);
544 
545     /*
546      * Optimization: ->io_poll() handlers often contain RCU read critical
547      * sections and we therefore see many rcu_read_lock() -> rcu_read_unlock()
548      * -> rcu_read_lock() -> ... sequences with expensive memory
549      * synchronization primitives.  Make the entire polling loop an RCU
550      * critical section because nested rcu_read_lock()/rcu_read_unlock() calls
551      * are cheap.
552      */
553     RCU_READ_LOCK_GUARD();
554 
555     start_time = qemu_clock_get_ns(QEMU_CLOCK_REALTIME);
556     do {
557         progress = run_poll_handlers_once(ctx, ready_list,
558                                           start_time, timeout);
559         elapsed_time = qemu_clock_get_ns(QEMU_CLOCK_REALTIME) - start_time;
560         max_ns = qemu_soonest_timeout(*timeout, max_ns);
561         assert(!(max_ns && progress));
562     } while (elapsed_time < max_ns && !ctx->fdmon_ops->need_wait(ctx));
563 
564     if (remove_idle_poll_handlers(ctx, ready_list,
565                                   start_time + elapsed_time)) {
566         *timeout = 0;
567         progress = true;
568     }
569 
570     /* If time has passed with no successful polling, adjust *timeout to
571      * keep the same ending time.
572      */
573     if (*timeout != -1) {
574         *timeout -= MIN(*timeout, elapsed_time);
575     }
576 
577     trace_run_poll_handlers_end(ctx, progress, *timeout);
578     return progress;
579 }
580 
581 /* try_poll_mode:
582  * @ctx: the AioContext
583  * @ready_list: list to add handlers that need to be run
584  * @timeout: timeout for blocking wait, computed by the caller and updated if
585  *    polling succeeds.
586  *
587  * Note that the caller must have incremented ctx->list_lock.
588  *
589  * Returns: true if progress was made, false otherwise
590  */
591 static bool try_poll_mode(AioContext *ctx, AioHandlerList *ready_list,
592                           int64_t *timeout)
593 {
594     AioHandler *node;
595     int64_t max_ns;
596 
597     if (QLIST_EMPTY_RCU(&ctx->poll_aio_handlers)) {
598         return false;
599     }
600 
601     max_ns = 0;
602     QLIST_FOREACH(node, &ctx->poll_aio_handlers, node_poll) {
603         max_ns = MAX(max_ns, node->poll.ns);
604     }
605     max_ns = qemu_soonest_timeout(*timeout, max_ns);
606 
607     if (max_ns && !ctx->fdmon_ops->need_wait(ctx)) {
608         /*
609          * Enable poll mode. It pairs with the poll_set_started() in
610          * aio_poll() which disables poll mode.
611          */
612         poll_set_started(ctx, ready_list, true);
613 
614         if (run_poll_handlers(ctx, ready_list, max_ns, timeout)) {
615             return true;
616         }
617     }
618     return false;
619 }
620 
621 static void adjust_polling_time(AioContext *ctx, AioPolledEvent *poll,
622                                 int64_t block_ns)
623 {
624     if (block_ns <= poll->ns) {
625         /* This is the sweet spot, no adjustment needed */
626     } else if (block_ns > ctx->poll_max_ns) {
627         /* We'd have to poll for too long, poll less */
628         int64_t old = poll->ns;
629 
630         if (ctx->poll_shrink) {
631             poll->ns /= ctx->poll_shrink;
632         } else {
633             poll->ns = 0;
634         }
635 
636         trace_poll_shrink(ctx, old, poll->ns);
637     } else if (poll->ns < ctx->poll_max_ns &&
638                block_ns < ctx->poll_max_ns) {
639         /* There is room to grow, poll longer */
640         int64_t old = poll->ns;
641         int64_t grow = ctx->poll_grow;
642 
643         if (grow == 0) {
644             grow = 2;
645         }
646 
647         if (poll->ns) {
648             poll->ns *= grow;
649         } else {
650             poll->ns = 4000; /* start polling at 4 microseconds */
651         }
652 
653         if (poll->ns > ctx->poll_max_ns) {
654             poll->ns = ctx->poll_max_ns;
655         }
656 
657         trace_poll_grow(ctx, old, poll->ns);
658     }
659 }
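
/*
 * Worked example (hypothetical numbers): with poll_grow and poll_shrink
 * both left at 0 and a poll_max_ns of, say, 32768 ns, a handler that keeps
 * blocking for about 10000 ns sees its window grow 0 -> 4000 -> 8000 ->
 * 16000 ns and then stay there, because block_ns <= poll->ns is the
 * "sweet spot" above.  If block_ns ever exceeds poll_max_ns, a poll_shrink
 * of 0 resets the window straight back to 0 instead of dividing it.
 */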
660 
661 bool aio_poll(AioContext *ctx, bool blocking)
662 {
663     AioHandlerList ready_list = QLIST_HEAD_INITIALIZER(ready_list);
664     bool progress;
665     bool use_notify_me;
666     int64_t timeout;
667     int64_t start = 0;
668     int64_t block_ns = 0;
669 
670     /*
671      * There cannot be two concurrent aio_poll calls for the same AioContext (or
672      * an aio_poll concurrent with a GSource prepare/check/dispatch callback).
673      * We rely on this below to avoid slow locked accesses to ctx->notify_me.
674      *
675      * aio_poll() may only be called in the AioContext's thread. iohandler_ctx
676      * is special in that it runs in the main thread, but that thread's context
677      * is qemu_aio_context.
678      */
679     assert(in_aio_context_home_thread(ctx == iohandler_get_aio_context() ?
680                                       qemu_get_aio_context() : ctx));
681 
682     qemu_lockcnt_inc(&ctx->list_lock);
683 
684     if (ctx->poll_max_ns) {
685         start = qemu_clock_get_ns(QEMU_CLOCK_REALTIME);
686     }
687 
688     timeout = blocking ? aio_compute_timeout(ctx) : 0;
689     progress = try_poll_mode(ctx, &ready_list, &timeout);
690     assert(!(timeout && progress));
691 
692     /*
693      * aio_notify can avoid the expensive event_notifier_set if
694      * everything (file descriptors, bottom halves, timers) will
695      * be re-evaluated before the next blocking poll().  This is
696      * already true when aio_poll is called with blocking == false;
697      * if blocking == true, it is only true after poll() returns,
698      * so disable the optimization now.
699      */
700     use_notify_me = timeout != 0;
701     if (use_notify_me) {
702         qatomic_set(&ctx->notify_me, qatomic_read(&ctx->notify_me) + 2);
703         /*
704          * Write ctx->notify_me before reading ctx->notified.  Pairs with
705          * smp_mb in aio_notify().
706          */
707         smp_mb();
708 
709         /* Don't block if aio_notify() was called */
710         if (qatomic_read(&ctx->notified)) {
711             timeout = 0;
712         }
713     }
714 
715     /* If polling is allowed, non-blocking aio_poll does not need the
716      * system call---a single round of run_poll_handlers_once suffices.
717      */
718     if (timeout || ctx->fdmon_ops->need_wait(ctx)) {
719         /*
720          * Disable poll mode. poll mode should be disabled before the call
721          * of ctx->fdmon_ops->wait() so that guest's notification can wake
722          * up IO threads when some work becomes pending. It is essential to
723          * avoid hangs or unnecessary latency.
724          */
725         if (poll_set_started(ctx, &ready_list, false)) {
726             timeout = 0;
727             progress = true;
728         }
729 
730         ctx->fdmon_ops->wait(ctx, &ready_list, timeout);
731     }
732 
733     if (use_notify_me) {
734         /* Finish the poll before clearing the flag.  */
735         qatomic_store_release(&ctx->notify_me,
736                              qatomic_read(&ctx->notify_me) - 2);
737     }
738 
739     aio_notify_accept(ctx);
740 
741     /* Calculate blocked time for adaptive polling */
742     if (ctx->poll_max_ns) {
743         block_ns = qemu_clock_get_ns(QEMU_CLOCK_REALTIME) - start;
744     }
745 
746     progress |= aio_bh_poll(ctx);
747     progress |= aio_dispatch_ready_handlers(ctx, &ready_list, block_ns);
748 
749     aio_free_deleted_handlers(ctx);
750 
751     qemu_lockcnt_dec(&ctx->list_lock);
752 
753     progress |= timerlistgroup_run_timers(&ctx->tlg);
754 
755     return progress;
756 }
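
/*
 * Overall flow of aio_poll(): try_poll_mode() busy-polls the io_poll()
 * handlers first and zeroes the timeout on success; if a wait is still
 * needed, notify_me is raised so that aio_notify() knows it must kick the
 * event notifier, poll mode is switched off, and ctx->fdmon_ops->wait()
 * blocks; finally bottom halves, ready fd handlers and timers run, and
 * block_ns feeds back into adjust_polling_time() for adaptive polling.
 */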
757 
758 void aio_context_setup(AioContext *ctx)
759 {
760     ctx->fdmon_ops = &fdmon_poll_ops;
761     ctx->epollfd = -1;
762 
763     /* Use the fastest fd monitoring implementation if available */
764     if (fdmon_io_uring_setup(ctx)) {
765         return;
766     }
767 
768     fdmon_epoll_setup(ctx);
769 }
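
/*
 * Backend selection: ctx->fdmon_ops starts out as fdmon_poll_ops and is
 * upgraded to io_uring when fdmon_io_uring_setup() succeeds, otherwise
 * epoll setup is attempted; the poll backend remains in place if neither
 * faster backend can be set up.
 */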
770 
771 void aio_context_destroy(AioContext *ctx)
772 {
773     fdmon_io_uring_destroy(ctx);
774     fdmon_epoll_disable(ctx);
775     aio_free_deleted_handlers(ctx);
776 }
777 
778 void aio_context_use_g_source(AioContext *ctx)
779 {
780     /*
781      * Disable io_uring when the glib main loop is used because it doesn't
782      * support mixed glib/aio_poll() usage. It relies on aio_poll() being
783      * called regularly so that changes to the monitored file descriptors are
784      * submitted, otherwise a list of pending fd handlers builds up.
785      */
786     fdmon_io_uring_destroy(ctx);
787     aio_free_deleted_handlers(ctx);
788 }
789 
790 void aio_context_set_poll_params(AioContext *ctx, int64_t max_ns,
791                                  int64_t grow, int64_t shrink, Error **errp)
792 {
793     AioHandler *node;
794 
795     qemu_lockcnt_inc(&ctx->list_lock);
796     QLIST_FOREACH(node, &ctx->aio_handlers, node) {
797         node->poll.ns = 0;
798     }
799     qemu_lockcnt_dec(&ctx->list_lock);
800 
801     /* No thread synchronization here, it doesn't matter if an incorrect value
802      * is used once.
803      */
804     ctx->poll_max_ns = max_ns;
805     ctx->poll_grow = grow;
806     ctx->poll_shrink = shrink;
807 
808     aio_notify(ctx);
809 }
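
/*
 * These values typically come from the poll-max-ns, poll-grow and
 * poll-shrink properties of an iothread object, for example (illustrative
 * values):
 *
 *     -object iothread,id=iothread0,poll-max-ns=32768,poll-shrink=2
 *
 * Resetting every handler's poll.ns above makes the new limits take effect
 * from a clean slate.
 */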
810 
811 void aio_context_set_aio_params(AioContext *ctx, int64_t max_batch)
812 {
813     /*
814      * No thread synchronization here, it doesn't matter if an incorrect value
815      * is used once.
816      */
817     ctx->aio_max_batch = max_batch;
818 
819     aio_notify(ctx);
820 }
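
/*
 * Like the polling parameters above, this is typically driven by an
 * iothread property (aio-max-batch); the value bounds how many requests
 * the native AIO engines batch before they are forced to submit.
 */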
821