// SPDX-License-Identifier: GPL-2.0
/*
 * Basic worker thread pool for io_uring
 *
 * Copyright (C) 2019 Jens Axboe
 *
 */
#include <linux/kernel.h>
#include <linux/init.h>
#include <linux/errno.h>
#include <linux/sched/signal.h>
#include <linux/percpu.h>
#include <linux/slab.h>
#include <linux/rculist_nulls.h>
#include <linux/cpu.h>
#include <linux/tracehook.h>
#include <linux/audit.h>
#include <uapi/linux/io_uring.h>

#include "io-wq.h"

#define WORKER_IDLE_TIMEOUT	(5 * HZ)

enum {
	IO_WORKER_F_UP		= 1,	/* up and active */
	IO_WORKER_F_RUNNING	= 2,	/* account as running */
	IO_WORKER_F_FREE	= 4,	/* worker on free list */
	IO_WORKER_F_BOUND	= 8,	/* is doing bounded work */
};

enum {
	IO_WQ_BIT_EXIT		= 0,	/* wq exiting */
};

enum {
	IO_ACCT_STALLED_BIT	= 0,	/* stalled on hash */
};

/*
 * One for each thread in a wqe pool
 */
struct io_worker {
	refcount_t ref;
	unsigned flags;
	struct hlist_nulls_node nulls_node;
	struct list_head all_list;
	struct task_struct *task;
	struct io_wqe *wqe;

	struct io_wq_work *cur_work;
	raw_spinlock_t lock;

	struct completion ref_done;

	unsigned long create_state;
	struct callback_head create_work;
	int create_index;

	union {
		struct rcu_head rcu;
		struct work_struct work;
	};
};

#if BITS_PER_LONG == 64
#define IO_WQ_HASH_ORDER	6
#else
#define IO_WQ_HASH_ORDER	5
#endif

#define IO_WQ_NR_HASH_BUCKETS	(1u << IO_WQ_HASH_ORDER)

struct io_wqe_acct {
	unsigned nr_workers;
	unsigned max_workers;
	int index;
	atomic_t nr_running;
	struct io_wq_work_list work_list;
	unsigned long flags;
};

enum {
	IO_WQ_ACCT_BOUND,
	IO_WQ_ACCT_UNBOUND,
	IO_WQ_ACCT_NR,
};

/*
 * Per-node worker thread pool
 */
struct io_wqe {
	raw_spinlock_t lock;
	struct io_wqe_acct acct[2];

	int node;

	struct hlist_nulls_head free_list;
	struct list_head all_list;

	struct wait_queue_entry wait;

	struct io_wq *wq;
	struct io_wq_work *hash_tail[IO_WQ_NR_HASH_BUCKETS];

	cpumask_var_t cpu_mask;
};

/*
 * Per io_wq state
 */
struct io_wq {
	unsigned long state;

	free_work_fn *free_work;
	io_wq_work_fn *do_work;

	struct io_wq_hash *hash;

	atomic_t worker_refs;
	struct completion worker_done;

	struct hlist_node cpuhp_node;

	struct task_struct *task;

	struct io_wqe *wqes[];
};

static enum cpuhp_state io_wq_online;

struct io_cb_cancel_data {
	work_cancel_fn *fn;
	void *data;
	int nr_running;
	int nr_pending;
	bool cancel_all;
};

static bool create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index);
static void io_wqe_dec_running(struct io_worker *worker);
static bool io_acct_cancel_pending_work(struct io_wqe *wqe,
					struct io_wqe_acct *acct,
					struct io_cb_cancel_data *match);
static void create_worker_cb(struct callback_head *cb);
static void io_wq_cancel_tw_create(struct io_wq *wq);

static bool io_worker_get(struct io_worker *worker)
{
	return refcount_inc_not_zero(&worker->ref);
}

static void io_worker_release(struct io_worker *worker)
{
	if (refcount_dec_and_test(&worker->ref))
		complete(&worker->ref_done);
}

static inline struct io_wqe_acct *io_get_acct(struct io_wqe *wqe, bool bound)
{
	return &wqe->acct[bound ? IO_WQ_ACCT_BOUND : IO_WQ_ACCT_UNBOUND];
}

static inline struct io_wqe_acct *io_work_get_acct(struct io_wqe *wqe,
						   struct io_wq_work *work)
{
	return io_get_acct(wqe, !(work->flags & IO_WQ_WORK_UNBOUND));
}

static inline struct io_wqe_acct *io_wqe_get_acct(struct io_worker *worker)
{
	return io_get_acct(worker->wqe, worker->flags & IO_WORKER_F_BOUND);
}

static void io_worker_ref_put(struct io_wq *wq)
{
	if (atomic_dec_and_test(&wq->worker_refs))
		complete(&wq->worker_done);
}

static void io_worker_cancel_cb(struct io_worker *worker)
{
	struct io_wqe_acct *acct = io_wqe_get_acct(worker);
	struct io_wqe *wqe = worker->wqe;
	struct io_wq *wq = wqe->wq;

	atomic_dec(&acct->nr_running);
	raw_spin_lock(&worker->wqe->lock);
	acct->nr_workers--;
	raw_spin_unlock(&worker->wqe->lock);
	io_worker_ref_put(wq);
	clear_bit_unlock(0, &worker->create_state);
	io_worker_release(worker);
}

static bool io_task_worker_match(struct callback_head *cb, void *data)
{
	struct io_worker *worker;

	if (cb->func != create_worker_cb)
		return false;
	worker = container_of(cb, struct io_worker, create_work);
	return worker == data;
}

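/*
 * A worker is exiting: cancel any of its still-queued create requests, drop
 * its references, unhook it from the wqe lists and free it.
 */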
static void io_worker_exit(struct io_worker *worker)
{
	struct io_wqe *wqe = worker->wqe;
	struct io_wq *wq = wqe->wq;

	while (1) {
		struct callback_head *cb = task_work_cancel_match(wq->task,
						io_task_worker_match, worker);

		if (!cb)
			break;
		io_worker_cancel_cb(worker);
	}

	io_worker_release(worker);
	wait_for_completion(&worker->ref_done);

	raw_spin_lock(&wqe->lock);
	if (worker->flags & IO_WORKER_F_FREE)
		hlist_nulls_del_rcu(&worker->nulls_node);
	list_del_rcu(&worker->all_list);
	preempt_disable();
	io_wqe_dec_running(worker);
	worker->flags = 0;
	current->flags &= ~PF_IO_WORKER;
	preempt_enable();
	raw_spin_unlock(&wqe->lock);

	kfree_rcu(worker, rcu);
	io_worker_ref_put(wqe->wq);
	do_exit(0);
}

static inline bool io_acct_run_queue(struct io_wqe_acct *acct)
{
	if (!wq_list_empty(&acct->work_list) &&
	    !test_bit(IO_ACCT_STALLED_BIT, &acct->flags))
		return true;
	return false;
}

/*
 * Check head of free list for an available worker. If one isn't available,
 * caller must create one.
 */
static bool io_wqe_activate_free_worker(struct io_wqe *wqe,
					struct io_wqe_acct *acct)
	__must_hold(RCU)
{
	struct hlist_nulls_node *n;
	struct io_worker *worker;

	/*
	 * Iterate free_list and see if we can find an idle worker to
	 * activate. If a given worker is on the free_list but in the process
	 * of exiting, keep trying.
	 */
	hlist_nulls_for_each_entry_rcu(worker, n, &wqe->free_list, nulls_node) {
		if (!io_worker_get(worker))
			continue;
		if (io_wqe_get_acct(worker) != acct) {
			io_worker_release(worker);
			continue;
		}
		if (wake_up_process(worker->task)) {
			io_worker_release(worker);
			return true;
		}
		io_worker_release(worker);
	}

	return false;
}

/*
 * We need a worker. If we find a free one, we're good. If not, and we're
 * below the max number of workers, create one.
 */
static bool io_wqe_create_worker(struct io_wqe *wqe, struct io_wqe_acct *acct)
{
	/*
	 * Most likely an attempt to queue unbounded work on an io_wq that
	 * wasn't setup with any unbounded workers.
	 */
	if (unlikely(!acct->max_workers))
		pr_warn_once("io-wq is not configured for unbound workers");

	raw_spin_lock(&wqe->lock);
	if (acct->nr_workers >= acct->max_workers) {
		raw_spin_unlock(&wqe->lock);
		return true;
	}
	acct->nr_workers++;
	raw_spin_unlock(&wqe->lock);
	atomic_inc(&acct->nr_running);
	atomic_inc(&wqe->wq->worker_refs);
	return create_io_worker(wqe->wq, wqe, acct->index);
}

static void io_wqe_inc_running(struct io_worker *worker)
{
	struct io_wqe_acct *acct = io_wqe_get_acct(worker);

	atomic_inc(&acct->nr_running);
}

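/*
 * task_work callback run in the context of the io_wq owner task: create the
 * new worker, provided the accounting limit still allows it.
 */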
static void create_worker_cb(struct callback_head *cb)
{
	struct io_worker *worker;
	struct io_wq *wq;
	struct io_wqe *wqe;
	struct io_wqe_acct *acct;
	bool do_create = false;

	worker = container_of(cb, struct io_worker, create_work);
	wqe = worker->wqe;
	wq = wqe->wq;
	acct = &wqe->acct[worker->create_index];
	raw_spin_lock(&wqe->lock);
	if (acct->nr_workers < acct->max_workers) {
		acct->nr_workers++;
		do_create = true;
	}
	raw_spin_unlock(&wqe->lock);
	if (do_create) {
		create_io_worker(wq, wqe, worker->create_index);
	} else {
		atomic_dec(&acct->nr_running);
		io_worker_ref_put(wq);
	}
	clear_bit_unlock(0, &worker->create_state);
	io_worker_release(worker);
}

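/*
 * Queue worker creation as task_work on the io_wq owner task; the callback
 * (func) does the actual create. On failure, drop the nr_running and worker
 * ref accounting taken by the caller and return false.
 */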
static bool io_queue_worker_create(struct io_worker *worker,
				   struct io_wqe_acct *acct,
				   task_work_func_t func)
{
	struct io_wqe *wqe = worker->wqe;
	struct io_wq *wq = wqe->wq;

	/* raced with exit, just ignore create call */
	if (test_bit(IO_WQ_BIT_EXIT, &wq->state))
		goto fail;
	if (!io_worker_get(worker))
		goto fail;
	/*
	 * create_state manages ownership of create_work/index. We should
	 * only need one entry per worker, as the worker going to sleep
	 * will trigger the condition, and waking will clear it once it
	 * runs the task_work.
	 */
	if (test_bit(0, &worker->create_state) ||
	    test_and_set_bit_lock(0, &worker->create_state))
		goto fail_release;

	atomic_inc(&wq->worker_refs);
	init_task_work(&worker->create_work, func);
	worker->create_index = acct->index;
	if (!task_work_add(wq->task, &worker->create_work, TWA_SIGNAL)) {
		/*
		 * EXIT may have been set after checking it above, check after
		 * adding the task_work and remove any creation item if it is
		 * now set. wq exit does that too, but we can have added this
		 * work item after we canceled in io_wq_exit_workers().
		 */
		if (test_bit(IO_WQ_BIT_EXIT, &wq->state))
			io_wq_cancel_tw_create(wq);
		io_worker_ref_put(wq);
		return true;
	}
	io_worker_ref_put(wq);
	clear_bit_unlock(0, &worker->create_state);
fail_release:
	io_worker_release(worker);
fail:
	atomic_dec(&acct->nr_running);
	io_worker_ref_put(wq);
	return false;
}

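/*
 * Last running worker for this acct is going to sleep: if work is still
 * pending, hand the accounting over to a new worker creation request.
 */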
static void io_wqe_dec_running(struct io_worker *worker)
	__must_hold(wqe->lock)
{
	struct io_wqe_acct *acct = io_wqe_get_acct(worker);
	struct io_wqe *wqe = worker->wqe;

	if (!(worker->flags & IO_WORKER_F_UP))
		return;

	if (atomic_dec_and_test(&acct->nr_running) && io_acct_run_queue(acct)) {
		atomic_inc(&acct->nr_running);
		atomic_inc(&wqe->wq->worker_refs);
		raw_spin_unlock(&wqe->lock);
		io_queue_worker_create(worker, acct, create_worker_cb);
		raw_spin_lock(&wqe->lock);
	}
}

/*
 * Worker will start processing some work. Move it to the busy list, if
 * it's currently on the freelist
 */
static void __io_worker_busy(struct io_wqe *wqe, struct io_worker *worker)
	__must_hold(wqe->lock)
{
	if (worker->flags & IO_WORKER_F_FREE) {
		worker->flags &= ~IO_WORKER_F_FREE;
		hlist_nulls_del_init_rcu(&worker->nulls_node);
	}
}

/*
 * No work, worker going to sleep. Move to freelist, and unuse mm if we
 * have one attached. Dropping the mm may potentially sleep, so we drop
 * the lock in that case and return success. Since the caller has to
 * retry the loop in that case (we changed task state), we don't regrab
 * the lock if we return success.
 */
static void __io_worker_idle(struct io_wqe *wqe, struct io_worker *worker)
	__must_hold(wqe->lock)
{
	if (!(worker->flags & IO_WORKER_F_FREE)) {
		worker->flags |= IO_WORKER_F_FREE;
		hlist_nulls_add_head_rcu(&worker->nulls_node, &wqe->free_list);
	}
}

static inline unsigned int io_get_work_hash(struct io_wq_work *work)
{
	return work->flags >> IO_WQ_HASH_SHIFT;
}

static bool io_wait_on_hash(struct io_wqe *wqe, unsigned int hash)
{
	struct io_wq *wq = wqe->wq;
	bool ret = false;

	spin_lock_irq(&wq->hash->wait.lock);
	if (list_empty(&wqe->wait.entry)) {
		__add_wait_queue(&wq->hash->wait, &wqe->wait);
		if (!test_bit(hash, &wq->hash->map)) {
			__set_current_state(TASK_RUNNING);
			list_del_init(&wqe->wait.entry);
			ret = true;
		}
	}
	spin_unlock_irq(&wq->hash->wait.lock);
	return ret;
}

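/*
 * Pick the next runnable work item for this acct. Unhashed work can run
 * right away; hashed work only runs if no other worker currently holds the
 * same hash bucket, otherwise mark the acct stalled and wait on the hash.
 */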
static struct io_wq_work *io_get_next_work(struct io_wqe_acct *acct,
					   struct io_worker *worker)
	__must_hold(wqe->lock)
{
	struct io_wq_work_node *node, *prev;
	struct io_wq_work *work, *tail;
	unsigned int stall_hash = -1U;
	struct io_wqe *wqe = worker->wqe;

	wq_list_for_each(node, prev, &acct->work_list) {
		unsigned int hash;

		work = container_of(node, struct io_wq_work, list);

		/* not hashed, can run anytime */
		if (!io_wq_is_hashed(work)) {
			wq_list_del(&acct->work_list, node, prev);
			return work;
		}

		hash = io_get_work_hash(work);
		/* all items with this hash lie in [work, tail] */
		tail = wqe->hash_tail[hash];

		/* hashed, can run if not already running */
		if (!test_and_set_bit(hash, &wqe->wq->hash->map)) {
			wqe->hash_tail[hash] = NULL;
			wq_list_cut(&acct->work_list, &tail->list, prev);
			return work;
		}
		if (stall_hash == -1U)
			stall_hash = hash;
		/* fast forward to a next hash, for-each will fix up @prev */
		node = &tail->list;
	}

	if (stall_hash != -1U) {
		bool unstalled;

		/*
		 * Set this before dropping the lock to avoid racing with new
		 * work being added and clearing the stalled bit.
		 */
		set_bit(IO_ACCT_STALLED_BIT, &acct->flags);
		raw_spin_unlock(&wqe->lock);
		unstalled = io_wait_on_hash(wqe, stall_hash);
		raw_spin_lock(&wqe->lock);
		if (unstalled) {
			clear_bit(IO_ACCT_STALLED_BIT, &acct->flags);
			if (wq_has_sleeper(&wqe->wq->hash->wait))
				wake_up(&wqe->wq->hash->wait);
		}
	}

	return NULL;
}

static bool io_flush_signals(void)
{
	if (unlikely(test_thread_flag(TIF_NOTIFY_SIGNAL))) {
		__set_current_state(TASK_RUNNING);
		tracehook_notify_signal();
		return true;
	}
	return false;
}

static void io_assign_current_work(struct io_worker *worker,
				   struct io_wq_work *work)
{
	if (work) {
		io_flush_signals();
		cond_resched();
	}

	raw_spin_lock(&worker->lock);
	worker->cur_work = work;
	raw_spin_unlock(&worker->lock);
}

static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work);

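/*
 * Main work processing loop: pull work off the acct list and run it,
 * including any dependent (linked) items, until no runnable work is left.
 * Called with wqe->lock held, returns with it dropped.
 */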
static void io_worker_handle_work(struct io_worker *worker)
	__releases(wqe->lock)
{
	struct io_wqe_acct *acct = io_wqe_get_acct(worker);
	struct io_wqe *wqe = worker->wqe;
	struct io_wq *wq = wqe->wq;
	bool do_kill = test_bit(IO_WQ_BIT_EXIT, &wq->state);

	do {
		struct io_wq_work *work;
get_next:
		/*
		 * If we got some work, mark us as busy. If we didn't, but
		 * the list isn't empty, it means we stalled on hashed work.
		 * Mark us stalled so we don't keep looking for work when we
		 * can't make progress, any work completion or insertion will
		 * clear the stalled flag.
		 */
		work = io_get_next_work(acct, worker);
		if (work)
			__io_worker_busy(wqe, worker);

		raw_spin_unlock(&wqe->lock);
		if (!work)
			break;
		io_assign_current_work(worker, work);
		__set_current_state(TASK_RUNNING);

		/* handle a whole dependent link */
		do {
			struct io_wq_work *next_hashed, *linked;
			unsigned int hash = io_get_work_hash(work);

			next_hashed = wq_next_work(work);

			if (unlikely(do_kill) && (work->flags & IO_WQ_WORK_UNBOUND))
				work->flags |= IO_WQ_WORK_CANCEL;
			wq->do_work(work);
			io_assign_current_work(worker, NULL);

			linked = wq->free_work(work);
			work = next_hashed;
			if (!work && linked && !io_wq_is_hashed(linked)) {
				work = linked;
				linked = NULL;
			}
			io_assign_current_work(worker, work);
			if (linked)
				io_wqe_enqueue(wqe, linked);

			if (hash != -1U && !next_hashed) {
				/* serialize hash clear with wake_up() */
				spin_lock_irq(&wq->hash->wait.lock);
				clear_bit(hash, &wq->hash->map);
				clear_bit(IO_ACCT_STALLED_BIT, &acct->flags);
				spin_unlock_irq(&wq->hash->wait.lock);
				if (wq_has_sleeper(&wq->hash->wait))
					wake_up(&wq->hash->wait);
				raw_spin_lock(&wqe->lock);
				/* skip unnecessary unlock-lock wqe->lock */
				if (!work)
					goto get_next;
				raw_spin_unlock(&wqe->lock);
			}
		} while (work);

		raw_spin_lock(&wqe->lock);
	} while (1);
}

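/*
 * Thread function for an io-wq worker: run queued work, idle with a timeout,
 * and exit on timeout (unless we're the last worker), signal, or wq exit.
 */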
static int io_wqe_worker(void *data)
{
	struct io_worker *worker = data;
	struct io_wqe_acct *acct = io_wqe_get_acct(worker);
	struct io_wqe *wqe = worker->wqe;
	struct io_wq *wq = wqe->wq;
	bool last_timeout = false;
	char buf[TASK_COMM_LEN];

	worker->flags |= (IO_WORKER_F_UP | IO_WORKER_F_RUNNING);

	snprintf(buf, sizeof(buf), "iou-wrk-%d", wq->task->pid);
	set_task_comm(current, buf);

	audit_alloc_kernel(current);

	while (!test_bit(IO_WQ_BIT_EXIT, &wq->state)) {
		long ret;

		set_current_state(TASK_INTERRUPTIBLE);
loop:
		raw_spin_lock(&wqe->lock);
		if (io_acct_run_queue(acct)) {
			io_worker_handle_work(worker);
			goto loop;
		}
		/* timed out, exit unless we're the last worker */
		if (last_timeout && acct->nr_workers > 1) {
			acct->nr_workers--;
			raw_spin_unlock(&wqe->lock);
			__set_current_state(TASK_RUNNING);
			break;
		}
		last_timeout = false;
		__io_worker_idle(wqe, worker);
		raw_spin_unlock(&wqe->lock);
		if (io_flush_signals())
			continue;
		ret = schedule_timeout(WORKER_IDLE_TIMEOUT);
		if (signal_pending(current)) {
			struct ksignal ksig;

			if (!get_signal(&ksig))
				continue;
			break;
		}
		last_timeout = !ret;
	}

	if (test_bit(IO_WQ_BIT_EXIT, &wq->state)) {
		raw_spin_lock(&wqe->lock);
		io_worker_handle_work(worker);
	}

	audit_free(current);
	io_worker_exit(worker);
	return 0;
}

/*
 * Called when a worker is scheduled in. Mark us as currently running.
 */
void io_wq_worker_running(struct task_struct *tsk)
{
	struct io_worker *worker = tsk->pf_io_worker;

	if (!worker)
		return;
	if (!(worker->flags & IO_WORKER_F_UP))
		return;
	if (worker->flags & IO_WORKER_F_RUNNING)
		return;
	worker->flags |= IO_WORKER_F_RUNNING;
	io_wqe_inc_running(worker);
}

/*
 * Called when worker is going to sleep. If there are no workers currently
 * running and we have work pending, wake up a free one or create a new one.
 */
void io_wq_worker_sleeping(struct task_struct *tsk)
{
	struct io_worker *worker = tsk->pf_io_worker;

	if (!worker)
		return;
	if (!(worker->flags & IO_WORKER_F_UP))
		return;
	if (!(worker->flags & IO_WORKER_F_RUNNING))
		return;

	worker->flags &= ~IO_WORKER_F_RUNNING;

	raw_spin_lock(&worker->wqe->lock);
	io_wqe_dec_running(worker);
	raw_spin_unlock(&worker->wqe->lock);
}

static void io_init_new_worker(struct io_wqe *wqe, struct io_worker *worker,
			       struct task_struct *tsk)
{
	tsk->pf_io_worker = worker;
	worker->task = tsk;
	set_cpus_allowed_ptr(tsk, wqe->cpu_mask);
	tsk->flags |= PF_NO_SETAFFINITY;

	raw_spin_lock(&wqe->lock);
	hlist_nulls_add_head_rcu(&worker->nulls_node, &wqe->free_list);
	list_add_tail_rcu(&worker->all_list, &wqe->all_list);
	worker->flags |= IO_WORKER_F_FREE;
	raw_spin_unlock(&wqe->lock);
	wake_up_new_task(tsk);
}

static bool io_wq_work_match_all(struct io_wq_work *work, void *data)
{
	return true;
}

static inline bool io_should_retry_thread(long err)
{
	/*
	 * Prevent perpetual task_work retry, if the task (or its group) is
	 * exiting.
	 */
	if (fatal_signal_pending(current))
		return false;

	switch (err) {
	case -EAGAIN:
	case -ERESTARTSYS:
	case -ERESTARTNOINTR:
	case -ERESTARTNOHAND:
		return true;
	default:
		return false;
	}
}

static void create_worker_cont(struct callback_head *cb)
{
	struct io_worker *worker;
	struct task_struct *tsk;
	struct io_wqe *wqe;

	worker = container_of(cb, struct io_worker, create_work);
	clear_bit_unlock(0, &worker->create_state);
	wqe = worker->wqe;
	tsk = create_io_thread(io_wqe_worker, worker, wqe->node);
	if (!IS_ERR(tsk)) {
		io_init_new_worker(wqe, worker, tsk);
		io_worker_release(worker);
		return;
	} else if (!io_should_retry_thread(PTR_ERR(tsk))) {
		struct io_wqe_acct *acct = io_wqe_get_acct(worker);

		atomic_dec(&acct->nr_running);
		raw_spin_lock(&wqe->lock);
		acct->nr_workers--;
		if (!acct->nr_workers) {
			struct io_cb_cancel_data match = {
				.fn		= io_wq_work_match_all,
				.cancel_all	= true,
			};

			while (io_acct_cancel_pending_work(wqe, acct, &match))
				raw_spin_lock(&wqe->lock);
		}
		raw_spin_unlock(&wqe->lock);
		io_worker_ref_put(wqe->wq);
		kfree(worker);
		return;
	}

	/* re-create attempts grab a new worker ref, drop the existing one */
	io_worker_release(worker);
	schedule_work(&worker->work);
}

static void io_workqueue_create(struct work_struct *work)
{
	struct io_worker *worker = container_of(work, struct io_worker, work);
	struct io_wqe_acct *acct = io_wqe_get_acct(worker);

	if (!io_queue_worker_create(worker, acct, create_worker_cont))
		kfree(worker);
}

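/*
 * Allocate a new io_worker and spawn its thread via create_io_thread().
 * On a transient failure, retry from a workqueue context; otherwise undo
 * the accounting that io_wqe_create_worker() took.
 */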
static bool create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index)
{
	struct io_wqe_acct *acct = &wqe->acct[index];
	struct io_worker *worker;
	struct task_struct *tsk;

	__set_current_state(TASK_RUNNING);

	worker = kzalloc_node(sizeof(*worker), GFP_KERNEL, wqe->node);
	if (!worker) {
fail:
		atomic_dec(&acct->nr_running);
		raw_spin_lock(&wqe->lock);
		acct->nr_workers--;
		raw_spin_unlock(&wqe->lock);
		io_worker_ref_put(wq);
		return false;
	}

	refcount_set(&worker->ref, 1);
	worker->wqe = wqe;
	raw_spin_lock_init(&worker->lock);
	init_completion(&worker->ref_done);

	if (index == IO_WQ_ACCT_BOUND)
		worker->flags |= IO_WORKER_F_BOUND;

	tsk = create_io_thread(io_wqe_worker, worker, wqe->node);
	if (!IS_ERR(tsk)) {
		io_init_new_worker(wqe, worker, tsk);
	} else if (!io_should_retry_thread(PTR_ERR(tsk))) {
		kfree(worker);
		goto fail;
	} else {
		INIT_WORK(&worker->work, io_workqueue_create);
		schedule_work(&worker->work);
	}

	return true;
}

/*
 * Iterate the passed in list and call the specific function for each
 * worker that isn't exiting
 */
static bool io_wq_for_each_worker(struct io_wqe *wqe,
				  bool (*func)(struct io_worker *, void *),
				  void *data)
{
	struct io_worker *worker;
	bool ret = false;

	list_for_each_entry_rcu(worker, &wqe->all_list, all_list) {
		if (io_worker_get(worker)) {
			/* no task if node is/was offline */
			if (worker->task)
				ret = func(worker, data);
			io_worker_release(worker);
			if (ret)
				break;
		}
	}

	return ret;
}

static bool io_wq_worker_wake(struct io_worker *worker, void *data)
{
	set_notify_signal(worker->task);
	wake_up_process(worker->task);
	return false;
}

static void io_run_cancel(struct io_wq_work *work, struct io_wqe *wqe)
{
	struct io_wq *wq = wqe->wq;

	do {
		work->flags |= IO_WQ_WORK_CANCEL;
		wq->do_work(work);
		work = wq->free_work(work);
	} while (work);
}

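/*
 * Append work to the acct list. Hashed work is chained behind the current
 * tail for its hash bucket so items with the same hash stay grouped.
 */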
static void io_wqe_insert_work(struct io_wqe *wqe, struct io_wq_work *work)
{
	struct io_wqe_acct *acct = io_work_get_acct(wqe, work);
	unsigned int hash;
	struct io_wq_work *tail;

	if (!io_wq_is_hashed(work)) {
append:
		wq_list_add_tail(&work->list, &acct->work_list);
		return;
	}

	hash = io_get_work_hash(work);
	tail = wqe->hash_tail[hash];
	wqe->hash_tail[hash] = work;
	if (!tail)
		goto append;

	wq_list_add_after(&work->list, &tail->list, &acct->work_list);
}

static bool io_wq_work_match_item(struct io_wq_work *work, void *data)
{
	return work == data;
}

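/*
 * Queue work on a wqe: insert it, clear the stalled bit, and wake a free
 * worker or create a new one if none is currently running for this acct.
 */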
static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work)
{
	struct io_wqe_acct *acct = io_work_get_acct(wqe, work);
	unsigned work_flags = work->flags;
	bool do_create;

	/*
	 * If io-wq is exiting for this task, or if the request has explicitly
	 * been marked as one that should not get executed, cancel it here.
	 */
	if (test_bit(IO_WQ_BIT_EXIT, &wqe->wq->state) ||
	    (work->flags & IO_WQ_WORK_CANCEL)) {
		io_run_cancel(work, wqe);
		return;
	}

	raw_spin_lock(&wqe->lock);
	io_wqe_insert_work(wqe, work);
	clear_bit(IO_ACCT_STALLED_BIT, &acct->flags);

	rcu_read_lock();
	do_create = !io_wqe_activate_free_worker(wqe, acct);
	rcu_read_unlock();

	raw_spin_unlock(&wqe->lock);

	if (do_create && ((work_flags & IO_WQ_WORK_CONCURRENT) ||
	    !atomic_read(&acct->nr_running))) {
		bool did_create;

		did_create = io_wqe_create_worker(wqe, acct);
		if (likely(did_create))
			return;

		raw_spin_lock(&wqe->lock);
		/* fatal condition, failed to create the first worker */
		if (!acct->nr_workers) {
			struct io_cb_cancel_data match = {
				.fn		= io_wq_work_match_item,
				.data		= work,
				.cancel_all	= false,
			};

			if (io_acct_cancel_pending_work(wqe, acct, &match))
				raw_spin_lock(&wqe->lock);
		}
		raw_spin_unlock(&wqe->lock);
	}
}

void io_wq_enqueue(struct io_wq *wq, struct io_wq_work *work)
{
	struct io_wqe *wqe = wq->wqes[numa_node_id()];

	io_wqe_enqueue(wqe, work);
}

/*
 * Work items that hash to the same value will not be done in parallel.
 * Used to limit concurrent writes, generally hashed by inode.
 */
void io_wq_hash_work(struct io_wq_work *work, void *val)
{
	unsigned int bit;

	bit = hash_ptr(val, IO_WQ_HASH_ORDER);
	work->flags |= (IO_WQ_WORK_HASHED | (bit << IO_WQ_HASH_SHIFT));
}

static bool io_wq_worker_cancel(struct io_worker *worker, void *data)
{
	struct io_cb_cancel_data *match = data;

	/*
	 * Hold the lock to avoid ->cur_work going out of scope, caller
	 * may dereference the passed in work.
	 */
	raw_spin_lock(&worker->lock);
	if (worker->cur_work &&
	    match->fn(worker->cur_work, match->data)) {
		set_notify_signal(worker->task);
		match->nr_running++;
	}
	raw_spin_unlock(&worker->lock);

	return match->nr_running && !match->cancel_all;
}

static inline void io_wqe_remove_pending(struct io_wqe *wqe,
					 struct io_wq_work *work,
					 struct io_wq_work_node *prev)
{
	struct io_wqe_acct *acct = io_work_get_acct(wqe, work);
	unsigned int hash = io_get_work_hash(work);
	struct io_wq_work *prev_work = NULL;

	if (io_wq_is_hashed(work) && work == wqe->hash_tail[hash]) {
		if (prev)
			prev_work = container_of(prev, struct io_wq_work, list);
		if (prev_work && io_get_work_hash(prev_work) == hash)
			wqe->hash_tail[hash] = prev_work;
		else
			wqe->hash_tail[hash] = NULL;
	}
	wq_list_del(&acct->work_list, &work->list, prev);
}

static bool io_acct_cancel_pending_work(struct io_wqe *wqe,
					struct io_wqe_acct *acct,
					struct io_cb_cancel_data *match)
	__releases(wqe->lock)
{
	struct io_wq_work_node *node, *prev;
	struct io_wq_work *work;

	wq_list_for_each(node, prev, &acct->work_list) {
		work = container_of(node, struct io_wq_work, list);
		if (!match->fn(work, match->data))
			continue;
		io_wqe_remove_pending(wqe, work, prev);
		raw_spin_unlock(&wqe->lock);
		io_run_cancel(work, wqe);
		match->nr_pending++;
		/* not safe to continue after unlock */
		return true;
	}

	return false;
}

static void io_wqe_cancel_pending_work(struct io_wqe *wqe,
				       struct io_cb_cancel_data *match)
{
	int i;
retry:
	raw_spin_lock(&wqe->lock);
	for (i = 0; i < IO_WQ_ACCT_NR; i++) {
		struct io_wqe_acct *acct = io_get_acct(wqe, i == 0);

		if (io_acct_cancel_pending_work(wqe, acct, match)) {
			if (match->cancel_all)
				goto retry;
			return;
		}
	}
	raw_spin_unlock(&wqe->lock);
}

static void io_wqe_cancel_running_work(struct io_wqe *wqe,
				       struct io_cb_cancel_data *match)
{
	rcu_read_lock();
	io_wq_for_each_worker(wqe, io_wq_worker_cancel, match);
	rcu_read_unlock();
}

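/*
 * Cancel work matched by @cancel: pending work is removed and cancelled
 * directly, running work is signalled. With @cancel_all, keep going and
 * cancel every match instead of stopping at the first one.
 */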
enum io_wq_cancel io_wq_cancel_cb(struct io_wq *wq, work_cancel_fn *cancel,
				  void *data, bool cancel_all)
{
	struct io_cb_cancel_data match = {
		.fn		= cancel,
		.data		= data,
		.cancel_all	= cancel_all,
	};
	int node;

	/*
	 * First check pending list, if we're lucky we can just remove it
	 * from there. CANCEL_OK means that the work is returned as-new,
	 * no completion will be posted for it.
	 */
	for_each_node(node) {
		struct io_wqe *wqe = wq->wqes[node];

		io_wqe_cancel_pending_work(wqe, &match);
		if (match.nr_pending && !match.cancel_all)
			return IO_WQ_CANCEL_OK;
	}

	/*
	 * Now check if a free (going busy) or busy worker has the work
	 * currently running. If we find it there, we'll return CANCEL_RUNNING
	 * as an indication that we attempt to signal cancellation. The
	 * completion will run normally in this case.
	 */
	for_each_node(node) {
		struct io_wqe *wqe = wq->wqes[node];

		io_wqe_cancel_running_work(wqe, &match);
		if (match.nr_running && !match.cancel_all)
			return IO_WQ_CANCEL_RUNNING;
	}

	if (match.nr_running)
		return IO_WQ_CANCEL_RUNNING;
	if (match.nr_pending)
		return IO_WQ_CANCEL_OK;
	return IO_WQ_CANCEL_NOTFOUND;
}

static int io_wqe_hash_wake(struct wait_queue_entry *wait, unsigned mode,
			    int sync, void *key)
{
	struct io_wqe *wqe = container_of(wait, struct io_wqe, wait);
	int i;

	list_del_init(&wait->entry);

	rcu_read_lock();
	for (i = 0; i < IO_WQ_ACCT_NR; i++) {
		struct io_wqe_acct *acct = &wqe->acct[i];

		if (test_and_clear_bit(IO_ACCT_STALLED_BIT, &acct->flags))
			io_wqe_activate_free_worker(wqe, acct);
	}
	rcu_read_unlock();
	return 1;
}

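/*
 * Set up an io_wq with one io_wqe per NUMA node. @bounded caps the number
 * of bounded workers; unbounded workers are capped by RLIMIT_NPROC.
 */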
struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
{
	int ret, node, i;
	struct io_wq *wq;

	if (WARN_ON_ONCE(!data->free_work || !data->do_work))
		return ERR_PTR(-EINVAL);
	if (WARN_ON_ONCE(!bounded))
		return ERR_PTR(-EINVAL);

	wq = kzalloc(struct_size(wq, wqes, nr_node_ids), GFP_KERNEL);
	if (!wq)
		return ERR_PTR(-ENOMEM);
	ret = cpuhp_state_add_instance_nocalls(io_wq_online, &wq->cpuhp_node);
	if (ret)
		goto err_wq;

	refcount_inc(&data->hash->refs);
	wq->hash = data->hash;
	wq->free_work = data->free_work;
	wq->do_work = data->do_work;

	ret = -ENOMEM;
	for_each_node(node) {
		struct io_wqe *wqe;
		int alloc_node = node;

		if (!node_online(alloc_node))
			alloc_node = NUMA_NO_NODE;
		wqe = kzalloc_node(sizeof(struct io_wqe), GFP_KERNEL, alloc_node);
		if (!wqe)
			goto err;
		if (!alloc_cpumask_var(&wqe->cpu_mask, GFP_KERNEL))
			goto err;
		cpumask_copy(wqe->cpu_mask, cpumask_of_node(node));
		wq->wqes[node] = wqe;
		wqe->node = alloc_node;
		wqe->acct[IO_WQ_ACCT_BOUND].max_workers = bounded;
		wqe->acct[IO_WQ_ACCT_UNBOUND].max_workers =
					task_rlimit(current, RLIMIT_NPROC);
		INIT_LIST_HEAD(&wqe->wait.entry);
		wqe->wait.func = io_wqe_hash_wake;
		for (i = 0; i < IO_WQ_ACCT_NR; i++) {
			struct io_wqe_acct *acct = &wqe->acct[i];

			acct->index = i;
			atomic_set(&acct->nr_running, 0);
			INIT_WQ_LIST(&acct->work_list);
		}
		wqe->wq = wq;
		raw_spin_lock_init(&wqe->lock);
		INIT_HLIST_NULLS_HEAD(&wqe->free_list, 0);
		INIT_LIST_HEAD(&wqe->all_list);
	}

	wq->task = get_task_struct(data->task);
	atomic_set(&wq->worker_refs, 1);
	init_completion(&wq->worker_done);
	return wq;
err:
	io_wq_put_hash(data->hash);
	cpuhp_state_remove_instance_nocalls(io_wq_online, &wq->cpuhp_node);
	for_each_node(node) {
		if (!wq->wqes[node])
			continue;
		free_cpumask_var(wq->wqes[node]->cpu_mask);
		kfree(wq->wqes[node]);
	}
err_wq:
	kfree(wq);
	return ERR_PTR(ret);
}

static bool io_task_work_match(struct callback_head *cb, void *data)
{
	struct io_worker *worker;

	if (cb->func != create_worker_cb && cb->func != create_worker_cont)
		return false;
	worker = container_of(cb, struct io_worker, create_work);
	return worker->wqe->wq == data;
}

void io_wq_exit_start(struct io_wq *wq)
{
	set_bit(IO_WQ_BIT_EXIT, &wq->state);
}

static void io_wq_cancel_tw_create(struct io_wq *wq)
{
	struct callback_head *cb;

	while ((cb = task_work_cancel_match(wq->task, io_task_work_match, wq)) != NULL) {
		struct io_worker *worker;

		worker = container_of(cb, struct io_worker, create_work);
		io_worker_cancel_cb(worker);
	}
}

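/*
 * Tear down all workers: cancel queued create requests, wake every worker so
 * it notices IO_WQ_BIT_EXIT, then wait for the last worker reference to drop.
 */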
static void io_wq_exit_workers(struct io_wq *wq)
{
	int node;

	if (!wq->task)
		return;

	io_wq_cancel_tw_create(wq);

	rcu_read_lock();
	for_each_node(node) {
		struct io_wqe *wqe = wq->wqes[node];

		io_wq_for_each_worker(wqe, io_wq_worker_wake, NULL);
	}
	rcu_read_unlock();
	io_worker_ref_put(wq);
	wait_for_completion(&wq->worker_done);

	for_each_node(node) {
		spin_lock_irq(&wq->hash->wait.lock);
		list_del_init(&wq->wqes[node]->wait.entry);
		spin_unlock_irq(&wq->hash->wait.lock);
	}
	put_task_struct(wq->task);
	wq->task = NULL;
}

static void io_wq_destroy(struct io_wq *wq)
{
	int node;

	cpuhp_state_remove_instance_nocalls(io_wq_online, &wq->cpuhp_node);

	for_each_node(node) {
		struct io_wqe *wqe = wq->wqes[node];
		struct io_cb_cancel_data match = {
			.fn		= io_wq_work_match_all,
			.cancel_all	= true,
		};
		io_wqe_cancel_pending_work(wqe, &match);
		free_cpumask_var(wqe->cpu_mask);
		kfree(wqe);
	}
	io_wq_put_hash(wq->hash);
	kfree(wq);
}

void io_wq_put_and_exit(struct io_wq *wq)
{
	WARN_ON_ONCE(!test_bit(IO_WQ_BIT_EXIT, &wq->state));

	io_wq_exit_workers(wq);
	io_wq_destroy(wq);
}

struct online_data {
	unsigned int cpu;
	bool online;
};

static bool io_wq_worker_affinity(struct io_worker *worker, void *data)
{
	struct online_data *od = data;

	if (od->online)
		cpumask_set_cpu(od->cpu, worker->wqe->cpu_mask);
	else
		cpumask_clear_cpu(od->cpu, worker->wqe->cpu_mask);
	return false;
}

static int __io_wq_cpu_online(struct io_wq *wq, unsigned int cpu, bool online)
{
	struct online_data od = {
		.cpu = cpu,
		.online = online
	};
	int i;

	rcu_read_lock();
	for_each_node(i)
		io_wq_for_each_worker(wq->wqes[i], io_wq_worker_affinity, &od);
	rcu_read_unlock();
	return 0;
}

static int io_wq_cpu_online(unsigned int cpu, struct hlist_node *node)
{
	struct io_wq *wq = hlist_entry_safe(node, struct io_wq, cpuhp_node);

	return __io_wq_cpu_online(wq, cpu, true);
}

static int io_wq_cpu_offline(unsigned int cpu, struct hlist_node *node)
{
	struct io_wq *wq = hlist_entry_safe(node, struct io_wq, cpuhp_node);

	return __io_wq_cpu_online(wq, cpu, false);
}

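/*
 * Set the allowed CPU mask for all wqes; with a NULL mask, reset each wqe to
 * the cpumask of its NUMA node.
 */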
int io_wq_cpu_affinity(struct io_wq *wq, cpumask_var_t mask)
{
	int i;

	rcu_read_lock();
	for_each_node(i) {
		struct io_wqe *wqe = wq->wqes[i];

		if (mask)
			cpumask_copy(wqe->cpu_mask, mask);
		else
			cpumask_copy(wqe->cpu_mask, cpumask_of_node(i));
	}
	rcu_read_unlock();
	return 0;
}

/*
 * Set max number of unbounded workers, returns old value. If new_count is 0,
 * then just return the old value.
 */
int io_wq_max_workers(struct io_wq *wq, int *new_count)
{
	int prev[IO_WQ_ACCT_NR];
	bool first_node = true;
	int i, node;

	BUILD_BUG_ON((int) IO_WQ_ACCT_BOUND   != (int) IO_WQ_BOUND);
	BUILD_BUG_ON((int) IO_WQ_ACCT_UNBOUND != (int) IO_WQ_UNBOUND);
	BUILD_BUG_ON((int) IO_WQ_ACCT_NR      != 2);

	for (i = 0; i < 2; i++) {
		if (new_count[i] > task_rlimit(current, RLIMIT_NPROC))
			new_count[i] = task_rlimit(current, RLIMIT_NPROC);
	}

	for (i = 0; i < IO_WQ_ACCT_NR; i++)
		prev[i] = 0;

	rcu_read_lock();
	for_each_node(node) {
		struct io_wqe *wqe = wq->wqes[node];
		struct io_wqe_acct *acct;

		raw_spin_lock(&wqe->lock);
		for (i = 0; i < IO_WQ_ACCT_NR; i++) {
			acct = &wqe->acct[i];
			if (first_node)
				prev[i] = max_t(int, acct->max_workers, prev[i]);
			if (new_count[i])
				acct->max_workers = new_count[i];
		}
		raw_spin_unlock(&wqe->lock);
		first_node = false;
	}
	rcu_read_unlock();

	for (i = 0; i < IO_WQ_ACCT_NR; i++)
		new_count[i] = prev[i];

	return 0;
}

static __init int io_wq_init(void)
{
	int ret;

	ret = cpuhp_setup_state_multi(CPUHP_AP_ONLINE_DYN, "io-wq/online",
					io_wq_cpu_online, io_wq_cpu_offline);
	if (ret < 0)
		return ret;
	io_wq_online = ret;
	return 0;
}
subsys_initcall(io_wq_init);