// SPDX-License-Identifier: GPL-2.0
/*
 * Basic worker thread pool for io_uring
 *
 * Copyright (C) 2019 Jens Axboe
 *
 */
#include <linux/kernel.h>
#include <linux/init.h>
#include <linux/errno.h>
#include <linux/sched/signal.h>
#include <linux/mm.h>
#include <linux/sched/mm.h>
#include <linux/percpu.h>
#include <linux/slab.h>
#include <linux/rculist_nulls.h>
#include <linux/cpu.h>
#include <linux/tracehook.h>

#include "io-wq.h"

#define WORKER_IDLE_TIMEOUT	(5 * HZ)

enum {
	IO_WORKER_F_UP		= 1,	/* up and active */
	IO_WORKER_F_RUNNING	= 2,	/* account as running */
	IO_WORKER_F_FREE	= 4,	/* worker on free list */
	IO_WORKER_F_FIXED	= 8,	/* static idle worker */
	IO_WORKER_F_BOUND	= 16,	/* is doing bounded work */
};

enum {
	IO_WQ_BIT_EXIT		= 0,	/* wq exiting */
};

enum {
	IO_WQE_FLAG_STALLED	= 1,	/* stalled on hash */
};

/*
 * One for each thread in a wqe pool
 */
struct io_worker {
	refcount_t ref;
	unsigned flags;
	struct hlist_nulls_node nulls_node;
	struct list_head all_list;
	struct task_struct *task;
	struct io_wqe *wqe;

	struct io_wq_work *cur_work;
	spinlock_t lock;

	struct completion ref_done;

	struct rcu_head rcu;
};

#if BITS_PER_LONG == 64
#define IO_WQ_HASH_ORDER	6
#else
#define IO_WQ_HASH_ORDER	5
#endif

#define IO_WQ_NR_HASH_BUCKETS	(1u << IO_WQ_HASH_ORDER)

struct io_wqe_acct {
	unsigned nr_workers;
	unsigned max_workers;
	int index;
	atomic_t nr_running;
};

enum {
	IO_WQ_ACCT_BOUND,
	IO_WQ_ACCT_UNBOUND,
};

/*
 * Per-node worker thread pool
 */
struct io_wqe {
	struct {
		raw_spinlock_t lock;
		struct io_wq_work_list work_list;
		unsigned flags;
	} ____cacheline_aligned_in_smp;

	int node;
	struct io_wqe_acct acct[2];

	struct hlist_nulls_head free_list;
	struct list_head all_list;

	struct wait_queue_entry wait;

	struct io_wq *wq;
	struct io_wq_work *hash_tail[IO_WQ_NR_HASH_BUCKETS];
};

/*
 * Per io_wq state
 */
struct io_wq {
	unsigned long state;

	free_work_fn *free_work;
	io_wq_work_fn *do_work;

	struct io_wq_hash *hash;

	atomic_t worker_refs;
	struct completion worker_done;

	struct hlist_node cpuhp_node;

	struct task_struct *task;

	struct io_wqe *wqes[];
};

static enum cpuhp_state io_wq_online;

struct io_cb_cancel_data {
	work_cancel_fn *fn;
	void *data;
	int nr_running;
	int nr_pending;
	bool cancel_all;
};

static void create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index);

static bool io_worker_get(struct io_worker *worker)
{
	return refcount_inc_not_zero(&worker->ref);
}

static void io_worker_release(struct io_worker *worker)
{
	if (refcount_dec_and_test(&worker->ref))
		complete(&worker->ref_done);
}

static inline struct io_wqe_acct *io_get_acct(struct io_wqe *wqe, bool bound)
{
	return &wqe->acct[bound ? IO_WQ_ACCT_BOUND : IO_WQ_ACCT_UNBOUND];
}

static inline struct io_wqe_acct *io_work_get_acct(struct io_wqe *wqe,
						   struct io_wq_work *work)
{
	return io_get_acct(wqe, !(work->flags & IO_WQ_WORK_UNBOUND));
}

static inline struct io_wqe_acct *io_wqe_get_acct(struct io_worker *worker)
{
	return io_get_acct(worker->wqe, worker->flags & IO_WORKER_F_BOUND);
}

static void io_worker_ref_put(struct io_wq *wq)
{
	if (atomic_dec_and_test(&wq->worker_refs))
		complete(&wq->worker_done);
}

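/*
 * Tear down an exiting worker: wait for the last reference to be dropped,
 * undo the running/worker accounting under wqe->lock and exit the task.
 */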
static void io_worker_exit(struct io_worker *worker)
{
	struct io_wqe *wqe = worker->wqe;
	struct io_wqe_acct *acct = io_wqe_get_acct(worker);
	unsigned flags;

	if (refcount_dec_and_test(&worker->ref))
		complete(&worker->ref_done);
	wait_for_completion(&worker->ref_done);

	preempt_disable();
	current->flags &= ~PF_IO_WORKER;
	flags = worker->flags;
	worker->flags = 0;
	if (flags & IO_WORKER_F_RUNNING)
		atomic_dec(&acct->nr_running);
	worker->flags = 0;
	preempt_enable();

	raw_spin_lock_irq(&wqe->lock);
	if (flags & IO_WORKER_F_FREE)
		hlist_nulls_del_rcu(&worker->nulls_node);
	list_del_rcu(&worker->all_list);
	acct->nr_workers--;
	raw_spin_unlock_irq(&wqe->lock);

	kfree_rcu(worker, rcu);
	io_worker_ref_put(wqe->wq);
	do_exit(0);
}

static inline bool io_wqe_run_queue(struct io_wqe *wqe)
	__must_hold(wqe->lock)
{
	if (!wq_list_empty(&wqe->work_list) &&
	    !(wqe->flags & IO_WQE_FLAG_STALLED))
		return true;
	return false;
}

/*
 * Check head of free list for an available worker. If one isn't available,
 * caller must create one.
 */
static bool io_wqe_activate_free_worker(struct io_wqe *wqe)
	__must_hold(RCU)
{
	struct hlist_nulls_node *n;
	struct io_worker *worker;

	n = rcu_dereference(hlist_nulls_first_rcu(&wqe->free_list));
	if (is_a_nulls(n))
		return false;

	worker = hlist_nulls_entry(n, struct io_worker, nulls_node);
	if (io_worker_get(worker)) {
		wake_up_process(worker->task);
		io_worker_release(worker);
		return true;
	}

	return false;
}

/*
 * We need a worker. If we find a free one, we're good. If not, and we're
 * below the max number of workers, create one.
 */
static void io_wqe_wake_worker(struct io_wqe *wqe, struct io_wqe_acct *acct)
{
	bool ret;

	/*
	 * Most likely an attempt to queue unbounded work on an io_wq that
	 * wasn't setup with any unbounded workers.
	 */
	WARN_ON_ONCE(!acct->max_workers);

	rcu_read_lock();
	ret = io_wqe_activate_free_worker(wqe);
	rcu_read_unlock();

	if (!ret && acct->nr_workers < acct->max_workers) {
		atomic_inc(&acct->nr_running);
		atomic_inc(&wqe->wq->worker_refs);
		create_io_worker(wqe->wq, wqe, acct->index);
	}
}

static void io_wqe_inc_running(struct io_worker *worker)
{
	struct io_wqe_acct *acct = io_wqe_get_acct(worker);

	atomic_inc(&acct->nr_running);
}

struct create_worker_data {
	struct callback_head work;
	struct io_wqe *wqe;
	int index;
};

static void create_worker_cb(struct callback_head *cb)
{
	struct create_worker_data *cwd;
	struct io_wq *wq;

	cwd = container_of(cb, struct create_worker_data, work);
	wq = cwd->wqe->wq;
	create_io_worker(wq, cwd->wqe, cwd->index);
	kfree(cwd);
}

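/*
 * Worker creation can't be done from atomic context, so queue a task_work
 * item on the wq owner task which will create the new worker for us.
 */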
static void io_queue_worker_create(struct io_wqe *wqe, struct io_wqe_acct *acct)
{
	struct create_worker_data *cwd;
	struct io_wq *wq = wqe->wq;

	/* raced with exit, just ignore create call */
	if (test_bit(IO_WQ_BIT_EXIT, &wq->state))
		goto fail;

	cwd = kmalloc(sizeof(*cwd), GFP_ATOMIC);
	if (cwd) {
		init_task_work(&cwd->work, create_worker_cb);
		cwd->wqe = wqe;
		cwd->index = acct->index;
		if (!task_work_add(wq->task, &cwd->work, TWA_SIGNAL))
			return;

		kfree(cwd);
	}
fail:
	atomic_dec(&acct->nr_running);
	io_worker_ref_put(wq);
}

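/*
 * A worker stopped running. If it was the last runnable worker and there is
 * still work queued, ask the wq owner task to create a replacement.
 */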
static void io_wqe_dec_running(struct io_worker *worker)
	__must_hold(wqe->lock)
{
	struct io_wqe_acct *acct = io_wqe_get_acct(worker);
	struct io_wqe *wqe = worker->wqe;

	if (!(worker->flags & IO_WORKER_F_UP))
		return;

	if (atomic_dec_and_test(&acct->nr_running) && io_wqe_run_queue(wqe)) {
		atomic_inc(&acct->nr_running);
		atomic_inc(&wqe->wq->worker_refs);
		io_queue_worker_create(wqe, acct);
	}
}

/*
 * Worker will start processing some work. Move it to the busy list, if
 * it's currently on the freelist
 */
static void __io_worker_busy(struct io_wqe *wqe, struct io_worker *worker,
			     struct io_wq_work *work)
	__must_hold(wqe->lock)
{
	bool worker_bound, work_bound;

	BUILD_BUG_ON((IO_WQ_ACCT_UNBOUND ^ IO_WQ_ACCT_BOUND) != 1);

	if (worker->flags & IO_WORKER_F_FREE) {
		worker->flags &= ~IO_WORKER_F_FREE;
		hlist_nulls_del_init_rcu(&worker->nulls_node);
	}

	/*
	 * If worker is moving from bound to unbound (or vice versa), then
	 * ensure we update the running accounting.
	 */
	worker_bound = (worker->flags & IO_WORKER_F_BOUND) != 0;
	work_bound = (work->flags & IO_WQ_WORK_UNBOUND) == 0;
	if (worker_bound != work_bound) {
		int index = work_bound ? IO_WQ_ACCT_UNBOUND : IO_WQ_ACCT_BOUND;
		io_wqe_dec_running(worker);
		worker->flags ^= IO_WORKER_F_BOUND;
		wqe->acct[index].nr_workers--;
		wqe->acct[index ^ 1].nr_workers++;
		io_wqe_inc_running(worker);
	 }
}

/*
 * No work, worker going to sleep. Move to freelist.
 */
static void __io_worker_idle(struct io_wqe *wqe, struct io_worker *worker)
	__must_hold(wqe->lock)
{
	if (!(worker->flags & IO_WORKER_F_FREE)) {
		worker->flags |= IO_WORKER_F_FREE;
		hlist_nulls_add_head_rcu(&worker->nulls_node, &wqe->free_list);
	}
}

static inline unsigned int io_get_work_hash(struct io_wq_work *work)
{
	return work->flags >> IO_WQ_HASH_SHIFT;
}

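/*
 * Arm this wqe's wait entry on the hash waitqueue so we get woken once the
 * given hash bit is cleared; back out if it is already clear.
 */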
static void io_wait_on_hash(struct io_wqe *wqe, unsigned int hash)
{
	struct io_wq *wq = wqe->wq;

	spin_lock(&wq->hash->wait.lock);
	if (list_empty(&wqe->wait.entry)) {
		__add_wait_queue(&wq->hash->wait, &wqe->wait);
		if (!test_bit(hash, &wq->hash->map)) {
			__set_current_state(TASK_RUNNING);
			list_del_init(&wqe->wait.entry);
		}
	}
	spin_unlock(&wq->hash->wait.lock);
}

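/*
 * Find the next runnable work item: unhashed work can always run, hashed
 * work only if no other worker currently owns the same hash. If everything
 * is stalled on hashes, wait for one of them to be released.
 */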
static struct io_wq_work *io_get_next_work(struct io_wqe *wqe)
	__must_hold(wqe->lock)
{
	struct io_wq_work_node *node, *prev;
	struct io_wq_work *work, *tail;
	unsigned int stall_hash = -1U;

	wq_list_for_each(node, prev, &wqe->work_list) {
		unsigned int hash;

		work = container_of(node, struct io_wq_work, list);

		/* not hashed, can run anytime */
		if (!io_wq_is_hashed(work)) {
			wq_list_del(&wqe->work_list, node, prev);
			return work;
		}

		hash = io_get_work_hash(work);
		/* all items with this hash lie in [work, tail] */
		tail = wqe->hash_tail[hash];

		/* hashed, can run if not already running */
		if (!test_and_set_bit(hash, &wqe->wq->hash->map)) {
			wqe->hash_tail[hash] = NULL;
			wq_list_cut(&wqe->work_list, &tail->list, prev);
			return work;
		}
		if (stall_hash == -1U)
			stall_hash = hash;
		/* fast forward to a next hash, for-each will fix up @prev */
		node = &tail->list;
	}

	if (stall_hash != -1U) {
		raw_spin_unlock(&wqe->lock);
		io_wait_on_hash(wqe, stall_hash);
		raw_spin_lock(&wqe->lock);
	}

	return NULL;
}

static bool io_flush_signals(void)
{
	if (unlikely(test_thread_flag(TIF_NOTIFY_SIGNAL))) {
		__set_current_state(TASK_RUNNING);
		tracehook_notify_signal();
		return true;
	}
	return false;
}

static void io_assign_current_work(struct io_worker *worker,
				   struct io_wq_work *work)
{
	if (work) {
		io_flush_signals();
		cond_resched();
	}

	spin_lock_irq(&worker->lock);
	worker->cur_work = work;
	spin_unlock_irq(&worker->lock);
}

static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work);

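/*
 * Core work loop: repeatedly pull work off the wqe list and run it,
 * including any dependent (linked) work, until no more work is available.
 */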
static void io_worker_handle_work(struct io_worker *worker)
	__releases(wqe->lock)
{
	struct io_wqe *wqe = worker->wqe;
	struct io_wq *wq = wqe->wq;
	bool do_kill = test_bit(IO_WQ_BIT_EXIT, &wq->state);

	do {
		struct io_wq_work *work;
get_next:
		/*
		 * If we got some work, mark us as busy. If we didn't, but
		 * the list isn't empty, it means we stalled on hashed work.
		 * Mark us stalled so we don't keep looking for work when we
		 * can't make progress, any work completion or insertion will
		 * clear the stalled flag.
		 */
		work = io_get_next_work(wqe);
		if (work)
			__io_worker_busy(wqe, worker, work);
		else if (!wq_list_empty(&wqe->work_list))
			wqe->flags |= IO_WQE_FLAG_STALLED;

		raw_spin_unlock_irq(&wqe->lock);
		if (!work)
			break;
		io_assign_current_work(worker, work);
		__set_current_state(TASK_RUNNING);

		/* handle a whole dependent link */
		do {
			struct io_wq_work *next_hashed, *linked;
			unsigned int hash = io_get_work_hash(work);

			next_hashed = wq_next_work(work);

			if (unlikely(do_kill) && (work->flags & IO_WQ_WORK_UNBOUND))
				work->flags |= IO_WQ_WORK_CANCEL;
			wq->do_work(work);
			io_assign_current_work(worker, NULL);

			linked = wq->free_work(work);
			work = next_hashed;
			if (!work && linked && !io_wq_is_hashed(linked)) {
				work = linked;
				linked = NULL;
			}
			io_assign_current_work(worker, work);
			if (linked)
				io_wqe_enqueue(wqe, linked);

			if (hash != -1U && !next_hashed) {
				clear_bit(hash, &wq->hash->map);
				if (wq_has_sleeper(&wq->hash->wait))
					wake_up(&wq->hash->wait);
				raw_spin_lock_irq(&wqe->lock);
				wqe->flags &= ~IO_WQE_FLAG_STALLED;
				/* skip unnecessary unlock-lock wqe->lock */
				if (!work)
					goto get_next;
				raw_spin_unlock_irq(&wqe->lock);
			}
		} while (work);

		raw_spin_lock_irq(&wqe->lock);
	} while (1);
}

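/*
 * Worker thread entry point: handle work while it's available, otherwise
 * idle on the free list until new work arrives, the idle timeout expires
 * or the wq is being torn down.
 */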
static int io_wqe_worker(void *data)
{
	struct io_worker *worker = data;
	struct io_wqe *wqe = worker->wqe;
	struct io_wq *wq = wqe->wq;
	char buf[TASK_COMM_LEN];

	worker->flags |= (IO_WORKER_F_UP | IO_WORKER_F_RUNNING);

	snprintf(buf, sizeof(buf), "iou-wrk-%d", wq->task->pid);
	set_task_comm(current, buf);

	while (!test_bit(IO_WQ_BIT_EXIT, &wq->state)) {
		long ret;

		set_current_state(TASK_INTERRUPTIBLE);
loop:
		raw_spin_lock_irq(&wqe->lock);
		if (io_wqe_run_queue(wqe)) {
			io_worker_handle_work(worker);
			goto loop;
		}
		__io_worker_idle(wqe, worker);
		raw_spin_unlock_irq(&wqe->lock);
		if (io_flush_signals())
			continue;
		ret = schedule_timeout(WORKER_IDLE_TIMEOUT);
		if (signal_pending(current)) {
			struct ksignal ksig;

			if (!get_signal(&ksig))
				continue;
			break;
		}
		if (ret)
			continue;
		/* timed out, exit unless we're the fixed worker */
		if (test_bit(IO_WQ_BIT_EXIT, &wq->state) ||
		    !(worker->flags & IO_WORKER_F_FIXED))
			break;
	}

	if (test_bit(IO_WQ_BIT_EXIT, &wq->state)) {
		raw_spin_lock_irq(&wqe->lock);
		if (!wq_list_empty(&wqe->work_list))
			io_worker_handle_work(worker);
		else
			raw_spin_unlock_irq(&wqe->lock);
	}

	io_worker_exit(worker);
	return 0;
}

/*
 * Called when a worker is scheduled in. Mark us as currently running.
 */
void io_wq_worker_running(struct task_struct *tsk)
{
	struct io_worker *worker = tsk->pf_io_worker;

	if (!worker)
		return;
	if (!(worker->flags & IO_WORKER_F_UP))
		return;
	if (worker->flags & IO_WORKER_F_RUNNING)
		return;
	worker->flags |= IO_WORKER_F_RUNNING;
	io_wqe_inc_running(worker);
}

/*
 * Called when worker is going to sleep. If there are no workers currently
 * running and we have work pending, wake up a free one or create a new one.
 */
void io_wq_worker_sleeping(struct task_struct *tsk)
{
	struct io_worker *worker = tsk->pf_io_worker;

	if (!worker)
		return;
	if (!(worker->flags & IO_WORKER_F_UP))
		return;
	if (!(worker->flags & IO_WORKER_F_RUNNING))
		return;

	worker->flags &= ~IO_WORKER_F_RUNNING;

	raw_spin_lock_irq(&worker->wqe->lock);
	io_wqe_dec_running(worker);
	raw_spin_unlock_irq(&worker->wqe->lock);
}

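/*
 * Create a new worker thread for the given accounting index and put it on
 * the free list. On failure the nr_running and wq worker references taken
 * when the creation was requested are dropped here.
 */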
static void create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index)
{
	struct io_wqe_acct *acct = &wqe->acct[index];
	struct io_worker *worker;
	struct task_struct *tsk;

	__set_current_state(TASK_RUNNING);

	worker = kzalloc_node(sizeof(*worker), GFP_KERNEL, wqe->node);
	if (!worker)
		goto fail;

	refcount_set(&worker->ref, 1);
	worker->nulls_node.pprev = NULL;
	worker->wqe = wqe;
	spin_lock_init(&worker->lock);
	init_completion(&worker->ref_done);

	tsk = create_io_thread(io_wqe_worker, worker, wqe->node);
	if (IS_ERR(tsk)) {
		kfree(worker);
fail:
		atomic_dec(&acct->nr_running);
		io_worker_ref_put(wq);
		return;
	}

	tsk->pf_io_worker = worker;
	worker->task = tsk;
	set_cpus_allowed_ptr(tsk, cpumask_of_node(wqe->node));
	tsk->flags |= PF_NO_SETAFFINITY;

	raw_spin_lock_irq(&wqe->lock);
	hlist_nulls_add_head_rcu(&worker->nulls_node, &wqe->free_list);
	list_add_tail_rcu(&worker->all_list, &wqe->all_list);
	worker->flags |= IO_WORKER_F_FREE;
	if (index == IO_WQ_ACCT_BOUND)
		worker->flags |= IO_WORKER_F_BOUND;
	if (!acct->nr_workers && (worker->flags & IO_WORKER_F_BOUND))
		worker->flags |= IO_WORKER_F_FIXED;
	acct->nr_workers++;
	raw_spin_unlock_irq(&wqe->lock);
	wake_up_new_task(tsk);
}

/*
 * Iterate the passed in list and call the specific function for each
 * worker that isn't exiting
 */
static bool io_wq_for_each_worker(struct io_wqe *wqe,
				  bool (*func)(struct io_worker *, void *),
				  void *data)
{
	struct io_worker *worker;
	bool ret = false;

	list_for_each_entry_rcu(worker, &wqe->all_list, all_list) {
		if (io_worker_get(worker)) {
			/* no task if node is/was offline */
			if (worker->task)
				ret = func(worker, data);
			io_worker_release(worker);
			if (ret)
				break;
		}
	}

	return ret;
}

static bool io_wq_worker_wake(struct io_worker *worker, void *data)
{
	set_notify_signal(worker->task);
	wake_up_process(worker->task);
	return false;
}

static bool io_wq_work_match_all(struct io_wq_work *work, void *data)
{
	return true;
}

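/*
 * Cancel a work item in place: flag it with IO_WQ_WORK_CANCEL and run the
 * do_work/free_work callbacks for it and any dependent work it produces.
 */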
static void io_run_cancel(struct io_wq_work *work, struct io_wqe *wqe)
{
	struct io_wq *wq = wqe->wq;

	do {
		work->flags |= IO_WQ_WORK_CANCEL;
		wq->do_work(work);
		work = wq->free_work(work);
	} while (work);
}

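/*
 * Append work to the wqe list. Hashed work is chained behind the current
 * tail for its hash so items sharing a hash keep their ordering.
 */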
static void io_wqe_insert_work(struct io_wqe *wqe, struct io_wq_work *work)
{
	unsigned int hash;
	struct io_wq_work *tail;

	if (!io_wq_is_hashed(work)) {
append:
		wq_list_add_tail(&work->list, &wqe->work_list);
		return;
	}

	hash = io_get_work_hash(work);
	tail = wqe->hash_tail[hash];
	wqe->hash_tail[hash] = work;
	if (!tail)
		goto append;

	wq_list_add_after(&work->list, &tail->list, &wqe->work_list);
}

static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work)
{
	struct io_wqe_acct *acct = io_work_get_acct(wqe, work);
	int work_flags;
	unsigned long flags;

	if (test_bit(IO_WQ_BIT_EXIT, &wqe->wq->state)) {
		io_run_cancel(work, wqe);
		return;
	}

	work_flags = work->flags;
	raw_spin_lock_irqsave(&wqe->lock, flags);
	io_wqe_insert_work(wqe, work);
	wqe->flags &= ~IO_WQE_FLAG_STALLED;
	raw_spin_unlock_irqrestore(&wqe->lock, flags);

	if ((work_flags & IO_WQ_WORK_CONCURRENT) ||
	    !atomic_read(&acct->nr_running))
		io_wqe_wake_worker(wqe, acct);
}

void io_wq_enqueue(struct io_wq *wq, struct io_wq_work *work)
{
	struct io_wqe *wqe = wq->wqes[numa_node_id()];

	io_wqe_enqueue(wqe, work);
}

/*
 * Work items that hash to the same value will not be done in parallel.
 * Used to limit concurrent writes, generally hashed by inode.
 */
void io_wq_hash_work(struct io_wq_work *work, void *val)
{
	unsigned int bit;

	bit = hash_ptr(val, IO_WQ_HASH_ORDER);
	work->flags |= (IO_WQ_WORK_HASHED | (bit << IO_WQ_HASH_SHIFT));
}

static bool io_wq_worker_cancel(struct io_worker *worker, void *data)
{
	struct io_cb_cancel_data *match = data;
	unsigned long flags;

	/*
	 * Hold the lock to avoid ->cur_work going out of scope, caller
	 * may dereference the passed in work.
	 */
	spin_lock_irqsave(&worker->lock, flags);
	if (worker->cur_work &&
	    match->fn(worker->cur_work, match->data)) {
		set_notify_signal(worker->task);
		match->nr_running++;
	}
	spin_unlock_irqrestore(&worker->lock, flags);

	return match->nr_running && !match->cancel_all;
}

static inline void io_wqe_remove_pending(struct io_wqe *wqe,
					 struct io_wq_work *work,
					 struct io_wq_work_node *prev)
{
	unsigned int hash = io_get_work_hash(work);
	struct io_wq_work *prev_work = NULL;

	if (io_wq_is_hashed(work) && work == wqe->hash_tail[hash]) {
		if (prev)
			prev_work = container_of(prev, struct io_wq_work, list);
		if (prev_work && io_get_work_hash(prev_work) == hash)
			wqe->hash_tail[hash] = prev_work;
		else
			wqe->hash_tail[hash] = NULL;
	}
	wq_list_del(&wqe->work_list, &work->list, prev);
}

static void io_wqe_cancel_pending_work(struct io_wqe *wqe,
				       struct io_cb_cancel_data *match)
{
	struct io_wq_work_node *node, *prev;
	struct io_wq_work *work;
	unsigned long flags;

retry:
	raw_spin_lock_irqsave(&wqe->lock, flags);
	wq_list_for_each(node, prev, &wqe->work_list) {
		work = container_of(node, struct io_wq_work, list);
		if (!match->fn(work, match->data))
			continue;
		io_wqe_remove_pending(wqe, work, prev);
		raw_spin_unlock_irqrestore(&wqe->lock, flags);
		io_run_cancel(work, wqe);
		match->nr_pending++;
		if (!match->cancel_all)
			return;

		/* not safe to continue after unlock */
		goto retry;
	}
	raw_spin_unlock_irqrestore(&wqe->lock, flags);
}

static void io_wqe_cancel_running_work(struct io_wqe *wqe,
				       struct io_cb_cancel_data *match)
{
	rcu_read_lock();
	io_wq_for_each_worker(wqe, io_wq_worker_cancel, match);
	rcu_read_unlock();
}

enum io_wq_cancel io_wq_cancel_cb(struct io_wq *wq, work_cancel_fn *cancel,
				  void *data, bool cancel_all)
{
	struct io_cb_cancel_data match = {
		.fn		= cancel,
		.data		= data,
		.cancel_all	= cancel_all,
	};
	int node;

	/*
	 * First check pending list, if we're lucky we can just remove it
	 * from there. CANCEL_OK means that the work is returned as-new,
	 * no completion will be posted for it.
	 */
	for_each_node(node) {
		struct io_wqe *wqe = wq->wqes[node];

		io_wqe_cancel_pending_work(wqe, &match);
		if (match.nr_pending && !match.cancel_all)
			return IO_WQ_CANCEL_OK;
	}

	/*
	 * Now check if a free (going busy) or busy worker has the work
	 * currently running. If we find it there, we'll return CANCEL_RUNNING
	 * as an indication that we attempt to signal cancellation. The
	 * completion will run normally in this case.
	 */
	for_each_node(node) {
		struct io_wqe *wqe = wq->wqes[node];

		io_wqe_cancel_running_work(wqe, &match);
		if (match.nr_running && !match.cancel_all)
			return IO_WQ_CANCEL_RUNNING;
	}

	if (match.nr_running)
		return IO_WQ_CANCEL_RUNNING;
	if (match.nr_pending)
		return IO_WQ_CANCEL_OK;
	return IO_WQ_CANCEL_NOTFOUND;
}

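/*
 * A hashed work slot was released; wake a free worker so stalled work on
 * this wqe can be retried.
 */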
static int io_wqe_hash_wake(struct wait_queue_entry *wait, unsigned mode,
			    int sync, void *key)
{
	struct io_wqe *wqe = container_of(wait, struct io_wqe, wait);

	list_del_init(&wait->entry);

	rcu_read_lock();
	io_wqe_activate_free_worker(wqe);
	rcu_read_unlock();
	return 1;
}

struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
{
	int ret = -ENOMEM, node;
	struct io_wq *wq;

	if (WARN_ON_ONCE(!data->free_work || !data->do_work))
		return ERR_PTR(-EINVAL);

	wq = kzalloc(struct_size(wq, wqes, nr_node_ids), GFP_KERNEL);
	if (!wq)
		return ERR_PTR(-ENOMEM);
	ret = cpuhp_state_add_instance_nocalls(io_wq_online, &wq->cpuhp_node);
	if (ret)
		goto err_wq;

	refcount_inc(&data->hash->refs);
	wq->hash = data->hash;
	wq->free_work = data->free_work;
	wq->do_work = data->do_work;

	ret = -ENOMEM;
	for_each_node(node) {
		struct io_wqe *wqe;
		int alloc_node = node;

		if (!node_online(alloc_node))
			alloc_node = NUMA_NO_NODE;
		wqe = kzalloc_node(sizeof(struct io_wqe), GFP_KERNEL, alloc_node);
		if (!wqe)
			goto err;
		wq->wqes[node] = wqe;
		wqe->node = alloc_node;
		wqe->acct[IO_WQ_ACCT_BOUND].index = IO_WQ_ACCT_BOUND;
		wqe->acct[IO_WQ_ACCT_UNBOUND].index = IO_WQ_ACCT_UNBOUND;
		wqe->acct[IO_WQ_ACCT_BOUND].max_workers = bounded;
		atomic_set(&wqe->acct[IO_WQ_ACCT_BOUND].nr_running, 0);
		wqe->acct[IO_WQ_ACCT_UNBOUND].max_workers =
					task_rlimit(current, RLIMIT_NPROC);
		atomic_set(&wqe->acct[IO_WQ_ACCT_UNBOUND].nr_running, 0);
		wqe->wait.func = io_wqe_hash_wake;
		INIT_LIST_HEAD(&wqe->wait.entry);
		wqe->wq = wq;
		raw_spin_lock_init(&wqe->lock);
		INIT_WQ_LIST(&wqe->work_list);
		INIT_HLIST_NULLS_HEAD(&wqe->free_list, 0);
		INIT_LIST_HEAD(&wqe->all_list);
	}

	wq->task = get_task_struct(data->task);
	atomic_set(&wq->worker_refs, 1);
	init_completion(&wq->worker_done);
	return wq;
err:
	io_wq_put_hash(data->hash);
	cpuhp_state_remove_instance_nocalls(io_wq_online, &wq->cpuhp_node);
	for_each_node(node)
		kfree(wq->wqes[node]);
err_wq:
	kfree(wq);
	return ERR_PTR(ret);
}

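/*
 * Match task_work items queued by io_queue_worker_create() for this io_wq,
 * so pending create requests can be cancelled when the wq exits.
 */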
static bool io_task_work_match(struct callback_head *cb, void *data)
{
	struct create_worker_data *cwd;

	if (cb->func != create_worker_cb)
		return false;
	cwd = container_of(cb, struct create_worker_data, work);
	return cwd->wqe->wq == data;
}

void io_wq_exit_start(struct io_wq *wq)
{
	set_bit(IO_WQ_BIT_EXIT, &wq->state);
}

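/*
 * Bring down all workers: cancel pending create requests, wake every worker
 * so it sees IO_WQ_BIT_EXIT, then wait for the last worker reference to be
 * dropped.
 */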
static void io_wq_exit_workers(struct io_wq *wq)
{
	struct callback_head *cb;
	int node;

	if (!wq->task)
		return;

	while ((cb = task_work_cancel_match(wq->task, io_task_work_match, wq)) != NULL) {
		struct create_worker_data *cwd;

		cwd = container_of(cb, struct create_worker_data, work);
		atomic_dec(&cwd->wqe->acct[cwd->index].nr_running);
		io_worker_ref_put(wq);
		kfree(cwd);
	}

	rcu_read_lock();
	for_each_node(node) {
		struct io_wqe *wqe = wq->wqes[node];

		io_wq_for_each_worker(wqe, io_wq_worker_wake, NULL);
	}
	rcu_read_unlock();
	io_worker_ref_put(wq);
	wait_for_completion(&wq->worker_done);

	for_each_node(node) {
		spin_lock_irq(&wq->hash->wait.lock);
		list_del_init(&wq->wqes[node]->wait.entry);
		spin_unlock_irq(&wq->hash->wait.lock);
	}
	put_task_struct(wq->task);
	wq->task = NULL;
}

static void io_wq_destroy(struct io_wq *wq)
{
	int node;

	cpuhp_state_remove_instance_nocalls(io_wq_online, &wq->cpuhp_node);

	for_each_node(node) {
		struct io_wqe *wqe = wq->wqes[node];
		struct io_cb_cancel_data match = {
			.fn		= io_wq_work_match_all,
			.cancel_all	= true,
		};
		io_wqe_cancel_pending_work(wqe, &match);
		kfree(wqe);
	}
	io_wq_put_hash(wq->hash);
	kfree(wq);
}

void io_wq_put_and_exit(struct io_wq *wq)
{
	WARN_ON_ONCE(!test_bit(IO_WQ_BIT_EXIT, &wq->state));

	io_wq_exit_workers(wq);
	io_wq_destroy(wq);
}

static bool io_wq_worker_affinity(struct io_worker *worker, void *data)
{
	set_cpus_allowed_ptr(worker->task, cpumask_of_node(worker->wqe->node));

	return false;
}

static int io_wq_cpu_online(unsigned int cpu, struct hlist_node *node)
{
	struct io_wq *wq = hlist_entry_safe(node, struct io_wq, cpuhp_node);
	int i;

	rcu_read_lock();
	for_each_node(i)
		io_wq_for_each_worker(wq->wqes[i], io_wq_worker_affinity, NULL);
	rcu_read_unlock();
	return 0;
}

static __init int io_wq_init(void)
{
	int ret;

	ret = cpuhp_setup_state_multi(CPUHP_AP_ONLINE_DYN, "io-wq/online",
					io_wq_cpu_online, NULL);
	if (ret < 0)
		return ret;
	io_wq_online = ret;
	return 0;
}
subsys_initcall(io_wq_init);