diff --git a/aio-posix.c b/aio-posix.c index feab8e8a00f0d2caf95c7f82c6f2f09f7adb26fa..c6adddbd8262d00254cc88298f2fd738425b253d 100644 --- a/aio-posix.c +++ b/aio-posix.c @@ -18,6 +18,8 @@ #include "block/block.h" #include "qemu/queue.h" #include "qemu/sockets.h" +#include "qemu/cutils.h" +#include "trace.h" #ifdef CONFIG_EPOLL_CREATE1 #include #endif @@ -27,6 +29,7 @@ struct AioHandler GPollFD pfd; IOHandler *io_read; IOHandler *io_write; + AioPollFn *io_poll; int deleted; void *opaque; bool is_external; @@ -210,7 +213,7 @@ void aio_set_fd_handler(AioContext *ctx, node = find_aio_handler(ctx, fd); /* Are we deleting the fd handler? */ - if (!io_read && !io_write) { + if (!io_read && !io_write && !io_poll) { if (node == NULL) { return; } @@ -229,6 +232,10 @@ void aio_set_fd_handler(AioContext *ctx, QLIST_REMOVE(node, node); deleted = true; } + + if (!node->io_poll) { + ctx->poll_disable_cnt--; + } } else { if (node == NULL) { /* Alloc and insert if it's not already there */ @@ -238,10 +245,16 @@ void aio_set_fd_handler(AioContext *ctx, g_source_add_poll(&ctx->source, &node->pfd); is_new = true; + + ctx->poll_disable_cnt += !io_poll; + } else { + ctx->poll_disable_cnt += !io_poll - !node->io_poll; } + /* Update handler with latest information */ node->io_read = io_read; node->io_write = io_write; + node->io_poll = io_poll; node->opaque = opaque; node->is_external = is_external; @@ -251,6 +264,7 @@ void aio_set_fd_handler(AioContext *ctx, aio_epoll_update(ctx, node, is_new); aio_notify(ctx); + if (deleted) { g_free(node); } @@ -408,10 +422,83 @@ static void add_pollfd(AioHandler *node) npfd++; } +/* run_poll_handlers: + * @ctx: the AioContext + * @max_ns: maximum time to poll for, in nanoseconds + * + * Polls for a given time. + * + * Note that ctx->notify_me must be non-zero so this function can detect + * aio_notify(). + * + * Note that the caller must have incremented ctx->walking_handlers. + * + * Returns: true if progress was made, false otherwise + */ +static bool run_poll_handlers(AioContext *ctx, int64_t max_ns) +{ + bool progress = false; + int64_t end_time; + + assert(ctx->notify_me); + assert(ctx->walking_handlers > 0); + assert(ctx->poll_disable_cnt == 0); + + trace_run_poll_handlers_begin(ctx, max_ns); + + end_time = qemu_clock_get_ns(QEMU_CLOCK_REALTIME) + max_ns; + + do { + AioHandler *node; + + QLIST_FOREACH(node, &ctx->aio_handlers, node) { + if (!node->deleted && node->io_poll && + node->io_poll(node->opaque)) { + progress = true; + } + + /* Caller handles freeing deleted nodes. Don't do it here. */ + } + } while (!progress && qemu_clock_get_ns(QEMU_CLOCK_REALTIME) < end_time); + + trace_run_poll_handlers_end(ctx, progress); + + return progress; +} + +/* try_poll_mode: + * @ctx: the AioContext + * @blocking: polling is only attempted when blocking is true + * + * If blocking is true then ctx->notify_me must be non-zero so this function + * can detect aio_notify(). + * + * Note that the caller must have incremented ctx->walking_handlers. + * + * Returns: true if progress was made, false otherwise + */ +static bool try_poll_mode(AioContext *ctx, bool blocking) +{ + if (blocking && ctx->poll_max_ns && ctx->poll_disable_cnt == 0) { + /* See qemu_soonest_timeout() uint64_t hack */ + int64_t max_ns = MIN((uint64_t)aio_compute_timeout(ctx), + (uint64_t)ctx->poll_max_ns); + + if (max_ns) { + if (run_poll_handlers(ctx, max_ns)) { + return true; + } + } + } + + return false; +} + bool aio_poll(AioContext *ctx, bool blocking) { AioHandler *node; - int i, ret; + int i; + int ret = 0; bool progress; int64_t timeout; @@ -431,42 +518,47 @@ bool aio_poll(AioContext *ctx, bool blocking) ctx->walking_handlers++; - assert(npfd == 0); + if (try_poll_mode(ctx, blocking)) { + progress = true; + } else { + assert(npfd == 0); - /* fill pollfds */ + /* fill pollfds */ - if (!aio_epoll_enabled(ctx)) { - QLIST_FOREACH(node, &ctx->aio_handlers, node) { - if (!node->deleted && node->pfd.events - && aio_node_check(ctx, node->is_external)) { - add_pollfd(node); + if (!aio_epoll_enabled(ctx)) { + QLIST_FOREACH(node, &ctx->aio_handlers, node) { + if (!node->deleted && node->pfd.events + && aio_node_check(ctx, node->is_external)) { + add_pollfd(node); + } } } - } - timeout = blocking ? aio_compute_timeout(ctx) : 0; + timeout = blocking ? aio_compute_timeout(ctx) : 0; - /* wait until next event */ - if (timeout) { - aio_context_release(ctx); - } - if (aio_epoll_check_poll(ctx, pollfds, npfd, timeout)) { - AioHandler epoll_handler; - - epoll_handler.pfd.fd = ctx->epollfd; - epoll_handler.pfd.events = G_IO_IN | G_IO_OUT | G_IO_HUP | G_IO_ERR; - npfd = 0; - add_pollfd(&epoll_handler); - ret = aio_epoll(ctx, pollfds, npfd, timeout); - } else { - ret = qemu_poll_ns(pollfds, npfd, timeout); + /* wait until next event */ + if (timeout) { + aio_context_release(ctx); + } + if (aio_epoll_check_poll(ctx, pollfds, npfd, timeout)) { + AioHandler epoll_handler; + + epoll_handler.pfd.fd = ctx->epollfd; + epoll_handler.pfd.events = G_IO_IN | G_IO_OUT | G_IO_HUP | G_IO_ERR; + npfd = 0; + add_pollfd(&epoll_handler); + ret = aio_epoll(ctx, pollfds, npfd, timeout); + } else { + ret = qemu_poll_ns(pollfds, npfd, timeout); + } + if (timeout) { + aio_context_acquire(ctx); + } } + if (blocking) { atomic_sub(&ctx->notify_me, 2); } - if (timeout) { - aio_context_acquire(ctx); - } aio_notify_accept(ctx); @@ -492,6 +584,13 @@ bool aio_poll(AioContext *ctx, bool blocking) void aio_context_setup(AioContext *ctx) { + /* TODO remove this in final patch submission */ + if (getenv("QEMU_AIO_POLL_MAX_NS")) { + fprintf(stderr, "The QEMU_AIO_POLL_MAX_NS environment variable has " + "been replaced with -object iothread,poll-max-ns=NUM\n"); + exit(1); + } + #ifdef CONFIG_EPOLL_CREATE1 assert(!ctx->epollfd); ctx->epollfd = epoll_create1(EPOLL_CLOEXEC); @@ -503,3 +602,13 @@ void aio_context_setup(AioContext *ctx) } #endif } + +void aio_context_set_poll_params(AioContext *ctx, int64_t max_ns, Error **errp) +{ + /* No thread synchronization here, it doesn't matter if an incorrect poll + * timeout is used once. + */ + ctx->poll_max_ns = max_ns; + + aio_notify(ctx); +} diff --git a/aio-win32.c b/aio-win32.c index 3ef8ea4caa0b7a7b8e2e8e6c097f55a0fe98e31c..0a6e91b0c37799d4b0f272aad8f59d31cfdd24e4 100644 --- a/aio-win32.c +++ b/aio-win32.c @@ -20,6 +20,7 @@ #include "block/block.h" #include "qemu/queue.h" #include "qemu/sockets.h" +#include "qapi/error.h" struct AioHandler { EventNotifier *e; @@ -38,6 +39,7 @@ void aio_set_fd_handler(AioContext *ctx, bool is_external, IOHandler *io_read, IOHandler *io_write, + AioPollFn *io_poll, void *opaque) { /* fd is a SOCKET in our case */ @@ -103,7 +105,8 @@ void aio_set_fd_handler(AioContext *ctx, void aio_set_event_notifier(AioContext *ctx, EventNotifier *e, bool is_external, - EventNotifierHandler *io_notify) + EventNotifierHandler *io_notify, + AioPollFn *io_poll) { AioHandler *node; @@ -376,3 +379,8 @@ bool aio_poll(AioContext *ctx, bool blocking) void aio_context_setup(AioContext *ctx) { } + +void aio_context_set_poll_params(AioContext *ctx, int64_t max_ns, Error **errp) +{ + error_setg(errp, "AioContext polling is not implemented on Windows"); +} diff --git a/async.c b/async.c index 678845fb2cebeb404a15c65b442eaed3ace5fa49..29abf40ca737652ee89077f323f8044ac3b59245 100644 --- a/async.c +++ b/async.c @@ -349,6 +349,15 @@ static void event_notifier_dummy_cb(EventNotifier *e) { } +/* Returns true if aio_notify() was called (e.g. a BH was scheduled) */ +static bool event_notifier_poll(void *opaque) +{ + EventNotifier *e = opaque; + AioContext *ctx = container_of(e, AioContext, notifier); + + return atomic_read(&ctx->notified); +} + AioContext *aio_context_new(Error **errp) { int ret; @@ -367,7 +376,7 @@ AioContext *aio_context_new(Error **errp) false, (EventNotifierHandler *) event_notifier_dummy_cb, - NULL); + event_notifier_poll); #ifdef CONFIG_LINUX_AIO ctx->linux_aio = NULL; #endif @@ -376,6 +385,8 @@ AioContext *aio_context_new(Error **errp) qemu_rec_mutex_init(&ctx->lock); timerlistgroup_init(&ctx->tlg, aio_timerlist_notify, ctx); + ctx->poll_max_ns = 0; + return ctx; fail: g_source_destroy(&ctx->source); diff --git a/include/block/aio.h b/include/block/aio.h index 7d9cd0d03f0e64a9e5330510400b02411f58d3d7..349143f6d9a783a7fc95e590e005557955eaeced 100644 --- a/include/block/aio.h +++ b/include/block/aio.h @@ -131,6 +131,12 @@ struct AioContext { int external_disable_cnt; + /* Number of AioHandlers without .io_poll() */ + int poll_disable_cnt; + + /* Maximum polling time in nanoseconds */ + int64_t poll_max_ns; + /* epoll(7) state used when built with CONFIG_EPOLL */ int epollfd; bool epoll_enabled; @@ -481,4 +487,14 @@ static inline bool aio_context_in_iothread(AioContext *ctx) */ void aio_context_setup(AioContext *ctx); +/** + * aio_context_set_poll_params: + * @ctx: the aio context + * @max_ns: how long to busy poll for, in nanoseconds + * + * Poll mode can be disabled by setting poll_max_ns to 0. + */ +void aio_context_set_poll_params(AioContext *ctx, int64_t max_ns, + Error **errp); + #endif diff --git a/trace-events b/trace-events index f74e1d3d229bd6c2bbadd280e7e40cd5b565d2e3..7fe3a1b0e8ffef0783a41ca51c4633b2b9105971 100644 --- a/trace-events +++ b/trace-events @@ -25,6 +25,10 @@ # # The should be a sprintf()-compatible format string. +# aio-posix.c +run_poll_handlers_begin(void *ctx, int64_t max_ns) "ctx %p max_ns %"PRId64 +run_poll_handlers_end(void *ctx, bool progress) "ctx %p progress %d" + # thread-pool.c thread_pool_submit(void *pool, void *req, void *opaque) "pool %p req %p opaque %p" thread_pool_complete(void *pool, void *req, void *opaque, int ret) "pool %p req %p opaque %p ret %d"