// SPDX-License-Identifier: GPL-2.0
/*
 * Basic worker thread pool for io_uring
 *
 * Copyright (C) 2019 Jens Axboe
 *
 */
#include <linux/kernel.h>
#include <linux/init.h>
#include <linux/errno.h>
#include <linux/sched/signal.h>
#include <linux/percpu.h>
#include <linux/slab.h>
#include <linux/rculist_nulls.h>
#include <linux/cpu.h>
#include <linux/tracehook.h>

#include "io-wq.h"

#define WORKER_IDLE_TIMEOUT	(5 * HZ)

enum {
	IO_WORKER_F_UP		= 1,	/* up and active */
	IO_WORKER_F_RUNNING	= 2,	/* account as running */
	IO_WORKER_F_FREE	= 4,	/* worker on free list */
	IO_WORKER_F_FIXED	= 8,	/* static idle worker */
	IO_WORKER_F_BOUND	= 16,	/* is doing bounded work */
};

enum {
	IO_WQ_BIT_EXIT		= 0,	/* wq exiting */
};

enum {
	IO_WQE_FLAG_STALLED	= 1,	/* stalled on hash */
};

/*
 * One for each thread in a wqe pool
 */
struct io_worker {
	refcount_t ref;
	unsigned flags;
	struct hlist_nulls_node nulls_node;
	struct list_head all_list;
	struct task_struct *task;
	struct io_wqe *wqe;

	struct io_wq_work *cur_work;
	spinlock_t lock;

	struct completion ref_done;

	struct rcu_head rcu;
};

#if BITS_PER_LONG == 64
#define IO_WQ_HASH_ORDER	6
#else
#define IO_WQ_HASH_ORDER	5
#endif

#define IO_WQ_NR_HASH_BUCKETS	(1u << IO_WQ_HASH_ORDER)

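/*
 * Worker accounting for one class of work (bound or unbound) on a wqe
 */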
struct io_wqe_acct {
	unsigned nr_workers;
	unsigned max_workers;
	int index;
	atomic_t nr_running;
};

enum {
	IO_WQ_ACCT_BOUND,
	IO_WQ_ACCT_UNBOUND,
};

/*
 * Per-node worker thread pool
 */
struct io_wqe {
	struct {
		raw_spinlock_t lock;
		struct io_wq_work_list work_list;
		unsigned flags;
	} ____cacheline_aligned_in_smp;

	int node;
	struct io_wqe_acct acct[2];

	struct hlist_nulls_head free_list;
	struct list_head all_list;

	struct wait_queue_entry wait;

	struct io_wq *wq;
	struct io_wq_work *hash_tail[IO_WQ_NR_HASH_BUCKETS];

	cpumask_var_t cpu_mask;
};

/*
 * Per io_wq state
 */
struct io_wq {
	unsigned long state;

	free_work_fn *free_work;
	io_wq_work_fn *do_work;

	struct io_wq_hash *hash;

	atomic_t worker_refs;
	struct completion worker_done;

	struct hlist_node cpuhp_node;

	struct task_struct *task;

	struct io_wqe *wqes[];
};

static enum cpuhp_state io_wq_online;

struct io_cb_cancel_data {
	work_cancel_fn *fn;
	void *data;
	int nr_running;
	int nr_pending;
	bool cancel_all;
};

static void create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index);
static void io_wqe_dec_running(struct io_worker *worker);

static bool io_worker_get(struct io_worker *worker)
{
	return refcount_inc_not_zero(&worker->ref);
}

static void io_worker_release(struct io_worker *worker)
{
	if (refcount_dec_and_test(&worker->ref))
		complete(&worker->ref_done);
}

static inline struct io_wqe_acct *io_get_acct(struct io_wqe *wqe, bool bound)
{
	return &wqe->acct[bound ? IO_WQ_ACCT_BOUND : IO_WQ_ACCT_UNBOUND];
}

static inline struct io_wqe_acct *io_work_get_acct(struct io_wqe *wqe,
						   struct io_wq_work *work)
{
	return io_get_acct(wqe, !(work->flags & IO_WQ_WORK_UNBOUND));
}

static inline struct io_wqe_acct *io_wqe_get_acct(struct io_worker *worker)
{
	return io_get_acct(worker->wqe, worker->flags & IO_WORKER_F_BOUND);
}

static void io_worker_ref_put(struct io_wq *wq)
{
	if (atomic_dec_and_test(&wq->worker_refs))
		complete(&wq->worker_done);
}

static void io_worker_exit(struct io_worker *worker)
{
	struct io_wqe *wqe = worker->wqe;
	struct io_wqe_acct *acct = io_wqe_get_acct(worker);

	if (refcount_dec_and_test(&worker->ref))
		complete(&worker->ref_done);
	wait_for_completion(&worker->ref_done);

	raw_spin_lock_irq(&wqe->lock);
	if (worker->flags & IO_WORKER_F_FREE)
		hlist_nulls_del_rcu(&worker->nulls_node);
	list_del_rcu(&worker->all_list);
	acct->nr_workers--;
	preempt_disable();
	io_wqe_dec_running(worker);
	worker->flags = 0;
	current->flags &= ~PF_IO_WORKER;
	preempt_enable();
	raw_spin_unlock_irq(&wqe->lock);

	kfree_rcu(worker, rcu);
	io_worker_ref_put(wqe->wq);
	do_exit(0);
}

static inline bool io_wqe_run_queue(struct io_wqe *wqe)
	__must_hold(wqe->lock)
{
	if (!wq_list_empty(&wqe->work_list) &&
	    !(wqe->flags & IO_WQE_FLAG_STALLED))
		return true;
	return false;
}

/*
 * Check head of free list for an available worker. If one isn't available,
 * caller must create one.
 */
static bool io_wqe_activate_free_worker(struct io_wqe *wqe)
	__must_hold(RCU)
{
	struct hlist_nulls_node *n;
	struct io_worker *worker;

	/*
	 * Iterate free_list and see if we can find an idle worker to
	 * activate. If a given worker is on the free_list but in the process
	 * of exiting, keep trying.
	 */
	hlist_nulls_for_each_entry_rcu(worker, n, &wqe->free_list, nulls_node) {
		if (!io_worker_get(worker))
			continue;
		if (wake_up_process(worker->task)) {
			io_worker_release(worker);
			return true;
		}
		io_worker_release(worker);
	}

	return false;
}

/*
 * We need a worker. If we find a free one, we're good. If not, and we're
 * below the max number of workers, create one.
 */
static void io_wqe_wake_worker(struct io_wqe *wqe, struct io_wqe_acct *acct)
{
	bool ret;

	/*
	 * Most likely an attempt to queue unbounded work on an io_wq that
	 * wasn't setup with any unbounded workers.
	 */
	if (unlikely(!acct->max_workers))
		pr_warn_once("io-wq is not configured for unbound workers");

	rcu_read_lock();
	ret = io_wqe_activate_free_worker(wqe);
	rcu_read_unlock();

	if (!ret) {
		bool do_create = false;

		raw_spin_lock_irq(&wqe->lock);
		if (acct->nr_workers < acct->max_workers) {
			atomic_inc(&acct->nr_running);
			atomic_inc(&wqe->wq->worker_refs);
			acct->nr_workers++;
			do_create = true;
		}
		raw_spin_unlock_irq(&wqe->lock);
		if (do_create)
			create_io_worker(wqe->wq, wqe, acct->index);
	}
}

static void io_wqe_inc_running(struct io_worker *worker)
{
	struct io_wqe_acct *acct = io_wqe_get_acct(worker);

	atomic_inc(&acct->nr_running);
}

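/*
 * Worker creation from atomic context is deferred to task_work on the
 * originating task; create_worker_cb() runs there and spawns the worker.
 */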
struct create_worker_data {
	struct callback_head work;
	struct io_wqe *wqe;
	int index;
};

static void create_worker_cb(struct callback_head *cb)
{
	struct create_worker_data *cwd;
	struct io_wq *wq;

	cwd = container_of(cb, struct create_worker_data, work);
	wq = cwd->wqe->wq;
	create_io_worker(wq, cwd->wqe, cwd->index);
	kfree(cwd);
}

static void io_queue_worker_create(struct io_wqe *wqe, struct io_wqe_acct *acct)
{
	struct create_worker_data *cwd;
	struct io_wq *wq = wqe->wq;

	/* raced with exit, just ignore create call */
	if (test_bit(IO_WQ_BIT_EXIT, &wq->state))
		goto fail;

	cwd = kmalloc(sizeof(*cwd), GFP_ATOMIC);
	if (cwd) {
		init_task_work(&cwd->work, create_worker_cb);
		cwd->wqe = wqe;
		cwd->index = acct->index;
		if (!task_work_add(wq->task, &cwd->work, TWA_SIGNAL))
			return;

		kfree(cwd);
	}
fail:
	atomic_dec(&acct->nr_running);
	io_worker_ref_put(wq);
}

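/*
 * A worker stops counting as running. If it was the last runner for this
 * acct and work is still queued, queue creation of a replacement worker.
 */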
static void io_wqe_dec_running(struct io_worker *worker)
	__must_hold(wqe->lock)
{
	struct io_wqe_acct *acct = io_wqe_get_acct(worker);
	struct io_wqe *wqe = worker->wqe;

	if (!(worker->flags & IO_WORKER_F_UP))
		return;

	if (atomic_dec_and_test(&acct->nr_running) && io_wqe_run_queue(wqe)) {
		atomic_inc(&acct->nr_running);
		atomic_inc(&wqe->wq->worker_refs);
		io_queue_worker_create(wqe, acct);
	}
}

/*
 * Worker will start processing some work. Move it to the busy list, if
 * it's currently on the freelist
 */
static void __io_worker_busy(struct io_wqe *wqe, struct io_worker *worker,
			     struct io_wq_work *work)
	__must_hold(wqe->lock)
{
	bool worker_bound, work_bound;

	BUILD_BUG_ON((IO_WQ_ACCT_UNBOUND ^ IO_WQ_ACCT_BOUND) != 1);

	if (worker->flags & IO_WORKER_F_FREE) {
		worker->flags &= ~IO_WORKER_F_FREE;
		hlist_nulls_del_init_rcu(&worker->nulls_node);
	}

	/*
	 * If worker is moving from bound to unbound (or vice versa), then
	 * ensure we update the running accounting.
	 */
	worker_bound = (worker->flags & IO_WORKER_F_BOUND) != 0;
	work_bound = (work->flags & IO_WQ_WORK_UNBOUND) == 0;
	if (worker_bound != work_bound) {
		int index = work_bound ? IO_WQ_ACCT_UNBOUND : IO_WQ_ACCT_BOUND;
		io_wqe_dec_running(worker);
		worker->flags ^= IO_WORKER_F_BOUND;
		wqe->acct[index].nr_workers--;
		wqe->acct[index ^ 1].nr_workers++;
		io_wqe_inc_running(worker);
	}
}

/*
 * No work, worker going to sleep. Move to freelist.
 */
static void __io_worker_idle(struct io_wqe *wqe, struct io_worker *worker)
	__must_hold(wqe->lock)
{
	if (!(worker->flags & IO_WORKER_F_FREE)) {
		worker->flags |= IO_WORKER_F_FREE;
		hlist_nulls_add_head_rcu(&worker->nulls_node, &wqe->free_list);
	}
}

static inline unsigned int io_get_work_hash(struct io_wq_work *work)
{
	return work->flags >> IO_WQ_HASH_SHIFT;
}

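/*
 * Register this wqe on the hash waitqueue so it is woken once the
 * contended hash bucket is released; bail out if it already cleared.
 */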
static void io_wait_on_hash(struct io_wqe *wqe, unsigned int hash)
{
	struct io_wq *wq = wqe->wq;

	spin_lock(&wq->hash->wait.lock);
	if (list_empty(&wqe->wait.entry)) {
		__add_wait_queue(&wq->hash->wait, &wqe->wait);
		if (!test_bit(hash, &wq->hash->map)) {
			__set_current_state(TASK_RUNNING);
			list_del_init(&wqe->wait.entry);
		}
	}
	spin_unlock(&wq->hash->wait.lock);
}

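/*
 * Pick the next runnable work item. Non-hashed work can run right away;
 * a hashed chain is only taken if no other worker currently owns that
 * hash. If only stalled hashed work remains, wait for the hash to clear.
 */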
static struct io_wq_work *io_get_next_work(struct io_wqe *wqe)
	__must_hold(wqe->lock)
{
	struct io_wq_work_node *node, *prev;
	struct io_wq_work *work, *tail;
	unsigned int stall_hash = -1U;

	wq_list_for_each(node, prev, &wqe->work_list) {
		unsigned int hash;

		work = container_of(node, struct io_wq_work, list);

		/* not hashed, can run anytime */
		if (!io_wq_is_hashed(work)) {
			wq_list_del(&wqe->work_list, node, prev);
			return work;
		}

		hash = io_get_work_hash(work);
		/* all items with this hash lie in [work, tail] */
		tail = wqe->hash_tail[hash];

		/* hashed, can run if not already running */
		if (!test_and_set_bit(hash, &wqe->wq->hash->map)) {
			wqe->hash_tail[hash] = NULL;
			wq_list_cut(&wqe->work_list, &tail->list, prev);
			return work;
		}
		if (stall_hash == -1U)
			stall_hash = hash;
		/* fast forward to a next hash, for-each will fix up @prev */
		node = &tail->list;
	}

	if (stall_hash != -1U) {
		raw_spin_unlock(&wqe->lock);
		io_wait_on_hash(wqe, stall_hash);
		raw_spin_lock(&wqe->lock);
	}

	return NULL;
}

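/*
 * Process a pending TIF_NOTIFY_SIGNAL notification, if any. Returns true
 * if one was handled.
 */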
static bool io_flush_signals(void)
{
	if (unlikely(test_thread_flag(TIF_NOTIFY_SIGNAL))) {
		__set_current_state(TASK_RUNNING);
		tracehook_notify_signal();
		return true;
	}
	return false;
}

static void io_assign_current_work(struct io_worker *worker,
				   struct io_wq_work *work)
{
	if (work) {
		io_flush_signals();
		cond_resched();
	}

	spin_lock_irq(&worker->lock);
	worker->cur_work = work;
	spin_unlock_irq(&worker->lock);
}

static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work);

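/*
 * Main work-processing loop for a worker: grab work while holding
 * wqe->lock, then execute the whole dependent link with the lock dropped.
 */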
static void io_worker_handle_work(struct io_worker *worker)
	__releases(wqe->lock)
{
	struct io_wqe *wqe = worker->wqe;
	struct io_wq *wq = wqe->wq;
	bool do_kill = test_bit(IO_WQ_BIT_EXIT, &wq->state);

	do {
		struct io_wq_work *work;
get_next:
		/*
		 * If we got some work, mark us as busy. If we didn't, but
		 * the list isn't empty, it means we stalled on hashed work.
		 * Mark us stalled so we don't keep looking for work when we
		 * can't make progress, any work completion or insertion will
		 * clear the stalled flag.
		 */
		work = io_get_next_work(wqe);
		if (work)
			__io_worker_busy(wqe, worker, work);
		else if (!wq_list_empty(&wqe->work_list))
			wqe->flags |= IO_WQE_FLAG_STALLED;

		raw_spin_unlock_irq(&wqe->lock);
		if (!work)
			break;
		io_assign_current_work(worker, work);
		__set_current_state(TASK_RUNNING);

		/* handle a whole dependent link */
		do {
			struct io_wq_work *next_hashed, *linked;
			unsigned int hash = io_get_work_hash(work);

			next_hashed = wq_next_work(work);

			if (unlikely(do_kill) && (work->flags & IO_WQ_WORK_UNBOUND))
				work->flags |= IO_WQ_WORK_CANCEL;
			wq->do_work(work);
			io_assign_current_work(worker, NULL);

			linked = wq->free_work(work);
			work = next_hashed;
			if (!work && linked && !io_wq_is_hashed(linked)) {
				work = linked;
				linked = NULL;
			}
			io_assign_current_work(worker, work);
			if (linked)
				io_wqe_enqueue(wqe, linked);

			if (hash != -1U && !next_hashed) {
				clear_bit(hash, &wq->hash->map);
				if (wq_has_sleeper(&wq->hash->wait))
					wake_up(&wq->hash->wait);
				raw_spin_lock_irq(&wqe->lock);
				wqe->flags &= ~IO_WQE_FLAG_STALLED;
				/* skip unnecessary unlock-lock wqe->lock */
				if (!work)
					goto get_next;
				raw_spin_unlock_irq(&wqe->lock);
			}
		} while (work);

		raw_spin_lock_irq(&wqe->lock);
	} while (1);
}

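/*
 * Thread function for an io-wq worker: run queued work, idle on the free
 * list, and exit after WORKER_IDLE_TIMEOUT unless this is the fixed worker.
 */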
static int io_wqe_worker(void *data)
{
	struct io_worker *worker = data;
	struct io_wqe *wqe = worker->wqe;
	struct io_wq *wq = wqe->wq;
	char buf[TASK_COMM_LEN];

	worker->flags |= (IO_WORKER_F_UP | IO_WORKER_F_RUNNING);

	snprintf(buf, sizeof(buf), "iou-wrk-%d", wq->task->pid);
	set_task_comm(current, buf);

	while (!test_bit(IO_WQ_BIT_EXIT, &wq->state)) {
		long ret;

		set_current_state(TASK_INTERRUPTIBLE);
loop:
		raw_spin_lock_irq(&wqe->lock);
		if (io_wqe_run_queue(wqe)) {
			io_worker_handle_work(worker);
			goto loop;
		}
		__io_worker_idle(wqe, worker);
		raw_spin_unlock_irq(&wqe->lock);
		if (io_flush_signals())
			continue;
		ret = schedule_timeout(WORKER_IDLE_TIMEOUT);
		if (signal_pending(current)) {
			struct ksignal ksig;

			if (!get_signal(&ksig))
				continue;
			break;
		}
		if (ret)
			continue;
		/* timed out, exit unless we're the fixed worker */
		if (!(worker->flags & IO_WORKER_F_FIXED))
			break;
	}

	if (test_bit(IO_WQ_BIT_EXIT, &wq->state)) {
		raw_spin_lock_irq(&wqe->lock);
		io_worker_handle_work(worker);
	}

	io_worker_exit(worker);
	return 0;
}

/*
 * Called when a worker is scheduled in. Mark us as currently running.
 */
void io_wq_worker_running(struct task_struct *tsk)
{
	struct io_worker *worker = tsk->pf_io_worker;

	if (!worker)
		return;
	if (!(worker->flags & IO_WORKER_F_UP))
		return;
	if (worker->flags & IO_WORKER_F_RUNNING)
		return;
	worker->flags |= IO_WORKER_F_RUNNING;
	io_wqe_inc_running(worker);
}

/*
 * Called when worker is going to sleep. If there are no workers currently
 * running and we have work pending, wake up a free one or create a new one.
 */
void io_wq_worker_sleeping(struct task_struct *tsk)
{
	struct io_worker *worker = tsk->pf_io_worker;

	if (!worker)
		return;
	if (!(worker->flags & IO_WORKER_F_UP))
		return;
	if (!(worker->flags & IO_WORKER_F_RUNNING))
		return;

	worker->flags &= ~IO_WORKER_F_RUNNING;

	raw_spin_lock_irq(&worker->wqe->lock);
	io_wqe_dec_running(worker);
	raw_spin_unlock_irq(&worker->wqe->lock);
}

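/*
 * Allocate a new worker and spawn it via create_io_thread(). On failure
 * the accounting taken by the caller is rolled back.
 */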
static void create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index)
{
	struct io_wqe_acct *acct = &wqe->acct[index];
	struct io_worker *worker;
	struct task_struct *tsk;

	__set_current_state(TASK_RUNNING);

	worker = kzalloc_node(sizeof(*worker), GFP_KERNEL, wqe->node);
	if (!worker)
		goto fail;

	refcount_set(&worker->ref, 1);
	worker->nulls_node.pprev = NULL;
	worker->wqe = wqe;
	spin_lock_init(&worker->lock);
	init_completion(&worker->ref_done);

	tsk = create_io_thread(io_wqe_worker, worker, wqe->node);
	if (IS_ERR(tsk)) {
		kfree(worker);
fail:
		atomic_dec(&acct->nr_running);
		raw_spin_lock_irq(&wqe->lock);
		acct->nr_workers--;
		raw_spin_unlock_irq(&wqe->lock);
		io_worker_ref_put(wq);
		return;
	}

	tsk->pf_io_worker = worker;
	worker->task = tsk;
	set_cpus_allowed_ptr(tsk, wqe->cpu_mask);
	tsk->flags |= PF_NO_SETAFFINITY;

	raw_spin_lock_irq(&wqe->lock);
	hlist_nulls_add_head_rcu(&worker->nulls_node, &wqe->free_list);
	list_add_tail_rcu(&worker->all_list, &wqe->all_list);
	worker->flags |= IO_WORKER_F_FREE;
	if (index == IO_WQ_ACCT_BOUND)
		worker->flags |= IO_WORKER_F_BOUND;
	if ((acct->nr_workers == 1) && (worker->flags & IO_WORKER_F_BOUND))
		worker->flags |= IO_WORKER_F_FIXED;
	raw_spin_unlock_irq(&wqe->lock);
	wake_up_new_task(tsk);
}

/*
 * Iterate the passed in list and call the specific function for each
 * worker that isn't exiting
 */
static bool io_wq_for_each_worker(struct io_wqe *wqe,
				  bool (*func)(struct io_worker *, void *),
				  void *data)
{
	struct io_worker *worker;
	bool ret = false;

	list_for_each_entry_rcu(worker, &wqe->all_list, all_list) {
		if (io_worker_get(worker)) {
			/* no task if node is/was offline */
			if (worker->task)
				ret = func(worker, data);
			io_worker_release(worker);
			if (ret)
				break;
		}
	}

	return ret;
}

static bool io_wq_worker_wake(struct io_worker *worker, void *data)
{
	set_notify_signal(worker->task);
	wake_up_process(worker->task);
	return false;
}

static bool io_wq_work_match_all(struct io_wq_work *work, void *data)
{
	return true;
}

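/*
 * Cancel a work item (and any dependent link) by flagging it cancelled and
 * running it through the normal do_work/free_work completion path.
 */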
static void io_run_cancel(struct io_wq_work *work, struct io_wqe *wqe)
{
	struct io_wq *wq = wqe->wq;

	do {
		work->flags |= IO_WQ_WORK_CANCEL;
		wq->do_work(work);
		work = wq->free_work(work);
	} while (work);
}

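/*
 * Append work to the list; hashed work is chained right after the current
 * tail for its hash so the whole bucket stays contiguous.
 */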
static void io_wqe_insert_work(struct io_wqe *wqe, struct io_wq_work *work)
{
	unsigned int hash;
	struct io_wq_work *tail;

	if (!io_wq_is_hashed(work)) {
append:
		wq_list_add_tail(&work->list, &wqe->work_list);
		return;
	}

	hash = io_get_work_hash(work);
	tail = wqe->hash_tail[hash];
	wqe->hash_tail[hash] = work;
	if (!tail)
		goto append;

	wq_list_add_after(&work->list, &tail->list, &wqe->work_list);
}

static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work)
{
	struct io_wqe_acct *acct = io_work_get_acct(wqe, work);
	int work_flags;
	unsigned long flags;

	/*
	 * If io-wq is exiting for this task, or if the request has explicitly
	 * been marked as one that should not get executed, cancel it here.
	 */
	if (test_bit(IO_WQ_BIT_EXIT, &wqe->wq->state) ||
	    (work->flags & IO_WQ_WORK_CANCEL)) {
		io_run_cancel(work, wqe);
		return;
	}

	work_flags = work->flags;
	raw_spin_lock_irqsave(&wqe->lock, flags);
	io_wqe_insert_work(wqe, work);
	wqe->flags &= ~IO_WQE_FLAG_STALLED;
	raw_spin_unlock_irqrestore(&wqe->lock, flags);

	if ((work_flags & IO_WQ_WORK_CONCURRENT) ||
	    !atomic_read(&acct->nr_running))
		io_wqe_wake_worker(wqe, acct);
}

void io_wq_enqueue(struct io_wq *wq, struct io_wq_work *work)
{
	struct io_wqe *wqe = wq->wqes[numa_node_id()];

	io_wqe_enqueue(wqe, work);
}

/*
 * Work items that hash to the same value will not be done in parallel.
 * Used to limit concurrent writes, generally hashed by inode.
 */
void io_wq_hash_work(struct io_wq_work *work, void *val)
{
	unsigned int bit;

	bit = hash_ptr(val, IO_WQ_HASH_ORDER);
	work->flags |= (IO_WQ_WORK_HASHED | (bit << IO_WQ_HASH_SHIFT));
}

static bool io_wq_worker_cancel(struct io_worker *worker, void *data)
{
	struct io_cb_cancel_data *match = data;
	unsigned long flags;

	/*
	 * Hold the lock to avoid ->cur_work going out of scope, caller
	 * may dereference the passed in work.
	 */
	spin_lock_irqsave(&worker->lock, flags);
	if (worker->cur_work &&
	    match->fn(worker->cur_work, match->data)) {
		set_notify_signal(worker->task);
		match->nr_running++;
	}
	spin_unlock_irqrestore(&worker->lock, flags);

	return match->nr_running && !match->cancel_all;
}

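/*
 * Unlink a pending work item, fixing up the hash_tail pointer if it was
 * the tail of its hash chain.
 */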
static inline void io_wqe_remove_pending(struct io_wqe *wqe,
					 struct io_wq_work *work,
					 struct io_wq_work_node *prev)
{
	unsigned int hash = io_get_work_hash(work);
	struct io_wq_work *prev_work = NULL;

	if (io_wq_is_hashed(work) && work == wqe->hash_tail[hash]) {
		if (prev)
			prev_work = container_of(prev, struct io_wq_work, list);
		if (prev_work && io_get_work_hash(prev_work) == hash)
			wqe->hash_tail[hash] = prev_work;
		else
			wqe->hash_tail[hash] = NULL;
	}
	wq_list_del(&wqe->work_list, &work->list, prev);
}

static void io_wqe_cancel_pending_work(struct io_wqe *wqe,
				       struct io_cb_cancel_data *match)
{
	struct io_wq_work_node *node, *prev;
	struct io_wq_work *work;
	unsigned long flags;

retry:
	raw_spin_lock_irqsave(&wqe->lock, flags);
	wq_list_for_each(node, prev, &wqe->work_list) {
		work = container_of(node, struct io_wq_work, list);
		if (!match->fn(work, match->data))
			continue;
		io_wqe_remove_pending(wqe, work, prev);
		raw_spin_unlock_irqrestore(&wqe->lock, flags);
		io_run_cancel(work, wqe);
		match->nr_pending++;
		if (!match->cancel_all)
			return;

		/* not safe to continue after unlock */
		goto retry;
	}
	raw_spin_unlock_irqrestore(&wqe->lock, flags);
}

static void io_wqe_cancel_running_work(struct io_wqe *wqe,
				       struct io_cb_cancel_data *match)
{
	rcu_read_lock();
	io_wq_for_each_worker(wqe, io_wq_worker_cancel, match);
	rcu_read_unlock();
}

enum io_wq_cancel io_wq_cancel_cb(struct io_wq *wq, work_cancel_fn *cancel,
				  void *data, bool cancel_all)
{
	struct io_cb_cancel_data match = {
		.fn		= cancel,
		.data		= data,
		.cancel_all	= cancel_all,
	};
	int node;

	/*
	 * First check pending list, if we're lucky we can just remove it
	 * from there. CANCEL_OK means that the work is returned as-new,
	 * no completion will be posted for it.
	 */
	for_each_node(node) {
		struct io_wqe *wqe = wq->wqes[node];

		io_wqe_cancel_pending_work(wqe, &match);
		if (match.nr_pending && !match.cancel_all)
			return IO_WQ_CANCEL_OK;
	}

	/*
	 * Now check if a free (going busy) or busy worker has the work
	 * currently running. If we find it there, we'll return CANCEL_RUNNING
	 * as an indication that we attempt to signal cancellation. The
	 * completion will run normally in this case.
	 */
	for_each_node(node) {
		struct io_wqe *wqe = wq->wqes[node];

		io_wqe_cancel_running_work(wqe, &match);
		if (match.nr_running && !match.cancel_all)
			return IO_WQ_CANCEL_RUNNING;
	}

	if (match.nr_running)
		return IO_WQ_CANCEL_RUNNING;
	if (match.nr_pending)
		return IO_WQ_CANCEL_OK;
	return IO_WQ_CANCEL_NOTFOUND;
}

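/*
 * Hash waitqueue callback: a contended hash bucket was released, so try
 * to wake an idle worker to pick the stalled work back up.
 */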
static int io_wqe_hash_wake(struct wait_queue_entry *wait, unsigned mode,
			    int sync, void *key)
{
	struct io_wqe *wqe = container_of(wait, struct io_wqe, wait);

	list_del_init(&wait->entry);

	rcu_read_lock();
	io_wqe_activate_free_worker(wqe);
	rcu_read_unlock();
	return 1;
}

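/*
 * Set up an io_wq with one io_wqe per NUMA node. @bounded caps the number
 * of bound workers; unbound workers are capped by RLIMIT_NPROC.
 */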
struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
{
	int ret, node;
	struct io_wq *wq;

	if (WARN_ON_ONCE(!data->free_work || !data->do_work))
		return ERR_PTR(-EINVAL);
	if (WARN_ON_ONCE(!bounded))
		return ERR_PTR(-EINVAL);

	wq = kzalloc(struct_size(wq, wqes, nr_node_ids), GFP_KERNEL);
	if (!wq)
		return ERR_PTR(-ENOMEM);
	ret = cpuhp_state_add_instance_nocalls(io_wq_online, &wq->cpuhp_node);
	if (ret)
		goto err_wq;

	refcount_inc(&data->hash->refs);
	wq->hash = data->hash;
	wq->free_work = data->free_work;
	wq->do_work = data->do_work;

	ret = -ENOMEM;
	for_each_node(node) {
		struct io_wqe *wqe;
		int alloc_node = node;

		if (!node_online(alloc_node))
			alloc_node = NUMA_NO_NODE;
		wqe = kzalloc_node(sizeof(struct io_wqe), GFP_KERNEL, alloc_node);
		if (!wqe)
			goto err;
		if (!alloc_cpumask_var(&wqe->cpu_mask, GFP_KERNEL))
			goto err;
		cpumask_copy(wqe->cpu_mask, cpumask_of_node(node));
		wq->wqes[node] = wqe;
		wqe->node = alloc_node;
		wqe->acct[IO_WQ_ACCT_BOUND].index = IO_WQ_ACCT_BOUND;
		wqe->acct[IO_WQ_ACCT_UNBOUND].index = IO_WQ_ACCT_UNBOUND;
		wqe->acct[IO_WQ_ACCT_BOUND].max_workers = bounded;
		atomic_set(&wqe->acct[IO_WQ_ACCT_BOUND].nr_running, 0);
		wqe->acct[IO_WQ_ACCT_UNBOUND].max_workers =
					task_rlimit(current, RLIMIT_NPROC);
		atomic_set(&wqe->acct[IO_WQ_ACCT_UNBOUND].nr_running, 0);
		wqe->wait.func = io_wqe_hash_wake;
		INIT_LIST_HEAD(&wqe->wait.entry);
		wqe->wq = wq;
		raw_spin_lock_init(&wqe->lock);
		INIT_WQ_LIST(&wqe->work_list);
		INIT_HLIST_NULLS_HEAD(&wqe->free_list, 0);
		INIT_LIST_HEAD(&wqe->all_list);
	}

	wq->task = get_task_struct(data->task);
	atomic_set(&wq->worker_refs, 1);
	init_completion(&wq->worker_done);
	return wq;
err:
	io_wq_put_hash(data->hash);
	cpuhp_state_remove_instance_nocalls(io_wq_online, &wq->cpuhp_node);
	for_each_node(node) {
		if (!wq->wqes[node])
			continue;
		free_cpumask_var(wq->wqes[node]->cpu_mask);
		kfree(wq->wqes[node]);
	}
err_wq:
	kfree(wq);
	return ERR_PTR(ret);
}

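/*
 * Match pending create_worker_cb task_work items belonging to this io_wq,
 * so they can be cancelled on exit.
 */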
static bool io_task_work_match(struct callback_head *cb, void *data)
{
	struct create_worker_data *cwd;

	if (cb->func != create_worker_cb)
		return false;
	cwd = container_of(cb, struct create_worker_data, work);
	return cwd->wqe->wq == data;
}

void io_wq_exit_start(struct io_wq *wq)
{
	set_bit(IO_WQ_BIT_EXIT, &wq->state);
}

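/*
 * Cancel any queued worker-creation task_work, wake all workers so they
 * notice IO_WQ_BIT_EXIT, and wait for the last worker reference to drop.
 */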
static void io_wq_exit_workers(struct io_wq *wq)
{
	struct callback_head *cb;
	int node;

	if (!wq->task)
		return;

	while ((cb = task_work_cancel_match(wq->task, io_task_work_match, wq)) != NULL) {
		struct create_worker_data *cwd;

		cwd = container_of(cb, struct create_worker_data, work);
		atomic_dec(&cwd->wqe->acct[cwd->index].nr_running);
		io_worker_ref_put(wq);
		kfree(cwd);
	}

	rcu_read_lock();
	for_each_node(node) {
		struct io_wqe *wqe = wq->wqes[node];

		io_wq_for_each_worker(wqe, io_wq_worker_wake, NULL);
	}
	rcu_read_unlock();
	io_worker_ref_put(wq);
	wait_for_completion(&wq->worker_done);

	for_each_node(node) {
		spin_lock_irq(&wq->hash->wait.lock);
		list_del_init(&wq->wqes[node]->wait.entry);
		spin_unlock_irq(&wq->hash->wait.lock);
	}
	put_task_struct(wq->task);
	wq->task = NULL;
}

static void io_wq_destroy(struct io_wq *wq)
{
	int node;

	cpuhp_state_remove_instance_nocalls(io_wq_online, &wq->cpuhp_node);

	for_each_node(node) {
		struct io_wqe *wqe = wq->wqes[node];
		struct io_cb_cancel_data match = {
			.fn		= io_wq_work_match_all,
			.cancel_all	= true,
		};
		io_wqe_cancel_pending_work(wqe, &match);
		free_cpumask_var(wqe->cpu_mask);
		kfree(wqe);
	}
	io_wq_put_hash(wq->hash);
	kfree(wq);
}

void io_wq_put_and_exit(struct io_wq *wq)
{
	WARN_ON_ONCE(!test_bit(IO_WQ_BIT_EXIT, &wq->state));

	io_wq_exit_workers(wq);
	io_wq_destroy(wq);
}

struct online_data {
	unsigned int cpu;
	bool online;
};

static bool io_wq_worker_affinity(struct io_worker *worker, void *data)
{
	struct online_data *od = data;

	if (od->online)
		cpumask_set_cpu(od->cpu, worker->wqe->cpu_mask);
	else
		cpumask_clear_cpu(od->cpu, worker->wqe->cpu_mask);
	return false;
}

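/*
 * Propagate a CPU coming online or going offline into every worker's
 * allowed CPU mask.
 */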
static int __io_wq_cpu_online(struct io_wq *wq, unsigned int cpu, bool online)
{
	struct online_data od = {
		.cpu = cpu,
		.online = online
	};
	int i;

	rcu_read_lock();
	for_each_node(i)
		io_wq_for_each_worker(wq->wqes[i], io_wq_worker_affinity, &od);
	rcu_read_unlock();
	return 0;
}

static int io_wq_cpu_online(unsigned int cpu, struct hlist_node *node)
{
	struct io_wq *wq = hlist_entry_safe(node, struct io_wq, cpuhp_node);

	return __io_wq_cpu_online(wq, cpu, true);
}

static int io_wq_cpu_offline(unsigned int cpu, struct hlist_node *node)
{
	struct io_wq *wq = hlist_entry_safe(node, struct io_wq, cpuhp_node);

	return __io_wq_cpu_online(wq, cpu, false);
}

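/*
 * Set the allowed CPU mask for all wqes; a NULL mask restores the default
 * per-node mask.
 */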
int io_wq_cpu_affinity(struct io_wq *wq, cpumask_var_t mask)
{
	int i;

	rcu_read_lock();
	for_each_node(i) {
		struct io_wqe *wqe = wq->wqes[i];

		if (mask)
			cpumask_copy(wqe->cpu_mask, mask);
		else
			cpumask_copy(wqe->cpu_mask, cpumask_of_node(i));
	}
	rcu_read_unlock();
	return 0;
}

static __init int io_wq_init(void)
{
	int ret;

	ret = cpuhp_setup_state_multi(CPUHP_AP_ONLINE_DYN, "io-wq/online",
					io_wq_cpu_online, io_wq_cpu_offline);
	if (ret < 0)
		return ret;
	io_wq_online = ret;
	return 0;
}
subsys_initcall(io_wq_init);