io-wq.c 30.5 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14
// SPDX-License-Identifier: GPL-2.0
/*
 * Basic worker thread pool for io_uring
 *
 * Copyright (C) 2019 Jens Axboe
 *
 */
#include <linux/kernel.h>
#include <linux/init.h>
#include <linux/errno.h>
#include <linux/sched/signal.h>
#include <linux/percpu.h>
#include <linux/slab.h>
#include <linux/rculist_nulls.h>
15
#include <linux/cpu.h>
16
#include <linux/tracehook.h>
17 18 19 20 21 22 23 24 25

#include "io-wq.h"

#define WORKER_IDLE_TIMEOUT	(5 * HZ)

/* Per-worker state bits, kept in io_worker->flags. */
enum {
	IO_WORKER_F_UP		= 1,	/* up and active */
	IO_WORKER_F_RUNNING	= 2,	/* account as running */
	IO_WORKER_F_FREE	= 4,	/* worker on free list */
	IO_WORKER_F_BOUND	= 8,	/* is doing bounded work */
};

/* Bit numbers in io_wq->state. */
enum {
	IO_WQ_BIT_EXIT		= 0,	/* wq exiting */
};

/* Bit numbers in io_wqe_acct->flags. */
enum {
	IO_ACCT_STALLED_BIT	= 0,	/* stalled on hash */
};

/*
 * One for each thread in a wqe pool
 */
struct io_worker {
	refcount_t ref;
	unsigned flags;
	struct hlist_nulls_node nulls_node;
44
	struct list_head all_list;
45 46
	struct task_struct *task;
	struct io_wqe *wqe;
47

48
	struct io_wq_work *cur_work;
49
	spinlock_t lock;
50

51 52
	struct completion ref_done;

53 54 55 56
	unsigned long create_state;
	struct callback_head create_work;
	int create_index;

57 58 59 60
	union {
		struct rcu_head rcu;
		struct work_struct work;
	};
61 62 63 64 65 66 67 68
};

/*
 * One hash bucket per bit of a long; hash values are used as bit numbers
 * with test_and_set_bit()/clear_bit() on hash->map.
 */
#if BITS_PER_LONG == 64
#define IO_WQ_HASH_ORDER	6
#else
#define IO_WQ_HASH_ORDER	5
#endif

#define IO_WQ_NR_HASH_BUCKETS	(1u << IO_WQ_HASH_ORDER)

71 72 73
struct io_wqe_acct {
	unsigned nr_workers;
	unsigned max_workers;
74
	int index;
75
	atomic_t nr_running;
76 77
	struct io_wq_work_list work_list;
	unsigned long flags;
78 79 80 81 82
};

/* Indexes into io_wqe->acct[]. */
enum {
	IO_WQ_ACCT_BOUND,
	IO_WQ_ACCT_UNBOUND,
	IO_WQ_ACCT_NR,
};

86 87 88 89
/*
 * Per-node worker thread pool
 */
struct io_wqe {
90 91
	raw_spinlock_t lock;
	struct io_wqe_acct acct[2];
92 93 94

	int node;

95
	struct hlist_nulls_head free_list;
96
	struct list_head all_list;
97

98 99
	struct wait_queue_entry wait;

100
	struct io_wq *wq;
101
	struct io_wq_work *hash_tail[IO_WQ_NR_HASH_BUCKETS];
J
Jens Axboe 已提交
102 103

	cpumask_var_t cpu_mask;
104 105 106 107 108 109 110 111
};

/*
 * Per io_wq state
  */
struct io_wq {
	unsigned long state;

112
	free_work_fn *free_work;
113
	io_wq_work_fn *do_work;
114

115 116
	struct io_wq_hash *hash;

117 118 119
	atomic_t worker_refs;
	struct completion worker_done;

120
	struct hlist_node cpuhp_node;
121

122
	struct task_struct *task;
123 124

	struct io_wqe *wqes[];
125 126
};

127 128
static enum cpuhp_state io_wq_online;

129 130 131 132 133 134 135 136
struct io_cb_cancel_data {
	work_cancel_fn *fn;
	void *data;
	int nr_running;
	int nr_pending;
	bool cancel_all;
};

137
static bool create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index);
static void io_wqe_dec_running(struct io_worker *worker);
static bool io_acct_cancel_pending_work(struct io_wqe *wqe,
					struct io_wqe_acct *acct,
					struct io_cb_cancel_data *match);
142

143 144 145 146 147 148 149 150
/*
 * Try to pin @worker. Fails once its last reference is gone, i.e. the
 * worker is on its way out.
 */
static bool io_worker_get(struct io_worker *worker)
{
	bool pinned = refcount_inc_not_zero(&worker->ref);

	return pinned;
}

/* Drop a reference on @worker; the last put completes ->ref_done. */
static void io_worker_release(struct io_worker *worker)
{
	if (refcount_dec_and_test(&worker->ref))
		complete(&worker->ref_done);
}

P
Pavel Begunkov 已提交
154 155 156 157 158
/* Pick the bound or unbound accounting slot of @wqe. */
static inline struct io_wqe_acct *io_get_acct(struct io_wqe *wqe, bool bound)
{
	int slot = IO_WQ_ACCT_UNBOUND;

	if (bound)
		slot = IO_WQ_ACCT_BOUND;
	return &wqe->acct[slot];
}

159 160 161
static inline struct io_wqe_acct *io_work_get_acct(struct io_wqe *wqe,
						   struct io_wq_work *work)
{
P
Pavel Begunkov 已提交
162
	return io_get_acct(wqe, !(work->flags & IO_WQ_WORK_UNBOUND));
163 164
}

165
static inline struct io_wqe_acct *io_wqe_get_acct(struct io_worker *worker)
166
{
P
Pavel Begunkov 已提交
167
	return io_get_acct(worker->wqe, worker->flags & IO_WORKER_F_BOUND);
168 169
}

170 171 172 173 174 175
/* Drop one wq->worker_refs count; the final put completes ->worker_done. */
static void io_worker_ref_put(struct io_wq *wq)
{
	if (!atomic_dec_and_test(&wq->worker_refs))
		return;
	complete(&wq->worker_done);
}

176 177 178
static void io_worker_exit(struct io_worker *worker)
{
	struct io_wqe *wqe = worker->wqe;
179
	struct io_wqe_acct *acct = io_wqe_get_acct(worker);
180

181 182 183
	if (refcount_dec_and_test(&worker->ref))
		complete(&worker->ref_done);
	wait_for_completion(&worker->ref_done);
184

185
	raw_spin_lock(&wqe->lock);
186
	if (worker->flags & IO_WORKER_F_FREE)
187
		hlist_nulls_del_rcu(&worker->nulls_node);
188
	list_del_rcu(&worker->all_list);
189
	acct->nr_workers--;
190 191 192 193 194
	preempt_disable();
	io_wqe_dec_running(worker);
	worker->flags = 0;
	current->flags &= ~PF_IO_WORKER;
	preempt_enable();
195
	raw_spin_unlock(&wqe->lock);
196

197
	kfree_rcu(worker, rcu);
198
	io_worker_ref_put(wqe->wq);
199
	do_exit(0);
200 201
}

202
static inline bool io_acct_run_queue(struct io_wqe_acct *acct)
203
{
204 205
	if (!wq_list_empty(&acct->work_list) &&
	    !test_bit(IO_ACCT_STALLED_BIT, &acct->flags))
206 207 208 209 210 211
		return true;
	return false;
}

/*
 * Check head of free list for an available worker. If one isn't available,
212
 * caller must create one.
213
 */
214 215
static bool io_wqe_activate_free_worker(struct io_wqe *wqe,
					struct io_wqe_acct *acct)
216 217 218 219 220
	__must_hold(RCU)
{
	struct hlist_nulls_node *n;
	struct io_worker *worker;

221 222 223 224 225 226 227 228
	/*
	 * Iterate free_list and see if we can find an idle worker to
	 * activate. If a given worker is on the free_list but in the process
	 * of exiting, keep trying.
	 */
	hlist_nulls_for_each_entry_rcu(worker, n, &wqe->free_list, nulls_node) {
		if (!io_worker_get(worker))
			continue;
229 230 231 232
		if (io_wqe_get_acct(worker) != acct) {
			io_worker_release(worker);
			continue;
		}
233 234 235 236
		if (wake_up_process(worker->task)) {
			io_worker_release(worker);
			return true;
		}
237 238 239 240 241 242 243 244
		io_worker_release(worker);
	}

	return false;
}

/*
 * We need a worker. If we find a free one, we're good. If not, and we're
245
 * below the max number of workers, create one.
246
 */
247
static bool io_wqe_create_worker(struct io_wqe *wqe, struct io_wqe_acct *acct)
248
{
J
Jens Axboe 已提交
249
	bool do_create = false;
250 251 252 253 254

	/*
	 * Most likely an attempt to queue unbounded work on an io_wq that
	 * wasn't setup with any unbounded workers.
	 */
P
Pavel Begunkov 已提交
255 256
	if (unlikely(!acct->max_workers))
		pr_warn_once("io-wq is not configured for unbound workers");
257

258 259 260 261 262 263 264 265 266
	raw_spin_lock(&wqe->lock);
	if (acct->nr_workers < acct->max_workers) {
		acct->nr_workers++;
		do_create = true;
	}
	raw_spin_unlock(&wqe->lock);
	if (do_create) {
		atomic_inc(&acct->nr_running);
		atomic_inc(&wqe->wq->worker_refs);
267
		return create_io_worker(wqe->wq, wqe, acct->index);
268
	}
269 270

	return true;
271 272
}

273
static void io_wqe_inc_running(struct io_worker *worker)
274
{
275
	struct io_wqe_acct *acct = io_wqe_get_acct(worker);
276 277 278 279

	atomic_inc(&acct->nr_running);
}

280 281
static void create_worker_cb(struct callback_head *cb)
{
282
	struct io_worker *worker;
283
	struct io_wq *wq;
284 285
	struct io_wqe *wqe;
	struct io_wqe_acct *acct;
J
Jens Axboe 已提交
286
	bool do_create = false;
287

288 289
	worker = container_of(cb, struct io_worker, create_work);
	wqe = worker->wqe;
290
	wq = wqe->wq;
291
	acct = &wqe->acct[worker->create_index];
292
	raw_spin_lock(&wqe->lock);
293
	if (acct->nr_workers < acct->max_workers) {
294
		acct->nr_workers++;
295 296
		do_create = true;
	}
297
	raw_spin_unlock(&wqe->lock);
298
	if (do_create) {
J
Jens Axboe 已提交
299
		create_io_worker(wq, wqe, worker->create_index);
300 301 302 303
	} else {
		atomic_dec(&acct->nr_running);
		io_worker_ref_put(wq);
	}
304 305
	clear_bit_unlock(0, &worker->create_state);
	io_worker_release(worker);
306 307
}

308 309 310
/*
 * Queue @func as task_work on wq->task to create a worker for @acct.
 *
 * The caller has already taken an acct->nr_running and a wq->worker_refs
 * count for the new worker. On success the queued callback owns those
 * counts, a reference on @worker and bit 0 of create_state. On failure
 * everything taken here and the caller's counts are dropped.
 */
static bool io_queue_worker_create(struct io_worker *worker,
				   struct io_wqe_acct *acct,
				   task_work_func_t func)
{
	struct io_wqe *wqe = worker->wqe;
	struct io_wq *wq = wqe->wq;

	/* raced with exit, just ignore create call */
	if (test_bit(IO_WQ_BIT_EXIT, &wq->state))
		goto fail;
	if (!io_worker_get(worker))
		goto fail;
	/*
	 * create_state manages ownership of create_work/index. We should
	 * only need one entry per worker, as the worker going to sleep
	 * will trigger the condition, and waking will clear it once it
	 * runs the task_work.
	 */
	if (test_bit(0, &worker->create_state) ||
	    test_and_set_bit_lock(0, &worker->create_state))
		goto fail_release;

	init_task_work(&worker->create_work, func);
	worker->create_index = acct->index;
	if (!task_work_add(wq->task, &worker->create_work, TWA_SIGNAL))
		return true;
	clear_bit_unlock(0, &worker->create_state);
fail_release:
	io_worker_release(worker);
fail:
	atomic_dec(&acct->nr_running);
	io_worker_ref_put(wq);
	return false;
}

343
static void io_wqe_dec_running(struct io_worker *worker)
344 345
	__must_hold(wqe->lock)
{
346 347
	struct io_wqe_acct *acct = io_wqe_get_acct(worker);
	struct io_wqe *wqe = worker->wqe;
348

349 350 351
	if (!(worker->flags & IO_WORKER_F_UP))
		return;

352
	if (atomic_dec_and_test(&acct->nr_running) && io_acct_run_queue(acct)) {
353 354
		atomic_inc(&acct->nr_running);
		atomic_inc(&wqe->wq->worker_refs);
355
		io_queue_worker_create(worker, acct, create_worker_cb);
356
	}
357 358
}

359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379
/*
 * A worker is about to process work: if it is parked on the free list,
 * take it off and clear IO_WORKER_F_FREE. @work is currently unused.
 */
static void __io_worker_busy(struct io_wqe *wqe, struct io_worker *worker,
			     struct io_wq_work *work)
	__must_hold(wqe->lock)
{
	if (!(worker->flags & IO_WORKER_F_FREE))
		return;

	worker->flags &= ~IO_WORKER_F_FREE;
	hlist_nulls_del_init_rcu(&worker->nulls_node);
}

/*
 * No work, worker going to sleep. Move to freelist, and unuse mm if we
 * have one attached. Dropping the mm may potentially sleep, so we drop
 * the lock in that case and return success. Since the caller has to
 * retry the loop in that case (we changed task state), we don't regrab
 * the lock if we return success.
 */
380
static void __io_worker_idle(struct io_wqe *wqe, struct io_worker *worker)
381 382 383 384
	__must_hold(wqe->lock)
{
	if (!(worker->flags & IO_WORKER_F_FREE)) {
		worker->flags |= IO_WORKER_F_FREE;
385
		hlist_nulls_add_head_rcu(&worker->nulls_node, &wqe->free_list);
386 387 388
	}
}

P
Pavel Begunkov 已提交
389 390 391 392 393
/*
 * Extract the hash bucket that io_wq_hash_work() stored in the bits of
 * work->flags above IO_WQ_HASH_SHIFT.
 */
static inline unsigned int io_get_work_hash(struct io_wq_work *work)
{
	return (work->flags >> IO_WQ_HASH_SHIFT);
}

394 395 396 397
static void io_wait_on_hash(struct io_wqe *wqe, unsigned int hash)
{
	struct io_wq *wq = wqe->wq;

398
	spin_lock_irq(&wq->hash->wait.lock);
399 400 401 402 403 404 405
	if (list_empty(&wqe->wait.entry)) {
		__add_wait_queue(&wq->hash->wait, &wqe->wait);
		if (!test_bit(hash, &wq->hash->map)) {
			__set_current_state(TASK_RUNNING);
			list_del_init(&wqe->wait.entry);
		}
	}
406
	spin_unlock_irq(&wq->hash->wait.lock);
407 408
}

409
static struct io_wq_work *io_get_next_work(struct io_wqe_acct *acct,
J
Jens Axboe 已提交
410
					   struct io_worker *worker)
411 412
	__must_hold(wqe->lock)
{
J
Jens Axboe 已提交
413
	struct io_wq_work_node *node, *prev;
414
	struct io_wq_work *work, *tail;
415
	unsigned int stall_hash = -1U;
416
	struct io_wqe *wqe = worker->wqe;
417

418
	wq_list_for_each(node, prev, &acct->work_list) {
419 420
		unsigned int hash;

J
Jens Axboe 已提交
421 422
		work = container_of(node, struct io_wq_work, list);

423
		/* not hashed, can run anytime */
424
		if (!io_wq_is_hashed(work)) {
425
			wq_list_del(&acct->work_list, node, prev);
426 427 428
			return work;
		}

P
Pavel Begunkov 已提交
429
		hash = io_get_work_hash(work);
430 431 432 433 434
		/* all items with this hash lie in [work, tail] */
		tail = wqe->hash_tail[hash];

		/* hashed, can run if not already running */
		if (!test_and_set_bit(hash, &wqe->wq->hash->map)) {
435
			wqe->hash_tail[hash] = NULL;
436
			wq_list_cut(&acct->work_list, &tail->list, prev);
437 438
			return work;
		}
439 440 441 442 443 444 445
		if (stall_hash == -1U)
			stall_hash = hash;
		/* fast forward to a next hash, for-each will fix up @prev */
		node = &tail->list;
	}

	if (stall_hash != -1U) {
J
Jens Axboe 已提交
446 447 448 449
		/*
		 * Set this before dropping the lock to avoid racing with new
		 * work being added and clearing the stalled bit.
		 */
450
		set_bit(IO_ACCT_STALLED_BIT, &acct->flags);
451 452 453
		raw_spin_unlock(&wqe->lock);
		io_wait_on_hash(wqe, stall_hash);
		raw_spin_lock(&wqe->lock);
454 455 456 457 458
	}

	return NULL;
}

459
static bool io_flush_signals(void)
460
{
461
	if (unlikely(test_thread_flag(TIF_NOTIFY_SIGNAL))) {
462
		__set_current_state(TASK_RUNNING);
463
		tracehook_notify_signal();
464
		return true;
465
	}
466
	return false;
467 468 469 470 471
}

static void io_assign_current_work(struct io_worker *worker,
				   struct io_wq_work *work)
{
472
	if (work) {
473
		io_flush_signals();
474 475
		cond_resched();
	}
476

477
	spin_lock(&worker->lock);
478
	worker->cur_work = work;
479
	spin_unlock(&worker->lock);
480 481
}

P
Pavel Begunkov 已提交
482 483
static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work);

484 485 486
static void io_worker_handle_work(struct io_worker *worker)
	__releases(wqe->lock)
{
487
	struct io_wqe_acct *acct = io_wqe_get_acct(worker);
488 489
	struct io_wqe *wqe = worker->wqe;
	struct io_wq *wq = wqe->wq;
490
	bool do_kill = test_bit(IO_WQ_BIT_EXIT, &wq->state);
491 492

	do {
493
		struct io_wq_work *work;
494
get_next:
495 496 497 498 499 500 501
		/*
		 * If we got some work, mark us as busy. If we didn't, but
		 * the list isn't empty, it means we stalled on hashed work.
		 * Mark us stalled so we don't keep looking for work when we
		 * can't make progress, any work completion or insertion will
		 * clear the stalled flag.
		 */
502
		work = io_get_next_work(acct, worker);
503 504 505
		if (work)
			__io_worker_busy(wqe, worker, work);

506
		raw_spin_unlock(&wqe->lock);
507 508
		if (!work)
			break;
509
		io_assign_current_work(worker, work);
510
		__set_current_state(TASK_RUNNING);
511

512 513
		/* handle a whole dependent link */
		do {
514
			struct io_wq_work *next_hashed, *linked;
P
Pavel Begunkov 已提交
515
			unsigned int hash = io_get_work_hash(work);
516

517
			next_hashed = wq_next_work(work);
518 519 520

			if (unlikely(do_kill) && (work->flags & IO_WQ_WORK_UNBOUND))
				work->flags |= IO_WQ_WORK_CANCEL;
521 522
			wq->do_work(work);
			io_assign_current_work(worker, NULL);
523

524
			linked = wq->free_work(work);
525 526 527 528 529 530 531 532 533 534
			work = next_hashed;
			if (!work && linked && !io_wq_is_hashed(linked)) {
				work = linked;
				linked = NULL;
			}
			io_assign_current_work(worker, work);
			if (linked)
				io_wqe_enqueue(wqe, linked);

			if (hash != -1U && !next_hashed) {
535
				clear_bit(hash, &wq->hash->map);
536
				clear_bit(IO_ACCT_STALLED_BIT, &acct->flags);
537 538
				if (wq_has_sleeper(&wq->hash->wait))
					wake_up(&wq->hash->wait);
539
				raw_spin_lock(&wqe->lock);
540 541 542
				/* skip unnecessary unlock-lock wqe->lock */
				if (!work)
					goto get_next;
543
				raw_spin_unlock(&wqe->lock);
544
			}
545
		} while (work);
546

547
		raw_spin_lock(&wqe->lock);
548 549 550 551 552 553
	} while (1);
}

static int io_wqe_worker(void *data)
{
	struct io_worker *worker = data;
554
	struct io_wqe_acct *acct = io_wqe_get_acct(worker);
555 556
	struct io_wqe *wqe = worker->wqe;
	struct io_wq *wq = wqe->wq;
J
Jens Axboe 已提交
557
	bool last_timeout = false;
558
	char buf[TASK_COMM_LEN];
559

560 561
	worker->flags |= (IO_WORKER_F_UP | IO_WORKER_F_RUNNING);

562
	snprintf(buf, sizeof(buf), "iou-wrk-%d", wq->task->pid);
563
	set_task_comm(current, buf);
564 565

	while (!test_bit(IO_WQ_BIT_EXIT, &wq->state)) {
566 567
		long ret;

J
Jens Axboe 已提交
568
		set_current_state(TASK_INTERRUPTIBLE);
569
loop:
570
		raw_spin_lock(&wqe->lock);
571
		if (io_acct_run_queue(acct)) {
572
			io_worker_handle_work(worker);
573
			goto loop;
574
		}
J
Jens Axboe 已提交
575 576 577 578 579 580 581
		/* timed out, exit unless we're the last worker */
		if (last_timeout && acct->nr_workers > 1) {
			raw_spin_unlock(&wqe->lock);
			__set_current_state(TASK_RUNNING);
			break;
		}
		last_timeout = false;
582
		__io_worker_idle(wqe, worker);
583
		raw_spin_unlock(&wqe->lock);
584 585
		if (io_flush_signals())
			continue;
586
		ret = schedule_timeout(WORKER_IDLE_TIMEOUT);
587 588 589 590 591
		if (signal_pending(current)) {
			struct ksignal ksig;

			if (!get_signal(&ksig))
				continue;
J
Jens Axboe 已提交
592 593 594
			if (fatal_signal_pending(current))
				break;
			continue;
595
		}
J
Jens Axboe 已提交
596
		last_timeout = !ret;
597 598 599
	}

	if (test_bit(IO_WQ_BIT_EXIT, &wq->state)) {
600
		raw_spin_lock(&wqe->lock);
P
Pavel Begunkov 已提交
601
		io_worker_handle_work(worker);
602 603 604 605 606 607 608 609 610 611 612
	}

	io_worker_exit(worker);
	return 0;
}

/*
 * Called when a worker is scheduled in. Mark us as currently running.
 */
void io_wq_worker_running(struct task_struct *tsk)
{
613
	struct io_worker *worker = tsk->pf_io_worker;
614

615 616
	if (!worker)
		return;
617 618 619 620 621
	if (!(worker->flags & IO_WORKER_F_UP))
		return;
	if (worker->flags & IO_WORKER_F_RUNNING)
		return;
	worker->flags |= IO_WORKER_F_RUNNING;
622
	io_wqe_inc_running(worker);
623 624 625 626
}

/*
 * Called when worker is going to sleep. If there are no workers currently
627
 * running and we have work pending, wake up a free one or create a new one.
628 629 630
 */
void io_wq_worker_sleeping(struct task_struct *tsk)
{
631
	struct io_worker *worker = tsk->pf_io_worker;
632

633 634
	if (!worker)
		return;
635 636 637 638 639 640 641
	if (!(worker->flags & IO_WORKER_F_UP))
		return;
	if (!(worker->flags & IO_WORKER_F_RUNNING))
		return;

	worker->flags &= ~IO_WORKER_F_RUNNING;

642
	raw_spin_lock(&worker->wqe->lock);
643
	io_wqe_dec_running(worker);
644
	raw_spin_unlock(&worker->wqe->lock);
645 646
}

647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731
static void io_init_new_worker(struct io_wqe *wqe, struct io_worker *worker,
			       struct task_struct *tsk)
{
	tsk->pf_io_worker = worker;
	worker->task = tsk;
	set_cpus_allowed_ptr(tsk, wqe->cpu_mask);
	tsk->flags |= PF_NO_SETAFFINITY;

	raw_spin_lock(&wqe->lock);
	hlist_nulls_add_head_rcu(&worker->nulls_node, &wqe->free_list);
	list_add_tail_rcu(&worker->all_list, &wqe->all_list);
	worker->flags |= IO_WORKER_F_FREE;
	raw_spin_unlock(&wqe->lock);
	wake_up_new_task(tsk);
}

/* Cancellation match callback that accepts every work item. */
static bool io_wq_work_match_all(struct io_wq_work *work, void *data)
{
	(void)work;
	(void)data;
	return true;
}

/*
 * Whether a create_io_thread() failure with error @err is transient
 * (out of resources or interrupted) and thus worth retrying.
 */
static inline bool io_should_retry_thread(long err)
{
	return err == -EAGAIN || err == -ERESTARTSYS ||
	       err == -ERESTARTNOINTR || err == -ERESTARTNOHAND;
}

/*
 * task_work continuation used to retry creating the thread for a worker
 * whose first create_io_thread() attempt failed with a retryable error.
 *
 * On success the worker is published. On a permanent error its accounting
 * is undone, and if it was the last worker of its acct all pending work is
 * cancelled, since nobody is left to run it. On another retryable error
 * the attempt is requeued through the workqueue.
 */
static void create_worker_cont(struct callback_head *cb)
{
	struct io_worker *worker;
	struct task_struct *tsk;
	struct io_wqe *wqe;

	worker = container_of(cb, struct io_worker, create_work);
	clear_bit_unlock(0, &worker->create_state);
	wqe = worker->wqe;
	tsk = create_io_thread(io_wqe_worker, worker, wqe->node);
	if (!IS_ERR(tsk)) {
		io_init_new_worker(wqe, worker, tsk);
		io_worker_release(worker);
		return;
	} else if (!io_should_retry_thread(PTR_ERR(tsk))) {
		struct io_wqe_acct *acct = io_wqe_get_acct(worker);

		atomic_dec(&acct->nr_running);
		raw_spin_lock(&wqe->lock);
		acct->nr_workers--;
		if (!acct->nr_workers) {
			struct io_cb_cancel_data match = {
				.fn		= io_wq_work_match_all,
				.cancel_all	= true,
			};

			/* each successful cancel drops wqe->lock */
			while (io_acct_cancel_pending_work(wqe, acct, &match))
				raw_spin_lock(&wqe->lock);
		}
		raw_spin_unlock(&wqe->lock);
		/*
		 * The worker never got a thread and was never put on any
		 * list, so nothing else can reach it: free it instead of
		 * leaking it.
		 */
		kfree(worker);
		io_worker_ref_put(wqe->wq);
		return;
	}

	/* re-create attempts grab a new worker ref, drop the existing one */
	io_worker_release(worker);
	schedule_work(&worker->work);
}

/*
 * Workqueue half of the creation retry: hand the (not yet started) worker
 * back to wq->task as create_worker_cont() task_work.
 */
static void io_workqueue_create(struct work_struct *work)
{
	struct io_worker *worker = container_of(work, struct io_worker, work);
	struct io_wqe_acct *acct = io_wqe_get_acct(worker);

	/*
	 * On failure io_queue_worker_create() has already undone the
	 * reference/create_state changes it made and dropped the
	 * nr_running/worker_refs counts. The worker has no thread and is on
	 * no list, so free it rather than leak it.
	 */
	if (!io_queue_worker_create(worker, acct, create_worker_cont))
		kfree(worker);
}

static bool create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index)
732
{
733
	struct io_wqe_acct *acct = &wqe->acct[index];
734
	struct io_worker *worker;
735
	struct task_struct *tsk;
736

737 738
	__set_current_state(TASK_RUNNING);

739
	worker = kzalloc_node(sizeof(*worker), GFP_KERNEL, wqe->node);
740
	if (!worker) {
741 742
fail:
		atomic_dec(&acct->nr_running);
743
		raw_spin_lock(&wqe->lock);
744
		acct->nr_workers--;
745
		raw_spin_unlock(&wqe->lock);
746
		io_worker_ref_put(wq);
747
		return false;
748
	}
749

750 751 752 753
	refcount_set(&worker->ref, 1);
	worker->wqe = wqe;
	spin_lock_init(&worker->lock);
	init_completion(&worker->ref_done);
754 755 756

	if (index == IO_WQ_ACCT_BOUND)
		worker->flags |= IO_WORKER_F_BOUND;
757 758 759 760 761 762 763 764 765 766 767 768

	tsk = create_io_thread(io_wqe_worker, worker, wqe->node);
	if (!IS_ERR(tsk)) {
		io_init_new_worker(wqe, worker, tsk);
	} else if (!io_should_retry_thread(PTR_ERR(tsk))) {
		goto fail;
	} else {
		INIT_WORK(&worker->work, io_workqueue_create);
		schedule_work(&worker->work);
	}

	return true;
769 770
}

771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797
/*
 * Call @func on every worker of @wqe that we can still take a reference
 * on and that has a task. Stops at, and returns, the first true result.
 * Caller must be in an RCU read-side section for the list walk.
 */
static bool io_wq_for_each_worker(struct io_wqe *wqe,
				  bool (*func)(struct io_worker *, void *),
				  void *data)
{
	struct io_worker *cur;
	bool ret = false;

	list_for_each_entry_rcu(cur, &wqe->all_list, all_list) {
		if (!io_worker_get(cur))
			continue;
		/* no task if node is/was offline */
		if (cur->task)
			ret = func(cur, data);
		io_worker_release(cur);
		if (ret)
			break;
	}

	return ret;
}

static bool io_wq_worker_wake(struct io_worker *worker, void *data)
{
798
	set_notify_signal(worker->task);
799 800 801 802
	wake_up_process(worker->task);
	return false;
}

803
static void io_run_cancel(struct io_wq_work *work, struct io_wqe *wqe)
804
{
805 806
	struct io_wq *wq = wqe->wq;

807 808
	do {
		work->flags |= IO_WQ_WORK_CANCEL;
809 810
		wq->do_work(work);
		work = wq->free_work(work);
811 812 813
	} while (work);
}

814 815
static void io_wqe_insert_work(struct io_wqe *wqe, struct io_wq_work *work)
{
816
	struct io_wqe_acct *acct = io_work_get_acct(wqe, work);
817 818 819 820 821
	unsigned int hash;
	struct io_wq_work *tail;

	if (!io_wq_is_hashed(work)) {
append:
822
		wq_list_add_tail(&work->list, &acct->work_list);
823 824 825 826 827 828 829 830 831
		return;
	}

	hash = io_get_work_hash(work);
	tail = wqe->hash_tail[hash];
	wqe->hash_tail[hash] = work;
	if (!tail)
		goto append;

832
	wq_list_add_after(&work->list, &tail->list, &acct->work_list);
833 834
}

835 836
static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work)
{
837
	struct io_wqe_acct *acct = io_work_get_acct(wqe, work);
838 839
	unsigned work_flags = work->flags;
	bool do_create;
840

841 842 843 844 845 846
	/*
	 * If io-wq is exiting for this task, or if the request has explicitly
	 * been marked as one that should not get executed, cancel it here.
	 */
	if (test_bit(IO_WQ_BIT_EXIT, &wqe->wq->state) ||
	    (work->flags & IO_WQ_WORK_CANCEL)) {
847
run_cancel:
848
		io_run_cancel(work, wqe);
849 850 851
		return;
	}

852
	raw_spin_lock(&wqe->lock);
853
	io_wqe_insert_work(wqe, work);
854
	clear_bit(IO_ACCT_STALLED_BIT, &acct->flags);
855 856

	rcu_read_lock();
857
	do_create = !io_wqe_activate_free_worker(wqe, acct);
858 859
	rcu_read_unlock();

860
	raw_spin_unlock(&wqe->lock);
861

862
	if (do_create && ((work_flags & IO_WQ_WORK_CONCURRENT) ||
863 864 865 866 867 868 869 870 871 872 873 874 875 876
	    !atomic_read(&acct->nr_running))) {
		bool did_create;

		did_create = io_wqe_create_worker(wqe, acct);
		if (unlikely(!did_create)) {
			raw_spin_lock(&wqe->lock);
			/* fatal condition, failed to create the first worker */
			if (!acct->nr_workers) {
				raw_spin_unlock(&wqe->lock);
				goto run_cancel;
			}
			raw_spin_unlock(&wqe->lock);
		}
	}
877 878 879 880 881 882 883 884 885 886
}

/* Queue @work on the wqe of the NUMA node the caller is running on. */
void io_wq_enqueue(struct io_wq *wq, struct io_wq_work *work)
{
	io_wqe_enqueue(wq->wqes[numa_node_id()], work);
}

/*
887 888
 * Work items that hash to the same value will not be done in parallel.
 * Used to limit concurrent writes, generally hashed by inode.
889
 */
890
void io_wq_hash_work(struct io_wq_work *work, void *val)
891
{
892
	unsigned int bit;
893 894 895 896 897

	bit = hash_ptr(val, IO_WQ_HASH_ORDER);
	work->flags |= (IO_WQ_WORK_HASHED | (bit << IO_WQ_HASH_SHIFT));
}

898
static bool io_wq_worker_cancel(struct io_worker *worker, void *data)
899
{
900
	struct io_cb_cancel_data *match = data;
901 902 903

	/*
	 * Hold the lock to avoid ->cur_work going out of scope, caller
904
	 * may dereference the passed in work.
905
	 */
906
	spin_lock(&worker->lock);
907
	if (worker->cur_work &&
908
	    match->fn(worker->cur_work, match->data)) {
909
		set_notify_signal(worker->task);
910
		match->nr_running++;
911
	}
912
	spin_unlock(&worker->lock);
913

914
	return match->nr_running && !match->cancel_all;
915 916
}

917 918 919 920
static inline void io_wqe_remove_pending(struct io_wqe *wqe,
					 struct io_wq_work *work,
					 struct io_wq_work_node *prev)
{
921
	struct io_wqe_acct *acct = io_work_get_acct(wqe, work);
922 923 924 925 926 927 928 929 930 931 932
	unsigned int hash = io_get_work_hash(work);
	struct io_wq_work *prev_work = NULL;

	if (io_wq_is_hashed(work) && work == wqe->hash_tail[hash]) {
		if (prev)
			prev_work = container_of(prev, struct io_wq_work, list);
		if (prev_work && io_get_work_hash(prev_work) == hash)
			wqe->hash_tail[hash] = prev_work;
		else
			wqe->hash_tail[hash] = NULL;
	}
933
	wq_list_del(&acct->work_list, &work->list, prev);
934 935
}

936 937 938 939
/*
 * Cancel the first queued item in @acct that @match accepts.
 *
 * Called with wqe->lock held. Returns true if an item was cancelled, in
 * which case wqe->lock has been released. Returns false with the lock
 * still held.
 */
static bool io_acct_cancel_pending_work(struct io_wqe *wqe,
					struct io_wqe_acct *acct,
					struct io_cb_cancel_data *match)
	__releases(wqe->lock)
{
	struct io_wq_work_node *node, *prev;
	struct io_wq_work *work;

	wq_list_for_each(node, prev, &acct->work_list) {
		work = container_of(node, struct io_wq_work, list);
		if (!match->fn(work, match->data))
			continue;
		io_wqe_remove_pending(wqe, work, prev);
		raw_spin_unlock(&wqe->lock);
		io_run_cancel(work, wqe);
		match->nr_pending++;
		/* not safe to continue after unlock */
		return true;
	}

	return false;
}

static void io_wqe_cancel_pending_work(struct io_wqe *wqe,
				       struct io_cb_cancel_data *match)
{
	int i;
963
retry:
964
	raw_spin_lock(&wqe->lock);
965 966
	for (i = 0; i < IO_WQ_ACCT_NR; i++) {
		struct io_wqe_acct *acct = io_get_acct(wqe, i == 0);
967

968 969 970 971
		if (io_acct_cancel_pending_work(wqe, acct, match)) {
			if (match->cancel_all)
				goto retry;
			return;
972
		}
973
	}
974
	raw_spin_unlock(&wqe->lock);
975 976
}

977
static void io_wqe_cancel_running_work(struct io_wqe *wqe,
978 979
				       struct io_cb_cancel_data *match)
{
980
	rcu_read_lock();
981
	io_wq_for_each_worker(wqe, io_wq_worker_cancel, match);
982 983 984
	rcu_read_unlock();
}

985
/*
 * Cancel work on @wq matched by @cancel(work, @data): first among queued
 * items, then among items currently executing. With @cancel_all every
 * match is handled; otherwise the first match ends the search.
 */
enum io_wq_cancel io_wq_cancel_cb(struct io_wq *wq, work_cancel_fn *cancel,
				  void *data, bool cancel_all)
{
	struct io_cb_cancel_data match = {
		.fn		= cancel,
		.data		= data,
		.cancel_all	= cancel_all,
	};
	int node;

	/*
	 * First check pending list, if we're lucky we can just remove it
	 * from there. CANCEL_OK means that the work is returned as-new,
	 * no completion will be posted for it.
	 */
	for_each_node(node) {
		struct io_wqe *wqe = wq->wqes[node];

		io_wqe_cancel_pending_work(wqe, &match);
		if (match.nr_pending && !match.cancel_all)
			return IO_WQ_CANCEL_OK;
	}

	/*
	 * Now check if a free (going busy) or busy worker has the work
	 * currently running. If we find it there, we'll return CANCEL_RUNNING
	 * as an indication that we attempt to signal cancellation. The
	 * completion will run normally in this case.
	 */
	for_each_node(node) {
		struct io_wqe *wqe = wq->wqes[node];

		io_wqe_cancel_running_work(wqe, &match);
		if (match.nr_running && !match.cancel_all)
			return IO_WQ_CANCEL_RUNNING;
	}

	if (match.nr_running)
		return IO_WQ_CANCEL_RUNNING;
	if (match.nr_pending)
		return IO_WQ_CANCEL_OK;
	return IO_WQ_CANCEL_NOTFOUND;
}

1029 1030 1031 1032
static int io_wqe_hash_wake(struct wait_queue_entry *wait, unsigned mode,
			    int sync, void *key)
{
	struct io_wqe *wqe = container_of(wait, struct io_wqe, wait);
1033
	int i;
1034 1035 1036 1037

	list_del_init(&wait->entry);

	rcu_read_lock();
1038 1039 1040 1041 1042 1043
	for (i = 0; i < IO_WQ_ACCT_NR; i++) {
		struct io_wqe_acct *acct = &wqe->acct[i];

		if (test_and_clear_bit(IO_ACCT_STALLED_BIT, &acct->flags))
			io_wqe_activate_free_worker(wqe, acct);
	}
1044 1045 1046 1047
	rcu_read_unlock();
	return 1;
}

1048
struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
1049
{
1050
	int ret, node, i;
1051 1052
	struct io_wq *wq;

1053
	if (WARN_ON_ONCE(!data->free_work || !data->do_work))
1054
		return ERR_PTR(-EINVAL);
P
Pavel Begunkov 已提交
1055 1056
	if (WARN_ON_ONCE(!bounded))
		return ERR_PTR(-EINVAL);
1057

1058
	wq = kzalloc(struct_size(wq, wqes, nr_node_ids), GFP_KERNEL);
1059 1060
	if (!wq)
		return ERR_PTR(-ENOMEM);
1061 1062
	ret = cpuhp_state_add_instance_nocalls(io_wq_online, &wq->cpuhp_node);
	if (ret)
1063
		goto err_wq;
1064

1065 1066
	refcount_inc(&data->hash->refs);
	wq->hash = data->hash;
1067
	wq->free_work = data->free_work;
1068
	wq->do_work = data->do_work;
1069

1070
	ret = -ENOMEM;
J
Jann Horn 已提交
1071
	for_each_node(node) {
1072
		struct io_wqe *wqe;
1073
		int alloc_node = node;
1074

1075 1076 1077
		if (!node_online(alloc_node))
			alloc_node = NUMA_NO_NODE;
		wqe = kzalloc_node(sizeof(struct io_wqe), GFP_KERNEL, alloc_node);
1078
		if (!wqe)
J
Jann Horn 已提交
1079
			goto err;
J
Jens Axboe 已提交
1080 1081 1082
		if (!alloc_cpumask_var(&wqe->cpu_mask, GFP_KERNEL))
			goto err;
		cpumask_copy(wqe->cpu_mask, cpumask_of_node(node));
J
Jann Horn 已提交
1083
		wq->wqes[node] = wqe;
1084
		wqe->node = alloc_node;
1085
		wqe->acct[IO_WQ_ACCT_BOUND].max_workers = bounded;
J
Jens Axboe 已提交
1086
		wqe->acct[IO_WQ_ACCT_UNBOUND].max_workers =
1087
					task_rlimit(current, RLIMIT_NPROC);
1088
		INIT_LIST_HEAD(&wqe->wait.entry);
1089 1090 1091 1092 1093 1094 1095 1096
		wqe->wait.func = io_wqe_hash_wake;
		for (i = 0; i < IO_WQ_ACCT_NR; i++) {
			struct io_wqe_acct *acct = &wqe->acct[i];

			acct->index = i;
			atomic_set(&acct->nr_running, 0);
			INIT_WQ_LIST(&acct->work_list);
		}
1097
		wqe->wq = wq;
1098
		raw_spin_lock_init(&wqe->lock);
1099
		INIT_HLIST_NULLS_HEAD(&wqe->free_list, 0);
1100
		INIT_LIST_HEAD(&wqe->all_list);
1101 1102
	}

1103 1104 1105 1106
	wq->task = get_task_struct(data->task);
	atomic_set(&wq->worker_refs, 1);
	init_completion(&wq->worker_done);
	return wq;
1107
err:
1108
	io_wq_put_hash(data->hash);
1109
	cpuhp_state_remove_instance_nocalls(io_wq_online, &wq->cpuhp_node);
J
Jens Axboe 已提交
1110 1111 1112 1113
	for_each_node(node) {
		if (!wq->wqes[node])
			continue;
		free_cpumask_var(wq->wqes[node]->cpu_mask);
J
Jann Horn 已提交
1114
		kfree(wq->wqes[node]);
J
Jens Axboe 已提交
1115
	}
1116
err_wq:
1117
	kfree(wq);
1118 1119 1120
	return ERR_PTR(ret);
}

1121 1122
static bool io_task_work_match(struct callback_head *cb, void *data)
{
1123
	struct io_worker *worker;
1124

1125
	if (cb->func != create_worker_cb || cb->func != create_worker_cont)
1126
		return false;
1127 1128
	worker = container_of(cb, struct io_worker, create_work);
	return worker->wqe->wq == data;
1129 1130
}

1131 1132 1133 1134 1135
/*
 * Flag @wq as exiting by setting IO_WQ_BIT_EXIT in its state word.
 * io_wq_put_and_exit() warns once if this bit is not already set.
 */
void io_wq_exit_start(struct io_wq *wq)
{
	set_bit(IO_WQ_BIT_EXIT, &wq->state);
}

1136
static void io_wq_exit_workers(struct io_wq *wq)
1137
{
1138 1139 1140 1141 1142 1143
	struct callback_head *cb;
	int node;

	if (!wq->task)
		return;

1144
	while ((cb = task_work_cancel_match(wq->task, io_task_work_match, wq)) != NULL) {
1145
		struct io_worker *worker;
1146

1147 1148
		worker = container_of(cb, struct io_worker, create_work);
		atomic_dec(&worker->wqe->acct[worker->create_index].nr_running);
1149
		io_worker_ref_put(wq);
1150 1151
		clear_bit_unlock(0, &worker->create_state);
		io_worker_release(worker);
1152 1153 1154 1155 1156 1157 1158
	}

	rcu_read_lock();
	for_each_node(node) {
		struct io_wqe *wqe = wq->wqes[node];

		io_wq_for_each_worker(wqe, io_wq_worker_wake, NULL);
1159
	}
1160 1161 1162
	rcu_read_unlock();
	io_worker_ref_put(wq);
	wait_for_completion(&wq->worker_done);
1163 1164 1165 1166 1167 1168

	for_each_node(node) {
		spin_lock_irq(&wq->hash->wait.lock);
		list_del_init(&wq->wqes[node]->wait.entry);
		spin_unlock_irq(&wq->hash->wait.lock);
	}
1169 1170
	put_task_struct(wq->task);
	wq->task = NULL;
1171 1172
}

1173
static void io_wq_destroy(struct io_wq *wq)
1174
{
J
Jann Horn 已提交
1175
	int node;
1176

1177 1178
	cpuhp_state_remove_instance_nocalls(io_wq_online, &wq->cpuhp_node);

1179 1180
	for_each_node(node) {
		struct io_wqe *wqe = wq->wqes[node];
1181 1182 1183 1184 1185
		struct io_cb_cancel_data match = {
			.fn		= io_wq_work_match_all,
			.cancel_all	= true,
		};
		io_wqe_cancel_pending_work(wqe, &match);
J
Jens Axboe 已提交
1186
		free_cpumask_var(wqe->cpu_mask);
1187 1188 1189
		kfree(wqe);
	}
	io_wq_put_hash(wq->hash);
1190
	kfree(wq);
1191 1192
}

1193 1194
void io_wq_put_and_exit(struct io_wq *wq)
{
1195 1196
	WARN_ON_ONCE(!test_bit(IO_WQ_BIT_EXIT, &wq->state));

1197
	io_wq_exit_workers(wq);
1198
	io_wq_destroy(wq);
1199 1200
}

J
Jens Axboe 已提交
1201 1202 1203 1204 1205
/*
 * CPU hotplug event passed by __io_wq_cpu_online() through
 * io_wq_for_each_worker() to io_wq_worker_affinity().
 */
struct online_data {
	unsigned int cpu;	/* CPU whose hotplug state changed */
	bool online;		/* true: CPU came online; false: going offline */
};

1206 1207
static bool io_wq_worker_affinity(struct io_worker *worker, void *data)
{
J
Jens Axboe 已提交
1208
	struct online_data *od = data;
1209

J
Jens Axboe 已提交
1210 1211 1212 1213
	if (od->online)
		cpumask_set_cpu(od->cpu, worker->wqe->cpu_mask);
	else
		cpumask_clear_cpu(od->cpu, worker->wqe->cpu_mask);
1214 1215 1216
	return false;
}

J
Jens Axboe 已提交
1217
static int __io_wq_cpu_online(struct io_wq *wq, unsigned int cpu, bool online)
1218
{
J
Jens Axboe 已提交
1219 1220 1221 1222
	struct online_data od = {
		.cpu = cpu,
		.online = online
	};
1223 1224 1225 1226
	int i;

	rcu_read_lock();
	for_each_node(i)
J
Jens Axboe 已提交
1227
		io_wq_for_each_worker(wq->wqes[i], io_wq_worker_affinity, &od);
1228 1229 1230 1231
	rcu_read_unlock();
	return 0;
}

J
Jens Axboe 已提交
1232 1233 1234 1235 1236 1237 1238 1239 1240 1241 1242 1243 1244 1245
/*
 * cpuhp "online" callback registered in io_wq_init(). @node is the
 * cpuhp_node embedded in an io_wq.
 */
static int io_wq_cpu_online(unsigned int cpu, struct hlist_node *node)
{
	return __io_wq_cpu_online(hlist_entry_safe(node, struct io_wq, cpuhp_node),
				  cpu, true);
}

/*
 * cpuhp "offline" callback registered in io_wq_init(). @node is the
 * cpuhp_node embedded in an io_wq.
 */
static int io_wq_cpu_offline(unsigned int cpu, struct hlist_node *node)
{
	return __io_wq_cpu_online(hlist_entry_safe(node, struct io_wq, cpuhp_node),
				  cpu, false);
}

1246 1247 1248 1249 1250 1251 1252 1253 1254 1255 1256 1257 1258 1259 1260 1261 1262
/*
 * Overwrite the cpu_mask of every per-node io_wqe in @wq.
 *
 * A non-NULL @mask is copied to every node. A NULL @mask restores each
 * node's cpumask_of_node(), the same value io_wq_create() starts with.
 * Always returns 0.
 */
int io_wq_cpu_affinity(struct io_wq *wq, cpumask_var_t mask)
{
	int node;

	rcu_read_lock();
	for_each_node(node) {
		const struct cpumask *src = mask ? mask : cpumask_of_node(node);

		cpumask_copy(wq->wqes[node]->cpu_mask, src);
	}
	rcu_read_unlock();

	return 0;
}

1263 1264 1265 1266 1267 1268 1269 1270 1271 1272 1273 1274 1275 1276 1277 1278 1279
/*
 * Set max number of unbounded workers, returns old value. If new_count is 0,
 * then just return the old value.
 */
int io_wq_max_workers(struct io_wq *wq, int *new_count)
{
	int i, node, prev = 0;

	for (i = 0; i < 2; i++) {
		if (new_count[i] > task_rlimit(current, RLIMIT_NPROC))
			new_count[i] = task_rlimit(current, RLIMIT_NPROC);
	}

	rcu_read_lock();
	for_each_node(node) {
		struct io_wqe_acct *acct;

1280
		for (i = 0; i < IO_WQ_ACCT_NR; i++) {
1281 1282 1283 1284 1285 1286 1287 1288 1289 1290 1291
			acct = &wq->wqes[node]->acct[i];
			prev = max_t(int, acct->max_workers, prev);
			if (new_count[i])
				acct->max_workers = new_count[i];
			new_count[i] = prev;
		}
	}
	rcu_read_unlock();
	return 0;
}

1292 1293 1294 1295 1296
static __init int io_wq_init(void)
{
	int ret;

	ret = cpuhp_setup_state_multi(CPUHP_AP_ONLINE_DYN, "io-wq/online",
J
Jens Axboe 已提交
1297
					io_wq_cpu_online, io_wq_cpu_offline);
1298 1299 1300 1301 1302 1303
	if (ret < 0)
		return ret;
	io_wq_online = ret;
	return 0;
}
subsys_initcall(io_wq_init);