// SPDX-License-Identifier: GPL-2.0
/*
 * Basic worker thread pool for io_uring
 *
 * Copyright (C) 2019 Jens Axboe
 *
 */
#include <linux/kernel.h>
#include <linux/init.h>
#include <linux/errno.h>
#include <linux/sched/signal.h>
#include <linux/percpu.h>
#include <linux/slab.h>
#include <linux/rculist_nulls.h>
#include <linux/cpu.h>
#include <linux/tracehook.h>
#include <uapi/linux/io_uring.h>

#include "io-wq.h"

#define WORKER_IDLE_TIMEOUT	(5 * HZ)

enum {
	IO_WORKER_F_UP		= 1,	/* up and active */
	IO_WORKER_F_RUNNING	= 2,	/* account as running */
	IO_WORKER_F_FREE	= 4,	/* worker on free list */
	IO_WORKER_F_BOUND	= 8,	/* is doing bounded work */
};

enum {
	IO_WQ_BIT_EXIT		= 0,	/* wq exiting */
};

enum {
	IO_ACCT_STALLED_BIT	= 0,	/* stalled on hash */
};

/*
 * One for each thread in a wqe pool
 */
struct io_worker {
	refcount_t ref;
	unsigned flags;
	struct hlist_nulls_node nulls_node;
	struct list_head all_list;
	struct task_struct *task;
	struct io_wqe *wqe;

	struct io_wq_work *cur_work;
	spinlock_t lock;

	struct completion ref_done;

	unsigned long create_state;
	struct callback_head create_work;
	int create_index;

	union {
		struct rcu_head rcu;
		struct work_struct work;
	};
};

#if BITS_PER_LONG == 64
#define IO_WQ_HASH_ORDER	6
#else
#define IO_WQ_HASH_ORDER	5
#endif

#define IO_WQ_NR_HASH_BUCKETS	(1u << IO_WQ_HASH_ORDER)

struct io_wqe_acct {
	unsigned nr_workers;
	unsigned max_workers;
	int index;
	atomic_t nr_running;
	struct io_wq_work_list work_list;
	unsigned long flags;
};

enum {
	IO_WQ_ACCT_BOUND,
	IO_WQ_ACCT_UNBOUND,
	IO_WQ_ACCT_NR,
};

/*
 * Per-node worker thread pool
 */
struct io_wqe {
	raw_spinlock_t lock;
	struct io_wqe_acct acct[2];

	int node;

	struct hlist_nulls_head free_list;
	struct list_head all_list;

	struct wait_queue_entry wait;

	struct io_wq *wq;
	struct io_wq_work *hash_tail[IO_WQ_NR_HASH_BUCKETS];

	cpumask_var_t cpu_mask;
};

/*
 * Per io_wq state
 */
struct io_wq {
	unsigned long state;

	free_work_fn *free_work;
	io_wq_work_fn *do_work;

	struct io_wq_hash *hash;

	atomic_t worker_refs;
	struct completion worker_done;

	struct hlist_node cpuhp_node;

	struct task_struct *task;

	struct io_wqe *wqes[];
};

static enum cpuhp_state io_wq_online;

struct io_cb_cancel_data {
	work_cancel_fn *fn;
	void *data;
	int nr_running;
	int nr_pending;
	bool cancel_all;
};

static bool create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index);
static void io_wqe_dec_running(struct io_worker *worker);
static bool io_acct_cancel_pending_work(struct io_wqe *wqe,
					struct io_wqe_acct *acct,
					struct io_cb_cancel_data *match);

static bool io_worker_get(struct io_worker *worker)
{
	return refcount_inc_not_zero(&worker->ref);
}

static void io_worker_release(struct io_worker *worker)
{
	if (refcount_dec_and_test(&worker->ref))
		complete(&worker->ref_done);
}

static inline struct io_wqe_acct *io_get_acct(struct io_wqe *wqe, bool bound)
{
	return &wqe->acct[bound ? IO_WQ_ACCT_BOUND : IO_WQ_ACCT_UNBOUND];
}

static inline struct io_wqe_acct *io_work_get_acct(struct io_wqe *wqe,
						   struct io_wq_work *work)
{
	return io_get_acct(wqe, !(work->flags & IO_WQ_WORK_UNBOUND));
}

static inline struct io_wqe_acct *io_wqe_get_acct(struct io_worker *worker)
{
	return io_get_acct(worker->wqe, worker->flags & IO_WORKER_F_BOUND);
}

static void io_worker_ref_put(struct io_wq *wq)
{
	if (atomic_dec_and_test(&wq->worker_refs))
		complete(&wq->worker_done);
}

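/*
 * A worker is done: wait for all other ref holders to drop their
 * references, unlink the worker from the free and all-worker lists,
 * drop the running accounting and exit the thread.
 */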
static void io_worker_exit(struct io_worker *worker)
{
	struct io_wqe *wqe = worker->wqe;

	if (refcount_dec_and_test(&worker->ref))
		complete(&worker->ref_done);
	wait_for_completion(&worker->ref_done);

	raw_spin_lock(&wqe->lock);
	if (worker->flags & IO_WORKER_F_FREE)
		hlist_nulls_del_rcu(&worker->nulls_node);
	list_del_rcu(&worker->all_list);
	preempt_disable();
	io_wqe_dec_running(worker);
	worker->flags = 0;
	current->flags &= ~PF_IO_WORKER;
	preempt_enable();
	raw_spin_unlock(&wqe->lock);

	kfree_rcu(worker, rcu);
	io_worker_ref_put(wqe->wq);
	do_exit(0);
}

static inline bool io_acct_run_queue(struct io_wqe_acct *acct)
{
	if (!wq_list_empty(&acct->work_list) &&
	    !test_bit(IO_ACCT_STALLED_BIT, &acct->flags))
		return true;
	return false;
}

/*
 * Check the free list for an available worker. If one isn't available,
 * the caller must create one.
 */
static bool io_wqe_activate_free_worker(struct io_wqe *wqe,
					struct io_wqe_acct *acct)
	__must_hold(RCU)
{
	struct hlist_nulls_node *n;
	struct io_worker *worker;

	/*
	 * Iterate free_list and see if we can find an idle worker to
	 * activate. If a given worker is on the free_list but in the process
	 * of exiting, keep trying.
	 */
	hlist_nulls_for_each_entry_rcu(worker, n, &wqe->free_list, nulls_node) {
		if (!io_worker_get(worker))
			continue;
		if (io_wqe_get_acct(worker) != acct) {
			io_worker_release(worker);
			continue;
		}
		if (wake_up_process(worker->task)) {
			io_worker_release(worker);
			return true;
		}
		io_worker_release(worker);
	}

	return false;
}

/*
 * We need a worker. If we find a free one, we're good. If not, and we're
 * below the max number of workers, create one.
 */
static bool io_wqe_create_worker(struct io_wqe *wqe, struct io_wqe_acct *acct)
{
	/*
	 * Most likely an attempt to queue unbounded work on an io_wq that
	 * wasn't setup with any unbounded workers.
	 */
	if (unlikely(!acct->max_workers))
		pr_warn_once("io-wq is not configured for unbound workers");

	raw_spin_lock(&wqe->lock);
	if (acct->nr_workers == acct->max_workers) {
		raw_spin_unlock(&wqe->lock);
		return true;
	}
	acct->nr_workers++;
	raw_spin_unlock(&wqe->lock);
	atomic_inc(&acct->nr_running);
	atomic_inc(&wqe->wq->worker_refs);
	return create_io_worker(wqe->wq, wqe, acct->index);
}

static void io_wqe_inc_running(struct io_worker *worker)
{
	struct io_wqe_acct *acct = io_wqe_get_acct(worker);

	atomic_inc(&acct->nr_running);
}

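/*
 * task_work callback that performs the deferred worker creation,
 * re-checking the per-acct worker limit under wqe->lock first.
 */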
static void create_worker_cb(struct callback_head *cb)
{
	struct io_worker *worker;
	struct io_wq *wq;
	struct io_wqe *wqe;
	struct io_wqe_acct *acct;
	bool do_create = false;

	worker = container_of(cb, struct io_worker, create_work);
	wqe = worker->wqe;
	wq = wqe->wq;
	acct = &wqe->acct[worker->create_index];
	raw_spin_lock(&wqe->lock);
	if (acct->nr_workers < acct->max_workers) {
		acct->nr_workers++;
		do_create = true;
	}
	raw_spin_unlock(&wqe->lock);
	if (do_create) {
		create_io_worker(wq, wqe, worker->create_index);
	} else {
		atomic_dec(&acct->nr_running);
		io_worker_ref_put(wq);
	}
	clear_bit_unlock(0, &worker->create_state);
	io_worker_release(worker);
}

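/*
 * Queue a request to create a new worker as task_work on the io_wq
 * task. The create_state bit serializes use of create_work and
 * create_index, so at most one creation request per worker is in
 * flight at any time.
 */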
static bool io_queue_worker_create(struct io_worker *worker,
				   struct io_wqe_acct *acct,
				   task_work_func_t func)
{
	struct io_wqe *wqe = worker->wqe;
	struct io_wq *wq = wqe->wq;

	/* raced with exit, just ignore create call */
	if (test_bit(IO_WQ_BIT_EXIT, &wq->state))
		goto fail;
	if (!io_worker_get(worker))
		goto fail;
	/*
	 * create_state manages ownership of create_work/index. We should
	 * only need one entry per worker, as the worker going to sleep
	 * will trigger the condition, and waking will clear it once it
	 * runs the task_work.
	 */
	if (test_bit(0, &worker->create_state) ||
	    test_and_set_bit_lock(0, &worker->create_state))
		goto fail_release;

	init_task_work(&worker->create_work, func);
	worker->create_index = acct->index;
	if (!task_work_add(wq->task, &worker->create_work, TWA_SIGNAL)) {
		clear_bit_unlock(0, &worker->create_state);
		return true;
	}
	clear_bit_unlock(0, &worker->create_state);
fail_release:
	io_worker_release(worker);
fail:
	atomic_dec(&acct->nr_running);
	io_worker_ref_put(wq);
	return false;
}

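/*
 * A worker stops running. If it was the last running worker for this
 * acct and work is still queued, schedule creation of a replacement.
 */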
static void io_wqe_dec_running(struct io_worker *worker)
	__must_hold(wqe->lock)
{
	struct io_wqe_acct *acct = io_wqe_get_acct(worker);
	struct io_wqe *wqe = worker->wqe;

	if (!(worker->flags & IO_WORKER_F_UP))
		return;

	if (atomic_dec_and_test(&acct->nr_running) && io_acct_run_queue(acct)) {
		atomic_inc(&acct->nr_running);
		atomic_inc(&wqe->wq->worker_refs);
		io_queue_worker_create(worker, acct, create_worker_cb);
	}
}

/*
 * Worker will start processing some work. Move it to the busy list, if
 * it's currently on the freelist
 */
static void __io_worker_busy(struct io_wqe *wqe, struct io_worker *worker,
			     struct io_wq_work *work)
	__must_hold(wqe->lock)
{
	if (worker->flags & IO_WORKER_F_FREE) {
		worker->flags &= ~IO_WORKER_F_FREE;
		hlist_nulls_del_init_rcu(&worker->nulls_node);
	}
}

/*
 * No work, worker going to sleep. Move to the free list; the caller
 * continues to hold wqe->lock.
 */
static void __io_worker_idle(struct io_wqe *wqe, struct io_worker *worker)
	__must_hold(wqe->lock)
{
	if (!(worker->flags & IO_WORKER_F_FREE)) {
		worker->flags |= IO_WORKER_F_FREE;
		hlist_nulls_add_head_rcu(&worker->nulls_node, &wqe->free_list);
	}
}

static inline unsigned int io_get_work_hash(struct io_wq_work *work)
{
	return work->flags >> IO_WQ_HASH_SHIFT;
}

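/*
 * Stalled on hashed work: hook this wqe into the hash waitqueue so it
 * gets woken when the hash bit is cleared. If the bit was already
 * cleared, don't go to sleep at all.
 */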
static void io_wait_on_hash(struct io_wqe *wqe, unsigned int hash)
{
	struct io_wq *wq = wqe->wq;

	spin_lock_irq(&wq->hash->wait.lock);
	if (list_empty(&wqe->wait.entry)) {
		__add_wait_queue(&wq->hash->wait, &wqe->wait);
		if (!test_bit(hash, &wq->hash->map)) {
			__set_current_state(TASK_RUNNING);
			list_del_init(&wqe->wait.entry);
		}
	}
	spin_unlock_irq(&wq->hash->wait.lock);
}

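/*
 * Pick the next work item for this acct. Unhashed work can run right
 * away; hashed work only runs if no other worker currently owns the
 * same hash bit. If only already-running hashed work remains, mark the
 * acct stalled and wait on the hash.
 */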
static struct io_wq_work *io_get_next_work(struct io_wqe_acct *acct,
					   struct io_worker *worker)
	__must_hold(wqe->lock)
{
	struct io_wq_work_node *node, *prev;
	struct io_wq_work *work, *tail;
	unsigned int stall_hash = -1U;
	struct io_wqe *wqe = worker->wqe;

	wq_list_for_each(node, prev, &acct->work_list) {
		unsigned int hash;

		work = container_of(node, struct io_wq_work, list);

		/* not hashed, can run anytime */
		if (!io_wq_is_hashed(work)) {
			wq_list_del(&acct->work_list, node, prev);
			return work;
		}

		hash = io_get_work_hash(work);
		/* all items with this hash lie in [work, tail] */
		tail = wqe->hash_tail[hash];

		/* hashed, can run if not already running */
		if (!test_and_set_bit(hash, &wqe->wq->hash->map)) {
			wqe->hash_tail[hash] = NULL;
			wq_list_cut(&acct->work_list, &tail->list, prev);
			return work;
		}
		if (stall_hash == -1U)
			stall_hash = hash;
		/* fast forward to a next hash, for-each will fix up @prev */
		node = &tail->list;
	}

	if (stall_hash != -1U) {
		/*
		 * Set this before dropping the lock to avoid racing with new
		 * work being added and clearing the stalled bit.
		 */
		set_bit(IO_ACCT_STALLED_BIT, &acct->flags);
		raw_spin_unlock(&wqe->lock);
		io_wait_on_hash(wqe, stall_hash);
		raw_spin_lock(&wqe->lock);
	}

	return NULL;
}

static bool io_flush_signals(void)
{
	if (unlikely(test_thread_flag(TIF_NOTIFY_SIGNAL))) {
		__set_current_state(TASK_RUNNING);
		tracehook_notify_signal();
		return true;
	}
	return false;
}

static void io_assign_current_work(struct io_worker *worker,
				   struct io_wq_work *work)
{
	if (work) {
		io_flush_signals();
		cond_resched();
	}

	spin_lock(&worker->lock);
	worker->cur_work = work;
	spin_unlock(&worker->lock);
}

static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work);

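/*
 * Main work processing loop: pull work off the acct list and run it,
 * including any dependent (linked) items, waking hash waiters once a
 * hashed chain completes.
 */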
static void io_worker_handle_work(struct io_worker *worker)
	__releases(wqe->lock)
{
	struct io_wqe_acct *acct = io_wqe_get_acct(worker);
	struct io_wqe *wqe = worker->wqe;
	struct io_wq *wq = wqe->wq;
	bool do_kill = test_bit(IO_WQ_BIT_EXIT, &wq->state);

	do {
		struct io_wq_work *work;
get_next:
		/*
		 * If we got some work, mark us as busy. If we didn't, but
		 * the list isn't empty, it means we stalled on hashed work.
		 * Mark us stalled so we don't keep looking for work when we
		 * can't make progress, any work completion or insertion will
		 * clear the stalled flag.
		 */
		work = io_get_next_work(acct, worker);
		if (work)
			__io_worker_busy(wqe, worker, work);

		raw_spin_unlock(&wqe->lock);
		if (!work)
			break;
		io_assign_current_work(worker, work);
		__set_current_state(TASK_RUNNING);

		/* handle a whole dependent link */
		do {
			struct io_wq_work *next_hashed, *linked;
			unsigned int hash = io_get_work_hash(work);

			next_hashed = wq_next_work(work);

			if (unlikely(do_kill) && (work->flags & IO_WQ_WORK_UNBOUND))
				work->flags |= IO_WQ_WORK_CANCEL;
			wq->do_work(work);
			io_assign_current_work(worker, NULL);

			linked = wq->free_work(work);
			work = next_hashed;
			if (!work && linked && !io_wq_is_hashed(linked)) {
				work = linked;
				linked = NULL;
			}
			io_assign_current_work(worker, work);
			if (linked)
				io_wqe_enqueue(wqe, linked);

			if (hash != -1U && !next_hashed) {
				clear_bit(hash, &wq->hash->map);
				clear_bit(IO_ACCT_STALLED_BIT, &acct->flags);
				if (wq_has_sleeper(&wq->hash->wait))
					wake_up(&wq->hash->wait);
				raw_spin_lock(&wqe->lock);
				/* skip unnecessary unlock-lock wqe->lock */
				if (!work)
					goto get_next;
				raw_spin_unlock(&wqe->lock);
			}
		} while (work);

		raw_spin_lock(&wqe->lock);
	} while (1);
}

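/*
 * Thread function for each worker: process queued work while there is
 * any, otherwise idle on the free list until woken or until the idle
 * timeout expires.
 */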
static int io_wqe_worker(void *data)
{
	struct io_worker *worker = data;
	struct io_wqe_acct *acct = io_wqe_get_acct(worker);
	struct io_wqe *wqe = worker->wqe;
	struct io_wq *wq = wqe->wq;
	bool last_timeout = false;
	char buf[TASK_COMM_LEN];

	worker->flags |= (IO_WORKER_F_UP | IO_WORKER_F_RUNNING);

	snprintf(buf, sizeof(buf), "iou-wrk-%d", wq->task->pid);
	set_task_comm(current, buf);

	while (!test_bit(IO_WQ_BIT_EXIT, &wq->state)) {
		long ret;

		set_current_state(TASK_INTERRUPTIBLE);
loop:
		raw_spin_lock(&wqe->lock);
		if (io_acct_run_queue(acct)) {
			io_worker_handle_work(worker);
			goto loop;
		}
		/* timed out, exit unless we're the last worker */
		if (last_timeout && acct->nr_workers > 1) {
			acct->nr_workers--;
			raw_spin_unlock(&wqe->lock);
			__set_current_state(TASK_RUNNING);
			break;
		}
		last_timeout = false;
		__io_worker_idle(wqe, worker);
		raw_spin_unlock(&wqe->lock);
		if (io_flush_signals())
			continue;
		ret = schedule_timeout(WORKER_IDLE_TIMEOUT);
		if (signal_pending(current)) {
			struct ksignal ksig;

			if (!get_signal(&ksig))
				continue;
			break;
		}
		last_timeout = !ret;
	}

	if (test_bit(IO_WQ_BIT_EXIT, &wq->state)) {
		raw_spin_lock(&wqe->lock);
		io_worker_handle_work(worker);
	}

	io_worker_exit(worker);
	return 0;
}

/*
 * Called when a worker is scheduled in. Mark us as currently running.
 */
void io_wq_worker_running(struct task_struct *tsk)
{
	struct io_worker *worker = tsk->pf_io_worker;

	if (!worker)
		return;
	if (!(worker->flags & IO_WORKER_F_UP))
		return;
	if (worker->flags & IO_WORKER_F_RUNNING)
		return;
	worker->flags |= IO_WORKER_F_RUNNING;
	io_wqe_inc_running(worker);
}

/*
 * Called when worker is going to sleep. If there are no workers currently
 * running and we have work pending, wake up a free one or create a new one.
 */
void io_wq_worker_sleeping(struct task_struct *tsk)
{
	struct io_worker *worker = tsk->pf_io_worker;

	if (!worker)
		return;
	if (!(worker->flags & IO_WORKER_F_UP))
		return;
	if (!(worker->flags & IO_WORKER_F_RUNNING))
		return;

	worker->flags &= ~IO_WORKER_F_RUNNING;

	raw_spin_lock(&worker->wqe->lock);
	io_wqe_dec_running(worker);
	raw_spin_unlock(&worker->wqe->lock);
}

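/*
 * Attach a freshly created task to its io_worker, add it to the free
 * and all-worker lists and let it run.
 */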
static void io_init_new_worker(struct io_wqe *wqe, struct io_worker *worker,
			       struct task_struct *tsk)
{
	tsk->pf_io_worker = worker;
	worker->task = tsk;
	set_cpus_allowed_ptr(tsk, wqe->cpu_mask);
	tsk->flags |= PF_NO_SETAFFINITY;

	raw_spin_lock(&wqe->lock);
	hlist_nulls_add_head_rcu(&worker->nulls_node, &wqe->free_list);
	list_add_tail_rcu(&worker->all_list, &wqe->all_list);
	worker->flags |= IO_WORKER_F_FREE;
	raw_spin_unlock(&wqe->lock);
	wake_up_new_task(tsk);
}

static bool io_wq_work_match_all(struct io_wq_work *work, void *data)
{
	return true;
}

static inline bool io_should_retry_thread(long err)
{
	switch (err) {
	case -EAGAIN:
	case -ERESTARTSYS:
	case -ERESTARTNOINTR:
	case -ERESTARTNOHAND:
		return true;
	default:
		return false;
	}
}

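/*
 * Continuation after a failed thread creation: retry, and on a fatal
 * error undo the accounting and cancel pending work if this was the
 * last worker for the acct.
 */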
static void create_worker_cont(struct callback_head *cb)
{
	struct io_worker *worker;
	struct task_struct *tsk;
	struct io_wqe *wqe;

	worker = container_of(cb, struct io_worker, create_work);
	clear_bit_unlock(0, &worker->create_state);
	wqe = worker->wqe;
	tsk = create_io_thread(io_wqe_worker, worker, wqe->node);
	if (!IS_ERR(tsk)) {
		io_init_new_worker(wqe, worker, tsk);
		io_worker_release(worker);
		return;
	} else if (!io_should_retry_thread(PTR_ERR(tsk))) {
		struct io_wqe_acct *acct = io_wqe_get_acct(worker);

		atomic_dec(&acct->nr_running);
		raw_spin_lock(&wqe->lock);
		acct->nr_workers--;
		if (!acct->nr_workers) {
			struct io_cb_cancel_data match = {
				.fn		= io_wq_work_match_all,
				.cancel_all	= true,
			};

			while (io_acct_cancel_pending_work(wqe, acct, &match))
				raw_spin_lock(&wqe->lock);
		}
		raw_spin_unlock(&wqe->lock);
		io_worker_ref_put(wqe->wq);
		kfree(worker);
		return;
	}

	/* re-create attempts grab a new worker ref, drop the existing one */
	io_worker_release(worker);
	schedule_work(&worker->work);
}

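/*
 * Workqueue path for retrying worker creation: queue a create request
 * on the io_wq task, freeing the worker if that fails.
 */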
static void io_workqueue_create(struct work_struct *work)
{
	struct io_worker *worker = container_of(work, struct io_worker, work);
	struct io_wqe_acct *acct = io_wqe_get_acct(worker);

	if (!io_queue_worker_create(worker, acct, create_worker_cont))
		kfree(worker);
}

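/*
 * Allocate and start a new worker for the given acct. On thread
 * creation failure, either retry asynchronously or undo the
 * accounting.
 */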
static bool create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index)
{
	struct io_wqe_acct *acct = &wqe->acct[index];
	struct io_worker *worker;
	struct task_struct *tsk;

	__set_current_state(TASK_RUNNING);

	worker = kzalloc_node(sizeof(*worker), GFP_KERNEL, wqe->node);
	if (!worker) {
fail:
		atomic_dec(&acct->nr_running);
		raw_spin_lock(&wqe->lock);
		acct->nr_workers--;
		raw_spin_unlock(&wqe->lock);
		io_worker_ref_put(wq);
		return false;
	}

	refcount_set(&worker->ref, 1);
	worker->wqe = wqe;
	spin_lock_init(&worker->lock);
	init_completion(&worker->ref_done);

	if (index == IO_WQ_ACCT_BOUND)
		worker->flags |= IO_WORKER_F_BOUND;

	tsk = create_io_thread(io_wqe_worker, worker, wqe->node);
	if (!IS_ERR(tsk)) {
		io_init_new_worker(wqe, worker, tsk);
	} else if (!io_should_retry_thread(PTR_ERR(tsk))) {
		kfree(worker);
		goto fail;
	} else {
		INIT_WORK(&worker->work, io_workqueue_create);
		schedule_work(&worker->work);
	}

	return true;
}

/*
 * Iterate the passed in list and call the specific function for each
 * worker that isn't exiting
 */
static bool io_wq_for_each_worker(struct io_wqe *wqe,
				  bool (*func)(struct io_worker *, void *),
				  void *data)
{
	struct io_worker *worker;
	bool ret = false;

	list_for_each_entry_rcu(worker, &wqe->all_list, all_list) {
		if (io_worker_get(worker)) {
			/* no task if node is/was offline */
			if (worker->task)
				ret = func(worker, data);
			io_worker_release(worker);
			if (ret)
				break;
		}
	}

	return ret;
}

static bool io_wq_worker_wake(struct io_worker *worker, void *data)
{
	set_notify_signal(worker->task);
	wake_up_process(worker->task);
	return false;
}

static void io_run_cancel(struct io_wq_work *work, struct io_wqe *wqe)
{
	struct io_wq *wq = wqe->wq;

	do {
		work->flags |= IO_WQ_WORK_CANCEL;
		wq->do_work(work);
		work = wq->free_work(work);
	} while (work);
}

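/*
 * Add work to the acct list. Hashed work is chained behind the current
 * tail for its hash so that items with the same hash stay adjacent.
 */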
static void io_wqe_insert_work(struct io_wqe *wqe, struct io_wq_work *work)
{
	struct io_wqe_acct *acct = io_work_get_acct(wqe, work);
	unsigned int hash;
	struct io_wq_work *tail;

	if (!io_wq_is_hashed(work)) {
append:
		wq_list_add_tail(&work->list, &acct->work_list);
		return;
	}

	hash = io_get_work_hash(work);
	tail = wqe->hash_tail[hash];
	wqe->hash_tail[hash] = work;
	if (!tail)
		goto append;

	wq_list_add_after(&work->list, &tail->list, &acct->work_list);
}

static bool io_wq_work_match_item(struct io_wq_work *work, void *data)
{
	return work == data;
}

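/*
 * Queue work on this wqe, waking a free worker or creating a new one
 * if needed. If io-wq is exiting, or the work is already marked for
 * cancellation, it is cancelled immediately instead.
 */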
static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work)
{
	struct io_wqe_acct *acct = io_work_get_acct(wqe, work);
	unsigned work_flags = work->flags;
	bool do_create;

	/*
	 * If io-wq is exiting for this task, or if the request has explicitly
	 * been marked as one that should not get executed, cancel it here.
	 */
	if (test_bit(IO_WQ_BIT_EXIT, &wqe->wq->state) ||
	    (work->flags & IO_WQ_WORK_CANCEL)) {
		io_run_cancel(work, wqe);
		return;
	}

	raw_spin_lock(&wqe->lock);
	io_wqe_insert_work(wqe, work);
	clear_bit(IO_ACCT_STALLED_BIT, &acct->flags);

	rcu_read_lock();
	do_create = !io_wqe_activate_free_worker(wqe, acct);
	rcu_read_unlock();

	raw_spin_unlock(&wqe->lock);

	if (do_create && ((work_flags & IO_WQ_WORK_CONCURRENT) ||
	    !atomic_read(&acct->nr_running))) {
		bool did_create;

		did_create = io_wqe_create_worker(wqe, acct);
		if (likely(did_create))
			return;

		raw_spin_lock(&wqe->lock);
		/* fatal condition, failed to create the first worker */
		if (!acct->nr_workers) {
			struct io_cb_cancel_data match = {
				.fn		= io_wq_work_match_item,
				.data		= work,
				.cancel_all	= false,
			};

			if (io_acct_cancel_pending_work(wqe, acct, &match))
				raw_spin_lock(&wqe->lock);
		}
		raw_spin_unlock(&wqe->lock);
	}
}

void io_wq_enqueue(struct io_wq *wq, struct io_wq_work *work)
{
	struct io_wqe *wqe = wq->wqes[numa_node_id()];

	io_wqe_enqueue(wqe, work);
}

/*
 * Work items that hash to the same value will not be done in parallel.
 * Used to limit concurrent writes, generally hashed by inode.
 */
void io_wq_hash_work(struct io_wq_work *work, void *val)
{
	unsigned int bit;

	bit = hash_ptr(val, IO_WQ_HASH_ORDER);
	work->flags |= (IO_WQ_WORK_HASHED | (bit << IO_WQ_HASH_SHIFT));
}

static bool io_wq_worker_cancel(struct io_worker *worker, void *data)
{
	struct io_cb_cancel_data *match = data;

	/*
	 * Hold the lock to avoid ->cur_work going out of scope, caller
	 * may dereference the passed in work.
	 */
	spin_lock(&worker->lock);
	if (worker->cur_work &&
	    match->fn(worker->cur_work, match->data)) {
		set_notify_signal(worker->task);
		match->nr_running++;
	}
	spin_unlock(&worker->lock);

	return match->nr_running && !match->cancel_all;
}

static inline void io_wqe_remove_pending(struct io_wqe *wqe,
					 struct io_wq_work *work,
					 struct io_wq_work_node *prev)
{
	struct io_wqe_acct *acct = io_work_get_acct(wqe, work);
	unsigned int hash = io_get_work_hash(work);
	struct io_wq_work *prev_work = NULL;

	if (io_wq_is_hashed(work) && work == wqe->hash_tail[hash]) {
		if (prev)
			prev_work = container_of(prev, struct io_wq_work, list);
		if (prev_work && io_get_work_hash(prev_work) == hash)
			wqe->hash_tail[hash] = prev_work;
		else
			wqe->hash_tail[hash] = NULL;
	}
	wq_list_del(&acct->work_list, &work->list, prev);
}

static bool io_acct_cancel_pending_work(struct io_wqe *wqe,
					struct io_wqe_acct *acct,
					struct io_cb_cancel_data *match)
	__releases(wqe->lock)
{
	struct io_wq_work_node *node, *prev;
	struct io_wq_work *work;

	wq_list_for_each(node, prev, &acct->work_list) {
		work = container_of(node, struct io_wq_work, list);
		if (!match->fn(work, match->data))
			continue;
		io_wqe_remove_pending(wqe, work, prev);
		raw_spin_unlock(&wqe->lock);
		io_run_cancel(work, wqe);
		match->nr_pending++;
		/* not safe to continue after unlock */
		return true;
	}

	return false;
}

static void io_wqe_cancel_pending_work(struct io_wqe *wqe,
				       struct io_cb_cancel_data *match)
{
	int i;
retry:
	raw_spin_lock(&wqe->lock);
	for (i = 0; i < IO_WQ_ACCT_NR; i++) {
		struct io_wqe_acct *acct = io_get_acct(wqe, i == 0);

		if (io_acct_cancel_pending_work(wqe, acct, match)) {
			if (match->cancel_all)
				goto retry;
			return;
		}
	}
	raw_spin_unlock(&wqe->lock);
}

static void io_wqe_cancel_running_work(struct io_wqe *wqe,
				       struct io_cb_cancel_data *match)
{
	rcu_read_lock();
	io_wq_for_each_worker(wqe, io_wq_worker_cancel, match);
	rcu_read_unlock();
}

enum io_wq_cancel io_wq_cancel_cb(struct io_wq *wq, work_cancel_fn *cancel,
				  void *data, bool cancel_all)
{
	struct io_cb_cancel_data match = {
		.fn		= cancel,
		.data		= data,
		.cancel_all	= cancel_all,
	};
	int node;

	/*
	 * First check pending list, if we're lucky we can just remove it
	 * from there. CANCEL_OK means that the work is returned as-new,
	 * no completion will be posted for it.
	 */
	for_each_node(node) {
		struct io_wqe *wqe = wq->wqes[node];

		io_wqe_cancel_pending_work(wqe, &match);
		if (match.nr_pending && !match.cancel_all)
			return IO_WQ_CANCEL_OK;
	}

	/*
	 * Now check if a free (going busy) or busy worker has the work
	 * currently running. If we find it there, we'll return CANCEL_RUNNING
	 * as an indication that we attempt to signal cancellation. The
	 * completion will run normally in this case.
	 */
	for_each_node(node) {
		struct io_wqe *wqe = wq->wqes[node];

		io_wqe_cancel_running_work(wqe, &match);
		if (match.nr_running && !match.cancel_all)
			return IO_WQ_CANCEL_RUNNING;
	}

	if (match.nr_running)
		return IO_WQ_CANCEL_RUNNING;
	if (match.nr_pending)
		return IO_WQ_CANCEL_OK;
	return IO_WQ_CANCEL_NOTFOUND;
}

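/*
 * Hash waitqueue callback: a hash bit was cleared, so for each stalled
 * acct clear the stalled state and wake a free worker.
 */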
static int io_wqe_hash_wake(struct wait_queue_entry *wait, unsigned mode,
			    int sync, void *key)
{
	struct io_wqe *wqe = container_of(wait, struct io_wqe, wait);
	int i;

	list_del_init(&wait->entry);

	rcu_read_lock();
	for (i = 0; i < IO_WQ_ACCT_NR; i++) {
		struct io_wqe_acct *acct = &wqe->acct[i];

		if (test_and_clear_bit(IO_ACCT_STALLED_BIT, &acct->flags))
			io_wqe_activate_free_worker(wqe, acct);
	}
	rcu_read_unlock();
	return 1;
}

struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
{
	int ret, node, i;
	struct io_wq *wq;

	if (WARN_ON_ONCE(!data->free_work || !data->do_work))
		return ERR_PTR(-EINVAL);
	if (WARN_ON_ONCE(!bounded))
		return ERR_PTR(-EINVAL);

	wq = kzalloc(struct_size(wq, wqes, nr_node_ids), GFP_KERNEL);
	if (!wq)
		return ERR_PTR(-ENOMEM);
	ret = cpuhp_state_add_instance_nocalls(io_wq_online, &wq->cpuhp_node);
	if (ret)
		goto err_wq;

	refcount_inc(&data->hash->refs);
	wq->hash = data->hash;
	wq->free_work = data->free_work;
	wq->do_work = data->do_work;

	ret = -ENOMEM;
	for_each_node(node) {
		struct io_wqe *wqe;
		int alloc_node = node;

		if (!node_online(alloc_node))
			alloc_node = NUMA_NO_NODE;
		wqe = kzalloc_node(sizeof(struct io_wqe), GFP_KERNEL, alloc_node);
		if (!wqe)
			goto err;
		if (!alloc_cpumask_var(&wqe->cpu_mask, GFP_KERNEL))
			goto err;
		cpumask_copy(wqe->cpu_mask, cpumask_of_node(node));
		wq->wqes[node] = wqe;
		wqe->node = alloc_node;
		wqe->acct[IO_WQ_ACCT_BOUND].max_workers = bounded;
		wqe->acct[IO_WQ_ACCT_UNBOUND].max_workers =
					task_rlimit(current, RLIMIT_NPROC);
		INIT_LIST_HEAD(&wqe->wait.entry);
		wqe->wait.func = io_wqe_hash_wake;
		for (i = 0; i < IO_WQ_ACCT_NR; i++) {
			struct io_wqe_acct *acct = &wqe->acct[i];

			acct->index = i;
			atomic_set(&acct->nr_running, 0);
			INIT_WQ_LIST(&acct->work_list);
		}
		wqe->wq = wq;
		raw_spin_lock_init(&wqe->lock);
		INIT_HLIST_NULLS_HEAD(&wqe->free_list, 0);
		INIT_LIST_HEAD(&wqe->all_list);
	}

	wq->task = get_task_struct(data->task);
	atomic_set(&wq->worker_refs, 1);
	init_completion(&wq->worker_done);
	return wq;
err:
	io_wq_put_hash(data->hash);
	cpuhp_state_remove_instance_nocalls(io_wq_online, &wq->cpuhp_node);
	for_each_node(node) {
		if (!wq->wqes[node])
			continue;
		free_cpumask_var(wq->wqes[node]->cpu_mask);
		kfree(wq->wqes[node]);
	}
err_wq:
	kfree(wq);
	return ERR_PTR(ret);
}

static bool io_task_work_match(struct callback_head *cb, void *data)
{
	struct io_worker *worker;

	if (cb->func != create_worker_cb && cb->func != create_worker_cont)
		return false;
	worker = container_of(cb, struct io_worker, create_work);
	return worker->wqe->wq == data;
}

void io_wq_exit_start(struct io_wq *wq)
{
	set_bit(IO_WQ_BIT_EXIT, &wq->state);
}

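/*
 * Tear down all workers: cancel any pending create task_work, wake
 * every worker so it notices IO_WQ_BIT_EXIT, then wait for them all to
 * exit.
 */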
static void io_wq_exit_workers(struct io_wq *wq)
{
	struct callback_head *cb;
	int node;

	if (!wq->task)
		return;

	while ((cb = task_work_cancel_match(wq->task, io_task_work_match, wq)) != NULL) {
		struct io_worker *worker;
		struct io_wqe_acct *acct;

		worker = container_of(cb, struct io_worker, create_work);
		acct = io_wqe_get_acct(worker);
		atomic_dec(&acct->nr_running);
		raw_spin_lock(&worker->wqe->lock);
		acct->nr_workers--;
		raw_spin_unlock(&worker->wqe->lock);
		io_worker_ref_put(wq);
		clear_bit_unlock(0, &worker->create_state);
		io_worker_release(worker);
	}

	rcu_read_lock();
	for_each_node(node) {
		struct io_wqe *wqe = wq->wqes[node];

		io_wq_for_each_worker(wqe, io_wq_worker_wake, NULL);
	}
	rcu_read_unlock();
	io_worker_ref_put(wq);
	wait_for_completion(&wq->worker_done);

	for_each_node(node) {
		spin_lock_irq(&wq->hash->wait.lock);
		list_del_init(&wq->wqes[node]->wait.entry);
		spin_unlock_irq(&wq->hash->wait.lock);
	}
	put_task_struct(wq->task);
	wq->task = NULL;
}

static void io_wq_destroy(struct io_wq *wq)
{
	int node;

	cpuhp_state_remove_instance_nocalls(io_wq_online, &wq->cpuhp_node);

	for_each_node(node) {
		struct io_wqe *wqe = wq->wqes[node];
		struct io_cb_cancel_data match = {
			.fn		= io_wq_work_match_all,
			.cancel_all	= true,
		};
		io_wqe_cancel_pending_work(wqe, &match);
		free_cpumask_var(wqe->cpu_mask);
		kfree(wqe);
	}
	io_wq_put_hash(wq->hash);
	kfree(wq);
}

void io_wq_put_and_exit(struct io_wq *wq)
{
	WARN_ON_ONCE(!test_bit(IO_WQ_BIT_EXIT, &wq->state));

	io_wq_exit_workers(wq);
	io_wq_destroy(wq);
}

struct online_data {
	unsigned int cpu;
	bool online;
};

static bool io_wq_worker_affinity(struct io_worker *worker, void *data)
{
	struct online_data *od = data;

	if (od->online)
		cpumask_set_cpu(od->cpu, worker->wqe->cpu_mask);
	else
		cpumask_clear_cpu(od->cpu, worker->wqe->cpu_mask);
	return false;
}

static int __io_wq_cpu_online(struct io_wq *wq, unsigned int cpu, bool online)
{
	struct online_data od = {
		.cpu = cpu,
		.online = online
	};
	int i;

	rcu_read_lock();
	for_each_node(i)
		io_wq_for_each_worker(wq->wqes[i], io_wq_worker_affinity, &od);
	rcu_read_unlock();
	return 0;
}

static int io_wq_cpu_online(unsigned int cpu, struct hlist_node *node)
{
	struct io_wq *wq = hlist_entry_safe(node, struct io_wq, cpuhp_node);

	return __io_wq_cpu_online(wq, cpu, true);
}

static int io_wq_cpu_offline(unsigned int cpu, struct hlist_node *node)
{
	struct io_wq *wq = hlist_entry_safe(node, struct io_wq, cpuhp_node);

	return __io_wq_cpu_online(wq, cpu, false);
}

int io_wq_cpu_affinity(struct io_wq *wq, cpumask_var_t mask)
{
	int i;

	rcu_read_lock();
	for_each_node(i) {
		struct io_wqe *wqe = wq->wqes[i];

		if (mask)
			cpumask_copy(wqe->cpu_mask, mask);
		else
			cpumask_copy(wqe->cpu_mask, cpumask_of_node(i));
	}
	rcu_read_unlock();
	return 0;
}

/*
 * Set max number of bounded and unbounded workers, returning the old
 * values in new_count. A zero entry in new_count leaves that type
 * unchanged and just reports the old value.
 */
int io_wq_max_workers(struct io_wq *wq, int *new_count)
{
	int i, node, prev = 0;

	BUILD_BUG_ON((int) IO_WQ_ACCT_BOUND   != (int) IO_WQ_BOUND);
	BUILD_BUG_ON((int) IO_WQ_ACCT_UNBOUND != (int) IO_WQ_UNBOUND);
	BUILD_BUG_ON((int) IO_WQ_ACCT_NR      != 2);

	for (i = 0; i < 2; i++) {
		if (new_count[i] > task_rlimit(current, RLIMIT_NPROC))
			new_count[i] = task_rlimit(current, RLIMIT_NPROC);
	}

	rcu_read_lock();
	for_each_node(node) {
		struct io_wqe_acct *acct;

		for (i = 0; i < IO_WQ_ACCT_NR; i++) {
			acct = &wq->wqes[node]->acct[i];
			prev = max_t(int, acct->max_workers, prev);
			if (new_count[i])
				acct->max_workers = new_count[i];
			new_count[i] = prev;
		}
	}
	rcu_read_unlock();
	return 0;
}

static __init int io_wq_init(void)
{
	int ret;

	ret = cpuhp_setup_state_multi(CPUHP_AP_ONLINE_DYN, "io-wq/online",
					io_wq_cpu_online, io_wq_cpu_offline);
	if (ret < 0)
		return ret;
	io_wq_online = ret;
	return 0;
}
subsys_initcall(io_wq_init);