Commit d1ce9eb0 authored by Jens Axboe, committed by Jialin Zhang

io_uring: import 5.15-stable io_uring

stable inclusion
from stable-v5.10.162
commit 788d0824269bef539fe31a785b1517882eafed93
category: bugfix
bugzilla: https://gitee.com/src-openeuler/kernel/issues/I6BTWC
CVE: CVE-2023-0240

Reference: https://git.kernel.org/pub/scm/linux/kernel/git/stable/linux.git/commit/?h=v5.10.167&id=788d0824269bef539fe31a785b1517882eafed93

--------------------------------

No upstream commit exists.

This imports the io_uring codebase from 5.15.85, wholesale. Changes
from that code base:

- Drop IOCB_ALLOC_CACHE; we don't have that in 5.10.
- Drop MKDIRAT/SYMLINKAT/LINKAT. Would require further VFS backports,
  and we don't support these in 5.10 to begin with.
- sock_from_file() old-style calling convention (sketched below).
- Use compat_get_bitmap() only for CONFIG_COMPAT=y (sketched below).

Signed-off-by: Jens Axboe <axboe@kernel.dk>
Signed-off-by: Greg Kroah-Hartman <gregkh@linuxfoundation.org>
Signed-off-by: Li Lingfeng <lilingfeng3@huawei.com>
Reviewed-by: Zhang Yi <yi.zhang@huawei.com>
Reviewed-by: Wang Weiyang <wangweiyang2@huawei.com>
Signed-off-by: Jialin Zhang <zhangjialin11@huawei.com>
Parent 9bd6f12d
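
Two of the adaptations listed above are easiest to see in code form. The sketches below are illustrative only: the helper names and surrounding context are assumptions made for the example, not lines taken from this patch.

First, the old-style sock_from_file() calling convention: 5.10 still passes an int *err out-parameter, while the imported 5.15 code expects the single-argument form, so adapted call sites end up looking roughly like this.

    #include <linux/err.h>
    #include <linux/errno.h>
    #include <linux/fs.h>
    #include <linux/net.h>

    /* Hypothetical wrapper: map the 5.10 two-argument sock_from_file()
     * onto the shape the imported 5.15 callers expect. */
    static struct socket *io_file_to_socket(struct file *file)
    {
    	int err = -ENOTSOCK;
    	struct socket *sock = sock_from_file(file, &err); /* 5.10 convention */

    	return sock ?: ERR_PTR(err);
    }

Second, using compat_get_bitmap() only on CONFIG_COMPAT=y builds: the io-wq affinity registration path copies a CPU mask from userspace, and the backport takes the compat branch only when compat support is compiled in. A minimal sketch of that guard, with the helper name and calling context assumed:

    #include <linux/compat.h>
    #include <linux/cpumask.h>
    #include <linux/errno.h>
    #include <linux/uaccess.h>

    /* Hypothetical helper mirroring the CONFIG_COMPAT guard the
     * backport note describes. */
    static int copy_iowq_mask_from_user(struct cpumask *new_mask,
    				    void __user *arg, unsigned int len)
    {
    #ifdef CONFIG_COMPAT
    	if (in_compat_syscall())
    		return compat_get_bitmap(cpumask_bits(new_mask),
    					 (const compat_ulong_t __user *)arg,
    					 len * 8);
    #endif
    	return copy_from_user(cpumask_bits(new_mask), arg, len) ? -EFAULT : 0;
    }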
...@@ -1129,7 +1129,7 @@ export MODORDER := $(extmod-prefix)modules.order ...@@ -1129,7 +1129,7 @@ export MODORDER := $(extmod-prefix)modules.order
export MODULES_NSDEPS := $(extmod-prefix)modules.nsdeps export MODULES_NSDEPS := $(extmod-prefix)modules.nsdeps
ifeq ($(KBUILD_EXTMOD),) ifeq ($(KBUILD_EXTMOD),)
core-y += kernel/ certs/ mm/ fs/ ipc/ security/ crypto/ block/ core-y += kernel/ certs/ mm/ fs/ ipc/ security/ crypto/ block/ io_uring/
vmlinux-dirs := $(patsubst %/,%,$(filter %/, \ vmlinux-dirs := $(patsubst %/,%,$(filter %/, \
$(core-y) $(core-m) $(drivers-y) $(drivers-m) \ $(core-y) $(core-m) $(drivers-y) $(drivers-m) \
......
...@@ -32,8 +32,6 @@ obj-$(CONFIG_TIMERFD) += timerfd.o ...@@ -32,8 +32,6 @@ obj-$(CONFIG_TIMERFD) += timerfd.o
obj-$(CONFIG_EVENTFD) += eventfd.o obj-$(CONFIG_EVENTFD) += eventfd.o
obj-$(CONFIG_USERFAULTFD) += userfaultfd.o obj-$(CONFIG_USERFAULTFD) += userfaultfd.o
obj-$(CONFIG_AIO) += aio.o obj-$(CONFIG_AIO) += aio.o
obj-$(CONFIG_IO_URING) += io_uring.o
obj-$(CONFIG_IO_WQ) += io-wq.o
obj-$(CONFIG_FS_DAX) += dax.o obj-$(CONFIG_FS_DAX) += dax.o
obj-$(CONFIG_FS_ENCRYPTION) += crypto/ obj-$(CONFIG_FS_ENCRYPTION) += crypto/
obj-$(CONFIG_FS_VERITY) += verity/ obj-$(CONFIG_FS_VERITY) += verity/
......
...@@ -5,50 +5,20 @@ ...@@ -5,50 +5,20 @@
#include <linux/sched.h> #include <linux/sched.h>
#include <linux/xarray.h> #include <linux/xarray.h>
struct io_identity {
struct files_struct *files;
struct mm_struct *mm;
#ifdef CONFIG_BLK_CGROUP
struct cgroup_subsys_state *blkcg_css;
#endif
const struct cred *creds;
struct nsproxy *nsproxy;
struct fs_struct *fs;
unsigned long fsize;
#ifdef CONFIG_AUDIT
kuid_t loginuid;
unsigned int sessionid;
#endif
refcount_t count;
};
struct io_uring_task {
/* submission side */
struct xarray xa;
struct wait_queue_head wait;
struct file *last;
struct percpu_counter inflight;
struct io_identity __identity;
struct io_identity *identity;
atomic_t in_idle;
bool sqpoll;
};
#if defined(CONFIG_IO_URING) #if defined(CONFIG_IO_URING)
struct sock *io_uring_get_socket(struct file *file); struct sock *io_uring_get_socket(struct file *file);
void __io_uring_task_cancel(void); void __io_uring_cancel(bool cancel_all);
void __io_uring_files_cancel(struct files_struct *files);
void __io_uring_free(struct task_struct *tsk); void __io_uring_free(struct task_struct *tsk);
static inline void io_uring_task_cancel(void) static inline void io_uring_files_cancel(void)
{ {
if (current->io_uring && !xa_empty(&current->io_uring->xa)) if (current->io_uring)
__io_uring_task_cancel(); __io_uring_cancel(false);
} }
static inline void io_uring_files_cancel(struct files_struct *files) static inline void io_uring_task_cancel(void)
{ {
if (current->io_uring && !xa_empty(&current->io_uring->xa)) if (current->io_uring)
__io_uring_files_cancel(files); __io_uring_cancel(true);
} }
static inline void io_uring_free(struct task_struct *tsk) static inline void io_uring_free(struct task_struct *tsk)
{ {
...@@ -63,7 +33,7 @@ static inline struct sock *io_uring_get_socket(struct file *file) ...@@ -63,7 +33,7 @@ static inline struct sock *io_uring_get_socket(struct file *file)
static inline void io_uring_task_cancel(void) static inline void io_uring_task_cancel(void)
{ {
} }
static inline void io_uring_files_cancel(struct files_struct *files) static inline void io_uring_files_cancel(void)
{ {
} }
static inline void io_uring_free(struct task_struct *tsk) static inline void io_uring_free(struct task_struct *tsk)
......
...@@ -925,6 +925,9 @@ struct task_struct { ...@@ -925,6 +925,9 @@ struct task_struct {
/* CLONE_CHILD_CLEARTID: */ /* CLONE_CHILD_CLEARTID: */
int __user *clear_child_tid; int __user *clear_child_tid;
/* PF_IO_WORKER */
void *pf_io_worker;
u64 utime; u64 utime;
u64 stime; u64 stime;
#ifdef CONFIG_ARCH_HAS_SCALED_CPUTIME #ifdef CONFIG_ARCH_HAS_SCALED_CPUTIME
......
...@@ -341,7 +341,7 @@ asmlinkage long sys_io_uring_setup(u32 entries, ...@@ -341,7 +341,7 @@ asmlinkage long sys_io_uring_setup(u32 entries,
struct io_uring_params __user *p); struct io_uring_params __user *p);
asmlinkage long sys_io_uring_enter(unsigned int fd, u32 to_submit, asmlinkage long sys_io_uring_enter(unsigned int fd, u32 to_submit,
u32 min_complete, u32 flags, u32 min_complete, u32 flags,
const sigset_t __user *sig, size_t sigsz); const void __user *argp, size_t argsz);
asmlinkage long sys_io_uring_register(unsigned int fd, unsigned int op, asmlinkage long sys_io_uring_register(unsigned int fd, unsigned int op,
void __user *arg, unsigned int nr_args); void __user *arg, unsigned int nr_args);
......
...@@ -12,11 +12,11 @@ struct io_wq_work; ...@@ -12,11 +12,11 @@ struct io_wq_work;
/** /**
* io_uring_create - called after a new io_uring context was prepared * io_uring_create - called after a new io_uring context was prepared
* *
* @fd: corresponding file descriptor * @fd: corresponding file descriptor
* @ctx: pointer to a ring context structure * @ctx: pointer to a ring context structure
* @sq_entries: actual SQ size * @sq_entries: actual SQ size
* @cq_entries: actual CQ size * @cq_entries: actual CQ size
* @flags: SQ ring flags, provided to io_uring_setup(2) * @flags: SQ ring flags, provided to io_uring_setup(2)
* *
* Allows to trace io_uring creation and provide pointer to a context, that can * Allows to trace io_uring creation and provide pointer to a context, that can
* be used later to find correlated events. * be used later to find correlated events.
...@@ -49,15 +49,15 @@ TRACE_EVENT(io_uring_create, ...@@ -49,15 +49,15 @@ TRACE_EVENT(io_uring_create,
); );
/** /**
* io_uring_register - called after a buffer/file/eventfd was succesfully * io_uring_register - called after a buffer/file/eventfd was successfully
* registered for a ring * registered for a ring
* *
* @ctx: pointer to a ring context structure * @ctx: pointer to a ring context structure
* @opcode: describes which operation to perform * @opcode: describes which operation to perform
* @nr_user_files: number of registered files * @nr_user_files: number of registered files
* @nr_user_bufs: number of registered buffers * @nr_user_bufs: number of registered buffers
* @cq_ev_fd: whether eventfs registered or not * @cq_ev_fd: whether eventfs registered or not
* @ret: return code * @ret: return code
* *
* Allows to trace fixed files/buffers/eventfds, that could be registered to * Allows to trace fixed files/buffers/eventfds, that could be registered to
* avoid an overhead of getting references to them for every operation. This * avoid an overhead of getting references to them for every operation. This
...@@ -142,16 +142,16 @@ TRACE_EVENT(io_uring_queue_async_work, ...@@ -142,16 +142,16 @@ TRACE_EVENT(io_uring_queue_async_work,
TP_ARGS(ctx, rw, req, work, flags), TP_ARGS(ctx, rw, req, work, flags),
TP_STRUCT__entry ( TP_STRUCT__entry (
__field( void *, ctx ) __field( void *, ctx )
__field( int, rw ) __field( int, rw )
__field( void *, req ) __field( void *, req )
__field( struct io_wq_work *, work ) __field( struct io_wq_work *, work )
__field( unsigned int, flags ) __field( unsigned int, flags )
), ),
TP_fast_assign( TP_fast_assign(
__entry->ctx = ctx; __entry->ctx = ctx;
__entry->rw = rw; __entry->rw = rw;
__entry->req = req; __entry->req = req;
__entry->work = work; __entry->work = work;
__entry->flags = flags; __entry->flags = flags;
...@@ -196,10 +196,10 @@ TRACE_EVENT(io_uring_defer, ...@@ -196,10 +196,10 @@ TRACE_EVENT(io_uring_defer,
/** /**
* io_uring_link - called before the io_uring request added into link_list of * io_uring_link - called before the io_uring request added into link_list of
* another request * another request
* *
* @ctx: pointer to a ring context structure * @ctx: pointer to a ring context structure
* @req: pointer to a linked request * @req: pointer to a linked request
* @target_req: pointer to a previous request, that would contain @req * @target_req: pointer to a previous request, that would contain @req
* *
* Allows to track linked requests, to understand dependencies between requests * Allows to track linked requests, to understand dependencies between requests
...@@ -212,8 +212,8 @@ TRACE_EVENT(io_uring_link, ...@@ -212,8 +212,8 @@ TRACE_EVENT(io_uring_link,
TP_ARGS(ctx, req, target_req), TP_ARGS(ctx, req, target_req),
TP_STRUCT__entry ( TP_STRUCT__entry (
__field( void *, ctx ) __field( void *, ctx )
__field( void *, req ) __field( void *, req )
__field( void *, target_req ) __field( void *, target_req )
), ),
...@@ -244,7 +244,7 @@ TRACE_EVENT(io_uring_cqring_wait, ...@@ -244,7 +244,7 @@ TRACE_EVENT(io_uring_cqring_wait,
TP_ARGS(ctx, min_events), TP_ARGS(ctx, min_events),
TP_STRUCT__entry ( TP_STRUCT__entry (
__field( void *, ctx ) __field( void *, ctx )
__field( int, min_events ) __field( int, min_events )
), ),
...@@ -272,7 +272,7 @@ TRACE_EVENT(io_uring_fail_link, ...@@ -272,7 +272,7 @@ TRACE_EVENT(io_uring_fail_link,
TP_ARGS(req, link), TP_ARGS(req, link),
TP_STRUCT__entry ( TP_STRUCT__entry (
__field( void *, req ) __field( void *, req )
__field( void *, link ) __field( void *, link )
), ),
...@@ -290,38 +290,42 @@ TRACE_EVENT(io_uring_fail_link, ...@@ -290,38 +290,42 @@ TRACE_EVENT(io_uring_fail_link,
* @ctx: pointer to a ring context structure * @ctx: pointer to a ring context structure
* @user_data: user data associated with the request * @user_data: user data associated with the request
* @res: result of the request * @res: result of the request
* @cflags: completion flags
* *
*/ */
TRACE_EVENT(io_uring_complete, TRACE_EVENT(io_uring_complete,
TP_PROTO(void *ctx, u64 user_data, long res), TP_PROTO(void *ctx, u64 user_data, int res, unsigned cflags),
TP_ARGS(ctx, user_data, res), TP_ARGS(ctx, user_data, res, cflags),
TP_STRUCT__entry ( TP_STRUCT__entry (
__field( void *, ctx ) __field( void *, ctx )
__field( u64, user_data ) __field( u64, user_data )
__field( long, res ) __field( int, res )
__field( unsigned, cflags )
), ),
TP_fast_assign( TP_fast_assign(
__entry->ctx = ctx; __entry->ctx = ctx;
__entry->user_data = user_data; __entry->user_data = user_data;
__entry->res = res; __entry->res = res;
__entry->cflags = cflags;
), ),
TP_printk("ring %p, user_data 0x%llx, result %ld", TP_printk("ring %p, user_data 0x%llx, result %d, cflags %x",
__entry->ctx, (unsigned long long)__entry->user_data, __entry->ctx, (unsigned long long)__entry->user_data,
__entry->res) __entry->res, __entry->cflags)
); );
/** /**
* io_uring_submit_sqe - called before submitting one SQE * io_uring_submit_sqe - called before submitting one SQE
* *
* @ctx: pointer to a ring context structure * @ctx: pointer to a ring context structure
* @req: pointer to a submitted request
* @opcode: opcode of request * @opcode: opcode of request
* @user_data: user data associated with the request * @user_data: user data associated with the request
* @flags request flags
* @force_nonblock: whether a context blocking or not * @force_nonblock: whether a context blocking or not
* @sq_thread: true if sq_thread has submitted this SQE * @sq_thread: true if sq_thread has submitted this SQE
* *
...@@ -330,41 +334,60 @@ TRACE_EVENT(io_uring_complete, ...@@ -330,41 +334,60 @@ TRACE_EVENT(io_uring_complete,
*/ */
TRACE_EVENT(io_uring_submit_sqe, TRACE_EVENT(io_uring_submit_sqe,
TP_PROTO(void *ctx, u8 opcode, u64 user_data, bool force_nonblock, TP_PROTO(void *ctx, void *req, u8 opcode, u64 user_data, u32 flags,
bool sq_thread), bool force_nonblock, bool sq_thread),
TP_ARGS(ctx, opcode, user_data, force_nonblock, sq_thread), TP_ARGS(ctx, req, opcode, user_data, flags, force_nonblock, sq_thread),
TP_STRUCT__entry ( TP_STRUCT__entry (
__field( void *, ctx ) __field( void *, ctx )
__field( void *, req )
__field( u8, opcode ) __field( u8, opcode )
__field( u64, user_data ) __field( u64, user_data )
__field( u32, flags )
__field( bool, force_nonblock ) __field( bool, force_nonblock )
__field( bool, sq_thread ) __field( bool, sq_thread )
), ),
TP_fast_assign( TP_fast_assign(
__entry->ctx = ctx; __entry->ctx = ctx;
__entry->req = req;
__entry->opcode = opcode; __entry->opcode = opcode;
__entry->user_data = user_data; __entry->user_data = user_data;
__entry->flags = flags;
__entry->force_nonblock = force_nonblock; __entry->force_nonblock = force_nonblock;
__entry->sq_thread = sq_thread; __entry->sq_thread = sq_thread;
), ),
TP_printk("ring %p, op %d, data 0x%llx, non block %d, sq_thread %d", TP_printk("ring %p, req %p, op %d, data 0x%llx, flags %u, "
__entry->ctx, __entry->opcode, "non block %d, sq_thread %d", __entry->ctx, __entry->req,
(unsigned long long) __entry->user_data, __entry->opcode, (unsigned long long)__entry->user_data,
__entry->force_nonblock, __entry->sq_thread) __entry->flags, __entry->force_nonblock, __entry->sq_thread)
); );
/*
* io_uring_poll_arm - called after arming a poll wait if successful
*
* @ctx: pointer to a ring context structure
* @req: pointer to the armed request
* @opcode: opcode of request
* @user_data: user data associated with the request
* @mask: request poll events mask
* @events: registered events of interest
*
* Allows to track which fds are waiting for and what are the events of
* interest.
*/
TRACE_EVENT(io_uring_poll_arm, TRACE_EVENT(io_uring_poll_arm,
TP_PROTO(void *ctx, u8 opcode, u64 user_data, int mask, int events), TP_PROTO(void *ctx, void *req, u8 opcode, u64 user_data,
int mask, int events),
TP_ARGS(ctx, opcode, user_data, mask, events), TP_ARGS(ctx, req, opcode, user_data, mask, events),
TP_STRUCT__entry ( TP_STRUCT__entry (
__field( void *, ctx ) __field( void *, ctx )
__field( void *, req )
__field( u8, opcode ) __field( u8, opcode )
__field( u64, user_data ) __field( u64, user_data )
__field( int, mask ) __field( int, mask )
...@@ -373,16 +396,17 @@ TRACE_EVENT(io_uring_poll_arm, ...@@ -373,16 +396,17 @@ TRACE_EVENT(io_uring_poll_arm,
TP_fast_assign( TP_fast_assign(
__entry->ctx = ctx; __entry->ctx = ctx;
__entry->req = req;
__entry->opcode = opcode; __entry->opcode = opcode;
__entry->user_data = user_data; __entry->user_data = user_data;
__entry->mask = mask; __entry->mask = mask;
__entry->events = events; __entry->events = events;
), ),
TP_printk("ring %p, op %d, data 0x%llx, mask 0x%x, events 0x%x", TP_printk("ring %p, req %p, op %d, data 0x%llx, mask 0x%x, events 0x%x",
__entry->ctx, __entry->opcode, __entry->ctx, __entry->req, __entry->opcode,
(unsigned long long) __entry->user_data, (unsigned long long) __entry->user_data,
__entry->mask, __entry->events) __entry->mask, __entry->events)
); );
TRACE_EVENT(io_uring_poll_wake, TRACE_EVENT(io_uring_poll_wake,
...@@ -437,27 +461,40 @@ TRACE_EVENT(io_uring_task_add, ...@@ -437,27 +461,40 @@ TRACE_EVENT(io_uring_task_add,
__entry->mask) __entry->mask)
); );
/*
* io_uring_task_run - called when task_work_run() executes the poll events
* notification callbacks
*
* @ctx: pointer to a ring context structure
* @req: pointer to the armed request
* @opcode: opcode of request
* @user_data: user data associated with the request
*
* Allows to track when notified poll events are processed
*/
TRACE_EVENT(io_uring_task_run, TRACE_EVENT(io_uring_task_run,
TP_PROTO(void *ctx, u8 opcode, u64 user_data), TP_PROTO(void *ctx, void *req, u8 opcode, u64 user_data),
TP_ARGS(ctx, opcode, user_data), TP_ARGS(ctx, req, opcode, user_data),
TP_STRUCT__entry ( TP_STRUCT__entry (
__field( void *, ctx ) __field( void *, ctx )
__field( void *, req )
__field( u8, opcode ) __field( u8, opcode )
__field( u64, user_data ) __field( u64, user_data )
), ),
TP_fast_assign( TP_fast_assign(
__entry->ctx = ctx; __entry->ctx = ctx;
__entry->req = req;
__entry->opcode = opcode; __entry->opcode = opcode;
__entry->user_data = user_data; __entry->user_data = user_data;
), ),
TP_printk("ring %p, op %d, data 0x%llx", TP_printk("ring %p, req %p, op %d, data 0x%llx",
__entry->ctx, __entry->opcode, __entry->ctx, __entry->req, __entry->opcode,
(unsigned long long) __entry->user_data) (unsigned long long) __entry->user_data)
); );
#endif /* _TRACE_IO_URING_H */ #endif /* _TRACE_IO_URING_H */
......
...@@ -42,23 +42,25 @@ struct io_uring_sqe { ...@@ -42,23 +42,25 @@ struct io_uring_sqe {
__u32 statx_flags; __u32 statx_flags;
__u32 fadvise_advice; __u32 fadvise_advice;
__u32 splice_flags; __u32 splice_flags;
__u32 rename_flags;
__u32 unlink_flags;
__u32 hardlink_flags;
}; };
__u64 user_data; /* data to be passed back at completion time */ __u64 user_data; /* data to be passed back at completion time */
/* pack this to avoid bogus arm OABI complaints */
union { union {
struct { /* index into fixed buffers, if used */
/* pack this to avoid bogus arm OABI complaints */ __u16 buf_index;
union { /* for grouped buffer selection */
/* index into fixed buffers, if used */ __u16 buf_group;
__u16 buf_index; } __attribute__((packed));
/* for grouped buffer selection */ /* personality to use, if used */
__u16 buf_group; __u16 personality;
} __attribute__((packed)); union {
/* personality to use, if used */ __s32 splice_fd_in;
__u16 personality; __u32 file_index;
__s32 splice_fd_in;
};
__u64 __pad2[3];
}; };
__u64 __pad2[2];
}; };
enum { enum {
...@@ -132,6 +134,9 @@ enum { ...@@ -132,6 +134,9 @@ enum {
IORING_OP_PROVIDE_BUFFERS, IORING_OP_PROVIDE_BUFFERS,
IORING_OP_REMOVE_BUFFERS, IORING_OP_REMOVE_BUFFERS,
IORING_OP_TEE, IORING_OP_TEE,
IORING_OP_SHUTDOWN,
IORING_OP_RENAMEAT,
IORING_OP_UNLINKAT,
/* this goes last, obviously */ /* this goes last, obviously */
IORING_OP_LAST, IORING_OP_LAST,
...@@ -145,14 +150,34 @@ enum { ...@@ -145,14 +150,34 @@ enum {
/* /*
* sqe->timeout_flags * sqe->timeout_flags
*/ */
#define IORING_TIMEOUT_ABS (1U << 0) #define IORING_TIMEOUT_ABS (1U << 0)
#define IORING_TIMEOUT_UPDATE (1U << 1)
#define IORING_TIMEOUT_BOOTTIME (1U << 2)
#define IORING_TIMEOUT_REALTIME (1U << 3)
#define IORING_LINK_TIMEOUT_UPDATE (1U << 4)
#define IORING_TIMEOUT_CLOCK_MASK (IORING_TIMEOUT_BOOTTIME | IORING_TIMEOUT_REALTIME)
#define IORING_TIMEOUT_UPDATE_MASK (IORING_TIMEOUT_UPDATE | IORING_LINK_TIMEOUT_UPDATE)
/* /*
* sqe->splice_flags * sqe->splice_flags
* extends splice(2) flags * extends splice(2) flags
*/ */
#define SPLICE_F_FD_IN_FIXED (1U << 31) /* the last bit of __u32 */ #define SPLICE_F_FD_IN_FIXED (1U << 31) /* the last bit of __u32 */
/*
* POLL_ADD flags. Note that since sqe->poll_events is the flag space, the
* command flags for POLL_ADD are stored in sqe->len.
*
* IORING_POLL_ADD_MULTI Multishot poll. Sets IORING_CQE_F_MORE if
* the poll handler will continue to report
* CQEs on behalf of the same SQE.
*
* IORING_POLL_UPDATE Update existing poll request, matching
* sqe->addr as the old user_data field.
*/
#define IORING_POLL_ADD_MULTI (1U << 0)
#define IORING_POLL_UPDATE_EVENTS (1U << 1)
#define IORING_POLL_UPDATE_USER_DATA (1U << 2)
/* /*
* IO completion data structure (Completion Queue Entry) * IO completion data structure (Completion Queue Entry)
*/ */
...@@ -166,8 +191,10 @@ struct io_uring_cqe { ...@@ -166,8 +191,10 @@ struct io_uring_cqe {
* cqe->flags * cqe->flags
* *
* IORING_CQE_F_BUFFER If set, the upper 16 bits are the buffer ID * IORING_CQE_F_BUFFER If set, the upper 16 bits are the buffer ID
* IORING_CQE_F_MORE If set, parent SQE will generate more CQE entries
*/ */
#define IORING_CQE_F_BUFFER (1U << 0) #define IORING_CQE_F_BUFFER (1U << 0)
#define IORING_CQE_F_MORE (1U << 1)
enum { enum {
IORING_CQE_BUFFER_SHIFT = 16, IORING_CQE_BUFFER_SHIFT = 16,
...@@ -226,6 +253,7 @@ struct io_cqring_offsets { ...@@ -226,6 +253,7 @@ struct io_cqring_offsets {
#define IORING_ENTER_GETEVENTS (1U << 0) #define IORING_ENTER_GETEVENTS (1U << 0)
#define IORING_ENTER_SQ_WAKEUP (1U << 1) #define IORING_ENTER_SQ_WAKEUP (1U << 1)
#define IORING_ENTER_SQ_WAIT (1U << 2) #define IORING_ENTER_SQ_WAIT (1U << 2)
#define IORING_ENTER_EXT_ARG (1U << 3)
/* /*
* Passed in for io_uring_setup(2). Copied back with updated info on success * Passed in for io_uring_setup(2). Copied back with updated info on success
...@@ -253,6 +281,10 @@ struct io_uring_params { ...@@ -253,6 +281,10 @@ struct io_uring_params {
#define IORING_FEAT_CUR_PERSONALITY (1U << 4) #define IORING_FEAT_CUR_PERSONALITY (1U << 4)
#define IORING_FEAT_FAST_POLL (1U << 5) #define IORING_FEAT_FAST_POLL (1U << 5)
#define IORING_FEAT_POLL_32BITS (1U << 6) #define IORING_FEAT_POLL_32BITS (1U << 6)
#define IORING_FEAT_SQPOLL_NONFIXED (1U << 7)
#define IORING_FEAT_EXT_ARG (1U << 8)
#define IORING_FEAT_NATIVE_WORKERS (1U << 9)
#define IORING_FEAT_RSRC_TAGS (1U << 10)
/* /*
* io_uring_register(2) opcodes and arguments * io_uring_register(2) opcodes and arguments
...@@ -272,16 +304,62 @@ enum { ...@@ -272,16 +304,62 @@ enum {
IORING_REGISTER_RESTRICTIONS = 11, IORING_REGISTER_RESTRICTIONS = 11,
IORING_REGISTER_ENABLE_RINGS = 12, IORING_REGISTER_ENABLE_RINGS = 12,
/* extended with tagging */
IORING_REGISTER_FILES2 = 13,
IORING_REGISTER_FILES_UPDATE2 = 14,
IORING_REGISTER_BUFFERS2 = 15,
IORING_REGISTER_BUFFERS_UPDATE = 16,
/* set/clear io-wq thread affinities */
IORING_REGISTER_IOWQ_AFF = 17,
IORING_UNREGISTER_IOWQ_AFF = 18,
/* set/get max number of io-wq workers */
IORING_REGISTER_IOWQ_MAX_WORKERS = 19,
/* this goes last */ /* this goes last */
IORING_REGISTER_LAST IORING_REGISTER_LAST
}; };
/* io-wq worker categories */
enum {
IO_WQ_BOUND,
IO_WQ_UNBOUND,
};
/* deprecated, see struct io_uring_rsrc_update */
struct io_uring_files_update { struct io_uring_files_update {
__u32 offset; __u32 offset;
__u32 resv; __u32 resv;
__aligned_u64 /* __s32 * */ fds; __aligned_u64 /* __s32 * */ fds;
}; };
struct io_uring_rsrc_register {
__u32 nr;
__u32 resv;
__u64 resv2;
__aligned_u64 data;
__aligned_u64 tags;
};
struct io_uring_rsrc_update {
__u32 offset;
__u32 resv;
__aligned_u64 data;
};
struct io_uring_rsrc_update2 {
__u32 offset;
__u32 resv;
__aligned_u64 data;
__aligned_u64 tags;
__u32 nr;
__u32 resv2;
};
/* Skip updating fd indexes set to this value in the fd table */
#define IORING_REGISTER_FILES_SKIP (-2)
#define IO_URING_OP_SUPPORTED (1U << 0) #define IO_URING_OP_SUPPORTED (1U << 0)
struct io_uring_probe_op { struct io_uring_probe_op {
...@@ -329,4 +407,11 @@ enum { ...@@ -329,4 +407,11 @@ enum {
IORING_RESTRICTION_LAST IORING_RESTRICTION_LAST
}; };
struct io_uring_getevents_arg {
__u64 sigmask;
__u32 sigmask_sz;
__u32 pad;
__u64 ts;
};
#endif #endif
# SPDX-License-Identifier: GPL-2.0
#
# Makefile for io_uring
obj-$(CONFIG_IO_URING) += io_uring.o
obj-$(CONFIG_IO_WQ) += io-wq.o
...@@ -9,19 +9,13 @@ ...@@ -9,19 +9,13 @@
#include <linux/init.h> #include <linux/init.h>
#include <linux/errno.h> #include <linux/errno.h>
#include <linux/sched/signal.h> #include <linux/sched/signal.h>
#include <linux/mm.h>
#include <linux/sched/mm.h>
#include <linux/percpu.h> #include <linux/percpu.h>
#include <linux/slab.h> #include <linux/slab.h>
#include <linux/kthread.h>
#include <linux/rculist_nulls.h> #include <linux/rculist_nulls.h>
#include <linux/fs_struct.h>
#include <linux/task_work.h>
#include <linux/blk-cgroup.h>
#include <linux/audit.h>
#include <linux/cpu.h> #include <linux/cpu.h>
#include <linux/tracehook.h>
#include <uapi/linux/io_uring.h>
#include "../kernel/sched/sched.h"
#include "io-wq.h" #include "io-wq.h"
#define WORKER_IDLE_TIMEOUT (5 * HZ) #define WORKER_IDLE_TIMEOUT (5 * HZ)
...@@ -30,18 +24,15 @@ enum { ...@@ -30,18 +24,15 @@ enum {
IO_WORKER_F_UP = 1, /* up and active */ IO_WORKER_F_UP = 1, /* up and active */
IO_WORKER_F_RUNNING = 2, /* account as running */ IO_WORKER_F_RUNNING = 2, /* account as running */
IO_WORKER_F_FREE = 4, /* worker on free list */ IO_WORKER_F_FREE = 4, /* worker on free list */
IO_WORKER_F_FIXED = 8, /* static idle worker */ IO_WORKER_F_BOUND = 8, /* is doing bounded work */
IO_WORKER_F_BOUND = 16, /* is doing bounded work */
}; };
enum { enum {
IO_WQ_BIT_EXIT = 0, /* wq exiting */ IO_WQ_BIT_EXIT = 0, /* wq exiting */
IO_WQ_BIT_CANCEL = 1, /* cancel work on list */
IO_WQ_BIT_ERROR = 2, /* error on setup */
}; };
enum { enum {
IO_WQE_FLAG_STALLED = 1, /* stalled on hash */ IO_ACCT_STALLED_BIT = 0, /* stalled on hash */
}; };
/* /*
...@@ -58,16 +49,16 @@ struct io_worker { ...@@ -58,16 +49,16 @@ struct io_worker {
struct io_wq_work *cur_work; struct io_wq_work *cur_work;
spinlock_t lock; spinlock_t lock;
struct rcu_head rcu; struct completion ref_done;
struct mm_struct *mm;
#ifdef CONFIG_BLK_CGROUP unsigned long create_state;
struct cgroup_subsys_state *blkcg_css; struct callback_head create_work;
#endif int create_index;
const struct cred *cur_creds;
const struct cred *saved_creds; union {
struct files_struct *restore_files; struct rcu_head rcu;
struct nsproxy *restore_nsproxy; struct work_struct work;
struct fs_struct *restore_fs; };
}; };
#if BITS_PER_LONG == 64 #if BITS_PER_LONG == 64
...@@ -81,57 +72,77 @@ struct io_worker { ...@@ -81,57 +72,77 @@ struct io_worker {
struct io_wqe_acct { struct io_wqe_acct {
unsigned nr_workers; unsigned nr_workers;
unsigned max_workers; unsigned max_workers;
int index;
atomic_t nr_running; atomic_t nr_running;
struct io_wq_work_list work_list;
unsigned long flags;
}; };
enum { enum {
IO_WQ_ACCT_BOUND, IO_WQ_ACCT_BOUND,
IO_WQ_ACCT_UNBOUND, IO_WQ_ACCT_UNBOUND,
IO_WQ_ACCT_NR,
}; };
/* /*
* Per-node worker thread pool * Per-node worker thread pool
*/ */
struct io_wqe { struct io_wqe {
struct { raw_spinlock_t lock;
raw_spinlock_t lock; struct io_wqe_acct acct[2];
struct io_wq_work_list work_list;
unsigned long hash_map;
unsigned flags;
} ____cacheline_aligned_in_smp;
int node; int node;
struct io_wqe_acct acct[2];
struct hlist_nulls_head free_list; struct hlist_nulls_head free_list;
struct list_head all_list; struct list_head all_list;
struct wait_queue_entry wait;
struct io_wq *wq; struct io_wq *wq;
struct io_wq_work *hash_tail[IO_WQ_NR_HASH_BUCKETS]; struct io_wq_work *hash_tail[IO_WQ_NR_HASH_BUCKETS];
cpumask_var_t cpu_mask;
}; };
/* /*
* Per io_wq state * Per io_wq state
*/ */
struct io_wq { struct io_wq {
struct io_wqe **wqes;
unsigned long state; unsigned long state;
free_work_fn *free_work; free_work_fn *free_work;
io_wq_work_fn *do_work; io_wq_work_fn *do_work;
struct task_struct *manager; struct io_wq_hash *hash;
struct user_struct *user;
refcount_t refs; atomic_t worker_refs;
struct completion done; struct completion worker_done;
struct hlist_node cpuhp_node; struct hlist_node cpuhp_node;
refcount_t use_refs; struct task_struct *task;
struct io_wqe *wqes[];
}; };
static enum cpuhp_state io_wq_online; static enum cpuhp_state io_wq_online;
struct io_cb_cancel_data {
work_cancel_fn *fn;
void *data;
int nr_running;
int nr_pending;
bool cancel_all;
};
static bool create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index);
static void io_wqe_dec_running(struct io_worker *worker);
static bool io_acct_cancel_pending_work(struct io_wqe *wqe,
struct io_wqe_acct *acct,
struct io_cb_cancel_data *match);
static void create_worker_cb(struct callback_head *cb);
static void io_wq_cancel_tw_create(struct io_wq *wq);
static bool io_worker_get(struct io_worker *worker) static bool io_worker_get(struct io_worker *worker)
{ {
return refcount_inc_not_zero(&worker->ref); return refcount_inc_not_zero(&worker->ref);
...@@ -140,148 +151,126 @@ static bool io_worker_get(struct io_worker *worker) ...@@ -140,148 +151,126 @@ static bool io_worker_get(struct io_worker *worker)
static void io_worker_release(struct io_worker *worker) static void io_worker_release(struct io_worker *worker)
{ {
if (refcount_dec_and_test(&worker->ref)) if (refcount_dec_and_test(&worker->ref))
wake_up_process(worker->task); complete(&worker->ref_done);
} }
/* static inline struct io_wqe_acct *io_get_acct(struct io_wqe *wqe, bool bound)
* Note: drops the wqe->lock if returning true! The caller must re-acquire
* the lock in that case. Some callers need to restart handling if this
* happens, so we can't just re-acquire the lock on behalf of the caller.
*/
static bool __io_worker_unuse(struct io_wqe *wqe, struct io_worker *worker)
{ {
bool dropped_lock = false; return &wqe->acct[bound ? IO_WQ_ACCT_BOUND : IO_WQ_ACCT_UNBOUND];
}
if (worker->saved_creds) {
revert_creds(worker->saved_creds);
worker->cur_creds = worker->saved_creds = NULL;
}
if (current->files != worker->restore_files) {
__acquire(&wqe->lock);
raw_spin_unlock_irq(&wqe->lock);
dropped_lock = true;
task_lock(current);
current->files = worker->restore_files;
current->nsproxy = worker->restore_nsproxy;
task_unlock(current);
}
if (current->fs != worker->restore_fs) static inline struct io_wqe_acct *io_work_get_acct(struct io_wqe *wqe,
current->fs = worker->restore_fs; struct io_wq_work *work)
{
return io_get_acct(wqe, !(work->flags & IO_WQ_WORK_UNBOUND));
}
/* static inline struct io_wqe_acct *io_wqe_get_acct(struct io_worker *worker)
* If we have an active mm, we need to drop the wq lock before unusing {
* it. If we do, return true and let the caller retry the idle loop. return io_get_acct(worker->wqe, worker->flags & IO_WORKER_F_BOUND);
*/ }
if (worker->mm) {
if (!dropped_lock) {
__acquire(&wqe->lock);
raw_spin_unlock_irq(&wqe->lock);
dropped_lock = true;
}
__set_current_state(TASK_RUNNING);
kthread_unuse_mm(worker->mm);
mmput(worker->mm);
worker->mm = NULL;
}
#ifdef CONFIG_BLK_CGROUP static void io_worker_ref_put(struct io_wq *wq)
if (worker->blkcg_css) { {
kthread_associate_blkcg(NULL); if (atomic_dec_and_test(&wq->worker_refs))
worker->blkcg_css = NULL; complete(&wq->worker_done);
}
#endif
if (current->signal->rlim[RLIMIT_FSIZE].rlim_cur != RLIM_INFINITY)
current->signal->rlim[RLIMIT_FSIZE].rlim_cur = RLIM_INFINITY;
return dropped_lock;
} }
static inline struct io_wqe_acct *io_work_get_acct(struct io_wqe *wqe, static void io_worker_cancel_cb(struct io_worker *worker)
struct io_wq_work *work)
{ {
if (work->flags & IO_WQ_WORK_UNBOUND) struct io_wqe_acct *acct = io_wqe_get_acct(worker);
return &wqe->acct[IO_WQ_ACCT_UNBOUND]; struct io_wqe *wqe = worker->wqe;
struct io_wq *wq = wqe->wq;
return &wqe->acct[IO_WQ_ACCT_BOUND]; atomic_dec(&acct->nr_running);
raw_spin_lock(&worker->wqe->lock);
acct->nr_workers--;
raw_spin_unlock(&worker->wqe->lock);
io_worker_ref_put(wq);
clear_bit_unlock(0, &worker->create_state);
io_worker_release(worker);
} }
static inline struct io_wqe_acct *io_wqe_get_acct(struct io_wqe *wqe, static bool io_task_worker_match(struct callback_head *cb, void *data)
struct io_worker *worker)
{ {
if (worker->flags & IO_WORKER_F_BOUND) struct io_worker *worker;
return &wqe->acct[IO_WQ_ACCT_BOUND];
return &wqe->acct[IO_WQ_ACCT_UNBOUND]; if (cb->func != create_worker_cb)
return false;
worker = container_of(cb, struct io_worker, create_work);
return worker == data;
} }
static void io_worker_exit(struct io_worker *worker) static void io_worker_exit(struct io_worker *worker)
{ {
struct io_wqe *wqe = worker->wqe; struct io_wqe *wqe = worker->wqe;
struct io_wqe_acct *acct = io_wqe_get_acct(wqe, worker); struct io_wq *wq = wqe->wq;
/* while (1) {
* If we're not at zero, someone else is holding a brief reference struct callback_head *cb = task_work_cancel_match(wq->task,
* to the worker. Wait for that to go away. io_task_worker_match, worker);
*/
set_current_state(TASK_INTERRUPTIBLE); if (!cb)
if (!refcount_dec_and_test(&worker->ref)) break;
schedule(); io_worker_cancel_cb(worker);
__set_current_state(TASK_RUNNING); }
if (refcount_dec_and_test(&worker->ref))
complete(&worker->ref_done);
wait_for_completion(&worker->ref_done);
raw_spin_lock(&wqe->lock);
if (worker->flags & IO_WORKER_F_FREE)
hlist_nulls_del_rcu(&worker->nulls_node);
list_del_rcu(&worker->all_list);
preempt_disable(); preempt_disable();
current->flags &= ~PF_IO_WORKER; io_wqe_dec_running(worker);
if (worker->flags & IO_WORKER_F_RUNNING)
atomic_dec(&acct->nr_running);
if (!(worker->flags & IO_WORKER_F_BOUND))
atomic_dec(&wqe->wq->user->processes);
worker->flags = 0; worker->flags = 0;
current->flags &= ~PF_IO_WORKER;
preempt_enable(); preempt_enable();
raw_spin_unlock(&wqe->lock);
raw_spin_lock_irq(&wqe->lock);
hlist_nulls_del_rcu(&worker->nulls_node);
list_del_rcu(&worker->all_list);
if (__io_worker_unuse(wqe, worker)) {
__release(&wqe->lock);
raw_spin_lock_irq(&wqe->lock);
}
acct->nr_workers--;
raw_spin_unlock_irq(&wqe->lock);
kfree_rcu(worker, rcu); kfree_rcu(worker, rcu);
if (refcount_dec_and_test(&wqe->wq->refs)) io_worker_ref_put(wqe->wq);
complete(&wqe->wq->done); do_exit(0);
} }
static inline bool io_wqe_run_queue(struct io_wqe *wqe) static inline bool io_acct_run_queue(struct io_wqe_acct *acct)
__must_hold(wqe->lock)
{ {
if (!wq_list_empty(&wqe->work_list) && if (!wq_list_empty(&acct->work_list) &&
!(wqe->flags & IO_WQE_FLAG_STALLED)) !test_bit(IO_ACCT_STALLED_BIT, &acct->flags))
return true; return true;
return false; return false;
} }
/* /*
* Check head of free list for an available worker. If one isn't available, * Check head of free list for an available worker. If one isn't available,
* caller must wake up the wq manager to create one. * caller must create one.
*/ */
static bool io_wqe_activate_free_worker(struct io_wqe *wqe) static bool io_wqe_activate_free_worker(struct io_wqe *wqe,
struct io_wqe_acct *acct)
__must_hold(RCU) __must_hold(RCU)
{ {
struct hlist_nulls_node *n; struct hlist_nulls_node *n;
struct io_worker *worker; struct io_worker *worker;
n = rcu_dereference(hlist_nulls_first_rcu(&wqe->free_list)); /*
if (is_a_nulls(n)) * Iterate free_list and see if we can find an idle worker to
return false; * activate. If a given worker is on the free_list but in the process
* of exiting, keep trying.
worker = hlist_nulls_entry(n, struct io_worker, nulls_node); */
if (io_worker_get(worker)) { hlist_nulls_for_each_entry_rcu(worker, n, &wqe->free_list, nulls_node) {
wake_up_process(worker->task); if (!io_worker_get(worker))
continue;
if (io_wqe_get_acct(worker) != acct) {
io_worker_release(worker);
continue;
}
if (wake_up_process(worker->task)) {
io_worker_release(worker);
return true;
}
io_worker_release(worker); io_worker_release(worker);
return true;
} }
return false; return false;
...@@ -289,12 +278,10 @@ static bool io_wqe_activate_free_worker(struct io_wqe *wqe) ...@@ -289,12 +278,10 @@ static bool io_wqe_activate_free_worker(struct io_wqe *wqe)
/* /*
* We need a worker. If we find a free one, we're good. If not, and we're * We need a worker. If we find a free one, we're good. If not, and we're
* below the max number of workers, wake up the manager to create one. * below the max number of workers, create one.
*/ */
static void io_wqe_wake_worker(struct io_wqe *wqe, struct io_wqe_acct *acct) static bool io_wqe_create_worker(struct io_wqe *wqe, struct io_wqe_acct *acct)
{ {
bool ret;
/* /*
* Most likely an attempt to queue unbounded work on an io_wq that * Most likely an attempt to queue unbounded work on an io_wq that
* wasn't setup with any unbounded workers. * wasn't setup with any unbounded workers.
...@@ -302,41 +289,116 @@ static void io_wqe_wake_worker(struct io_wqe *wqe, struct io_wqe_acct *acct) ...@@ -302,41 +289,116 @@ static void io_wqe_wake_worker(struct io_wqe *wqe, struct io_wqe_acct *acct)
if (unlikely(!acct->max_workers)) if (unlikely(!acct->max_workers))
pr_warn_once("io-wq is not configured for unbound workers"); pr_warn_once("io-wq is not configured for unbound workers");
rcu_read_lock(); raw_spin_lock(&wqe->lock);
ret = io_wqe_activate_free_worker(wqe); if (acct->nr_workers >= acct->max_workers) {
rcu_read_unlock(); raw_spin_unlock(&wqe->lock);
return true;
if (!ret && acct->nr_workers < acct->max_workers) }
wake_up_process(wqe->wq->manager); acct->nr_workers++;
raw_spin_unlock(&wqe->lock);
atomic_inc(&acct->nr_running);
atomic_inc(&wqe->wq->worker_refs);
return create_io_worker(wqe->wq, wqe, acct->index);
} }
static void io_wqe_inc_running(struct io_wqe *wqe, struct io_worker *worker) static void io_wqe_inc_running(struct io_worker *worker)
{ {
struct io_wqe_acct *acct = io_wqe_get_acct(wqe, worker); struct io_wqe_acct *acct = io_wqe_get_acct(worker);
atomic_inc(&acct->nr_running); atomic_inc(&acct->nr_running);
} }
static void io_wqe_dec_running(struct io_wqe *wqe, struct io_worker *worker) static void create_worker_cb(struct callback_head *cb)
__must_hold(wqe->lock) {
struct io_worker *worker;
struct io_wq *wq;
struct io_wqe *wqe;
struct io_wqe_acct *acct;
bool do_create = false;
worker = container_of(cb, struct io_worker, create_work);
wqe = worker->wqe;
wq = wqe->wq;
acct = &wqe->acct[worker->create_index];
raw_spin_lock(&wqe->lock);
if (acct->nr_workers < acct->max_workers) {
acct->nr_workers++;
do_create = true;
}
raw_spin_unlock(&wqe->lock);
if (do_create) {
create_io_worker(wq, wqe, worker->create_index);
} else {
atomic_dec(&acct->nr_running);
io_worker_ref_put(wq);
}
clear_bit_unlock(0, &worker->create_state);
io_worker_release(worker);
}
static bool io_queue_worker_create(struct io_worker *worker,
struct io_wqe_acct *acct,
task_work_func_t func)
{ {
struct io_wqe_acct *acct = io_wqe_get_acct(wqe, worker); struct io_wqe *wqe = worker->wqe;
struct io_wq *wq = wqe->wq;
if (atomic_dec_and_test(&acct->nr_running) && io_wqe_run_queue(wqe)) /* raced with exit, just ignore create call */
io_wqe_wake_worker(wqe, acct); if (test_bit(IO_WQ_BIT_EXIT, &wq->state))
goto fail;
if (!io_worker_get(worker))
goto fail;
/*
* create_state manages ownership of create_work/index. We should
* only need one entry per worker, as the worker going to sleep
* will trigger the condition, and waking will clear it once it
* runs the task_work.
*/
if (test_bit(0, &worker->create_state) ||
test_and_set_bit_lock(0, &worker->create_state))
goto fail_release;
atomic_inc(&wq->worker_refs);
init_task_work(&worker->create_work, func);
worker->create_index = acct->index;
if (!task_work_add(wq->task, &worker->create_work, TWA_SIGNAL)) {
/*
* EXIT may have been set after checking it above, check after
* adding the task_work and remove any creation item if it is
* now set. wq exit does that too, but we can have added this
* work item after we canceled in io_wq_exit_workers().
*/
if (test_bit(IO_WQ_BIT_EXIT, &wq->state))
io_wq_cancel_tw_create(wq);
io_worker_ref_put(wq);
return true;
}
io_worker_ref_put(wq);
clear_bit_unlock(0, &worker->create_state);
fail_release:
io_worker_release(worker);
fail:
atomic_dec(&acct->nr_running);
io_worker_ref_put(wq);
return false;
} }
static void io_worker_start(struct io_wqe *wqe, struct io_worker *worker) static void io_wqe_dec_running(struct io_worker *worker)
__must_hold(wqe->lock)
{ {
allow_kernel_signal(SIGINT); struct io_wqe_acct *acct = io_wqe_get_acct(worker);
struct io_wqe *wqe = worker->wqe;
current->flags |= PF_IO_WORKER; if (!(worker->flags & IO_WORKER_F_UP))
return;
worker->flags |= (IO_WORKER_F_UP | IO_WORKER_F_RUNNING); if (atomic_dec_and_test(&acct->nr_running) && io_acct_run_queue(acct)) {
worker->restore_files = current->files; atomic_inc(&acct->nr_running);
worker->restore_nsproxy = current->nsproxy; atomic_inc(&wqe->wq->worker_refs);
worker->restore_fs = current->fs; raw_spin_unlock(&wqe->lock);
io_wqe_inc_running(wqe, worker); io_queue_worker_create(worker, acct, create_worker_cb);
raw_spin_lock(&wqe->lock);
}
} }
/* /*
...@@ -347,34 +409,10 @@ static void __io_worker_busy(struct io_wqe *wqe, struct io_worker *worker, ...@@ -347,34 +409,10 @@ static void __io_worker_busy(struct io_wqe *wqe, struct io_worker *worker,
struct io_wq_work *work) struct io_wq_work *work)
__must_hold(wqe->lock) __must_hold(wqe->lock)
{ {
bool worker_bound, work_bound;
if (worker->flags & IO_WORKER_F_FREE) { if (worker->flags & IO_WORKER_F_FREE) {
worker->flags &= ~IO_WORKER_F_FREE; worker->flags &= ~IO_WORKER_F_FREE;
hlist_nulls_del_init_rcu(&worker->nulls_node); hlist_nulls_del_init_rcu(&worker->nulls_node);
} }
/*
* If worker is moving from bound to unbound (or vice versa), then
* ensure we update the running accounting.
*/
worker_bound = (worker->flags & IO_WORKER_F_BOUND) != 0;
work_bound = (work->flags & IO_WQ_WORK_UNBOUND) == 0;
if (worker_bound != work_bound) {
io_wqe_dec_running(wqe, worker);
if (work_bound) {
worker->flags |= IO_WORKER_F_BOUND;
wqe->acct[IO_WQ_ACCT_UNBOUND].nr_workers--;
wqe->acct[IO_WQ_ACCT_BOUND].nr_workers++;
atomic_dec(&wqe->wq->user->processes);
} else {
worker->flags &= ~IO_WORKER_F_BOUND;
wqe->acct[IO_WQ_ACCT_UNBOUND].nr_workers++;
wqe->acct[IO_WQ_ACCT_BOUND].nr_workers--;
atomic_inc(&wqe->wq->user->processes);
}
io_wqe_inc_running(wqe, worker);
}
} }
/* /*
...@@ -384,15 +422,13 @@ static void __io_worker_busy(struct io_wqe *wqe, struct io_worker *worker, ...@@ -384,15 +422,13 @@ static void __io_worker_busy(struct io_wqe *wqe, struct io_worker *worker,
* retry the loop in that case (we changed task state), we don't regrab * retry the loop in that case (we changed task state), we don't regrab
* the lock if we return success. * the lock if we return success.
*/ */
static bool __io_worker_idle(struct io_wqe *wqe, struct io_worker *worker) static void __io_worker_idle(struct io_wqe *wqe, struct io_worker *worker)
__must_hold(wqe->lock) __must_hold(wqe->lock)
{ {
if (!(worker->flags & IO_WORKER_F_FREE)) { if (!(worker->flags & IO_WORKER_F_FREE)) {
worker->flags |= IO_WORKER_F_FREE; worker->flags |= IO_WORKER_F_FREE;
hlist_nulls_add_head_rcu(&worker->nulls_node, &wqe->free_list); hlist_nulls_add_head_rcu(&worker->nulls_node, &wqe->free_list);
} }
return __io_worker_unuse(wqe, worker);
} }
static inline unsigned int io_get_work_hash(struct io_wq_work *work) static inline unsigned int io_get_work_hash(struct io_wq_work *work)
...@@ -400,130 +436,102 @@ static inline unsigned int io_get_work_hash(struct io_wq_work *work) ...@@ -400,130 +436,102 @@ static inline unsigned int io_get_work_hash(struct io_wq_work *work)
return work->flags >> IO_WQ_HASH_SHIFT; return work->flags >> IO_WQ_HASH_SHIFT;
} }
static struct io_wq_work *io_get_next_work(struct io_wqe *wqe) static bool io_wait_on_hash(struct io_wqe *wqe, unsigned int hash)
{
struct io_wq *wq = wqe->wq;
bool ret = false;
spin_lock_irq(&wq->hash->wait.lock);
if (list_empty(&wqe->wait.entry)) {
__add_wait_queue(&wq->hash->wait, &wqe->wait);
if (!test_bit(hash, &wq->hash->map)) {
__set_current_state(TASK_RUNNING);
list_del_init(&wqe->wait.entry);
ret = true;
}
}
spin_unlock_irq(&wq->hash->wait.lock);
return ret;
}
static struct io_wq_work *io_get_next_work(struct io_wqe_acct *acct,
struct io_worker *worker)
__must_hold(wqe->lock) __must_hold(wqe->lock)
{ {
struct io_wq_work_node *node, *prev; struct io_wq_work_node *node, *prev;
struct io_wq_work *work, *tail; struct io_wq_work *work, *tail;
unsigned int hash; unsigned int stall_hash = -1U;
struct io_wqe *wqe = worker->wqe;
wq_list_for_each(node, prev, &acct->work_list) {
unsigned int hash;
wq_list_for_each(node, prev, &wqe->work_list) {
work = container_of(node, struct io_wq_work, list); work = container_of(node, struct io_wq_work, list);
/* not hashed, can run anytime */ /* not hashed, can run anytime */
if (!io_wq_is_hashed(work)) { if (!io_wq_is_hashed(work)) {
wq_list_del(&wqe->work_list, node, prev); wq_list_del(&acct->work_list, node, prev);
return work; return work;
} }
/* hashed, can run if not already running */
hash = io_get_work_hash(work); hash = io_get_work_hash(work);
if (!(wqe->hash_map & BIT(hash))) { /* all items with this hash lie in [work, tail] */
wqe->hash_map |= BIT(hash); tail = wqe->hash_tail[hash];
/* all items with this hash lie in [work, tail] */
tail = wqe->hash_tail[hash]; /* hashed, can run if not already running */
if (!test_and_set_bit(hash, &wqe->wq->hash->map)) {
wqe->hash_tail[hash] = NULL; wqe->hash_tail[hash] = NULL;
wq_list_cut(&wqe->work_list, &tail->list, prev); wq_list_cut(&acct->work_list, &tail->list, prev);
return work; return work;
} }
if (stall_hash == -1U)
stall_hash = hash;
/* fast forward to a next hash, for-each will fix up @prev */
node = &tail->list;
} }
return NULL; if (stall_hash != -1U) {
} bool unstalled;
static void io_wq_switch_mm(struct io_worker *worker, struct io_wq_work *work)
{
if (worker->mm) {
kthread_unuse_mm(worker->mm);
mmput(worker->mm);
worker->mm = NULL;
}
if (mmget_not_zero(work->identity->mm)) {
kthread_use_mm(work->identity->mm);
worker->mm = work->identity->mm;
return;
}
/* failed grabbing mm, ensure work gets cancelled */ /*
work->flags |= IO_WQ_WORK_CANCEL; * Set this before dropping the lock to avoid racing with new
} * work being added and clearing the stalled bit.
*/
static inline void io_wq_switch_blkcg(struct io_worker *worker, set_bit(IO_ACCT_STALLED_BIT, &acct->flags);
struct io_wq_work *work) raw_spin_unlock(&wqe->lock);
{ unstalled = io_wait_on_hash(wqe, stall_hash);
#ifdef CONFIG_BLK_CGROUP raw_spin_lock(&wqe->lock);
if (!(work->flags & IO_WQ_WORK_BLKCG)) if (unstalled) {
return; clear_bit(IO_ACCT_STALLED_BIT, &acct->flags);
if (work->identity->blkcg_css != worker->blkcg_css) { if (wq_has_sleeper(&wqe->wq->hash->wait))
kthread_associate_blkcg(work->identity->blkcg_css); wake_up(&wqe->wq->hash->wait);
worker->blkcg_css = work->identity->blkcg_css; }
} }
#endif
}
static void io_wq_switch_creds(struct io_worker *worker,
struct io_wq_work *work)
{
const struct cred *old_creds = override_creds(work->identity->creds);
worker->cur_creds = work->identity->creds; return NULL;
if (worker->saved_creds)
put_cred(old_creds); /* creds set by previous switch */
else
worker->saved_creds = old_creds;
} }
static void io_impersonate_work(struct io_worker *worker, static bool io_flush_signals(void)
struct io_wq_work *work)
{ {
if ((work->flags & IO_WQ_WORK_FILES) && if (unlikely(test_thread_flag(TIF_NOTIFY_SIGNAL))) {
current->files != work->identity->files) { __set_current_state(TASK_RUNNING);
task_lock(current); tracehook_notify_signal();
current->files = work->identity->files; return true;
current->nsproxy = work->identity->nsproxy;
task_unlock(current);
if (!work->identity->files) {
/* failed grabbing files, ensure work gets cancelled */
work->flags |= IO_WQ_WORK_CANCEL;
}
} }
if ((work->flags & IO_WQ_WORK_FS) && current->fs != work->identity->fs) return false;
current->fs = work->identity->fs;
if ((work->flags & IO_WQ_WORK_MM) && work->identity->mm != worker->mm)
io_wq_switch_mm(worker, work);
if ((work->flags & IO_WQ_WORK_CREDS) &&
worker->cur_creds != work->identity->creds)
io_wq_switch_creds(worker, work);
if (work->flags & IO_WQ_WORK_FSIZE)
current->signal->rlim[RLIMIT_FSIZE].rlim_cur = work->identity->fsize;
else if (current->signal->rlim[RLIMIT_FSIZE].rlim_cur != RLIM_INFINITY)
current->signal->rlim[RLIMIT_FSIZE].rlim_cur = RLIM_INFINITY;
io_wq_switch_blkcg(worker, work);
#ifdef CONFIG_AUDIT
current->loginuid = work->identity->loginuid;
current->sessionid = work->identity->sessionid;
#endif
} }
static void io_assign_current_work(struct io_worker *worker, static void io_assign_current_work(struct io_worker *worker,
struct io_wq_work *work) struct io_wq_work *work)
{ {
if (work) { if (work) {
/* flush pending signals before assigning new work */ io_flush_signals();
if (signal_pending(current))
flush_signals(current);
cond_resched(); cond_resched();
} }
#ifdef CONFIG_AUDIT spin_lock(&worker->lock);
current->loginuid = KUIDT_INIT(AUDIT_UID_UNSET);
current->sessionid = AUDIT_SID_UNSET;
#endif
spin_lock_irq(&worker->lock);
worker->cur_work = work; worker->cur_work = work;
spin_unlock_irq(&worker->lock); spin_unlock(&worker->lock);
} }
static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work); static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work);
...@@ -531,8 +539,10 @@ static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work); ...@@ -531,8 +539,10 @@ static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work);
static void io_worker_handle_work(struct io_worker *worker) static void io_worker_handle_work(struct io_worker *worker)
__releases(wqe->lock) __releases(wqe->lock)
{ {
struct io_wqe_acct *acct = io_wqe_get_acct(worker);
struct io_wqe *wqe = worker->wqe; struct io_wqe *wqe = worker->wqe;
struct io_wq *wq = wqe->wq; struct io_wq *wq = wqe->wq;
bool do_kill = test_bit(IO_WQ_BIT_EXIT, &wq->state);
do { do {
struct io_wq_work *work; struct io_wq_work *work;
...@@ -544,99 +554,108 @@ static void io_worker_handle_work(struct io_worker *worker) ...@@ -544,99 +554,108 @@ static void io_worker_handle_work(struct io_worker *worker)
* can't make progress, any work completion or insertion will * can't make progress, any work completion or insertion will
* clear the stalled flag. * clear the stalled flag.
*/ */
work = io_get_next_work(wqe); work = io_get_next_work(acct, worker);
if (work) if (work)
__io_worker_busy(wqe, worker, work); __io_worker_busy(wqe, worker, work);
else if (!wq_list_empty(&wqe->work_list))
wqe->flags |= IO_WQE_FLAG_STALLED;
raw_spin_unlock_irq(&wqe->lock); raw_spin_unlock(&wqe->lock);
if (!work) if (!work)
break; break;
io_assign_current_work(worker, work); io_assign_current_work(worker, work);
__set_current_state(TASK_RUNNING);
/* handle a whole dependent link */ /* handle a whole dependent link */
do { do {
struct io_wq_work *old_work, *next_hashed, *linked; struct io_wq_work *next_hashed, *linked;
unsigned int hash = io_get_work_hash(work); unsigned int hash = io_get_work_hash(work);
next_hashed = wq_next_work(work); next_hashed = wq_next_work(work);
io_impersonate_work(worker, work);
/*
* OK to set IO_WQ_WORK_CANCEL even for uncancellable
* work, the worker function will do the right thing.
*/
if (test_bit(IO_WQ_BIT_CANCEL, &wq->state))
work->flags |= IO_WQ_WORK_CANCEL;
old_work = work; if (unlikely(do_kill) && (work->flags & IO_WQ_WORK_UNBOUND))
linked = wq->do_work(work); work->flags |= IO_WQ_WORK_CANCEL;
wq->do_work(work);
io_assign_current_work(worker, NULL);
linked = wq->free_work(work);
work = next_hashed; work = next_hashed;
if (!work && linked && !io_wq_is_hashed(linked)) { if (!work && linked && !io_wq_is_hashed(linked)) {
work = linked; work = linked;
linked = NULL; linked = NULL;
} }
io_assign_current_work(worker, work); io_assign_current_work(worker, work);
wq->free_work(old_work);
if (linked) if (linked)
io_wqe_enqueue(wqe, linked); io_wqe_enqueue(wqe, linked);
if (hash != -1U && !next_hashed) { if (hash != -1U && !next_hashed) {
raw_spin_lock_irq(&wqe->lock); /* serialize hash clear with wake_up() */
wqe->hash_map &= ~BIT_ULL(hash); spin_lock_irq(&wq->hash->wait.lock);
wqe->flags &= ~IO_WQE_FLAG_STALLED; clear_bit(hash, &wq->hash->map);
clear_bit(IO_ACCT_STALLED_BIT, &acct->flags);
spin_unlock_irq(&wq->hash->wait.lock);
if (wq_has_sleeper(&wq->hash->wait))
wake_up(&wq->hash->wait);
raw_spin_lock(&wqe->lock);
/* skip unnecessary unlock-lock wqe->lock */ /* skip unnecessary unlock-lock wqe->lock */
if (!work) if (!work)
goto get_next; goto get_next;
raw_spin_unlock_irq(&wqe->lock); raw_spin_unlock(&wqe->lock);
} }
} while (work); } while (work);
raw_spin_lock_irq(&wqe->lock); raw_spin_lock(&wqe->lock);
} while (1); } while (1);
} }
static int io_wqe_worker(void *data) static int io_wqe_worker(void *data)
{ {
struct io_worker *worker = data; struct io_worker *worker = data;
struct io_wqe_acct *acct = io_wqe_get_acct(worker);
struct io_wqe *wqe = worker->wqe; struct io_wqe *wqe = worker->wqe;
struct io_wq *wq = wqe->wq; struct io_wq *wq = wqe->wq;
bool last_timeout = false;
char buf[TASK_COMM_LEN];
io_worker_start(wqe, worker); worker->flags |= (IO_WORKER_F_UP | IO_WORKER_F_RUNNING);
snprintf(buf, sizeof(buf), "iou-wrk-%d", wq->task->pid);
set_task_comm(current, buf);
while (!test_bit(IO_WQ_BIT_EXIT, &wq->state)) { while (!test_bit(IO_WQ_BIT_EXIT, &wq->state)) {
long ret;
set_current_state(TASK_INTERRUPTIBLE); set_current_state(TASK_INTERRUPTIBLE);
loop: loop:
raw_spin_lock_irq(&wqe->lock); raw_spin_lock(&wqe->lock);
if (io_wqe_run_queue(wqe)) { if (io_acct_run_queue(acct)) {
__set_current_state(TASK_RUNNING);
io_worker_handle_work(worker); io_worker_handle_work(worker);
goto loop; goto loop;
} }
/* drops the lock on success, retry */ /* timed out, exit unless we're the last worker */
if (__io_worker_idle(wqe, worker)) { if (last_timeout && acct->nr_workers > 1) {
__release(&wqe->lock); acct->nr_workers--;
goto loop; raw_spin_unlock(&wqe->lock);
__set_current_state(TASK_RUNNING);
break;
} }
raw_spin_unlock_irq(&wqe->lock); last_timeout = false;
if (signal_pending(current)) __io_worker_idle(wqe, worker);
flush_signals(current); raw_spin_unlock(&wqe->lock);
if (schedule_timeout(WORKER_IDLE_TIMEOUT)) if (io_flush_signals())
continue; continue;
/* timed out, exit unless we're the fixed worker */ ret = schedule_timeout(WORKER_IDLE_TIMEOUT);
if (test_bit(IO_WQ_BIT_EXIT, &wq->state) || if (signal_pending(current)) {
!(worker->flags & IO_WORKER_F_FIXED)) struct ksignal ksig;
if (!get_signal(&ksig))
continue;
break; break;
}
last_timeout = !ret;
} }
if (test_bit(IO_WQ_BIT_EXIT, &wq->state)) { if (test_bit(IO_WQ_BIT_EXIT, &wq->state)) {
raw_spin_lock_irq(&wqe->lock); raw_spin_lock(&wqe->lock);
if (!wq_list_empty(&wqe->work_list)) io_worker_handle_work(worker);
io_worker_handle_work(worker);
else
raw_spin_unlock_irq(&wqe->lock);
} }
io_worker_exit(worker); io_worker_exit(worker);
...@@ -648,27 +667,28 @@ static int io_wqe_worker(void *data) ...@@ -648,27 +667,28 @@ static int io_wqe_worker(void *data)
*/ */
void io_wq_worker_running(struct task_struct *tsk) void io_wq_worker_running(struct task_struct *tsk)
{ {
struct io_worker *worker = kthread_data(tsk); struct io_worker *worker = tsk->pf_io_worker;
struct io_wqe *wqe = worker->wqe;
if (!worker)
return;
if (!(worker->flags & IO_WORKER_F_UP)) if (!(worker->flags & IO_WORKER_F_UP))
return; return;
if (worker->flags & IO_WORKER_F_RUNNING) if (worker->flags & IO_WORKER_F_RUNNING)
return; return;
worker->flags |= IO_WORKER_F_RUNNING; worker->flags |= IO_WORKER_F_RUNNING;
io_wqe_inc_running(wqe, worker); io_wqe_inc_running(worker);
} }
/* /*
* Called when worker is going to sleep. If there are no workers currently * Called when worker is going to sleep. If there are no workers currently
* running and we have work pending, wake up a free one or have the manager * running and we have work pending, wake up a free one or create a new one.
* set one up.
*/ */
void io_wq_worker_sleeping(struct task_struct *tsk) void io_wq_worker_sleeping(struct task_struct *tsk)
{ {
struct io_worker *worker = kthread_data(tsk); struct io_worker *worker = tsk->pf_io_worker;
struct io_wqe *wqe = worker->wqe;
if (!worker)
return;
if (!(worker->flags & IO_WORKER_F_UP)) if (!(worker->flags & IO_WORKER_F_UP))
return; return;
if (!(worker->flags & IO_WORKER_F_RUNNING)) if (!(worker->flags & IO_WORKER_F_RUNNING))
...@@ -676,67 +696,140 @@ void io_wq_worker_sleeping(struct task_struct *tsk) ...@@ -676,67 +696,140 @@ void io_wq_worker_sleeping(struct task_struct *tsk)
worker->flags &= ~IO_WORKER_F_RUNNING; worker->flags &= ~IO_WORKER_F_RUNNING;
raw_spin_lock_irq(&wqe->lock); raw_spin_lock(&worker->wqe->lock);
io_wqe_dec_running(wqe, worker); io_wqe_dec_running(worker);
raw_spin_unlock_irq(&wqe->lock); raw_spin_unlock(&worker->wqe->lock);
} }
static bool create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index) static void io_init_new_worker(struct io_wqe *wqe, struct io_worker *worker,
struct task_struct *tsk)
{ {
struct io_wqe_acct *acct = &wqe->acct[index]; tsk->pf_io_worker = worker;
struct io_worker *worker; worker->task = tsk;
set_cpus_allowed_ptr(tsk, wqe->cpu_mask);
tsk->flags |= PF_NO_SETAFFINITY;
worker = kzalloc_node(sizeof(*worker), GFP_KERNEL, wqe->node); raw_spin_lock(&wqe->lock);
if (!worker) hlist_nulls_add_head_rcu(&worker->nulls_node, &wqe->free_list);
list_add_tail_rcu(&worker->all_list, &wqe->all_list);
worker->flags |= IO_WORKER_F_FREE;
raw_spin_unlock(&wqe->lock);
wake_up_new_task(tsk);
}
static bool io_wq_work_match_all(struct io_wq_work *work, void *data)
{
return true;
}
static inline bool io_should_retry_thread(long err)
{
/*
* Prevent perpetual task_work retry, if the task (or its group) is
* exiting.
*/
if (fatal_signal_pending(current))
return false; return false;
refcount_set(&worker->ref, 1); switch (err) {
worker->nulls_node.pprev = NULL; case -EAGAIN:
worker->wqe = wqe; case -ERESTARTSYS:
spin_lock_init(&worker->lock); case -ERESTARTNOINTR:
case -ERESTARTNOHAND:
return true;
default:
return false;
}
}
static void create_worker_cont(struct callback_head *cb)
{
struct io_worker *worker;
struct task_struct *tsk;
struct io_wqe *wqe;
worker->task = kthread_create_on_node(io_wqe_worker, worker, wqe->node, worker = container_of(cb, struct io_worker, create_work);
"io_wqe_worker-%d/%d", index, wqe->node); clear_bit_unlock(0, &worker->create_state);
if (IS_ERR(worker->task)) { wqe = worker->wqe;
tsk = create_io_thread(io_wqe_worker, worker, wqe->node);
if (!IS_ERR(tsk)) {
io_init_new_worker(wqe, worker, tsk);
io_worker_release(worker);
return;
} else if (!io_should_retry_thread(PTR_ERR(tsk))) {
struct io_wqe_acct *acct = io_wqe_get_acct(worker);
atomic_dec(&acct->nr_running);
raw_spin_lock(&wqe->lock);
acct->nr_workers--;
if (!acct->nr_workers) {
struct io_cb_cancel_data match = {
.fn = io_wq_work_match_all,
.cancel_all = true,
};
while (io_acct_cancel_pending_work(wqe, acct, &match))
raw_spin_lock(&wqe->lock);
}
raw_spin_unlock(&wqe->lock);
io_worker_ref_put(wqe->wq);
kfree(worker); kfree(worker);
return false; return;
} }
kthread_bind_mask(worker->task, cpumask_of_node(wqe->node));
raw_spin_lock_irq(&wqe->lock); /* re-create attempts grab a new worker ref, drop the existing one */
hlist_nulls_add_head_rcu(&worker->nulls_node, &wqe->free_list); io_worker_release(worker);
list_add_tail_rcu(&worker->all_list, &wqe->all_list); schedule_work(&worker->work);
worker->flags |= IO_WORKER_F_FREE; }
if (index == IO_WQ_ACCT_BOUND)
worker->flags |= IO_WORKER_F_BOUND;
if (!acct->nr_workers && (worker->flags & IO_WORKER_F_BOUND))
worker->flags |= IO_WORKER_F_FIXED;
acct->nr_workers++;
raw_spin_unlock_irq(&wqe->lock);
if (index == IO_WQ_ACCT_UNBOUND) static void io_workqueue_create(struct work_struct *work)
atomic_inc(&wq->user->processes); {
struct io_worker *worker = container_of(work, struct io_worker, work);
struct io_wqe_acct *acct = io_wqe_get_acct(worker);
refcount_inc(&wq->refs); if (!io_queue_worker_create(worker, acct, create_worker_cont))
wake_up_process(worker->task); kfree(worker);
return true;
} }
static inline bool io_wqe_need_worker(struct io_wqe *wqe, int index) static bool create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index)
__must_hold(wqe->lock)
{ {
struct io_wqe_acct *acct = &wqe->acct[index]; struct io_wqe_acct *acct = &wqe->acct[index];
struct io_worker *worker;
struct task_struct *tsk;
/* if we have available workers or no work, no need */ __set_current_state(TASK_RUNNING);
if (!hlist_nulls_empty(&wqe->free_list) || !io_wqe_run_queue(wqe))
worker = kzalloc_node(sizeof(*worker), GFP_KERNEL, wqe->node);
if (!worker) {
fail:
atomic_dec(&acct->nr_running);
raw_spin_lock(&wqe->lock);
acct->nr_workers--;
raw_spin_unlock(&wqe->lock);
io_worker_ref_put(wq);
return false; return false;
return acct->nr_workers < acct->max_workers; }
}
static bool io_wqe_worker_send_sig(struct io_worker *worker, void *data) refcount_set(&worker->ref, 1);
{ worker->wqe = wqe;
send_sig(SIGINT, worker->task, 1); spin_lock_init(&worker->lock);
return false; init_completion(&worker->ref_done);
if (index == IO_WQ_ACCT_BOUND)
worker->flags |= IO_WORKER_F_BOUND;
tsk = create_io_thread(io_wqe_worker, worker, wqe->node);
if (!IS_ERR(tsk)) {
io_init_new_worker(wqe, worker, tsk);
} else if (!io_should_retry_thread(PTR_ERR(tsk))) {
kfree(worker);
goto fail;
} else {
INIT_WORK(&worker->work, io_workqueue_create);
schedule_work(&worker->work);
}
return true;
} }
/* /*
...@@ -766,120 +859,31 @@ static bool io_wq_for_each_worker(struct io_wqe *wqe, ...@@ -766,120 +859,31 @@ static bool io_wq_for_each_worker(struct io_wqe *wqe,
static bool io_wq_worker_wake(struct io_worker *worker, void *data) static bool io_wq_worker_wake(struct io_worker *worker, void *data)
{ {
set_notify_signal(worker->task);
wake_up_process(worker->task); wake_up_process(worker->task);
return false; return false;
} }
/*
* Manager thread. Tasked with creating new workers, if we need them.
*/
static int io_wq_manager(void *data)
{
struct io_wq *wq = data;
int node;
/* create fixed workers */
refcount_set(&wq->refs, 1);
for_each_node(node) {
if (!node_online(node))
continue;
if (create_io_worker(wq, wq->wqes[node], IO_WQ_ACCT_BOUND))
continue;
set_bit(IO_WQ_BIT_ERROR, &wq->state);
set_bit(IO_WQ_BIT_EXIT, &wq->state);
goto out;
}
complete(&wq->done);
while (!kthread_should_stop()) {
if (current->task_works)
task_work_run();
for_each_node(node) {
struct io_wqe *wqe = wq->wqes[node];
bool fork_worker[2] = { false, false };
if (!node_online(node))
continue;
raw_spin_lock_irq(&wqe->lock);
if (io_wqe_need_worker(wqe, IO_WQ_ACCT_BOUND))
fork_worker[IO_WQ_ACCT_BOUND] = true;
if (io_wqe_need_worker(wqe, IO_WQ_ACCT_UNBOUND))
fork_worker[IO_WQ_ACCT_UNBOUND] = true;
raw_spin_unlock_irq(&wqe->lock);
if (fork_worker[IO_WQ_ACCT_BOUND])
create_io_worker(wq, wqe, IO_WQ_ACCT_BOUND);
if (fork_worker[IO_WQ_ACCT_UNBOUND])
create_io_worker(wq, wqe, IO_WQ_ACCT_UNBOUND);
}
set_current_state(TASK_INTERRUPTIBLE);
schedule_timeout(HZ);
}
if (current->task_works)
task_work_run();
out:
if (refcount_dec_and_test(&wq->refs)) {
complete(&wq->done);
return 0;
}
/* if ERROR is set and we get here, we have workers to wake */
if (test_bit(IO_WQ_BIT_ERROR, &wq->state)) {
rcu_read_lock();
for_each_node(node)
io_wq_for_each_worker(wq->wqes[node], io_wq_worker_wake, NULL);
rcu_read_unlock();
}
return 0;
}
static bool io_wq_can_queue(struct io_wqe *wqe, struct io_wqe_acct *acct,
struct io_wq_work *work)
{
bool free_worker;
if (!(work->flags & IO_WQ_WORK_UNBOUND))
return true;
if (atomic_read(&acct->nr_running))
return true;
rcu_read_lock();
free_worker = !hlist_nulls_empty(&wqe->free_list);
rcu_read_unlock();
if (free_worker)
return true;
if (atomic_read(&wqe->wq->user->processes) >= acct->max_workers &&
!(capable(CAP_SYS_RESOURCE) || capable(CAP_SYS_ADMIN)))
return false;
return true;
}
static void io_run_cancel(struct io_wq_work *work, struct io_wqe *wqe) static void io_run_cancel(struct io_wq_work *work, struct io_wqe *wqe)
{ {
struct io_wq *wq = wqe->wq; struct io_wq *wq = wqe->wq;
do { do {
struct io_wq_work *old_work = work;
work->flags |= IO_WQ_WORK_CANCEL; work->flags |= IO_WQ_WORK_CANCEL;
work = wq->do_work(work); wq->do_work(work);
wq->free_work(old_work); work = wq->free_work(work);
} while (work); } while (work);
} }
static void io_wqe_insert_work(struct io_wqe *wqe, struct io_wq_work *work) static void io_wqe_insert_work(struct io_wqe *wqe, struct io_wq_work *work)
{ {
struct io_wqe_acct *acct = io_work_get_acct(wqe, work);
unsigned int hash; unsigned int hash;
struct io_wq_work *tail; struct io_wq_work *tail;
if (!io_wq_is_hashed(work)) { if (!io_wq_is_hashed(work)) {
append: append:
wq_list_add_tail(&work->list, &wqe->work_list); wq_list_add_tail(&work->list, &acct->work_list);
return; return;
} }
...@@ -889,35 +893,62 @@ static void io_wqe_insert_work(struct io_wqe *wqe, struct io_wq_work *work) ...@@ -889,35 +893,62 @@ static void io_wqe_insert_work(struct io_wqe *wqe, struct io_wq_work *work)
if (!tail) if (!tail)
goto append; goto append;
wq_list_add_after(&work->list, &tail->list, &wqe->work_list); wq_list_add_after(&work->list, &tail->list, &acct->work_list);
}
static bool io_wq_work_match_item(struct io_wq_work *work, void *data)
{
return work == data;
} }
static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work) static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work)
{ {
struct io_wqe_acct *acct = io_work_get_acct(wqe, work); struct io_wqe_acct *acct = io_work_get_acct(wqe, work);
bool do_wake; unsigned work_flags = work->flags;
unsigned long flags; bool do_create;
/* /*
* Do early check to see if we need a new unbound worker, and if we do, * If io-wq is exiting for this task, or if the request has explicitly
* if we're allowed to do so. This isn't 100% accurate as there's a * been marked as one that should not get executed, cancel it here.
* gap between this check and incrementing the value, but that's OK.
* It's close enough to not be an issue, fork() has the same delay.
*/ */
if (unlikely(!io_wq_can_queue(wqe, acct, work))) { if (test_bit(IO_WQ_BIT_EXIT, &wqe->wq->state) ||
(work->flags & IO_WQ_WORK_CANCEL)) {
io_run_cancel(work, wqe); io_run_cancel(work, wqe);
return; return;
} }
raw_spin_lock_irqsave(&wqe->lock, flags); raw_spin_lock(&wqe->lock);
io_wqe_insert_work(wqe, work); io_wqe_insert_work(wqe, work);
wqe->flags &= ~IO_WQE_FLAG_STALLED; clear_bit(IO_ACCT_STALLED_BIT, &acct->flags);
do_wake = (work->flags & IO_WQ_WORK_CONCURRENT) ||
!atomic_read(&acct->nr_running);
raw_spin_unlock_irqrestore(&wqe->lock, flags);
if (do_wake) rcu_read_lock();
io_wqe_wake_worker(wqe, acct); do_create = !io_wqe_activate_free_worker(wqe, acct);
rcu_read_unlock();
raw_spin_unlock(&wqe->lock);
if (do_create && ((work_flags & IO_WQ_WORK_CONCURRENT) ||
!atomic_read(&acct->nr_running))) {
bool did_create;
did_create = io_wqe_create_worker(wqe, acct);
if (likely(did_create))
return;
raw_spin_lock(&wqe->lock);
/* fatal condition, failed to create the first worker */
if (!acct->nr_workers) {
struct io_cb_cancel_data match = {
.fn = io_wq_work_match_item,
.data = work,
.cancel_all = false,
};
if (io_acct_cancel_pending_work(wqe, acct, &match))
raw_spin_lock(&wqe->lock);
}
raw_spin_unlock(&wqe->lock);
}
} }
void io_wq_enqueue(struct io_wq *wq, struct io_wq_work *work) void io_wq_enqueue(struct io_wq *wq, struct io_wq_work *work)
...@@ -939,46 +970,21 @@ void io_wq_hash_work(struct io_wq_work *work, void *val) ...@@ -939,46 +970,21 @@ void io_wq_hash_work(struct io_wq_work *work, void *val)
work->flags |= (IO_WQ_WORK_HASHED | (bit << IO_WQ_HASH_SHIFT)); work->flags |= (IO_WQ_WORK_HASHED | (bit << IO_WQ_HASH_SHIFT));
} }
void io_wq_cancel_all(struct io_wq *wq)
{
int node;
set_bit(IO_WQ_BIT_CANCEL, &wq->state);
rcu_read_lock();
for_each_node(node) {
struct io_wqe *wqe = wq->wqes[node];
io_wq_for_each_worker(wqe, io_wqe_worker_send_sig, NULL);
}
rcu_read_unlock();
}
struct io_cb_cancel_data {
work_cancel_fn *fn;
void *data;
int nr_running;
int nr_pending;
bool cancel_all;
};
static bool io_wq_worker_cancel(struct io_worker *worker, void *data) static bool io_wq_worker_cancel(struct io_worker *worker, void *data)
{ {
struct io_cb_cancel_data *match = data; struct io_cb_cancel_data *match = data;
unsigned long flags;
/* /*
* Hold the lock to avoid ->cur_work going out of scope, caller * Hold the lock to avoid ->cur_work going out of scope, caller
* may dereference the passed in work. * may dereference the passed in work.
*/ */
spin_lock_irqsave(&worker->lock, flags); spin_lock(&worker->lock);
if (worker->cur_work && if (worker->cur_work &&
!(worker->cur_work->flags & IO_WQ_WORK_NO_CANCEL) &&
match->fn(worker->cur_work, match->data)) { match->fn(worker->cur_work, match->data)) {
send_sig(SIGINT, worker->task, 1); set_notify_signal(worker->task);
match->nr_running++; match->nr_running++;
} }
spin_unlock_irqrestore(&worker->lock, flags); spin_unlock(&worker->lock);
return match->nr_running && !match->cancel_all; return match->nr_running && !match->cancel_all;
} }
...@@ -987,6 +993,7 @@ static inline void io_wqe_remove_pending(struct io_wqe *wqe, ...@@ -987,6 +993,7 @@ static inline void io_wqe_remove_pending(struct io_wqe *wqe,
struct io_wq_work *work, struct io_wq_work *work,
struct io_wq_work_node *prev) struct io_wq_work_node *prev)
{ {
struct io_wqe_acct *acct = io_work_get_acct(wqe, work);
unsigned int hash = io_get_work_hash(work); unsigned int hash = io_get_work_hash(work);
struct io_wq_work *prev_work = NULL; struct io_wq_work *prev_work = NULL;
...@@ -998,33 +1005,48 @@ static inline void io_wqe_remove_pending(struct io_wqe *wqe, ...@@ -998,33 +1005,48 @@ static inline void io_wqe_remove_pending(struct io_wqe *wqe,
else else
wqe->hash_tail[hash] = NULL; wqe->hash_tail[hash] = NULL;
} }
wq_list_del(&wqe->work_list, &work->list, prev); wq_list_del(&acct->work_list, &work->list, prev);
} }
static void io_wqe_cancel_pending_work(struct io_wqe *wqe, static bool io_acct_cancel_pending_work(struct io_wqe *wqe,
struct io_cb_cancel_data *match) struct io_wqe_acct *acct,
struct io_cb_cancel_data *match)
__releases(wqe->lock)
{ {
struct io_wq_work_node *node, *prev; struct io_wq_work_node *node, *prev;
struct io_wq_work *work; struct io_wq_work *work;
unsigned long flags;
retry: wq_list_for_each(node, prev, &acct->work_list) {
raw_spin_lock_irqsave(&wqe->lock, flags);
wq_list_for_each(node, prev, &wqe->work_list) {
work = container_of(node, struct io_wq_work, list); work = container_of(node, struct io_wq_work, list);
if (!match->fn(work, match->data)) if (!match->fn(work, match->data))
continue; continue;
io_wqe_remove_pending(wqe, work, prev); io_wqe_remove_pending(wqe, work, prev);
raw_spin_unlock_irqrestore(&wqe->lock, flags); raw_spin_unlock(&wqe->lock);
io_run_cancel(work, wqe); io_run_cancel(work, wqe);
match->nr_pending++; match->nr_pending++;
if (!match->cancel_all)
return;
/* not safe to continue after unlock */ /* not safe to continue after unlock */
goto retry; return true;
}
return false;
}
static void io_wqe_cancel_pending_work(struct io_wqe *wqe,
struct io_cb_cancel_data *match)
{
int i;
retry:
raw_spin_lock(&wqe->lock);
for (i = 0; i < IO_WQ_ACCT_NR; i++) {
struct io_wqe_acct *acct = io_get_acct(wqe, i == 0);
if (io_acct_cancel_pending_work(wqe, acct, match)) {
if (match->cancel_all)
goto retry;
return;
}
} }
raw_spin_unlock_irqrestore(&wqe->lock, flags); raw_spin_unlock(&wqe->lock);
} }
static void io_wqe_cancel_running_work(struct io_wqe *wqe, static void io_wqe_cancel_running_work(struct io_wqe *wqe,
...@@ -1079,9 +1101,28 @@ enum io_wq_cancel io_wq_cancel_cb(struct io_wq *wq, work_cancel_fn *cancel, ...@@ -1079,9 +1101,28 @@ enum io_wq_cancel io_wq_cancel_cb(struct io_wq *wq, work_cancel_fn *cancel,
return IO_WQ_CANCEL_NOTFOUND; return IO_WQ_CANCEL_NOTFOUND;
} }
static int io_wqe_hash_wake(struct wait_queue_entry *wait, unsigned mode,
int sync, void *key)
{
struct io_wqe *wqe = container_of(wait, struct io_wqe, wait);
int i;
list_del_init(&wait->entry);
rcu_read_lock();
for (i = 0; i < IO_WQ_ACCT_NR; i++) {
struct io_wqe_acct *acct = &wqe->acct[i];
if (test_and_clear_bit(IO_ACCT_STALLED_BIT, &acct->flags))
io_wqe_activate_free_worker(wqe, acct);
}
rcu_read_unlock();
return 1;
}
struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data) struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
{ {
int ret = -ENOMEM, node; int ret, node, i;
struct io_wq *wq; struct io_wq *wq;
if (WARN_ON_ONCE(!data->free_work || !data->do_work)) if (WARN_ON_ONCE(!data->free_work || !data->do_work))
...@@ -1089,24 +1130,18 @@ struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data) ...@@ -1089,24 +1130,18 @@ struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
if (WARN_ON_ONCE(!bounded)) if (WARN_ON_ONCE(!bounded))
return ERR_PTR(-EINVAL); return ERR_PTR(-EINVAL);
wq = kzalloc(sizeof(*wq), GFP_KERNEL); wq = kzalloc(struct_size(wq, wqes, nr_node_ids), GFP_KERNEL);
if (!wq) if (!wq)
return ERR_PTR(-ENOMEM); return ERR_PTR(-ENOMEM);
wq->wqes = kcalloc(nr_node_ids, sizeof(struct io_wqe *), GFP_KERNEL);
if (!wq->wqes)
goto err_wq;
ret = cpuhp_state_add_instance_nocalls(io_wq_online, &wq->cpuhp_node); ret = cpuhp_state_add_instance_nocalls(io_wq_online, &wq->cpuhp_node);
if (ret) if (ret)
goto err_wqes; goto err_wq;
refcount_inc(&data->hash->refs);
wq->hash = data->hash;
wq->free_work = data->free_work; wq->free_work = data->free_work;
wq->do_work = data->do_work; wq->do_work = data->do_work;
/* caller must already hold a reference to this */
wq->user = data->user;
ret = -ENOMEM; ret = -ENOMEM;
for_each_node(node) { for_each_node(node) {
struct io_wqe *wqe; struct io_wqe *wqe;
...@@ -1118,113 +1153,234 @@ struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data) ...@@ -1118,113 +1153,234 @@ struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
if (!wqe) if (!wqe)
goto err; goto err;
wq->wqes[node] = wqe; wq->wqes[node] = wqe;
if (!alloc_cpumask_var(&wqe->cpu_mask, GFP_KERNEL))
goto err;
cpumask_copy(wqe->cpu_mask, cpumask_of_node(node));
wqe->node = alloc_node; wqe->node = alloc_node;
wqe->acct[IO_WQ_ACCT_BOUND].max_workers = bounded; wqe->acct[IO_WQ_ACCT_BOUND].max_workers = bounded;
atomic_set(&wqe->acct[IO_WQ_ACCT_BOUND].nr_running, 0); wqe->acct[IO_WQ_ACCT_UNBOUND].max_workers =
if (wq->user) {
wqe->acct[IO_WQ_ACCT_UNBOUND].max_workers =
task_rlimit(current, RLIMIT_NPROC); task_rlimit(current, RLIMIT_NPROC);
INIT_LIST_HEAD(&wqe->wait.entry);
wqe->wait.func = io_wqe_hash_wake;
for (i = 0; i < IO_WQ_ACCT_NR; i++) {
struct io_wqe_acct *acct = &wqe->acct[i];
acct->index = i;
atomic_set(&acct->nr_running, 0);
INIT_WQ_LIST(&acct->work_list);
} }
atomic_set(&wqe->acct[IO_WQ_ACCT_UNBOUND].nr_running, 0);
wqe->wq = wq; wqe->wq = wq;
raw_spin_lock_init(&wqe->lock); raw_spin_lock_init(&wqe->lock);
INIT_WQ_LIST(&wqe->work_list);
INIT_HLIST_NULLS_HEAD(&wqe->free_list, 0); INIT_HLIST_NULLS_HEAD(&wqe->free_list, 0);
INIT_LIST_HEAD(&wqe->all_list); INIT_LIST_HEAD(&wqe->all_list);
} }
init_completion(&wq->done); wq->task = get_task_struct(data->task);
atomic_set(&wq->worker_refs, 1);
wq->manager = kthread_create(io_wq_manager, wq, "io_wq_manager"); init_completion(&wq->worker_done);
if (!IS_ERR(wq->manager)) { return wq;
wake_up_process(wq->manager);
wait_for_completion(&wq->done);
if (test_bit(IO_WQ_BIT_ERROR, &wq->state)) {
ret = -ENOMEM;
goto err;
}
refcount_set(&wq->use_refs, 1);
reinit_completion(&wq->done);
return wq;
}
ret = PTR_ERR(wq->manager);
complete(&wq->done);
err: err:
io_wq_put_hash(data->hash);
cpuhp_state_remove_instance_nocalls(io_wq_online, &wq->cpuhp_node); cpuhp_state_remove_instance_nocalls(io_wq_online, &wq->cpuhp_node);
for_each_node(node) for_each_node(node) {
if (!wq->wqes[node])
continue;
free_cpumask_var(wq->wqes[node]->cpu_mask);
kfree(wq->wqes[node]); kfree(wq->wqes[node]);
err_wqes: }
kfree(wq->wqes);
err_wq: err_wq:
kfree(wq); kfree(wq);
return ERR_PTR(ret); return ERR_PTR(ret);
} }
bool io_wq_get(struct io_wq *wq, struct io_wq_data *data) static bool io_task_work_match(struct callback_head *cb, void *data)
{ {
if (data->free_work != wq->free_work || data->do_work != wq->do_work) struct io_worker *worker;
if (cb->func != create_worker_cb && cb->func != create_worker_cont)
return false; return false;
worker = container_of(cb, struct io_worker, create_work);
return worker->wqe->wq == data;
}
void io_wq_exit_start(struct io_wq *wq)
{
set_bit(IO_WQ_BIT_EXIT, &wq->state);
}
static void io_wq_cancel_tw_create(struct io_wq *wq)
{
struct callback_head *cb;
return refcount_inc_not_zero(&wq->use_refs); while ((cb = task_work_cancel_match(wq->task, io_task_work_match, wq)) != NULL) {
struct io_worker *worker;
worker = container_of(cb, struct io_worker, create_work);
io_worker_cancel_cb(worker);
}
} }
static void __io_wq_destroy(struct io_wq *wq) static void io_wq_exit_workers(struct io_wq *wq)
{ {
int node; int node;
cpuhp_state_remove_instance_nocalls(io_wq_online, &wq->cpuhp_node); if (!wq->task)
return;
set_bit(IO_WQ_BIT_EXIT, &wq->state); io_wq_cancel_tw_create(wq);
if (wq->manager)
kthread_stop(wq->manager);
rcu_read_lock(); rcu_read_lock();
for_each_node(node) for_each_node(node) {
io_wq_for_each_worker(wq->wqes[node], io_wq_worker_wake, NULL); struct io_wqe *wqe = wq->wqes[node];
rcu_read_unlock();
wait_for_completion(&wq->done); io_wq_for_each_worker(wqe, io_wq_worker_wake, NULL);
}
rcu_read_unlock();
io_worker_ref_put(wq);
wait_for_completion(&wq->worker_done);
for_each_node(node) for_each_node(node) {
kfree(wq->wqes[node]); spin_lock_irq(&wq->hash->wait.lock);
kfree(wq->wqes); list_del_init(&wq->wqes[node]->wait.entry);
kfree(wq); spin_unlock_irq(&wq->hash->wait.lock);
}
put_task_struct(wq->task);
wq->task = NULL;
} }
void io_wq_destroy(struct io_wq *wq) static void io_wq_destroy(struct io_wq *wq)
{ {
if (refcount_dec_and_test(&wq->use_refs)) int node;
__io_wq_destroy(wq);
cpuhp_state_remove_instance_nocalls(io_wq_online, &wq->cpuhp_node);
for_each_node(node) {
struct io_wqe *wqe = wq->wqes[node];
struct io_cb_cancel_data match = {
.fn = io_wq_work_match_all,
.cancel_all = true,
};
io_wqe_cancel_pending_work(wqe, &match);
free_cpumask_var(wqe->cpu_mask);
kfree(wqe);
}
io_wq_put_hash(wq->hash);
kfree(wq);
} }
struct task_struct *io_wq_get_task(struct io_wq *wq) void io_wq_put_and_exit(struct io_wq *wq)
{ {
return wq->manager; WARN_ON_ONCE(!test_bit(IO_WQ_BIT_EXIT, &wq->state));
io_wq_exit_workers(wq);
io_wq_destroy(wq);
} }
struct online_data {
unsigned int cpu;
bool online;
};
static bool io_wq_worker_affinity(struct io_worker *worker, void *data) static bool io_wq_worker_affinity(struct io_worker *worker, void *data)
{ {
struct task_struct *task = worker->task; struct online_data *od = data;
struct rq_flags rf;
struct rq *rq; if (od->online)
cpumask_set_cpu(od->cpu, worker->wqe->cpu_mask);
rq = task_rq_lock(task, &rf); else
do_set_cpus_allowed(task, cpumask_of_node(worker->wqe->node)); cpumask_clear_cpu(od->cpu, worker->wqe->cpu_mask);
task->flags |= PF_NO_SETAFFINITY;
task_rq_unlock(rq, task, &rf);
return false; return false;
} }
static int __io_wq_cpu_online(struct io_wq *wq, unsigned int cpu, bool online)
{
struct online_data od = {
.cpu = cpu,
.online = online
};
int i;
rcu_read_lock();
for_each_node(i)
io_wq_for_each_worker(wq->wqes[i], io_wq_worker_affinity, &od);
rcu_read_unlock();
return 0;
}
static int io_wq_cpu_online(unsigned int cpu, struct hlist_node *node) static int io_wq_cpu_online(unsigned int cpu, struct hlist_node *node)
{ {
struct io_wq *wq = hlist_entry_safe(node, struct io_wq, cpuhp_node); struct io_wq *wq = hlist_entry_safe(node, struct io_wq, cpuhp_node);
return __io_wq_cpu_online(wq, cpu, true);
}
static int io_wq_cpu_offline(unsigned int cpu, struct hlist_node *node)
{
struct io_wq *wq = hlist_entry_safe(node, struct io_wq, cpuhp_node);
return __io_wq_cpu_online(wq, cpu, false);
}
int io_wq_cpu_affinity(struct io_wq *wq, cpumask_var_t mask)
{
int i; int i;
rcu_read_lock(); rcu_read_lock();
for_each_node(i) for_each_node(i) {
io_wq_for_each_worker(wq->wqes[i], io_wq_worker_affinity, NULL); struct io_wqe *wqe = wq->wqes[i];
if (mask)
cpumask_copy(wqe->cpu_mask, mask);
else
cpumask_copy(wqe->cpu_mask, cpumask_of_node(i));
}
rcu_read_unlock();
return 0;
}
/*
* Set max number of unbounded workers, returns old value. If new_count is 0,
* then just return the old value.
*/
int io_wq_max_workers(struct io_wq *wq, int *new_count)
{
int prev[IO_WQ_ACCT_NR];
bool first_node = true;
int i, node;
BUILD_BUG_ON((int) IO_WQ_ACCT_BOUND != (int) IO_WQ_BOUND);
BUILD_BUG_ON((int) IO_WQ_ACCT_UNBOUND != (int) IO_WQ_UNBOUND);
BUILD_BUG_ON((int) IO_WQ_ACCT_NR != 2);
for (i = 0; i < 2; i++) {
if (new_count[i] > task_rlimit(current, RLIMIT_NPROC))
new_count[i] = task_rlimit(current, RLIMIT_NPROC);
}
for (i = 0; i < IO_WQ_ACCT_NR; i++)
prev[i] = 0;
rcu_read_lock();
for_each_node(node) {
struct io_wqe *wqe = wq->wqes[node];
struct io_wqe_acct *acct;
raw_spin_lock(&wqe->lock);
for (i = 0; i < IO_WQ_ACCT_NR; i++) {
acct = &wqe->acct[i];
if (first_node)
prev[i] = max_t(int, acct->max_workers, prev[i]);
if (new_count[i])
acct->max_workers = new_count[i];
}
raw_spin_unlock(&wqe->lock);
first_node = false;
}
rcu_read_unlock(); rcu_read_unlock();
for (i = 0; i < IO_WQ_ACCT_NR; i++)
new_count[i] = prev[i];
return 0; return 0;
} }
...@@ -1233,7 +1389,7 @@ static __init int io_wq_init(void) ...@@ -1233,7 +1389,7 @@ static __init int io_wq_init(void)
int ret; int ret;
ret = cpuhp_setup_state_multi(CPUHP_AP_ONLINE_DYN, "io-wq/online", ret = cpuhp_setup_state_multi(CPUHP_AP_ONLINE_DYN, "io-wq/online",
io_wq_cpu_online, NULL); io_wq_cpu_online, io_wq_cpu_offline);
if (ret < 0) if (ret < 0)
return ret; return ret;
io_wq_online = ret; io_wq_online = ret;
......
#ifndef INTERNAL_IO_WQ_H #ifndef INTERNAL_IO_WQ_H
#define INTERNAL_IO_WQ_H #define INTERNAL_IO_WQ_H
#include <linux/io_uring.h> #include <linux/refcount.h>
struct io_wq; struct io_wq;
...@@ -9,16 +9,8 @@ enum { ...@@ -9,16 +9,8 @@ enum {
IO_WQ_WORK_CANCEL = 1, IO_WQ_WORK_CANCEL = 1,
IO_WQ_WORK_HASHED = 2, IO_WQ_WORK_HASHED = 2,
IO_WQ_WORK_UNBOUND = 4, IO_WQ_WORK_UNBOUND = 4,
IO_WQ_WORK_NO_CANCEL = 8,
IO_WQ_WORK_CONCURRENT = 16, IO_WQ_WORK_CONCURRENT = 16,
IO_WQ_WORK_FILES = 32,
IO_WQ_WORK_FS = 64,
IO_WQ_WORK_MM = 128,
IO_WQ_WORK_CREDS = 256,
IO_WQ_WORK_BLKCG = 512,
IO_WQ_WORK_FSIZE = 1024,
IO_WQ_HASH_SHIFT = 24, /* upper 8 bits are used for hash key */ IO_WQ_HASH_SHIFT = 24, /* upper 8 bits are used for hash key */
}; };
...@@ -52,6 +44,7 @@ static inline void wq_list_add_after(struct io_wq_work_node *node, ...@@ -52,6 +44,7 @@ static inline void wq_list_add_after(struct io_wq_work_node *node,
static inline void wq_list_add_tail(struct io_wq_work_node *node, static inline void wq_list_add_tail(struct io_wq_work_node *node,
struct io_wq_work_list *list) struct io_wq_work_list *list)
{ {
node->next = NULL;
if (!list->first) { if (!list->first) {
list->last = node; list->last = node;
WRITE_ONCE(list->first, node); WRITE_ONCE(list->first, node);
...@@ -59,7 +52,6 @@ static inline void wq_list_add_tail(struct io_wq_work_node *node, ...@@ -59,7 +52,6 @@ static inline void wq_list_add_tail(struct io_wq_work_node *node,
list->last->next = node; list->last->next = node;
list->last = node; list->last = node;
} }
node->next = NULL;
} }
static inline void wq_list_cut(struct io_wq_work_list *list, static inline void wq_list_cut(struct io_wq_work_list *list,
...@@ -95,7 +87,6 @@ static inline void wq_list_del(struct io_wq_work_list *list, ...@@ -95,7 +87,6 @@ static inline void wq_list_del(struct io_wq_work_list *list,
struct io_wq_work { struct io_wq_work {
struct io_wq_work_node list; struct io_wq_work_node list;
struct io_identity *identity;
unsigned flags; unsigned flags;
}; };
...@@ -107,37 +98,48 @@ static inline struct io_wq_work *wq_next_work(struct io_wq_work *work) ...@@ -107,37 +98,48 @@ static inline struct io_wq_work *wq_next_work(struct io_wq_work *work)
return container_of(work->list.next, struct io_wq_work, list); return container_of(work->list.next, struct io_wq_work, list);
} }
typedef void (free_work_fn)(struct io_wq_work *); typedef struct io_wq_work *(free_work_fn)(struct io_wq_work *);
typedef struct io_wq_work *(io_wq_work_fn)(struct io_wq_work *); typedef void (io_wq_work_fn)(struct io_wq_work *);
struct io_wq_data { struct io_wq_hash {
struct user_struct *user; refcount_t refs;
unsigned long map;
struct wait_queue_head wait;
};
static inline void io_wq_put_hash(struct io_wq_hash *hash)
{
if (refcount_dec_and_test(&hash->refs))
kfree(hash);
}
struct io_wq_data {
struct io_wq_hash *hash;
struct task_struct *task;
io_wq_work_fn *do_work; io_wq_work_fn *do_work;
free_work_fn *free_work; free_work_fn *free_work;
}; };
struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data); struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data);
bool io_wq_get(struct io_wq *wq, struct io_wq_data *data); void io_wq_exit_start(struct io_wq *wq);
void io_wq_destroy(struct io_wq *wq); void io_wq_put_and_exit(struct io_wq *wq);
void io_wq_enqueue(struct io_wq *wq, struct io_wq_work *work); void io_wq_enqueue(struct io_wq *wq, struct io_wq_work *work);
void io_wq_hash_work(struct io_wq_work *work, void *val); void io_wq_hash_work(struct io_wq_work *work, void *val);
int io_wq_cpu_affinity(struct io_wq *wq, cpumask_var_t mask);
int io_wq_max_workers(struct io_wq *wq, int *new_count);
static inline bool io_wq_is_hashed(struct io_wq_work *work) static inline bool io_wq_is_hashed(struct io_wq_work *work)
{ {
return work->flags & IO_WQ_WORK_HASHED; return work->flags & IO_WQ_WORK_HASHED;
} }
void io_wq_cancel_all(struct io_wq *wq);
typedef bool (work_cancel_fn)(struct io_wq_work *, void *); typedef bool (work_cancel_fn)(struct io_wq_work *, void *);
enum io_wq_cancel io_wq_cancel_cb(struct io_wq *wq, work_cancel_fn *cancel, enum io_wq_cancel io_wq_cancel_cb(struct io_wq *wq, work_cancel_fn *cancel,
void *data, bool cancel_all); void *data, bool cancel_all);
struct task_struct *io_wq_get_task(struct io_wq *wq);
#if defined(CONFIG_IO_WQ) #if defined(CONFIG_IO_WQ)
extern void io_wq_worker_sleeping(struct task_struct *); extern void io_wq_worker_sleeping(struct task_struct *);
extern void io_wq_worker_running(struct task_struct *); extern void io_wq_worker_running(struct task_struct *);
...@@ -152,6 +154,7 @@ static inline void io_wq_worker_running(struct task_struct *tsk) ...@@ -152,6 +154,7 @@ static inline void io_wq_worker_running(struct task_struct *tsk)
static inline bool io_wq_current_is_worker(void) static inline bool io_wq_current_is_worker(void)
{ {
return in_task() && (current->flags & PF_IO_WORKER); return in_task() && (current->flags & PF_IO_WORKER) &&
current->pf_io_worker;
} }
#endif #endif
The source diff is too large to display; view the blob instead.
...@@ -764,7 +764,7 @@ void __noreturn do_exit(long code) ...@@ -764,7 +764,7 @@ void __noreturn do_exit(long code)
schedule(); schedule();
} }
io_uring_files_cancel(tsk->files); io_uring_files_cancel();
exit_signals(tsk); /* sets PF_EXITING */ exit_signals(tsk); /* sets PF_EXITING */
/* sync mm's RSS info before statistics gathering */ /* sync mm's RSS info before statistics gathering */
......
...@@ -950,6 +950,7 @@ static struct task_struct *dup_task_struct(struct task_struct *orig, int node) ...@@ -950,6 +950,7 @@ static struct task_struct *dup_task_struct(struct task_struct *orig, int node)
tsk->splice_pipe = NULL; tsk->splice_pipe = NULL;
tsk->task_frag.page = NULL; tsk->task_frag.page = NULL;
tsk->wake_q.next = NULL; tsk->wake_q.next = NULL;
tsk->pf_io_worker = NULL;
account_kernel_stack(tsk, 1); account_kernel_stack(tsk, 1);
......
...@@ -21,7 +21,7 @@ ...@@ -21,7 +21,7 @@
#include <asm/tlb.h> #include <asm/tlb.h>
#include "../workqueue_internal.h" #include "../workqueue_internal.h"
#include "../../fs/io-wq.h" #include "../../io_uring/io-wq.h"
#include "../smpboot.h" #include "../smpboot.h"
#include "pelt.h" #include "pelt.h"
......