/*
 * linux/net/sunrpc/sched.c
 *
 * Scheduling for synchronous and asynchronous RPC requests.
 *
 * Copyright (C) 1996 Olaf Kirch, <okir@monad.swb.de>
 *
 * TCP NFS related read + write fixes
 * (C) 1999 Dave Airlie, University of Limerick, Ireland <airlied@linux.ie>
 */

#include <linux/module.h>

#include <linux/sched.h>
#include <linux/interrupt.h>
#include <linux/slab.h>
#include <linux/mempool.h>
#include <linux/smp.h>
#include <linux/spinlock.h>
#include <linux/mutex.h>
#include <linux/freezer.h>

#include <linux/sunrpc/clnt.h>

#include "sunrpc.h"

#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
#define RPCDBG_FACILITY		RPCDBG_SCHED
#endif

#define CREATE_TRACE_POINTS
#include <trace/events/sunrpc.h>

/*
 * RPC slabs and memory pools
 */
#define RPC_BUFFER_MAXSIZE	(2048)
#define RPC_BUFFER_POOLSIZE	(8)
#define RPC_TASK_POOLSIZE	(8)
static struct kmem_cache	*rpc_task_slabp __read_mostly;
static struct kmem_cache	*rpc_buffer_slabp __read_mostly;
static mempool_t	*rpc_task_mempool __read_mostly;
static mempool_t	*rpc_buffer_mempool __read_mostly;

static void			rpc_async_schedule(struct work_struct *);
static void			rpc_release_task(struct rpc_task *task);
static void __rpc_queue_timer_fn(struct timer_list *t);

/*
 * RPC tasks sit here while waiting for conditions to improve.
 */
static struct rpc_wait_queue delay_queue;

/*
 * rpciod-related stuff
 */
struct workqueue_struct *rpciod_workqueue __read_mostly;
struct workqueue_struct *xprtiod_workqueue __read_mostly;

/*
 * Disable the timer for a given RPC task. Should be called with
 * queue->lock and bh_disabled in order to avoid races within
 * rpc_run_timer().
 */
static void
__rpc_disable_timer(struct rpc_wait_queue *queue, struct rpc_task *task)
{
	if (task->tk_timeout == 0)
		return;
	dprintk("RPC: %5u disabling timer\n", task->tk_pid);
	task->tk_timeout = 0;
	list_del(&task->u.tk_wait.timer_list);
	if (list_empty(&queue->timer_list.list))
		del_timer(&queue->timer_list.timer);
}

static void
rpc_set_queue_timer(struct rpc_wait_queue *queue, unsigned long expires)
{
	queue->timer_list.expires = expires;
	mod_timer(&queue->timer_list.timer, expires);
}

/*
 * Set up a timer for the current task.
 */
static void
__rpc_add_timer(struct rpc_wait_queue *queue, struct rpc_task *task)
{
	if (!task->tk_timeout)
		return;

	dprintk("RPC: %5u setting alarm for %u ms\n",
		task->tk_pid, jiffies_to_msecs(task->tk_timeout));

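	/*
	 * Only one timer is armed per queue: it tracks the earliest expiry
	 * among the sleeping tasks, so the new task below only re-arms it
	 * when it expires sooner than everything already on the list.
	 */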
	task->u.tk_wait.expires = jiffies + task->tk_timeout;
	if (list_empty(&queue->timer_list.list) || time_before(task->u.tk_wait.expires, queue->timer_list.expires))
		rpc_set_queue_timer(queue, task->u.tk_wait.expires);
	list_add(&task->u.tk_wait.timer_list, &queue->timer_list.list);
}

static void rpc_rotate_queue_owner(struct rpc_wait_queue *queue)
{
	struct list_head *q = &queue->tasks[queue->priority];
	struct rpc_task *task;

	if (!list_empty(q)) {
		task = list_first_entry(q, struct rpc_task, u.tk_wait.list);
		if (task->tk_owner == queue->owner)
			list_move_tail(&task->u.tk_wait.list, q);
	}
}

static void rpc_set_waitqueue_priority(struct rpc_wait_queue *queue, int priority)
{
	if (queue->priority != priority) {
		/* Fairness: rotate the list when changing priority */
		rpc_rotate_queue_owner(queue);
		queue->priority = priority;
	}
}

static void rpc_set_waitqueue_owner(struct rpc_wait_queue *queue, pid_t pid)
{
	queue->owner = pid;
	queue->nr = RPC_BATCH_COUNT;
}

static void rpc_reset_waitqueue_priority(struct rpc_wait_queue *queue)
{
	rpc_set_waitqueue_priority(queue, queue->maxpriority);
	rpc_set_waitqueue_owner(queue, 0);
}
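
/*
 * Scheduling overview: queue->tasks[] holds one list per priority level,
 * and queue->priority names the level currently being served.  Within a
 * level, up to RPC_BATCH_COUNT tasks (counted down in queue->nr) from the
 * same tk_owner are serviced back to back before the queue rotates to
 * another owner, so related requests are batched without starving others.
 */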

/*
 * Add new request to a priority queue.
 */
static void __rpc_add_wait_queue_priority(struct rpc_wait_queue *queue,
		struct rpc_task *task,
		unsigned char queue_priority)
{
	struct list_head *q;
	struct rpc_task *t;

	INIT_LIST_HEAD(&task->u.tk_wait.links);
	if (unlikely(queue_priority > queue->maxpriority))
		queue_priority = queue->maxpriority;
	if (queue_priority > queue->priority)
		rpc_set_waitqueue_priority(queue, queue_priority);
	q = &queue->tasks[queue_priority];
	list_for_each_entry(t, q, u.tk_wait.list) {
		if (t->tk_owner == task->tk_owner) {
			list_add_tail(&task->u.tk_wait.list, &t->u.tk_wait.links);
			return;
		}
	}
	list_add_tail(&task->u.tk_wait.list, q);
}

/*
 * Add new request to wait queue.
 *
 * Swapper tasks always get inserted at the head of the queue.
 * This should avoid many nasty memory deadlocks and hopefully
 * improve overall performance.
 * Everyone else gets appended to the queue to ensure proper FIFO behavior.
 */
static void __rpc_add_wait_queue(struct rpc_wait_queue *queue,
		struct rpc_task *task,
		unsigned char queue_priority)
{
	WARN_ON_ONCE(RPC_IS_QUEUED(task));
	if (RPC_IS_QUEUED(task))
		return;

	if (RPC_IS_PRIORITY(queue))
		__rpc_add_wait_queue_priority(queue, task, queue_priority);
	else if (RPC_IS_SWAPPER(task))
		list_add(&task->u.tk_wait.list, &queue->tasks[0]);
	else
		list_add_tail(&task->u.tk_wait.list, &queue->tasks[0]);
	task->tk_waitqueue = queue;
	queue->qlen++;
	/* barrier matches the read in rpc_wake_up_task_queue_locked() */
	smp_wmb();
	rpc_set_queued(task);

	dprintk("RPC: %5u added to queue %p \"%s\"\n",
			task->tk_pid, queue, rpc_qname(queue));
}

/*
 * Remove request from a priority queue.
 */
static void __rpc_remove_wait_queue_priority(struct rpc_task *task)
{
	struct rpc_task *t;

	if (!list_empty(&task->u.tk_wait.links)) {
		t = list_entry(task->u.tk_wait.links.next, struct rpc_task, u.tk_wait.list);
		list_move(&t->u.tk_wait.list, &task->u.tk_wait.list);
		list_splice_init(&task->u.tk_wait.links, &t->u.tk_wait.links);
	}
}

/*
 * Remove request from queue.
 * Note: must be called with spin lock held.
 */
static void __rpc_remove_wait_queue(struct rpc_wait_queue *queue, struct rpc_task *task)
{
	__rpc_disable_timer(queue, task);
	if (RPC_IS_PRIORITY(queue))
		__rpc_remove_wait_queue_priority(task);
	list_del(&task->u.tk_wait.list);
	queue->qlen--;
	dprintk("RPC: %5u removed from queue %p \"%s\"\n",
			task->tk_pid, queue, rpc_qname(queue));
}

static void __rpc_init_priority_wait_queue(struct rpc_wait_queue *queue, const char *qname, unsigned char nr_queues)
{
	int i;

	spin_lock_init(&queue->lock);
	for (i = 0; i < ARRAY_SIZE(queue->tasks); i++)
		INIT_LIST_HEAD(&queue->tasks[i]);
	queue->maxpriority = nr_queues - 1;
	rpc_reset_waitqueue_priority(queue);
	queue->qlen = 0;
	timer_setup(&queue->timer_list.timer, __rpc_queue_timer_fn, 0);
	INIT_LIST_HEAD(&queue->timer_list.list);
	rpc_assign_waitqueue_name(queue, qname);
}

void rpc_init_priority_wait_queue(struct rpc_wait_queue *queue, const char *qname)
{
	__rpc_init_priority_wait_queue(queue, qname, RPC_NR_PRIORITY);
}
EXPORT_SYMBOL_GPL(rpc_init_priority_wait_queue);

void rpc_init_wait_queue(struct rpc_wait_queue *queue, const char *qname)
{
	__rpc_init_priority_wait_queue(queue, qname, 1);
}
EXPORT_SYMBOL_GPL(rpc_init_wait_queue);

void rpc_destroy_wait_queue(struct rpc_wait_queue *queue)
{
	del_timer_sync(&queue->timer_list.timer);
}
EXPORT_SYMBOL_GPL(rpc_destroy_wait_queue);

static int rpc_wait_bit_killable(struct wait_bit_key *key, int mode)
{
	freezable_schedule_unsafe();
	if (signal_pending_state(mode, current))
		return -ERESTARTSYS;
	return 0;
}

#if IS_ENABLED(CONFIG_SUNRPC_DEBUG) || IS_ENABLED(CONFIG_TRACEPOINTS)
static void rpc_task_set_debuginfo(struct rpc_task *task)
{
	static atomic_t rpc_pid;

	task->tk_pid = atomic_inc_return(&rpc_pid);
}
#else
static inline void rpc_task_set_debuginfo(struct rpc_task *task)
{
}
#endif

static void rpc_set_active(struct rpc_task *task)
{
	rpc_task_set_debuginfo(task);
	set_bit(RPC_TASK_ACTIVE, &task->tk_runstate);
	trace_rpc_task_begin(task, NULL);
}

/*
 * Mark an RPC call as having completed by clearing the 'active' bit
 * and then waking up all tasks that were sleeping.
 */
static int rpc_complete_task(struct rpc_task *task)
{
	void *m = &task->tk_runstate;
	wait_queue_head_t *wq = bit_waitqueue(m, RPC_TASK_ACTIVE);
	struct wait_bit_key k = __WAIT_BIT_KEY_INITIALIZER(m, RPC_TASK_ACTIVE);
	unsigned long flags;
	int ret;

	trace_rpc_task_complete(task, NULL);

	spin_lock_irqsave(&wq->lock, flags);
	clear_bit(RPC_TASK_ACTIVE, &task->tk_runstate);
	ret = atomic_dec_and_test(&task->tk_count);
	if (waitqueue_active(wq))
		__wake_up_locked_key(wq, TASK_NORMAL, &k);
	spin_unlock_irqrestore(&wq->lock, flags);
	return ret;
}

/*
 * Allow callers to wait for completion of an RPC call
 *
 * Note the use of out_of_line_wait_on_bit() rather than wait_on_bit()
 * to enforce taking of the wq->lock and hence avoid races with
 * rpc_complete_task().
 */
int __rpc_wait_for_completion_task(struct rpc_task *task, wait_bit_action_f *action)
{
	if (action == NULL)
		action = rpc_wait_bit_killable;
	return out_of_line_wait_on_bit(&task->tk_runstate, RPC_TASK_ACTIVE,
			action, TASK_KILLABLE);
}
EXPORT_SYMBOL_GPL(__rpc_wait_for_completion_task);

/*
 * Make an RPC task runnable.
 *
 * Note: If the task is ASYNC, and is being made runnable after sitting on an
 * rpc_wait_queue, this must be called with the queue spinlock held to protect
 * the wait queue operation.
 * Note the ordering of rpc_test_and_set_running() and rpc_clear_queued(),
 * which is needed to ensure that __rpc_execute() doesn't loop (due to the
 * lockless RPC_IS_QUEUED() test) before we've had a chance to test
 * the RPC_TASK_RUNNING flag.
 */
static void rpc_make_runnable(struct workqueue_struct *wq,
		struct rpc_task *task)
{
	bool need_wakeup = !rpc_test_and_set_running(task);

	rpc_clear_queued(task);
	if (!need_wakeup)
		return;
	if (RPC_IS_ASYNC(task)) {
		INIT_WORK(&task->u.tk_work, rpc_async_schedule);
		queue_work(wq, &task->u.tk_work);
	} else
		wake_up_bit(&task->tk_runstate, RPC_TASK_QUEUED);
}

/*
 * Prepare for sleeping on a wait queue.
 * By always appending tasks to the list we ensure FIFO behavior.
 * NB: An RPC task will only receive interrupt-driven events as long
 * as it's on a wait queue.
 */
static void __rpc_sleep_on_priority(struct rpc_wait_queue *q,
		struct rpc_task *task,
		rpc_action action,
		unsigned char queue_priority)
{
	dprintk("RPC: %5u sleep_on(queue \"%s\" time %lu)\n",
			task->tk_pid, rpc_qname(q), jiffies);

	trace_rpc_task_sleep(task, q);

	__rpc_add_wait_queue(q, task, queue_priority);

	WARN_ON_ONCE(task->tk_callback != NULL);
	task->tk_callback = action;
	__rpc_add_timer(q, task);
}

void rpc_sleep_on(struct rpc_wait_queue *q, struct rpc_task *task,
				rpc_action action)
{
	/* We shouldn't ever put an inactive task to sleep */
	WARN_ON_ONCE(!RPC_IS_ACTIVATED(task));
	if (!RPC_IS_ACTIVATED(task)) {
		task->tk_status = -EIO;
		rpc_put_task_async(task);
		return;
	}

	/*
	 * Protect the queue operations.
	 */
	spin_lock_bh(&q->lock);
	__rpc_sleep_on_priority(q, task, action, task->tk_priority);
	spin_unlock_bh(&q->lock);
}
EXPORT_SYMBOL_GPL(rpc_sleep_on);
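
/*
 * A minimal usage sketch (the wake-up action here is hypothetical): a
 * task that must wait for a transport event parks itself on one of the
 * transport's wait queues and names the routine to run once woken:
 *
 *	static void example_timer(struct rpc_task *task)
 *	{
 *		// runs as task->tk_callback after the wake-up
 *	}
 *	...
 *	task->tk_timeout = 5 * HZ;	// optional: arm the queue timer
 *	rpc_sleep_on(&xprt->pending, task, example_timer);
 *
 * tk_timeout must be set before the call if a timeout is wanted, since
 * __rpc_add_timer() reads it while the queue lock is held.
 */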

void rpc_sleep_on_priority(struct rpc_wait_queue *q, struct rpc_task *task,
		rpc_action action, int priority)
{
	/* We shouldn't ever put an inactive task to sleep */
	WARN_ON_ONCE(!RPC_IS_ACTIVATED(task));
	if (!RPC_IS_ACTIVATED(task)) {
		task->tk_status = -EIO;
		rpc_put_task_async(task);
		return;
	}

	/*
	 * Protect the queue operations.
	 */
	spin_lock_bh(&q->lock);
	__rpc_sleep_on_priority(q, task, action, priority - RPC_PRIORITY_LOW);
	spin_unlock_bh(&q->lock);
}
EXPORT_SYMBOL_GPL(rpc_sleep_on_priority);

/**
 * __rpc_do_wake_up_task_on_wq - wake up a single rpc_task
 * @wq: workqueue on which to run task
 * @queue: wait queue
 * @task: task to be woken up
 *
 * Caller must hold queue->lock, and have cleared the task queued flag.
 */
static void __rpc_do_wake_up_task_on_wq(struct workqueue_struct *wq,
		struct rpc_wait_queue *queue,
		struct rpc_task *task)
{
	dprintk("RPC: %5u __rpc_wake_up_task (now %lu)\n",
			task->tk_pid, jiffies);

	/* Has the task been executed yet? If not, we cannot wake it up! */
	if (!RPC_IS_ACTIVATED(task)) {
		printk(KERN_ERR "RPC: Inactive task (%p) being woken up!\n", task);
		return;
	}

	trace_rpc_task_wakeup(task, queue);

	__rpc_remove_wait_queue(queue, task);

	rpc_make_runnable(wq, task);

	dprintk("RPC:       __rpc_wake_up_task done\n");
}

/*
 * Wake up a queued task while the queue lock is being held
 */
static struct rpc_task *
rpc_wake_up_task_on_wq_queue_action_locked(struct workqueue_struct *wq,
		struct rpc_wait_queue *queue, struct rpc_task *task,
		bool (*action)(struct rpc_task *, void *), void *data)
{
	if (RPC_IS_QUEUED(task)) {
		smp_rmb();
		if (task->tk_waitqueue == queue) {
			if (action == NULL || action(task, data)) {
				__rpc_do_wake_up_task_on_wq(wq, queue, task);
				return task;
			}
		}
	}
	return NULL;
}

static void
rpc_wake_up_task_on_wq_queue_locked(struct workqueue_struct *wq,
		struct rpc_wait_queue *queue, struct rpc_task *task)
{
	rpc_wake_up_task_on_wq_queue_action_locked(wq, queue, task, NULL, NULL);
}

/*
 * Wake up a queued task while the queue lock is being held
 */
static void rpc_wake_up_task_queue_locked(struct rpc_wait_queue *queue, struct rpc_task *task)
{
	rpc_wake_up_task_on_wq_queue_locked(rpciod_workqueue, queue, task);
}

/*
 * Wake up a task on a specific queue
 */
void rpc_wake_up_queued_task_on_wq(struct workqueue_struct *wq,
		struct rpc_wait_queue *queue,
		struct rpc_task *task)
{
	if (!RPC_IS_QUEUED(task))
		return;
	spin_lock_bh(&queue->lock);
	rpc_wake_up_task_on_wq_queue_locked(wq, queue, task);
	spin_unlock_bh(&queue->lock);
}

/*
 * Wake up a task on a specific queue
 */
void rpc_wake_up_queued_task(struct rpc_wait_queue *queue, struct rpc_task *task)
{
	if (!RPC_IS_QUEUED(task))
		return;
	spin_lock_bh(&queue->lock);
	rpc_wake_up_task_queue_locked(queue, task);
	spin_unlock_bh(&queue->lock);
}
EXPORT_SYMBOL_GPL(rpc_wake_up_queued_task);

static bool rpc_task_action_set_status(struct rpc_task *task, void *status)
{
	task->tk_status = *(int *)status;
	return true;
}

static void
rpc_wake_up_task_queue_set_status_locked(struct rpc_wait_queue *queue,
		struct rpc_task *task, int status)
{
	rpc_wake_up_task_on_wq_queue_action_locked(rpciod_workqueue, queue,
			task, rpc_task_action_set_status, &status);
}

/**
 * rpc_wake_up_queued_task_set_status - wake up a task and set task->tk_status
 * @queue: pointer to rpc_wait_queue
 * @task: pointer to rpc_task
 * @status: integer error value
 *
 * If @task is queued on @queue, then it is woken up, and @task->tk_status is
 * set to the value of @status.
 */
void
rpc_wake_up_queued_task_set_status(struct rpc_wait_queue *queue,
		struct rpc_task *task, int status)
{
	if (!RPC_IS_QUEUED(task))
		return;
	spin_lock_bh(&queue->lock);
	rpc_wake_up_task_queue_set_status_locked(queue, task, status);
	spin_unlock_bh(&queue->lock);
}

/*
 * Wake up the next task on a priority queue.
 */
static struct rpc_task *__rpc_find_next_queued_priority(struct rpc_wait_queue *queue)
{
	struct list_head *q;
	struct rpc_task *task;

	/*
	 * Service a batch of tasks from a single owner.
	 */
	q = &queue->tasks[queue->priority];
	if (!list_empty(q)) {
		task = list_entry(q->next, struct rpc_task, u.tk_wait.list);
		if (queue->owner == task->tk_owner) {
			if (--queue->nr)
				goto out;
			list_move_tail(&task->u.tk_wait.list, q);
		}
		/*
		 * Check if we need to switch queues.
		 */
		goto new_owner;
	}

	/*
	 * Service the next queue.
	 */
	do {
		if (q == &queue->tasks[0])
			q = &queue->tasks[queue->maxpriority];
		else
			q = q - 1;
		if (!list_empty(q)) {
			task = list_entry(q->next, struct rpc_task, u.tk_wait.list);
			goto new_queue;
		}
	} while (q != &queue->tasks[queue->priority]);

	rpc_reset_waitqueue_priority(queue);
	return NULL;

new_queue:
	rpc_set_waitqueue_priority(queue, (unsigned int)(q - &queue->tasks[0]));
new_owner:
	rpc_set_waitqueue_owner(queue, task->tk_owner);
out:
	return task;
}

static struct rpc_task *__rpc_find_next_queued(struct rpc_wait_queue *queue)
{
	if (RPC_IS_PRIORITY(queue))
		return __rpc_find_next_queued_priority(queue);
	if (!list_empty(&queue->tasks[0]))
		return list_first_entry(&queue->tasks[0], struct rpc_task, u.tk_wait.list);
	return NULL;
}

/*
 * Wake up the first task on the wait queue.
 */
struct rpc_task *rpc_wake_up_first_on_wq(struct workqueue_struct *wq,
		struct rpc_wait_queue *queue,
		bool (*func)(struct rpc_task *, void *), void *data)
{
	struct rpc_task	*task = NULL;

	dprintk("RPC:       wake_up_first(%p \"%s\")\n",
			queue, rpc_qname(queue));
	spin_lock_bh(&queue->lock);
	task = __rpc_find_next_queued(queue);
	if (task != NULL)
		task = rpc_wake_up_task_on_wq_queue_action_locked(wq, queue,
				task, func, data);
	spin_unlock_bh(&queue->lock);

	return task;
}

/*
 * Wake up the first task on the wait queue.
 */
struct rpc_task *rpc_wake_up_first(struct rpc_wait_queue *queue,
		bool (*func)(struct rpc_task *, void *), void *data)
{
	return rpc_wake_up_first_on_wq(rpciod_workqueue, queue, func, data);
}
EXPORT_SYMBOL_GPL(rpc_wake_up_first);

static bool rpc_wake_up_next_func(struct rpc_task *task, void *data)
{
	return true;
}

/*
 * Wake up the next task on the wait queue.
 */
struct rpc_task *rpc_wake_up_next(struct rpc_wait_queue *queue)
{
	return rpc_wake_up_first(queue, rpc_wake_up_next_func, NULL);
}
EXPORT_SYMBOL_GPL(rpc_wake_up_next);

/**
 * rpc_wake_up - wake up all rpc_tasks
 * @queue: rpc_wait_queue on which the tasks are sleeping
 *
 * Grabs queue->lock
 */
void rpc_wake_up(struct rpc_wait_queue *queue)
{
	struct list_head *head;

	spin_lock_bh(&queue->lock);
	head = &queue->tasks[queue->maxpriority];
	for (;;) {
		while (!list_empty(head)) {
			struct rpc_task *task;
			task = list_first_entry(head,
					struct rpc_task,
					u.tk_wait.list);
			rpc_wake_up_task_queue_locked(queue, task);
		}
		if (head == &queue->tasks[0])
			break;
		head--;
	}
	spin_unlock_bh(&queue->lock);
}
EXPORT_SYMBOL_GPL(rpc_wake_up);

/**
 * rpc_wake_up_status - wake up all rpc_tasks and set their status value.
 * @queue: rpc_wait_queue on which the tasks are sleeping
 * @status: status value to set
 *
 * Grabs queue->lock
 */
void rpc_wake_up_status(struct rpc_wait_queue *queue, int status)
{
	struct list_head *head;

	spin_lock_bh(&queue->lock);
	head = &queue->tasks[queue->maxpriority];
	for (;;) {
		while (!list_empty(head)) {
			struct rpc_task *task;
			task = list_first_entry(head,
					struct rpc_task,
					u.tk_wait.list);
			task->tk_status = status;
			rpc_wake_up_task_queue_locked(queue, task);
		}
		if (head == &queue->tasks[0])
			break;
		head--;
	}
	spin_unlock_bh(&queue->lock);
}
EXPORT_SYMBOL_GPL(rpc_wake_up_status);

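/*
 * Per-queue timer callback: wake every task whose expiry has passed with
 * tk_status set to -ETIMEDOUT, then re-arm the timer for the earliest
 * remaining expiry.
 */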
static void __rpc_queue_timer_fn(struct timer_list *t)
{
	struct rpc_wait_queue *queue = from_timer(queue, t, timer_list.timer);
	struct rpc_task *task, *n;
	unsigned long expires, now, timeo;

	spin_lock(&queue->lock);
	expires = now = jiffies;
	list_for_each_entry_safe(task, n, &queue->timer_list.list, u.tk_wait.timer_list) {
		timeo = task->u.tk_wait.expires;
		if (time_after_eq(now, timeo)) {
			dprintk("RPC: %5u timeout\n", task->tk_pid);
			task->tk_status = -ETIMEDOUT;
			rpc_wake_up_task_queue_locked(queue, task);
			continue;
		}
		if (expires == now || time_after(expires, timeo))
			expires = timeo;
	}
	if (!list_empty(&queue->timer_list.list))
		rpc_set_queue_timer(queue, expires);
	spin_unlock(&queue->lock);
}

static void __rpc_atrun(struct rpc_task *task)
{
	if (task->tk_status == -ETIMEDOUT)
		task->tk_status = 0;
}

/*
 * Run a task at a later time
 */
void rpc_delay(struct rpc_task *task, unsigned long delay)
{
	task->tk_timeout = delay;
	rpc_sleep_on(&delay_queue, task, __rpc_atrun);
}
EXPORT_SYMBOL_GPL(rpc_delay);
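
/*
 * Back-off sketch (the tk_action state name is hypothetical): a state
 * routine that wants to retry after three seconds can do
 *
 *	rpc_delay(task, 3 * HZ);
 *	task->tk_action = call_retry;
 *	return;
 *
 * The task then sleeps on delay_queue; when the queue timer fires it is
 * woken with tk_status == -ETIMEDOUT, which __rpc_atrun() clears before
 * the task continues at call_retry.
 */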

/*
 * Helper to call task->tk_ops->rpc_call_prepare
 */
void rpc_prepare_task(struct rpc_task *task)
{
	task->tk_ops->rpc_call_prepare(task, task->tk_calldata);
}

static void
rpc_init_task_statistics(struct rpc_task *task)
{
	/* Initialize retry counters */
	task->tk_garb_retry = 2;
	task->tk_cred_retry = 2;
	task->tk_rebind_retry = 2;

	/* starting timestamp */
	task->tk_start = ktime_get();
}

static void
rpc_reset_task_statistics(struct rpc_task *task)
{
	task->tk_timeouts = 0;
	task->tk_flags &= ~(RPC_CALL_MAJORSEEN|RPC_TASK_KILLED|RPC_TASK_SENT);

	rpc_init_task_statistics(task);
}

/*
 * Helper that calls task->tk_ops->rpc_call_done if it exists
 */
void rpc_exit_task(struct rpc_task *task)
{
	task->tk_action = NULL;
	if (task->tk_ops->rpc_call_done != NULL) {
		task->tk_ops->rpc_call_done(task, task->tk_calldata);
		if (task->tk_action != NULL) {
			WARN_ON(RPC_ASSASSINATED(task));
			/* Always release the RPC slot and buffer memory */
			xprt_release(task);
			rpc_reset_task_statistics(task);
		}
	}
}

void rpc_exit(struct rpc_task *task, int status)
{
	task->tk_status = status;
	task->tk_action = rpc_exit_task;
	if (RPC_IS_QUEUED(task))
		rpc_wake_up_queued_task(task->tk_waitqueue, task);
}
EXPORT_SYMBOL_GPL(rpc_exit);

void rpc_release_calldata(const struct rpc_call_ops *ops, void *calldata)
{
	if (ops->rpc_release != NULL)
		ops->rpc_release(calldata);
}

/*
 * This is the RPC `scheduler' (or rather, the finite state machine).
 */
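/*
 * Each loop iteration runs one step: the pending tk_callback if a
 * wake-up installed one, otherwise the current tk_action.  If the step
 * left the task queued, an async task returns here (the workqueue
 * resumes it on wake-up) while a sync task just sleeps in place.
 */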
static void __rpc_execute(struct rpc_task *task)
{
	struct rpc_wait_queue *queue;
	int task_is_async = RPC_IS_ASYNC(task);
	int status = 0;

	dprintk("RPC: %5u __rpc_execute flags=0x%x\n",
			task->tk_pid, task->tk_flags);

	WARN_ON_ONCE(RPC_IS_QUEUED(task));
	if (RPC_IS_QUEUED(task))
		return;

	for (;;) {
		void (*do_action)(struct rpc_task *);

		/*
		 * Perform the next FSM step or a pending callback.
		 *
		 * tk_action may be NULL if the task has been killed.
		 * In particular, note that rpc_killall_tasks may
		 * do this at any time, so beware when dereferencing.
		 */
		do_action = task->tk_action;
		if (task->tk_callback) {
			do_action = task->tk_callback;
			task->tk_callback = NULL;
		}
		if (!do_action)
			break;
		trace_rpc_task_run_action(task, do_action);
		do_action(task);

		/*
		 * Lockless check for whether task is sleeping or not.
		 */
		if (!RPC_IS_QUEUED(task))
			continue;
		/*
		 * The queue->lock protects against races with
		 * rpc_make_runnable().
		 *
		 * Note that once we clear RPC_TASK_RUNNING on an asynchronous
		 * rpc_task, rpc_make_runnable() can assign it to a
		 * different workqueue. We therefore cannot assume that the
		 * rpc_task pointer may still be dereferenced.
		 */
		queue = task->tk_waitqueue;
		spin_lock_bh(&queue->lock);
		if (!RPC_IS_QUEUED(task)) {
			spin_unlock_bh(&queue->lock);
			continue;
		}
		rpc_clear_running(task);
		spin_unlock_bh(&queue->lock);
		if (task_is_async)
			return;

		/* sync task: sleep here */
		dprintk("RPC: %5u sync task going to sleep\n", task->tk_pid);
		status = out_of_line_wait_on_bit(&task->tk_runstate,
				RPC_TASK_QUEUED, rpc_wait_bit_killable,
				TASK_KILLABLE);
		if (status == -ERESTARTSYS) {
			/*
			 * When a sync task receives a signal, it exits with
			 * -ERESTARTSYS. In order to catch any callbacks that
			 * clean up after sleeping on some queue, we don't
			 * break the loop here, but go around once more.
			 */
			dprintk("RPC: %5u got signal\n", task->tk_pid);
			task->tk_flags |= RPC_TASK_KILLED;
			rpc_exit(task, -ERESTARTSYS);
		}
		dprintk("RPC: %5u sync task resuming\n", task->tk_pid);
	}

	dprintk("RPC: %5u return %d, status %d\n", task->tk_pid, status,
			task->tk_status);
	/* Release all resources associated with the task */
	rpc_release_task(task);
}

/*
 * User-visible entry point to the scheduler.
 *
 * This may be called recursively if e.g. an async NFS task updates
 * the attributes and finds that dirty pages must be flushed.
 * NOTE: Upon exit of this function the task is guaranteed to be
 *	 released. In particular note that tk_release() will have
 *	 been called, so your task memory may have been freed.
 */
void rpc_execute(struct rpc_task *task)
{
	bool is_async = RPC_IS_ASYNC(task);

	rpc_set_active(task);
	rpc_make_runnable(rpciod_workqueue, task);
	if (!is_async)
		__rpc_execute(task);
}

static void rpc_async_schedule(struct work_struct *work)
{
	__rpc_execute(container_of(work, struct rpc_task, u.tk_work));
}

/**
 * rpc_malloc - allocate RPC buffer resources
 * @task: RPC task
 *
 * A single memory region is allocated, which is split between the
 * RPC call and RPC reply that this task is being used for. When
 * this RPC is retired, the memory is released by calling rpc_free.
 *
 * To prevent rpciod from hanging, this allocator never sleeps,
 * returning -ENOMEM and suppressing warning if the request cannot
 * be serviced immediately. The caller can arrange to sleep in a
 * way that is safe for rpciod.
 *
 * Most requests are 'small' (under 2KiB) and can be serviced from a
 * mempool, ensuring that NFS reads and writes can always proceed,
 * and that there is good locality of reference for these buffers.
 *
 * In order to avoid memory starvation triggering more writebacks of
 * NFS requests, we avoid using GFP_KERNEL.
 */
int rpc_malloc(struct rpc_task *task)
{
	struct rpc_rqst *rqst = task->tk_rqstp;
	size_t size = rqst->rq_callsize + rqst->rq_rcvsize;
	struct rpc_buffer *buf;
	gfp_t gfp = GFP_NOIO | __GFP_NOWARN;

	if (RPC_IS_SWAPPER(task))
		gfp = __GFP_MEMALLOC | GFP_NOWAIT | __GFP_NOWARN;

	size += sizeof(struct rpc_buffer);
	if (size <= RPC_BUFFER_MAXSIZE)
		buf = mempool_alloc(rpc_buffer_mempool, gfp);
	else
		buf = kmalloc(size, gfp);

	if (!buf)
		return -ENOMEM;

	buf->len = size;
	dprintk("RPC: %5u allocated buffer of size %zu at %p\n",
			task->tk_pid, size, buf);
	rqst->rq_buffer = buf->data;
	rqst->rq_rbuffer = (char *)rqst->rq_buffer + rqst->rq_callsize;
	return 0;
}
EXPORT_SYMBOL_GPL(rpc_malloc);
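
/*
 * Layout of the single allocation made above (widths illustrative):
 *
 *	+-------------------+-------------------+-------------------+
 *	| struct rpc_buffer | call buffer       | reply buffer      |
 *	| header (->len)    | rq_callsize bytes | rq_rcvsize bytes  |
 *	+-------------------+-------------------+-------------------+
 *	                    ^ rq_buffer         ^ rq_rbuffer
 */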

/**
 * rpc_free - free RPC buffer resources allocated via rpc_malloc
 * @task: RPC task
 *
 */
void rpc_free(struct rpc_task *task)
{
	void *buffer = task->tk_rqstp->rq_buffer;
	size_t size;
	struct rpc_buffer *buf;

	buf = container_of(buffer, struct rpc_buffer, data);
	size = buf->len;

	dprintk("RPC:       freeing buffer of size %zu at %p\n",
			size, buf);

	if (size <= RPC_BUFFER_MAXSIZE)
		mempool_free(buf, rpc_buffer_mempool);
	else
		kfree(buf);
}
EXPORT_SYMBOL_GPL(rpc_free);

/*
 * Creation and deletion of RPC task structures
 */
static void rpc_init_task(struct rpc_task *task, const struct rpc_task_setup *task_setup_data)
{
	memset(task, 0, sizeof(*task));
	atomic_set(&task->tk_count, 1);
	task->tk_flags  = task_setup_data->flags;
	task->tk_ops = task_setup_data->callback_ops;
	task->tk_calldata = task_setup_data->callback_data;
	INIT_LIST_HEAD(&task->tk_task);

	task->tk_priority = task_setup_data->priority - RPC_PRIORITY_LOW;
	task->tk_owner = current->tgid;

	/* Initialize workqueue for async tasks */
	task->tk_workqueue = task_setup_data->workqueue;

	task->tk_xprt = xprt_get(task_setup_data->rpc_xprt);

	if (task->tk_ops->rpc_call_prepare != NULL)
		task->tk_action = rpc_prepare_task;

	rpc_init_task_statistics(task);

	dprintk("RPC:       new task initialized, procpid %u\n",
				task_pid_nr(current));
}

static struct rpc_task *
rpc_alloc_task(void)
{
	return (struct rpc_task *)mempool_alloc(rpc_task_mempool, GFP_NOIO);
}

/*
 * Create a new task for the specified client.
 */
struct rpc_task *rpc_new_task(const struct rpc_task_setup *setup_data)
{
	struct rpc_task	*task = setup_data->task;
	unsigned short flags = 0;

	if (task == NULL) {
		task = rpc_alloc_task();
		flags = RPC_TASK_DYNAMIC;
	}

	rpc_init_task(task, setup_data);
	task->tk_flags |= flags;
	dprintk("RPC:       allocated task %p\n", task);
	return task;
}
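
/*
 * A minimal sketch of how callers typically drive this (the setup field
 * names are real; my_call_ops is hypothetical -- see rpc_run_task() in
 * net/sunrpc/clnt.c for the canonical caller, which also binds the
 * client with rpc_task_set_client()):
 *
 *	struct rpc_task_setup task_setup_data = {
 *		.rpc_client = clnt,
 *		.callback_ops = &my_call_ops,
 *		.flags = RPC_TASK_ASYNC,
 *	};
 *	struct rpc_task *task;
 *
 *	task = rpc_run_task(&task_setup_data);
 *	if (!IS_ERR(task))
 *		rpc_put_task(task);
 *
 * With no .task supplied, rpc_new_task() mempool-allocates the task and
 * flags it RPC_TASK_DYNAMIC so rpc_free_task() returns it to the pool.
 */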

/*
 * rpc_free_task - release rpc task and perform cleanups
 *
 * Note that we free up the rpc_task _after_ rpc_release_calldata()
 * in order to work around a workqueue dependency issue.
 *
 * Tejun Heo states:
 * "Workqueue currently considers two work items to be the same if they're
 * on the same address and won't execute them concurrently - ie. it
 * makes a work item which is queued again while being executed wait
 * for the previous execution to complete.
 *
 * If a work function frees the work item, and then waits for an event
 * which should be performed by another work item and *that* work item
 * recycles the freed work item, it can create a false dependency loop.
 * There really is no reliable way to detect this short of verifying
 * every memory free."
 *
 */
static void rpc_free_task(struct rpc_task *task)
{
	unsigned short tk_flags = task->tk_flags;

	rpc_release_calldata(task->tk_ops, task->tk_calldata);

	if (tk_flags & RPC_TASK_DYNAMIC) {
		dprintk("RPC: %5u freeing task\n", task->tk_pid);
		mempool_free(task, rpc_task_mempool);
	}
}

static void rpc_async_release(struct work_struct *work)
{
	rpc_free_task(container_of(work, struct rpc_task, u.tk_work));
}

static void rpc_release_resources_task(struct rpc_task *task)
{
	xprt_release(task);
	if (task->tk_msg.rpc_cred) {
		put_rpccred(task->tk_msg.rpc_cred);
		task->tk_msg.rpc_cred = NULL;
	}
	rpc_task_release_client(task);
}

static void rpc_final_put_task(struct rpc_task *task,
		struct workqueue_struct *q)
{
	if (q != NULL) {
		INIT_WORK(&task->u.tk_work, rpc_async_release);
		queue_work(q, &task->u.tk_work);
	} else
		rpc_free_task(task);
}

static void rpc_do_put_task(struct rpc_task *task, struct workqueue_struct *q)
{
	if (atomic_dec_and_test(&task->tk_count)) {
		rpc_release_resources_task(task);
		rpc_final_put_task(task, q);
	}
}

void rpc_put_task(struct rpc_task *task)
{
	rpc_do_put_task(task, NULL);
}
EXPORT_SYMBOL_GPL(rpc_put_task);

void rpc_put_task_async(struct rpc_task *task)
{
	rpc_do_put_task(task, task->tk_workqueue);
}
EXPORT_SYMBOL_GPL(rpc_put_task_async);

static void rpc_release_task(struct rpc_task *task)
{
	dprintk("RPC: %5u release task\n", task->tk_pid);

	WARN_ON_ONCE(RPC_IS_QUEUED(task));

	rpc_release_resources_task(task);

	/*
	 * Note: at this point we have been removed from rpc_clnt->cl_tasks,
	 * so it should be safe to use task->tk_count as a test for whether
	 * or not any other processes still hold references to our rpc_task.
	 */
	if (atomic_read(&task->tk_count) != 1 + !RPC_IS_ASYNC(task)) {
		/* Wake up anyone who may be waiting for task completion */
		if (!rpc_complete_task(task))
			return;
	} else {
		if (!atomic_dec_and_test(&task->tk_count))
			return;
	}
	rpc_final_put_task(task, task->tk_workqueue);
}

int rpciod_up(void)
{
	return try_module_get(THIS_MODULE) ? 0 : -EINVAL;
}

void rpciod_down(void)
{
	module_put(THIS_MODULE);
}

/*
 * Start up the rpciod workqueue.
 */
static int rpciod_start(void)
{
	struct workqueue_struct *wq;

	/*
	 * Create the rpciod workqueues and wait for them to start.
	 */
	dprintk("RPC:       creating workqueue rpciod\n");
	wq = alloc_workqueue("rpciod", WQ_MEM_RECLAIM | WQ_UNBOUND, 0);
	if (!wq)
		goto out_failed;
	rpciod_workqueue = wq;
	/* Note: highpri because network receive is latency sensitive */
	wq = alloc_workqueue("xprtiod", WQ_UNBOUND|WQ_MEM_RECLAIM|WQ_HIGHPRI, 0);
	if (!wq)
		goto free_rpciod;
	xprtiod_workqueue = wq;
	return 1;
free_rpciod:
	wq = rpciod_workqueue;
	rpciod_workqueue = NULL;
	destroy_workqueue(wq);
out_failed:
	return 0;
}

static void rpciod_stop(void)
{
	struct workqueue_struct *wq = NULL;

	if (rpciod_workqueue == NULL)
		return;
	dprintk("RPC:       destroying workqueue rpciod\n");

	wq = rpciod_workqueue;
	rpciod_workqueue = NULL;
	destroy_workqueue(wq);
	wq = xprtiod_workqueue;
	xprtiod_workqueue = NULL;
	destroy_workqueue(wq);
}

void
rpc_destroy_mempool(void)
{
	rpciod_stop();
	mempool_destroy(rpc_buffer_mempool);
	mempool_destroy(rpc_task_mempool);
	kmem_cache_destroy(rpc_task_slabp);
	kmem_cache_destroy(rpc_buffer_slabp);
	rpc_destroy_wait_queue(&delay_queue);
}

int
rpc_init_mempool(void)
{
	/*
	 * The following is not strictly a mempool initialisation,
	 * but there is no harm in doing it here
	 */
	rpc_init_wait_queue(&delay_queue, "delayq");
	if (!rpciod_start())
		goto err_nomem;

L
					     sizeof(struct rpc_task),
					     0, SLAB_HWCACHE_ALIGN,
1215
					     NULL);
L
		goto err_nomem;
	rpc_buffer_slabp = kmem_cache_create("rpc_buffers",
					     RPC_BUFFER_MAXSIZE,
					     0, SLAB_HWCACHE_ALIGN,
					     NULL);
	if (!rpc_buffer_slabp)
		goto err_nomem;
	rpc_task_mempool = mempool_create_slab_pool(RPC_TASK_POOLSIZE,
						    rpc_task_slabp);
	if (!rpc_task_mempool)
		goto err_nomem;
	rpc_buffer_mempool = mempool_create_slab_pool(RPC_BUFFER_POOLSIZE,
						      rpc_buffer_slabp);
	if (!rpc_buffer_mempool)
		goto err_nomem;
	return 0;
err_nomem:
	rpc_destroy_mempool();
	return -ENOMEM;
}