/*
 * linux/net/sunrpc/sched.c
 *
 * Scheduling for synchronous and asynchronous RPC requests.
 *
 * Copyright (C) 1996 Olaf Kirch, <okir@monad.swb.de>
 *
 * TCP NFS related read + write fixes
 * (C) 1999 Dave Airlie, University of Limerick, Ireland <airlied@linux.ie>
 */

#include <linux/module.h>

#include <linux/sched.h>
#include <linux/interrupt.h>
#include <linux/slab.h>
#include <linux/mempool.h>
#include <linux/smp.h>
#include <linux/spinlock.h>
#include <linux/mutex.h>
#include <linux/freezer.h>
#include <linux/sched/mm.h>

#include <linux/sunrpc/clnt.h>

#include "sunrpc.h"

#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
#define RPCDBG_FACILITY		RPCDBG_SCHED
#endif

#define CREATE_TRACE_POINTS
#include <trace/events/sunrpc.h>

/*
 * RPC slabs and memory pools
 */
#define RPC_BUFFER_MAXSIZE	(2048)
#define RPC_BUFFER_POOLSIZE	(8)
#define RPC_TASK_POOLSIZE	(8)
static struct kmem_cache	*rpc_task_slabp __read_mostly;
static struct kmem_cache	*rpc_buffer_slabp __read_mostly;
static mempool_t	*rpc_task_mempool __read_mostly;
static mempool_t	*rpc_buffer_mempool __read_mostly;

static void			rpc_async_schedule(struct work_struct *);
static void			 rpc_release_task(struct rpc_task *task);
static void __rpc_queue_timer_fn(struct timer_list *t);

/*
 * RPC tasks sit here while waiting for conditions to improve.
 */
static struct rpc_wait_queue delay_queue;

/*
 * rpciod-related stuff
 */
struct workqueue_struct *rpciod_workqueue __read_mostly;
struct workqueue_struct *xprtiod_workqueue __read_mostly;

/*
 * Disable the timer for a given RPC task. Should be called with
 * queue->lock and bh_disabled in order to avoid races within
 * rpc_run_timer().
 */
static void
__rpc_disable_timer(struct rpc_wait_queue *queue, struct rpc_task *task)
{
	if (task->tk_timeout == 0)
		return;
	dprintk("RPC: %5u disabling timer\n", task->tk_pid);
	task->tk_timeout = 0;
	list_del(&task->u.tk_wait.timer_list);
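	/* The per-queue timer can be stopped once no timed task remains. */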
	if (list_empty(&queue->timer_list.list))
		del_timer(&queue->timer_list.timer);
}

static void
rpc_set_queue_timer(struct rpc_wait_queue *queue, unsigned long expires)
{
	queue->timer_list.expires = expires;
	mod_timer(&queue->timer_list.timer, expires);
}

/*
 * Set up a timer for the current task.
 */
static void
__rpc_add_timer(struct rpc_wait_queue *queue, struct rpc_task *task)
{
	if (!task->tk_timeout)
		return;

	dprintk("RPC: %5u setting alarm for %u ms\n",
		task->tk_pid, jiffies_to_msecs(task->tk_timeout));

	task->u.tk_wait.expires = jiffies + task->tk_timeout;
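	/*
	 * Only reprogram the queue timer if this task expires earlier than
	 * the deadline currently armed (or if no timer is pending at all).
	 */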
	if (list_empty(&queue->timer_list.list) || time_before(task->u.tk_wait.expires, queue->timer_list.expires))
		rpc_set_queue_timer(queue, task->u.tk_wait.expires);
	list_add(&task->u.tk_wait.timer_list, &queue->timer_list.list);
}

static void rpc_set_waitqueue_priority(struct rpc_wait_queue *queue, int priority)
{
	if (queue->priority != priority) {
		queue->priority = priority;
		queue->nr = 1U << priority;
	}
}

static void rpc_reset_waitqueue_priority(struct rpc_wait_queue *queue)
{
	rpc_set_waitqueue_priority(queue, queue->maxpriority);
}

/*
 * Add a request to a queue list
 */
static void
__rpc_list_enqueue_task(struct list_head *q, struct rpc_task *task)
{
	struct rpc_task *t;

	list_for_each_entry(t, q, u.tk_wait.list) {
		if (t->tk_owner == task->tk_owner) {
			list_add_tail(&task->u.tk_wait.links,
					&t->u.tk_wait.links);
			/* Cache the queue head in task->u.tk_wait.list */
			task->u.tk_wait.list.next = q;
			task->u.tk_wait.list.prev = NULL;
			return;
		}
	}
	INIT_LIST_HEAD(&task->u.tk_wait.links);
	list_add_tail(&task->u.tk_wait.list, q);
}

/*
 * Remove request from a queue list
 */
static void
__rpc_list_dequeue_task(struct rpc_task *task)
{
	struct list_head *q;
	struct rpc_task *t;

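	/*
	 * A NULL list.prev is the marker left by __rpc_list_enqueue_task()
	 * for a task that was parked on another task's tk_wait.links list;
	 * such a task is linked only through u.tk_wait.links.
	 */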
	if (task->u.tk_wait.list.prev == NULL) {
		list_del(&task->u.tk_wait.links);
		return;
	}
	if (!list_empty(&task->u.tk_wait.links)) {
		t = list_first_entry(&task->u.tk_wait.links,
				struct rpc_task,
				u.tk_wait.links);
		/* Assume __rpc_list_enqueue_task() cached the queue head */
		q = t->u.tk_wait.list.next;
		list_add_tail(&t->u.tk_wait.list, q);
		list_del(&task->u.tk_wait.links);
	}
	list_del(&task->u.tk_wait.list);
}

/*
 * Add new request to a priority queue.
 */
static void __rpc_add_wait_queue_priority(struct rpc_wait_queue *queue,
		struct rpc_task *task,
		unsigned char queue_priority)
{
	if (unlikely(queue_priority > queue->maxpriority))
		queue_priority = queue->maxpriority;
	__rpc_list_enqueue_task(&queue->tasks[queue_priority], task);
}

/*
 * Add new request to wait queue.
 *
 * Swapper tasks always get inserted at the head of the queue.
 * This should avoid many nasty memory deadlocks and hopefully
 * improve overall performance.
 * Everyone else gets appended to the queue to ensure proper FIFO behavior.
 */
static void __rpc_add_wait_queue(struct rpc_wait_queue *queue,
		struct rpc_task *task,
		unsigned char queue_priority)
{
	WARN_ON_ONCE(RPC_IS_QUEUED(task));
	if (RPC_IS_QUEUED(task))
		return;

	if (RPC_IS_PRIORITY(queue))
		__rpc_add_wait_queue_priority(queue, task, queue_priority);
	else if (RPC_IS_SWAPPER(task))
		list_add(&task->u.tk_wait.list, &queue->tasks[0]);
	else
		list_add_tail(&task->u.tk_wait.list, &queue->tasks[0]);
	task->tk_waitqueue = queue;
	queue->qlen++;
	/* barrier matches the read in rpc_wake_up_task_queue_locked() */
	smp_wmb();
	rpc_set_queued(task);

	dprintk("RPC: %5u added to queue %p \"%s\"\n",
			task->tk_pid, queue, rpc_qname(queue));
}

/*
 * Remove request from a priority queue.
 */
static void __rpc_remove_wait_queue_priority(struct rpc_task *task)
{
	__rpc_list_dequeue_task(task);
}

/*
 * Remove request from queue.
 * Note: must be called with spin lock held.
 */
static void __rpc_remove_wait_queue(struct rpc_wait_queue *queue, struct rpc_task *task)
{
	__rpc_disable_timer(queue, task);
	if (RPC_IS_PRIORITY(queue))
		__rpc_remove_wait_queue_priority(task);
	else
		list_del(&task->u.tk_wait.list);
	queue->qlen--;
	dprintk("RPC: %5u removed from queue %p \"%s\"\n",
			task->tk_pid, queue, rpc_qname(queue));
}

static void __rpc_init_priority_wait_queue(struct rpc_wait_queue *queue, const char *qname, unsigned char nr_queues)
{
	int i;

	spin_lock_init(&queue->lock);
	for (i = 0; i < ARRAY_SIZE(queue->tasks); i++)
		INIT_LIST_HEAD(&queue->tasks[i]);
	queue->maxpriority = nr_queues - 1;
	rpc_reset_waitqueue_priority(queue);
	queue->qlen = 0;
	timer_setup(&queue->timer_list.timer, __rpc_queue_timer_fn, 0);
	INIT_LIST_HEAD(&queue->timer_list.list);
	rpc_assign_waitqueue_name(queue, qname);
}

void rpc_init_priority_wait_queue(struct rpc_wait_queue *queue, const char *qname)
{
	__rpc_init_priority_wait_queue(queue, qname, RPC_NR_PRIORITY);
}
EXPORT_SYMBOL_GPL(rpc_init_priority_wait_queue);

void rpc_init_wait_queue(struct rpc_wait_queue *queue, const char *qname)
{
	__rpc_init_priority_wait_queue(queue, qname, 1);
}
EXPORT_SYMBOL_GPL(rpc_init_wait_queue);

void rpc_destroy_wait_queue(struct rpc_wait_queue *queue)
{
	del_timer_sync(&queue->timer_list.timer);
}
EXPORT_SYMBOL_GPL(rpc_destroy_wait_queue);

static int rpc_wait_bit_killable(struct wait_bit_key *key, int mode)
{
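	/* Sleep without blocking the freezer, so suspend is not held up
	 * by tasks waiting for an RPC reply. */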
	freezable_schedule_unsafe();
	if (signal_pending_state(mode, current))
		return -ERESTARTSYS;
	return 0;
}

#if IS_ENABLED(CONFIG_SUNRPC_DEBUG) || IS_ENABLED(CONFIG_TRACEPOINTS)
static void rpc_task_set_debuginfo(struct rpc_task *task)
{
	static atomic_t rpc_pid;

	task->tk_pid = atomic_inc_return(&rpc_pid);
}
#else
static inline void rpc_task_set_debuginfo(struct rpc_task *task)
{
}
#endif

static void rpc_set_active(struct rpc_task *task)
{
	rpc_task_set_debuginfo(task);
	set_bit(RPC_TASK_ACTIVE, &task->tk_runstate);
	trace_rpc_task_begin(task, NULL);
}

/*
 * Mark an RPC call as having completed by clearing the 'active' bit
 * and then waking up all tasks that were sleeping.
 */
static int rpc_complete_task(struct rpc_task *task)
{
	void *m = &task->tk_runstate;
	wait_queue_head_t *wq = bit_waitqueue(m, RPC_TASK_ACTIVE);
	struct wait_bit_key k = __WAIT_BIT_KEY_INITIALIZER(m, RPC_TASK_ACTIVE);
	unsigned long flags;
	int ret;

	trace_rpc_task_complete(task, NULL);

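	/*
	 * Holding wq->lock across the bit clear and the wakeup keeps
	 * completion atomic with respect to out_of_line_wait_on_bit()
	 * waiters, so none of them can miss the RPC_TASK_ACTIVE change.
	 */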
	spin_lock_irqsave(&wq->lock, flags);
	clear_bit(RPC_TASK_ACTIVE, &task->tk_runstate);
	ret = atomic_dec_and_test(&task->tk_count);
	if (waitqueue_active(wq))
		__wake_up_locked_key(wq, TASK_NORMAL, &k);
	spin_unlock_irqrestore(&wq->lock, flags);
	return ret;
}

/*
 * Allow callers to wait for completion of an RPC call
317 318 319 320
 *
 * Note the use of out_of_line_wait_on_bit() rather than wait_on_bit()
 * to enforce taking of the wq->lock and hence avoid races with
 * rpc_complete_task().
 */
int __rpc_wait_for_completion_task(struct rpc_task *task, wait_bit_action_f *action)
{
	if (action == NULL)
		action = rpc_wait_bit_killable;
	return out_of_line_wait_on_bit(&task->tk_runstate, RPC_TASK_ACTIVE,
			action, TASK_KILLABLE);
}
EXPORT_SYMBOL_GPL(__rpc_wait_for_completion_task);

/*
 * Make an RPC task runnable.
 *
 * Note: If the task is ASYNC, and is being made runnable after sitting on an
 * rpc_wait_queue, this must be called with the queue spinlock held to protect
 * the wait queue operation.
 * Note the ordering of rpc_test_and_set_running() and rpc_clear_queued(),
 * which is needed to ensure that __rpc_execute() doesn't loop (due to the
 * lockless RPC_IS_QUEUED() test) before we've had a chance to test
 * the RPC_TASK_RUNNING flag.
 */
static void rpc_make_runnable(struct workqueue_struct *wq,
		struct rpc_task *task)
{
	bool need_wakeup = !rpc_test_and_set_running(task);
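	/*
	 * Per the ordering comment above, RPC_TASK_QUEUED is cleared only
	 * after RPC_TASK_RUNNING has been set; if the task was already
	 * running, its current owner is responsible for the wakeup.
	 */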

	rpc_clear_queued(task);
	if (!need_wakeup)
		return;
	if (RPC_IS_ASYNC(task)) {
		INIT_WORK(&task->u.tk_work, rpc_async_schedule);
		queue_work(wq, &task->u.tk_work);
	} else
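		/*
		 * A synchronous task sleeps in __rpc_execute() on the
		 * RPC_TASK_QUEUED bit, so wake_up_bit() is sufficient
		 * to resume it.
		 */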
		wake_up_bit(&task->tk_runstate, RPC_TASK_QUEUED);
}

/*
 * Prepare for sleeping on a wait queue.
 * By always appending tasks to the list we ensure FIFO behavior.
 * NB: An RPC task will only receive interrupt-driven events as long
 * as it's on a wait queue.
 */
static void __rpc_sleep_on_priority(struct rpc_wait_queue *q,
		struct rpc_task *task,
		unsigned char queue_priority)
{
	dprintk("RPC: %5u sleep_on(queue \"%s\" time %lu)\n",
			task->tk_pid, rpc_qname(q), jiffies);

	trace_rpc_task_sleep(task, q);

	__rpc_add_wait_queue(q, task, queue_priority);

	__rpc_add_timer(q, task);
}

static void rpc_set_tk_callback(struct rpc_task *task, rpc_action action)
{
	if (action && !WARN_ON_ONCE(task->tk_callback != NULL))
		task->tk_callback = action;
}

static bool rpc_sleep_check_activated(struct rpc_task *task)
{
	/* We shouldn't ever put an inactive task to sleep */
	if (WARN_ON_ONCE(!RPC_IS_ACTIVATED(task))) {
		task->tk_status = -EIO;
		rpc_put_task_async(task);
		return false;
	}
	return true;
}

void rpc_sleep_on(struct rpc_wait_queue *q, struct rpc_task *task,
				rpc_action action)
{
	if (!rpc_sleep_check_activated(task))
		return;

	rpc_set_tk_callback(task, action);

	/*
	 * Protect the queue operations.
	 */
	spin_lock_bh(&q->lock);
	__rpc_sleep_on_priority(q, task, task->tk_priority);
	spin_unlock_bh(&q->lock);
}
EXPORT_SYMBOL_GPL(rpc_sleep_on);

void rpc_sleep_on_priority(struct rpc_wait_queue *q, struct rpc_task *task,
		rpc_action action, int priority)
{
	if (!rpc_sleep_check_activated(task))
		return;

	rpc_set_tk_callback(task, action);

	/*
	 * Protect the queue operations.
	 */
	spin_lock_bh(&q->lock);
	__rpc_sleep_on_priority(q, task, priority - RPC_PRIORITY_LOW);
	spin_unlock_bh(&q->lock);
}
EXPORT_SYMBOL_GPL(rpc_sleep_on_priority);

/**
 * __rpc_do_wake_up_task_on_wq - wake up a single rpc_task
 * @wq: workqueue on which to run task
 * @queue: wait queue
 * @task: task to be woken up
 *
 * Caller must hold queue->lock, and have cleared the task queued flag.
 */
static void __rpc_do_wake_up_task_on_wq(struct workqueue_struct *wq,
		struct rpc_wait_queue *queue,
		struct rpc_task *task)
{
	dprintk("RPC: %5u __rpc_wake_up_task (now %lu)\n",
			task->tk_pid, jiffies);

	/* Has the task been executed yet? If not, we cannot wake it up! */
	if (!RPC_IS_ACTIVATED(task)) {
		printk(KERN_ERR "RPC: Inactive task (%p) being woken up!\n", task);
		return;
	}

	trace_rpc_task_wakeup(task, queue);

	__rpc_remove_wait_queue(queue, task);

	rpc_make_runnable(wq, task);

	dprintk("RPC:       __rpc_wake_up_task done\n");
}

/*
 * Wake up a queued task while the queue lock is being held
 */
static struct rpc_task *
rpc_wake_up_task_on_wq_queue_action_locked(struct workqueue_struct *wq,
		struct rpc_wait_queue *queue, struct rpc_task *task,
		bool (*action)(struct rpc_task *, void *), void *data)
{
	if (RPC_IS_QUEUED(task)) {
		smp_rmb();
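		/* Pairs with the smp_wmb() in __rpc_add_wait_queue():
		 * once RPC_TASK_QUEUED is observed, the tk_waitqueue
		 * read below is guaranteed to be current. */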
		if (task->tk_waitqueue == queue) {
			if (action == NULL || action(task, data)) {
				__rpc_do_wake_up_task_on_wq(wq, queue, task);
				return task;
			}
		}
	}
	return NULL;
}

static void
rpc_wake_up_task_on_wq_queue_locked(struct workqueue_struct *wq,
		struct rpc_wait_queue *queue, struct rpc_task *task)
{
	rpc_wake_up_task_on_wq_queue_action_locked(wq, queue, task, NULL, NULL);
}

/*
 * Wake up a queued task while the queue lock is being held
 */
static void rpc_wake_up_task_queue_locked(struct rpc_wait_queue *queue, struct rpc_task *task)
{
	rpc_wake_up_task_on_wq_queue_locked(rpciod_workqueue, queue, task);
}

/*
 * Wake up a task on a specific queue
 */
void rpc_wake_up_queued_task_on_wq(struct workqueue_struct *wq,
		struct rpc_wait_queue *queue,
		struct rpc_task *task)
{
	if (!RPC_IS_QUEUED(task))
		return;
	spin_lock_bh(&queue->lock);
	rpc_wake_up_task_on_wq_queue_locked(wq, queue, task);
	spin_unlock_bh(&queue->lock);
}

/*
 * Wake up a task on a specific queue
 */
void rpc_wake_up_queued_task(struct rpc_wait_queue *queue, struct rpc_task *task)
{
	if (!RPC_IS_QUEUED(task))
		return;
	spin_lock_bh(&queue->lock);
	rpc_wake_up_task_queue_locked(queue, task);
	spin_unlock_bh(&queue->lock);
}
EXPORT_SYMBOL_GPL(rpc_wake_up_queued_task);

static bool rpc_task_action_set_status(struct rpc_task *task, void *status)
{
	task->tk_status = *(int *)status;
	return true;
}

static void
rpc_wake_up_task_queue_set_status_locked(struct rpc_wait_queue *queue,
		struct rpc_task *task, int status)
{
	rpc_wake_up_task_on_wq_queue_action_locked(rpciod_workqueue, queue,
			task, rpc_task_action_set_status, &status);
}

/**
 * rpc_wake_up_queued_task_set_status - wake up a task and set task->tk_status
 * @queue: pointer to rpc_wait_queue
 * @task: pointer to rpc_task
 * @status: integer error value
 *
 * If @task is queued on @queue, then it is woken up, and @task->tk_status is
 * set to the value of @status.
 */
void
rpc_wake_up_queued_task_set_status(struct rpc_wait_queue *queue,
		struct rpc_task *task, int status)
{
	if (!RPC_IS_QUEUED(task))
		return;
	spin_lock_bh(&queue->lock);
	rpc_wake_up_task_queue_set_status_locked(queue, task, status);
	spin_unlock_bh(&queue->lock);
}

/*
 * Wake up the next task on a priority queue.
 */
static struct rpc_task *__rpc_find_next_queued_priority(struct rpc_wait_queue *queue)
{
	struct list_head *q;
	struct rpc_task *task;

	/*
	 * Service a batch of tasks from a single owner.
	 */
	q = &queue->tasks[queue->priority];
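	/*
	 * queue->nr was primed to 1 << priority when this level was last
	 * selected; it caps how many tasks are served from the current
	 * priority list before rotating, so lower levels are not starved.
	 */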
	if (!list_empty(q) && --queue->nr) {
		task = list_first_entry(q, struct rpc_task, u.tk_wait.list);
		goto out;
	}

	/*
	 * Service the next queue.
	 */
	do {
		if (q == &queue->tasks[0])
			q = &queue->tasks[queue->maxpriority];
		else
			q = q - 1;
		if (!list_empty(q)) {
			task = list_first_entry(q, struct rpc_task, u.tk_wait.list);
			goto new_queue;
		}
	} while (q != &queue->tasks[queue->priority]);

	rpc_reset_waitqueue_priority(queue);
	return NULL;

new_queue:
	rpc_set_waitqueue_priority(queue, (unsigned int)(q - &queue->tasks[0]));
out:
	return task;
}

static struct rpc_task *__rpc_find_next_queued(struct rpc_wait_queue *queue)
{
	if (RPC_IS_PRIORITY(queue))
		return __rpc_find_next_queued_priority(queue);
	if (!list_empty(&queue->tasks[0]))
		return list_first_entry(&queue->tasks[0], struct rpc_task, u.tk_wait.list);
	return NULL;
}

/*
 * Wake up the first task on the wait queue.
 */
struct rpc_task *rpc_wake_up_first_on_wq(struct workqueue_struct *wq,
		struct rpc_wait_queue *queue,
		bool (*func)(struct rpc_task *, void *), void *data)
{
	struct rpc_task	*task = NULL;

	dprintk("RPC:       wake_up_first(%p \"%s\")\n",
			queue, rpc_qname(queue));
	spin_lock_bh(&queue->lock);
	task = __rpc_find_next_queued(queue);
	if (task != NULL)
		task = rpc_wake_up_task_on_wq_queue_action_locked(wq, queue,
				task, func, data);
	spin_unlock_bh(&queue->lock);

	return task;
}

/*
 * Wake up the first task on the wait queue.
 */
struct rpc_task *rpc_wake_up_first(struct rpc_wait_queue *queue,
		bool (*func)(struct rpc_task *, void *), void *data)
{
	return rpc_wake_up_first_on_wq(rpciod_workqueue, queue, func, data);
}
EXPORT_SYMBOL_GPL(rpc_wake_up_first);

static bool rpc_wake_up_next_func(struct rpc_task *task, void *data)
{
	return true;
}

/*
 * Wake up the next task on the wait queue.
 */
struct rpc_task *rpc_wake_up_next(struct rpc_wait_queue *queue)
{
	return rpc_wake_up_first(queue, rpc_wake_up_next_func, NULL);
}
EXPORT_SYMBOL_GPL(rpc_wake_up_next);

/**
 * rpc_wake_up - wake up all rpc_tasks
 * @queue: rpc_wait_queue on which the tasks are sleeping
 *
 * Grabs queue->lock
 */
void rpc_wake_up(struct rpc_wait_queue *queue)
{
	struct list_head *head;

	spin_lock_bh(&queue->lock);
	head = &queue->tasks[queue->maxpriority];
	for (;;) {
		while (!list_empty(head)) {
			struct rpc_task *task;
			task = list_first_entry(head,
					struct rpc_task,
					u.tk_wait.list);
			rpc_wake_up_task_queue_locked(queue, task);
		}
		if (head == &queue->tasks[0])
			break;
		head--;
	}
	spin_unlock_bh(&queue->lock);
}
EXPORT_SYMBOL_GPL(rpc_wake_up);

/**
 * rpc_wake_up_status - wake up all rpc_tasks and set their status value.
 * @queue: rpc_wait_queue on which the tasks are sleeping
 * @status: status value to set
 *
 * Grabs queue->lock
 */
void rpc_wake_up_status(struct rpc_wait_queue *queue, int status)
{
	struct list_head *head;

	spin_lock_bh(&queue->lock);
	head = &queue->tasks[queue->maxpriority];
	for (;;) {
		while (!list_empty(head)) {
			struct rpc_task *task;
			task = list_first_entry(head,
					struct rpc_task,
					u.tk_wait.list);
			task->tk_status = status;
			rpc_wake_up_task_queue_locked(queue, task);
		}
		if (head == &queue->tasks[0])
			break;
		head--;
	}
	spin_unlock_bh(&queue->lock);
}
EXPORT_SYMBOL_GPL(rpc_wake_up_status);

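/*
 * Per-queue timer callback: wake every task whose deadline has passed
 * with -ETIMEDOUT, then rearm the timer for the earliest expiry that
 * is still outstanding.
 */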
static void __rpc_queue_timer_fn(struct timer_list *t)
{
	struct rpc_wait_queue *queue = from_timer(queue, t, timer_list.timer);
	struct rpc_task *task, *n;
	unsigned long expires, now, timeo;

	spin_lock(&queue->lock);
	expires = now = jiffies;
	list_for_each_entry_safe(task, n, &queue->timer_list.list, u.tk_wait.timer_list) {
		timeo = task->u.tk_wait.expires;
		if (time_after_eq(now, timeo)) {
			dprintk("RPC: %5u timeout\n", task->tk_pid);
			task->tk_status = -ETIMEDOUT;
			rpc_wake_up_task_queue_locked(queue, task);
			continue;
		}
		if (expires == now || time_after(expires, timeo))
			expires = timeo;
	}
	if (!list_empty(&queue->timer_list.list))
		rpc_set_queue_timer(queue, expires);
	spin_unlock(&queue->lock);
}

static void __rpc_atrun(struct rpc_task *task)
{
	if (task->tk_status == -ETIMEDOUT)
		task->tk_status = 0;
}

/*
 * Run a task at a later time
 */
void rpc_delay(struct rpc_task *task, unsigned long delay)
{
	task->tk_timeout = delay;
	rpc_sleep_on(&delay_queue, task, __rpc_atrun);
}
EXPORT_SYMBOL_GPL(rpc_delay);

/*
 * Helper to call task->tk_ops->rpc_call_prepare
 */
void rpc_prepare_task(struct rpc_task *task)
{
	task->tk_ops->rpc_call_prepare(task, task->tk_calldata);
}

static void
rpc_init_task_statistics(struct rpc_task *task)
{
	/* Initialize retry counters */
	task->tk_garb_retry = 2;
	task->tk_cred_retry = 2;
	task->tk_rebind_retry = 2;

	/* starting timestamp */
	task->tk_start = ktime_get();
}

static void
rpc_reset_task_statistics(struct rpc_task *task)
{
	task->tk_timeouts = 0;
	task->tk_flags &= ~(RPC_CALL_MAJORSEEN|RPC_TASK_SENT);
	rpc_init_task_statistics(task);
}

/*
 * Helper that calls task->tk_ops->rpc_call_done if it exists
 */
void rpc_exit_task(struct rpc_task *task)
{
	task->tk_action = NULL;
	if (task->tk_ops->rpc_call_done != NULL) {
		task->tk_ops->rpc_call_done(task, task->tk_calldata);
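		/*
		 * rpc_call_done() may set a fresh tk_action, which
		 * effectively restarts the call.
		 */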
		if (task->tk_action != NULL) {
			/* Always release the RPC slot and buffer memory */
			xprt_release(task);
			rpc_reset_task_statistics(task);
		}
	}
}

void rpc_signal_task(struct rpc_task *task)
{
	struct rpc_wait_queue *queue;

	if (!RPC_IS_ACTIVATED(task))
		return;
	set_bit(RPC_TASK_SIGNALLED, &task->tk_runstate);
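	/* Order the RPC_TASK_SIGNALLED store before the tk_waitqueue
	 * read below. */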
	smp_mb__after_atomic();
	queue = READ_ONCE(task->tk_waitqueue);
	if (queue)
		rpc_wake_up_queued_task_set_status(queue, task, -ERESTARTSYS);
}

void rpc_exit(struct rpc_task *task, int status)
{
	task->tk_status = status;
	task->tk_action = rpc_exit_task;
	rpc_wake_up_queued_task(task->tk_waitqueue, task);
}
EXPORT_SYMBOL_GPL(rpc_exit);

void rpc_release_calldata(const struct rpc_call_ops *ops, void *calldata)
{
	if (ops->rpc_release != NULL)
		ops->rpc_release(calldata);
}

/*
 * This is the RPC `scheduler' (or rather, the finite state machine).
 */
static void __rpc_execute(struct rpc_task *task)
{
	struct rpc_wait_queue *queue;
	int task_is_async = RPC_IS_ASYNC(task);
	int status = 0;

	dprintk("RPC: %5u __rpc_execute flags=0x%x\n",
			task->tk_pid, task->tk_flags);

	WARN_ON_ONCE(RPC_IS_QUEUED(task));
	if (RPC_IS_QUEUED(task))
		return;

	for (;;) {
		void (*do_action)(struct rpc_task *);

		/*
		 * Perform the next FSM step or a pending callback.
		 *
		 * tk_action may be NULL if the task has been killed.
		 * In particular, note that rpc_killall_tasks may
		 * do this at any time, so beware when dereferencing.
		 */
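		/* A one-shot tk_callback, set while the task slept,
		 * takes precedence over the regular tk_action step. */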
		do_action = task->tk_action;
		if (task->tk_callback) {
			do_action = task->tk_callback;
			task->tk_callback = NULL;
		}
		if (!do_action)
			break;
		trace_rpc_task_run_action(task, do_action);
		do_action(task);

		/*
		 * Lockless check for whether task is sleeping or not.
		 */
		if (!RPC_IS_QUEUED(task))
			continue;

		/*
		 * Signalled tasks should exit rather than sleep.
		 */
		if (RPC_SIGNALLED(task))
			rpc_exit(task, -ERESTARTSYS);

		/*
		 * The queue->lock protects against races with
		 * rpc_make_runnable().
		 *
		 * Note that once we clear RPC_TASK_RUNNING on an asynchronous
		 * rpc_task, rpc_make_runnable() can assign it to a
		 * different workqueue. We therefore cannot assume that the
		 * rpc_task pointer may still be dereferenced.
		 */
		queue = task->tk_waitqueue;
		spin_lock_bh(&queue->lock);
		if (!RPC_IS_QUEUED(task)) {
			spin_unlock_bh(&queue->lock);
			continue;
		}
		rpc_clear_running(task);
		spin_unlock_bh(&queue->lock);
		if (task_is_async)
			return;

		/* sync task: sleep here */
		dprintk("RPC: %5u sync task going to sleep\n", task->tk_pid);
		status = out_of_line_wait_on_bit(&task->tk_runstate,
				RPC_TASK_QUEUED, rpc_wait_bit_killable,
				TASK_KILLABLE);
		if (status < 0) {
			/*
			 * When a sync task receives a signal, it exits with
			 * -ERESTARTSYS. In order to catch any callbacks that
			 * clean up after sleeping on some queue, we don't
			 * break the loop here, but go around once more.
			 */
			dprintk("RPC: %5u got signal\n", task->tk_pid);
			set_bit(RPC_TASK_SIGNALLED, &task->tk_runstate);
			rpc_exit(task, -ERESTARTSYS);
		}
		dprintk("RPC: %5u sync task resuming\n", task->tk_pid);
	}

	dprintk("RPC: %5u return %d, status %d\n", task->tk_pid, status,
			task->tk_status);
	/* Release all resources associated with the task */
	rpc_release_task(task);
}

/*
 * User-visible entry point to the scheduler.
 *
 * This may be called recursively if e.g. an async NFS task updates
 * the attributes and finds that dirty pages must be flushed.
 * NOTE: Upon exit of this function the task is guaranteed to be
 *	 released. In particular note that tk_release() will have
 *	 been called, so your task memory may have been freed.
 */
void rpc_execute(struct rpc_task *task)
{
	bool is_async = RPC_IS_ASYNC(task);

	rpc_set_active(task);
	rpc_make_runnable(rpciod_workqueue, task);
	if (!is_async)
		__rpc_execute(task);
}

static void rpc_async_schedule(struct work_struct *work)
{
	unsigned int pflags = memalloc_nofs_save();

	__rpc_execute(container_of(work, struct rpc_task, u.tk_work));
	memalloc_nofs_restore(pflags);
}

/**
 * rpc_malloc - allocate RPC buffer resources
 * @task: RPC task
 *
 * A single memory region is allocated, which is split between the
 * RPC call and RPC reply that this task is being used for. When
 * this RPC is retired, the memory is released by calling rpc_free.
 *
 * To prevent rpciod from hanging, this allocator never sleeps,
 * returning -ENOMEM and suppressing the warning if the request cannot
 * be serviced immediately. The caller can arrange to sleep in a
 * way that is safe for rpciod.
 *
 * Most requests are 'small' (under 2KiB) and can be serviced from a
 * mempool, ensuring that NFS reads and writes can always proceed,
 * and that there is good locality of reference for these buffers.
 */
int rpc_malloc(struct rpc_task *task)
{
	struct rpc_rqst *rqst = task->tk_rqstp;
	size_t size = rqst->rq_callsize + rqst->rq_rcvsize;
	struct rpc_buffer *buf;
	gfp_t gfp = GFP_NOFS;

	if (RPC_IS_SWAPPER(task))
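		/* Swap-out over NFS runs in the reclaim path: dip into
		 * the memalloc reserves and never sleep or warn. */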
		gfp = __GFP_MEMALLOC | GFP_NOWAIT | __GFP_NOWARN;

	size += sizeof(struct rpc_buffer);
	if (size <= RPC_BUFFER_MAXSIZE)
		buf = mempool_alloc(rpc_buffer_mempool, gfp);
	else
		buf = kmalloc(size, gfp);

	if (!buf)
		return -ENOMEM;

	buf->len = size;
	dprintk("RPC: %5u allocated buffer of size %zu at %p\n",
			task->tk_pid, size, buf);
	rqst->rq_buffer = buf->data;
	rqst->rq_rbuffer = (char *)rqst->rq_buffer + rqst->rq_callsize;
	return 0;
}
EXPORT_SYMBOL_GPL(rpc_malloc);

/**
 * rpc_free - free RPC buffer resources allocated via rpc_malloc
 * @task: RPC task
 *
 */
void rpc_free(struct rpc_task *task)
{
	void *buffer = task->tk_rqstp->rq_buffer;
	size_t size;
	struct rpc_buffer *buf;

	buf = container_of(buffer, struct rpc_buffer, data);
	size = buf->len;

	dprintk("RPC:       freeing buffer of size %zu at %p\n",
			size, buf);

	if (size <= RPC_BUFFER_MAXSIZE)
		mempool_free(buf, rpc_buffer_mempool);
	else
		kfree(buf);
}
EXPORT_SYMBOL_GPL(rpc_free);

/*
 * Creation and deletion of RPC task structures
 */
static void rpc_init_task(struct rpc_task *task, const struct rpc_task_setup *task_setup_data)
{
	memset(task, 0, sizeof(*task));
	atomic_set(&task->tk_count, 1);
	task->tk_flags  = task_setup_data->flags;
	task->tk_ops = task_setup_data->callback_ops;
	task->tk_calldata = task_setup_data->callback_data;
	INIT_LIST_HEAD(&task->tk_task);

	task->tk_priority = task_setup_data->priority - RPC_PRIORITY_LOW;
	task->tk_owner = current->tgid;

	/* Initialize workqueue for async tasks */
	task->tk_workqueue = task_setup_data->workqueue;

	task->tk_xprt = xprt_get(task_setup_data->rpc_xprt);

	task->tk_op_cred = get_rpccred(task_setup_data->rpc_op_cred);

	if (task->tk_ops->rpc_call_prepare != NULL)
		task->tk_action = rpc_prepare_task;

	rpc_init_task_statistics(task);

	dprintk("RPC:       new task initialized, procpid %u\n",
				task_pid_nr(current));
}

static struct rpc_task *
rpc_alloc_task(void)
{
	return (struct rpc_task *)mempool_alloc(rpc_task_mempool, GFP_NOFS);
}

/*
 * Create a new task for the specified client.
 */
struct rpc_task *rpc_new_task(const struct rpc_task_setup *setup_data)
{
	struct rpc_task	*task = setup_data->task;
	unsigned short flags = 0;

	if (task == NULL) {
		task = rpc_alloc_task();
		flags = RPC_TASK_DYNAMIC;
	}

	rpc_init_task(task, setup_data);
	task->tk_flags |= flags;
	dprintk("RPC:       allocated task %p\n", task);
	return task;
}

/*
 * rpc_free_task - release rpc task and perform cleanups
 *
 * Note that we free up the rpc_task _after_ rpc_release_calldata()
 * in order to work around a workqueue dependency issue.
 *
 * Tejun Heo states:
 * "Workqueue currently considers two work items to be the same if they're
 * on the same address and won't execute them concurrently - ie. it
 * makes a work item which is queued again while being executed wait
 * for the previous execution to complete.
 *
 * If a work function frees the work item, and then waits for an event
 * which should be performed by another work item and *that* work item
 * recycles the freed work item, it can create a false dependency loop.
 * There really is no reliable way to detect this short of verifying
 * every memory free."
 *
 */
static void rpc_free_task(struct rpc_task *task)
{
	unsigned short tk_flags = task->tk_flags;

	put_rpccred(task->tk_op_cred);
	rpc_release_calldata(task->tk_ops, task->tk_calldata);

	if (tk_flags & RPC_TASK_DYNAMIC) {
		dprintk("RPC: %5u freeing task\n", task->tk_pid);
		mempool_free(task, rpc_task_mempool);
	}
}

static void rpc_async_release(struct work_struct *work)
{
	unsigned int pflags = memalloc_nofs_save();

	rpc_free_task(container_of(work, struct rpc_task, u.tk_work));
	memalloc_nofs_restore(pflags);
}

static void rpc_release_resources_task(struct rpc_task *task)
{
	xprt_release(task);
	if (task->tk_msg.rpc_cred) {
		put_cred(task->tk_msg.rpc_cred);
		task->tk_msg.rpc_cred = NULL;
	}
	rpc_task_release_client(task);
}

static void rpc_final_put_task(struct rpc_task *task,
		struct workqueue_struct *q)
{
	if (q != NULL) {
		INIT_WORK(&task->u.tk_work, rpc_async_release);
		queue_work(q, &task->u.tk_work);
	} else
		rpc_free_task(task);
}

static void rpc_do_put_task(struct rpc_task *task, struct workqueue_struct *q)
{
	if (atomic_dec_and_test(&task->tk_count)) {
		rpc_release_resources_task(task);
		rpc_final_put_task(task, q);
	}
}

void rpc_put_task(struct rpc_task *task)
{
	rpc_do_put_task(task, NULL);
}
EXPORT_SYMBOL_GPL(rpc_put_task);

void rpc_put_task_async(struct rpc_task *task)
{
	rpc_do_put_task(task, task->tk_workqueue);
}
EXPORT_SYMBOL_GPL(rpc_put_task_async);

static void rpc_release_task(struct rpc_task *task)
{
	dprintk("RPC: %5u release task\n", task->tk_pid);

	WARN_ON_ONCE(RPC_IS_QUEUED(task));

	rpc_release_resources_task(task);

	/*
	 * Note: at this point we have been removed from rpc_clnt->cl_tasks,
	 * so it should be safe to use task->tk_count as a test for whether
	 * or not any other processes still hold references to our rpc_task.
	 */
	if (atomic_read(&task->tk_count) != 1 + !RPC_IS_ASYNC(task)) {
		/* Wake up anyone who may be waiting for task completion */
		if (!rpc_complete_task(task))
			return;
	} else {
		if (!atomic_dec_and_test(&task->tk_count))
			return;
	}
	rpc_final_put_task(task, task->tk_workqueue);
}

int rpciod_up(void)
{
	return try_module_get(THIS_MODULE) ? 0 : -EINVAL;
}

void rpciod_down(void)
{
	module_put(THIS_MODULE);
}

/*
 * Start up the rpciod workqueue.
 */
static int rpciod_start(void)
{
	struct workqueue_struct *wq;

	/*
	 * Create the rpciod thread and wait for it to start.
	 */
	dprintk("RPC:       creating workqueue rpciod\n");
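	/*
	 * WQ_MEM_RECLAIM guarantees a rescuer thread: rpciod can sit in
	 * the NFS writeback path and must make progress under memory
	 * pressure.
	 */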
	wq = alloc_workqueue("rpciod", WQ_MEM_RECLAIM | WQ_UNBOUND, 0);
	if (!wq)
		goto out_failed;
	rpciod_workqueue = wq;
	/* Note: highpri because network receive is latency sensitive */
	wq = alloc_workqueue("xprtiod", WQ_UNBOUND|WQ_MEM_RECLAIM|WQ_HIGHPRI, 0);
	if (!wq)
		goto free_rpciod;
	xprtiod_workqueue = wq;
	return 1;
free_rpciod:
	wq = rpciod_workqueue;
	rpciod_workqueue = NULL;
	destroy_workqueue(wq);
out_failed:
	return 0;
}

static void rpciod_stop(void)
{
	struct workqueue_struct *wq = NULL;

	if (rpciod_workqueue == NULL)
		return;
	dprintk("RPC:       destroying workqueue rpciod\n");

	wq = rpciod_workqueue;
	rpciod_workqueue = NULL;
	destroy_workqueue(wq);
	wq = xprtiod_workqueue;
	xprtiod_workqueue = NULL;
	destroy_workqueue(wq);
}

void
rpc_destroy_mempool(void)
{
	rpciod_stop();
	mempool_destroy(rpc_buffer_mempool);
	mempool_destroy(rpc_task_mempool);
	kmem_cache_destroy(rpc_task_slabp);
	kmem_cache_destroy(rpc_buffer_slabp);
	rpc_destroy_wait_queue(&delay_queue);
}

int
rpc_init_mempool(void)
{
	/*
	 * The following is not strictly a mempool initialisation,
	 * but there is no harm in doing it here
	 */
	rpc_init_wait_queue(&delay_queue, "delayq");
	if (!rpciod_start())
		goto err_nomem;

	rpc_task_slabp = kmem_cache_create("rpc_tasks",
					     sizeof(struct rpc_task),
					     0, SLAB_HWCACHE_ALIGN,
					     NULL);
	if (!rpc_task_slabp)
		goto err_nomem;
	rpc_buffer_slabp = kmem_cache_create("rpc_buffers",
					     RPC_BUFFER_MAXSIZE,
					     0, SLAB_HWCACHE_ALIGN,
					     NULL);
	if (!rpc_buffer_slabp)
		goto err_nomem;
	rpc_task_mempool = mempool_create_slab_pool(RPC_TASK_POOLSIZE,
						    rpc_task_slabp);
	if (!rpc_task_mempool)
		goto err_nomem;
	rpc_buffer_mempool = mempool_create_slab_pool(RPC_BUFFER_POOLSIZE,
						      rpc_buffer_slabp);
	if (!rpc_buffer_mempool)
		goto err_nomem;
	return 0;
err_nomem:
	rpc_destroy_mempool();
	return -ENOMEM;
}