提交 f13fad7b 编写于 作者: J Jens Axboe

io_uring: pass down completion state on the issue side

No functional changes in this patch, just in preparation for having the
completion state be available on the issue side. Later on, this will
allow requests that complete inline to be completed in batches.
Signed-off-by: NJens Axboe <axboe@kernel.dk>
上级 013538bd
...@@ -909,7 +909,8 @@ static void io_cleanup_req(struct io_kiocb *req); ...@@ -909,7 +909,8 @@ static void io_cleanup_req(struct io_kiocb *req);
static int io_file_get(struct io_submit_state *state, struct io_kiocb *req, static int io_file_get(struct io_submit_state *state, struct io_kiocb *req,
int fd, struct file **out_file, bool fixed); int fd, struct file **out_file, bool fixed);
static void __io_queue_sqe(struct io_kiocb *req, static void __io_queue_sqe(struct io_kiocb *req,
const struct io_uring_sqe *sqe); const struct io_uring_sqe *sqe,
struct io_comp_state *cs);
static ssize_t io_import_iovec(int rw, struct io_kiocb *req, static ssize_t io_import_iovec(int rw, struct io_kiocb *req,
struct iovec **iovec, struct iov_iter *iter, struct iovec **iovec, struct iov_iter *iter,
...@@ -2806,7 +2807,7 @@ static void io_async_buf_retry(struct callback_head *cb) ...@@ -2806,7 +2807,7 @@ static void io_async_buf_retry(struct callback_head *cb)
__set_current_state(TASK_RUNNING); __set_current_state(TASK_RUNNING);
if (!__io_sq_thread_acquire_mm(ctx)) { if (!__io_sq_thread_acquire_mm(ctx)) {
mutex_lock(&ctx->uring_lock); mutex_lock(&ctx->uring_lock);
__io_queue_sqe(req, NULL); __io_queue_sqe(req, NULL, NULL);
mutex_unlock(&ctx->uring_lock); mutex_unlock(&ctx->uring_lock);
} else { } else {
__io_async_buf_error(req, -EFAULT); __io_async_buf_error(req, -EFAULT);
...@@ -4430,7 +4431,7 @@ static void io_poll_task_func(struct callback_head *cb) ...@@ -4430,7 +4431,7 @@ static void io_poll_task_func(struct callback_head *cb)
struct io_ring_ctx *ctx = nxt->ctx; struct io_ring_ctx *ctx = nxt->ctx;
mutex_lock(&ctx->uring_lock); mutex_lock(&ctx->uring_lock);
__io_queue_sqe(nxt, NULL); __io_queue_sqe(nxt, NULL, NULL);
mutex_unlock(&ctx->uring_lock); mutex_unlock(&ctx->uring_lock);
} }
} }
...@@ -4555,7 +4556,7 @@ static void io_async_task_func(struct callback_head *cb) ...@@ -4555,7 +4556,7 @@ static void io_async_task_func(struct callback_head *cb)
goto end_req; goto end_req;
} }
mutex_lock(&ctx->uring_lock); mutex_lock(&ctx->uring_lock);
__io_queue_sqe(req, NULL); __io_queue_sqe(req, NULL, NULL);
mutex_unlock(&ctx->uring_lock); mutex_unlock(&ctx->uring_lock);
} else { } else {
io_cqring_ev_posted(ctx); io_cqring_ev_posted(ctx);
...@@ -5352,7 +5353,7 @@ static void io_cleanup_req(struct io_kiocb *req) ...@@ -5352,7 +5353,7 @@ static void io_cleanup_req(struct io_kiocb *req)
} }
static int io_issue_sqe(struct io_kiocb *req, const struct io_uring_sqe *sqe, static int io_issue_sqe(struct io_kiocb *req, const struct io_uring_sqe *sqe,
bool force_nonblock) bool force_nonblock, struct io_comp_state *cs)
{ {
struct io_ring_ctx *ctx = req->ctx; struct io_ring_ctx *ctx = req->ctx;
int ret; int ret;
...@@ -5637,7 +5638,7 @@ static void io_wq_submit_work(struct io_wq_work **workptr) ...@@ -5637,7 +5638,7 @@ static void io_wq_submit_work(struct io_wq_work **workptr)
if (!ret) { if (!ret) {
do { do {
ret = io_issue_sqe(req, NULL, false); ret = io_issue_sqe(req, NULL, false, NULL);
/* /*
* We can get EAGAIN for polled IO even though we're * We can get EAGAIN for polled IO even though we're
* forcing a sync submission from here, since we can't * forcing a sync submission from here, since we can't
...@@ -5814,7 +5815,8 @@ static struct io_kiocb *io_prep_linked_timeout(struct io_kiocb *req) ...@@ -5814,7 +5815,8 @@ static struct io_kiocb *io_prep_linked_timeout(struct io_kiocb *req)
return nxt; return nxt;
} }
static void __io_queue_sqe(struct io_kiocb *req, const struct io_uring_sqe *sqe) static void __io_queue_sqe(struct io_kiocb *req, const struct io_uring_sqe *sqe,
struct io_comp_state *cs)
{ {
struct io_kiocb *linked_timeout; struct io_kiocb *linked_timeout;
struct io_kiocb *nxt; struct io_kiocb *nxt;
...@@ -5834,7 +5836,7 @@ static void __io_queue_sqe(struct io_kiocb *req, const struct io_uring_sqe *sqe) ...@@ -5834,7 +5836,7 @@ static void __io_queue_sqe(struct io_kiocb *req, const struct io_uring_sqe *sqe)
old_creds = override_creds(req->work.creds); old_creds = override_creds(req->work.creds);
} }
ret = io_issue_sqe(req, sqe, true); ret = io_issue_sqe(req, sqe, true, cs);
/* /*
* We async punt it if the file wasn't marked NOWAIT, or if the file * We async punt it if the file wasn't marked NOWAIT, or if the file
...@@ -5892,7 +5894,8 @@ static void __io_queue_sqe(struct io_kiocb *req, const struct io_uring_sqe *sqe) ...@@ -5892,7 +5894,8 @@ static void __io_queue_sqe(struct io_kiocb *req, const struct io_uring_sqe *sqe)
revert_creds(old_creds); revert_creds(old_creds);
} }
static void io_queue_sqe(struct io_kiocb *req, const struct io_uring_sqe *sqe) static void io_queue_sqe(struct io_kiocb *req, const struct io_uring_sqe *sqe,
struct io_comp_state *cs)
{ {
int ret; int ret;
...@@ -5921,21 +5924,22 @@ static void io_queue_sqe(struct io_kiocb *req, const struct io_uring_sqe *sqe) ...@@ -5921,21 +5924,22 @@ static void io_queue_sqe(struct io_kiocb *req, const struct io_uring_sqe *sqe)
req->work.flags |= IO_WQ_WORK_CONCURRENT; req->work.flags |= IO_WQ_WORK_CONCURRENT;
io_queue_async_work(req); io_queue_async_work(req);
} else { } else {
__io_queue_sqe(req, sqe); __io_queue_sqe(req, sqe, cs);
} }
} }
static inline void io_queue_link_head(struct io_kiocb *req) static inline void io_queue_link_head(struct io_kiocb *req,
struct io_comp_state *cs)
{ {
if (unlikely(req->flags & REQ_F_FAIL_LINK)) { if (unlikely(req->flags & REQ_F_FAIL_LINK)) {
io_put_req(req); io_put_req(req);
io_req_complete(req, -ECANCELED); io_req_complete(req, -ECANCELED);
} else } else
io_queue_sqe(req, NULL); io_queue_sqe(req, NULL, cs);
} }
static int io_submit_sqe(struct io_kiocb *req, const struct io_uring_sqe *sqe, static int io_submit_sqe(struct io_kiocb *req, const struct io_uring_sqe *sqe,
struct io_kiocb **link) struct io_kiocb **link, struct io_comp_state *cs)
{ {
struct io_ring_ctx *ctx = req->ctx; struct io_ring_ctx *ctx = req->ctx;
int ret; int ret;
...@@ -5975,7 +5979,7 @@ static int io_submit_sqe(struct io_kiocb *req, const struct io_uring_sqe *sqe, ...@@ -5975,7 +5979,7 @@ static int io_submit_sqe(struct io_kiocb *req, const struct io_uring_sqe *sqe,
/* last request of a link, enqueue the link */ /* last request of a link, enqueue the link */
if (!(req->flags & (REQ_F_LINK | REQ_F_HARDLINK))) { if (!(req->flags & (REQ_F_LINK | REQ_F_HARDLINK))) {
io_queue_link_head(head); io_queue_link_head(head, cs);
*link = NULL; *link = NULL;
} }
} else { } else {
...@@ -5995,18 +5999,47 @@ static int io_submit_sqe(struct io_kiocb *req, const struct io_uring_sqe *sqe, ...@@ -5995,18 +5999,47 @@ static int io_submit_sqe(struct io_kiocb *req, const struct io_uring_sqe *sqe,
req->flags |= REQ_F_FAIL_LINK; req->flags |= REQ_F_FAIL_LINK;
*link = req; *link = req;
} else { } else {
io_queue_sqe(req, sqe); io_queue_sqe(req, sqe, cs);
} }
} }
return 0; return 0;
} }
static void io_submit_flush_completions(struct io_comp_state *cs)
{
struct io_ring_ctx *ctx = cs->ctx;
spin_lock_irq(&ctx->completion_lock);
while (!list_empty(&cs->list)) {
struct io_kiocb *req;
req = list_first_entry(&cs->list, struct io_kiocb, list);
list_del(&req->list);
io_cqring_fill_event(req, req->result);
if (!(req->flags & REQ_F_LINK_HEAD)) {
req->flags |= REQ_F_COMP_LOCKED;
io_put_req(req);
} else {
spin_unlock_irq(&ctx->completion_lock);
io_put_req(req);
spin_lock_irq(&ctx->completion_lock);
}
}
io_commit_cqring(ctx);
spin_unlock_irq(&ctx->completion_lock);
io_cqring_ev_posted(ctx);
cs->nr = 0;
}
/* /*
* Batched submission is done, ensure local IO is flushed out. * Batched submission is done, ensure local IO is flushed out.
*/ */
static void io_submit_state_end(struct io_submit_state *state) static void io_submit_state_end(struct io_submit_state *state)
{ {
if (!list_empty(&state->comp.list))
io_submit_flush_completions(&state->comp);
blk_finish_plug(&state->plug); blk_finish_plug(&state->plug);
io_state_file_put(state); io_state_file_put(state);
if (state->free_reqs) if (state->free_reqs)
...@@ -6196,7 +6229,7 @@ static int io_submit_sqes(struct io_ring_ctx *ctx, unsigned int nr, ...@@ -6196,7 +6229,7 @@ static int io_submit_sqes(struct io_ring_ctx *ctx, unsigned int nr,
trace_io_uring_submit_sqe(ctx, req->opcode, req->user_data, trace_io_uring_submit_sqe(ctx, req->opcode, req->user_data,
true, io_async_submit(ctx)); true, io_async_submit(ctx));
err = io_submit_sqe(req, sqe, &link); err = io_submit_sqe(req, sqe, &link, &state.comp);
if (err) if (err)
goto fail_req; goto fail_req;
} }
...@@ -6207,7 +6240,7 @@ static int io_submit_sqes(struct io_ring_ctx *ctx, unsigned int nr, ...@@ -6207,7 +6240,7 @@ static int io_submit_sqes(struct io_ring_ctx *ctx, unsigned int nr,
percpu_ref_put_many(&ctx->refs, nr - ref_used); percpu_ref_put_many(&ctx->refs, nr - ref_used);
} }
if (link) if (link)
io_queue_link_head(link); io_queue_link_head(link, &state.comp);
io_submit_state_end(&state); io_submit_state_end(&state);
/* Commit SQ ring head once we've consumed and submitted all SQEs */ /* Commit SQ ring head once we've consumed and submitted all SQEs */
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册