// SPDX-License-Identifier: GPL-2.0
/*
 * Basic worker thread pool for io_uring
 *
 * Copyright (C) 2019 Jens Axboe
 *
 */
#include <linux/kernel.h>
#include <linux/init.h>
#include <linux/errno.h>
#include <linux/sched/signal.h>
#include <linux/percpu.h>
#include <linux/slab.h>
#include <linux/rculist_nulls.h>
#include <linux/cpu.h>
#include <linux/tracehook.h>
#include <linux/audit.h>
#include <uapi/linux/io_uring.h>

#include "io-wq.h"

#define WORKER_IDLE_TIMEOUT	(5 * HZ)

enum {
	IO_WORKER_F_UP		= 1,	/* up and active */
	IO_WORKER_F_RUNNING	= 2,	/* account as running */
	IO_WORKER_F_FREE	= 4,	/* worker on free list */
	IO_WORKER_F_BOUND	= 8,	/* is doing bounded work */
};

enum {
	IO_WQ_BIT_EXIT		= 0,	/* wq exiting */
};

enum {
	IO_ACCT_STALLED_BIT	= 0,	/* stalled on hash */
};

/*
 * One for each thread in a wqe pool
 */
struct io_worker {
	refcount_t ref;
	unsigned flags;
	struct hlist_nulls_node nulls_node;
	struct list_head all_list;
	struct task_struct *task;
	struct io_wqe *wqe;

	struct io_wq_work *cur_work;
	spinlock_t lock;

	struct completion ref_done;

	unsigned long create_state;
	struct callback_head create_work;
	int create_index;

	union {
		struct rcu_head rcu;
		struct work_struct work;
	};
};

#if BITS_PER_LONG == 64
#define IO_WQ_HASH_ORDER	6
#else
#define IO_WQ_HASH_ORDER	5
#endif

#define IO_WQ_NR_HASH_BUCKETS	(1u << IO_WQ_HASH_ORDER)
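
/*
 * With the orders above this gives 64 hash buckets on 64-bit kernels and
 * 32 on 32-bit kernels. Work items that hash to the same bucket are
 * serialized against each other, see io_get_next_work().
 */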

struct io_wqe_acct {
	unsigned nr_workers;
	unsigned max_workers;
	int index;
	atomic_t nr_running;
	struct io_wq_work_list work_list;
	unsigned long flags;
};

enum {
	IO_WQ_ACCT_BOUND,
	IO_WQ_ACCT_UNBOUND,
	IO_WQ_ACCT_NR,
};

/*
 * Per-node worker thread pool
 */
struct io_wqe {
	raw_spinlock_t lock;
	struct io_wqe_acct acct[2];

	int node;

	struct hlist_nulls_head free_list;
	struct list_head all_list;

	struct wait_queue_entry wait;

	struct io_wq *wq;
	struct io_wq_work *hash_tail[IO_WQ_NR_HASH_BUCKETS];

	cpumask_var_t cpu_mask;
};

/*
 * Per io_wq state
 */
struct io_wq {
	unsigned long state;

	free_work_fn *free_work;
	io_wq_work_fn *do_work;

	struct io_wq_hash *hash;

	atomic_t worker_refs;
	struct completion worker_done;

	struct hlist_node cpuhp_node;

	struct task_struct *task;

	struct io_wqe *wqes[];
};

static enum cpuhp_state io_wq_online;

struct io_cb_cancel_data {
	work_cancel_fn *fn;
	void *data;
	int nr_running;
	int nr_pending;
	bool cancel_all;
};

static bool create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index);
static void io_wqe_dec_running(struct io_worker *worker);
static bool io_acct_cancel_pending_work(struct io_wqe *wqe,
					struct io_wqe_acct *acct,
					struct io_cb_cancel_data *match);
static void create_worker_cb(struct callback_head *cb);

static bool io_worker_get(struct io_worker *worker)
{
	return refcount_inc_not_zero(&worker->ref);
}

static void io_worker_release(struct io_worker *worker)
{
	if (refcount_dec_and_test(&worker->ref))
		complete(&worker->ref_done);
}

static inline struct io_wqe_acct *io_get_acct(struct io_wqe *wqe, bool bound)
{
	return &wqe->acct[bound ? IO_WQ_ACCT_BOUND : IO_WQ_ACCT_UNBOUND];
}

static inline struct io_wqe_acct *io_work_get_acct(struct io_wqe *wqe,
						   struct io_wq_work *work)
{
	return io_get_acct(wqe, !(work->flags & IO_WQ_WORK_UNBOUND));
}

static inline struct io_wqe_acct *io_wqe_get_acct(struct io_worker *worker)
{
	return io_get_acct(worker->wqe, worker->flags & IO_WORKER_F_BOUND);
}

static void io_worker_ref_put(struct io_wq *wq)
{
	if (atomic_dec_and_test(&wq->worker_refs))
		complete(&wq->worker_done);
}

static void io_worker_cancel_cb(struct io_worker *worker)
{
	struct io_wqe_acct *acct = io_wqe_get_acct(worker);
	struct io_wqe *wqe = worker->wqe;
	struct io_wq *wq = wqe->wq;

	atomic_dec(&acct->nr_running);
	raw_spin_lock(&worker->wqe->lock);
	acct->nr_workers--;
	raw_spin_unlock(&worker->wqe->lock);
	io_worker_ref_put(wq);
	clear_bit_unlock(0, &worker->create_state);
	io_worker_release(worker);
}

static bool io_task_worker_match(struct callback_head *cb, void *data)
{
	struct io_worker *worker;

	if (cb->func != create_worker_cb)
		return false;
	worker = container_of(cb, struct io_worker, create_work);
	return worker == data;
}

static void io_worker_exit(struct io_worker *worker)
{
	struct io_wqe *wqe = worker->wqe;
	struct io_wq *wq = wqe->wq;

	while (1) {
		struct callback_head *cb = task_work_cancel_match(wq->task,
						io_task_worker_match, worker);

		if (!cb)
			break;
		io_worker_cancel_cb(worker);
	}

	io_worker_release(worker);
	wait_for_completion(&worker->ref_done);

	raw_spin_lock(&wqe->lock);
	if (worker->flags & IO_WORKER_F_FREE)
		hlist_nulls_del_rcu(&worker->nulls_node);
	list_del_rcu(&worker->all_list);
	preempt_disable();
	io_wqe_dec_running(worker);
	worker->flags = 0;
	current->flags &= ~PF_IO_WORKER;
	preempt_enable();
	raw_spin_unlock(&wqe->lock);

	kfree_rcu(worker, rcu);
	io_worker_ref_put(wqe->wq);
	do_exit(0);
}

static inline bool io_acct_run_queue(struct io_wqe_acct *acct)
{
	if (!wq_list_empty(&acct->work_list) &&
	    !test_bit(IO_ACCT_STALLED_BIT, &acct->flags))
		return true;
	return false;
}

/*
 * Check head of free list for an available worker. If one isn't available,
 * caller must create one.
 */
static bool io_wqe_activate_free_worker(struct io_wqe *wqe,
					struct io_wqe_acct *acct)
	__must_hold(RCU)
{
	struct hlist_nulls_node *n;
	struct io_worker *worker;

	/*
	 * Iterate free_list and see if we can find an idle worker to
	 * activate. If a given worker is on the free_list but in the process
	 * of exiting, keep trying.
	 */
	hlist_nulls_for_each_entry_rcu(worker, n, &wqe->free_list, nulls_node) {
		if (!io_worker_get(worker))
			continue;
		if (io_wqe_get_acct(worker) != acct) {
			io_worker_release(worker);
			continue;
		}
		if (wake_up_process(worker->task)) {
			io_worker_release(worker);
			return true;
		}
		io_worker_release(worker);
	}

	return false;
}

/*
 * We need a worker. If we find a free one, we're good. If not, and we're
 * below the max number of workers, create one.
 */
static bool io_wqe_create_worker(struct io_wqe *wqe, struct io_wqe_acct *acct)
{
	/*
	 * Most likely an attempt to queue unbounded work on an io_wq that
	 * wasn't setup with any unbounded workers.
	 */
	if (unlikely(!acct->max_workers))
		pr_warn_once("io-wq is not configured for unbound workers");

	raw_spin_lock(&wqe->lock);
	if (acct->nr_workers >= acct->max_workers) {
		raw_spin_unlock(&wqe->lock);
		return true;
	}
	acct->nr_workers++;
	raw_spin_unlock(&wqe->lock);
	atomic_inc(&acct->nr_running);
	atomic_inc(&wqe->wq->worker_refs);
	return create_io_worker(wqe->wq, wqe, acct->index);
}

static void io_wqe_inc_running(struct io_worker *worker)
{
	struct io_wqe_acct *acct = io_wqe_get_acct(worker);

	atomic_inc(&acct->nr_running);
}

static void create_worker_cb(struct callback_head *cb)
{
	struct io_worker *worker;
	struct io_wq *wq;
	struct io_wqe *wqe;
	struct io_wqe_acct *acct;
	bool do_create = false;

	worker = container_of(cb, struct io_worker, create_work);
	wqe = worker->wqe;
	wq = wqe->wq;
	acct = &wqe->acct[worker->create_index];
	raw_spin_lock(&wqe->lock);
	if (acct->nr_workers < acct->max_workers) {
		acct->nr_workers++;
		do_create = true;
	}
	raw_spin_unlock(&wqe->lock);
	if (do_create) {
		create_io_worker(wq, wqe, worker->create_index);
	} else {
		atomic_dec(&acct->nr_running);
		io_worker_ref_put(wq);
	}
	clear_bit_unlock(0, &worker->create_state);
	io_worker_release(worker);
}

static bool io_queue_worker_create(struct io_worker *worker,
				   struct io_wqe_acct *acct,
				   task_work_func_t func)
{
	struct io_wqe *wqe = worker->wqe;
	struct io_wq *wq = wqe->wq;

	/* raced with exit, just ignore create call */
	if (test_bit(IO_WQ_BIT_EXIT, &wq->state))
		goto fail;
	if (!io_worker_get(worker))
		goto fail;
	/*
	 * create_state manages ownership of create_work/index. We should
	 * only need one entry per worker, as the worker going to sleep
	 * will trigger the condition, and waking will clear it once it
	 * runs the task_work.
	 */
	if (test_bit(0, &worker->create_state) ||
	    test_and_set_bit_lock(0, &worker->create_state))
		goto fail_release;

	init_task_work(&worker->create_work, func);
	worker->create_index = acct->index;
	if (!task_work_add(wq->task, &worker->create_work, TWA_SIGNAL)) {
		clear_bit_unlock(0, &worker->create_state);
		return true;
	}
	clear_bit_unlock(0, &worker->create_state);
fail_release:
	io_worker_release(worker);
fail:
	atomic_dec(&acct->nr_running);
	io_worker_ref_put(wq);
	return false;
}

static void io_wqe_dec_running(struct io_worker *worker)
	__must_hold(wqe->lock)
{
	struct io_wqe_acct *acct = io_wqe_get_acct(worker);
	struct io_wqe *wqe = worker->wqe;

	if (!(worker->flags & IO_WORKER_F_UP))
		return;

	if (atomic_dec_and_test(&acct->nr_running) && io_acct_run_queue(acct)) {
		atomic_inc(&acct->nr_running);
		atomic_inc(&wqe->wq->worker_refs);
		io_queue_worker_create(worker, acct, create_worker_cb);
	}
}

/*
 * Worker will start processing some work. Move it to the busy list, if
 * it's currently on the freelist
 */
static void __io_worker_busy(struct io_wqe *wqe, struct io_worker *worker,
			     struct io_wq_work *work)
	__must_hold(wqe->lock)
{
	if (worker->flags & IO_WORKER_F_FREE) {
		worker->flags &= ~IO_WORKER_F_FREE;
		hlist_nulls_del_init_rcu(&worker->nulls_node);
	}
}

/*
 * No work, worker going to sleep. Move to freelist.
 */
static void __io_worker_idle(struct io_wqe *wqe, struct io_worker *worker)
	__must_hold(wqe->lock)
{
	if (!(worker->flags & IO_WORKER_F_FREE)) {
		worker->flags |= IO_WORKER_F_FREE;
		hlist_nulls_add_head_rcu(&worker->nulls_node, &wqe->free_list);
	}
}

static inline unsigned int io_get_work_hash(struct io_wq_work *work)
{
	return work->flags >> IO_WQ_HASH_SHIFT;
}

static bool io_wait_on_hash(struct io_wqe *wqe, unsigned int hash)
{
	struct io_wq *wq = wqe->wq;
	bool ret = false;

	spin_lock_irq(&wq->hash->wait.lock);
	if (list_empty(&wqe->wait.entry)) {
		__add_wait_queue(&wq->hash->wait, &wqe->wait);
		if (!test_bit(hash, &wq->hash->map)) {
			__set_current_state(TASK_RUNNING);
			list_del_init(&wqe->wait.entry);
			ret = true;
		}
	}
	spin_unlock_irq(&wq->hash->wait.lock);
	return ret;
}

static struct io_wq_work *io_get_next_work(struct io_wqe_acct *acct,
					   struct io_worker *worker)
	__must_hold(wqe->lock)
{
	struct io_wq_work_node *node, *prev;
	struct io_wq_work *work, *tail;
	unsigned int stall_hash = -1U;
	struct io_wqe *wqe = worker->wqe;

	wq_list_for_each(node, prev, &acct->work_list) {
		unsigned int hash;

		work = container_of(node, struct io_wq_work, list);

		/* not hashed, can run anytime */
		if (!io_wq_is_hashed(work)) {
			wq_list_del(&acct->work_list, node, prev);
			return work;
		}

		hash = io_get_work_hash(work);
		/* all items with this hash lie in [work, tail] */
		tail = wqe->hash_tail[hash];

		/* hashed, can run if not already running */
		if (!test_and_set_bit(hash, &wqe->wq->hash->map)) {
			wqe->hash_tail[hash] = NULL;
			wq_list_cut(&acct->work_list, &tail->list, prev);
			return work;
		}
		if (stall_hash == -1U)
			stall_hash = hash;
		/* fast forward to a next hash, for-each will fix up @prev */
		node = &tail->list;
	}

	if (stall_hash != -1U) {
		bool unstalled;

		/*
		 * Set this before dropping the lock to avoid racing with new
		 * work being added and clearing the stalled bit.
		 */
		set_bit(IO_ACCT_STALLED_BIT, &acct->flags);
		raw_spin_unlock(&wqe->lock);
		unstalled = io_wait_on_hash(wqe, stall_hash);
		raw_spin_lock(&wqe->lock);
		if (unstalled) {
			clear_bit(IO_ACCT_STALLED_BIT, &acct->flags);
			if (wq_has_sleeper(&wqe->wq->hash->wait))
				wake_up(&wqe->wq->hash->wait);
		}
	}

	return NULL;
}

static bool io_flush_signals(void)
{
	if (unlikely(test_thread_flag(TIF_NOTIFY_SIGNAL))) {
		__set_current_state(TASK_RUNNING);
		tracehook_notify_signal();
		return true;
	}
	return false;
}

static void io_assign_current_work(struct io_worker *worker,
				   struct io_wq_work *work)
{
	if (work) {
		io_flush_signals();
		cond_resched();
	}

	spin_lock(&worker->lock);
	worker->cur_work = work;
	spin_unlock(&worker->lock);
}

static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work);

static void io_worker_handle_work(struct io_worker *worker)
	__releases(wqe->lock)
{
	struct io_wqe_acct *acct = io_wqe_get_acct(worker);
	struct io_wqe *wqe = worker->wqe;
	struct io_wq *wq = wqe->wq;
	bool do_kill = test_bit(IO_WQ_BIT_EXIT, &wq->state);

	do {
		struct io_wq_work *work;
get_next:
		/*
		 * If we got some work, mark us as busy. If we didn't, but
		 * the list isn't empty, it means we stalled on hashed work.
		 * Mark us stalled so we don't keep looking for work when we
		 * can't make progress, any work completion or insertion will
		 * clear the stalled flag.
		 */
		work = io_get_next_work(acct, worker);
		if (work)
			__io_worker_busy(wqe, worker, work);

		raw_spin_unlock(&wqe->lock);
		if (!work)
			break;
		io_assign_current_work(worker, work);
		__set_current_state(TASK_RUNNING);

		/* handle a whole dependent link */
		do {
			struct io_wq_work *next_hashed, *linked;
			unsigned int hash = io_get_work_hash(work);

			next_hashed = wq_next_work(work);

			if (unlikely(do_kill) && (work->flags & IO_WQ_WORK_UNBOUND))
				work->flags |= IO_WQ_WORK_CANCEL;
			wq->do_work(work);
			io_assign_current_work(worker, NULL);

			linked = wq->free_work(work);
			work = next_hashed;
			if (!work && linked && !io_wq_is_hashed(linked)) {
				work = linked;
				linked = NULL;
			}
			io_assign_current_work(worker, work);
			if (linked)
				io_wqe_enqueue(wqe, linked);

			if (hash != -1U && !next_hashed) {
				/* serialize hash clear with wake_up() */
				spin_lock_irq(&wq->hash->wait.lock);
				clear_bit(hash, &wq->hash->map);
				clear_bit(IO_ACCT_STALLED_BIT, &acct->flags);
				spin_unlock_irq(&wq->hash->wait.lock);
				if (wq_has_sleeper(&wq->hash->wait))
					wake_up(&wq->hash->wait);
				raw_spin_lock(&wqe->lock);
				/* skip unnecessary unlock-lock wqe->lock */
				if (!work)
					goto get_next;
				raw_spin_unlock(&wqe->lock);
			}
		} while (work);

		raw_spin_lock(&wqe->lock);
	} while (1);
}

static int io_wqe_worker(void *data)
{
	struct io_worker *worker = data;
	struct io_wqe_acct *acct = io_wqe_get_acct(worker);
	struct io_wqe *wqe = worker->wqe;
	struct io_wq *wq = wqe->wq;
	bool last_timeout = false;
	char buf[TASK_COMM_LEN];

	worker->flags |= (IO_WORKER_F_UP | IO_WORKER_F_RUNNING);

	snprintf(buf, sizeof(buf), "iou-wrk-%d", wq->task->pid);
	set_task_comm(current, buf);

	audit_alloc_kernel(current);

	while (!test_bit(IO_WQ_BIT_EXIT, &wq->state)) {
		long ret;

		set_current_state(TASK_INTERRUPTIBLE);
loop:
		raw_spin_lock(&wqe->lock);
		if (io_acct_run_queue(acct)) {
			io_worker_handle_work(worker);
			goto loop;
		}
		/* timed out, exit unless we're the last worker */
		if (last_timeout && acct->nr_workers > 1) {
			acct->nr_workers--;
			raw_spin_unlock(&wqe->lock);
			__set_current_state(TASK_RUNNING);
			break;
		}
		last_timeout = false;
		__io_worker_idle(wqe, worker);
		raw_spin_unlock(&wqe->lock);
		if (io_flush_signals())
			continue;
		ret = schedule_timeout(WORKER_IDLE_TIMEOUT);
		if (signal_pending(current)) {
			struct ksignal ksig;

			if (!get_signal(&ksig))
				continue;
			break;
		}
		last_timeout = !ret;
	}

	if (test_bit(IO_WQ_BIT_EXIT, &wq->state)) {
		raw_spin_lock(&wqe->lock);
		io_worker_handle_work(worker);
	}

	audit_free(current);
	io_worker_exit(worker);
	return 0;
}

/*
 * Called when a worker is scheduled in. Mark us as currently running.
 */
void io_wq_worker_running(struct task_struct *tsk)
{
	struct io_worker *worker = tsk->worker_private;

	if (!worker)
		return;
	if (!(worker->flags & IO_WORKER_F_UP))
		return;
	if (worker->flags & IO_WORKER_F_RUNNING)
		return;
	worker->flags |= IO_WORKER_F_RUNNING;
	io_wqe_inc_running(worker);
}

/*
 * Called when worker is going to sleep. If there are no workers currently
 * running and we have work pending, wake up a free one or create a new one.
 */
void io_wq_worker_sleeping(struct task_struct *tsk)
{
	struct io_worker *worker = tsk->worker_private;

	if (!worker)
		return;
	if (!(worker->flags & IO_WORKER_F_UP))
		return;
	if (!(worker->flags & IO_WORKER_F_RUNNING))
		return;

	worker->flags &= ~IO_WORKER_F_RUNNING;

	raw_spin_lock(&worker->wqe->lock);
	io_wqe_dec_running(worker);
	raw_spin_unlock(&worker->wqe->lock);
}

static void io_init_new_worker(struct io_wqe *wqe, struct io_worker *worker,
			       struct task_struct *tsk)
{
	tsk->worker_private = worker;
	worker->task = tsk;
	set_cpus_allowed_ptr(tsk, wqe->cpu_mask);
	tsk->flags |= PF_NO_SETAFFINITY;

	raw_spin_lock(&wqe->lock);
	hlist_nulls_add_head_rcu(&worker->nulls_node, &wqe->free_list);
	list_add_tail_rcu(&worker->all_list, &wqe->all_list);
	worker->flags |= IO_WORKER_F_FREE;
	raw_spin_unlock(&wqe->lock);
	wake_up_new_task(tsk);
}

static bool io_wq_work_match_all(struct io_wq_work *work, void *data)
{
	return true;
}

static inline bool io_should_retry_thread(long err)
{
	switch (err) {
	case -EAGAIN:
	case -ERESTARTSYS:
	case -ERESTARTNOINTR:
	case -ERESTARTNOHAND:
		return true;
	default:
		return false;
	}
}

static void create_worker_cont(struct callback_head *cb)
{
	struct io_worker *worker;
	struct task_struct *tsk;
	struct io_wqe *wqe;

	worker = container_of(cb, struct io_worker, create_work);
	clear_bit_unlock(0, &worker->create_state);
	wqe = worker->wqe;
	tsk = create_io_thread(io_wqe_worker, worker, wqe->node);
	if (!IS_ERR(tsk)) {
		io_init_new_worker(wqe, worker, tsk);
		io_worker_release(worker);
		return;
	} else if (!io_should_retry_thread(PTR_ERR(tsk))) {
		struct io_wqe_acct *acct = io_wqe_get_acct(worker);

		atomic_dec(&acct->nr_running);
		raw_spin_lock(&wqe->lock);
		acct->nr_workers--;
		if (!acct->nr_workers) {
			struct io_cb_cancel_data match = {
				.fn		= io_wq_work_match_all,
				.cancel_all	= true,
			};

			while (io_acct_cancel_pending_work(wqe, acct, &match))
				raw_spin_lock(&wqe->lock);
		}
		raw_spin_unlock(&wqe->lock);
		io_worker_ref_put(wqe->wq);
		kfree(worker);
		return;
	}

	/* re-create attempts grab a new worker ref, drop the existing one */
	io_worker_release(worker);
	schedule_work(&worker->work);
}

static void io_workqueue_create(struct work_struct *work)
{
	struct io_worker *worker = container_of(work, struct io_worker, work);
	struct io_wqe_acct *acct = io_wqe_get_acct(worker);

	if (!io_queue_worker_create(worker, acct, create_worker_cont))
		kfree(worker);
}

static bool create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index)
{
	struct io_wqe_acct *acct = &wqe->acct[index];
	struct io_worker *worker;
	struct task_struct *tsk;

	__set_current_state(TASK_RUNNING);

	worker = kzalloc_node(sizeof(*worker), GFP_KERNEL, wqe->node);
	if (!worker) {
fail:
		atomic_dec(&acct->nr_running);
		raw_spin_lock(&wqe->lock);
		acct->nr_workers--;
		raw_spin_unlock(&wqe->lock);
		io_worker_ref_put(wq);
		return false;
	}

	refcount_set(&worker->ref, 1);
	worker->wqe = wqe;
	spin_lock_init(&worker->lock);
	init_completion(&worker->ref_done);

	if (index == IO_WQ_ACCT_BOUND)
		worker->flags |= IO_WORKER_F_BOUND;

	tsk = create_io_thread(io_wqe_worker, worker, wqe->node);
	if (!IS_ERR(tsk)) {
		io_init_new_worker(wqe, worker, tsk);
	} else if (!io_should_retry_thread(PTR_ERR(tsk))) {
		kfree(worker);
		goto fail;
	} else {
		INIT_WORK(&worker->work, io_workqueue_create);
		schedule_work(&worker->work);
	}

	return true;
}

/*
 * Iterate the passed in list and call the specific function for each
 * worker that isn't exiting
 */
static bool io_wq_for_each_worker(struct io_wqe *wqe,
				  bool (*func)(struct io_worker *, void *),
				  void *data)
{
	struct io_worker *worker;
	bool ret = false;

	list_for_each_entry_rcu(worker, &wqe->all_list, all_list) {
		if (io_worker_get(worker)) {
			/* no task if node is/was offline */
			if (worker->task)
				ret = func(worker, data);
			io_worker_release(worker);
			if (ret)
				break;
		}
	}

	return ret;
}

static bool io_wq_worker_wake(struct io_worker *worker, void *data)
{
	set_notify_signal(worker->task);
	wake_up_process(worker->task);
	return false;
}

static void io_run_cancel(struct io_wq_work *work, struct io_wqe *wqe)
{
	struct io_wq *wq = wqe->wq;

	do {
		work->flags |= IO_WQ_WORK_CANCEL;
		wq->do_work(work);
		work = wq->free_work(work);
	} while (work);
}

static void io_wqe_insert_work(struct io_wqe *wqe, struct io_wq_work *work)
{
	struct io_wqe_acct *acct = io_work_get_acct(wqe, work);
	unsigned int hash;
	struct io_wq_work *tail;

	if (!io_wq_is_hashed(work)) {
append:
		wq_list_add_tail(&work->list, &acct->work_list);
		return;
	}

	hash = io_get_work_hash(work);
	tail = wqe->hash_tail[hash];
	wqe->hash_tail[hash] = work;
	if (!tail)
		goto append;

	wq_list_add_after(&work->list, &tail->list, &acct->work_list);
}

static bool io_wq_work_match_item(struct io_wq_work *work, void *data)
{
	return work == data;
}

static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work)
{
	struct io_wqe_acct *acct = io_work_get_acct(wqe, work);
	unsigned work_flags = work->flags;
	bool do_create;

	/*
	 * If io-wq is exiting for this task, or if the request has explicitly
	 * been marked as one that should not get executed, cancel it here.
	 */
	if (test_bit(IO_WQ_BIT_EXIT, &wqe->wq->state) ||
	    (work->flags & IO_WQ_WORK_CANCEL)) {
		io_run_cancel(work, wqe);
		return;
	}

	raw_spin_lock(&wqe->lock);
	io_wqe_insert_work(wqe, work);
	clear_bit(IO_ACCT_STALLED_BIT, &acct->flags);

	rcu_read_lock();
	do_create = !io_wqe_activate_free_worker(wqe, acct);
	rcu_read_unlock();

	raw_spin_unlock(&wqe->lock);

	if (do_create && ((work_flags & IO_WQ_WORK_CONCURRENT) ||
	    !atomic_read(&acct->nr_running))) {
		bool did_create;

		did_create = io_wqe_create_worker(wqe, acct);
		if (likely(did_create))
			return;

		raw_spin_lock(&wqe->lock);
		/* fatal condition, failed to create the first worker */
		if (!acct->nr_workers) {
			struct io_cb_cancel_data match = {
				.fn		= io_wq_work_match_item,
				.data		= work,
				.cancel_all	= false,
			};

			if (io_acct_cancel_pending_work(wqe, acct, &match))
				raw_spin_lock(&wqe->lock);
		}
		raw_spin_unlock(&wqe->lock);
	}
}

void io_wq_enqueue(struct io_wq *wq, struct io_wq_work *work)
{
	struct io_wqe *wqe = wq->wqes[numa_node_id()];

	io_wqe_enqueue(wqe, work);
}
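
/*
 * Example usage (illustrative sketch, not a call site in this file): a
 * submitter that embeds struct io_wq_work in its request can queue it
 * with something like
 *
 *	io_wq_enqueue(wq, &req->work);
 *
 * The work is picked up by an idle worker for the matching bound/unbound
 * account, or a new worker is created if none is free.
 */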

/*
 * Work items that hash to the same value will not be done in parallel.
 * Used to limit concurrent writes, generally hashed by inode.
 */
void io_wq_hash_work(struct io_wq_work *work, void *val)
{
	unsigned int bit;

	bit = hash_ptr(val, IO_WQ_HASH_ORDER);
	work->flags |= (IO_WQ_WORK_HASHED | (bit << IO_WQ_HASH_SHIFT));
}
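
/*
 * Example (illustrative): to serialize buffered writes against the same
 * inode, hash the work on the inode before queueing it, e.g.
 *
 *	io_wq_hash_work(&req->work, file_inode(file));
 *	io_wq_enqueue(wq, &req->work);
 *
 * Work items hashed to the same key then execute one at a time, while
 * unhashed or differently hashed work still runs concurrently.
 */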

static bool io_wq_worker_cancel(struct io_worker *worker, void *data)
{
	struct io_cb_cancel_data *match = data;

	/*
	 * Hold the lock to avoid ->cur_work going out of scope, caller
	 * may dereference the passed in work.
	 */
	spin_lock(&worker->lock);
	if (worker->cur_work &&
	    match->fn(worker->cur_work, match->data)) {
		set_notify_signal(worker->task);
		match->nr_running++;
	}
	spin_unlock(&worker->lock);

	return match->nr_running && !match->cancel_all;
}

static inline void io_wqe_remove_pending(struct io_wqe *wqe,
					 struct io_wq_work *work,
					 struct io_wq_work_node *prev)
{
	struct io_wqe_acct *acct = io_work_get_acct(wqe, work);
	unsigned int hash = io_get_work_hash(work);
	struct io_wq_work *prev_work = NULL;

	if (io_wq_is_hashed(work) && work == wqe->hash_tail[hash]) {
		if (prev)
			prev_work = container_of(prev, struct io_wq_work, list);
		if (prev_work && io_get_work_hash(prev_work) == hash)
			wqe->hash_tail[hash] = prev_work;
		else
			wqe->hash_tail[hash] = NULL;
	}
	wq_list_del(&acct->work_list, &work->list, prev);
}

static bool io_acct_cancel_pending_work(struct io_wqe *wqe,
					struct io_wqe_acct *acct,
					struct io_cb_cancel_data *match)
	__releases(wqe->lock)
{
	struct io_wq_work_node *node, *prev;
	struct io_wq_work *work;

	wq_list_for_each(node, prev, &acct->work_list) {
		work = container_of(node, struct io_wq_work, list);
		if (!match->fn(work, match->data))
			continue;
		io_wqe_remove_pending(wqe, work, prev);
		raw_spin_unlock(&wqe->lock);
		io_run_cancel(work, wqe);
		match->nr_pending++;
		/* not safe to continue after unlock */
		return true;
	}

	return false;
}

static void io_wqe_cancel_pending_work(struct io_wqe *wqe,
				       struct io_cb_cancel_data *match)
{
	int i;
retry:
	raw_spin_lock(&wqe->lock);
	for (i = 0; i < IO_WQ_ACCT_NR; i++) {
		struct io_wqe_acct *acct = io_get_acct(wqe, i == 0);

		if (io_acct_cancel_pending_work(wqe, acct, match)) {
			if (match->cancel_all)
				goto retry;
			return;
		}
	}
	raw_spin_unlock(&wqe->lock);
}

static void io_wqe_cancel_running_work(struct io_wqe *wqe,
				       struct io_cb_cancel_data *match)
{
	rcu_read_lock();
	io_wq_for_each_worker(wqe, io_wq_worker_cancel, match);
	rcu_read_unlock();
}

enum io_wq_cancel io_wq_cancel_cb(struct io_wq *wq, work_cancel_fn *cancel,
				  void *data, bool cancel_all)
{
	struct io_cb_cancel_data match = {
		.fn		= cancel,
		.data		= data,
		.cancel_all	= cancel_all,
	};
	int node;

	/*
	 * First check pending list, if we're lucky we can just remove it
	 * from there. CANCEL_OK means that the work is returned as-new,
	 * no completion will be posted for it.
	 */
	for_each_node(node) {
		struct io_wqe *wqe = wq->wqes[node];

		io_wqe_cancel_pending_work(wqe, &match);
		if (match.nr_pending && !match.cancel_all)
			return IO_WQ_CANCEL_OK;
	}

	/*
	 * Now check if a free (going busy) or busy worker has the work
	 * currently running. If we find it there, we'll return CANCEL_RUNNING
	 * as an indication that we attempt to signal cancellation. The
	 * completion will run normally in this case.
	 */
	for_each_node(node) {
		struct io_wqe *wqe = wq->wqes[node];

		io_wqe_cancel_running_work(wqe, &match);
		if (match.nr_running && !match.cancel_all)
			return IO_WQ_CANCEL_RUNNING;
	}

	if (match.nr_running)
		return IO_WQ_CANCEL_RUNNING;
	if (match.nr_pending)
		return IO_WQ_CANCEL_OK;
	return IO_WQ_CANCEL_NOTFOUND;
}
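
/*
 * Example (illustrative): a minimal work_cancel_fn that matches a single
 * work item by pointer, along the lines of io_wq_work_match_item() above:
 *
 *	static bool match_one_work(struct io_wq_work *work, void *data)
 *	{
 *		return work == data;
 *	}
 *
 *	enum io_wq_cancel ret = io_wq_cancel_cb(wq, match_one_work, work, false);
 */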

static int io_wqe_hash_wake(struct wait_queue_entry *wait, unsigned mode,
			    int sync, void *key)
{
	struct io_wqe *wqe = container_of(wait, struct io_wqe, wait);
	int i;

	list_del_init(&wait->entry);

	rcu_read_lock();
	for (i = 0; i < IO_WQ_ACCT_NR; i++) {
		struct io_wqe_acct *acct = &wqe->acct[i];

		if (test_and_clear_bit(IO_ACCT_STALLED_BIT, &acct->flags))
			io_wqe_activate_free_worker(wqe, acct);
	}
	rcu_read_unlock();
	return 1;
}

struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
{
	int ret, node, i;
	struct io_wq *wq;

	if (WARN_ON_ONCE(!data->free_work || !data->do_work))
		return ERR_PTR(-EINVAL);
	if (WARN_ON_ONCE(!bounded))
		return ERR_PTR(-EINVAL);

	wq = kzalloc(struct_size(wq, wqes, nr_node_ids), GFP_KERNEL);
	if (!wq)
		return ERR_PTR(-ENOMEM);
	ret = cpuhp_state_add_instance_nocalls(io_wq_online, &wq->cpuhp_node);
	if (ret)
		goto err_wq;

	refcount_inc(&data->hash->refs);
	wq->hash = data->hash;
	wq->free_work = data->free_work;
	wq->do_work = data->do_work;

	ret = -ENOMEM;
	for_each_node(node) {
		struct io_wqe *wqe;
		int alloc_node = node;

		if (!node_online(alloc_node))
			alloc_node = NUMA_NO_NODE;
		wqe = kzalloc_node(sizeof(struct io_wqe), GFP_KERNEL, alloc_node);
		if (!wqe)
			goto err;
		if (!alloc_cpumask_var(&wqe->cpu_mask, GFP_KERNEL))
			goto err;
		cpumask_copy(wqe->cpu_mask, cpumask_of_node(node));
		wq->wqes[node] = wqe;
		wqe->node = alloc_node;
		wqe->acct[IO_WQ_ACCT_BOUND].max_workers = bounded;
		wqe->acct[IO_WQ_ACCT_UNBOUND].max_workers =
					task_rlimit(current, RLIMIT_NPROC);
		INIT_LIST_HEAD(&wqe->wait.entry);
		wqe->wait.func = io_wqe_hash_wake;
		for (i = 0; i < IO_WQ_ACCT_NR; i++) {
			struct io_wqe_acct *acct = &wqe->acct[i];

			acct->index = i;
			atomic_set(&acct->nr_running, 0);
			INIT_WQ_LIST(&acct->work_list);
		}
		wqe->wq = wq;
		raw_spin_lock_init(&wqe->lock);
		INIT_HLIST_NULLS_HEAD(&wqe->free_list, 0);
		INIT_LIST_HEAD(&wqe->all_list);
	}

	wq->task = get_task_struct(data->task);
	atomic_set(&wq->worker_refs, 1);
	init_completion(&wq->worker_done);
	return wq;
err:
	io_wq_put_hash(data->hash);
	cpuhp_state_remove_instance_nocalls(io_wq_online, &wq->cpuhp_node);
	for_each_node(node) {
		if (!wq->wqes[node])
			continue;
		free_cpumask_var(wq->wqes[node]->cpu_mask);
		kfree(wq->wqes[node]);
	}
err_wq:
	kfree(wq);
	return ERR_PTR(ret);
}
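
/*
 * Example setup (illustrative sketch; my_free_work/my_do_work are
 * placeholder callbacks, not functions defined anywhere in this file):
 *
 *	struct io_wq_data data = {
 *		.hash		= hash,
 *		.task		= current,
 *		.free_work	= my_free_work,
 *		.do_work	= my_do_work,
 *	};
 *	struct io_wq *wq = io_wq_create(bounded, &data);
 *
 *	if (IS_ERR(wq))
 *		return PTR_ERR(wq);
 */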

static bool io_task_work_match(struct callback_head *cb, void *data)
{
	struct io_worker *worker;

	if (cb->func != create_worker_cb && cb->func != create_worker_cont)
		return false;
	worker = container_of(cb, struct io_worker, create_work);
	return worker->wqe->wq == data;
}

void io_wq_exit_start(struct io_wq *wq)
{
	set_bit(IO_WQ_BIT_EXIT, &wq->state);
}

static void io_wq_exit_workers(struct io_wq *wq)
{
	struct callback_head *cb;
	int node;

	if (!wq->task)
		return;

	while ((cb = task_work_cancel_match(wq->task, io_task_work_match, wq)) != NULL) {
		struct io_worker *worker;

		worker = container_of(cb, struct io_worker, create_work);
		io_worker_cancel_cb(worker);
	}

	rcu_read_lock();
	for_each_node(node) {
		struct io_wqe *wqe = wq->wqes[node];

		io_wq_for_each_worker(wqe, io_wq_worker_wake, NULL);
	}
	rcu_read_unlock();
	io_worker_ref_put(wq);
	wait_for_completion(&wq->worker_done);

	for_each_node(node) {
		spin_lock_irq(&wq->hash->wait.lock);
		list_del_init(&wq->wqes[node]->wait.entry);
		spin_unlock_irq(&wq->hash->wait.lock);
	}
	put_task_struct(wq->task);
	wq->task = NULL;
}

static void io_wq_destroy(struct io_wq *wq)
{
	int node;

	cpuhp_state_remove_instance_nocalls(io_wq_online, &wq->cpuhp_node);

	for_each_node(node) {
		struct io_wqe *wqe = wq->wqes[node];
		struct io_cb_cancel_data match = {
			.fn		= io_wq_work_match_all,
			.cancel_all	= true,
		};
		io_wqe_cancel_pending_work(wqe, &match);
		free_cpumask_var(wqe->cpu_mask);
		kfree(wqe);
	}
	io_wq_put_hash(wq->hash);
	kfree(wq);
}

void io_wq_put_and_exit(struct io_wq *wq)
{
	WARN_ON_ONCE(!test_bit(IO_WQ_BIT_EXIT, &wq->state));

	io_wq_exit_workers(wq);
	io_wq_destroy(wq);
}

struct online_data {
	unsigned int cpu;
	bool online;
};

static bool io_wq_worker_affinity(struct io_worker *worker, void *data)
{
	struct online_data *od = data;

	if (od->online)
		cpumask_set_cpu(od->cpu, worker->wqe->cpu_mask);
	else
		cpumask_clear_cpu(od->cpu, worker->wqe->cpu_mask);
	return false;
}

static int __io_wq_cpu_online(struct io_wq *wq, unsigned int cpu, bool online)
{
	struct online_data od = {
		.cpu = cpu,
		.online = online
	};
	int i;

	rcu_read_lock();
	for_each_node(i)
		io_wq_for_each_worker(wq->wqes[i], io_wq_worker_affinity, &od);
	rcu_read_unlock();
	return 0;
}

static int io_wq_cpu_online(unsigned int cpu, struct hlist_node *node)
{
	struct io_wq *wq = hlist_entry_safe(node, struct io_wq, cpuhp_node);

	return __io_wq_cpu_online(wq, cpu, true);
}

static int io_wq_cpu_offline(unsigned int cpu, struct hlist_node *node)
{
	struct io_wq *wq = hlist_entry_safe(node, struct io_wq, cpuhp_node);

	return __io_wq_cpu_online(wq, cpu, false);
}

int io_wq_cpu_affinity(struct io_wq *wq, cpumask_var_t mask)
{
	int i;

	rcu_read_lock();
	for_each_node(i) {
		struct io_wqe *wqe = wq->wqes[i];

		if (mask)
			cpumask_copy(wqe->cpu_mask, mask);
		else
			cpumask_copy(wqe->cpu_mask, cpumask_of_node(i));
	}
	rcu_read_unlock();
	return 0;
}
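
/*
 * Example (illustrative): pinning all workers to CPU 2. Passing a NULL
 * mask restores each node's default mask (cpumask_of_node()).
 *
 *	cpumask_var_t mask;
 *
 *	if (alloc_cpumask_var(&mask, GFP_KERNEL)) {
 *		cpumask_clear(mask);
 *		cpumask_set_cpu(2, mask);
 *		io_wq_cpu_affinity(wq, mask);
 *		free_cpumask_var(mask);
 *	}
 */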

/*
 * Set max number of unbounded workers, returns old value. If new_count is 0,
 * then just return the old value.
 */
int io_wq_max_workers(struct io_wq *wq, int *new_count)
{
	int prev[IO_WQ_ACCT_NR];
	bool first_node = true;
	int i, node;

	BUILD_BUG_ON((int) IO_WQ_ACCT_BOUND   != (int) IO_WQ_BOUND);
	BUILD_BUG_ON((int) IO_WQ_ACCT_UNBOUND != (int) IO_WQ_UNBOUND);
	BUILD_BUG_ON((int) IO_WQ_ACCT_NR      != 2);

	for (i = 0; i < 2; i++) {
		if (new_count[i] > task_rlimit(current, RLIMIT_NPROC))
			new_count[i] = task_rlimit(current, RLIMIT_NPROC);
	}

	for (i = 0; i < IO_WQ_ACCT_NR; i++)
		prev[i] = 0;

	rcu_read_lock();
	for_each_node(node) {
		struct io_wqe *wqe = wq->wqes[node];
		struct io_wqe_acct *acct;

		raw_spin_lock(&wqe->lock);
		for (i = 0; i < IO_WQ_ACCT_NR; i++) {
			acct = &wqe->acct[i];
			if (first_node)
				prev[i] = max_t(int, acct->max_workers, prev[i]);
			if (new_count[i])
				acct->max_workers = new_count[i];
		}
		raw_spin_unlock(&wqe->lock);
		first_node = false;
	}
	rcu_read_unlock();

	for (i = 0; i < IO_WQ_ACCT_NR; i++)
		new_count[i] = prev[i];

	return 0;
}
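
/*
 * Example (illustrative): read the current limits without changing them,
 * then cap unbound workers. A zero entry leaves that account untouched;
 * on return new_count[] holds the previous limits.
 *
 *	int counts[IO_WQ_ACCT_NR] = { 0, 0 };
 *
 *	io_wq_max_workers(wq, counts);
 *	counts[IO_WQ_ACCT_BOUND] = 0;
 *	counts[IO_WQ_ACCT_UNBOUND] = 8;
 *	io_wq_max_workers(wq, counts);
 */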

static __init int io_wq_init(void)
{
	int ret;

	ret = cpuhp_setup_state_multi(CPUHP_AP_ONLINE_DYN, "io-wq/online",
					io_wq_cpu_online, io_wq_cpu_offline);
	if (ret < 0)
		return ret;
	io_wq_online = ret;
	return 0;
}
subsys_initcall(io_wq_init);