// SPDX-License-Identifier: GPL-2.0
/*
 * Basic worker thread pool for io_uring
 *
 * Copyright (C) 2019 Jens Axboe
 *
 */
#include <linux/kernel.h>
#include <linux/init.h>
#include <linux/errno.h>
#include <linux/sched/signal.h>
#include <linux/percpu.h>
#include <linux/slab.h>
#include <linux/rculist_nulls.h>
#include <linux/cpu.h>
#include <linux/tracehook.h>

#include "io-wq.h"

#define WORKER_IDLE_TIMEOUT	(5 * HZ)

enum {
	IO_WORKER_F_UP		= 1,	/* up and active */
	IO_WORKER_F_RUNNING	= 2,	/* account as running */
	IO_WORKER_F_FREE	= 4,	/* worker on free list */
	IO_WORKER_F_BOUND	= 8,	/* is doing bounded work */
};

enum {
	IO_WQ_BIT_EXIT		= 0,	/* wq exiting */
};

enum {
	IO_ACCT_STALLED_BIT	= 0,	/* stalled on hash */
};

/*
 * One for each thread in a wqe pool
 */
struct io_worker {
	refcount_t ref;
	unsigned flags;
	struct hlist_nulls_node nulls_node;
	struct list_head all_list;
	struct task_struct *task;
	struct io_wqe *wqe;

	struct io_wq_work *cur_work;
	spinlock_t lock;

	struct completion ref_done;

	unsigned long create_state;
	struct callback_head create_work;
	int create_index;

	union {
		struct rcu_head rcu;
		struct work_struct work;
	};
};

#if BITS_PER_LONG == 64
#define IO_WQ_HASH_ORDER	6
#else
#define IO_WQ_HASH_ORDER	5
#endif

#define IO_WQ_NR_HASH_BUCKETS	(1u << IO_WQ_HASH_ORDER)

struct io_wqe_acct {
	unsigned nr_workers;
	unsigned max_workers;
	int index;
	atomic_t nr_running;
	struct io_wq_work_list work_list;
	unsigned long flags;
};

enum {
	IO_WQ_ACCT_BOUND,
	IO_WQ_ACCT_UNBOUND,
	IO_WQ_ACCT_NR,
};

/*
 * Per-node worker thread pool
 */
struct io_wqe {
	raw_spinlock_t lock;
	struct io_wqe_acct acct[2];

	int node;

	struct hlist_nulls_head free_list;
	struct list_head all_list;

	struct wait_queue_entry wait;

	struct io_wq *wq;
	struct io_wq_work *hash_tail[IO_WQ_NR_HASH_BUCKETS];

	cpumask_var_t cpu_mask;
};

/*
 * Per io_wq state
 */
struct io_wq {
	unsigned long state;

	free_work_fn *free_work;
	io_wq_work_fn *do_work;

	struct io_wq_hash *hash;

	atomic_t worker_refs;
	struct completion worker_done;

	struct hlist_node cpuhp_node;

	struct task_struct *task;

	struct io_wqe *wqes[];
};

static enum cpuhp_state io_wq_online;

struct io_cb_cancel_data {
	work_cancel_fn *fn;
	void *data;
	int nr_running;
	int nr_pending;
	bool cancel_all;
};

static bool create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index);
static void io_wqe_dec_running(struct io_worker *worker);
static bool io_acct_cancel_pending_work(struct io_wqe *wqe,
					struct io_wqe_acct *acct,
					struct io_cb_cancel_data *match);

static bool io_worker_get(struct io_worker *worker)
{
	return refcount_inc_not_zero(&worker->ref);
}

static void io_worker_release(struct io_worker *worker)
{
	if (refcount_dec_and_test(&worker->ref))
		complete(&worker->ref_done);
}

static inline struct io_wqe_acct *io_get_acct(struct io_wqe *wqe, bool bound)
{
	return &wqe->acct[bound ? IO_WQ_ACCT_BOUND : IO_WQ_ACCT_UNBOUND];
}

static inline struct io_wqe_acct *io_work_get_acct(struct io_wqe *wqe,
						   struct io_wq_work *work)
{
	return io_get_acct(wqe, !(work->flags & IO_WQ_WORK_UNBOUND));
}

static inline struct io_wqe_acct *io_wqe_get_acct(struct io_worker *worker)
{
	return io_get_acct(worker->wqe, worker->flags & IO_WORKER_F_BOUND);
}

static void io_worker_ref_put(struct io_wq *wq)
{
	if (atomic_dec_and_test(&wq->worker_refs))
		complete(&wq->worker_done);
}

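/*
 * Final exit path for a worker: wait until all other references have been
 * dropped, unhook the worker from the wqe lists, fix up the accounting and
 * exit the task.
 */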
static void io_worker_exit(struct io_worker *worker)
{
	struct io_wqe *wqe = worker->wqe;
	struct io_wqe_acct *acct = io_wqe_get_acct(worker);

	if (refcount_dec_and_test(&worker->ref))
		complete(&worker->ref_done);
	wait_for_completion(&worker->ref_done);

	raw_spin_lock(&wqe->lock);
	if (worker->flags & IO_WORKER_F_FREE)
		hlist_nulls_del_rcu(&worker->nulls_node);
	list_del_rcu(&worker->all_list);
	acct->nr_workers--;
	preempt_disable();
	io_wqe_dec_running(worker);
	worker->flags = 0;
	current->flags &= ~PF_IO_WORKER;
	preempt_enable();
	raw_spin_unlock(&wqe->lock);

	kfree_rcu(worker, rcu);
	io_worker_ref_put(wqe->wq);
	do_exit(0);
}

static inline bool io_acct_run_queue(struct io_wqe_acct *acct)
{
	if (!wq_list_empty(&acct->work_list) &&
	    !test_bit(IO_ACCT_STALLED_BIT, &acct->flags))
		return true;
	return false;
}

/*
 * Check head of free list for an available worker. If one isn't available,
 * caller must create one.
 */
static bool io_wqe_activate_free_worker(struct io_wqe *wqe,
					struct io_wqe_acct *acct)
	__must_hold(RCU)
{
	struct hlist_nulls_node *n;
	struct io_worker *worker;

	/*
	 * Iterate free_list and see if we can find an idle worker to
	 * activate. If a given worker is on the free_list but in the process
	 * of exiting, keep trying.
	 */
	hlist_nulls_for_each_entry_rcu(worker, n, &wqe->free_list, nulls_node) {
		if (!io_worker_get(worker))
			continue;
		if (io_wqe_get_acct(worker) != acct) {
			io_worker_release(worker);
			continue;
		}
		if (wake_up_process(worker->task)) {
			io_worker_release(worker);
			return true;
		}
		io_worker_release(worker);
	}

	return false;
}

/*
 * We need a worker. If we find a free one, we're good. If not, and we're
 * below the max number of workers, create one.
 */
static bool io_wqe_create_worker(struct io_wqe *wqe, struct io_wqe_acct *acct)
{
	bool do_create = false;

	/*
	 * Most likely an attempt to queue unbounded work on an io_wq that
	 * wasn't setup with any unbounded workers.
	 */
	if (unlikely(!acct->max_workers))
		pr_warn_once("io-wq is not configured for unbound workers");

	raw_spin_lock(&wqe->lock);
	if (acct->nr_workers < acct->max_workers) {
		acct->nr_workers++;
		do_create = true;
	}
	raw_spin_unlock(&wqe->lock);
	if (do_create) {
		atomic_inc(&acct->nr_running);
		atomic_inc(&wqe->wq->worker_refs);
		return create_io_worker(wqe->wq, wqe, acct->index);
	}

	return true;
}

static void io_wqe_inc_running(struct io_worker *worker)
{
	struct io_wqe_acct *acct = io_wqe_get_acct(worker);

	atomic_inc(&acct->nr_running);
}

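/*
 * task_work callback, run by the io_wq owner task: re-check the worker
 * limit under wqe->lock and create the new worker, or undo the accounting
 * if the slot is no longer available.
 */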
static void create_worker_cb(struct callback_head *cb)
{
	struct io_worker *worker;
	struct io_wq *wq;
	struct io_wqe *wqe;
	struct io_wqe_acct *acct;
	bool do_create = false;

	worker = container_of(cb, struct io_worker, create_work);
	wqe = worker->wqe;
	wq = wqe->wq;
	acct = &wqe->acct[worker->create_index];
	raw_spin_lock(&wqe->lock);
	if (acct->nr_workers < acct->max_workers) {
		acct->nr_workers++;
		do_create = true;
	}
	raw_spin_unlock(&wqe->lock);
	if (do_create) {
		create_io_worker(wq, wqe, worker->create_index);
	} else {
		atomic_dec(&acct->nr_running);
		io_worker_ref_put(wq);
	}
	clear_bit_unlock(0, &worker->create_state);
	io_worker_release(worker);
}

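/*
 * Defer worker creation to the io_wq owner task via task_work, so the new
 * thread is forked off the right task. The create_state bit ensures each
 * worker has at most one creation request in flight.
 */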
static bool io_queue_worker_create(struct io_worker *worker,
				   struct io_wqe_acct *acct,
				   task_work_func_t func)
{
	struct io_wqe *wqe = worker->wqe;
	struct io_wq *wq = wqe->wq;

	/* raced with exit, just ignore create call */
	if (test_bit(IO_WQ_BIT_EXIT, &wq->state))
		goto fail;
	if (!io_worker_get(worker))
		goto fail;
	/*
	 * create_state manages ownership of create_work/index. We should
	 * only need one entry per worker, as the worker going to sleep
	 * will trigger the condition, and waking will clear it once it
	 * runs the task_work.
	 */
	if (test_bit(0, &worker->create_state) ||
	    test_and_set_bit_lock(0, &worker->create_state))
		goto fail_release;

	init_task_work(&worker->create_work, func);
	worker->create_index = acct->index;
	if (!task_work_add(wq->task, &worker->create_work, TWA_SIGNAL))
		return true;
	clear_bit_unlock(0, &worker->create_state);
fail_release:
	io_worker_release(worker);
fail:
	atomic_dec(&acct->nr_running);
	io_worker_ref_put(wq);
	return false;
}

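/*
 * A worker stops running. If it was the last running worker for this
 * accounting class and work is still queued, kick off creation of a
 * replacement so the pending work doesn't sit idle.
 */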
static void io_wqe_dec_running(struct io_worker *worker)
	__must_hold(wqe->lock)
{
	struct io_wqe_acct *acct = io_wqe_get_acct(worker);
	struct io_wqe *wqe = worker->wqe;

	if (!(worker->flags & IO_WORKER_F_UP))
		return;

	if (atomic_dec_and_test(&acct->nr_running) && io_acct_run_queue(acct)) {
		atomic_inc(&acct->nr_running);
		atomic_inc(&wqe->wq->worker_refs);
		io_queue_worker_create(worker, acct, create_worker_cb);
	}
}

/*
 * Worker will start processing some work. Move it to the busy list, if
 * it's currently on the freelist
 */
static void __io_worker_busy(struct io_wqe *wqe, struct io_worker *worker,
			     struct io_wq_work *work)
	__must_hold(wqe->lock)
{
	if (worker->flags & IO_WORKER_F_FREE) {
		worker->flags &= ~IO_WORKER_F_FREE;
		hlist_nulls_del_init_rcu(&worker->nulls_node);
	}
}

/*
 * No work, worker going to sleep. Move to the freelist so it can be found
 * when new work arrives.
 */
static void __io_worker_idle(struct io_wqe *wqe, struct io_worker *worker)
	__must_hold(wqe->lock)
{
	if (!(worker->flags & IO_WORKER_F_FREE)) {
		worker->flags |= IO_WORKER_F_FREE;
		hlist_nulls_add_head_rcu(&worker->nulls_node, &wqe->free_list);
	}
}

static inline unsigned int io_get_work_hash(struct io_wq_work *work)
{
	return work->flags >> IO_WQ_HASH_SHIFT;
}

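/*
 * Park this wqe on the hash waitqueue so it gets woken once the hash bit
 * is dropped; back out again if the bit cleared while we were queueing.
 */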
static void io_wait_on_hash(struct io_wqe *wqe, unsigned int hash)
{
	struct io_wq *wq = wqe->wq;

	spin_lock_irq(&wq->hash->wait.lock);
	if (list_empty(&wqe->wait.entry)) {
		__add_wait_queue(&wq->hash->wait, &wqe->wait);
		if (!test_bit(hash, &wq->hash->map)) {
			__set_current_state(TASK_RUNNING);
			list_del_init(&wqe->wait.entry);
		}
	}
	spin_unlock_irq(&wq->hash->wait.lock);
}

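/*
 * Pick the next work item for this accounting class. Unhashed work can run
 * right away; hashed work only runs if no other worker currently holds
 * that hash. If everything left is stalled behind busy hashes, mark the
 * acct stalled and wait for a hash bit to clear.
 */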
static struct io_wq_work *io_get_next_work(struct io_wqe_acct *acct,
					   struct io_worker *worker)
	__must_hold(wqe->lock)
{
	struct io_wq_work_node *node, *prev;
	struct io_wq_work *work, *tail;
	unsigned int stall_hash = -1U;
	struct io_wqe *wqe = worker->wqe;

	wq_list_for_each(node, prev, &acct->work_list) {
		unsigned int hash;

		work = container_of(node, struct io_wq_work, list);

		/* not hashed, can run anytime */
		if (!io_wq_is_hashed(work)) {
			wq_list_del(&acct->work_list, node, prev);
			return work;
		}

		hash = io_get_work_hash(work);
		/* all items with this hash lie in [work, tail] */
		tail = wqe->hash_tail[hash];

		/* hashed, can run if not already running */
		if (!test_and_set_bit(hash, &wqe->wq->hash->map)) {
			wqe->hash_tail[hash] = NULL;
			wq_list_cut(&acct->work_list, &tail->list, prev);
			return work;
		}
		if (stall_hash == -1U)
			stall_hash = hash;
		/* fast forward to a next hash, for-each will fix up @prev */
		node = &tail->list;
	}

	if (stall_hash != -1U) {
		/*
		 * Set this before dropping the lock to avoid racing with new
		 * work being added and clearing the stalled bit.
		 */
		set_bit(IO_ACCT_STALLED_BIT, &acct->flags);
		raw_spin_unlock(&wqe->lock);
		io_wait_on_hash(wqe, stall_hash);
		raw_spin_lock(&wqe->lock);
	}

	return NULL;
}

static bool io_flush_signals(void)
{
	if (unlikely(test_thread_flag(TIF_NOTIFY_SIGNAL))) {
		__set_current_state(TASK_RUNNING);
		tracehook_notify_signal();
		return true;
	}
	return false;
}

static void io_assign_current_work(struct io_worker *worker,
				   struct io_wq_work *work)
{
	if (work) {
		io_flush_signals();
		cond_resched();
	}

	spin_lock(&worker->lock);
	worker->cur_work = work;
	spin_unlock(&worker->lock);
}

static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work);

static void io_worker_handle_work(struct io_worker *worker)
	__releases(wqe->lock)
{
	struct io_wqe_acct *acct = io_wqe_get_acct(worker);
	struct io_wqe *wqe = worker->wqe;
	struct io_wq *wq = wqe->wq;
	bool do_kill = test_bit(IO_WQ_BIT_EXIT, &wq->state);

	do {
		struct io_wq_work *work;
get_next:
		/*
		 * If we got some work, mark us as busy. If we didn't, but
		 * the list isn't empty, it means we stalled on hashed work.
		 * Mark us stalled so we don't keep looking for work when we
		 * can't make progress, any work completion or insertion will
		 * clear the stalled flag.
		 */
		work = io_get_next_work(acct, worker);
		if (work)
			__io_worker_busy(wqe, worker, work);

		raw_spin_unlock(&wqe->lock);
		if (!work)
			break;
		io_assign_current_work(worker, work);
		__set_current_state(TASK_RUNNING);

		/* handle a whole dependent link */
		do {
			struct io_wq_work *next_hashed, *linked;
			unsigned int hash = io_get_work_hash(work);

			next_hashed = wq_next_work(work);

			if (unlikely(do_kill) && (work->flags & IO_WQ_WORK_UNBOUND))
				work->flags |= IO_WQ_WORK_CANCEL;
			wq->do_work(work);
			io_assign_current_work(worker, NULL);

			linked = wq->free_work(work);
			work = next_hashed;
			if (!work && linked && !io_wq_is_hashed(linked)) {
				work = linked;
				linked = NULL;
			}
			io_assign_current_work(worker, work);
			if (linked)
				io_wqe_enqueue(wqe, linked);

			if (hash != -1U && !next_hashed) {
				clear_bit(hash, &wq->hash->map);
				clear_bit(IO_ACCT_STALLED_BIT, &acct->flags);
				if (wq_has_sleeper(&wq->hash->wait))
					wake_up(&wq->hash->wait);
				raw_spin_lock(&wqe->lock);
				/* skip unnecessary unlock-lock wqe->lock */
				if (!work)
					goto get_next;
				raw_spin_unlock(&wqe->lock);
			}
		} while (work);

		raw_spin_lock(&wqe->lock);
	} while (1);
}

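/*
 * Main loop of an io-wq worker thread: run queued work while it's
 * available, otherwise idle on the free list with a timeout. An idle
 * timeout exits the worker unless it's the last one for its accounting
 * class; on io_wq exit, remaining work is drained before exiting.
 */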
static int io_wqe_worker(void *data)
{
	struct io_worker *worker = data;
	struct io_wqe_acct *acct = io_wqe_get_acct(worker);
	struct io_wqe *wqe = worker->wqe;
	struct io_wq *wq = wqe->wq;
	bool last_timeout = false;
	char buf[TASK_COMM_LEN];

	worker->flags |= (IO_WORKER_F_UP | IO_WORKER_F_RUNNING);

	snprintf(buf, sizeof(buf), "iou-wrk-%d", wq->task->pid);
	set_task_comm(current, buf);

	while (!test_bit(IO_WQ_BIT_EXIT, &wq->state)) {
		long ret;

		set_current_state(TASK_INTERRUPTIBLE);
loop:
		raw_spin_lock(&wqe->lock);
		if (io_acct_run_queue(acct)) {
			io_worker_handle_work(worker);
			goto loop;
		}
		/* timed out, exit unless we're the last worker */
		if (last_timeout && acct->nr_workers > 1) {
			raw_spin_unlock(&wqe->lock);
			__set_current_state(TASK_RUNNING);
			break;
		}
		last_timeout = false;
		__io_worker_idle(wqe, worker);
		raw_spin_unlock(&wqe->lock);
		if (io_flush_signals())
			continue;
		ret = schedule_timeout(WORKER_IDLE_TIMEOUT);
		if (signal_pending(current)) {
			struct ksignal ksig;

			if (!get_signal(&ksig))
				continue;
			if (fatal_signal_pending(current))
				break;
			continue;
		}
		last_timeout = !ret;
	}

	if (test_bit(IO_WQ_BIT_EXIT, &wq->state)) {
		raw_spin_lock(&wqe->lock);
		io_worker_handle_work(worker);
	}

	io_worker_exit(worker);
	return 0;
}

/*
 * Called when a worker is scheduled in. Mark us as currently running.
 */
void io_wq_worker_running(struct task_struct *tsk)
{
	struct io_worker *worker = tsk->pf_io_worker;

	if (!worker)
		return;
	if (!(worker->flags & IO_WORKER_F_UP))
		return;
	if (worker->flags & IO_WORKER_F_RUNNING)
		return;
	worker->flags |= IO_WORKER_F_RUNNING;
	io_wqe_inc_running(worker);
}

/*
 * Called when worker is going to sleep. If there are no workers currently
 * running and we have work pending, wake up a free one or create a new one.
 */
void io_wq_worker_sleeping(struct task_struct *tsk)
{
	struct io_worker *worker = tsk->pf_io_worker;

	if (!worker)
		return;
	if (!(worker->flags & IO_WORKER_F_UP))
		return;
	if (!(worker->flags & IO_WORKER_F_RUNNING))
		return;

	worker->flags &= ~IO_WORKER_F_RUNNING;

	raw_spin_lock(&worker->wqe->lock);
	io_wqe_dec_running(worker);
	raw_spin_unlock(&worker->wqe->lock);
}

static void io_init_new_worker(struct io_wqe *wqe, struct io_worker *worker,
			       struct task_struct *tsk)
{
	tsk->pf_io_worker = worker;
	worker->task = tsk;
	set_cpus_allowed_ptr(tsk, wqe->cpu_mask);
	tsk->flags |= PF_NO_SETAFFINITY;

	raw_spin_lock(&wqe->lock);
	hlist_nulls_add_head_rcu(&worker->nulls_node, &wqe->free_list);
	list_add_tail_rcu(&worker->all_list, &wqe->all_list);
	worker->flags |= IO_WORKER_F_FREE;
	raw_spin_unlock(&wqe->lock);
	wake_up_new_task(tsk);
}

static bool io_wq_work_match_all(struct io_wq_work *work, void *data)
{
	return true;
}

static inline bool io_should_retry_thread(long err)
{
	switch (err) {
	case -EAGAIN:
	case -ERESTARTSYS:
	case -ERESTARTNOINTR:
	case -ERESTARTNOHAND:
		return true;
	default:
		return false;
	}
}

static void create_worker_cont(struct callback_head *cb)
{
	struct io_worker *worker;
	struct task_struct *tsk;
	struct io_wqe *wqe;

	worker = container_of(cb, struct io_worker, create_work);
	clear_bit_unlock(0, &worker->create_state);
	wqe = worker->wqe;
	tsk = create_io_thread(io_wqe_worker, worker, wqe->node);
	if (!IS_ERR(tsk)) {
		io_init_new_worker(wqe, worker, tsk);
		io_worker_release(worker);
		return;
	} else if (!io_should_retry_thread(PTR_ERR(tsk))) {
		struct io_wqe_acct *acct = io_wqe_get_acct(worker);

		atomic_dec(&acct->nr_running);
		raw_spin_lock(&wqe->lock);
		acct->nr_workers--;
		if (!acct->nr_workers) {
			struct io_cb_cancel_data match = {
				.fn		= io_wq_work_match_all,
				.cancel_all	= true,
			};

			while (io_acct_cancel_pending_work(wqe, acct, &match))
				raw_spin_lock(&wqe->lock);
		}
		raw_spin_unlock(&wqe->lock);
		io_worker_ref_put(wqe->wq);
		return;
	}

	/* re-create attempts grab a new worker ref, drop the existing one */
	io_worker_release(worker);
	schedule_work(&worker->work);
}

static void io_workqueue_create(struct work_struct *work)
{
	struct io_worker *worker = container_of(work, struct io_worker, work);
	struct io_wqe_acct *acct = io_wqe_get_acct(worker);

	if (!io_queue_worker_create(worker, acct, create_worker_cont)) {
		clear_bit_unlock(0, &worker->create_state);
		io_worker_release(worker);
	}
}

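/*
 * Allocate and start a new worker thread for the given accounting class.
 * Transient thread creation failures are retried from a workqueue;
 * permanent failures roll the accounting back.
 */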
static bool create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index)
{
	struct io_wqe_acct *acct = &wqe->acct[index];
	struct io_worker *worker;
	struct task_struct *tsk;

	__set_current_state(TASK_RUNNING);

	worker = kzalloc_node(sizeof(*worker), GFP_KERNEL, wqe->node);
	if (!worker) {
fail:
		atomic_dec(&acct->nr_running);
		raw_spin_lock(&wqe->lock);
		acct->nr_workers--;
		raw_spin_unlock(&wqe->lock);
		io_worker_ref_put(wq);
		return false;
	}

	refcount_set(&worker->ref, 1);
	worker->wqe = wqe;
	spin_lock_init(&worker->lock);
	init_completion(&worker->ref_done);

	if (index == IO_WQ_ACCT_BOUND)
		worker->flags |= IO_WORKER_F_BOUND;

	tsk = create_io_thread(io_wqe_worker, worker, wqe->node);
	if (!IS_ERR(tsk)) {
		io_init_new_worker(wqe, worker, tsk);
	} else if (!io_should_retry_thread(PTR_ERR(tsk))) {
		goto fail;
	} else {
		INIT_WORK(&worker->work, io_workqueue_create);
		schedule_work(&worker->work);
	}

	return true;
}

/*
 * Iterate the passed in list and call the specific function for each
 * worker that isn't exiting
 */
static bool io_wq_for_each_worker(struct io_wqe *wqe,
				  bool (*func)(struct io_worker *, void *),
				  void *data)
{
	struct io_worker *worker;
	bool ret = false;

	list_for_each_entry_rcu(worker, &wqe->all_list, all_list) {
		if (io_worker_get(worker)) {
			/* no task if node is/was offline */
			if (worker->task)
				ret = func(worker, data);
			io_worker_release(worker);
			if (ret)
				break;
		}
	}

	return ret;
}

static bool io_wq_worker_wake(struct io_worker *worker, void *data)
{
	set_notify_signal(worker->task);
	wake_up_process(worker->task);
	return false;
}

static void io_run_cancel(struct io_wq_work *work, struct io_wqe *wqe)
{
	struct io_wq *wq = wqe->wq;

	do {
		work->flags |= IO_WQ_WORK_CANCEL;
		wq->do_work(work);
		work = wq->free_work(work);
	} while (work);
}

static void io_wqe_insert_work(struct io_wqe *wqe, struct io_wq_work *work)
{
	struct io_wqe_acct *acct = io_work_get_acct(wqe, work);
	unsigned int hash;
	struct io_wq_work *tail;

	if (!io_wq_is_hashed(work)) {
append:
		wq_list_add_tail(&work->list, &acct->work_list);
		return;
	}

	hash = io_get_work_hash(work);
	tail = wqe->hash_tail[hash];
	wqe->hash_tail[hash] = work;
	if (!tail)
		goto append;

	wq_list_add_after(&work->list, &tail->list, &acct->work_list);
}

static bool io_wq_work_match_item(struct io_wq_work *work, void *data)
{
	return work == data;
}

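/*
 * Queue work on a wqe: insert it into the pending list for its accounting
 * class, then wake an idle worker or, failing that, create a new one. If
 * even the first worker cannot be created, the work is cancelled so the
 * submitter still sees a completion.
 */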
static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work)
{
	struct io_wqe_acct *acct = io_work_get_acct(wqe, work);
	unsigned work_flags = work->flags;
	bool do_create;

	/*
	 * If io-wq is exiting for this task, or if the request has explicitly
	 * been marked as one that should not get executed, cancel it here.
	 */
	if (test_bit(IO_WQ_BIT_EXIT, &wqe->wq->state) ||
	    (work->flags & IO_WQ_WORK_CANCEL)) {
		io_run_cancel(work, wqe);
		return;
	}

	raw_spin_lock(&wqe->lock);
	io_wqe_insert_work(wqe, work);
	clear_bit(IO_ACCT_STALLED_BIT, &acct->flags);

	rcu_read_lock();
	do_create = !io_wqe_activate_free_worker(wqe, acct);
	rcu_read_unlock();

	raw_spin_unlock(&wqe->lock);

	if (do_create && ((work_flags & IO_WQ_WORK_CONCURRENT) ||
	    !atomic_read(&acct->nr_running))) {
		bool did_create;

		did_create = io_wqe_create_worker(wqe, acct);
		if (likely(did_create))
			return;

		raw_spin_lock(&wqe->lock);
		/* fatal condition, failed to create the first worker */
		if (!acct->nr_workers) {
			struct io_cb_cancel_data match = {
				.fn		= io_wq_work_match_item,
				.data		= work,
				.cancel_all	= false,
			};

			if (io_acct_cancel_pending_work(wqe, acct, &match))
				raw_spin_lock(&wqe->lock);
		}
		raw_spin_unlock(&wqe->lock);
	}
}

void io_wq_enqueue(struct io_wq *wq, struct io_wq_work *work)
{
	struct io_wqe *wqe = wq->wqes[numa_node_id()];

	io_wqe_enqueue(wqe, work);
}

/*
 * Work items that hash to the same value will not be done in parallel.
 * Used to limit concurrent writes, generally hashed by inode.
 */
void io_wq_hash_work(struct io_wq_work *work, void *val)
{
	unsigned int bit;

	bit = hash_ptr(val, IO_WQ_HASH_ORDER);
	work->flags |= (IO_WQ_WORK_HASHED | (bit << IO_WQ_HASH_SHIFT));
}

static bool io_wq_worker_cancel(struct io_worker *worker, void *data)
{
	struct io_cb_cancel_data *match = data;

	/*
	 * Hold the lock to avoid ->cur_work going out of scope, caller
	 * may dereference the passed in work.
	 */
	spin_lock(&worker->lock);
	if (worker->cur_work &&
	    match->fn(worker->cur_work, match->data)) {
		set_notify_signal(worker->task);
		match->nr_running++;
	}
	spin_unlock(&worker->lock);

	return match->nr_running && !match->cancel_all;
}

static inline void io_wqe_remove_pending(struct io_wqe *wqe,
					 struct io_wq_work *work,
					 struct io_wq_work_node *prev)
{
	struct io_wqe_acct *acct = io_work_get_acct(wqe, work);
	unsigned int hash = io_get_work_hash(work);
	struct io_wq_work *prev_work = NULL;

	if (io_wq_is_hashed(work) && work == wqe->hash_tail[hash]) {
		if (prev)
			prev_work = container_of(prev, struct io_wq_work, list);
		if (prev_work && io_get_work_hash(prev_work) == hash)
			wqe->hash_tail[hash] = prev_work;
		else
			wqe->hash_tail[hash] = NULL;
	}
	wq_list_del(&acct->work_list, &work->list, prev);
}

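/*
 * Find one pending work item matching @match, remove it and run its
 * cancellation. Returns true with wqe->lock dropped; returns false with
 * the lock still held.
 */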
static bool io_acct_cancel_pending_work(struct io_wqe *wqe,
					struct io_wqe_acct *acct,
					struct io_cb_cancel_data *match)
	__releases(wqe->lock)
{
	struct io_wq_work_node *node, *prev;
	struct io_wq_work *work;

	wq_list_for_each(node, prev, &acct->work_list) {
		work = container_of(node, struct io_wq_work, list);
		if (!match->fn(work, match->data))
			continue;
		io_wqe_remove_pending(wqe, work, prev);
		raw_spin_unlock(&wqe->lock);
		io_run_cancel(work, wqe);
		match->nr_pending++;
		/* not safe to continue after unlock */
		return true;
	}

	return false;
}

static void io_wqe_cancel_pending_work(struct io_wqe *wqe,
				       struct io_cb_cancel_data *match)
{
	int i;
retry:
	raw_spin_lock(&wqe->lock);
	for (i = 0; i < IO_WQ_ACCT_NR; i++) {
		struct io_wqe_acct *acct = io_get_acct(wqe, i == 0);

		if (io_acct_cancel_pending_work(wqe, acct, match)) {
			if (match->cancel_all)
				goto retry;
			return;
		}
	}
	raw_spin_unlock(&wqe->lock);
}

static void io_wqe_cancel_running_work(struct io_wqe *wqe,
				       struct io_cb_cancel_data *match)
{
	rcu_read_lock();
	io_wq_for_each_worker(wqe, io_wq_worker_cancel, match);
	rcu_read_unlock();
}

enum io_wq_cancel io_wq_cancel_cb(struct io_wq *wq, work_cancel_fn *cancel,
				  void *data, bool cancel_all)
{
	struct io_cb_cancel_data match = {
		.fn		= cancel,
		.data		= data,
		.cancel_all	= cancel_all,
	};
	int node;

	/*
	 * First check pending list, if we're lucky we can just remove it
	 * from there. CANCEL_OK means that the work is returned as-new,
	 * no completion will be posted for it.
	 */
	for_each_node(node) {
		struct io_wqe *wqe = wq->wqes[node];

		io_wqe_cancel_pending_work(wqe, &match);
		if (match.nr_pending && !match.cancel_all)
			return IO_WQ_CANCEL_OK;
	}

	/*
	 * Now check if a free (going busy) or busy worker has the work
	 * currently running. If we find it there, we'll return CANCEL_RUNNING
	 * as an indication that we attempt to signal cancellation. The
	 * completion will run normally in this case.
	 */
	for_each_node(node) {
		struct io_wqe *wqe = wq->wqes[node];

		io_wqe_cancel_running_work(wqe, &match);
		if (match.nr_running && !match.cancel_all)
			return IO_WQ_CANCEL_RUNNING;
	}

	if (match.nr_running)
		return IO_WQ_CANCEL_RUNNING;
	if (match.nr_pending)
		return IO_WQ_CANCEL_OK;
	return IO_WQ_CANCEL_NOTFOUND;
}

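/*
 * Waitqueue callback, run when a hash bit is cleared: clear the stalled
 * state on each accounting class and wake a free worker to resume the
 * deferred hashed work.
 */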
static int io_wqe_hash_wake(struct wait_queue_entry *wait, unsigned mode,
			    int sync, void *key)
{
	struct io_wqe *wqe = container_of(wait, struct io_wqe, wait);
	int i;

	list_del_init(&wait->entry);

	rcu_read_lock();
	for (i = 0; i < IO_WQ_ACCT_NR; i++) {
		struct io_wqe_acct *acct = &wqe->acct[i];

		if (test_and_clear_bit(IO_ACCT_STALLED_BIT, &acct->flags))
			io_wqe_activate_free_worker(wqe, acct);
	}
	rcu_read_unlock();
	return 1;
}

struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
{
	int ret, node, i;
	struct io_wq *wq;

	if (WARN_ON_ONCE(!data->free_work || !data->do_work))
		return ERR_PTR(-EINVAL);
	if (WARN_ON_ONCE(!bounded))
		return ERR_PTR(-EINVAL);

	wq = kzalloc(struct_size(wq, wqes, nr_node_ids), GFP_KERNEL);
	if (!wq)
		return ERR_PTR(-ENOMEM);
	ret = cpuhp_state_add_instance_nocalls(io_wq_online, &wq->cpuhp_node);
	if (ret)
		goto err_wq;

	refcount_inc(&data->hash->refs);
	wq->hash = data->hash;
	wq->free_work = data->free_work;
	wq->do_work = data->do_work;

	ret = -ENOMEM;
	for_each_node(node) {
		struct io_wqe *wqe;
		int alloc_node = node;

		if (!node_online(alloc_node))
			alloc_node = NUMA_NO_NODE;
		wqe = kzalloc_node(sizeof(struct io_wqe), GFP_KERNEL, alloc_node);
		if (!wqe)
			goto err;
		if (!alloc_cpumask_var(&wqe->cpu_mask, GFP_KERNEL))
			goto err;
		cpumask_copy(wqe->cpu_mask, cpumask_of_node(node));
		wq->wqes[node] = wqe;
		wqe->node = alloc_node;
		wqe->acct[IO_WQ_ACCT_BOUND].max_workers = bounded;
		wqe->acct[IO_WQ_ACCT_UNBOUND].max_workers =
					task_rlimit(current, RLIMIT_NPROC);
		INIT_LIST_HEAD(&wqe->wait.entry);
		wqe->wait.func = io_wqe_hash_wake;
		for (i = 0; i < IO_WQ_ACCT_NR; i++) {
			struct io_wqe_acct *acct = &wqe->acct[i];

			acct->index = i;
			atomic_set(&acct->nr_running, 0);
			INIT_WQ_LIST(&acct->work_list);
		}
		wqe->wq = wq;
		raw_spin_lock_init(&wqe->lock);
		INIT_HLIST_NULLS_HEAD(&wqe->free_list, 0);
		INIT_LIST_HEAD(&wqe->all_list);
	}

	wq->task = get_task_struct(data->task);
	atomic_set(&wq->worker_refs, 1);
	init_completion(&wq->worker_done);
	return wq;
err:
	io_wq_put_hash(data->hash);
	cpuhp_state_remove_instance_nocalls(io_wq_online, &wq->cpuhp_node);
	for_each_node(node) {
		if (!wq->wqes[node])
			continue;
		free_cpumask_var(wq->wqes[node]->cpu_mask);
		kfree(wq->wqes[node]);
	}
err_wq:
	kfree(wq);
	return ERR_PTR(ret);
}

static bool io_task_work_match(struct callback_head *cb, void *data)
{
	struct io_worker *worker;

	if (cb->func != create_worker_cb && cb->func != create_worker_cont)
		return false;
	worker = container_of(cb, struct io_worker, create_work);
	return worker->wqe->wq == data;
}

void io_wq_exit_start(struct io_wq *wq)
{
	set_bit(IO_WQ_BIT_EXIT, &wq->state);
}

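/*
 * Tear down all workers: cancel any still-queued worker-creation
 * task_work, wake every worker so it notices IO_WQ_BIT_EXIT, then wait
 * for the last worker reference to go away before detaching from the
 * hash waitqueue.
 */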
static void io_wq_exit_workers(struct io_wq *wq)
{
	struct callback_head *cb;
	int node;

	if (!wq->task)
		return;

	while ((cb = task_work_cancel_match(wq->task, io_task_work_match, wq)) != NULL) {
		struct io_worker *worker;
		struct io_wqe_acct *acct;

		worker = container_of(cb, struct io_worker, create_work);
		acct = io_wqe_get_acct(worker);
		atomic_dec(&acct->nr_running);
		raw_spin_lock(&worker->wqe->lock);
		acct->nr_workers--;
		raw_spin_unlock(&worker->wqe->lock);
		io_worker_ref_put(wq);
		clear_bit_unlock(0, &worker->create_state);
		io_worker_release(worker);
	}

	rcu_read_lock();
	for_each_node(node) {
		struct io_wqe *wqe = wq->wqes[node];

		io_wq_for_each_worker(wqe, io_wq_worker_wake, NULL);
	}
	rcu_read_unlock();
	io_worker_ref_put(wq);
	wait_for_completion(&wq->worker_done);

	for_each_node(node) {
		spin_lock_irq(&wq->hash->wait.lock);
		list_del_init(&wq->wqes[node]->wait.entry);
		spin_unlock_irq(&wq->hash->wait.lock);
	}
	put_task_struct(wq->task);
	wq->task = NULL;
}

static void io_wq_destroy(struct io_wq *wq)
{
	int node;

	cpuhp_state_remove_instance_nocalls(io_wq_online, &wq->cpuhp_node);

	for_each_node(node) {
		struct io_wqe *wqe = wq->wqes[node];
		struct io_cb_cancel_data match = {
			.fn		= io_wq_work_match_all,
			.cancel_all	= true,
		};
		io_wqe_cancel_pending_work(wqe, &match);
		free_cpumask_var(wqe->cpu_mask);
		kfree(wqe);
	}
	io_wq_put_hash(wq->hash);
	kfree(wq);
}

void io_wq_put_and_exit(struct io_wq *wq)
{
	WARN_ON_ONCE(!test_bit(IO_WQ_BIT_EXIT, &wq->state));

	io_wq_exit_workers(wq);
	io_wq_destroy(wq);
}

struct online_data {
	unsigned int cpu;
	bool online;
};

static bool io_wq_worker_affinity(struct io_worker *worker, void *data)
{
	struct online_data *od = data;

	if (od->online)
		cpumask_set_cpu(od->cpu, worker->wqe->cpu_mask);
	else
		cpumask_clear_cpu(od->cpu, worker->wqe->cpu_mask);
	return false;
}

static int __io_wq_cpu_online(struct io_wq *wq, unsigned int cpu, bool online)
{
	struct online_data od = {
		.cpu = cpu,
		.online = online
	};
	int i;

	rcu_read_lock();
	for_each_node(i)
		io_wq_for_each_worker(wq->wqes[i], io_wq_worker_affinity, &od);
	rcu_read_unlock();
	return 0;
}

static int io_wq_cpu_online(unsigned int cpu, struct hlist_node *node)
{
	struct io_wq *wq = hlist_entry_safe(node, struct io_wq, cpuhp_node);

	return __io_wq_cpu_online(wq, cpu, true);
}

static int io_wq_cpu_offline(unsigned int cpu, struct hlist_node *node)
{
	struct io_wq *wq = hlist_entry_safe(node, struct io_wq, cpuhp_node);

	return __io_wq_cpu_online(wq, cpu, false);
}

int io_wq_cpu_affinity(struct io_wq *wq, cpumask_var_t mask)
{
	int i;

	rcu_read_lock();
	for_each_node(i) {
		struct io_wqe *wqe = wq->wqes[i];

		if (mask)
			cpumask_copy(wqe->cpu_mask, mask);
		else
			cpumask_copy(wqe->cpu_mask, cpumask_of_node(i));
	}
	rcu_read_unlock();
	return 0;
}

/*
 * Set max number of unbounded workers, returns old value. If new_count is 0,
 * then just return the old value.
 */
int io_wq_max_workers(struct io_wq *wq, int *new_count)
{
	int i, node, prev = 0;

	for (i = 0; i < 2; i++) {
		if (new_count[i] > task_rlimit(current, RLIMIT_NPROC))
			new_count[i] = task_rlimit(current, RLIMIT_NPROC);
	}

	rcu_read_lock();
	for_each_node(node) {
		struct io_wqe_acct *acct;

		for (i = 0; i < IO_WQ_ACCT_NR; i++) {
			acct = &wq->wqes[node]->acct[i];
			prev = max_t(int, acct->max_workers, prev);
			if (new_count[i])
				acct->max_workers = new_count[i];
			new_count[i] = prev;
		}
	}
	rcu_read_unlock();
	return 0;
}

static __init int io_wq_init(void)
{
	int ret;

	ret = cpuhp_setup_state_multi(CPUHP_AP_ONLINE_DYN, "io-wq/online",
					io_wq_cpu_online, io_wq_cpu_offline);
	if (ret < 0)
		return ret;
	io_wq_online = ret;
	return 0;
}
subsys_initcall(io_wq_init);