// SPDX-License-Identifier: GPL-2.0
/*
 * Basic worker thread pool for io_uring
 *
 * Copyright (C) 2019 Jens Axboe
 *
 */
#include <linux/kernel.h>
#include <linux/init.h>
#include <linux/errno.h>
#include <linux/sched/signal.h>
#include <linux/mm.h>
#include <linux/sched/mm.h>
#include <linux/percpu.h>
#include <linux/slab.h>
#include <linux/kthread.h>
#include <linux/rculist_nulls.h>
#include <linux/fs_struct.h>
#include <linux/task_work.h>
#include <linux/blk-cgroup.h>
#include <linux/audit.h>

#include "io-wq.h"

#define WORKER_IDLE_TIMEOUT	(5 * HZ)

enum {
	IO_WORKER_F_UP		= 1,	/* up and active */
	IO_WORKER_F_RUNNING	= 2,	/* account as running */
	IO_WORKER_F_FREE	= 4,	/* worker on free list */
	IO_WORKER_F_FIXED	= 8,	/* static idle worker */
	IO_WORKER_F_BOUND	= 16,	/* is doing bounded work */
};

enum {
	IO_WQ_BIT_EXIT		= 0,	/* wq exiting */
	IO_WQ_BIT_CANCEL	= 1,	/* cancel work on list */
	IO_WQ_BIT_ERROR		= 2,	/* error on setup */
};

enum {
	IO_WQE_FLAG_STALLED	= 1,	/* stalled on hash */
};

/*
 * One for each thread in a wqe pool
 */
struct io_worker {
	refcount_t ref;
	unsigned flags;
	struct hlist_nulls_node nulls_node;
	struct list_head all_list;
	struct task_struct *task;
	struct io_wqe *wqe;

	struct io_wq_work *cur_work;
	spinlock_t lock;

	struct rcu_head rcu;
	struct mm_struct *mm;
#ifdef CONFIG_BLK_CGROUP
	struct cgroup_subsys_state *blkcg_css;
#endif
	const struct cred *cur_creds;
	const struct cred *saved_creds;
	struct files_struct *restore_files;
	struct nsproxy *restore_nsproxy;
	struct fs_struct *restore_fs;
};

#if BITS_PER_LONG == 64
#define IO_WQ_HASH_ORDER	6
#else
#define IO_WQ_HASH_ORDER	5
#endif

#define IO_WQ_NR_HASH_BUCKETS	(1u << IO_WQ_HASH_ORDER)

struct io_wqe_acct {
	unsigned nr_workers;
	unsigned max_workers;
	atomic_t nr_running;
};

enum {
	IO_WQ_ACCT_BOUND,
	IO_WQ_ACCT_UNBOUND,
};

/*
 * Per-node worker thread pool
 */
struct io_wqe {
	struct {
		raw_spinlock_t lock;
		struct io_wq_work_list work_list;
		unsigned long hash_map;
		unsigned flags;
	} ____cacheline_aligned_in_smp;

	int node;
	struct io_wqe_acct acct[2];

	struct hlist_nulls_head free_list;
	struct list_head all_list;

	struct io_wq *wq;
	struct io_wq_work *hash_tail[IO_WQ_NR_HASH_BUCKETS];
};

/*
 * Per io_wq state
 */
struct io_wq {
	struct io_wqe **wqes;
	unsigned long state;

	free_work_fn *free_work;
	io_wq_work_fn *do_work;

	struct task_struct *manager;
	struct user_struct *user;
	refcount_t refs;
	struct completion done;

	refcount_t use_refs;
};

static bool io_worker_get(struct io_worker *worker)
{
	return refcount_inc_not_zero(&worker->ref);
}

static void io_worker_release(struct io_worker *worker)
{
	if (refcount_dec_and_test(&worker->ref))
		wake_up_process(worker->task);
}

/*
 * Note: drops the wqe->lock if returning true! The caller must re-acquire
 * the lock in that case. Some callers need to restart handling if this
 * happens, so we can't just re-acquire the lock on behalf of the caller.
 */
static bool __io_worker_unuse(struct io_wqe *wqe, struct io_worker *worker)
{
	bool dropped_lock = false;

	if (worker->saved_creds) {
		revert_creds(worker->saved_creds);
		worker->cur_creds = worker->saved_creds = NULL;
	}

	if (current->files != worker->restore_files) {
		__acquire(&wqe->lock);
		raw_spin_unlock_irq(&wqe->lock);
		dropped_lock = true;

		task_lock(current);
		current->files = worker->restore_files;
		current->nsproxy = worker->restore_nsproxy;
		task_unlock(current);
	}

	if (current->fs != worker->restore_fs)
		current->fs = worker->restore_fs;

	/*
	 * If we have an active mm, we need to drop the wq lock before unusing
	 * it. If we do, return true and let the caller retry the idle loop.
	 */
	if (worker->mm) {
		if (!dropped_lock) {
			__acquire(&wqe->lock);
			raw_spin_unlock_irq(&wqe->lock);
			dropped_lock = true;
		}
		__set_current_state(TASK_RUNNING);
		kthread_unuse_mm(worker->mm);
		mmput(worker->mm);
		worker->mm = NULL;
	}

#ifdef CONFIG_BLK_CGROUP
	if (worker->blkcg_css) {
		kthread_associate_blkcg(NULL);
		worker->blkcg_css = NULL;
	}
#endif
	if (current->signal->rlim[RLIMIT_FSIZE].rlim_cur != RLIM_INFINITY)
		current->signal->rlim[RLIMIT_FSIZE].rlim_cur = RLIM_INFINITY;
	return dropped_lock;
}

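/*
 * Bound and unbound work use separate accounting; pick the right bucket
 * for a work item (io_work_get_acct) or a worker (io_wqe_get_acct).
 */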
static inline struct io_wqe_acct *io_work_get_acct(struct io_wqe *wqe,
						   struct io_wq_work *work)
{
	if (work->flags & IO_WQ_WORK_UNBOUND)
		return &wqe->acct[IO_WQ_ACCT_UNBOUND];

	return &wqe->acct[IO_WQ_ACCT_BOUND];
}

static inline struct io_wqe_acct *io_wqe_get_acct(struct io_wqe *wqe,
						  struct io_worker *worker)
{
	if (worker->flags & IO_WORKER_F_BOUND)
		return &wqe->acct[IO_WQ_ACCT_BOUND];

	return &wqe->acct[IO_WQ_ACCT_UNBOUND];
}

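/*
 * A worker is exiting: drop its running/user accounting, unlink it from
 * the free and all lists, and drop its wq reference (completing wq->done
 * if it was the last one).
 */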
static void io_worker_exit(struct io_worker *worker)
{
	struct io_wqe *wqe = worker->wqe;
	struct io_wqe_acct *acct = io_wqe_get_acct(wqe, worker);

	/*
	 * If we're not at zero, someone else is holding a brief reference
	 * to the worker. Wait for that to go away.
	 */
	set_current_state(TASK_INTERRUPTIBLE);
	if (!refcount_dec_and_test(&worker->ref))
		schedule();
	__set_current_state(TASK_RUNNING);

	preempt_disable();
	current->flags &= ~PF_IO_WORKER;
	if (worker->flags & IO_WORKER_F_RUNNING)
		atomic_dec(&acct->nr_running);
	if (!(worker->flags & IO_WORKER_F_BOUND))
		atomic_dec(&wqe->wq->user->processes);
	worker->flags = 0;
	preempt_enable();

	raw_spin_lock_irq(&wqe->lock);
	hlist_nulls_del_rcu(&worker->nulls_node);
	list_del_rcu(&worker->all_list);
	if (__io_worker_unuse(wqe, worker)) {
		__release(&wqe->lock);
		raw_spin_lock_irq(&wqe->lock);
	}
	acct->nr_workers--;
	raw_spin_unlock_irq(&wqe->lock);

	kfree_rcu(worker, rcu);
	if (refcount_dec_and_test(&wqe->wq->refs))
		complete(&wqe->wq->done);
}

static inline bool io_wqe_run_queue(struct io_wqe *wqe)
	__must_hold(wqe->lock)
{
	if (!wq_list_empty(&wqe->work_list) &&
	    !(wqe->flags & IO_WQE_FLAG_STALLED))
		return true;
	return false;
}

/*
 * Check head of free list for an available worker. If one isn't available,
 * caller must wake up the wq manager to create one.
 */
static bool io_wqe_activate_free_worker(struct io_wqe *wqe)
	__must_hold(RCU)
{
	struct hlist_nulls_node *n;
	struct io_worker *worker;

	n = rcu_dereference(hlist_nulls_first_rcu(&wqe->free_list));
	if (is_a_nulls(n))
		return false;

	worker = hlist_nulls_entry(n, struct io_worker, nulls_node);
	if (io_worker_get(worker)) {
		wake_up_process(worker->task);
		io_worker_release(worker);
		return true;
	}

	return false;
}

/*
 * We need a worker. If we find a free one, we're good. If not, and we're
 * below the max number of workers, wake up the manager to create one.
 */
static void io_wqe_wake_worker(struct io_wqe *wqe, struct io_wqe_acct *acct)
{
	bool ret;

	/*
	 * Most likely an attempt to queue unbounded work on an io_wq that
	 * wasn't setup with any unbounded workers.
	 */
	WARN_ON_ONCE(!acct->max_workers);

	rcu_read_lock();
	ret = io_wqe_activate_free_worker(wqe);
	rcu_read_unlock();

	if (!ret && acct->nr_workers < acct->max_workers)
		wake_up_process(wqe->wq->manager);
}

static void io_wqe_inc_running(struct io_wqe *wqe, struct io_worker *worker)
{
	struct io_wqe_acct *acct = io_wqe_get_acct(wqe, worker);

	atomic_inc(&acct->nr_running);
}

static void io_wqe_dec_running(struct io_wqe *wqe, struct io_worker *worker)
	__must_hold(wqe->lock)
{
	struct io_wqe_acct *acct = io_wqe_get_acct(wqe, worker);

	if (atomic_dec_and_test(&acct->nr_running) && io_wqe_run_queue(wqe))
		io_wqe_wake_worker(wqe, acct);
}

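/*
 * Initial per-thread setup: mark the task as an io_wq worker and snapshot
 * the files/nsproxy/fs state that __io_worker_unuse() will restore.
 */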
static void io_worker_start(struct io_wqe *wqe, struct io_worker *worker)
{
	allow_kernel_signal(SIGINT);

	current->flags |= PF_IO_WORKER;

	worker->flags |= (IO_WORKER_F_UP | IO_WORKER_F_RUNNING);
	worker->restore_files = current->files;
	worker->restore_nsproxy = current->nsproxy;
	worker->restore_fs = current->fs;
	io_wqe_inc_running(wqe, worker);
}

/*
 * Worker will start processing some work. Move it to the busy list, if
 * it's currently on the freelist
 */
static void __io_worker_busy(struct io_wqe *wqe, struct io_worker *worker,
			     struct io_wq_work *work)
	__must_hold(wqe->lock)
{
	bool worker_bound, work_bound;

	if (worker->flags & IO_WORKER_F_FREE) {
		worker->flags &= ~IO_WORKER_F_FREE;
		hlist_nulls_del_init_rcu(&worker->nulls_node);
	}

	/*
	 * If worker is moving from bound to unbound (or vice versa), then
	 * ensure we update the running accounting.
	 */
	worker_bound = (worker->flags & IO_WORKER_F_BOUND) != 0;
	work_bound = (work->flags & IO_WQ_WORK_UNBOUND) == 0;
	if (worker_bound != work_bound) {
		io_wqe_dec_running(wqe, worker);
		if (work_bound) {
			worker->flags |= IO_WORKER_F_BOUND;
			wqe->acct[IO_WQ_ACCT_UNBOUND].nr_workers--;
			wqe->acct[IO_WQ_ACCT_BOUND].nr_workers++;
			atomic_dec(&wqe->wq->user->processes);
		} else {
			worker->flags &= ~IO_WORKER_F_BOUND;
			wqe->acct[IO_WQ_ACCT_UNBOUND].nr_workers++;
			wqe->acct[IO_WQ_ACCT_BOUND].nr_workers--;
			atomic_inc(&wqe->wq->user->processes);
		}
		io_wqe_inc_running(wqe, worker);
	}
}

/*
 * No work, worker going to sleep. Move to freelist, and unuse mm if we
 * have one attached. Dropping the mm may potentially sleep, so we drop
 * the lock in that case and return success. Since the caller has to
 * retry the loop in that case (we changed task state), we don't regrab
 * the lock if we return success.
 */
static bool __io_worker_idle(struct io_wqe *wqe, struct io_worker *worker)
	__must_hold(wqe->lock)
{
	if (!(worker->flags & IO_WORKER_F_FREE)) {
		worker->flags |= IO_WORKER_F_FREE;
		hlist_nulls_add_head_rcu(&worker->nulls_node, &wqe->free_list);
	}

	return __io_worker_unuse(wqe, worker);
}

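/* hash bucket is stored in the upper flag bits, see io_wq_hash_work() */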
static inline unsigned int io_get_work_hash(struct io_wq_work *work)
{
	return work->flags >> IO_WQ_HASH_SHIFT;
}

static struct io_wq_work *io_get_next_work(struct io_wqe *wqe)
	__must_hold(wqe->lock)
{
	struct io_wq_work_node *node, *prev;
	struct io_wq_work *work, *tail;
	unsigned int hash;

	wq_list_for_each(node, prev, &wqe->work_list) {
		work = container_of(node, struct io_wq_work, list);

		/* not hashed, can run anytime */
		if (!io_wq_is_hashed(work)) {
			wq_list_del(&wqe->work_list, node, prev);
			return work;
		}

		/* hashed, can run if not already running */
		hash = io_get_work_hash(work);
		if (!(wqe->hash_map & BIT(hash))) {
			wqe->hash_map |= BIT(hash);
			/* all items with this hash lie in [work, tail] */
			tail = wqe->hash_tail[hash];
			wqe->hash_tail[hash] = NULL;
			wq_list_cut(&wqe->work_list, &tail->list, prev);
			return work;
		}
	}

	return NULL;
}

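/*
 * Switch to the mm of the request's identity, dropping any mm the worker
 * currently holds. If the mm can't be pinned, the work gets cancelled.
 */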
static void io_wq_switch_mm(struct io_worker *worker, struct io_wq_work *work)
{
	if (worker->mm) {
		kthread_unuse_mm(worker->mm);
		mmput(worker->mm);
		worker->mm = NULL;
	}

	if (mmget_not_zero(work->identity->mm)) {
		kthread_use_mm(work->identity->mm);
		worker->mm = work->identity->mm;
		return;
	}

	/* failed grabbing mm, ensure work gets cancelled */
	work->flags |= IO_WQ_WORK_CANCEL;
}

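/* associate the worker with the blkcg of the request, if one was recorded */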
static inline void io_wq_switch_blkcg(struct io_worker *worker,
				      struct io_wq_work *work)
{
#ifdef CONFIG_BLK_CGROUP
	if (!(work->flags & IO_WQ_WORK_BLKCG))
		return;
	if (work->identity->blkcg_css != worker->blkcg_css) {
		kthread_associate_blkcg(work->identity->blkcg_css);
		worker->blkcg_css = work->identity->blkcg_css;
	}
#endif
}

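/*
 * Run subsequent work under the credentials of the request's identity,
 * stashing the old creds so they can be reverted later.
 */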
static void io_wq_switch_creds(struct io_worker *worker,
			       struct io_wq_work *work)
{
	const struct cred *old_creds = override_creds(work->identity->creds);

	worker->cur_creds = work->identity->creds;
	if (worker->saved_creds)
		put_cred(old_creds); /* creds set by previous switch */
	else
		worker->saved_creds = old_creds;
}

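/*
 * Take on the identity of the task that originally queued this work:
 * files, fs, mm, creds, RLIMIT_FSIZE, blkcg and audit state.
 */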
static void io_impersonate_work(struct io_worker *worker,
				struct io_wq_work *work)
{
	if ((work->flags & IO_WQ_WORK_FILES) &&
	    current->files != work->identity->files) {
		task_lock(current);
		current->files = work->identity->files;
		current->nsproxy = work->identity->nsproxy;
		task_unlock(current);
	}
	if ((work->flags & IO_WQ_WORK_FS) && current->fs != work->identity->fs)
		current->fs = work->identity->fs;
	if ((work->flags & IO_WQ_WORK_MM) && work->identity->mm != worker->mm)
		io_wq_switch_mm(worker, work);
	if ((work->flags & IO_WQ_WORK_CREDS) &&
	    worker->cur_creds != work->identity->creds)
		io_wq_switch_creds(worker, work);
	if (work->flags & IO_WQ_WORK_FSIZE)
		current->signal->rlim[RLIMIT_FSIZE].rlim_cur = work->identity->fsize;
	else if (current->signal->rlim[RLIMIT_FSIZE].rlim_cur != RLIM_INFINITY)
		current->signal->rlim[RLIMIT_FSIZE].rlim_cur = RLIM_INFINITY;
	io_wq_switch_blkcg(worker, work);
#ifdef CONFIG_AUDIT
	current->loginuid = work->identity->loginuid;
	current->sessionid = work->identity->sessionid;
#endif
}

static void io_assign_current_work(struct io_worker *worker,
				   struct io_wq_work *work)
{
	if (work) {
		/* flush pending signals before assigning new work */
		if (signal_pending(current))
			flush_signals(current);
		cond_resched();
	}

#ifdef CONFIG_AUDIT
	current->loginuid = KUIDT_INIT(AUDIT_UID_UNSET);
	current->sessionid = AUDIT_SID_UNSET;
#endif

	spin_lock_irq(&worker->lock);
	worker->cur_work = work;
	spin_unlock_irq(&worker->lock);
}

static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work);

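/*
 * Main work processing loop: pull work off the wqe list and run it,
 * including any hashed siblings and dependent (linked) work it spawns.
 */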
static void io_worker_handle_work(struct io_worker *worker)
	__releases(wqe->lock)
{
	struct io_wqe *wqe = worker->wqe;
	struct io_wq *wq = wqe->wq;

	do {
		struct io_wq_work *work;
get_next:
		/*
		 * If we got some work, mark us as busy. If we didn't, but
		 * the list isn't empty, it means we stalled on hashed work.
		 * Mark us stalled so we don't keep looking for work when we
		 * can't make progress, any work completion or insertion will
		 * clear the stalled flag.
		 */
		work = io_get_next_work(wqe);
		if (work)
			__io_worker_busy(wqe, worker, work);
		else if (!wq_list_empty(&wqe->work_list))
			wqe->flags |= IO_WQE_FLAG_STALLED;

		raw_spin_unlock_irq(&wqe->lock);
		if (!work)
			break;
		io_assign_current_work(worker, work);

		/* handle a whole dependent link */
		do {
			struct io_wq_work *old_work, *next_hashed, *linked;
			unsigned int hash = io_get_work_hash(work);

			next_hashed = wq_next_work(work);
			io_impersonate_work(worker, work);
			/*
			 * OK to set IO_WQ_WORK_CANCEL even for uncancellable
			 * work, the worker function will do the right thing.
			 */
			if (test_bit(IO_WQ_BIT_CANCEL, &wq->state))
				work->flags |= IO_WQ_WORK_CANCEL;

			old_work = work;
			linked = wq->do_work(work);

			work = next_hashed;
			if (!work && linked && !io_wq_is_hashed(linked)) {
				work = linked;
				linked = NULL;
			}
			io_assign_current_work(worker, work);
			wq->free_work(old_work);

			if (linked)
				io_wqe_enqueue(wqe, linked);

			if (hash != -1U && !next_hashed) {
				raw_spin_lock_irq(&wqe->lock);
				wqe->hash_map &= ~BIT_ULL(hash);
				wqe->flags &= ~IO_WQE_FLAG_STALLED;
				/* skip unnecessary unlock-lock wqe->lock */
				if (!work)
					goto get_next;
				raw_spin_unlock_irq(&wqe->lock);
			}
		} while (work);

		raw_spin_lock_irq(&wqe->lock);
	} while (1);
}

static int io_wqe_worker(void *data)
{
	struct io_worker *worker = data;
	struct io_wqe *wqe = worker->wqe;
	struct io_wq *wq = wqe->wq;

	io_worker_start(wqe, worker);

	while (!test_bit(IO_WQ_BIT_EXIT, &wq->state)) {
		set_current_state(TASK_INTERRUPTIBLE);
loop:
		raw_spin_lock_irq(&wqe->lock);
		if (io_wqe_run_queue(wqe)) {
			__set_current_state(TASK_RUNNING);
			io_worker_handle_work(worker);
			goto loop;
		}
		/* drops the lock on success, retry */
		if (__io_worker_idle(wqe, worker)) {
			__release(&wqe->lock);
			goto loop;
		}
		raw_spin_unlock_irq(&wqe->lock);
		if (signal_pending(current))
			flush_signals(current);
		if (schedule_timeout(WORKER_IDLE_TIMEOUT))
			continue;
		/* timed out, exit unless we're the fixed worker */
		if (test_bit(IO_WQ_BIT_EXIT, &wq->state) ||
		    !(worker->flags & IO_WORKER_F_FIXED))
			break;
	}

	if (test_bit(IO_WQ_BIT_EXIT, &wq->state)) {
		raw_spin_lock_irq(&wqe->lock);
		if (!wq_list_empty(&wqe->work_list))
			io_worker_handle_work(worker);
		else
			raw_spin_unlock_irq(&wqe->lock);
	}

	io_worker_exit(worker);
	return 0;
}

/*
 * Called when a worker is scheduled in. Mark us as currently running.
 */
void io_wq_worker_running(struct task_struct *tsk)
{
	struct io_worker *worker = kthread_data(tsk);
	struct io_wqe *wqe = worker->wqe;

	if (!(worker->flags & IO_WORKER_F_UP))
		return;
	if (worker->flags & IO_WORKER_F_RUNNING)
		return;
	worker->flags |= IO_WORKER_F_RUNNING;
	io_wqe_inc_running(wqe, worker);
}

/*
 * Called when worker is going to sleep. If there are no workers currently
 * running and we have work pending, wake up a free one or have the manager
 * set one up.
 */
void io_wq_worker_sleeping(struct task_struct *tsk)
{
	struct io_worker *worker = kthread_data(tsk);
	struct io_wqe *wqe = worker->wqe;

	if (!(worker->flags & IO_WORKER_F_UP))
		return;
	if (!(worker->flags & IO_WORKER_F_RUNNING))
		return;

	worker->flags &= ~IO_WORKER_F_RUNNING;

	raw_spin_lock_irq(&wqe->lock);
	io_wqe_dec_running(wqe, worker);
	raw_spin_unlock_irq(&wqe->lock);
}

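/*
 * Fork a new worker kthread for the given accounting class, bind it to
 * the wqe's NUMA node and add it to the free and all lists.
 */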
static bool create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index)
{
	struct io_wqe_acct *acct = &wqe->acct[index];
	struct io_worker *worker;

	worker = kzalloc_node(sizeof(*worker), GFP_KERNEL, wqe->node);
	if (!worker)
		return false;

	refcount_set(&worker->ref, 1);
	worker->nulls_node.pprev = NULL;
	worker->wqe = wqe;
	spin_lock_init(&worker->lock);

	worker->task = kthread_create_on_node(io_wqe_worker, worker, wqe->node,
				"io_wqe_worker-%d/%d", index, wqe->node);
	if (IS_ERR(worker->task)) {
		kfree(worker);
		return false;
	}
	kthread_bind_mask(worker->task, cpumask_of_node(wqe->node));

	raw_spin_lock_irq(&wqe->lock);
	hlist_nulls_add_head_rcu(&worker->nulls_node, &wqe->free_list);
	list_add_tail_rcu(&worker->all_list, &wqe->all_list);
	worker->flags |= IO_WORKER_F_FREE;
	if (index == IO_WQ_ACCT_BOUND)
		worker->flags |= IO_WORKER_F_BOUND;
	if (!acct->nr_workers && (worker->flags & IO_WORKER_F_BOUND))
		worker->flags |= IO_WORKER_F_FIXED;
	acct->nr_workers++;
	raw_spin_unlock_irq(&wqe->lock);

	if (index == IO_WQ_ACCT_UNBOUND)
		atomic_inc(&wq->user->processes);

	refcount_inc(&wq->refs);
	wake_up_process(worker->task);
	return true;
}

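/* should the manager fork another worker for this accounting class? */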
static inline bool io_wqe_need_worker(struct io_wqe *wqe, int index)
	__must_hold(wqe->lock)
{
	struct io_wqe_acct *acct = &wqe->acct[index];

	/* if we have available workers or no work, no need */
	if (!hlist_nulls_empty(&wqe->free_list) || !io_wqe_run_queue(wqe))
		return false;
	return acct->nr_workers < acct->max_workers;
}

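/* callback for io_wq_cancel_all(): interrupt whatever the worker is running */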
static bool io_wqe_worker_send_sig(struct io_worker *worker, void *data)
{
	send_sig(SIGINT, worker->task, 1);
	return false;
}

/*
 * Iterate the passed in list and call the specific function for each
 * worker that isn't exiting
 */
static bool io_wq_for_each_worker(struct io_wqe *wqe,
				  bool (*func)(struct io_worker *, void *),
				  void *data)
{
	struct io_worker *worker;
	bool ret = false;

	list_for_each_entry_rcu(worker, &wqe->all_list, all_list) {
		if (io_worker_get(worker)) {
			/* no task if node is/was offline */
			if (worker->task)
				ret = func(worker, data);
			io_worker_release(worker);
			if (ret)
				break;
		}
	}

	return ret;
}

static bool io_wq_worker_wake(struct io_worker *worker, void *data)
{
	wake_up_process(worker->task);
	return false;
}

/*
 * Manager thread. Tasked with creating new workers, if we need them.
 */
static int io_wq_manager(void *data)
{
	struct io_wq *wq = data;
	int node;

	/* create fixed workers */
	refcount_set(&wq->refs, 1);
	for_each_node(node) {
		if (!node_online(node))
			continue;
		if (create_io_worker(wq, wq->wqes[node], IO_WQ_ACCT_BOUND))
			continue;
		set_bit(IO_WQ_BIT_ERROR, &wq->state);
		set_bit(IO_WQ_BIT_EXIT, &wq->state);
		goto out;
	}

	complete(&wq->done);

	while (!kthread_should_stop()) {
		if (current->task_works)
			task_work_run();

		for_each_node(node) {
			struct io_wqe *wqe = wq->wqes[node];
			bool fork_worker[2] = { false, false };

			if (!node_online(node))
				continue;

			raw_spin_lock_irq(&wqe->lock);
			if (io_wqe_need_worker(wqe, IO_WQ_ACCT_BOUND))
				fork_worker[IO_WQ_ACCT_BOUND] = true;
			if (io_wqe_need_worker(wqe, IO_WQ_ACCT_UNBOUND))
				fork_worker[IO_WQ_ACCT_UNBOUND] = true;
			raw_spin_unlock_irq(&wqe->lock);
			if (fork_worker[IO_WQ_ACCT_BOUND])
				create_io_worker(wq, wqe, IO_WQ_ACCT_BOUND);
			if (fork_worker[IO_WQ_ACCT_UNBOUND])
				create_io_worker(wq, wqe, IO_WQ_ACCT_UNBOUND);
		}
		set_current_state(TASK_INTERRUPTIBLE);
		schedule_timeout(HZ);
	}

	if (current->task_works)
		task_work_run();

out:
	if (refcount_dec_and_test(&wq->refs)) {
		complete(&wq->done);
		return 0;
	}
	/* if ERROR is set and we get here, we have workers to wake */
	if (test_bit(IO_WQ_BIT_ERROR, &wq->state)) {
		rcu_read_lock();
		for_each_node(node)
			io_wq_for_each_worker(wq->wqes[node], io_wq_worker_wake, NULL);
		rcu_read_unlock();
	}
	return 0;
}

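/*
 * Unbounded work is only queued if a worker is running or free, or if the
 * submitting user is still below its worker (process) limit.
 */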
static bool io_wq_can_queue(struct io_wqe *wqe, struct io_wqe_acct *acct,
			    struct io_wq_work *work)
{
	bool free_worker;

	if (!(work->flags & IO_WQ_WORK_UNBOUND))
		return true;
	if (atomic_read(&acct->nr_running))
		return true;

	rcu_read_lock();
	free_worker = !hlist_nulls_empty(&wqe->free_list);
	rcu_read_unlock();
	if (free_worker)
		return true;

	if (atomic_read(&wqe->wq->user->processes) >= acct->max_workers &&
	    !(capable(CAP_SYS_RESOURCE) || capable(CAP_SYS_ADMIN)))
		return false;

	return true;
}

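/* cancel in place: run the work (and anything linked to it) with CANCEL set */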
static void io_run_cancel(struct io_wq_work *work, struct io_wqe *wqe)
{
	struct io_wq *wq = wqe->wq;

	do {
		struct io_wq_work *old_work = work;

		work->flags |= IO_WQ_WORK_CANCEL;
		work = wq->do_work(work);
		wq->free_work(old_work);
	} while (work);
}

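/*
 * Append work to the wqe list. Hashed work is chained behind the current
 * tail of its hash bucket, so items with the same hash stay in order.
 */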
static void io_wqe_insert_work(struct io_wqe *wqe, struct io_wq_work *work)
{
	unsigned int hash;
	struct io_wq_work *tail;

	if (!io_wq_is_hashed(work)) {
append:
		wq_list_add_tail(&work->list, &wqe->work_list);
		return;
	}

	hash = io_get_work_hash(work);
	tail = wqe->hash_tail[hash];
	wqe->hash_tail[hash] = work;
	if (!tail)
		goto append;

	wq_list_add_after(&work->list, &tail->list, &wqe->work_list);
}

static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work)
{
	struct io_wqe_acct *acct = io_work_get_acct(wqe, work);
	int work_flags;
	unsigned long flags;

	/*
	 * Do early check to see if we need a new unbound worker, and if we do,
	 * if we're allowed to do so. This isn't 100% accurate as there's a
	 * gap between this check and incrementing the value, but that's OK.
	 * It's close enough to not be an issue, fork() has the same delay.
	 */
	if (unlikely(!io_wq_can_queue(wqe, acct, work))) {
		io_run_cancel(work, wqe);
		return;
	}

	work_flags = work->flags;
	raw_spin_lock_irqsave(&wqe->lock, flags);
	io_wqe_insert_work(wqe, work);
	wqe->flags &= ~IO_WQE_FLAG_STALLED;
	raw_spin_unlock_irqrestore(&wqe->lock, flags);

	if ((work_flags & IO_WQ_WORK_CONCURRENT) ||
	    !atomic_read(&acct->nr_running))
		io_wqe_wake_worker(wqe, acct);
}

void io_wq_enqueue(struct io_wq *wq, struct io_wq_work *work)
{
	struct io_wqe *wqe = wq->wqes[numa_node_id()];

	io_wqe_enqueue(wqe, work);
}

/*
 * Work items that hash to the same value will not be done in parallel.
 * Used to limit concurrent writes, generally hashed by inode.
 */
void io_wq_hash_work(struct io_wq_work *work, void *val)
{
	unsigned int bit;

	bit = hash_ptr(val, IO_WQ_HASH_ORDER);
	work->flags |= (IO_WQ_WORK_HASHED | (bit << IO_WQ_HASH_SHIFT));
}

void io_wq_cancel_all(struct io_wq *wq)
{
	int node;

	set_bit(IO_WQ_BIT_CANCEL, &wq->state);

	rcu_read_lock();
	for_each_node(node) {
		struct io_wqe *wqe = wq->wqes[node];

		io_wq_for_each_worker(wqe, io_wqe_worker_send_sig, NULL);
	}
	rcu_read_unlock();
}

struct io_cb_cancel_data {
	work_cancel_fn *fn;
	void *data;
	int nr_running;
	int nr_pending;
	bool cancel_all;
};

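/* signal the worker if the work it is currently running matches */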
static bool io_wq_worker_cancel(struct io_worker *worker, void *data)
{
	struct io_cb_cancel_data *match = data;
	unsigned long flags;

	/*
	 * Hold the lock to avoid ->cur_work going out of scope, caller
	 * may dereference the passed in work.
	 */
	spin_lock_irqsave(&worker->lock, flags);
	if (worker->cur_work &&
	    !(worker->cur_work->flags & IO_WQ_WORK_NO_CANCEL) &&
	    match->fn(worker->cur_work, match->data)) {
		send_sig(SIGINT, worker->task, 1);
		match->nr_running++;
	}
	spin_unlock_irqrestore(&worker->lock, flags);

	return match->nr_running && !match->cancel_all;
}

static inline void io_wqe_remove_pending(struct io_wqe *wqe,
					 struct io_wq_work *work,
					 struct io_wq_work_node *prev)
{
	unsigned int hash = io_get_work_hash(work);
	struct io_wq_work *prev_work = NULL;

	if (io_wq_is_hashed(work) && work == wqe->hash_tail[hash]) {
		if (prev)
			prev_work = container_of(prev, struct io_wq_work, list);
		if (prev_work && io_get_work_hash(prev_work) == hash)
			wqe->hash_tail[hash] = prev_work;
		else
			wqe->hash_tail[hash] = NULL;
	}
	wq_list_del(&wqe->work_list, &work->list, prev);
}

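/* cancel matching work that is still sitting on the pending list */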
static void io_wqe_cancel_pending_work(struct io_wqe *wqe,
				       struct io_cb_cancel_data *match)
{
	struct io_wq_work_node *node, *prev;
	struct io_wq_work *work;
	unsigned long flags;

retry:
	raw_spin_lock_irqsave(&wqe->lock, flags);
	wq_list_for_each(node, prev, &wqe->work_list) {
		work = container_of(node, struct io_wq_work, list);
		if (!match->fn(work, match->data))
			continue;
		io_wqe_remove_pending(wqe, work, prev);
		raw_spin_unlock_irqrestore(&wqe->lock, flags);
		io_run_cancel(work, wqe);
		match->nr_pending++;
		if (!match->cancel_all)
			return;

		/* not safe to continue after unlock */
		goto retry;
	}
	raw_spin_unlock_irqrestore(&wqe->lock, flags);
}

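/* signal any worker whose currently running work matches */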
static void io_wqe_cancel_running_work(struct io_wqe *wqe,
				       struct io_cb_cancel_data *match)
{
	rcu_read_lock();
	io_wq_for_each_worker(wqe, io_wq_worker_cancel, match);
	rcu_read_unlock();
}

enum io_wq_cancel io_wq_cancel_cb(struct io_wq *wq, work_cancel_fn *cancel,
				  void *data, bool cancel_all)
{
	struct io_cb_cancel_data match = {
		.fn		= cancel,
		.data		= data,
		.cancel_all	= cancel_all,
	};
	int node;

	/*
	 * First check pending list, if we're lucky we can just remove it
	 * from there. CANCEL_OK means that the work is returned as-new,
	 * no completion will be posted for it.
	 */
	for_each_node(node) {
		struct io_wqe *wqe = wq->wqes[node];

		io_wqe_cancel_pending_work(wqe, &match);
		if (match.nr_pending && !match.cancel_all)
			return IO_WQ_CANCEL_OK;
	}

	/*
	 * Now check if a free (going busy) or busy worker has the work
	 * currently running. If we find it there, we'll return CANCEL_RUNNING
	 * as an indication that we attempt to signal cancellation. The
	 * completion will run normally in this case.
	 */
	for_each_node(node) {
		struct io_wqe *wqe = wq->wqes[node];

		io_wqe_cancel_running_work(wqe, &match);
		if (match.nr_running && !match.cancel_all)
			return IO_WQ_CANCEL_RUNNING;
	}

	if (match.nr_running)
		return IO_WQ_CANCEL_RUNNING;
	if (match.nr_pending)
		return IO_WQ_CANCEL_OK;
	return IO_WQ_CANCEL_NOTFOUND;
}

static bool io_wq_io_cb_cancel_data(struct io_wq_work *work, void *data)
{
	return work == data;
}

enum io_wq_cancel io_wq_cancel_work(struct io_wq *wq, struct io_wq_work *cwork)
{
	return io_wq_cancel_cb(wq, io_wq_io_cb_cancel_data, (void *)cwork, false);
}

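/*
 * Create an io_wq with one io_wqe per NUMA node, then start the manager
 * thread, which forks the initial (fixed) bound worker for each online node.
 */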
struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
{
	int ret = -ENOMEM, node;
	struct io_wq *wq;

	if (WARN_ON_ONCE(!data->free_work || !data->do_work))
		return ERR_PTR(-EINVAL);

	wq = kzalloc(sizeof(*wq), GFP_KERNEL);
	if (!wq)
		return ERR_PTR(-ENOMEM);

	wq->wqes = kcalloc(nr_node_ids, sizeof(struct io_wqe *), GFP_KERNEL);
	if (!wq->wqes) {
		kfree(wq);
		return ERR_PTR(-ENOMEM);
	}

	wq->free_work = data->free_work;
	wq->do_work = data->do_work;

	/* caller must already hold a reference to this */
	wq->user = data->user;

	for_each_node(node) {
		struct io_wqe *wqe;
		int alloc_node = node;

		if (!node_online(alloc_node))
			alloc_node = NUMA_NO_NODE;
		wqe = kzalloc_node(sizeof(struct io_wqe), GFP_KERNEL, alloc_node);
		if (!wqe)
			goto err;
		wq->wqes[node] = wqe;
		wqe->node = alloc_node;
		wqe->acct[IO_WQ_ACCT_BOUND].max_workers = bounded;
		atomic_set(&wqe->acct[IO_WQ_ACCT_BOUND].nr_running, 0);
		if (wq->user) {
			wqe->acct[IO_WQ_ACCT_UNBOUND].max_workers =
					task_rlimit(current, RLIMIT_NPROC);
		}
		atomic_set(&wqe->acct[IO_WQ_ACCT_UNBOUND].nr_running, 0);
		wqe->wq = wq;
		raw_spin_lock_init(&wqe->lock);
		INIT_WQ_LIST(&wqe->work_list);
		INIT_HLIST_NULLS_HEAD(&wqe->free_list, 0);
		INIT_LIST_HEAD(&wqe->all_list);
	}

	init_completion(&wq->done);

	wq->manager = kthread_create(io_wq_manager, wq, "io_wq_manager");
	if (!IS_ERR(wq->manager)) {
		wake_up_process(wq->manager);
		wait_for_completion(&wq->done);
		if (test_bit(IO_WQ_BIT_ERROR, &wq->state)) {
			ret = -ENOMEM;
			goto err;
		}
		refcount_set(&wq->use_refs, 1);
		reinit_completion(&wq->done);
		return wq;
	}

	ret = PTR_ERR(wq->manager);
	complete(&wq->done);
err:
	for_each_node(node)
		kfree(wq->wqes[node]);
	kfree(wq->wqes);
	kfree(wq);
	return ERR_PTR(ret);
}

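/* grab a usage reference on an existing io_wq, provided the callbacks match */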
bool io_wq_get(struct io_wq *wq, struct io_wq_data *data)
{
	if (data->free_work != wq->free_work || data->do_work != wq->do_work)
		return false;

	return refcount_inc_not_zero(&wq->use_refs);
}

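/*
 * Tear down: stop the manager, wake all workers so they exit, wait for the
 * last reference to be dropped, then free the per-node structures.
 */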
static void __io_wq_destroy(struct io_wq *wq)
{
	int node;

	set_bit(IO_WQ_BIT_EXIT, &wq->state);
	if (wq->manager)
		kthread_stop(wq->manager);

	rcu_read_lock();
	for_each_node(node)
		io_wq_for_each_worker(wq->wqes[node], io_wq_worker_wake, NULL);
	rcu_read_unlock();

	wait_for_completion(&wq->done);

	for_each_node(node)
		kfree(wq->wqes[node]);
	kfree(wq->wqes);
	kfree(wq);
}

void io_wq_destroy(struct io_wq *wq)
{
	if (refcount_dec_and_test(&wq->use_refs))
		__io_wq_destroy(wq);
}

struct task_struct *io_wq_get_task(struct io_wq *wq)
{
	return wq->manager;
}