/*
 * QEMU aio implementation
 *
 * Copyright IBM, Corp. 2008
 *
 * Authors:
 *  Anthony Liguori <aliguori@us.ibm.com>
 *
 * This work is licensed under the terms of the GNU GPL, version 2. See
 * the COPYING file in the top-level directory.
 *
 * Contributions after 2012-01-13 are licensed under the terms of the
 * GNU GPL, version 2 or (at your option) any later version.
 */

#include "qemu/osdep.h"
#include "block/block.h"
#include "block/thread-pool.h"
#include "qemu/main-loop.h"
#include "qemu/lockcnt.h"
#include "qemu/rcu.h"
#include "qemu/rcu_queue.h"
#include "qemu/sockets.h"
#include "qemu/cutils.h"
#include "trace.h"
#include "aio-posix.h"

/* Stop userspace polling on a handler if it isn't active for some time */
#define POLL_IDLE_INTERVAL_NS (7 * NANOSECONDS_PER_SECOND)

static void adjust_polling_time(AioContext *ctx, AioPolledEvent *poll,
                                int64_t block_ns);

bool aio_poll_disabled(AioContext *ctx)
{
    return qatomic_read(&ctx->poll_disable_cnt);
}

void aio_add_ready_handler(AioHandlerList *ready_list,
                           AioHandler *node,
                           int revents)
{
    QLIST_SAFE_REMOVE(node, node_ready); /* remove from nested parent's list */
    node->pfd.revents = revents;
    QLIST_INSERT_HEAD(ready_list, node, node_ready);
}

static void aio_add_poll_ready_handler(AioHandlerList *ready_list,
                                       AioHandler *node)
{
    QLIST_SAFE_REMOVE(node, node_ready); /* remove from nested parent's list */
    node->poll_ready = true;
    QLIST_INSERT_HEAD(ready_list, node, node_ready);
}

static AioHandler *find_aio_handler(AioContext *ctx, int fd)
{
    AioHandler *node;

    QLIST_FOREACH(node, &ctx->aio_handlers, node) {
        if (node->pfd.fd == fd) {
            if (!QLIST_IS_INSERTED(node, node_deleted)) {
                return node;
            }
        }
    }

    return NULL;
}

static bool aio_remove_fd_handler(AioContext *ctx, AioHandler *node)
{
    /* If the GSource is in the process of being destroyed then
     * g_source_remove_poll() causes an assertion failure. Skip
     * removal in that case, because glib cleans up its state during
     * destruction anyway.
     */
    if (!g_source_is_destroyed(&ctx->source)) {
        g_source_remove_poll(&ctx->source, &node->pfd);
    }

    node->pfd.revents = 0;
    node->poll_ready = false;

    /* If the fd monitor has already marked it deleted, leave it alone */
    if (QLIST_IS_INSERTED(node, node_deleted)) {
        return false;
    }

    /* If a read is in progress, just mark the node as deleted */
    if (qemu_lockcnt_count(&ctx->list_lock)) {
        QLIST_INSERT_HEAD_RCU(&ctx->deleted_aio_handlers, node, node_deleted);
        return false;
    }
    /* Otherwise, delete it for real. We can't just mark it as
     * deleted because deleted nodes are only cleaned up while
     * no one is walking the handlers list.
     */
    QLIST_SAFE_REMOVE(node, node_poll);
    QLIST_REMOVE(node, node);
    return true;
}

void aio_set_fd_handler(AioContext *ctx,
                        int fd,
                        IOHandler *io_read,
                        IOHandler *io_write,
                        AioPollFn *io_poll,
                        IOHandler *io_poll_ready,
                        void *opaque)
{
    AioHandler *node;
    AioHandler *new_node = NULL;
    bool is_new = false;
    bool deleted = false;
    int poll_disable_change;

    if (io_poll && !io_poll_ready) {
        io_poll = NULL; /* polling only makes sense if there is a handler */
    }

    qemu_lockcnt_lock(&ctx->list_lock);

    node = find_aio_handler(ctx, fd);

    /* Are we deleting the fd handler? */
    if (!io_read && !io_write && !io_poll) {
        if (node == NULL) {
            qemu_lockcnt_unlock(&ctx->list_lock);
            return;
        }
        /* Clean events in order to unregister fd from the ctx epoll. */
        node->pfd.events = 0;

        poll_disable_change = -!node->io_poll;
    } else {
        poll_disable_change = !io_poll - (node && !node->io_poll);
        if (node == NULL) {
            is_new = true;
        }
        /* Alloc and insert if it's not already there */
        new_node = g_new0(AioHandler, 1);

        /* Update handler with latest information */
        new_node->io_read = io_read;
        new_node->io_write = io_write;
        new_node->io_poll = io_poll;
        new_node->io_poll_ready = io_poll_ready;
        new_node->opaque = opaque;

        if (is_new) {
            new_node->pfd.fd = fd;
        } else {
            new_node->pfd = node->pfd;
        }
        g_source_add_poll(&ctx->source, &new_node->pfd);

        new_node->pfd.events = (io_read ? G_IO_IN | G_IO_HUP | G_IO_ERR : 0);
        new_node->pfd.events |= (io_write ? G_IO_OUT | G_IO_ERR : 0);

        QLIST_INSERT_HEAD_RCU(&ctx->aio_handlers, new_node, node);
    }

    /* No need to order poll_disable_cnt writes against other updates;
     * the counter is only used to avoid wasting time and latency on
     * iterated polling when the system call will be ultimately necessary.
     * Changing handlers is a rare event, and a little wasted polling until
     * the aio_notify below is not an issue.
     */
    qatomic_set(&ctx->poll_disable_cnt,
                qatomic_read(&ctx->poll_disable_cnt) + poll_disable_change);

    ctx->fdmon_ops->update(ctx, node, new_node);
    if (node) {
        deleted = aio_remove_fd_handler(ctx, node);
    }
    qemu_lockcnt_unlock(&ctx->list_lock);
    aio_notify(ctx);

    if (deleted) {
        g_free(node);
    }
}

static void aio_set_fd_poll(AioContext *ctx, int fd,
                            IOHandler *io_poll_begin,
                            IOHandler *io_poll_end)
{
    AioHandler *node = find_aio_handler(ctx, fd);

    if (!node) {
        return;
    }

    node->io_poll_begin = io_poll_begin;
    node->io_poll_end = io_poll_end;
}

void aio_set_event_notifier(AioContext *ctx,
                            EventNotifier *notifier,
                            EventNotifierHandler *io_read,
                            AioPollFn *io_poll,
                            EventNotifierHandler *io_poll_ready)
{
    aio_set_fd_handler(ctx, event_notifier_get_fd(notifier),
                       (IOHandler *)io_read, NULL, io_poll,
                       (IOHandler *)io_poll_ready, notifier);
}

void aio_set_event_notifier_poll(AioContext *ctx,
                                 EventNotifier *notifier,
                                 EventNotifierHandler *io_poll_begin,
                                 EventNotifierHandler *io_poll_end)
{
    aio_set_fd_poll(ctx, event_notifier_get_fd(notifier),
                    (IOHandler *)io_poll_begin,
                    (IOHandler *)io_poll_end);
}

static bool poll_set_started(AioContext *ctx, AioHandlerList *ready_list,
                             bool started)
{
    AioHandler *node;
    bool progress = false;

    if (started == ctx->poll_started) {
        return false;
    }

    ctx->poll_started = started;

    qemu_lockcnt_inc(&ctx->list_lock);
    QLIST_FOREACH(node, &ctx->poll_aio_handlers, node_poll) {
        IOHandler *fn;

        if (QLIST_IS_INSERTED(node, node_deleted)) {
            continue;
        }

        if (started) {
            fn = node->io_poll_begin;
        } else {
            fn = node->io_poll_end;
        }

        if (fn) {
            fn(node->opaque);
        }

        /* Poll one last time in case ->io_poll_end() raced with the event */
        if (!started && node->io_poll(node->opaque)) {
            aio_add_poll_ready_handler(ready_list, node);
            progress = true;
        }
    }
    qemu_lockcnt_dec(&ctx->list_lock);

    return progress;
}


bool aio_prepare(AioContext *ctx)
{
    AioHandlerList ready_list = QLIST_HEAD_INITIALIZER(ready_list);

    /* Poll mode cannot be used with glib's event loop, disable it. */
    poll_set_started(ctx, &ready_list, false);
    /* TODO what to do with this list? */

    return false;
}

bool aio_pending(AioContext *ctx)
{
    AioHandler *node;
    bool result = false;

    /*
     * We have to walk very carefully in case aio_set_fd_handler is
     * called while we're walking.
     */
    qemu_lockcnt_inc(&ctx->list_lock);

    QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) {
        int revents;

        /* TODO should this check poll ready? */
        revents = node->pfd.revents & node->pfd.events;
        if (revents & (G_IO_IN | G_IO_HUP | G_IO_ERR) && node->io_read) {
            result = true;
            break;
        }
        if (revents & (G_IO_OUT | G_IO_ERR) && node->io_write) {
            result = true;
            break;
        }
    }
    qemu_lockcnt_dec(&ctx->list_lock);

    return result;
}

static void aio_free_deleted_handlers(AioContext *ctx)
{
    AioHandler *node;

    if (QLIST_EMPTY_RCU(&ctx->deleted_aio_handlers)) {
        return;
    }
    if (!qemu_lockcnt_dec_if_lock(&ctx->list_lock)) {
        return; /* we are nested, let the parent do the freeing */
    }

    while ((node = QLIST_FIRST_RCU(&ctx->deleted_aio_handlers))) {
        QLIST_REMOVE(node, node);
        QLIST_REMOVE(node, node_deleted);
        QLIST_SAFE_REMOVE(node, node_poll);
        g_free(node);
    }

    qemu_lockcnt_inc_and_unlock(&ctx->list_lock);
}

static bool aio_dispatch_handler(AioContext *ctx, AioHandler *node)
{
    bool progress = false;
    bool poll_ready;
    int revents;

    revents = node->pfd.revents & node->pfd.events;
    node->pfd.revents = 0;

    poll_ready = node->poll_ready;
    node->poll_ready = false;

    /*
     * Start polling AioHandlers when they become ready because activity is
     * likely to continue. Note that starvation is theoretically possible when
     * fdmon_supports_polling(), but only until the fd fires for the first
     * time.
     */
    if (!QLIST_IS_INSERTED(node, node_deleted) &&
        !QLIST_IS_INSERTED(node, node_poll) &&
        node->io_poll) {
        trace_poll_add(ctx, node, node->pfd.fd, revents);
        if (ctx->poll_started && node->io_poll_begin) {
            node->io_poll_begin(node->opaque);
        }
        QLIST_INSERT_HEAD(&ctx->poll_aio_handlers, node, node_poll);
    }
    if (!QLIST_IS_INSERTED(node, node_deleted) &&
        poll_ready && revents == 0 && node->io_poll_ready) {
        /*
         * Remove temporarily to avoid infinite loops when ->io_poll_ready()
         * calls aio_poll() before clearing the condition that made the poll
         * handler become ready.
         */
        QLIST_SAFE_REMOVE(node, node_poll);

        node->io_poll_ready(node->opaque);

        if (!QLIST_IS_INSERTED(node, node_poll)) {
            QLIST_INSERT_HEAD(&ctx->poll_aio_handlers, node, node_poll);
        }

        /*
         * Return early since revents was zero. aio_notify() does not count as
         * progress.
         */
        return node->opaque != &ctx->notifier;
    }

    if (!QLIST_IS_INSERTED(node, node_deleted) &&
        (revents & (G_IO_IN | G_IO_HUP | G_IO_ERR)) &&
        node->io_read) {
        node->io_read(node->opaque);

        /* aio_notify() does not count as progress */
        if (node->opaque != &ctx->notifier) {
            progress = true;
        }
    }
    if (!QLIST_IS_INSERTED(node, node_deleted) &&
        (revents & (G_IO_OUT | G_IO_ERR)) &&
        node->io_write) {
        node->io_write(node->opaque);
        progress = true;
    }

    return progress;
}

/*
 * If we have a list of ready handlers then this is more efficient than
 * scanning all handlers with aio_dispatch_handlers().
 */
static bool aio_dispatch_ready_handlers(AioContext *ctx,
                                        AioHandlerList *ready_list,
                                        int64_t block_ns)
{
    bool progress = false;
    AioHandler *node;

    while ((node = QLIST_FIRST(ready_list))) {
        QLIST_REMOVE(node, node_ready);
        progress = aio_dispatch_handler(ctx, node) || progress;

        /*
         * Adjust polling time only after aio_dispatch_handler(), which can
         * add the handler to ctx->poll_aio_handlers.
         */
        if (ctx->poll_max_ns && QLIST_IS_INSERTED(node, node_poll)) {
            adjust_polling_time(ctx, &node->poll, block_ns);
        }
    }

    return progress;
}

/* Slower than aio_dispatch_ready_handlers() but only used via glib */
static bool aio_dispatch_handlers(AioContext *ctx)
{
    AioHandler *node, *tmp;
    bool progress = false;

    QLIST_FOREACH_SAFE_RCU(node, &ctx->aio_handlers, node, tmp) {
        progress = aio_dispatch_handler(ctx, node) || progress;
    }

    return progress;
}

void aio_dispatch(AioContext *ctx)
{
    qemu_lockcnt_inc(&ctx->list_lock);
    aio_bh_poll(ctx);
    aio_dispatch_handlers(ctx);
    aio_free_deleted_handlers(ctx);
    qemu_lockcnt_dec(&ctx->list_lock);

    timerlistgroup_run_timers(&ctx->tlg);
}

static bool run_poll_handlers_once(AioContext *ctx,
                                   AioHandlerList *ready_list,
                                   int64_t now,
                                   int64_t *timeout)
{
    bool progress = false;
    AioHandler *node;
    AioHandler *tmp;

    QLIST_FOREACH_SAFE(node, &ctx->poll_aio_handlers, node_poll, tmp) {
        if (node->io_poll(node->opaque)) {
            aio_add_poll_ready_handler(ready_list, node);

            node->poll_idle_timeout = now + POLL_IDLE_INTERVAL_NS;

            /*
             * Polling was successful, exit try_poll_mode immediately
             * to adjust the next polling time.
             */
            *timeout = 0;
            if (node->opaque != &ctx->notifier) {
                progress = true;
            }
        }

        /* Caller handles freeing deleted nodes. Don't do it here. */
    }

    return progress;
}

static bool fdmon_supports_polling(AioContext *ctx)
{
    return ctx->fdmon_ops->need_wait != aio_poll_disabled;
}

static bool remove_idle_poll_handlers(AioContext *ctx,
                                      AioHandlerList *ready_list,
                                      int64_t now)
{
    AioHandler *node;
    AioHandler *tmp;
    bool progress = false;

    /*
     * File descriptor monitoring implementations without userspace polling
     * support suffer from starvation when a subset of handlers is polled
     * because fds will not be processed in a timely fashion. Don't remove
     * idle poll handlers.
     */
    if (!fdmon_supports_polling(ctx)) {
        return false;
    }

    QLIST_FOREACH_SAFE(node, &ctx->poll_aio_handlers, node_poll, tmp) {
        if (node->poll_idle_timeout == 0LL) {
            node->poll_idle_timeout = now + POLL_IDLE_INTERVAL_NS;
        } else if (now >= node->poll_idle_timeout) {
            trace_poll_remove(ctx, node, node->pfd.fd);
            node->poll_idle_timeout = 0LL;
            QLIST_SAFE_REMOVE(node, node_poll);
            if (ctx->poll_started && node->io_poll_end) {
                node->io_poll_end(node->opaque);

                /*
                 * Final poll in case ->io_poll_end() races with an event.
                 * Never mind about re-adding the handler in the rare case
                 * where this causes progress.
                 */
                if (node->io_poll(node->opaque)) {
                    aio_add_poll_ready_handler(ready_list, node);
                    progress = true;
                }
            }
        }
    }

    return progress;
}

/* run_poll_handlers:
 * @ctx: the AioContext
 * @ready_list: the list to place ready handlers on
 * @max_ns: maximum time to poll for, in nanoseconds
 *
 * Polls for a given time.
 *
 * Note that the caller must have incremented ctx->list_lock.
 *
 * Returns: true if progress was made, false otherwise
 */
static bool run_poll_handlers(AioContext *ctx, AioHandlerList *ready_list,
                              int64_t max_ns, int64_t *timeout)
{
    bool progress;
    int64_t start_time, elapsed_time;

    assert(qemu_lockcnt_count(&ctx->list_lock) > 0);

    trace_run_poll_handlers_begin(ctx, max_ns, *timeout);

    /*
     * Optimization: ->io_poll() handlers often contain RCU read critical
     * sections and we therefore see many rcu_read_lock() -> rcu_read_unlock()
     * -> rcu_read_lock() -> ... sequences with expensive memory
     * synchronization primitives. Make the entire polling loop an RCU
     * critical section because nested rcu_read_lock()/rcu_read_unlock() calls
     * are cheap.
     */
    RCU_READ_LOCK_GUARD();

    start_time = qemu_clock_get_ns(QEMU_CLOCK_REALTIME);
    do {
        progress = run_poll_handlers_once(ctx, ready_list,
                                          start_time, timeout);
        elapsed_time = qemu_clock_get_ns(QEMU_CLOCK_REALTIME) - start_time;
        max_ns = qemu_soonest_timeout(*timeout, max_ns);
        assert(!(max_ns && progress));
    } while (elapsed_time < max_ns && !ctx->fdmon_ops->need_wait(ctx));

    if (remove_idle_poll_handlers(ctx, ready_list,
                                  start_time + elapsed_time)) {
        *timeout = 0;
        progress = true;
    }

    /* If time has passed with no successful polling, adjust *timeout to
     * keep the same ending time.
     */
    if (*timeout != -1) {
        *timeout -= MIN(*timeout, elapsed_time);
    }

    trace_run_poll_handlers_end(ctx, progress, *timeout);
    return progress;
}

/* try_poll_mode:
 * @ctx: the AioContext
 * @ready_list: list to add handlers that need to be run
 * @timeout: timeout for blocking wait, computed by the caller and updated if
 *           polling succeeds.
 *
 * Note that the caller must have incremented ctx->list_lock.
 *
 * Returns: true if progress was made, false otherwise
 */
static bool try_poll_mode(AioContext *ctx, AioHandlerList *ready_list,
                          int64_t *timeout)
{
    AioHandler *node;
    int64_t max_ns;

    if (QLIST_EMPTY_RCU(&ctx->poll_aio_handlers)) {
        return false;
    }

    max_ns = 0;
    QLIST_FOREACH(node, &ctx->poll_aio_handlers, node_poll) {
        max_ns = MAX(max_ns, node->poll.ns);
    }
    max_ns = qemu_soonest_timeout(*timeout, max_ns);

    if (max_ns && !ctx->fdmon_ops->need_wait(ctx)) {
        /*
         * Enable poll mode. It pairs with the poll_set_started() in
         * aio_poll() which disables poll mode.
         */
        poll_set_started(ctx, ready_list, true);

        if (run_poll_handlers(ctx, ready_list, max_ns, timeout)) {
            return true;
        }
    }
    return false;
}

static void adjust_polling_time(AioContext *ctx, AioPolledEvent *poll,
                                int64_t block_ns)
{
    if (block_ns <= poll->ns) {
        /* This is the sweet spot, no adjustment needed */
    } else if (block_ns > ctx->poll_max_ns) {
        /* We'd have to poll for too long, poll less */
        int64_t old = poll->ns;

        if (ctx->poll_shrink) {
            poll->ns /= ctx->poll_shrink;
        } else {
            poll->ns = 0;
        }

        trace_poll_shrink(ctx, old, poll->ns);
    } else if (poll->ns < ctx->poll_max_ns &&
               block_ns < ctx->poll_max_ns) {
        /* There is room to grow, poll longer */
        int64_t old = poll->ns;
        int64_t grow = ctx->poll_grow;

        if (grow == 0) {
            grow = 2;
        }

        if (poll->ns) {
            poll->ns *= grow;
        } else {
            poll->ns = 4000; /* start polling at 4 microseconds */
        }

        if (poll->ns > ctx->poll_max_ns) {
            poll->ns = ctx->poll_max_ns;
        }

        trace_poll_grow(ctx, old, poll->ns);
    }
}

bool aio_poll(AioContext *ctx, bool blocking)
{
    AioHandlerList ready_list = QLIST_HEAD_INITIALIZER(ready_list);
    bool progress;
    bool use_notify_me;
    int64_t timeout;
    int64_t start = 0;
    int64_t block_ns = 0;

    /*
     * There cannot be two concurrent aio_poll calls for the same AioContext (or
     * an aio_poll concurrent with a GSource prepare/check/dispatch callback).
     * We rely on this below to avoid slow locked accesses to ctx->notify_me.
     *
     * aio_poll() may only be called in the AioContext's thread. iohandler_ctx
     * is special in that it runs in the main thread, but that thread's context
     * is qemu_aio_context.
     */
    assert(in_aio_context_home_thread(ctx == iohandler_get_aio_context() ?
                                      qemu_get_aio_context() : ctx));

    qemu_lockcnt_inc(&ctx->list_lock);

    if (ctx->poll_max_ns) {
        start = qemu_clock_get_ns(QEMU_CLOCK_REALTIME);
    }

    timeout = blocking ? aio_compute_timeout(ctx) : 0;
    progress = try_poll_mode(ctx, &ready_list, &timeout);
    assert(!(timeout && progress));

    /*
     * aio_notify can avoid the expensive event_notifier_set if
     * everything (file descriptors, bottom halves, timers) will
     * be re-evaluated before the next blocking poll(). This is
     * already true when aio_poll is called with blocking == false;
     * if blocking == true, it is only true after poll() returns,
     * so disable the optimization now.
     */
    use_notify_me = timeout != 0;
    if (use_notify_me) {
        qatomic_set(&ctx->notify_me, qatomic_read(&ctx->notify_me) + 2);
        /*
         * Write ctx->notify_me before reading ctx->notified. Pairs with
         * smp_mb in aio_notify().
         */
        smp_mb();

        /* Don't block if aio_notify() was called */
        if (qatomic_read(&ctx->notified)) {
            timeout = 0;
        }
    }

    /* If polling is allowed, non-blocking aio_poll does not need the
     * system call---a single round of run_poll_handlers_once suffices.
     */
    if (timeout || ctx->fdmon_ops->need_wait(ctx)) {
        /*
         * Disable poll mode. Poll mode must be disabled before
         * ctx->fdmon_ops->wait() is called so that the guest's notification
         * can wake up IO threads when work becomes pending. This is essential
         * to avoid hangs or unnecessary latency.
         */
        if (poll_set_started(ctx, &ready_list, false)) {
            timeout = 0;
            progress = true;
        }

        ctx->fdmon_ops->wait(ctx, &ready_list, timeout);
    }

    if (use_notify_me) {
        /* Finish the poll before clearing the flag. */
        qatomic_store_release(&ctx->notify_me,
                              qatomic_read(&ctx->notify_me) - 2);
    }

    aio_notify_accept(ctx);

    /* Calculate blocked time for adaptive polling */
    if (ctx->poll_max_ns) {
        block_ns = qemu_clock_get_ns(QEMU_CLOCK_REALTIME) - start;
    }

    progress |= aio_bh_poll(ctx);
    progress |= aio_dispatch_ready_handlers(ctx, &ready_list, block_ns);

    aio_free_deleted_handlers(ctx);

    qemu_lockcnt_dec(&ctx->list_lock);

    progress |= timerlistgroup_run_timers(&ctx->tlg);

    return progress;
}

void aio_context_setup(AioContext *ctx)
{
    ctx->fdmon_ops = &fdmon_poll_ops;
    ctx->epollfd = -1;

    /* Use the fastest fd monitoring implementation if available */
    if (fdmon_io_uring_setup(ctx)) {
        return;
    }

    fdmon_epoll_setup(ctx);
}

void aio_context_destroy(AioContext *ctx)
{
    fdmon_io_uring_destroy(ctx);
    fdmon_epoll_disable(ctx);
    aio_free_deleted_handlers(ctx);
}

void aio_context_use_g_source(AioContext *ctx)
{
    /*
     * Disable io_uring when the glib main loop is used because it doesn't
     * support mixed glib/aio_poll() usage. It relies on aio_poll() being
     * called regularly so that changes to the monitored file descriptors are
     * submitted, otherwise a list of pending fd handlers builds up.
     */
    fdmon_io_uring_destroy(ctx);
    aio_free_deleted_handlers(ctx);
}

void aio_context_set_poll_params(AioContext *ctx, int64_t max_ns,
                                 int64_t grow, int64_t shrink, Error **errp)
{
    AioHandler *node;

    qemu_lockcnt_inc(&ctx->list_lock);
    QLIST_FOREACH(node, &ctx->aio_handlers, node) {
        node->poll.ns = 0;
    }
    qemu_lockcnt_dec(&ctx->list_lock);

    /* No thread synchronization here, it doesn't matter if an incorrect value
     * is used once.
     */
    ctx->poll_max_ns = max_ns;
    ctx->poll_grow = grow;
    ctx->poll_shrink = shrink;

    aio_notify(ctx);
}

void aio_context_set_aio_params(AioContext *ctx, int64_t max_batch)
{
    /*
     * No thread synchronization here, it doesn't matter if an incorrect value
     * is used once.
     */
    ctx->aio_max_batch = max_batch;

    aio_notify(ctx);
}