diff --git a/fs/io_uring.c b/fs/io_uring.c index 852024a94873b5cab47c39936744fb2e747aa038..4aa484955572a2e8d204be6eb854bc702638789a 100644 --- a/fs/io_uring.c +++ b/fs/io_uring.c @@ -77,6 +77,7 @@ #include #include #include +#include #define CREATE_TRACE_POINTS #include @@ -291,7 +292,6 @@ struct io_ring_ctx { struct { spinlock_t completion_lock; - struct llist_head poll_llist; /* * ->poll_list is protected by the ctx->uring_lock for @@ -560,10 +560,6 @@ struct io_kiocb { }; struct io_async_ctx *io; - /* - * llist_node is only used for poll deferred completions - */ - struct llist_node llist_node; bool needs_fixed_file; u8 opcode; @@ -581,7 +577,17 @@ struct io_kiocb { struct list_head inflight_entry; - struct io_wq_work work; + union { + /* + * Only commands that never go async can use the below fields, + * obviously. Right now only IORING_OP_POLL_ADD uses them. + */ + struct { + struct task_struct *task; + struct callback_head task_work; + }; + struct io_wq_work work; + }; }; #define IO_PLUG_THRESHOLD 2 @@ -776,10 +782,10 @@ static int __io_sqe_files_update(struct io_ring_ctx *ctx, static int io_grab_files(struct io_kiocb *req); static void io_ring_file_ref_flush(struct fixed_file_data *data); static void io_cleanup_req(struct io_kiocb *req); -static int io_file_get(struct io_submit_state *state, - struct io_kiocb *req, - int fd, struct file **out_file, - bool fixed); +static int io_file_get(struct io_submit_state *state, struct io_kiocb *req, + int fd, struct file **out_file, bool fixed); +static void __io_queue_sqe(struct io_kiocb *req, + const struct io_uring_sqe *sqe); static struct kmem_cache *req_cachep; @@ -850,7 +856,6 @@ static struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p) mutex_init(&ctx->uring_lock); init_waitqueue_head(&ctx->wait); spin_lock_init(&ctx->completion_lock); - init_llist_head(&ctx->poll_llist); INIT_LIST_HEAD(&ctx->poll_list); INIT_LIST_HEAD(&ctx->defer_list); INIT_LIST_HEAD(&ctx->timeout_list); @@ -1084,24 +1089,19 @@ static inline bool io_should_trigger_evfd(struct io_ring_ctx *ctx) return false; if (!ctx->eventfd_async) return true; - return io_wq_current_is_worker() || in_interrupt(); + return io_wq_current_is_worker(); } -static void __io_cqring_ev_posted(struct io_ring_ctx *ctx, bool trigger_ev) +static void io_cqring_ev_posted(struct io_ring_ctx *ctx) { if (waitqueue_active(&ctx->wait)) wake_up(&ctx->wait); if (waitqueue_active(&ctx->sqo_wait)) wake_up(&ctx->sqo_wait); - if (trigger_ev) + if (io_should_trigger_evfd(ctx)) eventfd_signal(ctx->cq_ev_fd, 1); } -static void io_cqring_ev_posted(struct io_ring_ctx *ctx) -{ - __io_cqring_ev_posted(ctx, io_should_trigger_evfd(ctx)); -} - /* Returns true if there are no backlogged entries after the flush */ static bool io_cqring_overflow_flush(struct io_ring_ctx *ctx, bool force) { @@ -3557,18 +3557,27 @@ static int io_connect(struct io_kiocb *req, struct io_kiocb **nxt, #endif } -static void io_poll_remove_one(struct io_kiocb *req) +static bool io_poll_remove_one(struct io_kiocb *req) { struct io_poll_iocb *poll = &req->poll; + bool do_complete = false; spin_lock(&poll->head->lock); WRITE_ONCE(poll->canceled, true); if (!list_empty(&poll->wait.entry)) { list_del_init(&poll->wait.entry); - io_queue_async_work(req); + do_complete = true; } spin_unlock(&poll->head->lock); hash_del(&req->hash_node); + if (do_complete) { + io_cqring_fill_event(req, -ECANCELED); + io_commit_cqring(req->ctx); + req->flags |= REQ_F_COMP_LOCKED; + io_put_req(req); + } + + return do_complete; } static void io_poll_remove_all(struct io_ring_ctx *ctx) @@ -3586,6 +3595,8 @@ static void io_poll_remove_all(struct io_ring_ctx *ctx) io_poll_remove_one(req); } spin_unlock_irq(&ctx->completion_lock); + + io_cqring_ev_posted(ctx); } static int io_poll_cancel(struct io_ring_ctx *ctx, __u64 sqe_addr) @@ -3595,10 +3606,11 @@ static int io_poll_cancel(struct io_ring_ctx *ctx, __u64 sqe_addr) list = &ctx->cancel_hash[hash_long(sqe_addr, ctx->cancel_hash_bits)]; hlist_for_each_entry(req, list, hash_node) { - if (sqe_addr == req->user_data) { - io_poll_remove_one(req); + if (sqe_addr != req->user_data) + continue; + if (io_poll_remove_one(req)) return 0; - } + return -EALREADY; } return -ENOENT; @@ -3648,92 +3660,28 @@ static void io_poll_complete(struct io_kiocb *req, __poll_t mask, int error) io_commit_cqring(ctx); } -static void io_poll_complete_work(struct io_wq_work **workptr) +static void io_poll_task_handler(struct io_kiocb *req, struct io_kiocb **nxt) { - struct io_wq_work *work = *workptr; - struct io_kiocb *req = container_of(work, struct io_kiocb, work); - struct io_poll_iocb *poll = &req->poll; - struct poll_table_struct pt = { ._key = poll->events }; struct io_ring_ctx *ctx = req->ctx; - struct io_kiocb *nxt = NULL; - __poll_t mask = 0; - int ret = 0; - if (work->flags & IO_WQ_WORK_CANCEL) { - WRITE_ONCE(poll->canceled, true); - ret = -ECANCELED; - } else if (READ_ONCE(poll->canceled)) { - ret = -ECANCELED; - } - - if (ret != -ECANCELED) - mask = vfs_poll(poll->file, &pt) & poll->events; - - /* - * Note that ->ki_cancel callers also delete iocb from active_reqs after - * calling ->ki_cancel. We need the ctx_lock roundtrip here to - * synchronize with them. In the cancellation case the list_del_init - * itself is not actually needed, but harmless so we keep it in to - * avoid further branches in the fast path. - */ spin_lock_irq(&ctx->completion_lock); - if (!mask && ret != -ECANCELED) { - add_wait_queue(poll->head, &poll->wait); - spin_unlock_irq(&ctx->completion_lock); - return; - } hash_del(&req->hash_node); - io_poll_complete(req, mask, ret); - spin_unlock_irq(&ctx->completion_lock); - - io_cqring_ev_posted(ctx); - - if (ret < 0) - req_set_fail_links(req); - io_put_req_find_next(req, &nxt); - if (nxt) - io_wq_assign_next(workptr, nxt); -} - -static void __io_poll_flush(struct io_ring_ctx *ctx, struct llist_node *nodes) -{ - struct io_kiocb *req, *tmp; - struct req_batch rb; - - rb.to_free = rb.need_iter = 0; - spin_lock_irq(&ctx->completion_lock); - llist_for_each_entry_safe(req, tmp, nodes, llist_node) { - hash_del(&req->hash_node); - io_poll_complete(req, req->result, 0); - - if (refcount_dec_and_test(&req->refs) && - !io_req_multi_free(&rb, req)) { - req->flags |= REQ_F_COMP_LOCKED; - io_free_req(req); - } - } + io_poll_complete(req, req->result, 0); + req->flags |= REQ_F_COMP_LOCKED; + io_put_req_find_next(req, nxt); spin_unlock_irq(&ctx->completion_lock); io_cqring_ev_posted(ctx); - io_free_req_many(ctx, &rb); -} - -static void io_poll_flush(struct io_wq_work **workptr) -{ - struct io_kiocb *req = container_of(*workptr, struct io_kiocb, work); - struct llist_node *nodes; - - nodes = llist_del_all(&req->ctx->poll_llist); - if (nodes) - __io_poll_flush(req->ctx, nodes); } -static void io_poll_trigger_evfd(struct io_wq_work **workptr) +static void io_poll_task_func(struct callback_head *cb) { - struct io_kiocb *req = container_of(*workptr, struct io_kiocb, work); + struct io_kiocb *req = container_of(cb, struct io_kiocb, task_work); + struct io_kiocb *nxt = NULL; - eventfd_signal(req->ctx->cq_ev_fd, 1); - io_put_req(req); + io_poll_task_handler(req, &nxt); + if (nxt) + __io_queue_sqe(nxt, NULL); } static int io_poll_wake(struct wait_queue_entry *wait, unsigned mode, int sync, @@ -3741,8 +3689,8 @@ static int io_poll_wake(struct wait_queue_entry *wait, unsigned mode, int sync, { struct io_kiocb *req = wait->private; struct io_poll_iocb *poll = &req->poll; - struct io_ring_ctx *ctx = req->ctx; __poll_t mask = key_to_poll(key); + struct task_struct *tsk; /* for instances that support it check for an event match first: */ if (mask && !(mask & poll->events)) @@ -3750,46 +3698,11 @@ static int io_poll_wake(struct wait_queue_entry *wait, unsigned mode, int sync, list_del_init(&poll->wait.entry); - /* - * Run completion inline if we can. We're using trylock here because - * we are violating the completion_lock -> poll wq lock ordering. - * If we have a link timeout we're going to need the completion_lock - * for finalizing the request, mark us as having grabbed that already. - */ - if (mask) { - unsigned long flags; - - if (llist_empty(&ctx->poll_llist) && - spin_trylock_irqsave(&ctx->completion_lock, flags)) { - bool trigger_ev; - - hash_del(&req->hash_node); - io_poll_complete(req, mask, 0); - - trigger_ev = io_should_trigger_evfd(ctx); - if (trigger_ev && eventfd_signal_count()) { - trigger_ev = false; - req->work.func = io_poll_trigger_evfd; - } else { - req->flags |= REQ_F_COMP_LOCKED; - io_put_req(req); - req = NULL; - } - spin_unlock_irqrestore(&ctx->completion_lock, flags); - __io_cqring_ev_posted(ctx, trigger_ev); - } else { - req->result = mask; - req->llist_node.next = NULL; - /* if the list wasn't empty, we're done */ - if (!llist_add(&req->llist_node, &ctx->poll_llist)) - req = NULL; - else - req->work.func = io_poll_flush; - } - } - if (req) - io_queue_async_work(req); - + tsk = req->task; + req->result = mask; + init_task_work(&req->task_work, io_poll_task_func); + task_work_add(tsk, &req->task_work, true); + wake_up_process(tsk); return 1; } @@ -3837,6 +3750,9 @@ static int io_poll_add_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe events = READ_ONCE(sqe->poll_events); poll->events = demangle_poll(events) | EPOLLERR | EPOLLHUP; + + /* task will wait for requests on exit, don't need a ref */ + req->task = current; return 0; } @@ -3848,7 +3764,6 @@ static int io_poll_add(struct io_kiocb *req, struct io_kiocb **nxt) bool cancel = false; __poll_t mask; - INIT_IO_WORK(&req->work, io_poll_complete_work); INIT_HLIST_NODE(&req->hash_node); poll->head = NULL; @@ -5285,6 +5200,8 @@ static int io_sq_thread(void *data) if (!list_empty(&ctx->poll_list) || (!time_after(jiffies, timeout) && ret != -EBUSY && !percpu_ref_is_dying(&ctx->refs))) { + if (current->task_works) + task_work_run(); cond_resched(); continue; } @@ -5315,6 +5232,10 @@ static int io_sq_thread(void *data) finish_wait(&ctx->sqo_wait, &wait); break; } + if (current->task_works) { + task_work_run(); + continue; + } if (signal_pending(current)) flush_signals(current); schedule(); @@ -5334,6 +5255,9 @@ static int io_sq_thread(void *data) timeout = jiffies + ctx->sq_thread_idle; } + if (current->task_works) + task_work_run(); + set_fs(old_fs); if (cur_mm) { unuse_mm(cur_mm); @@ -5398,8 +5322,13 @@ static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events, struct io_rings *rings = ctx->rings; int ret = 0; - if (io_cqring_events(ctx, false) >= min_events) - return 0; + do { + if (io_cqring_events(ctx, false) >= min_events) + return 0; + if (!current->task_works) + break; + task_work_run(); + } while (1); if (sig) { #ifdef CONFIG_COMPAT @@ -5419,6 +5348,8 @@ static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events, do { prepare_to_wait_exclusive(&ctx->wait, &iowq.wq, TASK_INTERRUPTIBLE); + if (current->task_works) + task_work_run(); if (io_should_wake(&iowq, false)) break; schedule(); @@ -6745,6 +6676,9 @@ SYSCALL_DEFINE6(io_uring_enter, unsigned int, fd, u32, to_submit, int submitted = 0; struct fd f; + if (current->task_works) + task_work_run(); + if (flags & ~(IORING_ENTER_GETEVENTS | IORING_ENTER_SQ_WAKEUP)) return -EINVAL;