// SPDX-License-Identifier: GPL-2.0
/*
 * Basic worker thread pool for io_uring
 *
 * Copyright (C) 2019 Jens Axboe
 *
 */
#include <linux/kernel.h>
#include <linux/init.h>
#include <linux/errno.h>
#include <linux/sched/signal.h>
#include <linux/percpu.h>
#include <linux/slab.h>
#include <linux/rculist_nulls.h>
#include <linux/cpu.h>
#include <linux/tracehook.h>

#include "io-wq.h"

#define WORKER_IDLE_TIMEOUT	(5 * HZ)

enum {
	IO_WORKER_F_UP		= 1,	/* up and active */
	IO_WORKER_F_RUNNING	= 2,	/* account as running */
	IO_WORKER_F_FREE	= 4,	/* worker on free list */
	IO_WORKER_F_FIXED	= 8,	/* static idle worker */
	IO_WORKER_F_BOUND	= 16,	/* is doing bounded work */
};

enum {
	IO_WQ_BIT_EXIT		= 0,	/* wq exiting */
};

enum {
	IO_WQE_FLAG_STALLED	= 1,	/* stalled on hash */
};

/*
 * One for each thread in a wqe pool
 */
struct io_worker {
	refcount_t ref;
	unsigned flags;
	struct hlist_nulls_node nulls_node;
	struct list_head all_list;
	struct task_struct *task;
	struct io_wqe *wqe;

	struct io_wq_work *cur_work;
	spinlock_t lock;

	struct completion ref_done;

	struct rcu_head rcu;
};

#if BITS_PER_LONG == 64
#define IO_WQ_HASH_ORDER	6
#else
#define IO_WQ_HASH_ORDER	5
#endif

#define IO_WQ_NR_HASH_BUCKETS	(1u << IO_WQ_HASH_ORDER)

struct io_wqe_acct {
	unsigned nr_workers;
	unsigned max_workers;
	int index;
	atomic_t nr_running;
};

enum {
	IO_WQ_ACCT_BOUND,
	IO_WQ_ACCT_UNBOUND,
};

/*
 * Per-node worker thread pool
 */
struct io_wqe {
	struct {
		raw_spinlock_t lock;
		struct io_wq_work_list work_list;
		unsigned flags;
	} ____cacheline_aligned_in_smp;

	int node;
	struct io_wqe_acct acct[2];

	struct hlist_nulls_head free_list;
	struct list_head all_list;

	struct wait_queue_entry wait;

	struct io_wq *wq;
	struct io_wq_work *hash_tail[IO_WQ_NR_HASH_BUCKETS];

	cpumask_var_t cpu_mask;
};

/*
 * Per io_wq state
 */
struct io_wq {
	unsigned long state;

	free_work_fn *free_work;
	io_wq_work_fn *do_work;

	struct io_wq_hash *hash;

	atomic_t worker_refs;
	struct completion worker_done;

	struct hlist_node cpuhp_node;

	struct task_struct *task;

	struct io_wqe *wqes[];
};

static enum cpuhp_state io_wq_online;

struct io_cb_cancel_data {
	work_cancel_fn *fn;
	void *data;
	int nr_running;
	int nr_pending;
	bool cancel_all;
};

static void create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index);

static bool io_worker_get(struct io_worker *worker)
{
	return refcount_inc_not_zero(&worker->ref);
}

static void io_worker_release(struct io_worker *worker)
{
	if (refcount_dec_and_test(&worker->ref))
		complete(&worker->ref_done);
}

static inline struct io_wqe_acct *io_get_acct(struct io_wqe *wqe, bool bound)
{
	return &wqe->acct[bound ? IO_WQ_ACCT_BOUND : IO_WQ_ACCT_UNBOUND];
}

static inline struct io_wqe_acct *io_work_get_acct(struct io_wqe *wqe,
						   struct io_wq_work *work)
{
	return io_get_acct(wqe, !(work->flags & IO_WQ_WORK_UNBOUND));
}

static inline struct io_wqe_acct *io_wqe_get_acct(struct io_worker *worker)
{
	return io_get_acct(worker->wqe, worker->flags & IO_WORKER_F_BOUND);
}

static void io_worker_ref_put(struct io_wq *wq)
{
	if (atomic_dec_and_test(&wq->worker_refs))
		complete(&wq->worker_done);
}

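/*
 * Worker is exiting. Drop our own reference, wait for any remaining
 * reference holders to finish with us, then unhook the worker from the
 * wqe lists, fix up the accounting and drop the wq worker reference.
 */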
static void io_worker_exit(struct io_worker *worker)
{
	struct io_wqe *wqe = worker->wqe;
	struct io_wqe_acct *acct = io_wqe_get_acct(worker);
	unsigned flags;

	if (refcount_dec_and_test(&worker->ref))
		complete(&worker->ref_done);
	wait_for_completion(&worker->ref_done);

	preempt_disable();
	current->flags &= ~PF_IO_WORKER;
	flags = worker->flags;
	worker->flags = 0;
	if (flags & IO_WORKER_F_RUNNING)
		atomic_dec(&acct->nr_running);
	worker->flags = 0;
	preempt_enable();

	raw_spin_lock_irq(&wqe->lock);
	if (flags & IO_WORKER_F_FREE)
		hlist_nulls_del_rcu(&worker->nulls_node);
	list_del_rcu(&worker->all_list);
	acct->nr_workers--;
	raw_spin_unlock_irq(&wqe->lock);

	kfree_rcu(worker, rcu);
	io_worker_ref_put(wqe->wq);
	do_exit(0);
}

static inline bool io_wqe_run_queue(struct io_wqe *wqe)
	__must_hold(wqe->lock)
{
	if (!wq_list_empty(&wqe->work_list) &&
	    !(wqe->flags & IO_WQE_FLAG_STALLED))
		return true;
	return false;
}

/*
 * Check head of free list for an available worker. If one isn't available,
 * caller must create one.
 */
static bool io_wqe_activate_free_worker(struct io_wqe *wqe)
	__must_hold(RCU)
{
	struct hlist_nulls_node *n;
	struct io_worker *worker;

	n = rcu_dereference(hlist_nulls_first_rcu(&wqe->free_list));
	if (is_a_nulls(n))
		return false;

	worker = hlist_nulls_entry(n, struct io_worker, nulls_node);
	if (io_worker_get(worker)) {
		wake_up_process(worker->task);
		io_worker_release(worker);
		return true;
	}

	return false;
}

/*
 * We need a worker. If we find a free one, we're good. If not, and we're
 * below the max number of workers, create one.
 */
static void io_wqe_wake_worker(struct io_wqe *wqe, struct io_wqe_acct *acct)
{
	bool ret;

	/*
	 * Most likely an attempt to queue unbounded work on an io_wq that
	 * wasn't setup with any unbounded workers.
	 */
	WARN_ON_ONCE(!acct->max_workers);

	rcu_read_lock();
	ret = io_wqe_activate_free_worker(wqe);
	rcu_read_unlock();

	if (!ret && acct->nr_workers < acct->max_workers) {
		atomic_inc(&acct->nr_running);
		atomic_inc(&wqe->wq->worker_refs);
		create_io_worker(wqe->wq, wqe, acct->index);
	}
}

static void io_wqe_inc_running(struct io_worker *worker)
{
	struct io_wqe_acct *acct = io_wqe_get_acct(worker);

	atomic_inc(&acct->nr_running);
}

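
/*
 * Worker creation from inside io-wq is deferred via task_work on the
 * io_uring task: io_queue_worker_create() queues create_worker_cb(),
 * which calls create_io_worker() from that task's context. If the wq is
 * exiting or the task_work can't be queued, the pre-taken nr_running
 * and worker references are dropped again.
 */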
struct create_worker_data {
	struct callback_head work;
	struct io_wqe *wqe;
	int index;
};

static void create_worker_cb(struct callback_head *cb)
{
	struct create_worker_data *cwd;
	struct io_wq *wq;

	cwd = container_of(cb, struct create_worker_data, work);
	wq = cwd->wqe->wq;
	create_io_worker(wq, cwd->wqe, cwd->index);
	kfree(cwd);
}

static void io_queue_worker_create(struct io_wqe *wqe, struct io_wqe_acct *acct)
{
	struct create_worker_data *cwd;
	struct io_wq *wq = wqe->wq;

	/* raced with exit, just ignore create call */
	if (test_bit(IO_WQ_BIT_EXIT, &wq->state))
		goto fail;

	cwd = kmalloc(sizeof(*cwd), GFP_ATOMIC);
	if (cwd) {
		init_task_work(&cwd->work, create_worker_cb);
		cwd->wqe = wqe;
		cwd->index = acct->index;
		if (!task_work_add(wq->task, &cwd->work, TWA_SIGNAL))
			return;

		kfree(cwd);
	}
fail:
	atomic_dec(&acct->nr_running);
	io_worker_ref_put(wq);
}

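/*
 * Drop the running count for this worker's account. If that was the last
 * runner and work is still queued, punt creation of a new worker to
 * task_work context, as this may be called from the scheduler with
 * wqe->lock held.
 */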
static void io_wqe_dec_running(struct io_worker *worker)
	__must_hold(wqe->lock)
{
	struct io_wqe_acct *acct = io_wqe_get_acct(worker);
	struct io_wqe *wqe = worker->wqe;

	if (!(worker->flags & IO_WORKER_F_UP))
		return;

	if (atomic_dec_and_test(&acct->nr_running) && io_wqe_run_queue(wqe)) {
		atomic_inc(&acct->nr_running);
		atomic_inc(&wqe->wq->worker_refs);
		io_queue_worker_create(wqe, acct);
	}
}

/*
 * Worker will start processing some work. Move it to the busy list, if
 * it's currently on the freelist
 */
static void __io_worker_busy(struct io_wqe *wqe, struct io_worker *worker,
			     struct io_wq_work *work)
	__must_hold(wqe->lock)
{
	bool worker_bound, work_bound;

	BUILD_BUG_ON((IO_WQ_ACCT_UNBOUND ^ IO_WQ_ACCT_BOUND) != 1);

	if (worker->flags & IO_WORKER_F_FREE) {
		worker->flags &= ~IO_WORKER_F_FREE;
		hlist_nulls_del_init_rcu(&worker->nulls_node);
	}

	/*
	 * If worker is moving from bound to unbound (or vice versa), then
	 * ensure we update the running accounting.
	 */
	worker_bound = (worker->flags & IO_WORKER_F_BOUND) != 0;
	work_bound = (work->flags & IO_WQ_WORK_UNBOUND) == 0;
	if (worker_bound != work_bound) {
		int index = work_bound ? IO_WQ_ACCT_UNBOUND : IO_WQ_ACCT_BOUND;
		io_wqe_dec_running(worker);
		worker->flags ^= IO_WORKER_F_BOUND;
		wqe->acct[index].nr_workers--;
		wqe->acct[index ^ 1].nr_workers++;
		io_wqe_inc_running(worker);
	}
}

/*
 * No work, worker going to sleep. Move it to the free list if it isn't
 * already there. The caller holds wqe->lock and keeps holding it.
 */
static void __io_worker_idle(struct io_wqe *wqe, struct io_worker *worker)
	__must_hold(wqe->lock)
{
	if (!(worker->flags & IO_WORKER_F_FREE)) {
		worker->flags |= IO_WORKER_F_FREE;
		hlist_nulls_add_head_rcu(&worker->nulls_node, &wqe->free_list);
	}
}

static inline unsigned int io_get_work_hash(struct io_wq_work *work)
{
	return work->flags >> IO_WQ_HASH_SHIFT;
}

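/*
 * A worker stalled on hashed work: add this wqe to the hash wait queue
 * so it is woken once the hash bit is cleared. If the bit is already
 * clear, back out and stay runnable.
 */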
static void io_wait_on_hash(struct io_wqe *wqe, unsigned int hash)
{
	struct io_wq *wq = wqe->wq;

	spin_lock(&wq->hash->wait.lock);
	if (list_empty(&wqe->wait.entry)) {
		__add_wait_queue(&wq->hash->wait, &wqe->wait);
		if (!test_bit(hash, &wq->hash->map)) {
			__set_current_state(TASK_RUNNING);
			list_del_init(&wqe->wait.entry);
		}
	}
	spin_unlock(&wq->hash->wait.lock);
}

static struct io_wq_work *io_get_next_work(struct io_wqe *wqe)
	__must_hold(wqe->lock)
{
	struct io_wq_work_node *node, *prev;
	struct io_wq_work *work, *tail;
	unsigned int stall_hash = -1U;

	wq_list_for_each(node, prev, &wqe->work_list) {
		unsigned int hash;

		work = container_of(node, struct io_wq_work, list);

		/* not hashed, can run anytime */
		if (!io_wq_is_hashed(work)) {
			wq_list_del(&wqe->work_list, node, prev);
			return work;
		}

		hash = io_get_work_hash(work);
		/* all items with this hash lie in [work, tail] */
		tail = wqe->hash_tail[hash];

		/* hashed, can run if not already running */
		if (!test_and_set_bit(hash, &wqe->wq->hash->map)) {
			wqe->hash_tail[hash] = NULL;
			wq_list_cut(&wqe->work_list, &tail->list, prev);
			return work;
		}
		if (stall_hash == -1U)
			stall_hash = hash;
		/* fast forward to a next hash, for-each will fix up @prev */
		node = &tail->list;
	}

	if (stall_hash != -1U) {
		raw_spin_unlock(&wqe->lock);
		io_wait_on_hash(wqe, stall_hash);
		raw_spin_lock(&wqe->lock);
	}

	return NULL;
}

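/*
 * Handle a pending TIF_NOTIFY_SIGNAL notification, if there is one.
 * Returns true if one was handled, in which case the task state has
 * been reset to TASK_RUNNING.
 */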
static bool io_flush_signals(void)
{
	if (unlikely(test_thread_flag(TIF_NOTIFY_SIGNAL))) {
		__set_current_state(TASK_RUNNING);
		tracehook_notify_signal();
		return true;
	}
	return false;
}

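/*
 * Update the worker's ->cur_work under worker->lock so that cancelation
 * can see what is currently running. When switching to a new item, flush
 * pending signal work and give the scheduler a chance to run first.
 */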
static void io_assign_current_work(struct io_worker *worker,
				   struct io_wq_work *work)
{
	if (work) {
		io_flush_signals();
		cond_resched();
	}

	spin_lock_irq(&worker->lock);
	worker->cur_work = work;
	spin_unlock_irq(&worker->lock);
}

static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work);

static void io_worker_handle_work(struct io_worker *worker)
	__releases(wqe->lock)
{
	struct io_wqe *wqe = worker->wqe;
	struct io_wq *wq = wqe->wq;
	bool do_kill = test_bit(IO_WQ_BIT_EXIT, &wq->state);

	do {
		struct io_wq_work *work;
get_next:
		/*
		 * If we got some work, mark us as busy. If we didn't, but
		 * the list isn't empty, it means we stalled on hashed work.
		 * Mark us stalled so we don't keep looking for work when we
		 * can't make progress, any work completion or insertion will
		 * clear the stalled flag.
		 */
		work = io_get_next_work(wqe);
		if (work)
			__io_worker_busy(wqe, worker, work);
		else if (!wq_list_empty(&wqe->work_list))
			wqe->flags |= IO_WQE_FLAG_STALLED;

		raw_spin_unlock_irq(&wqe->lock);
		if (!work)
			break;
		io_assign_current_work(worker, work);
		__set_current_state(TASK_RUNNING);

		/* handle a whole dependent link */
		do {
			struct io_wq_work *next_hashed, *linked;
			unsigned int hash = io_get_work_hash(work);

			next_hashed = wq_next_work(work);

			if (unlikely(do_kill) && (work->flags & IO_WQ_WORK_UNBOUND))
				work->flags |= IO_WQ_WORK_CANCEL;
			wq->do_work(work);
			io_assign_current_work(worker, NULL);

			linked = wq->free_work(work);
			work = next_hashed;
			if (!work && linked && !io_wq_is_hashed(linked)) {
				work = linked;
				linked = NULL;
			}
			io_assign_current_work(worker, work);
			if (linked)
				io_wqe_enqueue(wqe, linked);

			if (hash != -1U && !next_hashed) {
				clear_bit(hash, &wq->hash->map);
				if (wq_has_sleeper(&wq->hash->wait))
					wake_up(&wq->hash->wait);
				raw_spin_lock_irq(&wqe->lock);
				wqe->flags &= ~IO_WQE_FLAG_STALLED;
				/* skip unnecessary unlock-lock wqe->lock */
				if (!work)
					goto get_next;
				raw_spin_unlock_irq(&wqe->lock);
			}
		} while (work);

		raw_spin_lock_irq(&wqe->lock);
	} while (1);
}

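/*
 * Main loop for an io-wq worker thread: handle work while any is queued,
 * otherwise idle on the free list until new work arrives, the idle
 * timeout expires, or the wq is shut down.
 */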
static int io_wqe_worker(void *data)
{
	struct io_worker *worker = data;
	struct io_wqe *wqe = worker->wqe;
	struct io_wq *wq = wqe->wq;
	char buf[TASK_COMM_LEN];

	worker->flags |= (IO_WORKER_F_UP | IO_WORKER_F_RUNNING);

	snprintf(buf, sizeof(buf), "iou-wrk-%d", wq->task->pid);
	set_task_comm(current, buf);

	while (!test_bit(IO_WQ_BIT_EXIT, &wq->state)) {
		long ret;

		set_current_state(TASK_INTERRUPTIBLE);
loop:
		raw_spin_lock_irq(&wqe->lock);
		if (io_wqe_run_queue(wqe)) {
			io_worker_handle_work(worker);
			goto loop;
		}
		__io_worker_idle(wqe, worker);
		raw_spin_unlock_irq(&wqe->lock);
		if (io_flush_signals())
			continue;
		ret = schedule_timeout(WORKER_IDLE_TIMEOUT);
		if (signal_pending(current)) {
			struct ksignal ksig;

			if (!get_signal(&ksig))
				continue;
			break;
		}
		if (ret)
			continue;
		/* timed out, exit unless we're the fixed worker */
		if (!(worker->flags & IO_WORKER_F_FIXED))
			break;
	}

	if (test_bit(IO_WQ_BIT_EXIT, &wq->state)) {
		raw_spin_lock_irq(&wqe->lock);
		io_worker_handle_work(worker);
	}

	io_worker_exit(worker);
	return 0;
}

/*
 * Called when a worker is scheduled in. Mark us as currently running.
 */
void io_wq_worker_running(struct task_struct *tsk)
{
	struct io_worker *worker = tsk->pf_io_worker;

	if (!worker)
		return;
	if (!(worker->flags & IO_WORKER_F_UP))
		return;
	if (worker->flags & IO_WORKER_F_RUNNING)
		return;
	worker->flags |= IO_WORKER_F_RUNNING;
	io_wqe_inc_running(worker);
}

/*
 * Called when worker is going to sleep. If there are no workers currently
 * running and we have work pending, wake up a free one or create a new one.
 */
void io_wq_worker_sleeping(struct task_struct *tsk)
{
	struct io_worker *worker = tsk->pf_io_worker;

	if (!worker)
		return;
	if (!(worker->flags & IO_WORKER_F_UP))
		return;
	if (!(worker->flags & IO_WORKER_F_RUNNING))
		return;

	worker->flags &= ~IO_WORKER_F_RUNNING;

	raw_spin_lock_irq(&worker->wqe->lock);
	io_wqe_dec_running(worker);
	raw_spin_unlock_irq(&worker->wqe->lock);
}

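/*
 * Allocate a worker and create an io thread for it. The caller has
 * already accounted it in nr_running and taken a wq worker reference;
 * both are dropped here if creation fails.
 */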
static void create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index)
{
	struct io_wqe_acct *acct = &wqe->acct[index];
	struct io_worker *worker;
	struct task_struct *tsk;

	__set_current_state(TASK_RUNNING);

	worker = kzalloc_node(sizeof(*worker), GFP_KERNEL, wqe->node);
	if (!worker)
		goto fail;

	refcount_set(&worker->ref, 1);
	worker->nulls_node.pprev = NULL;
	worker->wqe = wqe;
	spin_lock_init(&worker->lock);
	init_completion(&worker->ref_done);

	tsk = create_io_thread(io_wqe_worker, worker, wqe->node);
	if (IS_ERR(tsk)) {
		kfree(worker);
fail:
		atomic_dec(&acct->nr_running);
		io_worker_ref_put(wq);
		return;
	}

	tsk->pf_io_worker = worker;
	worker->task = tsk;
	set_cpus_allowed_ptr(tsk, wqe->cpu_mask);
	tsk->flags |= PF_NO_SETAFFINITY;

	raw_spin_lock_irq(&wqe->lock);
	hlist_nulls_add_head_rcu(&worker->nulls_node, &wqe->free_list);
	list_add_tail_rcu(&worker->all_list, &wqe->all_list);
	worker->flags |= IO_WORKER_F_FREE;
	if (index == IO_WQ_ACCT_BOUND)
		worker->flags |= IO_WORKER_F_BOUND;
	if (!acct->nr_workers && (worker->flags & IO_WORKER_F_BOUND))
		worker->flags |= IO_WORKER_F_FIXED;
	acct->nr_workers++;
	raw_spin_unlock_irq(&wqe->lock);
	wake_up_new_task(tsk);
}

/*
 * Iterate the passed in list and call the specific function for each
 * worker that isn't exiting
 */
static bool io_wq_for_each_worker(struct io_wqe *wqe,
				  bool (*func)(struct io_worker *, void *),
				  void *data)
{
	struct io_worker *worker;
	bool ret = false;

	list_for_each_entry_rcu(worker, &wqe->all_list, all_list) {
		if (io_worker_get(worker)) {
			/* no task if node is/was offline */
			if (worker->task)
				ret = func(worker, data);
			io_worker_release(worker);
			if (ret)
				break;
		}
	}

	return ret;
}

static bool io_wq_worker_wake(struct io_worker *worker, void *data)
{
	set_notify_signal(worker->task);
	wake_up_process(worker->task);
	return false;
}

static bool io_wq_work_match_all(struct io_wq_work *work, void *data)
{
	return true;
}

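/*
 * Cancel a work item in place: mark it IO_WQ_WORK_CANCEL and run it (and
 * any dependent work linked off it) through the normal do_work/free_work
 * callbacks.
 */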
static void io_run_cancel(struct io_wq_work *work, struct io_wqe *wqe)
{
	struct io_wq *wq = wqe->wq;

	do {
		work->flags |= IO_WQ_WORK_CANCEL;
		wq->do_work(work);
		work = wq->free_work(work);
	} while (work);
}

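/*
 * Add work to the pending list. Hashed work is kept grouped so that all
 * items with the same hash value are adjacent, with hash_tail[] tracking
 * the last entry for each hash.
 */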
static void io_wqe_insert_work(struct io_wqe *wqe, struct io_wq_work *work)
{
	unsigned int hash;
	struct io_wq_work *tail;

	if (!io_wq_is_hashed(work)) {
append:
		wq_list_add_tail(&work->list, &wqe->work_list);
		return;
	}

	hash = io_get_work_hash(work);
	tail = wqe->hash_tail[hash];
	wqe->hash_tail[hash] = work;
	if (!tail)
		goto append;

	wq_list_add_after(&work->list, &tail->list, &wqe->work_list);
}

static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work)
{
	struct io_wqe_acct *acct = io_work_get_acct(wqe, work);
	int work_flags;
	unsigned long flags;

	if (test_bit(IO_WQ_BIT_EXIT, &wqe->wq->state)) {
		io_run_cancel(work, wqe);
		return;
	}

	work_flags = work->flags;
	raw_spin_lock_irqsave(&wqe->lock, flags);
	io_wqe_insert_work(wqe, work);
	wqe->flags &= ~IO_WQE_FLAG_STALLED;
	raw_spin_unlock_irqrestore(&wqe->lock, flags);

	if ((work_flags & IO_WQ_WORK_CONCURRENT) ||
	    !atomic_read(&acct->nr_running))
		io_wqe_wake_worker(wqe, acct);
}

void io_wq_enqueue(struct io_wq *wq, struct io_wq_work *work)
{
	struct io_wqe *wqe = wq->wqes[numa_node_id()];

	io_wqe_enqueue(wqe, work);
}

/*
 * Work items that hash to the same value will not be done in parallel.
 * Used to limit concurrent writes, generally hashed by inode.
 */
void io_wq_hash_work(struct io_wq_work *work, void *val)
{
	unsigned int bit;

	bit = hash_ptr(val, IO_WQ_HASH_ORDER);
	work->flags |= (IO_WQ_WORK_HASHED | (bit << IO_WQ_HASH_SHIFT));
}

static bool io_wq_worker_cancel(struct io_worker *worker, void *data)
{
	struct io_cb_cancel_data *match = data;
	unsigned long flags;

	/*
	 * Hold the lock to avoid ->cur_work going out of scope, caller
	 * may dereference the passed in work.
	 */
	spin_lock_irqsave(&worker->lock, flags);
	if (worker->cur_work &&
	    match->fn(worker->cur_work, match->data)) {
		set_notify_signal(worker->task);
		match->nr_running++;
	}
	spin_unlock_irqrestore(&worker->lock, flags);

	return match->nr_running && !match->cancel_all;
}

static inline void io_wqe_remove_pending(struct io_wqe *wqe,
					 struct io_wq_work *work,
					 struct io_wq_work_node *prev)
{
	unsigned int hash = io_get_work_hash(work);
	struct io_wq_work *prev_work = NULL;

	if (io_wq_is_hashed(work) && work == wqe->hash_tail[hash]) {
		if (prev)
			prev_work = container_of(prev, struct io_wq_work, list);
		if (prev_work && io_get_work_hash(prev_work) == hash)
			wqe->hash_tail[hash] = prev_work;
		else
			wqe->hash_tail[hash] = NULL;
	}
	wq_list_del(&wqe->work_list, &work->list, prev);
}

static void io_wqe_cancel_pending_work(struct io_wqe *wqe,
				       struct io_cb_cancel_data *match)
{
	struct io_wq_work_node *node, *prev;
	struct io_wq_work *work;
	unsigned long flags;

retry:
	raw_spin_lock_irqsave(&wqe->lock, flags);
	wq_list_for_each(node, prev, &wqe->work_list) {
		work = container_of(node, struct io_wq_work, list);
		if (!match->fn(work, match->data))
			continue;
		io_wqe_remove_pending(wqe, work, prev);
		raw_spin_unlock_irqrestore(&wqe->lock, flags);
		io_run_cancel(work, wqe);
		match->nr_pending++;
		if (!match->cancel_all)
			return;

		/* not safe to continue after unlock */
		goto retry;
	}
	raw_spin_unlock_irqrestore(&wqe->lock, flags);
}

static void io_wqe_cancel_running_work(struct io_wqe *wqe,
				       struct io_cb_cancel_data *match)
{
	rcu_read_lock();
	io_wq_for_each_worker(wqe, io_wq_worker_cancel, match);
	rcu_read_unlock();
}

enum io_wq_cancel io_wq_cancel_cb(struct io_wq *wq, work_cancel_fn *cancel,
				  void *data, bool cancel_all)
{
	struct io_cb_cancel_data match = {
		.fn		= cancel,
		.data		= data,
		.cancel_all	= cancel_all,
	};
	int node;

	/*
	 * First check pending list, if we're lucky we can just remove it
	 * from there. CANCEL_OK means that the work is returned as-new,
	 * no completion will be posted for it.
	 */
	for_each_node(node) {
		struct io_wqe *wqe = wq->wqes[node];

		io_wqe_cancel_pending_work(wqe, &match);
		if (match.nr_pending && !match.cancel_all)
			return IO_WQ_CANCEL_OK;
	}

	/*
	 * Now check if a free (going busy) or busy worker has the work
	 * currently running. If we find it there, we'll return CANCEL_RUNNING
	 * as an indication that we attempt to signal cancellation. The
	 * completion will run normally in this case.
	 */
	for_each_node(node) {
		struct io_wqe *wqe = wq->wqes[node];

		io_wqe_cancel_running_work(wqe, &match);
		if (match.nr_running && !match.cancel_all)
			return IO_WQ_CANCEL_RUNNING;
	}

	if (match.nr_running)
		return IO_WQ_CANCEL_RUNNING;
	if (match.nr_pending)
		return IO_WQ_CANCEL_OK;
	return IO_WQ_CANCEL_NOTFOUND;
}

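/*
 * Hash wait queue callback: a hash bit was cleared, so wake a free worker
 * on this wqe to retry work that stalled on that hash.
 */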
static int io_wqe_hash_wake(struct wait_queue_entry *wait, unsigned mode,
			    int sync, void *key)
{
	struct io_wqe *wqe = container_of(wait, struct io_wqe, wait);

	list_del_init(&wait->entry);

	rcu_read_lock();
	io_wqe_activate_free_worker(wqe);
	rcu_read_unlock();
	return 1;
}

struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
{
	int ret, node;
	struct io_wq *wq;

	if (WARN_ON_ONCE(!data->free_work || !data->do_work))
		return ERR_PTR(-EINVAL);

	wq = kzalloc(struct_size(wq, wqes, nr_node_ids), GFP_KERNEL);
	if (!wq)
		return ERR_PTR(-ENOMEM);
	ret = cpuhp_state_add_instance_nocalls(io_wq_online, &wq->cpuhp_node);
	if (ret)
		goto err_wq;

	refcount_inc(&data->hash->refs);
	wq->hash = data->hash;
	wq->free_work = data->free_work;
	wq->do_work = data->do_work;

	ret = -ENOMEM;
	for_each_node(node) {
		struct io_wqe *wqe;
		int alloc_node = node;

		if (!node_online(alloc_node))
			alloc_node = NUMA_NO_NODE;
		wqe = kzalloc_node(sizeof(struct io_wqe), GFP_KERNEL, alloc_node);
		if (!wqe)
			goto err;
		if (!alloc_cpumask_var(&wqe->cpu_mask, GFP_KERNEL))
			goto err;
		cpumask_copy(wqe->cpu_mask, cpumask_of_node(node));
		wq->wqes[node] = wqe;
		wqe->node = alloc_node;
		wqe->acct[IO_WQ_ACCT_BOUND].index = IO_WQ_ACCT_BOUND;
		wqe->acct[IO_WQ_ACCT_UNBOUND].index = IO_WQ_ACCT_UNBOUND;
		wqe->acct[IO_WQ_ACCT_BOUND].max_workers = bounded;
		atomic_set(&wqe->acct[IO_WQ_ACCT_BOUND].nr_running, 0);
		wqe->acct[IO_WQ_ACCT_UNBOUND].max_workers =
					task_rlimit(current, RLIMIT_NPROC);
		atomic_set(&wqe->acct[IO_WQ_ACCT_UNBOUND].nr_running, 0);
		wqe->wait.func = io_wqe_hash_wake;
		INIT_LIST_HEAD(&wqe->wait.entry);
		wqe->wq = wq;
		raw_spin_lock_init(&wqe->lock);
		INIT_WQ_LIST(&wqe->work_list);
		INIT_HLIST_NULLS_HEAD(&wqe->free_list, 0);
		INIT_LIST_HEAD(&wqe->all_list);
	}

	wq->task = get_task_struct(data->task);
	atomic_set(&wq->worker_refs, 1);
	init_completion(&wq->worker_done);
	return wq;
err:
	io_wq_put_hash(data->hash);
	cpuhp_state_remove_instance_nocalls(io_wq_online, &wq->cpuhp_node);
	for_each_node(node) {
		if (!wq->wqes[node])
			continue;
		free_cpumask_var(wq->wqes[node]->cpu_mask);
		kfree(wq->wqes[node]);
	}
err_wq:
	kfree(wq);
	return ERR_PTR(ret);
}

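/* Match task_work items that are pending create_worker_cb calls for this wq */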
static bool io_task_work_match(struct callback_head *cb, void *data)
{
	struct create_worker_data *cwd;

	if (cb->func != create_worker_cb)
		return false;
	cwd = container_of(cb, struct create_worker_data, work);
	return cwd->wqe->wq == data;
}

void io_wq_exit_start(struct io_wq *wq)
{
	set_bit(IO_WQ_BIT_EXIT, &wq->state);
}

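/*
 * Cancel any still-queued worker creation task_work, wake all workers so
 * they notice IO_WQ_BIT_EXIT and exit, then wait for the final worker
 * reference before detaching from the hash wait queues.
 */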
static void io_wq_exit_workers(struct io_wq *wq)
{
	struct callback_head *cb;
	int node;

	if (!wq->task)
		return;

	while ((cb = task_work_cancel_match(wq->task, io_task_work_match, wq)) != NULL) {
		struct create_worker_data *cwd;

		cwd = container_of(cb, struct create_worker_data, work);
		atomic_dec(&cwd->wqe->acct[cwd->index].nr_running);
		io_worker_ref_put(wq);
		kfree(cwd);
	}

	rcu_read_lock();
	for_each_node(node) {
		struct io_wqe *wqe = wq->wqes[node];

		io_wq_for_each_worker(wqe, io_wq_worker_wake, NULL);
	}
	rcu_read_unlock();
	io_worker_ref_put(wq);
	wait_for_completion(&wq->worker_done);

	for_each_node(node) {
		spin_lock_irq(&wq->hash->wait.lock);
		list_del_init(&wq->wqes[node]->wait.entry);
		spin_unlock_irq(&wq->hash->wait.lock);
	}
	put_task_struct(wq->task);
	wq->task = NULL;
}

static void io_wq_destroy(struct io_wq *wq)
{
	int node;

	cpuhp_state_remove_instance_nocalls(io_wq_online, &wq->cpuhp_node);

	for_each_node(node) {
		struct io_wqe *wqe = wq->wqes[node];
		struct io_cb_cancel_data match = {
			.fn		= io_wq_work_match_all,
			.cancel_all	= true,
		};
		io_wqe_cancel_pending_work(wqe, &match);
		free_cpumask_var(wqe->cpu_mask);
		kfree(wqe);
	}
	io_wq_put_hash(wq->hash);
	kfree(wq);
}

void io_wq_put_and_exit(struct io_wq *wq)
{
	WARN_ON_ONCE(!test_bit(IO_WQ_BIT_EXIT, &wq->state));

	io_wq_exit_workers(wq);
	io_wq_destroy(wq);
}

struct online_data {
	unsigned int cpu;
	bool online;
};

static bool io_wq_worker_affinity(struct io_worker *worker, void *data)
{
	struct online_data *od = data;

	if (od->online)
		cpumask_set_cpu(od->cpu, worker->wqe->cpu_mask);
	else
		cpumask_clear_cpu(od->cpu, worker->wqe->cpu_mask);
	return false;
}

static int __io_wq_cpu_online(struct io_wq *wq, unsigned int cpu, bool online)
{
	struct online_data od = {
		.cpu = cpu,
		.online = online
	};
	int i;

	rcu_read_lock();
	for_each_node(i)
		io_wq_for_each_worker(wq->wqes[i], io_wq_worker_affinity, &od);
	rcu_read_unlock();
	return 0;
}

static int io_wq_cpu_online(unsigned int cpu, struct hlist_node *node)
{
	struct io_wq *wq = hlist_entry_safe(node, struct io_wq, cpuhp_node);

	return __io_wq_cpu_online(wq, cpu, true);
}

static int io_wq_cpu_offline(unsigned int cpu, struct hlist_node *node)
{
	struct io_wq *wq = hlist_entry_safe(node, struct io_wq, cpuhp_node);

	return __io_wq_cpu_online(wq, cpu, false);
}

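/*
 * Set the allowed CPU mask for every wqe: use the caller-provided mask if
 * one is given, otherwise fall back to the node's default CPU mask.
 */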
int io_wq_cpu_affinity(struct io_wq *wq, cpumask_var_t mask)
{
	int i;

	rcu_read_lock();
	for_each_node(i) {
		struct io_wqe *wqe = wq->wqes[i];

		if (mask)
			cpumask_copy(wqe->cpu_mask, mask);
		else
			cpumask_copy(wqe->cpu_mask, cpumask_of_node(i));
	}
	rcu_read_unlock();
	return 0;
}

static __init int io_wq_init(void)
{
	int ret;

	ret = cpuhp_setup_state_multi(CPUHP_AP_ONLINE_DYN, "io-wq/online",
					io_wq_cpu_online, io_wq_cpu_offline);
	if (ret < 0)
		return ret;
	io_wq_online = ret;
	return 0;
}
subsys_initcall(io_wq_init);