// SPDX-License-Identifier: GPL-2.0
/*
 * Basic worker thread pool for io_uring
 *
 * Copyright (C) 2019 Jens Axboe
 *
 */
#include <linux/kernel.h>
#include <linux/init.h>
#include <linux/errno.h>
#include <linux/sched/signal.h>
#include <linux/percpu.h>
#include <linux/slab.h>
#include <linux/rculist_nulls.h>
#include <linux/cpu.h>
#include <linux/tracehook.h>
#include <uapi/linux/io_uring.h>

#include "io-wq.h"

#define WORKER_IDLE_TIMEOUT	(5 * HZ)

enum {
	IO_WORKER_F_UP		= 1,	/* up and active */
	IO_WORKER_F_RUNNING	= 2,	/* account as running */
	IO_WORKER_F_FREE	= 4,	/* worker on free list */
	IO_WORKER_F_BOUND	= 8,	/* is doing bounded work */
};

enum {
	IO_WQ_BIT_EXIT		= 0,	/* wq exiting */
};

enum {
	IO_ACCT_STALLED_BIT	= 0,	/* stalled on hash */
};

/*
 * One for each thread in a wqe pool
 */
struct io_worker {
	refcount_t ref;
	unsigned flags;
	struct hlist_nulls_node nulls_node;
	struct list_head all_list;
	struct task_struct *task;
	struct io_wqe *wqe;

	struct io_wq_work *cur_work;
	spinlock_t lock;

	struct completion ref_done;

	unsigned long create_state;
	struct callback_head create_work;
	int create_index;

	union {
		struct rcu_head rcu;
		struct work_struct work;
	};
};

#if BITS_PER_LONG == 64
#define IO_WQ_HASH_ORDER	6
#else
#define IO_WQ_HASH_ORDER	5
#endif

#define IO_WQ_NR_HASH_BUCKETS	(1u << IO_WQ_HASH_ORDER)

struct io_wqe_acct {
	unsigned nr_workers;
	unsigned max_workers;
	int index;
	atomic_t nr_running;
	struct io_wq_work_list work_list;
	unsigned long flags;
};

enum {
	IO_WQ_ACCT_BOUND,
	IO_WQ_ACCT_UNBOUND,
	IO_WQ_ACCT_NR,
};

/*
 * Per-node worker thread pool
 */
struct io_wqe {
	raw_spinlock_t lock;
	struct io_wqe_acct acct[2];

	int node;

	struct hlist_nulls_head free_list;
	struct list_head all_list;

	struct wait_queue_entry wait;

	struct io_wq *wq;
	struct io_wq_work *hash_tail[IO_WQ_NR_HASH_BUCKETS];

	cpumask_var_t cpu_mask;
};

/*
 * Per io_wq state
 */
struct io_wq {
	unsigned long state;

	free_work_fn *free_work;
	io_wq_work_fn *do_work;

	struct io_wq_hash *hash;

	atomic_t worker_refs;
	struct completion worker_done;

	struct hlist_node cpuhp_node;

	struct task_struct *task;

	struct io_wqe *wqes[];
};

static enum cpuhp_state io_wq_online;

struct io_cb_cancel_data {
	work_cancel_fn *fn;
	void *data;
	int nr_running;
	int nr_pending;
	bool cancel_all;
};

static bool create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index);
static void io_wqe_dec_running(struct io_worker *worker);
static bool io_acct_cancel_pending_work(struct io_wqe *wqe,
					struct io_wqe_acct *acct,
					struct io_cb_cancel_data *match);

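/*
 * Each io_worker is reference counted: io_worker_get()/io_worker_release()
 * pin a worker so that ->task and ->cur_work can be safely inspected from
 * other contexts. The final reference drop completes ->ref_done, which
 * io_worker_exit() waits for before freeing the worker.
 */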
static bool io_worker_get(struct io_worker *worker)
{
	return refcount_inc_not_zero(&worker->ref);
}

static void io_worker_release(struct io_worker *worker)
{
	if (refcount_dec_and_test(&worker->ref))
		complete(&worker->ref_done);
}

static inline struct io_wqe_acct *io_get_acct(struct io_wqe *wqe, bool bound)
{
	return &wqe->acct[bound ? IO_WQ_ACCT_BOUND : IO_WQ_ACCT_UNBOUND];
}

static inline struct io_wqe_acct *io_work_get_acct(struct io_wqe *wqe,
						   struct io_wq_work *work)
{
	return io_get_acct(wqe, !(work->flags & IO_WQ_WORK_UNBOUND));
}

static inline struct io_wqe_acct *io_wqe_get_acct(struct io_worker *worker)
{
	return io_get_acct(worker->wqe, worker->flags & IO_WORKER_F_BOUND);
}

static void io_worker_ref_put(struct io_wq *wq)
{
	if (atomic_dec_and_test(&wq->worker_refs))
		complete(&wq->worker_done);
}

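/*
 * Final teardown of a worker: wait for the last reference to go away, take
 * it off the free and all-worker lists, and drop its running accounting
 * before the task exits.
 */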
static void io_worker_exit(struct io_worker *worker)
{
	struct io_wqe *wqe = worker->wqe;

	io_worker_release(worker);
	wait_for_completion(&worker->ref_done);

	raw_spin_lock(&wqe->lock);
	if (worker->flags & IO_WORKER_F_FREE)
		hlist_nulls_del_rcu(&worker->nulls_node);
	list_del_rcu(&worker->all_list);
	preempt_disable();
	io_wqe_dec_running(worker);
	worker->flags = 0;
	current->flags &= ~PF_IO_WORKER;
	preempt_enable();
	raw_spin_unlock(&wqe->lock);

	kfree_rcu(worker, rcu);
	io_worker_ref_put(wqe->wq);
	do_exit(0);
}

static inline bool io_acct_run_queue(struct io_wqe_acct *acct)
{
	if (!wq_list_empty(&acct->work_list) &&
	    !test_bit(IO_ACCT_STALLED_BIT, &acct->flags))
		return true;
	return false;
}

/*
 * Check head of free list for an available worker. If one isn't available,
 * caller must create one.
 */
static bool io_wqe_activate_free_worker(struct io_wqe *wqe,
					struct io_wqe_acct *acct)
	__must_hold(RCU)
{
	struct hlist_nulls_node *n;
	struct io_worker *worker;

	/*
	 * Iterate free_list and see if we can find an idle worker to
	 * activate. If a given worker is on the free_list but in the process
	 * of exiting, keep trying.
	 */
	hlist_nulls_for_each_entry_rcu(worker, n, &wqe->free_list, nulls_node) {
		if (!io_worker_get(worker))
			continue;
		if (io_wqe_get_acct(worker) != acct) {
			io_worker_release(worker);
			continue;
		}
		if (wake_up_process(worker->task)) {
			io_worker_release(worker);
			return true;
		}
		io_worker_release(worker);
	}

	return false;
}

/*
 * We need a worker. If we find a free one, we're good. If not, and we're
 * below the max number of workers, create one.
 */
static bool io_wqe_create_worker(struct io_wqe *wqe, struct io_wqe_acct *acct)
{
	/*
	 * Most likely an attempt to queue unbounded work on an io_wq that
	 * wasn't setup with any unbounded workers.
	 */
	if (unlikely(!acct->max_workers))
		pr_warn_once("io-wq is not configured for unbound workers");

	raw_spin_lock(&wqe->lock);
	if (acct->nr_workers == acct->max_workers) {
		raw_spin_unlock(&wqe->lock);
		return true;
	}
	acct->nr_workers++;
	raw_spin_unlock(&wqe->lock);
	atomic_inc(&acct->nr_running);
	atomic_inc(&wqe->wq->worker_refs);
	return create_io_worker(wqe->wq, wqe, acct->index);
}

static void io_wqe_inc_running(struct io_worker *worker)
{
	struct io_wqe_acct *acct = io_wqe_get_acct(worker);

	atomic_inc(&acct->nr_running);
}

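/*
 * task_work callback, run in the context of the io_wq owner task: if the
 * accounting class is still below its worker limit, account and create the
 * worker that io_queue_worker_create() asked for; otherwise drop the
 * references that were taken for the pending create.
 */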
static void create_worker_cb(struct callback_head *cb)
{
	struct io_worker *worker;
	struct io_wq *wq;
	struct io_wqe *wqe;
	struct io_wqe_acct *acct;
	bool do_create = false;

	worker = container_of(cb, struct io_worker, create_work);
	wqe = worker->wqe;
	wq = wqe->wq;
	acct = &wqe->acct[worker->create_index];
	raw_spin_lock(&wqe->lock);
	if (acct->nr_workers < acct->max_workers) {
		acct->nr_workers++;
		do_create = true;
	}
	raw_spin_unlock(&wqe->lock);
	if (do_create) {
		create_io_worker(wq, wqe, worker->create_index);
	} else {
		atomic_dec(&acct->nr_running);
		io_worker_ref_put(wq);
	}
	clear_bit_unlock(0, &worker->create_state);
	io_worker_release(worker);
}

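/*
 * Defer worker creation to the io_wq owner task via task_work; @func
 * (create_worker_cb or create_worker_cont) runs there and does the actual
 * accounting and thread creation.
 */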
static bool io_queue_worker_create(struct io_worker *worker,
				   struct io_wqe_acct *acct,
				   task_work_func_t func)
{
	struct io_wqe *wqe = worker->wqe;
	struct io_wq *wq = wqe->wq;

	/* raced with exit, just ignore create call */
	if (test_bit(IO_WQ_BIT_EXIT, &wq->state))
		goto fail;
	if (!io_worker_get(worker))
		goto fail;
	/*
	 * create_state manages ownership of create_work/index. We should
	 * only need one entry per worker, as the worker going to sleep
	 * will trigger the condition, and waking will clear it once it
	 * runs the task_work.
	 */
	if (test_bit(0, &worker->create_state) ||
	    test_and_set_bit_lock(0, &worker->create_state))
		goto fail_release;

	init_task_work(&worker->create_work, func);
	worker->create_index = acct->index;
	if (!task_work_add(wq->task, &worker->create_work, TWA_SIGNAL)) {
		clear_bit_unlock(0, &worker->create_state);
		return true;
	}
	clear_bit_unlock(0, &worker->create_state);
fail_release:
	io_worker_release(worker);
fail:
	atomic_dec(&acct->nr_running);
	io_worker_ref_put(wq);
	return false;
}

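/*
 * A worker stops counting as running (it's going to sleep or exiting). If
 * it was the last running worker for its accounting class and work is
 * still pending, queue creation of a replacement so the queue doesn't
 * stall.
 */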
static void io_wqe_dec_running(struct io_worker *worker)
	__must_hold(wqe->lock)
{
	struct io_wqe_acct *acct = io_wqe_get_acct(worker);
	struct io_wqe *wqe = worker->wqe;

	if (!(worker->flags & IO_WORKER_F_UP))
		return;

	if (atomic_dec_and_test(&acct->nr_running) && io_acct_run_queue(acct)) {
		atomic_inc(&acct->nr_running);
		atomic_inc(&wqe->wq->worker_refs);
		io_queue_worker_create(worker, acct, create_worker_cb);
	}
}

/*
 * Worker will start processing some work. Take it off the free list, if
 * it's currently on there.
 */
static void __io_worker_busy(struct io_wqe *wqe, struct io_worker *worker,
			     struct io_wq_work *work)
	__must_hold(wqe->lock)
{
	if (worker->flags & IO_WORKER_F_FREE) {
		worker->flags &= ~IO_WORKER_F_FREE;
		hlist_nulls_del_init_rcu(&worker->nulls_node);
	}
}

/*
 * No work, worker going to sleep. Put it back on the free list if it's not
 * already there. The caller holds wqe->lock.
 */
static void __io_worker_idle(struct io_wqe *wqe, struct io_worker *worker)
	__must_hold(wqe->lock)
{
	if (!(worker->flags & IO_WORKER_F_FREE)) {
		worker->flags |= IO_WORKER_F_FREE;
		hlist_nulls_add_head_rcu(&worker->nulls_node, &wqe->free_list);
	}
}

static inline unsigned int io_get_work_hash(struct io_wq_work *work)
{
	return work->flags >> IO_WQ_HASH_SHIFT;
}

static void io_wait_on_hash(struct io_wqe *wqe, unsigned int hash)
{
	struct io_wq *wq = wqe->wq;

	spin_lock_irq(&wq->hash->wait.lock);
	if (list_empty(&wqe->wait.entry)) {
		__add_wait_queue(&wq->hash->wait, &wqe->wait);
		if (!test_bit(hash, &wq->hash->map)) {
			__set_current_state(TASK_RUNNING);
			list_del_init(&wqe->wait.entry);
		}
	}
	spin_unlock_irq(&wq->hash->wait.lock);
}

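/*
 * Pick the next work item for this worker: unhashed work can run right
 * away; hashed work only runs if no other worker currently owns the same
 * hash bit. If everything left is hashed and busy, mark the acct stalled
 * and wait on the shared hash wait queue.
 */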
static struct io_wq_work *io_get_next_work(struct io_wqe_acct *acct,
					   struct io_worker *worker)
	__must_hold(wqe->lock)
{
	struct io_wq_work_node *node, *prev;
	struct io_wq_work *work, *tail;
	unsigned int stall_hash = -1U;
	struct io_wqe *wqe = worker->wqe;

	wq_list_for_each(node, prev, &acct->work_list) {
		unsigned int hash;

		work = container_of(node, struct io_wq_work, list);

		/* not hashed, can run anytime */
		if (!io_wq_is_hashed(work)) {
			wq_list_del(&acct->work_list, node, prev);
			return work;
		}

		hash = io_get_work_hash(work);
		/* all items with this hash lie in [work, tail] */
		tail = wqe->hash_tail[hash];

		/* hashed, can run if not already running */
		if (!test_and_set_bit(hash, &wqe->wq->hash->map)) {
			wqe->hash_tail[hash] = NULL;
			wq_list_cut(&acct->work_list, &tail->list, prev);
			return work;
		}
		if (stall_hash == -1U)
			stall_hash = hash;
		/* fast forward to a next hash, for-each will fix up @prev */
		node = &tail->list;
	}

	if (stall_hash != -1U) {
		/*
		 * Set this before dropping the lock to avoid racing with new
		 * work being added and clearing the stalled bit.
		 */
		set_bit(IO_ACCT_STALLED_BIT, &acct->flags);
		raw_spin_unlock(&wqe->lock);
		io_wait_on_hash(wqe, stall_hash);
		raw_spin_lock(&wqe->lock);
	}

	return NULL;
}

static bool io_flush_signals(void)
{
	if (unlikely(test_thread_flag(TIF_NOTIFY_SIGNAL))) {
		__set_current_state(TASK_RUNNING);
		tracehook_notify_signal();
		return true;
	}
	return false;
}

static void io_assign_current_work(struct io_worker *worker,
				   struct io_wq_work *work)
{
	if (work) {
		io_flush_signals();
		cond_resched();
	}

	spin_lock(&worker->lock);
	worker->cur_work = work;
	spin_unlock(&worker->lock);
}

static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work);

static void io_worker_handle_work(struct io_worker *worker)
	__releases(wqe->lock)
{
	struct io_wqe_acct *acct = io_wqe_get_acct(worker);
	struct io_wqe *wqe = worker->wqe;
	struct io_wq *wq = wqe->wq;
	bool do_kill = test_bit(IO_WQ_BIT_EXIT, &wq->state);

	do {
		struct io_wq_work *work;
get_next:
		/*
		 * If we got some work, mark us as busy. If we didn't, but
		 * the list isn't empty, it means we stalled on hashed work.
		 * Mark us stalled so we don't keep looking for work when we
		 * can't make progress, any work completion or insertion will
		 * clear the stalled flag.
		 */
		work = io_get_next_work(acct, worker);
		if (work)
			__io_worker_busy(wqe, worker, work);

		raw_spin_unlock(&wqe->lock);
		if (!work)
			break;
		io_assign_current_work(worker, work);
		__set_current_state(TASK_RUNNING);

		/* handle a whole dependent link */
		do {
			struct io_wq_work *next_hashed, *linked;
			unsigned int hash = io_get_work_hash(work);

			next_hashed = wq_next_work(work);

			if (unlikely(do_kill) && (work->flags & IO_WQ_WORK_UNBOUND))
				work->flags |= IO_WQ_WORK_CANCEL;
			wq->do_work(work);
			io_assign_current_work(worker, NULL);

			linked = wq->free_work(work);
			work = next_hashed;
			if (!work && linked && !io_wq_is_hashed(linked)) {
				work = linked;
				linked = NULL;
			}
			io_assign_current_work(worker, work);
			if (linked)
				io_wqe_enqueue(wqe, linked);

			if (hash != -1U && !next_hashed) {
				clear_bit(hash, &wq->hash->map);
				clear_bit(IO_ACCT_STALLED_BIT, &acct->flags);
				if (wq_has_sleeper(&wq->hash->wait))
					wake_up(&wq->hash->wait);
				raw_spin_lock(&wqe->lock);
				/* skip unnecessary unlock-lock wqe->lock */
				if (!work)
					goto get_next;
				raw_spin_unlock(&wqe->lock);
			}
		} while (work);

		raw_spin_lock(&wqe->lock);
	} while (1);
}

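/*
 * Main loop of an io-wq worker thread: run queued work while there is any,
 * otherwise idle on the free list until new work arrives or the idle
 * timeout expires. A worker that times out exits unless it is the last one
 * for its accounting class.
 */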
static int io_wqe_worker(void *data)
{
	struct io_worker *worker = data;
	struct io_wqe_acct *acct = io_wqe_get_acct(worker);
	struct io_wqe *wqe = worker->wqe;
	struct io_wq *wq = wqe->wq;
	bool last_timeout = false;
	char buf[TASK_COMM_LEN];

	worker->flags |= (IO_WORKER_F_UP | IO_WORKER_F_RUNNING);

	snprintf(buf, sizeof(buf), "iou-wrk-%d", wq->task->pid);
	set_task_comm(current, buf);

	while (!test_bit(IO_WQ_BIT_EXIT, &wq->state)) {
		long ret;

		set_current_state(TASK_INTERRUPTIBLE);
loop:
		raw_spin_lock(&wqe->lock);
		if (io_acct_run_queue(acct)) {
			io_worker_handle_work(worker);
			goto loop;
		}
		/* timed out, exit unless we're the last worker */
		if (last_timeout && acct->nr_workers > 1) {
			acct->nr_workers--;
			raw_spin_unlock(&wqe->lock);
			__set_current_state(TASK_RUNNING);
			break;
		}
		last_timeout = false;
		__io_worker_idle(wqe, worker);
		raw_spin_unlock(&wqe->lock);
		if (io_flush_signals())
			continue;
		ret = schedule_timeout(WORKER_IDLE_TIMEOUT);
		if (signal_pending(current)) {
			struct ksignal ksig;

			if (!get_signal(&ksig))
				continue;
			break;
		}
		last_timeout = !ret;
	}

	if (test_bit(IO_WQ_BIT_EXIT, &wq->state)) {
		raw_spin_lock(&wqe->lock);
		io_worker_handle_work(worker);
	}

	io_worker_exit(worker);
	return 0;
}

/*
 * Called when a worker is scheduled in. Mark us as currently running.
 */
void io_wq_worker_running(struct task_struct *tsk)
{
	struct io_worker *worker = tsk->pf_io_worker;

	if (!worker)
		return;
	if (!(worker->flags & IO_WORKER_F_UP))
		return;
	if (worker->flags & IO_WORKER_F_RUNNING)
		return;
	worker->flags |= IO_WORKER_F_RUNNING;
	io_wqe_inc_running(worker);
}

/*
 * Called when worker is going to sleep. If there are no workers currently
 * running and we have work pending, wake up a free one or create a new one.
 */
void io_wq_worker_sleeping(struct task_struct *tsk)
{
	struct io_worker *worker = tsk->pf_io_worker;

	if (!worker)
		return;
	if (!(worker->flags & IO_WORKER_F_UP))
		return;
	if (!(worker->flags & IO_WORKER_F_RUNNING))
		return;

	worker->flags &= ~IO_WORKER_F_RUNNING;

	raw_spin_lock(&worker->wqe->lock);
	io_wqe_dec_running(worker);
	raw_spin_unlock(&worker->wqe->lock);
}

static void io_init_new_worker(struct io_wqe *wqe, struct io_worker *worker,
			       struct task_struct *tsk)
{
	tsk->pf_io_worker = worker;
	worker->task = tsk;
	set_cpus_allowed_ptr(tsk, wqe->cpu_mask);
	tsk->flags |= PF_NO_SETAFFINITY;

	raw_spin_lock(&wqe->lock);
	hlist_nulls_add_head_rcu(&worker->nulls_node, &wqe->free_list);
	list_add_tail_rcu(&worker->all_list, &wqe->all_list);
	worker->flags |= IO_WORKER_F_FREE;
	raw_spin_unlock(&wqe->lock);
	wake_up_new_task(tsk);
}

static bool io_wq_work_match_all(struct io_wq_work *work, void *data)
{
	return true;
}

static inline bool io_should_retry_thread(long err)
{
	switch (err) {
	case -EAGAIN:
	case -ERESTARTSYS:
	case -ERESTARTNOINTR:
	case -ERESTARTNOHAND:
		return true;
	default:
		return false;
	}
}

static void create_worker_cont(struct callback_head *cb)
{
	struct io_worker *worker;
	struct task_struct *tsk;
	struct io_wqe *wqe;

	worker = container_of(cb, struct io_worker, create_work);
	clear_bit_unlock(0, &worker->create_state);
	wqe = worker->wqe;
	tsk = create_io_thread(io_wqe_worker, worker, wqe->node);
	if (!IS_ERR(tsk)) {
		io_init_new_worker(wqe, worker, tsk);
		io_worker_release(worker);
		return;
	} else if (!io_should_retry_thread(PTR_ERR(tsk))) {
		struct io_wqe_acct *acct = io_wqe_get_acct(worker);

		atomic_dec(&acct->nr_running);
		raw_spin_lock(&wqe->lock);
		acct->nr_workers--;
		if (!acct->nr_workers) {
			struct io_cb_cancel_data match = {
				.fn		= io_wq_work_match_all,
				.cancel_all	= true,
			};

			while (io_acct_cancel_pending_work(wqe, acct, &match))
				raw_spin_lock(&wqe->lock);
		}
		raw_spin_unlock(&wqe->lock);
		io_worker_ref_put(wqe->wq);
		kfree(worker);
		return;
	}

	/* re-create attempts grab a new worker ref, drop the existing one */
	io_worker_release(worker);
	schedule_work(&worker->work);
}

static void io_workqueue_create(struct work_struct *work)
{
	struct io_worker *worker = container_of(work, struct io_worker, work);
	struct io_wqe_acct *acct = io_wqe_get_acct(worker);

	if (!io_queue_worker_create(worker, acct, create_worker_cont))
		kfree(worker);
}

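/*
 * Allocate and start a new worker thread for the given accounting class.
 * If thread creation fails with a transient error, retry later from
 * workqueue context via io_workqueue_create().
 */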
static bool create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index)
{
	struct io_wqe_acct *acct = &wqe->acct[index];
	struct io_worker *worker;
	struct task_struct *tsk;

	__set_current_state(TASK_RUNNING);

	worker = kzalloc_node(sizeof(*worker), GFP_KERNEL, wqe->node);
	if (!worker) {
fail:
		atomic_dec(&acct->nr_running);
		raw_spin_lock(&wqe->lock);
		acct->nr_workers--;
		raw_spin_unlock(&wqe->lock);
		io_worker_ref_put(wq);
		return false;
	}

	refcount_set(&worker->ref, 1);
	worker->wqe = wqe;
	spin_lock_init(&worker->lock);
	init_completion(&worker->ref_done);

	if (index == IO_WQ_ACCT_BOUND)
		worker->flags |= IO_WORKER_F_BOUND;

	tsk = create_io_thread(io_wqe_worker, worker, wqe->node);
	if (!IS_ERR(tsk)) {
		io_init_new_worker(wqe, worker, tsk);
	} else if (!io_should_retry_thread(PTR_ERR(tsk))) {
		kfree(worker);
		goto fail;
	} else {
		INIT_WORK(&worker->work, io_workqueue_create);
		schedule_work(&worker->work);
	}

	return true;
}

/*
 * Iterate the passed in list and call the specific function for each
 * worker that isn't exiting
 */
static bool io_wq_for_each_worker(struct io_wqe *wqe,
				  bool (*func)(struct io_worker *, void *),
				  void *data)
{
	struct io_worker *worker;
	bool ret = false;

	list_for_each_entry_rcu(worker, &wqe->all_list, all_list) {
		if (io_worker_get(worker)) {
			/* no task if node is/was offline */
			if (worker->task)
				ret = func(worker, data);
			io_worker_release(worker);
			if (ret)
				break;
		}
	}

	return ret;
}

static bool io_wq_worker_wake(struct io_worker *worker, void *data)
{
	set_notify_signal(worker->task);
	wake_up_process(worker->task);
	return false;
}

static void io_run_cancel(struct io_wq_work *work, struct io_wqe *wqe)
{
	struct io_wq *wq = wqe->wq;

	do {
		work->flags |= IO_WQ_WORK_CANCEL;
		wq->do_work(work);
		work = wq->free_work(work);
	} while (work);
}

static void io_wqe_insert_work(struct io_wqe *wqe, struct io_wq_work *work)
{
	struct io_wqe_acct *acct = io_work_get_acct(wqe, work);
	unsigned int hash;
	struct io_wq_work *tail;

	if (!io_wq_is_hashed(work)) {
append:
		wq_list_add_tail(&work->list, &acct->work_list);
		return;
	}

	hash = io_get_work_hash(work);
	tail = wqe->hash_tail[hash];
	wqe->hash_tail[hash] = work;
	if (!tail)
		goto append;

	wq_list_add_after(&work->list, &tail->list, &acct->work_list);
}

static bool io_wq_work_match_item(struct io_wq_work *work, void *data)
{
	return work == data;
}

static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work)
{
	struct io_wqe_acct *acct = io_work_get_acct(wqe, work);
	unsigned work_flags = work->flags;
	bool do_create;

	/*
	 * If io-wq is exiting for this task, or if the request has explicitly
	 * been marked as one that should not get executed, cancel it here.
	 */
	if (test_bit(IO_WQ_BIT_EXIT, &wqe->wq->state) ||
	    (work->flags & IO_WQ_WORK_CANCEL)) {
		io_run_cancel(work, wqe);
		return;
	}

	raw_spin_lock(&wqe->lock);
	io_wqe_insert_work(wqe, work);
	clear_bit(IO_ACCT_STALLED_BIT, &acct->flags);

	rcu_read_lock();
	do_create = !io_wqe_activate_free_worker(wqe, acct);
	rcu_read_unlock();

	raw_spin_unlock(&wqe->lock);

	if (do_create && ((work_flags & IO_WQ_WORK_CONCURRENT) ||
	    !atomic_read(&acct->nr_running))) {
		bool did_create;

		did_create = io_wqe_create_worker(wqe, acct);
		if (likely(did_create))
			return;

		raw_spin_lock(&wqe->lock);
		/* fatal condition, failed to create the first worker */
		if (!acct->nr_workers) {
			struct io_cb_cancel_data match = {
				.fn		= io_wq_work_match_item,
				.data		= work,
				.cancel_all	= false,
			};

			if (io_acct_cancel_pending_work(wqe, acct, &match))
				raw_spin_lock(&wqe->lock);
		}
		raw_spin_unlock(&wqe->lock);
	}
}

void io_wq_enqueue(struct io_wq *wq, struct io_wq_work *work)
{
	struct io_wqe *wqe = wq->wqes[numa_node_id()];

	io_wqe_enqueue(wqe, work);
}

/*
 * Work items that hash to the same value will not be done in parallel.
 * Used to limit concurrent writes, generally hashed by inode.
 */
void io_wq_hash_work(struct io_wq_work *work, void *val)
{
	unsigned int bit;

	bit = hash_ptr(val, IO_WQ_HASH_ORDER);
	work->flags |= (IO_WQ_WORK_HASHED | (bit << IO_WQ_HASH_SHIFT));
}

static bool io_wq_worker_cancel(struct io_worker *worker, void *data)
{
	struct io_cb_cancel_data *match = data;

	/*
	 * Hold the lock to avoid ->cur_work going out of scope, caller
	 * may dereference the passed in work.
	 */
	spin_lock(&worker->lock);
	if (worker->cur_work &&
	    match->fn(worker->cur_work, match->data)) {
		set_notify_signal(worker->task);
		match->nr_running++;
	}
	spin_unlock(&worker->lock);

	return match->nr_running && !match->cancel_all;
}

static inline void io_wqe_remove_pending(struct io_wqe *wqe,
					 struct io_wq_work *work,
					 struct io_wq_work_node *prev)
{
	struct io_wqe_acct *acct = io_work_get_acct(wqe, work);
	unsigned int hash = io_get_work_hash(work);
	struct io_wq_work *prev_work = NULL;

	if (io_wq_is_hashed(work) && work == wqe->hash_tail[hash]) {
		if (prev)
			prev_work = container_of(prev, struct io_wq_work, list);
		if (prev_work && io_get_work_hash(prev_work) == hash)
			wqe->hash_tail[hash] = prev_work;
		else
			wqe->hash_tail[hash] = NULL;
	}
	wq_list_del(&acct->work_list, &work->list, prev);
}

static bool io_acct_cancel_pending_work(struct io_wqe *wqe,
					struct io_wqe_acct *acct,
					struct io_cb_cancel_data *match)
	__releases(wqe->lock)
{
	struct io_wq_work_node *node, *prev;
	struct io_wq_work *work;

	wq_list_for_each(node, prev, &acct->work_list) {
		work = container_of(node, struct io_wq_work, list);
		if (!match->fn(work, match->data))
			continue;
		io_wqe_remove_pending(wqe, work, prev);
		raw_spin_unlock(&wqe->lock);
		io_run_cancel(work, wqe);
		match->nr_pending++;
		/* not safe to continue after unlock */
		return true;
	}

	return false;
}

static void io_wqe_cancel_pending_work(struct io_wqe *wqe,
				       struct io_cb_cancel_data *match)
{
	int i;
retry:
	raw_spin_lock(&wqe->lock);
	for (i = 0; i < IO_WQ_ACCT_NR; i++) {
		struct io_wqe_acct *acct = io_get_acct(wqe, i == 0);

		if (io_acct_cancel_pending_work(wqe, acct, match)) {
			if (match->cancel_all)
				goto retry;
			return;
		}
	}
	raw_spin_unlock(&wqe->lock);
}

static void io_wqe_cancel_running_work(struct io_wqe *wqe,
				       struct io_cb_cancel_data *match)
{
	rcu_read_lock();
	io_wq_for_each_worker(wqe, io_wq_worker_cancel, match);
	rcu_read_unlock();
}

enum io_wq_cancel io_wq_cancel_cb(struct io_wq *wq, work_cancel_fn *cancel,
				  void *data, bool cancel_all)
{
	struct io_cb_cancel_data match = {
		.fn		= cancel,
		.data		= data,
		.cancel_all	= cancel_all,
	};
	int node;

	/*
	 * First check pending list, if we're lucky we can just remove it
	 * from there. CANCEL_OK means that the work is returned as-new,
	 * no completion will be posted for it.
	 */
	for_each_node(node) {
		struct io_wqe *wqe = wq->wqes[node];

		io_wqe_cancel_pending_work(wqe, &match);
		if (match.nr_pending && !match.cancel_all)
			return IO_WQ_CANCEL_OK;
	}

	/*
	 * Now check if a free (going busy) or busy worker has the work
	 * currently running. If we find it there, we'll return CANCEL_RUNNING
	 * as an indication that we attempt to signal cancellation. The
	 * completion will run normally in this case.
	 */
	for_each_node(node) {
		struct io_wqe *wqe = wq->wqes[node];

		io_wqe_cancel_running_work(wqe, &match);
		if (match.nr_running && !match.cancel_all)
			return IO_WQ_CANCEL_RUNNING;
	}

	if (match.nr_running)
		return IO_WQ_CANCEL_RUNNING;
	if (match.nr_pending)
		return IO_WQ_CANCEL_OK;
	return IO_WQ_CANCEL_NOTFOUND;
}

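/*
 * Wake callback for the shared hash wait queue: a hash bit was cleared
 * somewhere, so any acct that stalled on hashed work can try again.
 */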
static int io_wqe_hash_wake(struct wait_queue_entry *wait, unsigned mode,
			    int sync, void *key)
{
	struct io_wqe *wqe = container_of(wait, struct io_wqe, wait);
	int i;

	list_del_init(&wait->entry);

	rcu_read_lock();
	for (i = 0; i < IO_WQ_ACCT_NR; i++) {
		struct io_wqe_acct *acct = &wqe->acct[i];

		if (test_and_clear_bit(IO_ACCT_STALLED_BIT, &acct->flags))
			io_wqe_activate_free_worker(wqe, acct);
	}
	rcu_read_unlock();
	return 1;
}

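/*
 * Create an io_wq instance with one io_wqe per NUMA node. @bounded caps the
 * number of bound workers; unbound workers default to RLIMIT_NPROC.
 */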
struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
{
	int ret, node, i;
	struct io_wq *wq;

	if (WARN_ON_ONCE(!data->free_work || !data->do_work))
		return ERR_PTR(-EINVAL);
	if (WARN_ON_ONCE(!bounded))
		return ERR_PTR(-EINVAL);

	wq = kzalloc(struct_size(wq, wqes, nr_node_ids), GFP_KERNEL);
	if (!wq)
		return ERR_PTR(-ENOMEM);
	ret = cpuhp_state_add_instance_nocalls(io_wq_online, &wq->cpuhp_node);
	if (ret)
		goto err_wq;

	refcount_inc(&data->hash->refs);
	wq->hash = data->hash;
	wq->free_work = data->free_work;
	wq->do_work = data->do_work;

	ret = -ENOMEM;
	for_each_node(node) {
		struct io_wqe *wqe;
		int alloc_node = node;

		if (!node_online(alloc_node))
			alloc_node = NUMA_NO_NODE;
		wqe = kzalloc_node(sizeof(struct io_wqe), GFP_KERNEL, alloc_node);
		if (!wqe)
			goto err;
		if (!alloc_cpumask_var(&wqe->cpu_mask, GFP_KERNEL))
			goto err;
		cpumask_copy(wqe->cpu_mask, cpumask_of_node(node));
		wq->wqes[node] = wqe;
		wqe->node = alloc_node;
		wqe->acct[IO_WQ_ACCT_BOUND].max_workers = bounded;
		wqe->acct[IO_WQ_ACCT_UNBOUND].max_workers =
					task_rlimit(current, RLIMIT_NPROC);
		INIT_LIST_HEAD(&wqe->wait.entry);
		wqe->wait.func = io_wqe_hash_wake;
		for (i = 0; i < IO_WQ_ACCT_NR; i++) {
			struct io_wqe_acct *acct = &wqe->acct[i];

			acct->index = i;
			atomic_set(&acct->nr_running, 0);
			INIT_WQ_LIST(&acct->work_list);
		}
		wqe->wq = wq;
		raw_spin_lock_init(&wqe->lock);
		INIT_HLIST_NULLS_HEAD(&wqe->free_list, 0);
		INIT_LIST_HEAD(&wqe->all_list);
	}

	wq->task = get_task_struct(data->task);
	atomic_set(&wq->worker_refs, 1);
	init_completion(&wq->worker_done);
	return wq;
err:
	io_wq_put_hash(data->hash);
	cpuhp_state_remove_instance_nocalls(io_wq_online, &wq->cpuhp_node);
	for_each_node(node) {
		if (!wq->wqes[node])
			continue;
		free_cpumask_var(wq->wqes[node]->cpu_mask);
		kfree(wq->wqes[node]);
	}
err_wq:
	kfree(wq);
	return ERR_PTR(ret);
}

static bool io_task_work_match(struct callback_head *cb, void *data)
{
	struct io_worker *worker;

	if (cb->func != create_worker_cb && cb->func != create_worker_cont)
		return false;
	worker = container_of(cb, struct io_worker, create_work);
	return worker->wqe->wq == data;
}

void io_wq_exit_start(struct io_wq *wq)
{
	set_bit(IO_WQ_BIT_EXIT, &wq->state);
}

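/*
 * Tear down all workers: cancel any pending create task_work, wake every
 * worker so it notices IO_WQ_BIT_EXIT, then wait for the last worker
 * reference to be dropped.
 */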
static void io_wq_exit_workers(struct io_wq *wq)
{
	struct callback_head *cb;
	int node;

	if (!wq->task)
		return;

	while ((cb = task_work_cancel_match(wq->task, io_task_work_match, wq)) != NULL) {
		struct io_worker *worker;
		struct io_wqe_acct *acct;

		worker = container_of(cb, struct io_worker, create_work);
		acct = io_wqe_get_acct(worker);
		atomic_dec(&acct->nr_running);
		raw_spin_lock(&worker->wqe->lock);
		acct->nr_workers--;
		raw_spin_unlock(&worker->wqe->lock);
		io_worker_ref_put(wq);
		clear_bit_unlock(0, &worker->create_state);
		io_worker_release(worker);
	}

	rcu_read_lock();
	for_each_node(node) {
		struct io_wqe *wqe = wq->wqes[node];

		io_wq_for_each_worker(wqe, io_wq_worker_wake, NULL);
	}
	rcu_read_unlock();
	io_worker_ref_put(wq);
	wait_for_completion(&wq->worker_done);

	for_each_node(node) {
		spin_lock_irq(&wq->hash->wait.lock);
		list_del_init(&wq->wqes[node]->wait.entry);
		spin_unlock_irq(&wq->hash->wait.lock);
	}
	put_task_struct(wq->task);
	wq->task = NULL;
}

static void io_wq_destroy(struct io_wq *wq)
{
	int node;

	cpuhp_state_remove_instance_nocalls(io_wq_online, &wq->cpuhp_node);

	for_each_node(node) {
		struct io_wqe *wqe = wq->wqes[node];
		struct io_cb_cancel_data match = {
			.fn		= io_wq_work_match_all,
			.cancel_all	= true,
		};
		io_wqe_cancel_pending_work(wqe, &match);
		free_cpumask_var(wqe->cpu_mask);
		kfree(wqe);
	}
	io_wq_put_hash(wq->hash);
	kfree(wq);
}

void io_wq_put_and_exit(struct io_wq *wq)
{
	WARN_ON_ONCE(!test_bit(IO_WQ_BIT_EXIT, &wq->state));

	io_wq_exit_workers(wq);
	io_wq_destroy(wq);
}

struct online_data {
	unsigned int cpu;
	bool online;
};

static bool io_wq_worker_affinity(struct io_worker *worker, void *data)
{
	struct online_data *od = data;

	if (od->online)
		cpumask_set_cpu(od->cpu, worker->wqe->cpu_mask);
	else
		cpumask_clear_cpu(od->cpu, worker->wqe->cpu_mask);
	return false;
}

static int __io_wq_cpu_online(struct io_wq *wq, unsigned int cpu, bool online)
{
	struct online_data od = {
		.cpu = cpu,
		.online = online
	};
	int i;

	rcu_read_lock();
	for_each_node(i)
		io_wq_for_each_worker(wq->wqes[i], io_wq_worker_affinity, &od);
	rcu_read_unlock();
	return 0;
}

static int io_wq_cpu_online(unsigned int cpu, struct hlist_node *node)
{
	struct io_wq *wq = hlist_entry_safe(node, struct io_wq, cpuhp_node);

	return __io_wq_cpu_online(wq, cpu, true);
}

static int io_wq_cpu_offline(unsigned int cpu, struct hlist_node *node)
{
	struct io_wq *wq = hlist_entry_safe(node, struct io_wq, cpuhp_node);

	return __io_wq_cpu_online(wq, cpu, false);
}

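/*
 * Restrict the CPUs that this io_wq's workers may run on, or reset them to
 * the per-node default when @mask is NULL.
 */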
int io_wq_cpu_affinity(struct io_wq *wq, cpumask_var_t mask)
{
	int i;

	rcu_read_lock();
	for_each_node(i) {
		struct io_wqe *wqe = wq->wqes[i];

		if (mask)
			cpumask_copy(wqe->cpu_mask, mask);
		else
			cpumask_copy(wqe->cpu_mask, cpumask_of_node(i));
	}
	rcu_read_unlock();
	return 0;
}

/*
 * Set max number of unbounded workers, returns old value. If new_count is 0,
 * then just return the old value.
 */
int io_wq_max_workers(struct io_wq *wq, int *new_count)
{
	int i, node, prev = 0;

	BUILD_BUG_ON((int) IO_WQ_ACCT_BOUND   != (int) IO_WQ_BOUND);
	BUILD_BUG_ON((int) IO_WQ_ACCT_UNBOUND != (int) IO_WQ_UNBOUND);
	BUILD_BUG_ON((int) IO_WQ_ACCT_NR      != 2);

	for (i = 0; i < 2; i++) {
		if (new_count[i] > task_rlimit(current, RLIMIT_NPROC))
			new_count[i] = task_rlimit(current, RLIMIT_NPROC);
	}

	rcu_read_lock();
	for_each_node(node) {
		struct io_wqe_acct *acct;

1294
		for (i = 0; i < IO_WQ_ACCT_NR; i++) {
1295 1296 1297 1298 1299 1300 1301 1302 1303 1304 1305
			acct = &wq->wqes[node]->acct[i];
			prev = max_t(int, acct->max_workers, prev);
			if (new_count[i])
				acct->max_workers = new_count[i];
			new_count[i] = prev;
		}
	}
	rcu_read_unlock();
	return 0;
}

static __init int io_wq_init(void)
{
	int ret;

	ret = cpuhp_setup_state_multi(CPUHP_AP_ONLINE_DYN, "io-wq/online",
					io_wq_cpu_online, io_wq_cpu_offline);
	if (ret < 0)
		return ret;
	io_wq_online = ret;
	return 0;
}
subsys_initcall(io_wq_init);