Commit ef286499 authored by Xiaoguang Wang, committed by Caspar Zhang

alinux: io_uring: add percpu io sq thread support

task #26578122

Currently we can create multiple io_uring instances that all have SQPOLL
enabled and bind them to the same cpu core by setting the sq_thread_cpu
argument, but sometimes this isn't efficient. Imagine an extreme case: create
two io_uring instances, both with SQPOLL enabled and bound to the same cpu
core. One instance submits io every 500ms while the other submits io
continuously. The mostly-idle 1st instance's sq thread still contends for the
cpu all the time, which hurts the 2nd instance.
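
For reference, a minimal userspace sketch of the setup described above,
assuming liburing (the helper name and the 256-entry ring size are
illustrative, not part of this patch). Calling it twice with the same cpu
yields two rings whose SQPOLL threads are both pinned to that core:

  #include <string.h>
  #include <liburing.h>

  /* Create a ring whose SQPOLL thread is pinned to the given cpu. */
  static int setup_sqpoll_ring(struct io_uring *ring, int cpu)
  {
      struct io_uring_params p;

      memset(&p, 0, sizeof(p));
      p.flags = IORING_SETUP_SQPOLL | IORING_SETUP_SQ_AFF;
      p.sq_thread_cpu = cpu;       /* both instances pass the same cpu */
      p.sq_thread_idle = 1000;     /* ms before the sq thread goes idle */
      return io_uring_queue_init_params(256, ring, &p);
  }

On an unpatched kernel this creates one busy-polling "io_uring-sq" kthread
per ring, so the mostly-idle ring's thread keeps stealing cpu time from the
busy one.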

To fix this issue, add a new flag IORING_SETUP_SQPOLL_PERCPU. When both
IORING_SETUP_SQ_AFF and IORING_SETUP_SQPOLL_PERCPU are enabled, we create a
percpu io sq_thread that handles the io requests of multiple io_uring
instances with a round-robin strategy. The improvement is substantial, see
below:

IOPS:
  No of instances       1     2     4     8     16     32
  kernel unpatched   589k  487k  303k  165k  85.8k  43.7k
  kernel patched     590k  593k  581k  538k   494k   406k

LATENCY(us):
  No of instances       1     2     4     8     16     32
  kernel unpatched    217   262   422   775  1488    2917
  kernel patched      216   215   219   237   258     313
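
With the patch applied, opting into the shared thread is a matter of also
passing the new flag. A minimal sketch, again assuming liburing and reusing
the helper shape above; the fallback #define mirrors the value this patch
adds to the uapi header:

  #ifndef IORING_SETUP_SQPOLL_PERCPU
  #define IORING_SETUP_SQPOLL_PERCPU (1U << 31)
  #endif

  /* Attach this ring to the shared percpu sq thread of the given cpu. */
  static int setup_percpu_sqpoll_ring(struct io_uring *ring, int cpu)
  {
      struct io_uring_params p;

      memset(&p, 0, sizeof(p));
      p.flags = IORING_SETUP_SQPOLL | IORING_SETUP_SQ_AFF |
                IORING_SETUP_SQPOLL_PERCPU;
      p.sq_thread_cpu = cpu;   /* rings passing the same cpu share one thread */
      return io_uring_queue_init_params(256, ring, &p);
  }

Every ring created this way on the same cpu is then served round-robin by a
single "io_uring-sq-percpu" kthread instead of one "io_uring-sq" thread each.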

Link: https://lore.kernel.org/io-uring/20200520115648.6140-1-xiaoguang.wang@linux.alibaba.com/
Reviewed-by: Jiufei Xue <jiufei.xue@linux.alibaba.com>
Reviewed-by: Joseph Qi <joseph.qi@linux.alibaba.com>
Signed-off-by: Xiaoguang Wang <xiaoguang.wang@linux.alibaba.com>
Parent b21ffd2e
@@ -259,7 +259,13 @@ struct io_ring_ctx {
struct io_wq *io_wq;
struct task_struct *sqo_thread; /* if using sq thread polling */
struct mm_struct *sqo_mm;
wait_queue_head_t sqo_wait;
wait_queue_head_t *sqo_wait;
wait_queue_head_t __sqo_wait;
/* Used for percpu io sq thread */
int submit_status;
int sq_thread_cpu;
struct list_head node;
/*
* If used, fixed file set. Writers must ensure that ->refs is dead,
@@ -330,6 +336,17 @@ struct io_ring_ctx {
struct work_struct exit_work;
};
struct sq_thread_percpu {
struct list_head ctx_list;
struct mutex lock;
wait_queue_head_t sqo_wait;
struct task_struct *sqo_thread;
struct completion sq_thread_comp;
unsigned int sq_thread_idle;
};
static struct sq_thread_percpu __percpu *percpu_threads;
/*
* First field must be the file pointer in all the
* iocb unions! See also 'struct kiocb' in <linux/fs.h>
@@ -981,9 +998,11 @@ static struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p)
goto err;
ctx->flags = p->flags;
init_waitqueue_head(&ctx->sqo_wait);
init_waitqueue_head(&ctx->__sqo_wait);
ctx->sqo_wait = &ctx->__sqo_wait;
init_waitqueue_head(&ctx->cq_wait);
INIT_LIST_HEAD(&ctx->cq_overflow_list);
INIT_LIST_HEAD(&ctx->node);
init_completion(&ctx->ref_comp);
init_completion(&ctx->sq_thread_comp);
idr_init(&ctx->io_buffer_idr);
@@ -1210,8 +1229,8 @@ static void io_cqring_ev_posted(struct io_ring_ctx *ctx)
{
if (waitqueue_active(&ctx->wait))
wake_up(&ctx->wait);
if (waitqueue_active(&ctx->sqo_wait))
wake_up(&ctx->sqo_wait);
if (waitqueue_active(ctx->sqo_wait))
wake_up(ctx->sqo_wait);
if (io_should_trigger_evfd(ctx))
eventfd_signal(ctx->cq_ev_fd, 1);
}
@@ -2034,8 +2053,8 @@ static void io_iopoll_req_issued(struct io_kiocb *req)
list_add_tail(&req->list, &ctx->poll_list);
if ((ctx->flags & IORING_SETUP_SQPOLL) &&
wq_has_sleeper(&ctx->sqo_wait))
wake_up(&ctx->sqo_wait);
wq_has_sleeper(ctx->sqo_wait))
wake_up(ctx->sqo_wait);
}
static void __io_state_file_put(struct io_submit_state *state)
@@ -6077,7 +6096,7 @@ static int io_sq_thread(void *data)
continue;
}
prepare_to_wait(&ctx->sqo_wait, &wait,
prepare_to_wait(ctx->sqo_wait, &wait,
TASK_INTERRUPTIBLE);
/*
* While doing polled IO, before going to sleep, we need
@@ -6088,7 +6107,7 @@ static int io_sq_thread(void *data)
*/
if ((ctx->flags & IORING_SETUP_IOPOLL) &&
!list_empty_careful(&ctx->poll_list)) {
finish_wait(&ctx->sqo_wait, &wait);
finish_wait(ctx->sqo_wait, &wait);
continue;
}
@@ -6097,25 +6116,25 @@ static int io_sq_thread(void *data)
to_submit = io_sqring_entries(ctx);
if (!to_submit || ret == -EBUSY) {
if (kthread_should_park()) {
finish_wait(&ctx->sqo_wait, &wait);
finish_wait(ctx->sqo_wait, &wait);
break;
}
if (current->task_works) {
task_work_run();
finish_wait(&ctx->sqo_wait, &wait);
finish_wait(ctx->sqo_wait, &wait);
io_ring_clear_wakeup_flag(ctx);
continue;
}
if (signal_pending(current))
flush_signals(current);
schedule();
finish_wait(&ctx->sqo_wait, &wait);
finish_wait(ctx->sqo_wait, &wait);
io_ring_clear_wakeup_flag(ctx);
ret = 0;
continue;
}
finish_wait(&ctx->sqo_wait, &wait);
finish_wait(ctx->sqo_wait, &wait);
io_ring_clear_wakeup_flag(ctx);
}
@@ -6139,6 +6158,147 @@ static int io_sq_thread(void *data)
return 0;
}
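/*
 * Reap completed polled io for every ctx attached to this percpu sq
 * thread, then submit the pending sqes of the given ctx.
 */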
static int process_ctx(struct sq_thread_percpu *t, struct io_ring_ctx *ctx)
{
int ret = 0;
unsigned int to_submit;
struct io_ring_ctx *ctx2;
list_for_each_entry(ctx2, &t->ctx_list, node) {
if (!list_empty(&ctx2->poll_list)) {
unsigned int nr_events = 0;
mutex_lock(&ctx2->uring_lock);
if (!list_empty(&ctx2->poll_list))
io_iopoll_getevents(ctx2, &nr_events, 0);
mutex_unlock(&ctx2->uring_lock);
}
}
to_submit = io_sqring_entries(ctx);
if (to_submit) {
mutex_lock(&ctx->uring_lock);
if (likely(!percpu_ref_is_dying(&ctx->refs)))
ret = io_submit_sqes(ctx, to_submit, NULL, -1);
mutex_unlock(&ctx->uring_lock);
}
return ret;
}
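/*
 * Main loop of the shared percpu sq thread: walk every io_uring instance
 * attached to this cpu in round-robin order, switching credentials per
 * ctx, and go to sleep only after all of them have been idle for
 * sq_thread_idle.
 */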
static int io_sq_thread_percpu(void *data)
{
struct sq_thread_percpu *t = data;
struct io_ring_ctx *ctx;
const struct cred *saved_creds, *cur_creds, *old_creds;
mm_segment_t old_fs;
DEFINE_WAIT(wait);
unsigned long timeout;
int iters = 0;
complete(&t->sq_thread_comp);
old_fs = get_fs();
set_fs(USER_DS);
timeout = jiffies + t->sq_thread_idle;
saved_creds = cur_creds = NULL;
while (!kthread_should_park()) {
bool needs_run, needs_wait;
unsigned int to_submit;
mutex_lock(&t->lock);
again:
needs_run = false;
list_for_each_entry(ctx, &t->ctx_list, node) {
if (cur_creds != ctx->creds) {
old_creds = override_creds(ctx->creds);
cur_creds = ctx->creds;
if (saved_creds)
put_cred(old_creds);
else
saved_creds = old_creds;
}
ctx->submit_status = process_ctx(t, ctx);
if (!needs_run)
to_submit = io_sqring_entries(ctx);
if (!needs_run &&
((to_submit && ctx->submit_status != -EBUSY) ||
!list_empty(&ctx->poll_list)))
needs_run = true;
}
if (needs_run && (++iters & 7)) {
if (current->task_works)
task_work_run();
timeout = jiffies + t->sq_thread_idle;
goto again;
}
mutex_unlock(&t->lock);
if (needs_run || !time_after(jiffies, timeout)) {
if (current->task_works)
task_work_run();
if (need_resched()) {
io_sq_thread_drop_mm(ctx);
cond_resched();
}
if (needs_run)
timeout = jiffies + t->sq_thread_idle;
continue;
}
needs_wait = true;
prepare_to_wait(&t->sqo_wait, &wait, TASK_INTERRUPTIBLE);
mutex_lock(&t->lock);
list_for_each_entry(ctx, &t->ctx_list, node) {
if ((ctx->flags & IORING_SETUP_IOPOLL) &&
!list_empty_careful(&ctx->poll_list)) {
needs_wait = false;
break;
}
to_submit = io_sqring_entries(ctx);
if (to_submit && ctx->submit_status != -EBUSY) {
needs_wait = false;
break;
}
}
if (needs_wait) {
list_for_each_entry(ctx, &t->ctx_list, node)
io_ring_set_wakeup_flag(ctx);
}
mutex_unlock(&t->lock);
if (needs_wait) {
if (current->task_works)
task_work_run();
io_sq_thread_drop_mm(ctx);
if (kthread_should_park()) {
finish_wait(&t->sqo_wait, &wait);
break;
}
schedule();
mutex_lock(&t->lock);
list_for_each_entry(ctx, &t->ctx_list, node)
io_ring_clear_wakeup_flag(ctx);
mutex_unlock(&t->lock);
finish_wait(&t->sqo_wait, &wait);
} else
finish_wait(&t->sqo_wait, &wait);
timeout = jiffies + t->sq_thread_idle;
}
if (current->task_works)
task_work_run();
set_fs(old_fs);
io_sq_thread_drop_mm(ctx);
if (saved_creds)
revert_creds(saved_creds);
kthread_parkme();
return 0;
}
struct io_wait_queue {
struct wait_queue_entry wq;
struct io_ring_ctx *ctx;
@@ -6300,18 +6460,25 @@ static int io_sqe_files_unregister(struct io_ring_ctx *ctx)
return 0;
}
static void destroy_sq_thread_percpu(struct io_ring_ctx *ctx, int cpu);
static void io_sq_thread_stop(struct io_ring_ctx *ctx)
{
if (ctx->sqo_thread) {
wait_for_completion(&ctx->sq_thread_comp);
/*
* The park is a bit of a work-around, without it we get
* warning spews on shutdown with SQPOLL set and affinity
* set to a single CPU.
*/
kthread_park(ctx->sqo_thread);
kthread_stop(ctx->sqo_thread);
ctx->sqo_thread = NULL;
if ((ctx->flags & IORING_SETUP_SQ_AFF) &&
(ctx->flags & IORING_SETUP_SQPOLL_PERCPU) && percpu_threads) {
destroy_sq_thread_percpu(ctx, ctx->sq_thread_cpu);
} else {
wait_for_completion(&ctx->sq_thread_comp);
/*
* The park is a bit of a work-around, without it we get
* warning spews on shutdown with SQPOLL set and affinity
* set to a single CPU.
*/
kthread_park(ctx->sqo_thread);
kthread_stop(ctx->sqo_thread);
ctx->sqo_thread = NULL;
}
}
}
@@ -6926,6 +7093,54 @@ static int io_init_wq_offload(struct io_ring_ctx *ctx,
return ret;
}
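/*
 * Attach a ctx to this cpu's shared sq thread, creating the
 * "io_uring-sq-percpu" kthread on first use and stretching the shared
 * idle period to the largest sq_thread_idle requested so far.
 */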
static void create_sq_thread_percpu(struct io_ring_ctx *ctx, int cpu)
{
struct sq_thread_percpu *t;
t = per_cpu_ptr(percpu_threads, cpu);
mutex_lock(&t->lock);
if (!t->sqo_thread) {
t->sqo_thread = kthread_create_on_cpu(io_sq_thread_percpu, t,
cpu, "io_uring-sq-percpu");
if (IS_ERR(t->sqo_thread)) {
ctx->sqo_thread = t->sqo_thread;
t->sqo_thread = NULL;
mutex_unlock(&t->lock);
return;
}
}
if (t->sq_thread_idle < ctx->sq_thread_idle)
t->sq_thread_idle = ctx->sq_thread_idle;
ctx->sqo_wait = &t->sqo_wait;
ctx->sq_thread_cpu = cpu;
list_add_tail(&ctx->node, &t->ctx_list);
ctx->sqo_thread = t->sqo_thread;
mutex_unlock(&t->lock);
}
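/*
 * Detach a ctx from its cpu's shared sq thread; the kthread itself is
 * parked and stopped only when the last attached ctx goes away.
 */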
static void destroy_sq_thread_percpu(struct io_ring_ctx *ctx, int cpu)
{
struct sq_thread_percpu *t;
struct task_struct *sqo_thread = NULL;
t = per_cpu_ptr(percpu_threads, cpu);
mutex_lock(&t->lock);
list_del(&ctx->node);
if (list_empty(&t->ctx_list)) {
sqo_thread = t->sqo_thread;
t->sqo_thread = NULL;
}
mutex_unlock(&t->lock);
if (sqo_thread) {
wait_for_completion(&t->sq_thread_comp);
kthread_park(sqo_thread);
kthread_stop(sqo_thread);
}
}
static int io_sq_offload_start(struct io_ring_ctx *ctx,
struct io_uring_params *p)
{
@@ -6952,9 +7167,11 @@ static int io_sq_offload_start(struct io_ring_ctx *ctx,
if (!cpu_online(cpu))
goto err;
ctx->sqo_thread = kthread_create_on_cpu(io_sq_thread,
ctx, cpu,
"io_uring-sq");
if ((p->flags & IORING_SETUP_SQPOLL_PERCPU) && percpu_threads)
create_sq_thread_percpu(ctx, cpu);
else
ctx->sqo_thread = kthread_create_on_cpu(io_sq_thread, ctx, cpu,
"io_uring-sq");
} else {
ctx->sqo_thread = kthread_create(io_sq_thread, ctx,
"io_uring-sq");
@@ -7641,7 +7858,7 @@ SYSCALL_DEFINE6(io_uring_enter, unsigned int, fd, u32, to_submit,
if (!list_empty_careful(&ctx->cq_overflow_list))
io_cqring_overflow_flush(ctx, false);
if (flags & IORING_ENTER_SQ_WAKEUP)
wake_up(&ctx->sqo_wait);
wake_up(ctx->sqo_wait);
submitted = to_submit;
} else if (to_submit) {
mutex_lock(&ctx->uring_lock);
@@ -7999,7 +8216,8 @@ static long io_uring_setup(u32 entries, struct io_uring_params __user *params)
if (p.flags & ~(IORING_SETUP_IOPOLL | IORING_SETUP_SQPOLL |
IORING_SETUP_SQ_AFF | IORING_SETUP_CQSIZE |
IORING_SETUP_CLAMP | IORING_SETUP_ATTACH_WQ))
IORING_SETUP_CLAMP | IORING_SETUP_ATTACH_WQ |
IORING_SETUP_SQPOLL_PERCPU))
return -EINVAL;
return io_uring_create(entries, &p, params);
@@ -8227,6 +8445,8 @@ SYSCALL_DEFINE4(io_uring_register, unsigned int, fd, unsigned int, opcode,
static int __init io_uring_init(void)
{
int cpu;
#define __BUILD_BUG_VERIFY_ELEMENT(stype, eoffset, etype, ename) do { \
BUILD_BUG_ON(offsetof(stype, ename) != eoffset); \
BUILD_BUG_ON(sizeof(etype) != sizeof_field(stype, ename)); \
@@ -8266,6 +8486,28 @@ static int __init io_uring_init(void)
BUILD_BUG_ON(ARRAY_SIZE(io_op_defs) != IORING_OP_LAST);
BUILD_BUG_ON(__REQ_F_LAST_BIT >= 8 * sizeof(int));
req_cachep = KMEM_CACHE(io_kiocb, SLAB_HWCACHE_ALIGN | SLAB_PANIC);
percpu_threads = alloc_percpu(struct sq_thread_percpu);
/*
* Don't treat this as a fatal error; if the allocation fails, io sq
* threads simply won't go through the percpu path.
*/
if (!percpu_threads)
return 0;
for_each_possible_cpu(cpu) {
struct sq_thread_percpu *t;
t = per_cpu_ptr(percpu_threads, cpu);
INIT_LIST_HEAD(&t->ctx_list);
init_waitqueue_head(&t->sqo_wait);
init_completion(&t->sq_thread_comp);
mutex_init(&t->lock);
t->sqo_thread = NULL;
t->sq_thread_idle = 0;
}
return 0;
};
__initcall(io_uring_init);
@@ -94,6 +94,7 @@ enum {
#define IORING_SETUP_CQSIZE (1U << 3) /* app defines CQ size */
#define IORING_SETUP_CLAMP (1U << 4) /* clamp SQ/CQ ring sizes */
#define IORING_SETUP_ATTACH_WQ (1U << 5) /* attach to existing wq */
#define IORING_SETUP_SQPOLL_PERCPU (1U << 31) /* use percpu SQ poll thread */
enum {
IORING_OP_NOP,
......