// SPDX-License-Identifier: GPL-2.0
/*
 * Basic worker thread pool for io_uring
 *
 * Copyright (C) 2019 Jens Axboe
 *
 */
#include <linux/kernel.h>
#include <linux/init.h>
#include <linux/errno.h>
#include <linux/sched/signal.h>
#include <linux/percpu.h>
#include <linux/slab.h>
#include <linux/rculist_nulls.h>
#include <linux/cpu.h>
#include <linux/tracehook.h>
#include <uapi/linux/io_uring.h>

#include "io-wq.h"

#define WORKER_IDLE_TIMEOUT	(5 * HZ)

enum {
	IO_WORKER_F_UP		= 1,	/* up and active */
	IO_WORKER_F_RUNNING	= 2,	/* account as running */
	IO_WORKER_F_FREE	= 4,	/* worker on free list */
	IO_WORKER_F_BOUND	= 8,	/* is doing bounded work */
};

enum {
	IO_WQ_BIT_EXIT		= 0,	/* wq exiting */
};

enum {
	IO_ACCT_STALLED_BIT	= 0,	/* stalled on hash */
};

/*
 * One for each thread in a wqe pool
 */
struct io_worker {
	refcount_t ref;
	unsigned flags;
	struct hlist_nulls_node nulls_node;
	struct list_head all_list;
	struct task_struct *task;
	struct io_wqe *wqe;

	struct io_wq_work *cur_work;
	spinlock_t lock;

	struct completion ref_done;

	unsigned long create_state;
	struct callback_head create_work;
	int create_index;

	union {
		struct rcu_head rcu;
		struct work_struct work;
	};
};

#if BITS_PER_LONG == 64
#define IO_WQ_HASH_ORDER	6
#else
#define IO_WQ_HASH_ORDER	5
#endif

#define IO_WQ_NR_HASH_BUCKETS	(1u << IO_WQ_HASH_ORDER)

struct io_wqe_acct {
	unsigned nr_workers;
	unsigned max_workers;
	int index;
	atomic_t nr_running;
	struct io_wq_work_list work_list;
	unsigned long flags;
};

enum {
	IO_WQ_ACCT_BOUND,
	IO_WQ_ACCT_UNBOUND,
	IO_WQ_ACCT_NR,
};

/*
 * Per-node worker thread pool
 */
struct io_wqe {
	raw_spinlock_t lock;
	struct io_wqe_acct acct[2];

	int node;

	struct hlist_nulls_head free_list;
	struct list_head all_list;

	struct wait_queue_entry wait;

	struct io_wq *wq;
	struct io_wq_work *hash_tail[IO_WQ_NR_HASH_BUCKETS];

	cpumask_var_t cpu_mask;
};

/*
 * Per io_wq state
 */
struct io_wq {
	unsigned long state;

	free_work_fn *free_work;
	io_wq_work_fn *do_work;

	struct io_wq_hash *hash;

	atomic_t worker_refs;
	struct completion worker_done;

	struct hlist_node cpuhp_node;

	struct task_struct *task;

	struct io_wqe *wqes[];
};

static enum cpuhp_state io_wq_online;

struct io_cb_cancel_data {
	work_cancel_fn *fn;
	void *data;
	int nr_running;
	int nr_pending;
	bool cancel_all;
};

static bool create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index);
static void io_wqe_dec_running(struct io_worker *worker);
static bool io_acct_cancel_pending_work(struct io_wqe *wqe,
					struct io_wqe_acct *acct,
					struct io_cb_cancel_data *match);

static bool io_worker_get(struct io_worker *worker)
{
	return refcount_inc_not_zero(&worker->ref);
}

static void io_worker_release(struct io_worker *worker)
{
	if (refcount_dec_and_test(&worker->ref))
		complete(&worker->ref_done);
}

static inline struct io_wqe_acct *io_get_acct(struct io_wqe *wqe, bool bound)
{
	return &wqe->acct[bound ? IO_WQ_ACCT_BOUND : IO_WQ_ACCT_UNBOUND];
}

static inline struct io_wqe_acct *io_work_get_acct(struct io_wqe *wqe,
						   struct io_wq_work *work)
{
	return io_get_acct(wqe, !(work->flags & IO_WQ_WORK_UNBOUND));
}

static inline struct io_wqe_acct *io_wqe_get_acct(struct io_worker *worker)
{
	return io_get_acct(worker->wqe, worker->flags & IO_WORKER_F_BOUND);
}

static void io_worker_ref_put(struct io_wq *wq)
{
	if (atomic_dec_and_test(&wq->worker_refs))
		complete(&wq->worker_done);
}

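/*
 * Worker is exiting: drop our own reference, wait for any remaining
 * references to go away, unhook from the free and all-worker lists and
 * then exit the task.
 */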
static void io_worker_exit(struct io_worker *worker)
{
	struct io_wqe *wqe = worker->wqe;

	if (refcount_dec_and_test(&worker->ref))
		complete(&worker->ref_done);
	wait_for_completion(&worker->ref_done);

	raw_spin_lock(&wqe->lock);
	if (worker->flags & IO_WORKER_F_FREE)
		hlist_nulls_del_rcu(&worker->nulls_node);
	list_del_rcu(&worker->all_list);
	preempt_disable();
	io_wqe_dec_running(worker);
	worker->flags = 0;
	current->flags &= ~PF_IO_WORKER;
	preempt_enable();
	raw_spin_unlock(&wqe->lock);

	kfree_rcu(worker, rcu);
	io_worker_ref_put(wqe->wq);
	do_exit(0);
}

static inline bool io_acct_run_queue(struct io_wqe_acct *acct)
{
	if (!wq_list_empty(&acct->work_list) &&
	    !test_bit(IO_ACCT_STALLED_BIT, &acct->flags))
		return true;
	return false;
}

/*
 * Check head of free list for an available worker. If one isn't available,
 * caller must create one.
 */
static bool io_wqe_activate_free_worker(struct io_wqe *wqe,
					struct io_wqe_acct *acct)
	__must_hold(RCU)
{
	struct hlist_nulls_node *n;
	struct io_worker *worker;

	/*
	 * Iterate free_list and see if we can find an idle worker to
	 * activate. If a given worker is on the free_list but in the process
	 * of exiting, keep trying.
	 */
	hlist_nulls_for_each_entry_rcu(worker, n, &wqe->free_list, nulls_node) {
		if (!io_worker_get(worker))
			continue;
		if (io_wqe_get_acct(worker) != acct) {
			io_worker_release(worker);
			continue;
		}
		if (wake_up_process(worker->task)) {
			io_worker_release(worker);
			return true;
		}
		io_worker_release(worker);
	}

	return false;
}

/*
 * We need a worker. If we find a free one, we're good. If not, and we're
 * below the max number of workers, create one.
 */
static bool io_wqe_create_worker(struct io_wqe *wqe, struct io_wqe_acct *acct)
{
	/*
	 * Most likely an attempt to queue unbounded work on an io_wq that
	 * wasn't setup with any unbounded workers.
	 */
	if (unlikely(!acct->max_workers))
		pr_warn_once("io-wq is not configured for unbound workers");

	raw_spin_lock(&wqe->lock);
	if (acct->nr_workers >= acct->max_workers) {
		raw_spin_unlock(&wqe->lock);
		return true;
	}
	acct->nr_workers++;
	raw_spin_unlock(&wqe->lock);
	atomic_inc(&acct->nr_running);
	atomic_inc(&wqe->wq->worker_refs);
	return create_io_worker(wqe->wq, wqe, acct->index);
}

static void io_wqe_inc_running(struct io_worker *worker)
{
	struct io_wqe_acct *acct = io_wqe_get_acct(worker);

	atomic_inc(&acct->nr_running);
}

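/*
 * task_work callback that attempts to create the worker requested by
 * io_queue_worker_create(). If the acct is already at its worker limit,
 * the previously taken running and wq references are dropped instead.
 */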
static void create_worker_cb(struct callback_head *cb)
{
	struct io_worker *worker;
	struct io_wq *wq;
	struct io_wqe *wqe;
	struct io_wqe_acct *acct;
	bool do_create = false;

	worker = container_of(cb, struct io_worker, create_work);
	wqe = worker->wqe;
	wq = wqe->wq;
	acct = &wqe->acct[worker->create_index];
	raw_spin_lock(&wqe->lock);
	if (acct->nr_workers < acct->max_workers) {
		acct->nr_workers++;
		do_create = true;
	}
	raw_spin_unlock(&wqe->lock);
	if (do_create) {
		create_io_worker(wq, wqe, worker->create_index);
	} else {
		atomic_dec(&acct->nr_running);
		io_worker_ref_put(wq);
	}
	clear_bit_unlock(0, &worker->create_state);
	io_worker_release(worker);
}

static bool io_queue_worker_create(struct io_worker *worker,
				   struct io_wqe_acct *acct,
				   task_work_func_t func)
{
	struct io_wqe *wqe = worker->wqe;
	struct io_wq *wq = wqe->wq;

	/* raced with exit, just ignore create call */
	if (test_bit(IO_WQ_BIT_EXIT, &wq->state))
		goto fail;
	if (!io_worker_get(worker))
		goto fail;
	/*
	 * create_state manages ownership of create_work/index. We should
	 * only need one entry per worker, as the worker going to sleep
	 * will trigger the condition, and waking will clear it once it
	 * runs the task_work.
	 */
	if (test_bit(0, &worker->create_state) ||
	    test_and_set_bit_lock(0, &worker->create_state))
		goto fail_release;

	init_task_work(&worker->create_work, func);
	worker->create_index = acct->index;
	if (!task_work_add(wq->task, &worker->create_work, TWA_SIGNAL))
		return true;
	clear_bit_unlock(0, &worker->create_state);
fail_release:
	io_worker_release(worker);
fail:
	atomic_dec(&acct->nr_running);
	io_worker_ref_put(wq);
	return false;
}

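/*
 * Last running worker for this acct is going to sleep. If work is still
 * pending, queue the creation of a new worker via task_work.
 */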
static void io_wqe_dec_running(struct io_worker *worker)
	__must_hold(wqe->lock)
{
	struct io_wqe_acct *acct = io_wqe_get_acct(worker);
	struct io_wqe *wqe = worker->wqe;

	if (!(worker->flags & IO_WORKER_F_UP))
		return;

	if (atomic_dec_and_test(&acct->nr_running) && io_acct_run_queue(acct)) {
		atomic_inc(&acct->nr_running);
		atomic_inc(&wqe->wq->worker_refs);
		io_queue_worker_create(worker, acct, create_worker_cb);
	}
}

/*
 * Worker will start processing some work. Move it to the busy list, if
 * it's currently on the freelist
 */
static void __io_worker_busy(struct io_wqe *wqe, struct io_worker *worker,
			     struct io_wq_work *work)
	__must_hold(wqe->lock)
{
	if (worker->flags & IO_WORKER_F_FREE) {
		worker->flags &= ~IO_WORKER_F_FREE;
		hlist_nulls_del_init_rcu(&worker->nulls_node);
	}
}

/*
 * No work, worker going to sleep. Move to the free list if we aren't
 * already on it.
 */
static void __io_worker_idle(struct io_wqe *wqe, struct io_worker *worker)
	__must_hold(wqe->lock)
{
	if (!(worker->flags & IO_WORKER_F_FREE)) {
		worker->flags |= IO_WORKER_F_FREE;
		hlist_nulls_add_head_rcu(&worker->nulls_node, &wqe->free_list);
	}
}

static inline unsigned int io_get_work_hash(struct io_wq_work *work)
{
	return work->flags >> IO_WQ_HASH_SHIFT;
}

static void io_wait_on_hash(struct io_wqe *wqe, unsigned int hash)
{
	struct io_wq *wq = wqe->wq;

	spin_lock_irq(&wq->hash->wait.lock);
	if (list_empty(&wqe->wait.entry)) {
		__add_wait_queue(&wq->hash->wait, &wqe->wait);
		if (!test_bit(hash, &wq->hash->map)) {
			__set_current_state(TASK_RUNNING);
			list_del_init(&wqe->wait.entry);
		}
	}
	spin_unlock_irq(&wq->hash->wait.lock);
}

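/*
 * Find the next runnable work item for this acct. Unhashed work can run
 * right away; a hashed chain only runs if its hash bit isn't already
 * claimed by another worker, in which case the whole [work, tail] chain is
 * spliced off the list. If only busy hashed work remains, mark the acct
 * stalled and wait on the hash waitqueue.
 */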
static struct io_wq_work *io_get_next_work(struct io_wqe_acct *acct,
					   struct io_worker *worker)
	__must_hold(wqe->lock)
{
	struct io_wq_work_node *node, *prev;
	struct io_wq_work *work, *tail;
	unsigned int stall_hash = -1U;
	struct io_wqe *wqe = worker->wqe;

	wq_list_for_each(node, prev, &acct->work_list) {
		unsigned int hash;

		work = container_of(node, struct io_wq_work, list);

		/* not hashed, can run anytime */
		if (!io_wq_is_hashed(work)) {
			wq_list_del(&acct->work_list, node, prev);
			return work;
		}

		hash = io_get_work_hash(work);
		/* all items with this hash lie in [work, tail] */
		tail = wqe->hash_tail[hash];

		/* hashed, can run if not already running */
		if (!test_and_set_bit(hash, &wqe->wq->hash->map)) {
			wqe->hash_tail[hash] = NULL;
			wq_list_cut(&acct->work_list, &tail->list, prev);
			return work;
		}
		if (stall_hash == -1U)
			stall_hash = hash;
		/* fast forward to a next hash, for-each will fix up @prev */
		node = &tail->list;
	}

	if (stall_hash != -1U) {
		/*
		 * Set this before dropping the lock to avoid racing with new
		 * work being added and clearing the stalled bit.
		 */
		set_bit(IO_ACCT_STALLED_BIT, &acct->flags);
		raw_spin_unlock(&wqe->lock);
		io_wait_on_hash(wqe, stall_hash);
		raw_spin_lock(&wqe->lock);
	}

	return NULL;
}

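/*
 * Handle a pending TIF_NOTIFY_SIGNAL, if set. Returns true if there was
 * one and the task was put back into TASK_RUNNING.
 */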
static bool io_flush_signals(void)
{
	if (unlikely(test_thread_flag(TIF_NOTIFY_SIGNAL))) {
		__set_current_state(TASK_RUNNING);
		tracehook_notify_signal();
		return true;
	}
	return false;
}

static void io_assign_current_work(struct io_worker *worker,
				   struct io_wq_work *work)
{
	if (work) {
		io_flush_signals();
		cond_resched();
	}

	spin_lock(&worker->lock);
	worker->cur_work = work;
	spin_unlock(&worker->lock);
}

static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work);

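/*
 * Main work loop for a worker: pull work off the acct list and run it,
 * including any dependent (linked) work, clearing hash bits and waking
 * hash waiters as hashed chains complete.
 */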
static void io_worker_handle_work(struct io_worker *worker)
	__releases(wqe->lock)
{
	struct io_wqe_acct *acct = io_wqe_get_acct(worker);
	struct io_wqe *wqe = worker->wqe;
	struct io_wq *wq = wqe->wq;
	bool do_kill = test_bit(IO_WQ_BIT_EXIT, &wq->state);

	do {
		struct io_wq_work *work;
get_next:
		/*
		 * If we got some work, mark us as busy. If we didn't, but
		 * the list isn't empty, it means we stalled on hashed work.
		 * Mark us stalled so we don't keep looking for work when we
		 * can't make progress, any work completion or insertion will
		 * clear the stalled flag.
		 */
		work = io_get_next_work(acct, worker);
		if (work)
			__io_worker_busy(wqe, worker, work);

		raw_spin_unlock(&wqe->lock);
		if (!work)
			break;
		io_assign_current_work(worker, work);
		__set_current_state(TASK_RUNNING);

		/* handle a whole dependent link */
		do {
			struct io_wq_work *next_hashed, *linked;
			unsigned int hash = io_get_work_hash(work);

			next_hashed = wq_next_work(work);

			if (unlikely(do_kill) && (work->flags & IO_WQ_WORK_UNBOUND))
				work->flags |= IO_WQ_WORK_CANCEL;
			wq->do_work(work);
			io_assign_current_work(worker, NULL);

			linked = wq->free_work(work);
			work = next_hashed;
			if (!work && linked && !io_wq_is_hashed(linked)) {
				work = linked;
				linked = NULL;
			}
			io_assign_current_work(worker, work);
			if (linked)
				io_wqe_enqueue(wqe, linked);

			if (hash != -1U && !next_hashed) {
				clear_bit(hash, &wq->hash->map);
				clear_bit(IO_ACCT_STALLED_BIT, &acct->flags);
				if (wq_has_sleeper(&wq->hash->wait))
					wake_up(&wq->hash->wait);
				raw_spin_lock(&wqe->lock);
				/* skip unnecessary unlock-lock wqe->lock */
				if (!work)
					goto get_next;
				raw_spin_unlock(&wqe->lock);
			}
		} while (work);

		raw_spin_lock(&wqe->lock);
	} while (1);
}

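/*
 * Worker thread entry point: process available work, idle on the free list
 * with a timeout, and exit on timeout unless we're the last worker left
 * for this acct.
 */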
static int io_wqe_worker(void *data)
{
	struct io_worker *worker = data;
	struct io_wqe_acct *acct = io_wqe_get_acct(worker);
	struct io_wqe *wqe = worker->wqe;
	struct io_wq *wq = wqe->wq;
	bool last_timeout = false;
	char buf[TASK_COMM_LEN];

	worker->flags |= (IO_WORKER_F_UP | IO_WORKER_F_RUNNING);

	snprintf(buf, sizeof(buf), "iou-wrk-%d", wq->task->pid);
	set_task_comm(current, buf);

	while (!test_bit(IO_WQ_BIT_EXIT, &wq->state)) {
		long ret;

		set_current_state(TASK_INTERRUPTIBLE);
loop:
		raw_spin_lock(&wqe->lock);
		if (io_acct_run_queue(acct)) {
			io_worker_handle_work(worker);
			goto loop;
		}
		/* timed out, exit unless we're the last worker */
		if (last_timeout && acct->nr_workers > 1) {
			acct->nr_workers--;
			raw_spin_unlock(&wqe->lock);
			__set_current_state(TASK_RUNNING);
			break;
		}
		last_timeout = false;
		__io_worker_idle(wqe, worker);
		raw_spin_unlock(&wqe->lock);
		if (io_flush_signals())
			continue;
		ret = schedule_timeout(WORKER_IDLE_TIMEOUT);
		if (signal_pending(current)) {
			struct ksignal ksig;

			if (!get_signal(&ksig))
				continue;
			break;
		}
		last_timeout = !ret;
	}

	if (test_bit(IO_WQ_BIT_EXIT, &wq->state)) {
		raw_spin_lock(&wqe->lock);
		io_worker_handle_work(worker);
	}

	io_worker_exit(worker);
	return 0;
}

/*
 * Called when a worker is scheduled in. Mark us as currently running.
 */
void io_wq_worker_running(struct task_struct *tsk)
{
	struct io_worker *worker = tsk->pf_io_worker;

	if (!worker)
		return;
	if (!(worker->flags & IO_WORKER_F_UP))
		return;
	if (worker->flags & IO_WORKER_F_RUNNING)
		return;
	worker->flags |= IO_WORKER_F_RUNNING;
	io_wqe_inc_running(worker);
}

/*
 * Called when worker is going to sleep. If there are no workers currently
 * running and we have work pending, wake up a free one or create a new one.
 */
void io_wq_worker_sleeping(struct task_struct *tsk)
{
	struct io_worker *worker = tsk->pf_io_worker;

	if (!worker)
		return;
	if (!(worker->flags & IO_WORKER_F_UP))
		return;
	if (!(worker->flags & IO_WORKER_F_RUNNING))
		return;

	worker->flags &= ~IO_WORKER_F_RUNNING;

	raw_spin_lock(&worker->wqe->lock);
	io_wqe_dec_running(worker);
	raw_spin_unlock(&worker->wqe->lock);
}

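/*
 * Attach a freshly created task to its worker, restrict it to the wqe's
 * allowed CPUs, add it to the free and all-worker lists and wake it up.
 */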
static void io_init_new_worker(struct io_wqe *wqe, struct io_worker *worker,
			       struct task_struct *tsk)
{
	tsk->pf_io_worker = worker;
	worker->task = tsk;
	set_cpus_allowed_ptr(tsk, wqe->cpu_mask);
	tsk->flags |= PF_NO_SETAFFINITY;

	raw_spin_lock(&wqe->lock);
	hlist_nulls_add_head_rcu(&worker->nulls_node, &wqe->free_list);
	list_add_tail_rcu(&worker->all_list, &wqe->all_list);
	worker->flags |= IO_WORKER_F_FREE;
	raw_spin_unlock(&wqe->lock);
	wake_up_new_task(tsk);
}

static bool io_wq_work_match_all(struct io_wq_work *work, void *data)
{
	return true;
}

static inline bool io_should_retry_thread(long err)
{
	switch (err) {
	case -EAGAIN:
	case -ERESTARTSYS:
	case -ERESTARTNOINTR:
	case -ERESTARTNOHAND:
		return true;
	default:
		return false;
	}
}

static void create_worker_cont(struct callback_head *cb)
{
	struct io_worker *worker;
	struct task_struct *tsk;
	struct io_wqe *wqe;

	worker = container_of(cb, struct io_worker, create_work);
	clear_bit_unlock(0, &worker->create_state);
	wqe = worker->wqe;
	tsk = create_io_thread(io_wqe_worker, worker, wqe->node);
	if (!IS_ERR(tsk)) {
		io_init_new_worker(wqe, worker, tsk);
		io_worker_release(worker);
		return;
	} else if (!io_should_retry_thread(PTR_ERR(tsk))) {
		struct io_wqe_acct *acct = io_wqe_get_acct(worker);

		atomic_dec(&acct->nr_running);
		raw_spin_lock(&wqe->lock);
		acct->nr_workers--;
		if (!acct->nr_workers) {
			struct io_cb_cancel_data match = {
				.fn		= io_wq_work_match_all,
				.cancel_all	= true,
			};

			while (io_acct_cancel_pending_work(wqe, acct, &match))
				raw_spin_lock(&wqe->lock);
		}
		raw_spin_unlock(&wqe->lock);
		io_worker_ref_put(wqe->wq);
		kfree(worker);
		return;
	}

	/* re-create attempts grab a new worker ref, drop the existing one */
	io_worker_release(worker);
	schedule_work(&worker->work);
}

static void io_workqueue_create(struct work_struct *work)
{
	struct io_worker *worker = container_of(work, struct io_worker, work);
	struct io_wqe_acct *acct = io_wqe_get_acct(worker);

	if (!io_queue_worker_create(worker, acct, create_worker_cont)) {
		clear_bit_unlock(0, &worker->create_state);
		io_worker_release(worker);
		kfree(worker);
	}
}

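/*
 * Allocate and start a new worker for the given acct. Accounting done by
 * the caller is undone on failure; thread creation errors that look
 * transient are retried from workqueue context.
 */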
static bool create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index)
{
	struct io_wqe_acct *acct = &wqe->acct[index];
	struct io_worker *worker;
	struct task_struct *tsk;

	__set_current_state(TASK_RUNNING);

	worker = kzalloc_node(sizeof(*worker), GFP_KERNEL, wqe->node);
	if (!worker) {
fail:
		atomic_dec(&acct->nr_running);
		raw_spin_lock(&wqe->lock);
		acct->nr_workers--;
		raw_spin_unlock(&wqe->lock);
		io_worker_ref_put(wq);
		return false;
	}

	refcount_set(&worker->ref, 1);
	worker->wqe = wqe;
	spin_lock_init(&worker->lock);
	init_completion(&worker->ref_done);

	if (index == IO_WQ_ACCT_BOUND)
		worker->flags |= IO_WORKER_F_BOUND;

	tsk = create_io_thread(io_wqe_worker, worker, wqe->node);
	if (!IS_ERR(tsk)) {
		io_init_new_worker(wqe, worker, tsk);
	} else if (!io_should_retry_thread(PTR_ERR(tsk))) {
		kfree(worker);
		goto fail;
	} else {
		INIT_WORK(&worker->work, io_workqueue_create);
		schedule_work(&worker->work);
	}

	return true;
}

/*
 * Iterate the passed in list and call the specific function for each
 * worker that isn't exiting
 */
static bool io_wq_for_each_worker(struct io_wqe *wqe,
				  bool (*func)(struct io_worker *, void *),
				  void *data)
{
	struct io_worker *worker;
	bool ret = false;

	list_for_each_entry_rcu(worker, &wqe->all_list, all_list) {
		if (io_worker_get(worker)) {
			/* no task if node is/was offline */
			if (worker->task)
				ret = func(worker, data);
			io_worker_release(worker);
			if (ret)
				break;
		}
	}

	return ret;
}

static bool io_wq_worker_wake(struct io_worker *worker, void *data)
{
	set_notify_signal(worker->task);
	wake_up_process(worker->task);
	return false;
}

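/*
 * Cancel a work item by flagging it IO_WQ_WORK_CANCEL and running it (and
 * any dependent work it releases) through the normal do_work/free_work
 * callbacks, so completions are still posted.
 */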
static void io_run_cancel(struct io_wq_work *work, struct io_wqe *wqe)
{
	struct io_wq *wq = wqe->wq;

	do {
		work->flags |= IO_WQ_WORK_CANCEL;
		wq->do_work(work);
		work = wq->free_work(work);
	} while (work);
}

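/*
 * Add work to the acct's pending list. Hashed work is inserted after the
 * current tail for its hash, so items sharing a hash stay adjacent and are
 * executed serially.
 */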
static void io_wqe_insert_work(struct io_wqe *wqe, struct io_wq_work *work)
{
	struct io_wqe_acct *acct = io_work_get_acct(wqe, work);
	unsigned int hash;
	struct io_wq_work *tail;

	if (!io_wq_is_hashed(work)) {
append:
		wq_list_add_tail(&work->list, &acct->work_list);
		return;
	}

	hash = io_get_work_hash(work);
	tail = wqe->hash_tail[hash];
	wqe->hash_tail[hash] = work;
	if (!tail)
		goto append;

	wq_list_add_after(&work->list, &tail->list, &acct->work_list);
}

static bool io_wq_work_match_item(struct io_wq_work *work, void *data)
{
	return work == data;
}

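/*
 * Queue work on this wqe, waking a free worker or creating a new one as
 * needed. If the wq is exiting, or the work was already marked for
 * cancellation, it is cancelled right here instead.
 */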
static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work)
{
	struct io_wqe_acct *acct = io_work_get_acct(wqe, work);
	unsigned work_flags = work->flags;
	bool do_create;

	/*
	 * If io-wq is exiting for this task, or if the request has explicitly
	 * been marked as one that should not get executed, cancel it here.
	 */
	if (test_bit(IO_WQ_BIT_EXIT, &wqe->wq->state) ||
	    (work->flags & IO_WQ_WORK_CANCEL)) {
		io_run_cancel(work, wqe);
		return;
	}

	raw_spin_lock(&wqe->lock);
	io_wqe_insert_work(wqe, work);
	clear_bit(IO_ACCT_STALLED_BIT, &acct->flags);

	rcu_read_lock();
	do_create = !io_wqe_activate_free_worker(wqe, acct);
	rcu_read_unlock();

	raw_spin_unlock(&wqe->lock);

	if (do_create && ((work_flags & IO_WQ_WORK_CONCURRENT) ||
	    !atomic_read(&acct->nr_running))) {
		bool did_create;

		did_create = io_wqe_create_worker(wqe, acct);
		if (likely(did_create))
			return;

		raw_spin_lock(&wqe->lock);
		/* fatal condition, failed to create the first worker */
		if (!acct->nr_workers) {
			struct io_cb_cancel_data match = {
				.fn		= io_wq_work_match_item,
				.data		= work,
				.cancel_all	= false,
			};

			if (io_acct_cancel_pending_work(wqe, acct, &match))
				raw_spin_lock(&wqe->lock);
		}
		raw_spin_unlock(&wqe->lock);
	}
}

void io_wq_enqueue(struct io_wq *wq, struct io_wq_work *work)
{
	struct io_wqe *wqe = wq->wqes[numa_node_id()];

	io_wqe_enqueue(wqe, work);
}

/*
 * Work items that hash to the same value will not be done in parallel.
 * Used to limit concurrent writes, generally hashed by inode.
 */
void io_wq_hash_work(struct io_wq_work *work, void *val)
{
	unsigned int bit;

	bit = hash_ptr(val, IO_WQ_HASH_ORDER);
	work->flags |= (IO_WQ_WORK_HASHED | (bit << IO_WQ_HASH_SHIFT));
}

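/*
 * Per-worker cancel callback: if the worker's current work item matches,
 * signal the worker so the cancellation is noticed.
 */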
static bool io_wq_worker_cancel(struct io_worker *worker, void *data)
{
	struct io_cb_cancel_data *match = data;

	/*
	 * Hold the lock to avoid ->cur_work going out of scope, caller
	 * may dereference the passed in work.
	 */
	spin_lock(&worker->lock);
	if (worker->cur_work &&
	    match->fn(worker->cur_work, match->data)) {
		set_notify_signal(worker->task);
		match->nr_running++;
	}
	spin_unlock(&worker->lock);

	return match->nr_running && !match->cancel_all;
}

static inline void io_wqe_remove_pending(struct io_wqe *wqe,
					 struct io_wq_work *work,
					 struct io_wq_work_node *prev)
{
	struct io_wqe_acct *acct = io_work_get_acct(wqe, work);
	unsigned int hash = io_get_work_hash(work);
	struct io_wq_work *prev_work = NULL;

	if (io_wq_is_hashed(work) && work == wqe->hash_tail[hash]) {
		if (prev)
			prev_work = container_of(prev, struct io_wq_work, list);
		if (prev_work && io_get_work_hash(prev_work) == hash)
			wqe->hash_tail[hash] = prev_work;
		else
			wqe->hash_tail[hash] = NULL;
	}
	wq_list_del(&acct->work_list, &work->list, prev);
}

static bool io_acct_cancel_pending_work(struct io_wqe *wqe,
					struct io_wqe_acct *acct,
					struct io_cb_cancel_data *match)
	__releases(wqe->lock)
{
	struct io_wq_work_node *node, *prev;
	struct io_wq_work *work;

	wq_list_for_each(node, prev, &acct->work_list) {
		work = container_of(node, struct io_wq_work, list);
		if (!match->fn(work, match->data))
			continue;
		io_wqe_remove_pending(wqe, work, prev);
		raw_spin_unlock(&wqe->lock);
		io_run_cancel(work, wqe);
		match->nr_pending++;
		/* not safe to continue after unlock */
		return true;
	}

	return false;
}

static void io_wqe_cancel_pending_work(struct io_wqe *wqe,
				       struct io_cb_cancel_data *match)
{
	int i;
retry:
	raw_spin_lock(&wqe->lock);
	for (i = 0; i < IO_WQ_ACCT_NR; i++) {
		struct io_wqe_acct *acct = io_get_acct(wqe, i == 0);

		if (io_acct_cancel_pending_work(wqe, acct, match)) {
			if (match->cancel_all)
				goto retry;
			return;
		}
	}
	raw_spin_unlock(&wqe->lock);
}

static void io_wqe_cancel_running_work(struct io_wqe *wqe,
				       struct io_cb_cancel_data *match)
{
	rcu_read_lock();
	io_wq_for_each_worker(wqe, io_wq_worker_cancel, match);
	rcu_read_unlock();
}

enum io_wq_cancel io_wq_cancel_cb(struct io_wq *wq, work_cancel_fn *cancel,
				  void *data, bool cancel_all)
{
	struct io_cb_cancel_data match = {
		.fn		= cancel,
		.data		= data,
		.cancel_all	= cancel_all,
	};
	int node;

	/*
	 * First check pending list, if we're lucky we can just remove it
	 * from there. CANCEL_OK means that the work is returned as-new,
	 * no completion will be posted for it.
	 */
	for_each_node(node) {
		struct io_wqe *wqe = wq->wqes[node];

		io_wqe_cancel_pending_work(wqe, &match);
		if (match.nr_pending && !match.cancel_all)
			return IO_WQ_CANCEL_OK;
	}

	/*
	 * Now check if a free (going busy) or busy worker has the work
	 * currently running. If we find it there, we'll return CANCEL_RUNNING
	 * as an indication that we attempt to signal cancellation. The
	 * completion will run normally in this case.
	 */
	for_each_node(node) {
		struct io_wqe *wqe = wq->wqes[node];

		io_wqe_cancel_running_work(wqe, &match);
		if (match.nr_running && !match.cancel_all)
			return IO_WQ_CANCEL_RUNNING;
	}

	if (match.nr_running)
		return IO_WQ_CANCEL_RUNNING;
	if (match.nr_pending)
		return IO_WQ_CANCEL_OK;
	return IO_WQ_CANCEL_NOTFOUND;
}

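/*
 * Hash waitqueue callback: some hash bit was released, so clear the
 * stalled state on the accts that were waiting and wake a free worker to
 * retry the queue.
 */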
static int io_wqe_hash_wake(struct wait_queue_entry *wait, unsigned mode,
			    int sync, void *key)
{
	struct io_wqe *wqe = container_of(wait, struct io_wqe, wait);
	int i;

	list_del_init(&wait->entry);

	rcu_read_lock();
	for (i = 0; i < IO_WQ_ACCT_NR; i++) {
		struct io_wqe_acct *acct = &wqe->acct[i];

		if (test_and_clear_bit(IO_ACCT_STALLED_BIT, &acct->flags))
			io_wqe_activate_free_worker(wqe, acct);
	}
	rcu_read_unlock();
	return 1;
}

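/*
 * Create an io_wq with one io_wqe per NUMA node. @bounded caps the number
 * of bound workers; unbound workers are capped by RLIMIT_NPROC.
 */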
struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
{
	int ret, node, i;
	struct io_wq *wq;

	if (WARN_ON_ONCE(!data->free_work || !data->do_work))
		return ERR_PTR(-EINVAL);
	if (WARN_ON_ONCE(!bounded))
		return ERR_PTR(-EINVAL);

	wq = kzalloc(struct_size(wq, wqes, nr_node_ids), GFP_KERNEL);
	if (!wq)
		return ERR_PTR(-ENOMEM);
	ret = cpuhp_state_add_instance_nocalls(io_wq_online, &wq->cpuhp_node);
	if (ret)
		goto err_wq;

	refcount_inc(&data->hash->refs);
	wq->hash = data->hash;
	wq->free_work = data->free_work;
	wq->do_work = data->do_work;

	ret = -ENOMEM;
	for_each_node(node) {
		struct io_wqe *wqe;
		int alloc_node = node;

		if (!node_online(alloc_node))
			alloc_node = NUMA_NO_NODE;
		wqe = kzalloc_node(sizeof(struct io_wqe), GFP_KERNEL, alloc_node);
		if (!wqe)
			goto err;
		if (!alloc_cpumask_var(&wqe->cpu_mask, GFP_KERNEL))
			goto err;
		cpumask_copy(wqe->cpu_mask, cpumask_of_node(node));
		wq->wqes[node] = wqe;
		wqe->node = alloc_node;
		wqe->acct[IO_WQ_ACCT_BOUND].max_workers = bounded;
		wqe->acct[IO_WQ_ACCT_UNBOUND].max_workers =
					task_rlimit(current, RLIMIT_NPROC);
		INIT_LIST_HEAD(&wqe->wait.entry);
		wqe->wait.func = io_wqe_hash_wake;
		for (i = 0; i < IO_WQ_ACCT_NR; i++) {
			struct io_wqe_acct *acct = &wqe->acct[i];

			acct->index = i;
			atomic_set(&acct->nr_running, 0);
			INIT_WQ_LIST(&acct->work_list);
		}
		wqe->wq = wq;
		raw_spin_lock_init(&wqe->lock);
		INIT_HLIST_NULLS_HEAD(&wqe->free_list, 0);
		INIT_LIST_HEAD(&wqe->all_list);
	}

	wq->task = get_task_struct(data->task);
	atomic_set(&wq->worker_refs, 1);
	init_completion(&wq->worker_done);
	return wq;
err:
	io_wq_put_hash(data->hash);
	cpuhp_state_remove_instance_nocalls(io_wq_online, &wq->cpuhp_node);
	for_each_node(node) {
		if (!wq->wqes[node])
			continue;
		free_cpumask_var(wq->wqes[node]->cpu_mask);
		kfree(wq->wqes[node]);
	}
err_wq:
	kfree(wq);
	return ERR_PTR(ret);
}

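/*
 * Match pending worker-creation task_work entries that belong to this
 * io_wq, so they can be cancelled during teardown.
 */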
static bool io_task_work_match(struct callback_head *cb, void *data)
{
	struct io_worker *worker;

	if (cb->func != create_worker_cb && cb->func != create_worker_cont)
		return false;
	worker = container_of(cb, struct io_worker, create_work);
	return worker->wqe->wq == data;
}

void io_wq_exit_start(struct io_wq *wq)
{
	set_bit(IO_WQ_BIT_EXIT, &wq->state);
}

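/*
 * Cancel queued worker creation, wake all workers so they see
 * IO_WQ_BIT_EXIT, then wait for the last worker reference to drop before
 * detaching from the owning task.
 */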
static void io_wq_exit_workers(struct io_wq *wq)
{
	struct callback_head *cb;
	int node;

	if (!wq->task)
		return;

	while ((cb = task_work_cancel_match(wq->task, io_task_work_match, wq)) != NULL) {
		struct io_worker *worker;
		struct io_wqe_acct *acct;

		worker = container_of(cb, struct io_worker, create_work);
		acct = io_wqe_get_acct(worker);
		atomic_dec(&acct->nr_running);
		raw_spin_lock(&worker->wqe->lock);
		acct->nr_workers--;
		raw_spin_unlock(&worker->wqe->lock);
		io_worker_ref_put(wq);
		clear_bit_unlock(0, &worker->create_state);
		io_worker_release(worker);
	}

	rcu_read_lock();
	for_each_node(node) {
		struct io_wqe *wqe = wq->wqes[node];

		io_wq_for_each_worker(wqe, io_wq_worker_wake, NULL);
	}
	rcu_read_unlock();
	io_worker_ref_put(wq);
	wait_for_completion(&wq->worker_done);

	for_each_node(node) {
		spin_lock_irq(&wq->hash->wait.lock);
		list_del_init(&wq->wqes[node]->wait.entry);
		spin_unlock_irq(&wq->hash->wait.lock);
	}
	put_task_struct(wq->task);
	wq->task = NULL;
}

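/*
 * Final teardown: all workers are gone, cancel anything still pending and
 * free the per-node data.
 */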
static void io_wq_destroy(struct io_wq *wq)
{
	int node;

	cpuhp_state_remove_instance_nocalls(io_wq_online, &wq->cpuhp_node);

	for_each_node(node) {
		struct io_wqe *wqe = wq->wqes[node];
		struct io_cb_cancel_data match = {
			.fn		= io_wq_work_match_all,
			.cancel_all	= true,
		};
		io_wqe_cancel_pending_work(wqe, &match);
		free_cpumask_var(wqe->cpu_mask);
		kfree(wqe);
	}
	io_wq_put_hash(wq->hash);
	kfree(wq);
}

void io_wq_put_and_exit(struct io_wq *wq)
{
	WARN_ON_ONCE(!test_bit(IO_WQ_BIT_EXIT, &wq->state));

	io_wq_exit_workers(wq);
	io_wq_destroy(wq);
}

struct online_data {
	unsigned int cpu;
	bool online;
};

static bool io_wq_worker_affinity(struct io_worker *worker, void *data)
{
	struct online_data *od = data;

	if (od->online)
		cpumask_set_cpu(od->cpu, worker->wqe->cpu_mask);
	else
		cpumask_clear_cpu(od->cpu, worker->wqe->cpu_mask);
	return false;
}

static int __io_wq_cpu_online(struct io_wq *wq, unsigned int cpu, bool online)
{
	struct online_data od = {
		.cpu = cpu,
		.online = online
	};
	int i;

	rcu_read_lock();
	for_each_node(i)
		io_wq_for_each_worker(wq->wqes[i], io_wq_worker_affinity, &od);
	rcu_read_unlock();
	return 0;
}

static int io_wq_cpu_online(unsigned int cpu, struct hlist_node *node)
{
	struct io_wq *wq = hlist_entry_safe(node, struct io_wq, cpuhp_node);

	return __io_wq_cpu_online(wq, cpu, true);
}

static int io_wq_cpu_offline(unsigned int cpu, struct hlist_node *node)
{
	struct io_wq *wq = hlist_entry_safe(node, struct io_wq, cpuhp_node);

	return __io_wq_cpu_online(wq, cpu, false);
}

int io_wq_cpu_affinity(struct io_wq *wq, cpumask_var_t mask)
{
	int i;

	rcu_read_lock();
	for_each_node(i) {
		struct io_wqe *wqe = wq->wqes[i];

		if (mask)
			cpumask_copy(wqe->cpu_mask, mask);
		else
			cpumask_copy(wqe->cpu_mask, cpumask_of_node(i));
	}
	rcu_read_unlock();
	return 0;
}

/*
 * Set max number of bound and unbound workers, returns old values. If a
 * new_count entry is 0, then just return the old value for that entry.
 */
int io_wq_max_workers(struct io_wq *wq, int *new_count)
{
	int i, node, prev = 0;

	BUILD_BUG_ON((int) IO_WQ_ACCT_BOUND   != (int) IO_WQ_BOUND);
	BUILD_BUG_ON((int) IO_WQ_ACCT_UNBOUND != (int) IO_WQ_UNBOUND);
	BUILD_BUG_ON((int) IO_WQ_ACCT_NR      != 2);

	for (i = 0; i < 2; i++) {
		if (new_count[i] > task_rlimit(current, RLIMIT_NPROC))
			new_count[i] = task_rlimit(current, RLIMIT_NPROC);
	}

	rcu_read_lock();
	for_each_node(node) {
		struct io_wqe *wqe = wq->wqes[node];
		struct io_wqe_acct *acct;

		raw_spin_lock(&wqe->lock);
		for (i = 0; i < IO_WQ_ACCT_NR; i++) {
			acct = &wqe->acct[i];
			prev = max_t(int, acct->max_workers, prev);
			if (new_count[i])
				acct->max_workers = new_count[i];
			new_count[i] = prev;
		}
		raw_spin_unlock(&wqe->lock);
	}
	rcu_read_unlock();
	return 0;
}

static __init int io_wq_init(void)
{
	int ret;

	ret = cpuhp_setup_state_multi(CPUHP_AP_ONLINE_DYN, "io-wq/online",
					io_wq_cpu_online, io_wq_cpu_offline);
	if (ret < 0)
		return ret;
	io_wq_online = ret;
	return 0;
}
subsys_initcall(io_wq_init);