提交 aa124ba8 编写于 作者: J Jens Axboe 提交者: Caspar Zhang

io_uring: add submission polling

commit 6c271ce2f1d572f7fa225700a13cfe7ced492434 upstream.

This enables an application to do IO, without ever entering the kernel.
By using the SQ ring to fill in new sqes and watching for completions
on the CQ ring, we can submit and reap IOs without doing a single system
call. The kernel side thread will poll for new submissions, and in case
of HIPRI/polled IO, it'll also poll for completions.

By default, we allow 1 second of active spinning. This can by changed
by passing in a different grace period at io_uring_register(2) time.
If the thread exceeds this idle time without having any work to do, it
will set:

sq_ring->flags |= IORING_SQ_NEED_WAKEUP.

The application will have to call io_uring_enter() to start things back
up again. If IO is kept busy, that will never be needed. Basically an
application that has this feature enabled will guard it's
io_uring_enter(2) call with:

read_barrier();
if (*sq_ring->flags & IORING_SQ_NEED_WAKEUP)
	io_uring_enter(fd, 0, 0, IORING_ENTER_SQ_WAKEUP);

instead of calling it unconditionally.

It's mandatory to use fixed files with this feature. Failure to do so
will result in the application getting an -EBADF CQ entry when
submitting IO.
Reviewed-by: NHannes Reinecke <hare@suse.com>
Signed-off-by: NJens Axboe <axboe@kernel.dk>
Signed-off-by: NJoseph Qi <joseph.qi@linux.alibaba.com>
Reviewed-by: NJeffle Xu <jefflexu@linux.alibaba.com>
Acked-by: NCaspar Zhang <caspar@linux.alibaba.com>
上级 7bfbdad6
...@@ -44,6 +44,7 @@ ...@@ -44,6 +44,7 @@
#include <linux/percpu.h> #include <linux/percpu.h>
#include <linux/slab.h> #include <linux/slab.h>
#include <linux/workqueue.h> #include <linux/workqueue.h>
#include <linux/kthread.h>
#include <linux/blkdev.h> #include <linux/blkdev.h>
#include <linux/bvec.h> #include <linux/bvec.h>
#include <linux/net.h> #include <linux/net.h>
...@@ -108,12 +109,16 @@ struct io_ring_ctx { ...@@ -108,12 +109,16 @@ struct io_ring_ctx {
unsigned cached_sq_head; unsigned cached_sq_head;
unsigned sq_entries; unsigned sq_entries;
unsigned sq_mask; unsigned sq_mask;
unsigned sq_thread_idle;
struct io_uring_sqe *sq_sqes; struct io_uring_sqe *sq_sqes;
} ____cacheline_aligned_in_smp; } ____cacheline_aligned_in_smp;
/* IO offload */ /* IO offload */
struct workqueue_struct *sqo_wq; struct workqueue_struct *sqo_wq;
struct task_struct *sqo_thread; /* if using sq thread polling */
struct mm_struct *sqo_mm; struct mm_struct *sqo_mm;
wait_queue_head_t sqo_wait;
unsigned sqo_stop;
struct { struct {
/* CQ ring */ /* CQ ring */
...@@ -168,6 +173,7 @@ struct sqe_submit { ...@@ -168,6 +173,7 @@ struct sqe_submit {
unsigned short index; unsigned short index;
bool has_user; bool has_user;
bool needs_lock; bool needs_lock;
bool needs_fixed_file;
}; };
struct io_kiocb { struct io_kiocb {
...@@ -327,6 +333,8 @@ static void io_cqring_add_event(struct io_ring_ctx *ctx, u64 ki_user_data, ...@@ -327,6 +333,8 @@ static void io_cqring_add_event(struct io_ring_ctx *ctx, u64 ki_user_data,
if (waitqueue_active(&ctx->wait)) if (waitqueue_active(&ctx->wait))
wake_up(&ctx->wait); wake_up(&ctx->wait);
if (waitqueue_active(&ctx->sqo_wait))
wake_up(&ctx->sqo_wait);
} }
static void io_ring_drop_ctx_refs(struct io_ring_ctx *ctx, unsigned refs) static void io_ring_drop_ctx_refs(struct io_ring_ctx *ctx, unsigned refs)
...@@ -680,9 +688,10 @@ static bool io_file_supports_async(struct file *file) ...@@ -680,9 +688,10 @@ static bool io_file_supports_async(struct file *file)
return false; return false;
} }
static int io_prep_rw(struct io_kiocb *req, const struct io_uring_sqe *sqe, static int io_prep_rw(struct io_kiocb *req, const struct sqe_submit *s,
bool force_nonblock, struct io_submit_state *state) bool force_nonblock, struct io_submit_state *state)
{ {
const struct io_uring_sqe *sqe = s->sqe;
struct io_ring_ctx *ctx = req->ctx; struct io_ring_ctx *ctx = req->ctx;
struct kiocb *kiocb = &req->rw; struct kiocb *kiocb = &req->rw;
unsigned ioprio, flags; unsigned ioprio, flags;
...@@ -702,6 +711,8 @@ static int io_prep_rw(struct io_kiocb *req, const struct io_uring_sqe *sqe, ...@@ -702,6 +711,8 @@ static int io_prep_rw(struct io_kiocb *req, const struct io_uring_sqe *sqe,
kiocb->ki_filp = ctx->user_files[fd]; kiocb->ki_filp = ctx->user_files[fd];
req->flags |= REQ_F_FIXED_FILE; req->flags |= REQ_F_FIXED_FILE;
} else { } else {
if (s->needs_fixed_file)
return -EBADF;
kiocb->ki_filp = io_file_get(state, fd); kiocb->ki_filp = io_file_get(state, fd);
if (unlikely(!kiocb->ki_filp)) if (unlikely(!kiocb->ki_filp))
return -EBADF; return -EBADF;
...@@ -865,7 +876,7 @@ static ssize_t io_read(struct io_kiocb *req, const struct sqe_submit *s, ...@@ -865,7 +876,7 @@ static ssize_t io_read(struct io_kiocb *req, const struct sqe_submit *s,
struct file *file; struct file *file;
ssize_t ret; ssize_t ret;
ret = io_prep_rw(req, s->sqe, force_nonblock, state); ret = io_prep_rw(req, s, force_nonblock, state);
if (ret) if (ret)
return ret; return ret;
file = kiocb->ki_filp; file = kiocb->ki_filp;
...@@ -909,7 +920,7 @@ static ssize_t io_write(struct io_kiocb *req, const struct sqe_submit *s, ...@@ -909,7 +920,7 @@ static ssize_t io_write(struct io_kiocb *req, const struct sqe_submit *s,
struct file *file; struct file *file;
ssize_t ret; ssize_t ret;
ret = io_prep_rw(req, s->sqe, force_nonblock, state); ret = io_prep_rw(req, s, force_nonblock, state);
if (ret) if (ret)
return ret; return ret;
/* Hold on to the file for -EAGAIN */ /* Hold on to the file for -EAGAIN */
...@@ -1301,6 +1312,169 @@ static bool io_get_sqring(struct io_ring_ctx *ctx, struct sqe_submit *s) ...@@ -1301,6 +1312,169 @@ static bool io_get_sqring(struct io_ring_ctx *ctx, struct sqe_submit *s)
return false; return false;
} }
static int io_submit_sqes(struct io_ring_ctx *ctx, struct sqe_submit *sqes,
unsigned int nr, bool has_user, bool mm_fault)
{
struct io_submit_state state, *statep = NULL;
int ret, i, submitted = 0;
if (nr > IO_PLUG_THRESHOLD) {
io_submit_state_start(&state, ctx, nr);
statep = &state;
}
for (i = 0; i < nr; i++) {
if (unlikely(mm_fault)) {
ret = -EFAULT;
} else {
sqes[i].has_user = has_user;
sqes[i].needs_lock = true;
sqes[i].needs_fixed_file = true;
ret = io_submit_sqe(ctx, &sqes[i], statep);
}
if (!ret) {
submitted++;
continue;
}
io_cqring_add_event(ctx, sqes[i].sqe->user_data, ret, 0);
}
if (statep)
io_submit_state_end(&state);
return submitted;
}
static int io_sq_thread(void *data)
{
struct sqe_submit sqes[IO_IOPOLL_BATCH];
struct io_ring_ctx *ctx = data;
struct mm_struct *cur_mm = NULL;
mm_segment_t old_fs;
DEFINE_WAIT(wait);
unsigned inflight;
unsigned long timeout;
old_fs = get_fs();
set_fs(USER_DS);
timeout = inflight = 0;
while (!kthread_should_stop() && !ctx->sqo_stop) {
bool all_fixed, mm_fault = false;
int i;
if (inflight) {
unsigned nr_events = 0;
if (ctx->flags & IORING_SETUP_IOPOLL) {
/*
* We disallow the app entering submit/complete
* with polling, but we still need to lock the
* ring to prevent racing with polled issue
* that got punted to a workqueue.
*/
mutex_lock(&ctx->uring_lock);
io_iopoll_check(ctx, &nr_events, 0);
mutex_unlock(&ctx->uring_lock);
} else {
/*
* Normal IO, just pretend everything completed.
* We don't have to poll completions for that.
*/
nr_events = inflight;
}
inflight -= nr_events;
if (!inflight)
timeout = jiffies + ctx->sq_thread_idle;
}
if (!io_get_sqring(ctx, &sqes[0])) {
/*
* We're polling. If we're within the defined idle
* period, then let us spin without work before going
* to sleep.
*/
if (inflight || !time_after(jiffies, timeout)) {
cpu_relax();
continue;
}
/*
* Drop cur_mm before scheduling, we can't hold it for
* long periods (or over schedule()). Do this before
* adding ourselves to the waitqueue, as the unuse/drop
* may sleep.
*/
if (cur_mm) {
unuse_mm(cur_mm);
mmput(cur_mm);
cur_mm = NULL;
}
prepare_to_wait(&ctx->sqo_wait, &wait,
TASK_INTERRUPTIBLE);
/* Tell userspace we may need a wakeup call */
ctx->sq_ring->flags |= IORING_SQ_NEED_WAKEUP;
smp_wmb();
if (!io_get_sqring(ctx, &sqes[0])) {
if (kthread_should_stop()) {
finish_wait(&ctx->sqo_wait, &wait);
break;
}
if (signal_pending(current))
flush_signals(current);
schedule();
finish_wait(&ctx->sqo_wait, &wait);
ctx->sq_ring->flags &= ~IORING_SQ_NEED_WAKEUP;
smp_wmb();
continue;
}
finish_wait(&ctx->sqo_wait, &wait);
ctx->sq_ring->flags &= ~IORING_SQ_NEED_WAKEUP;
smp_wmb();
}
i = 0;
all_fixed = true;
do {
if (all_fixed && io_sqe_needs_user(sqes[i].sqe))
all_fixed = false;
i++;
if (i == ARRAY_SIZE(sqes))
break;
} while (io_get_sqring(ctx, &sqes[i]));
/* Unless all new commands are FIXED regions, grab mm */
if (!all_fixed && !cur_mm) {
mm_fault = !mmget_not_zero(ctx->sqo_mm);
if (!mm_fault) {
use_mm(ctx->sqo_mm);
cur_mm = ctx->sqo_mm;
}
}
inflight += io_submit_sqes(ctx, sqes, i, cur_mm != NULL,
mm_fault);
/* Commit SQ ring head once we've consumed all SQEs */
io_commit_sqring(ctx);
}
set_fs(old_fs);
if (cur_mm) {
unuse_mm(cur_mm);
mmput(cur_mm);
}
return 0;
}
static int io_ring_submit(struct io_ring_ctx *ctx, unsigned int to_submit) static int io_ring_submit(struct io_ring_ctx *ctx, unsigned int to_submit)
{ {
struct io_submit_state state, *statep = NULL; struct io_submit_state state, *statep = NULL;
...@@ -1319,6 +1493,7 @@ static int io_ring_submit(struct io_ring_ctx *ctx, unsigned int to_submit) ...@@ -1319,6 +1493,7 @@ static int io_ring_submit(struct io_ring_ctx *ctx, unsigned int to_submit)
s.has_user = true; s.has_user = true;
s.needs_lock = false; s.needs_lock = false;
s.needs_fixed_file = false;
ret = io_submit_sqe(ctx, &s, statep); ret = io_submit_sqe(ctx, &s, statep);
if (ret) { if (ret) {
...@@ -1418,8 +1593,20 @@ static int io_sqe_files_unregister(struct io_ring_ctx *ctx) ...@@ -1418,8 +1593,20 @@ static int io_sqe_files_unregister(struct io_ring_ctx *ctx)
return 0; return 0;
} }
static void io_sq_thread_stop(struct io_ring_ctx *ctx)
{
if (ctx->sqo_thread) {
ctx->sqo_stop = 1;
mb();
kthread_stop(ctx->sqo_thread);
ctx->sqo_thread = NULL;
}
}
static void io_finish_async(struct io_ring_ctx *ctx) static void io_finish_async(struct io_ring_ctx *ctx)
{ {
io_sq_thread_stop(ctx);
if (ctx->sqo_wq) { if (ctx->sqo_wq) {
destroy_workqueue(ctx->sqo_wq); destroy_workqueue(ctx->sqo_wq);
ctx->sqo_wq = NULL; ctx->sqo_wq = NULL;
...@@ -1583,13 +1770,47 @@ static int io_sqe_files_register(struct io_ring_ctx *ctx, void __user *arg, ...@@ -1583,13 +1770,47 @@ static int io_sqe_files_register(struct io_ring_ctx *ctx, void __user *arg,
return ret; return ret;
} }
static int io_sq_offload_start(struct io_ring_ctx *ctx) static int io_sq_offload_start(struct io_ring_ctx *ctx,
struct io_uring_params *p)
{ {
int ret; int ret;
init_waitqueue_head(&ctx->sqo_wait);
mmgrab(current->mm); mmgrab(current->mm);
ctx->sqo_mm = current->mm; ctx->sqo_mm = current->mm;
ctx->sq_thread_idle = msecs_to_jiffies(p->sq_thread_idle);
if (!ctx->sq_thread_idle)
ctx->sq_thread_idle = HZ;
ret = -EINVAL;
if (!cpu_possible(p->sq_thread_cpu))
goto err;
if (ctx->flags & IORING_SETUP_SQPOLL) {
if (p->flags & IORING_SETUP_SQ_AFF) {
int cpu;
cpu = array_index_nospec(p->sq_thread_cpu, NR_CPUS);
ctx->sqo_thread = kthread_create_on_cpu(io_sq_thread,
ctx, cpu,
"io_uring-sq");
} else {
ctx->sqo_thread = kthread_create(io_sq_thread, ctx,
"io_uring-sq");
}
if (IS_ERR(ctx->sqo_thread)) {
ret = PTR_ERR(ctx->sqo_thread);
ctx->sqo_thread = NULL;
goto err;
}
wake_up_process(ctx->sqo_thread);
} else if (p->flags & IORING_SETUP_SQ_AFF) {
/* Can't have SQ_AFF without SQPOLL */
ret = -EINVAL;
goto err;
}
/* Do QD, or 2 * CPUS, whatever is smallest */ /* Do QD, or 2 * CPUS, whatever is smallest */
ctx->sqo_wq = alloc_workqueue("io_ring-wq", WQ_UNBOUND | WQ_FREEZABLE, ctx->sqo_wq = alloc_workqueue("io_ring-wq", WQ_UNBOUND | WQ_FREEZABLE,
min(ctx->sq_entries - 1, 2 * num_online_cpus())); min(ctx->sq_entries - 1, 2 * num_online_cpus()));
...@@ -1600,6 +1821,7 @@ static int io_sq_offload_start(struct io_ring_ctx *ctx) ...@@ -1600,6 +1821,7 @@ static int io_sq_offload_start(struct io_ring_ctx *ctx)
return 0; return 0;
err: err:
io_sq_thread_stop(ctx);
mmdrop(ctx->sqo_mm); mmdrop(ctx->sqo_mm);
ctx->sqo_mm = NULL; ctx->sqo_mm = NULL;
return ret; return ret;
...@@ -1959,7 +2181,7 @@ SYSCALL_DEFINE6(io_uring_enter, unsigned int, fd, u32, to_submit, ...@@ -1959,7 +2181,7 @@ SYSCALL_DEFINE6(io_uring_enter, unsigned int, fd, u32, to_submit,
int submitted = 0; int submitted = 0;
struct fd f; struct fd f;
if (flags & ~IORING_ENTER_GETEVENTS) if (flags & ~(IORING_ENTER_GETEVENTS | IORING_ENTER_SQ_WAKEUP))
return -EINVAL; return -EINVAL;
f = fdget(fd); f = fdget(fd);
...@@ -1975,6 +2197,18 @@ SYSCALL_DEFINE6(io_uring_enter, unsigned int, fd, u32, to_submit, ...@@ -1975,6 +2197,18 @@ SYSCALL_DEFINE6(io_uring_enter, unsigned int, fd, u32, to_submit,
if (!percpu_ref_tryget(&ctx->refs)) if (!percpu_ref_tryget(&ctx->refs))
goto out_fput; goto out_fput;
/*
* For SQ polling, the thread will do all submissions and completions.
* Just return the requested submit count, and wake the thread if
* we were asked to.
*/
if (ctx->flags & IORING_SETUP_SQPOLL) {
if (flags & IORING_ENTER_SQ_WAKEUP)
wake_up(&ctx->sqo_wait);
submitted = to_submit;
goto out_ctx;
}
ret = 0; ret = 0;
if (to_submit) { if (to_submit) {
to_submit = min(to_submit, ctx->sq_entries); to_submit = min(to_submit, ctx->sq_entries);
...@@ -2156,7 +2390,7 @@ static int io_uring_create(unsigned entries, struct io_uring_params *p) ...@@ -2156,7 +2390,7 @@ static int io_uring_create(unsigned entries, struct io_uring_params *p)
if (ret) if (ret)
goto err; goto err;
ret = io_sq_offload_start(ctx); ret = io_sq_offload_start(ctx, p);
if (ret) if (ret)
goto err; goto err;
...@@ -2204,7 +2438,8 @@ static long io_uring_setup(u32 entries, struct io_uring_params __user *params) ...@@ -2204,7 +2438,8 @@ static long io_uring_setup(u32 entries, struct io_uring_params __user *params)
return -EINVAL; return -EINVAL;
} }
if (p.flags & ~IORING_SETUP_IOPOLL) if (p.flags & ~(IORING_SETUP_IOPOLL | IORING_SETUP_SQPOLL |
IORING_SETUP_SQ_AFF))
return -EINVAL; return -EINVAL;
ret = io_uring_create(entries, &p); ret = io_uring_create(entries, &p);
......
...@@ -42,6 +42,8 @@ struct io_uring_sqe { ...@@ -42,6 +42,8 @@ struct io_uring_sqe {
* io_uring_setup() flags * io_uring_setup() flags
*/ */
#define IORING_SETUP_IOPOLL (1U << 0) /* io_context is polled */ #define IORING_SETUP_IOPOLL (1U << 0) /* io_context is polled */
#define IORING_SETUP_SQPOLL (1U << 1) /* SQ poll thread */
#define IORING_SETUP_SQ_AFF (1U << 2) /* sq_thread_cpu is valid */
#define IORING_OP_NOP 0 #define IORING_OP_NOP 0
#define IORING_OP_READV 1 #define IORING_OP_READV 1
...@@ -86,6 +88,11 @@ struct io_sqring_offsets { ...@@ -86,6 +88,11 @@ struct io_sqring_offsets {
__u64 resv2; __u64 resv2;
}; };
/*
* sq_ring->flags
*/
#define IORING_SQ_NEED_WAKEUP (1U << 0) /* needs io_uring_enter wakeup */
struct io_cqring_offsets { struct io_cqring_offsets {
__u32 head; __u32 head;
__u32 tail; __u32 tail;
...@@ -100,6 +107,7 @@ struct io_cqring_offsets { ...@@ -100,6 +107,7 @@ struct io_cqring_offsets {
* io_uring_enter(2) flags * io_uring_enter(2) flags
*/ */
#define IORING_ENTER_GETEVENTS (1U << 0) #define IORING_ENTER_GETEVENTS (1U << 0)
#define IORING_ENTER_SQ_WAKEUP (1U << 1)
/* /*
* Passed in for io_uring_setup(2). Copied back with updated info on success * Passed in for io_uring_setup(2). Copied back with updated info on success
...@@ -108,7 +116,9 @@ struct io_uring_params { ...@@ -108,7 +116,9 @@ struct io_uring_params {
__u32 sq_entries; __u32 sq_entries;
__u32 cq_entries; __u32 cq_entries;
__u32 flags; __u32 flags;
__u32 resv[7]; __u32 sq_thread_cpu;
__u32 sq_thread_idle;
__u32 resv[5];
struct io_sqring_offsets sq_off; struct io_sqring_offsets sq_off;
struct io_cqring_offsets cq_off; struct io_cqring_offsets cq_off;
}; };
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册