From e9fd939654f17651ff65e7e55aa6934d29eb4335 Mon Sep 17 00:00:00 2001 From: Pavel Begunkov Date: Wed, 4 Mar 2020 16:14:12 +0300 Subject: [PATCH] io_uring/io-wq: forward submission ref to async First it changes io-wq interfaces. It replaces {get,put}_work() with free_work(), which guaranteed to be called exactly once. It also enforces free_work() callback to be non-NULL. io_uring follows the changes and instead of putting a submission reference in io_put_req_async_completion(), it will be done in io_free_work(). As removes io_get_work() with corresponding refcount_inc(), the ref balance is maintained. Signed-off-by: Pavel Begunkov Signed-off-by: Jens Axboe --- fs/io-wq.c | 29 ++++++++++++++--------------- fs/io-wq.h | 6 ++---- fs/io_uring.c | 31 +++++++++++-------------------- 3 files changed, 27 insertions(+), 39 deletions(-) diff --git a/fs/io-wq.c b/fs/io-wq.c index 82e76011d409..eda36f997dea 100644 --- a/fs/io-wq.c +++ b/fs/io-wq.c @@ -107,8 +107,7 @@ struct io_wq { struct io_wqe **wqes; unsigned long state; - get_work_fn *get_work; - put_work_fn *put_work; + free_work_fn *free_work; struct task_struct *manager; struct user_struct *user; @@ -509,16 +508,11 @@ static void io_worker_handle_work(struct io_worker *worker) if (test_bit(IO_WQ_BIT_CANCEL, &wq->state)) work->flags |= IO_WQ_WORK_CANCEL; - if (wq->get_work) - wq->get_work(work); - old_work = work; work->func(&work); work = (old_work == work) ? NULL : work; io_assign_current_work(worker, work); - - if (wq->put_work) - wq->put_work(old_work); + wq->free_work(old_work); if (hash != -1U) { spin_lock_irq(&wqe->lock); @@ -749,14 +743,17 @@ static bool io_wq_can_queue(struct io_wqe *wqe, struct io_wqe_acct *acct, return true; } -static void io_run_cancel(struct io_wq_work *work) +static void io_run_cancel(struct io_wq_work *work, struct io_wqe *wqe) { + struct io_wq *wq = wqe->wq; + do { struct io_wq_work *old_work = work; work->flags |= IO_WQ_WORK_CANCEL; work->func(&work); work = (work == old_work) ? NULL : work; + wq->free_work(old_work); } while (work); } @@ -773,7 +770,7 @@ static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work) * It's close enough to not be an issue, fork() has the same delay. */ if (unlikely(!io_wq_can_queue(wqe, acct, work))) { - io_run_cancel(work); + io_run_cancel(work, wqe); return; } @@ -912,7 +909,7 @@ static enum io_wq_cancel io_wqe_cancel_cb_work(struct io_wqe *wqe, spin_unlock_irqrestore(&wqe->lock, flags); if (found) { - io_run_cancel(work); + io_run_cancel(work, wqe); return IO_WQ_CANCEL_OK; } @@ -987,7 +984,7 @@ static enum io_wq_cancel io_wqe_cancel_work(struct io_wqe *wqe, spin_unlock_irqrestore(&wqe->lock, flags); if (found) { - io_run_cancel(work); + io_run_cancel(work, wqe); return IO_WQ_CANCEL_OK; } @@ -1064,6 +1061,9 @@ struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data) int ret = -ENOMEM, node; struct io_wq *wq; + if (WARN_ON_ONCE(!data->free_work)) + return ERR_PTR(-EINVAL); + wq = kzalloc(sizeof(*wq), GFP_KERNEL); if (!wq) return ERR_PTR(-ENOMEM); @@ -1074,8 +1074,7 @@ struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data) return ERR_PTR(-ENOMEM); } - wq->get_work = data->get_work; - wq->put_work = data->put_work; + wq->free_work = data->free_work; /* caller must already hold a reference to this */ wq->user = data->user; @@ -1132,7 +1131,7 @@ struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data) bool io_wq_get(struct io_wq *wq, struct io_wq_data *data) { - if (data->get_work != wq->get_work || data->put_work != wq->put_work) + if (data->free_work != wq->free_work) return false; return refcount_inc_not_zero(&wq->use_refs); diff --git a/fs/io-wq.h b/fs/io-wq.h index a0978d6958f0..2117b9a4f161 100644 --- a/fs/io-wq.h +++ b/fs/io-wq.h @@ -81,14 +81,12 @@ struct io_wq_work { *(work) = (struct io_wq_work){ .func = _func }; \ } while (0) \ -typedef void (get_work_fn)(struct io_wq_work *); -typedef void (put_work_fn)(struct io_wq_work *); +typedef void (free_work_fn)(struct io_wq_work *); struct io_wq_data { struct user_struct *user; - get_work_fn *get_work; - put_work_fn *put_work; + free_work_fn *free_work; }; struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data); diff --git a/fs/io_uring.c b/fs/io_uring.c index 40ca9e6a5ace..0d6f4b3b8f13 100644 --- a/fs/io_uring.c +++ b/fs/io_uring.c @@ -1558,8 +1558,8 @@ static void io_put_req(struct io_kiocb *req) io_free_req(req); } -static void io_put_req_async_completion(struct io_kiocb *req, - struct io_wq_work **workptr) +static void io_steal_work(struct io_kiocb *req, + struct io_wq_work **workptr) { /* * It's in an io-wq worker, so there always should be at least @@ -1569,7 +1569,6 @@ static void io_put_req_async_completion(struct io_kiocb *req, * It also means, that if the counter dropped to 1, then there is * no asynchronous users left, so it's safe to steal the next work. */ - refcount_dec(&req->refs); if (refcount_read(&req->refs) == 1) { struct io_kiocb *nxt = NULL; @@ -2578,7 +2577,7 @@ static bool io_req_cancelled(struct io_kiocb *req) if (req->work.flags & IO_WQ_WORK_CANCEL) { req_set_fail_links(req); io_cqring_add_event(req, -ECANCELED); - io_double_put_req(req); + io_put_req(req); return true; } @@ -2606,7 +2605,7 @@ static void io_fsync_finish(struct io_wq_work **workptr) if (io_req_cancelled(req)) return; __io_fsync(req); - io_put_req_async_completion(req, workptr); + io_steal_work(req, workptr); } static int io_fsync(struct io_kiocb *req, bool force_nonblock) @@ -2639,7 +2638,7 @@ static void io_fallocate_finish(struct io_wq_work **workptr) if (io_req_cancelled(req)) return; __io_fallocate(req); - io_put_req_async_completion(req, workptr); + io_steal_work(req, workptr); } static int io_fallocate_prep(struct io_kiocb *req, @@ -3006,7 +3005,7 @@ static void io_close_finish(struct io_wq_work **workptr) /* not cancellable, don't do io_req_cancelled() */ __io_close_finish(req); - io_put_req_async_completion(req, workptr); + io_steal_work(req, workptr); } static int io_close(struct io_kiocb *req, bool force_nonblock) @@ -3452,7 +3451,7 @@ static void io_accept_finish(struct io_wq_work **workptr) if (io_req_cancelled(req)) return; __io_accept(req, false); - io_put_req_async_completion(req, workptr); + io_steal_work(req, workptr); } #endif @@ -4719,7 +4718,7 @@ static void io_wq_submit_work(struct io_wq_work **workptr) io_put_req(req); } - io_put_req_async_completion(req, workptr); + io_steal_work(req, workptr); } static int io_req_needs_file(struct io_kiocb *req, int fd) @@ -6105,21 +6104,14 @@ static int io_sqe_files_update(struct io_ring_ctx *ctx, void __user *arg, return __io_sqe_files_update(ctx, &up, nr_args); } -static void io_put_work(struct io_wq_work *work) +static void io_free_work(struct io_wq_work *work) { struct io_kiocb *req = container_of(work, struct io_kiocb, work); - /* Consider that io_put_req_async_completion() relies on this ref */ + /* Consider that io_steal_work() relies on this ref */ io_put_req(req); } -static void io_get_work(struct io_wq_work *work) -{ - struct io_kiocb *req = container_of(work, struct io_kiocb, work); - - refcount_inc(&req->refs); -} - static int io_init_wq_offload(struct io_ring_ctx *ctx, struct io_uring_params *p) { @@ -6130,8 +6122,7 @@ static int io_init_wq_offload(struct io_ring_ctx *ctx, int ret = 0; data.user = ctx->user; - data.get_work = io_get_work; - data.put_work = io_put_work; + data.free_work = io_free_work; if (!(p->flags & IORING_SETUP_ATTACH_WQ)) { /* Do QD, or 4 * CPUS, whatever is smallest */ -- GitLab