rxrpc.c 20.8 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11
/* Maintain an RxRPC server socket to do AFS communications through
 *
 * Copyright (C) 2007 Red Hat, Inc. All Rights Reserved.
 * Written by David Howells (dhowells@redhat.com)
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version
 * 2 of the License, or (at your option) any later version.
 */

12
#include <linux/slab.h>
13 14
#include <linux/sched/signal.h>

15 16 17 18 19
#include <net/sock.h>
#include <net/af_rxrpc.h>
#include "internal.h"
#include "afs_cm.h"

20
struct workqueue_struct *afs_async_calls;
21

22
static void afs_wake_up_call_waiter(struct sock *, struct rxrpc_call *, unsigned long);
23
static int afs_wait_for_call_to_complete(struct afs_call *);
24 25
static void afs_wake_up_async_call(struct sock *, struct rxrpc_call *, unsigned long);
static void afs_process_async_call(struct work_struct *);
26 27
static void afs_rx_new_call(struct sock *, struct rxrpc_call *, unsigned long);
static void afs_rx_discard_new_call(struct rxrpc_call *, unsigned long);
28
static int afs_deliver_cm_op_id(struct afs_call *);
29 30 31

/* Asynchronous incoming call initial processing.  The operation ID is read
 * first and the call is then rerouted to the appropriate cache manager op.
 */
static const struct afs_call_type afs_RXCMxxxx = {
	.name		= "CB.xxxx",
	.deliver	= afs_deliver_cm_op_id,
};

/*
 * open an RxRPC socket and bind it to be a server for callback notifications
 * - the socket is left in blocking mode and non-blocking ops use MSG_DONTWAIT
 *
 * Returns 0 on success or a negative errno; on failure the socket is
 * released and net->socket is left untouched.
 */
int afs_open_socket(struct afs_net *net)
{
	struct sockaddr_rxrpc srx;
	struct socket *socket;
	int ret;

	_enter("");

	ret = sock_create_kern(&init_net, AF_RXRPC, SOCK_DGRAM, PF_INET6, &socket);
	if (ret < 0)
		goto error_1;

	/* GFP_NOFS so that socket allocations can't recurse into the fs */
	socket->sk->sk_allocation = GFP_NOFS;

	/* bind the callback manager's address to make this a server socket */
	memset(&srx, 0, sizeof(srx));
	srx.srx_family			= AF_RXRPC;
	srx.srx_service			= CM_SERVICE;
	srx.transport_type		= SOCK_DGRAM;
	srx.transport_len		= sizeof(srx.transport.sin6);
	srx.transport.sin6.sin6_family	= AF_INET6;
	srx.transport.sin6.sin6_port	= htons(AFS_CM_PORT);

	ret = kernel_bind(socket, (struct sockaddr *) &srx, sizeof(srx));
	if (ret < 0)
		goto error_2;

	/* hook up the notifications for incoming calls before listening */
	rxrpc_kernel_new_call_notification(socket, afs_rx_new_call,
					   afs_rx_discard_new_call);

	ret = kernel_listen(socket, INT_MAX);
	if (ret < 0)
		goto error_2;

	net->socket = socket;
	/* preallocate incoming-call records so the CM can accept immediately */
	afs_charge_preallocation(&net->charge_preallocation_work);
	_leave(" = 0");
	return 0;

error_2:
	sock_release(socket);
error_1:
	_leave(" = %d", ret);
	return ret;
}

/*
 * close the RxRPC socket AFS was using
 *
 * Stops accepting new calls, waits for all outstanding calls to be
 * released, then shuts down and releases the socket.
 */
void afs_close_socket(struct afs_net *net)
{
	_enter("");

	/* backlog 0: stop accepting new incoming calls */
	kernel_listen(net->socket, 0);
	flush_workqueue(afs_async_calls);

	/* drop the spare call kept for preallocation, if any */
	if (net->spare_incoming_call) {
		afs_put_call(net->spare_incoming_call);
		net->spare_incoming_call = NULL;
	}

	_debug("outstanding %u", atomic_read(&net->nr_outstanding_calls));
	/* sleep until afs_put_call() drops the counter to zero */
	wait_on_atomic_t(&net->nr_outstanding_calls, atomic_t_wait,
			 TASK_UNINTERRUPTIBLE);
	_debug("no outstanding calls");

	kernel_sock_shutdown(net->socket, SHUT_RDWR);
	flush_workqueue(afs_async_calls);
	sock_release(net->socket);

	_debug("dework");
	_leave("");
}

/*
 * Allocate a call.
 *
 * Returns a zeroed afs_call of the given type with one reference held, or
 * NULL on allocation failure.  Also bumps net->nr_outstanding_calls, which
 * afs_put_call() decrements on final put.
 */
static struct afs_call *afs_alloc_call(struct afs_net *net,
				       const struct afs_call_type *type,
				       gfp_t gfp)
{
	struct afs_call *call;
	int o;

	call = kzalloc(sizeof(*call), gfp);
	if (!call)
		return NULL;

	call->type = type;
	call->net = net;
	atomic_set(&call->usage, 1);
	INIT_WORK(&call->async_work, afs_process_async_call);
	init_waitqueue_head(&call->waitq);

	o = atomic_inc_return(&net->nr_outstanding_calls);
	trace_afs_call(call, afs_call_trace_alloc, 1, o,
		       __builtin_return_address(0));
	return call;
}

/*
 * Dispose of a reference on a call.
 *
 * On the final put, the rxrpc call is ended, the type's destructor is run,
 * the call is freed and the per-net outstanding-call count is dropped;
 * when that count hits zero, afs_close_socket() is woken.
 */
void afs_put_call(struct afs_call *call)
{
	struct afs_net *net = call->net;
	int n = atomic_dec_return(&call->usage);
	int o = atomic_read(&net->nr_outstanding_calls);

	trace_afs_call(call, afs_call_trace_put, n + 1, o,
		       __builtin_return_address(0));

	ASSERTCMP(n, >=, 0);
	if (n == 0) {
		/* Last reference: tear the call down. */
		ASSERT(!work_pending(&call->async_work));
		ASSERT(call->type->name != NULL);

		if (call->rxcall) {
			rxrpc_kernel_end_call(net->socket, call->rxcall);
			call->rxcall = NULL;
		}
		if (call->type->destructor)
			call->type->destructor(call);

		kfree(call->request);
		kfree(call);

		/* NOTE(review): trace_afs_call() below runs after kfree(call);
		 * presumably the tracepoint records only the pointer value -
		 * confirm it never dereferences the call.
		 */
		o = atomic_dec_return(&net->nr_outstanding_calls);
		trace_afs_call(call, afs_call_trace_free, 0, o,
			       __builtin_return_address(0));
		/* wake anyone waiting in afs_close_socket() for drain */
		if (o == 0)
			wake_up_atomic_t(&net->nr_outstanding_calls);
	}
}

/*
 * Queue the call for actual work.  Returns 0 unconditionally for convenience.
 *
 * Takes an extra reference for the work item; if the work item was already
 * queued, the extra ref is dropped again.
 */
int afs_queue_call_work(struct afs_call *call)
{
	int u = atomic_inc_return(&call->usage);

	trace_afs_call(call, afs_call_trace_work, u,
		       atomic_read(&call->net->nr_outstanding_calls),
		       __builtin_return_address(0));

	INIT_WORK(&call->work, call->type->work);

	/* queue_work() returns false if already queued - drop our extra ref */
	if (!queue_work(afs_wq, &call->work))
		afs_put_call(call);
	return 0;
}

/*
 * allocate a call with flat request and reply buffers
 *
 * @request_size: bytes to allocate for the outgoing request (0 for none)
 * @reply_max: bytes to allocate for the incoming reply buffer (0 for none)
 *
 * Returns the new call or NULL on allocation failure (any partial
 * allocation is released via afs_put_call()).
 */
struct afs_call *afs_alloc_flat_call(struct afs_net *net,
				     const struct afs_call_type *type,
				     size_t request_size, size_t reply_max)
{
	struct afs_call *call;

	call = afs_alloc_call(net, type, GFP_NOFS);
	if (!call)
		goto nomem_call;

	if (request_size) {
		call->request_size = request_size;
		call->request = kmalloc(request_size, GFP_NOFS);
		if (!call->request)
			goto nomem_free;
	}

	if (reply_max) {
		call->reply_max = reply_max;
		call->buffer = kmalloc(reply_max, GFP_NOFS);
		if (!call->buffer)
			goto nomem_free;
	}

	init_waitqueue_head(&call->waitq);
	return call;

nomem_free:
	/* drops the sole ref, freeing call and any buffer already attached */
	afs_put_call(call);
nomem_call:
	return NULL;
}

/*
 * clean up a call with flat buffer
 *
 * Releases the flat request and reply buffers and clears the pointers so
 * that afs_put_call()'s own kfree(call->request) sees NULL.
 */
void afs_flat_call_destructor(struct afs_call *call)
{
	_enter("");

	kfree(call->request);
	kfree(call->buffer);
	call->request = NULL;
	call->buffer = NULL;
}

#define AFS_BVEC_MAX 8

/*
 * Load the given bvec with the next few pages.
 *
 * Fills @bv with up to AFS_BVEC_MAX pages from call->mapping starting at
 * page index @first, the first page starting at @offset within the page.
 * MSG_MORE is set unless the run includes the final page (@last), whose
 * length is capped at call->last_to.  The msghdr's iterator is pointed at
 * the assembled bvec.
 */
static void afs_load_bvec(struct afs_call *call, struct msghdr *msg,
			  struct bio_vec *bv, pgoff_t first, pgoff_t last,
			  unsigned offset)
{
	struct page *pages[AFS_BVEC_MAX];
	unsigned int nr, n, i, to, bytes = 0;

	nr = min_t(pgoff_t, last - first + 1, AFS_BVEC_MAX);
	n = find_get_pages_contig(call->mapping, first, nr, pages);
	ASSERTCMP(n, ==, nr);

	msg->msg_flags |= MSG_MORE;
	for (i = 0; i < nr; i++) {
		to = PAGE_SIZE;
		if (first + i >= last) {
			/* final page: trim to the caller's end offset and
			 * drop MSG_MORE so rxrpc knows the data is complete */
			to = call->last_to;
			msg->msg_flags &= ~MSG_MORE;
		}
		bv[i].bv_page = pages[i];
		bv[i].bv_len = to - offset;
		bv[i].bv_offset = offset;
		bytes += to - offset;
		offset = 0;	/* only the first page starts mid-page */
	}

	iov_iter_bvec(&msg->msg_iter, WRITE | ITER_BVEC, bv, nr, bytes);
}

275 276 277 278 279 280 281 282 283 284 285 286 287
/*
 * Advance the AFS call state when the RxRPC call ends the transmit phase.
 */
static void afs_notify_end_request_tx(struct sock *sock,
				      struct rxrpc_call *rxcall,
				      unsigned long call_user_ID)
{
	struct afs_call *call = (struct afs_call *)call_user_ID;

	if (call->state == AFS_CALL_REQUESTING)
		call->state = AFS_CALL_AWAIT_REPLY;
}

/*
 * attach the data from a bunch of pages on an inode to a call
 *
 * Transmits pages call->first..call->last in AFS_BVEC_MAX-sized batches,
 * dropping the page refs taken by afs_load_bvec() after each send.
 * Returns the last rxrpc_kernel_send_data() result (< 0 on error).
 */
static int afs_send_pages(struct afs_call *call, struct msghdr *msg)
{
	struct bio_vec bv[AFS_BVEC_MAX];
	unsigned int bytes, nr, loop, offset;
	pgoff_t first = call->first, last = call->last;
	int ret;

	offset = call->first_offset;
	call->first_offset = 0;

	do {
		afs_load_bvec(call, msg, bv, first, last, offset);
		offset = 0;	/* subsequent batches start at page boundary */
		bytes = msg->msg_iter.count;
		nr = msg->msg_iter.nr_segs;

		ret = rxrpc_kernel_send_data(call->net->socket, call->rxcall, msg,
					     bytes, afs_notify_end_request_tx);
		/* release the refs find_get_pages_contig() took */
		for (loop = 0; loop < nr; loop++)
			put_page(bv[loop].bv_page);
		if (ret < 0)
			break;

		first += nr;
	} while (first <= last);

	return ret;
}

/*
 * initiate a call
 *
 * @srx: address of the server to call
 * @call: the prepared call (request buffer, type, key already set)
 * @gfp: allocation flags for rxrpc
 * @async: if true, return -EINPROGRESS and complete via the async workqueue
 *
 * Consumes the caller's reference on @call on all paths: on synchronous
 * completion and on error it is dropped here; an async call keeps it until
 * afs_process_async_call() finishes.
 */
int afs_make_call(struct sockaddr_rxrpc *srx, struct afs_call *call,
		  gfp_t gfp, bool async)
{
	struct rxrpc_call *rxcall;
	struct msghdr msg;
	struct kvec iov[1];
	size_t offset;
	s64 tx_total_len;
	u32 abort_code;
	int ret;

	_enter(",{%pISp},", &srx->transport);

	ASSERT(call->type != NULL);
	ASSERT(call->type->name != NULL);

	_debug("____MAKE %p{%s,%x} [%d]____",
	       call, call->type->name, key_serial(call->key),
	       atomic_read(&call->net->nr_outstanding_calls));

	call->async = async;

	/* Work out the length we're going to transmit.  This is awkward for
	 * calls such as FS.StoreData where there's an extra injection of data
	 * after the initial fixed part.
	 */
	tx_total_len = call->request_size;
	if (call->send_pages) {
		tx_total_len += call->last_to - call->first_offset;
		tx_total_len += (call->last - call->first) * PAGE_SIZE;
	}

	/* create a call */
	rxcall = rxrpc_kernel_begin_call(call->net->socket, srx, call->key,
					 (unsigned long)call,
					 tx_total_len, gfp,
					 (async ?
					  afs_wake_up_async_call :
					  afs_wake_up_call_waiter),
					 call->upgrade);
	/* rxrpc now owns the key ref */
	call->key = NULL;
	if (IS_ERR(rxcall)) {
		ret = PTR_ERR(rxcall);
		goto error_kill_call;
	}

	call->rxcall = rxcall;

	/* send the request */
	iov[0].iov_base	= call->request;
	iov[0].iov_len	= call->request_size;

	msg.msg_name		= NULL;
	msg.msg_namelen		= 0;
	iov_iter_kvec(&msg.msg_iter, WRITE | ITER_KVEC, iov, 1,
		      call->request_size);
	msg.msg_control		= NULL;
	msg.msg_controllen	= 0;
	msg.msg_flags		= MSG_WAITALL | (call->send_pages ? MSG_MORE : 0);

	/* We have to change the state *before* sending the last packet as
	 * rxrpc might give us the reply before it returns from sending the
	 * request.  Further, if the send fails, we may already have been given
	 * a notification and may have collected it.
	 */
	if (!call->send_pages)
		call->state = AFS_CALL_AWAIT_REPLY;
	ret = rxrpc_kernel_send_data(call->net->socket, rxcall,
				     &msg, call->request_size,
				     afs_notify_end_request_tx);
	if (ret < 0)
		goto error_do_abort;

	if (call->send_pages) {
		ret = afs_send_pages(call, &msg);
		if (ret < 0)
			goto error_do_abort;
	}

	/* at this point, an async call may no longer exist as it may have
	 * already completed */
	if (call->async)
		return -EINPROGRESS;

	return afs_wait_for_call_to_complete(call);

error_do_abort:
	call->state = AFS_CALL_COMPLETE;
	if (ret != -ECONNABORTED) {
		rxrpc_kernel_abort_call(call->net->socket, rxcall,
					RX_USER_ABORT, ret, "KSD");
	} else {
		/* the peer aborted: pick up the abort code it sent */
		abort_code = 0;
		offset = 0;
		rxrpc_kernel_recv_data(call->net->socket, rxcall, NULL,
				       0, &offset, false, &abort_code,
				       &call->service_id);
		ret = afs_abort_to_error(abort_code);
	}
error_kill_call:
	afs_put_call(call);
	_leave(" = %d", ret);
	return ret;
}

/*
 * deliver messages to a call
 *
 * Pumps the call's state machine: while the call is awaiting data, either
 * drains the final ACK (AFS_CALL_AWAIT_ACK) or hands the received data to
 * the call type's ->deliver() op and translates its result into state
 * changes or an abort.  An incoming call that reaches COMPLETE here has
 * its ref dropped.
 */
static void afs_deliver_to_call(struct afs_call *call)
{
	u32 abort_code;
	int ret;

	_enter("%s", call->type->name);

	while (call->state == AFS_CALL_AWAIT_REPLY ||
	       call->state == AFS_CALL_AWAIT_OP_ID ||
	       call->state == AFS_CALL_AWAIT_REQUEST ||
	       call->state == AFS_CALL_AWAIT_ACK
	       ) {
		if (call->state == AFS_CALL_AWAIT_ACK) {
			size_t offset = 0;

			/* zero-length read just to collect the terminal ACK */
			ret = rxrpc_kernel_recv_data(call->net->socket,
						     call->rxcall,
						     NULL, 0, &offset, false,
						     &call->abort_code,
						     &call->service_id);
			trace_afs_recv_data(call, 0, offset, false, ret);

			if (ret == -EINPROGRESS || ret == -EAGAIN)
				return;
			if (ret == 1 || ret < 0) {
				/* 1 == last data collected; <0 == error */
				call->state = AFS_CALL_COMPLETE;
				goto done;
			}
			return;
		}

		ret = call->type->deliver(call);
		switch (ret) {
		case 0:
			if (call->state == AFS_CALL_AWAIT_REPLY)
				call->state = AFS_CALL_COMPLETE;
			goto done;
		case -EINPROGRESS:
		case -EAGAIN:
			/* more data needed - wait for another notification */
			goto out;
		case -ECONNABORTED:
			goto call_complete;
		case -ENOTCONN:
			abort_code = RX_CALL_DEAD;
			rxrpc_kernel_abort_call(call->net->socket, call->rxcall,
						abort_code, ret, "KNC");
			goto save_error;
		case -ENOTSUPP:
			abort_code = RXGEN_OPCODE;
			rxrpc_kernel_abort_call(call->net->socket, call->rxcall,
						abort_code, ret, "KIV");
			goto save_error;
		case -ENODATA:
		case -EBADMSG:
		case -EMSGSIZE:
		default:
			/* unmarshalling failure: client (CC) or server (SS)
			 * abort code depending on which side we're playing */
			abort_code = RXGEN_CC_UNMARSHAL;
			if (call->state != AFS_CALL_AWAIT_REPLY)
				abort_code = RXGEN_SS_UNMARSHAL;
			rxrpc_kernel_abort_call(call->net->socket, call->rxcall,
						abort_code, -EBADMSG, "KUM");
			goto save_error;
		}
	}

done:
	if (call->state == AFS_CALL_COMPLETE && call->incoming)
		afs_put_call(call);
out:
	_leave("");
	return;

save_error:
	call->error = ret;
call_complete:
	call->state = AFS_CALL_COMPLETE;
	goto done;
}

/*
 * wait synchronously for a call to complete
 *
 * Sleeps on the call's waitqueue, delivering messages as they arrive.  If
 * a signal is pending and the peer has shown no sign of life for roughly
 * twice the RTT, the wait is abandoned and the call aborted.  Drops the
 * caller's ref on the call and returns call->error.
 */
static int afs_wait_for_call_to_complete(struct afs_call *call)
{
	signed long rtt2, timeout;
	int ret;
	u64 rtt;
	u32 life, last_life;

	DECLARE_WAITQUEUE(myself, current);

	_enter("");

	/* use 2*RTT (in jiffies, min 2) as the liveness-check interval */
	rtt = rxrpc_kernel_get_rtt(call->net->socket, call->rxcall);
	rtt2 = nsecs_to_jiffies64(rtt) * 2;
	if (rtt2 < 2)
		rtt2 = 2;

	timeout = rtt2;
	last_life = rxrpc_kernel_check_life(call->net->socket, call->rxcall);

	add_wait_queue(&call->waitq, &myself);
	for (;;) {
		set_current_state(TASK_UNINTERRUPTIBLE);

		/* deliver any messages that are in the queue */
		if (call->state < AFS_CALL_COMPLETE && call->need_attention) {
			call->need_attention = false;
			__set_current_state(TASK_RUNNING);
			afs_deliver_to_call(call);
			continue;
		}

		if (call->state == AFS_CALL_COMPLETE)
			break;

		/* only give up on a signal if the call looks dead too */
		life = rxrpc_kernel_check_life(call->net->socket, call->rxcall);
		if (timeout == 0 &&
		    life == last_life && signal_pending(current))
			break;

		if (life != last_life) {
			timeout = rtt2;
			last_life = life;
		}

		timeout = schedule_timeout(timeout);
	}

	remove_wait_queue(&call->waitq, &myself);
	__set_current_state(TASK_RUNNING);

	/* Kill off the call if it's still live. */
	if (call->state < AFS_CALL_COMPLETE) {
		_debug("call interrupted");
		rxrpc_kernel_abort_call(call->net->socket, call->rxcall,
					RX_USER_ABORT, -EINTR, "KWI");
	}

	ret = call->error;
	_debug("call complete");
	afs_put_call(call);
	_leave(" = %d", ret);
	return ret;
}

/*
 * wake up a waiting call
 *
 * rxrpc notification for synchronous calls: flag the call as needing
 * attention and wake the thread parked in afs_wait_for_call_to_complete().
 */
static void afs_wake_up_call_waiter(struct sock *sk, struct rxrpc_call *rxcall,
				    unsigned long call_user_ID)
{
	struct afs_call *call = (struct afs_call *)call_user_ID;

	call->need_attention = true;
	wake_up(&call->waitq);
}

/*
 * wake up an asynchronous call
 *
 * rxrpc notification for async calls: queue the call's async work item,
 * taking a ref for it.  The conditional increment refuses to resurrect a
 * call whose refcount has already hit zero.
 */
static void afs_wake_up_async_call(struct sock *sk, struct rxrpc_call *rxcall,
				   unsigned long call_user_ID)
{
	struct afs_call *call = (struct afs_call *)call_user_ID;
	int u;

	trace_afs_notify_call(rxcall, call);
	call->need_attention = true;

	/* add a ref only if the call is still live (usage != 0) */
	u = __atomic_add_unless(&call->usage, 1, 0);
	if (u != 0) {
		trace_afs_call(call, afs_call_trace_wake, u,
			       atomic_read(&call->net->nr_outstanding_calls),
			       __builtin_return_address(0));

		/* already queued? then drop the ref we just took */
		if (!queue_work(afs_async_calls, &call->async_work))
			afs_put_call(call);
	}
}

/*
612 613
 * Delete an asynchronous call.  The work item carries a ref to the call struct
 * that we need to release.
614
 */
615
static void afs_delete_async_call(struct work_struct *work)
616
{
617 618
	struct afs_call *call = container_of(work, struct afs_call, async_work);

619 620
	_enter("");

621
	afs_put_call(call);
622 623 624 625 626

	_leave("");
}

/*
 * Perform I/O processing on an asynchronous call.  The work item carries a ref
 * to the call struct that we either need to release or to pass on.
 */
static void afs_process_async_call(struct work_struct *work)
{
	struct afs_call *call = container_of(work, struct afs_call, async_work);

	_enter("");

	if (call->state < AFS_CALL_COMPLETE && call->need_attention) {
		call->need_attention = false;
		afs_deliver_to_call(call);
	}

	if (call->state == AFS_CALL_COMPLETE) {
		call->reply = NULL;

		/* We have two refs to release - one from the alloc and one
		 * queued with the work item - and we can't just deallocate the
		 * call because the work item may be queued again.
		 */
		call->async_work.func = afs_delete_async_call;
		if (!queue_work(afs_async_calls, &call->async_work))
			afs_put_call(call);
	}

	/* release this work item's own ref */
	afs_put_call(call);
	_leave("");
}

657 658 659 660 661 662 663 664 665 666
static void afs_rx_attach(struct rxrpc_call *rxcall, unsigned long user_call_ID)
{
	struct afs_call *call = (struct afs_call *)user_call_ID;

	call->rxcall = rxcall;
}

/*
 * Charge the incoming call preallocation.
 *
 * Keeps handing preallocated afs_call records to rxrpc until it refuses
 * one; that last unused call is stashed in net->spare_incoming_call for
 * next time (and released by afs_close_socket()).
 */
void afs_charge_preallocation(struct work_struct *work)
{
	struct afs_net *net =
		container_of(work, struct afs_net, charge_preallocation_work);
	struct afs_call *call = net->spare_incoming_call;

	for (;;) {
		if (!call) {
			call = afs_alloc_call(net, &afs_RXCMxxxx, GFP_KERNEL);
			if (!call)
				break;

			call->async = true;
			call->state = AFS_CALL_AWAIT_OP_ID;
			init_waitqueue_head(&call->waitq);
		}

		/* rxrpc takes over the call's ref on success */
		if (rxrpc_kernel_charge_accept(net->socket,
					       afs_wake_up_async_call,
					       afs_rx_attach,
					       (unsigned long)call,
					       GFP_KERNEL) < 0)
			break;
		call = NULL;
	}
	net->spare_incoming_call = call;
}

/*
 * Discard a preallocated call when a socket is shut down.
 *
 * The rxrpc side is being torn down, so clear the back-pointer before
 * dropping the preallocation's ref.
 */
static void afs_rx_discard_new_call(struct rxrpc_call *rxcall,
				    unsigned long user_call_ID)
{
	struct afs_call *call = (struct afs_call *)user_call_ID;

	call->rxcall = NULL;
	afs_put_call(call);
}

707 708 709
/*
 * Notification of an incoming call.
 */
710 711
static void afs_rx_new_call(struct sock *sk, struct rxrpc_call *rxcall,
			    unsigned long user_call_ID)
712
{
713 714 715
	struct afs_net *net = afs_sock2net(sk);

	queue_work(afs_wq, &net->charge_preallocation_work);
716 717
}

/*
 * Grab the operation ID from an incoming cache manager call.  The socket
 * buffer is discarded on error or if we don't yet have sufficient data.
 */
static int afs_deliver_cm_op_id(struct afs_call *call)
{
	int ret;

	_enter("{%zu}", call->offset);

	ASSERTCMP(call->offset, <, 4);

	/* the operation ID forms the first four bytes of the request data */
	ret = afs_extract_data(call, &call->tmp, 4, true);
	if (ret < 0)
		return ret;

	call->operation_ID = ntohl(call->tmp);
	call->state = AFS_CALL_AWAIT_REQUEST;
	call->offset = 0;

	/* ask the cache manager to route the call (it'll change the call type
	 * if successful) */
	if (!afs_cm_incoming_call(call))
		return -ENOTSUPP;

	trace_afs_cb_call(call);

	/* pass responsibility for the remainder of this message off to the
	 * cache manager op */
	return call->type->deliver(call);
}

751 752 753 754 755 756 757 758 759 760 761 762 763 764
/*
 * Advance the AFS call state when an RxRPC service call ends the transmit
 * phase.
 */
static void afs_notify_end_reply_tx(struct sock *sock,
				    struct rxrpc_call *rxcall,
				    unsigned long call_user_ID)
{
	struct afs_call *call = (struct afs_call *)call_user_ID;

	if (call->state == AFS_CALL_REPLYING)
		call->state = AFS_CALL_AWAIT_ACK;
}

/*
 * send an empty reply
 *
 * Transmits a zero-length reply on a service call and moves the call to
 * AFS_CALL_AWAIT_ACK; on -ENOMEM the call is aborted with "KOO".
 */
void afs_send_empty_reply(struct afs_call *call)
{
	struct afs_net *net = call->net;
	struct msghdr msg;

	_enter("");

	rxrpc_kernel_set_tx_length(net->socket, call->rxcall, 0);

	msg.msg_name		= NULL;
	msg.msg_namelen		= 0;
	iov_iter_kvec(&msg.msg_iter, WRITE | ITER_KVEC, NULL, 0, 0);
	msg.msg_control		= NULL;
	msg.msg_controllen	= 0;
	msg.msg_flags		= 0;

	call->state = AFS_CALL_AWAIT_ACK;
	switch (rxrpc_kernel_send_data(net->socket, call->rxcall, &msg, 0,
				       afs_notify_end_reply_tx)) {
	case 0:
		_leave(" [replied]");
		return;

	case -ENOMEM:
		_debug("oom");
		rxrpc_kernel_abort_call(net->socket, call->rxcall,
					RX_USER_ABORT, -ENOMEM, "KOO");
		/* Fall through */
	default:
		_leave(" [error]");
		return;
	}
}

/*
 * send a simple reply
 *
 * Transmits @len bytes from @buf as the complete reply on a service call
 * and moves the call to AFS_CALL_AWAIT_ACK; on -ENOMEM the call is
 * aborted with "KOO".
 */
void afs_send_simple_reply(struct afs_call *call, const void *buf, size_t len)
{
	struct afs_net *net = call->net;
	struct msghdr msg;
	struct kvec iov[1];
	int n;

	_enter("");

	rxrpc_kernel_set_tx_length(net->socket, call->rxcall, len);

	iov[0].iov_base		= (void *) buf;
	iov[0].iov_len		= len;
	msg.msg_name		= NULL;
	msg.msg_namelen		= 0;
	iov_iter_kvec(&msg.msg_iter, WRITE | ITER_KVEC, iov, 1, len);
	msg.msg_control		= NULL;
	msg.msg_controllen	= 0;
	msg.msg_flags		= 0;

	call->state = AFS_CALL_AWAIT_ACK;
	n = rxrpc_kernel_send_data(net->socket, call->rxcall, &msg, len,
				   afs_notify_end_reply_tx);
	if (n >= 0) {
		/* Success */
		_leave(" [replied]");
		return;
	}

	if (n == -ENOMEM) {
		_debug("oom");
		rxrpc_kernel_abort_call(net->socket, call->rxcall,
					RX_USER_ABORT, -ENOMEM, "KOO");
	}
	_leave(" [error]");
}

/*
 * Extract a piece of data from the received data socket buffers.
 *
 * Reads up to @count bytes into @buf, tracking progress in call->offset.
 * Return values follow rxrpc_kernel_recv_data(): 0/-EAGAIN mean more data
 * is wanted, 1 (mapped to 0 here) means the last data was collected, and
 * a negative error completes the call with call->error set.
 */
int afs_extract_data(struct afs_call *call, void *buf, size_t count,
		     bool want_more)
{
	struct afs_net *net = call->net;
	int ret;

	_enter("{%s,%zu},,%zu,%d",
	       call->type->name, call->offset, count, want_more);

	ASSERTCMP(call->offset, <=, count);

	ret = rxrpc_kernel_recv_data(net->socket, call->rxcall,
				     buf, count, &call->offset,
				     want_more, &call->abort_code,
				     &call->service_id);
	trace_afs_recv_data(call, count, call->offset, want_more, ret);
	if (ret == 0 || ret == -EAGAIN)
		return ret;

	if (ret == 1) {
		/* that was the last of the data - advance the state machine */
		switch (call->state) {
		case AFS_CALL_AWAIT_REPLY:
			call->state = AFS_CALL_COMPLETE;
			break;
		case AFS_CALL_AWAIT_REQUEST:
			call->state = AFS_CALL_REPLYING;
			break;
		default:
			break;
		}
		return 0;
	}

	if (ret == -ECONNABORTED)
		call->error = afs_abort_to_error(call->abort_code);
	else
		call->error = ret;
	call->state = AFS_CALL_COMPLETE;
	return ret;
}