rxrpc.c 21.0 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11
/* Maintain an RxRPC server socket to do AFS communications through
 *
 * Copyright (C) 2007 Red Hat, Inc. All Rights Reserved.
 * Written by David Howells (dhowells@redhat.com)
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version
 * 2 of the License, or (at your option) any later version.
 */

12
#include <linux/slab.h>
13 14
#include <linux/sched/signal.h>

15 16 17 18 19
#include <net/sock.h>
#include <net/af_rxrpc.h>
#include "internal.h"
#include "afs_cm.h"

20
struct workqueue_struct *afs_async_calls;
21

22
static void afs_wake_up_call_waiter(struct sock *, struct rxrpc_call *, unsigned long);
23
static long afs_wait_for_call_to_complete(struct afs_call *);
24 25
static void afs_wake_up_async_call(struct sock *, struct rxrpc_call *, unsigned long);
static void afs_process_async_call(struct work_struct *);
26 27
static void afs_rx_new_call(struct sock *, struct rxrpc_call *, unsigned long);
static void afs_rx_discard_new_call(struct rxrpc_call *, unsigned long);
28
static int afs_deliver_cm_op_id(struct afs_call *);
29 30 31

/* Asynchronous incoming call initial processing.  All incoming calls start
 * out with this call type; afs_deliver_cm_op_id() reads the operation ID and
 * then switches the call over to the matching cache-manager call type.
 */
static const struct afs_call_type afs_RXCMxxxx = {
	.name		= "CB.xxxx",
	.deliver	= afs_deliver_cm_op_id,
};

/*
 * open an RxRPC socket and bind it to be a server for callback notifications
 * - the socket is left in blocking mode and non-blocking ops use MSG_DONTWAIT
 *
 * Returns 0 on success or a negative errno; on failure no socket is left
 * attached to *net.
 */
int afs_open_socket(struct afs_net *net)
{
	struct sockaddr_rxrpc srx;
	struct socket *socket;
	int ret;

	_enter("");

	/* An AF_INET6 transport also handles v4-mapped peers. */
	ret = sock_create_kern(&init_net, AF_RXRPC, SOCK_DGRAM, PF_INET6, &socket);
	if (ret < 0)
		goto error_1;

	/* Avoid recursing into the filesystem under memory pressure. */
	socket->sk->sk_allocation = GFP_NOFS;

	/* bind the callback manager's address to make this a server socket */
	memset(&srx, 0, sizeof(srx));
	srx.srx_family			= AF_RXRPC;
	srx.srx_service			= CM_SERVICE;
	srx.transport_type		= SOCK_DGRAM;
	srx.transport_len		= sizeof(srx.transport.sin6);
	srx.transport.sin6.sin6_family	= AF_INET6;
	srx.transport.sin6.sin6_port	= htons(AFS_CM_PORT);

	ret = kernel_bind(socket, (struct sockaddr *) &srx, sizeof(srx));
	if (ret < 0)
		goto error_2;

	/* Hook up notifications for incoming calls before listening so that
	 * nothing arrives unannounced.
	 */
	rxrpc_kernel_new_call_notification(socket, afs_rx_new_call,
					   afs_rx_discard_new_call);

	ret = kernel_listen(socket, INT_MAX);
	if (ret < 0)
		goto error_2;

	net->socket = socket;
	/* Preload accept slots so early incoming calls aren't dropped. */
	afs_charge_preallocation(&net->charge_preallocation_work);
	_leave(" = 0");
	return 0;

error_2:
	sock_release(socket);
error_1:
	_leave(" = %d", ret);
	return ret;
}

/*
 * close the RxRPC socket AFS was using
 *
 * Stops new calls, waits for every outstanding afs_call to be released
 * (nr_outstanding_calls reaching zero), then shuts down and releases the
 * socket.  Must not race with new call creation.
 */
void afs_close_socket(struct afs_net *net)
{
	_enter("");

	/* Stop accepting new incoming calls first. */
	kernel_listen(net->socket, 0);
	flush_workqueue(afs_async_calls);

	/* Discard the spare preallocated incoming call, if any. */
	if (net->spare_incoming_call) {
		afs_put_call(net->spare_incoming_call);
		net->spare_incoming_call = NULL;
	}

	/* Sleep until the last afs_put_call() wakes us. */
	_debug("outstanding %u", atomic_read(&net->nr_outstanding_calls));
	wait_on_atomic_t(&net->nr_outstanding_calls, atomic_t_wait,
			 TASK_UNINTERRUPTIBLE);
	_debug("no outstanding calls");

	kernel_sock_shutdown(net->socket, SHUT_RDWR);
	/* Flush again: shutdown may have queued final async work. */
	flush_workqueue(afs_async_calls);
	sock_release(net->socket);

	_debug("dework");
	_leave("");
}

D
David Howells 已提交
114
/*
115
 * Allocate a call.
D
David Howells 已提交
116
 */
117 118
static struct afs_call *afs_alloc_call(struct afs_net *net,
				       const struct afs_call_type *type,
119
				       gfp_t gfp)
D
David Howells 已提交
120
{
121 122
	struct afs_call *call;
	int o;
D
David Howells 已提交
123

124 125 126
	call = kzalloc(sizeof(*call), gfp);
	if (!call)
		return NULL;
D
David Howells 已提交
127

128
	call->type = type;
129
	call->net = net;
130 131 132
	atomic_set(&call->usage, 1);
	INIT_WORK(&call->async_work, afs_process_async_call);
	init_waitqueue_head(&call->waitq);
133

134
	o = atomic_inc_return(&net->nr_outstanding_calls);
135 136 137
	trace_afs_call(call, afs_call_trace_alloc, 1, o,
		       __builtin_return_address(0));
	return call;
D
David Howells 已提交
138 139
}

140
/*
 * Dispose of a reference on a call.
 *
 * When the last reference goes, the rxrpc call is ended, the type-specific
 * destructor runs, and the call memory is freed.  Dropping the outstanding
 * count to zero wakes afs_close_socket().
 */
void afs_put_call(struct afs_call *call)
{
	struct afs_net *net = call->net;
	int n = atomic_dec_return(&call->usage);
	int o = atomic_read(&net->nr_outstanding_calls);

	trace_afs_call(call, afs_call_trace_put, n + 1, o,
		       __builtin_return_address(0));

	ASSERTCMP(n, >=, 0);
	if (n == 0) {
		ASSERT(!work_pending(&call->async_work));
		ASSERT(call->type->name != NULL);

		if (call->rxcall) {
			rxrpc_kernel_end_call(net->socket, call->rxcall);
			call->rxcall = NULL;
		}
		if (call->type->destructor)
			call->type->destructor(call);

		afs_put_server(call->net, call->cm_server);
		kfree(call->request);
		kfree(call);

		/* NOTE(review): the trace below passes the already-freed
		 * call pointer purely as an identifier — confirm the
		 * tracepoint never dereferences it.
		 */
		o = atomic_dec_return(&net->nr_outstanding_calls);
		trace_afs_call(call, afs_call_trace_free, 0, o,
			       __builtin_return_address(0));
		if (o == 0)
			wake_up_atomic_t(&net->nr_outstanding_calls);
	}
}

/*
177
 * Queue the call for actual work.  Returns 0 unconditionally for convenience.
178
 */
179
int afs_queue_call_work(struct afs_call *call)
180
{
181 182 183
	int u = atomic_inc_return(&call->usage);

	trace_afs_call(call, afs_call_trace_work, u,
184
		       atomic_read(&call->net->nr_outstanding_calls),
185 186 187 188 189 190 191
		       __builtin_return_address(0));

	INIT_WORK(&call->work, call->type->work);

	if (!queue_work(afs_wq, &call->work))
		afs_put_call(call);
	return 0;
192 193
}

194 195 196
/*
 * allocate a call with flat request and reply buffers
 */
197 198
struct afs_call *afs_alloc_flat_call(struct afs_net *net,
				     const struct afs_call_type *type,
199
				     size_t request_size, size_t reply_max)
200 201 202
{
	struct afs_call *call;

203
	call = afs_alloc_call(net, type, GFP_NOFS);
204 205 206 207
	if (!call)
		goto nomem_call;

	if (request_size) {
208
		call->request_size = request_size;
209 210
		call->request = kmalloc(request_size, GFP_NOFS);
		if (!call->request)
D
David Howells 已提交
211
			goto nomem_free;
212 213
	}

214
	if (reply_max) {
215
		call->reply_max = reply_max;
216
		call->buffer = kmalloc(reply_max, GFP_NOFS);
217
		if (!call->buffer)
D
David Howells 已提交
218
			goto nomem_free;
219 220 221 222 223
	}

	init_waitqueue_head(&call->waitq);
	return call;

D
David Howells 已提交
224
nomem_free:
225
	afs_put_call(call);
226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242
nomem_call:
	return NULL;
}

/*
 * clean up a call with flat buffer
 *
 * Frees both the request and reply buffers and clears the pointers so a
 * second invocation is harmless.
 */
void afs_flat_call_destructor(struct afs_call *call)
{
	_enter("");

	kfree(call->buffer);
	call->buffer = NULL;
	kfree(call->request);
	call->request = NULL;
}

243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275
#define AFS_BVEC_MAX 8

/*
 * Load the given bvec with the next few pages.
 *
 * Fetches up to AFS_BVEC_MAX contiguous pages [first..last] from the call's
 * mapping (each with a page reference the caller must drop after sending),
 * fills *bv and the msg iterator with them, and sets/clears MSG_MORE
 * depending on whether the final page of the range was reached.
 */
static void afs_load_bvec(struct afs_call *call, struct msghdr *msg,
			  struct bio_vec *bv, pgoff_t first, pgoff_t last,
			  unsigned offset)
{
	struct page *pages[AFS_BVEC_MAX];
	unsigned int nr, n, i, to, bytes = 0;

	nr = min_t(pgoff_t, last - first + 1, AFS_BVEC_MAX);
	n = find_get_pages_contig(call->mapping, first, nr, pages);
	ASSERTCMP(n, ==, nr);

	msg->msg_flags |= MSG_MORE;
	for (i = 0; i < nr; i++) {
		to = PAGE_SIZE;
		/* The last page may be partial; everything beyond this batch
		 * is done, so no MSG_MORE.
		 */
		if (first + i >= last) {
			to = call->last_to;
			msg->msg_flags &= ~MSG_MORE;
		}
		bv[i].bv_page = pages[i];
		bv[i].bv_len = to - offset;
		bv[i].bv_offset = offset;
		bytes += to - offset;
		offset = 0;	/* only the first page starts mid-page */
	}

	iov_iter_bvec(&msg->msg_iter, WRITE | ITER_BVEC, bv, nr, bytes);
}

276 277 278 279 280 281 282 283 284 285 286 287 288
/*
 * Advance the AFS call state when the RxRPC call ends the transmit phase.
 */
static void afs_notify_end_request_tx(struct sock *sock,
				      struct rxrpc_call *rxcall,
				      unsigned long call_user_ID)
{
	struct afs_call *call = (struct afs_call *)call_user_ID;

	switch (call->state) {
	case AFS_CALL_REQUESTING:
		/* Request fully transmitted - now await the reply. */
		call->state = AFS_CALL_AWAIT_REPLY;
		break;
	default:
		break;
	}
}

289 290 291
/*
 * attach the data from a bunch of pages on an inode to a call
 *
 * Transmits pages [call->first..call->last] in batches of AFS_BVEC_MAX,
 * dropping the page references taken by afs_load_bvec() after each batch
 * regardless of send success.  Returns the last rxrpc send result (negative
 * errno on failure).
 */
static int afs_send_pages(struct afs_call *call, struct msghdr *msg)
{
	struct bio_vec bv[AFS_BVEC_MAX];
	unsigned int bytes, nr, loop, offset;
	pgoff_t first = call->first, last = call->last;
	int ret;

	offset = call->first_offset;
	call->first_offset = 0;

	do {
		afs_load_bvec(call, msg, bv, first, last, offset);
		offset = 0;
		bytes = msg->msg_iter.count;
		nr = msg->msg_iter.nr_segs;

		ret = rxrpc_kernel_send_data(call->net->socket, call->rxcall, msg,
					     bytes, afs_notify_end_request_tx);
		/* Release the refs afs_load_bvec() took, even on error. */
		for (loop = 0; loop < nr; loop++)
			put_page(bv[loop].bv_page);
		if (ret < 0)
			break;

		first += nr;
	} while (first <= last);

	return ret;
}

321 322 323
/*
 * initiate a call
 *
 * Begins an rxrpc client call on the address in *ac and transmits the flat
 * request (plus any attached pages).  For async calls, returns -EINPROGRESS
 * immediately; otherwise blocks until completion and returns the call's
 * result.  Consumes the caller's reference on *call in all cases (via
 * afs_put_call() on error or afs_wait_for_call_to_complete()).
 */
long afs_make_call(struct afs_addr_cursor *ac, struct afs_call *call,
		   gfp_t gfp, bool async)
{
	struct sockaddr_rxrpc *srx = ac->addr;
	struct rxrpc_call *rxcall;
	struct msghdr msg;
	struct kvec iov[1];
	size_t offset;
	s64 tx_total_len;
	u32 abort_code;		/* NOTE(review): set but never read below */
	int ret;

	_enter(",{%pISp},", &srx->transport);

	ASSERT(call->type != NULL);
	ASSERT(call->type->name != NULL);

	_debug("____MAKE %p{%s,%x} [%d]____",
	       call, call->type->name, key_serial(call->key),
	       atomic_read(&call->net->nr_outstanding_calls));

	call->async = async;

	/* Work out the length we're going to transmit.  This is awkward for
	 * calls such as FS.StoreData where there's an extra injection of data
	 * after the initial fixed part.
	 */
	tx_total_len = call->request_size;
	if (call->send_pages) {
		tx_total_len += call->last_to - call->first_offset;
		tx_total_len += (call->last - call->first) * PAGE_SIZE;
	}

	/* create a call */
	rxcall = rxrpc_kernel_begin_call(call->net->socket, srx, call->key,
					 (unsigned long)call,
					 tx_total_len, gfp,
					 (async ?
					  afs_wake_up_async_call :
					  afs_wake_up_call_waiter),
					 call->upgrade);
	/* The key ref is consumed by rxrpc whether or not the call started. */
	call->key = NULL;
	if (IS_ERR(rxcall)) {
		ret = PTR_ERR(rxcall);
		goto error_kill_call;
	}

	call->rxcall = rxcall;

	/* send the request */
	iov[0].iov_base	= call->request;
	iov[0].iov_len	= call->request_size;

	msg.msg_name		= NULL;
	msg.msg_namelen		= 0;
	iov_iter_kvec(&msg.msg_iter, WRITE | ITER_KVEC, iov, 1,
		      call->request_size);
	msg.msg_control		= NULL;
	msg.msg_controllen	= 0;
	msg.msg_flags		= MSG_WAITALL | (call->send_pages ? MSG_MORE : 0);

	/* We have to change the state *before* sending the last packet as
	 * rxrpc might give us the reply before it returns from sending the
	 * request.  Further, if the send fails, we may already have been given
	 * a notification and may have collected it.
	 */
	if (!call->send_pages)
		call->state = AFS_CALL_AWAIT_REPLY;
	ret = rxrpc_kernel_send_data(call->net->socket, rxcall,
				     &msg, call->request_size,
				     afs_notify_end_request_tx);
	if (ret < 0)
		goto error_do_abort;

	if (call->send_pages) {
		ret = afs_send_pages(call, &msg);
		if (ret < 0)
			goto error_do_abort;
	}

	/* at this point, an async call may no longer exist as it may have
	 * already completed */
	if (call->async)
		return -EINPROGRESS;

	return afs_wait_for_call_to_complete(call);

error_do_abort:
	call->state = AFS_CALL_COMPLETE;
	if (ret != -ECONNABORTED) {
		rxrpc_kernel_abort_call(call->net->socket, rxcall,
					RX_USER_ABORT, ret, "KSD");
	} else {
		/* The peer aborted: pick up the abort code rxrpc recorded. */
		abort_code = 0;
		offset = 0;
		rxrpc_kernel_recv_data(call->net->socket, rxcall, NULL,
				       0, &offset, false, &call->abort_code,
				       &call->service_id);
		ret = afs_abort_to_error(call->abort_code);
	}
error_kill_call:
	afs_put_call(call);
	_leave(" = %d", ret);
	return ret;
}

/*
 * deliver messages to a call
 *
 * Drives the call's state machine: while the call is awaiting data (op ID,
 * request, reply or final ACK), pull data from rxrpc and feed it to the
 * type-specific deliver routine.  On unmarshalling failure the rxrpc call is
 * aborted with an appropriate code and the error recorded on the call.
 */
static void afs_deliver_to_call(struct afs_call *call)
{
	u32 abort_code;
	int ret;

	_enter("%s", call->type->name);

	while (call->state == AFS_CALL_AWAIT_REPLY ||
	       call->state == AFS_CALL_AWAIT_OP_ID ||
	       call->state == AFS_CALL_AWAIT_REQUEST ||
	       call->state == AFS_CALL_AWAIT_ACK
	       ) {
		if (call->state == AFS_CALL_AWAIT_ACK) {
			/* Zero-length read just to reap the terminal ACK. */
			size_t offset = 0;

			ret = rxrpc_kernel_recv_data(call->net->socket,
						     call->rxcall,
						     NULL, 0, &offset, false,
						     &call->abort_code,
						     &call->service_id);
			trace_afs_recv_data(call, 0, offset, false, ret);

			if (ret == -EINPROGRESS || ret == -EAGAIN)
				return;
			/* ret == 1 means the call reached completion. */
			if (ret == 1 || ret < 0) {
				call->state = AFS_CALL_COMPLETE;
				goto done;
			}
			return;
		}

		ret = call->type->deliver(call);
		switch (ret) {
		case 0:
			if (call->state == AFS_CALL_AWAIT_REPLY)
				call->state = AFS_CALL_COMPLETE;
			goto done;
		case -EINPROGRESS:
		case -EAGAIN:
			/* More data needed - wait for another notification. */
			goto out;
		case -ECONNABORTED:
			goto save_error;
		case -ENOTCONN:
			abort_code = RX_CALL_DEAD;
			rxrpc_kernel_abort_call(call->net->socket, call->rxcall,
						abort_code, ret, "KNC");
			goto save_error;
		case -ENOTSUPP:
			abort_code = RXGEN_OPCODE;
			rxrpc_kernel_abort_call(call->net->socket, call->rxcall,
						abort_code, ret, "KIV");
			goto save_error;
		case -ENODATA:
		case -EBADMSG:
		case -EMSGSIZE:
		default:
			/* Unmarshalling failure: CC code if we were decoding a
			 * reply as client, SS code if decoding a request as
			 * server.
			 */
			abort_code = RXGEN_CC_UNMARSHAL;
			if (call->state != AFS_CALL_AWAIT_REPLY)
				abort_code = RXGEN_SS_UNMARSHAL;
			rxrpc_kernel_abort_call(call->net->socket, call->rxcall,
						abort_code, -EBADMSG, "KUM");
			goto save_error;
		}
	}

done:
	/* Incoming calls hold a ref that is dropped on completion. */
	if (call->state == AFS_CALL_COMPLETE && call->incoming)
		afs_put_call(call);
out:
	_leave("");
	return;

save_error:
	call->error = ret;
	call->state = AFS_CALL_COMPLETE;
	goto done;
}

/*
 * wait synchronously for a call to complete
 *
 * Sleeps on the call's waitqueue, delivering data whenever the call is
 * flagged as needing attention.  A pending signal only interrupts the wait
 * once the peer has appeared dead for ~2x RTT (life counter unchanged and
 * timeout expired).  Consumes the caller's reference on the call.
 */
static long afs_wait_for_call_to_complete(struct afs_call *call)
{
	signed long rtt2, timeout;
	long ret;
	u64 rtt;
	u32 life, last_life;

	DECLARE_WAITQUEUE(myself, current);

	_enter("");

	/* Use twice the RTT (floor 2 jiffies) as the liveness poll period. */
	rtt = rxrpc_kernel_get_rtt(call->net->socket, call->rxcall);
	rtt2 = nsecs_to_jiffies64(rtt) * 2;
	if (rtt2 < 2)
		rtt2 = 2;

	timeout = rtt2;
	last_life = rxrpc_kernel_check_life(call->net->socket, call->rxcall);

	add_wait_queue(&call->waitq, &myself);
	for (;;) {
		set_current_state(TASK_UNINTERRUPTIBLE);

		/* deliver any messages that are in the queue */
		if (call->state < AFS_CALL_COMPLETE && call->need_attention) {
			call->need_attention = false;
			__set_current_state(TASK_RUNNING);
			afs_deliver_to_call(call);
			continue;
		}

		if (call->state == AFS_CALL_COMPLETE)
			break;

		/* Give up only if signalled AND the call shows no signs of
		 * life for a full poll period.
		 */
		life = rxrpc_kernel_check_life(call->net->socket, call->rxcall);
		if (timeout == 0 &&
		    life == last_life && signal_pending(current))
				break;

		if (life != last_life) {
			timeout = rtt2;
			last_life = life;
		}

		timeout = schedule_timeout(timeout);
	}

	remove_wait_queue(&call->waitq, &myself);
	__set_current_state(TASK_RUNNING);

	/* Kill off the call if it's still live. */
	if (call->state < AFS_CALL_COMPLETE) {
		_debug("call interrupted");
		rxrpc_kernel_abort_call(call->net->socket, call->rxcall,
					RX_USER_ABORT, -EINTR, "KWI");
	}

	/* Errors are translated from abort codes; a successful call may hand
	 * back reply[0] as the (pointer-valued) result.
	 */
	ret = call->error;
	if (ret < 0) {
		ret = afs_abort_to_error(call->abort_code);
	} else if (ret == 0 && call->ret_reply0) {
		ret = (long)call->reply[0];
		call->reply[0] = NULL;
	}

	_debug("call complete");
	afs_put_call(call);
	_leave(" = %p", (void *)ret);
	return ret;
}

/*
 * wake up a waiting call
 */
587 588
static void afs_wake_up_call_waiter(struct sock *sk, struct rxrpc_call *rxcall,
				    unsigned long call_user_ID)
589
{
590 591 592
	struct afs_call *call = (struct afs_call *)call_user_ID;

	call->need_attention = true;
593 594 595 596 597 598
	wake_up(&call->waitq);
}

/*
 * wake up an asynchronous call
 *
 * rxrpc notification for async calls.  Tries to take a reference for the
 * work item; if the call's usage has already hit zero (it is being torn
 * down), the event is ignored.
 */
static void afs_wake_up_async_call(struct sock *sk, struct rxrpc_call *rxcall,
				   unsigned long call_user_ID)
{
	struct afs_call *call = (struct afs_call *)call_user_ID;
	int u;

	trace_afs_notify_call(rxcall, call);
	call->need_attention = true;

	/* Only get a ref if the count is non-zero - otherwise the call is
	 * already dying and must not be resurrected.
	 */
	u = __atomic_add_unless(&call->usage, 1, 0);
	if (u != 0) {
		trace_afs_call(call, afs_call_trace_wake, u,
			       atomic_read(&call->net->nr_outstanding_calls),
			       __builtin_return_address(0));

		/* The work item holds the ref; drop it if already queued. */
		if (!queue_work(afs_async_calls, &call->async_work))
			afs_put_call(call);
	}
}

/*
620 621
 * Delete an asynchronous call.  The work item carries a ref to the call struct
 * that we need to release.
622
 */
623
static void afs_delete_async_call(struct work_struct *work)
624
{
625 626
	struct afs_call *call = container_of(work, struct afs_call, async_work);

627 628
	_enter("");

629
	afs_put_call(call);
630 631 632 633 634

	_leave("");
}

/*
 * Perform I/O processing on an asynchronous call.  The work item carries a ref
 * to the call struct that we either need to release or to pass on.
 */
static void afs_process_async_call(struct work_struct *work)
{
	struct afs_call *call = container_of(work, struct afs_call, async_work);

	_enter("");

	if (call->state < AFS_CALL_COMPLETE && call->need_attention) {
		call->need_attention = false;
		afs_deliver_to_call(call);
	}

	if (call->state == AFS_CALL_COMPLETE) {
		call->reply[0] = NULL;

		/* We have two refs to release - one from the alloc and one
		 * queued with the work item - and we can't just deallocate the
		 * call because the work item may be queued again.
		 */
		call->async_work.func = afs_delete_async_call;
		if (!queue_work(afs_async_calls, &call->async_work))
			afs_put_call(call);
	}

	/* Drop the ref this work item was queued with. */
	afs_put_call(call);
	_leave("");
}

665 666 667 668 669 670 671 672 673 674
/* Attach an accepted incoming rxrpc call to its preallocated afs_call. */
static void afs_rx_attach(struct rxrpc_call *rxcall, unsigned long user_call_ID)
{
	struct afs_call *acall = (struct afs_call *)user_call_ID;

	acall->rxcall = rxcall;
}

/*
 * Charge the incoming call preallocation.
 *
 * Keeps handing preallocated afs_call slots to rxrpc until it refuses one;
 * the refused call (if any) is parked in net->spare_incoming_call for the
 * next round.
 */
void afs_charge_preallocation(struct work_struct *work)
{
	struct afs_net *net =
		container_of(work, struct afs_net, charge_preallocation_work);
	struct afs_call *call = net->spare_incoming_call;

	for (;;) {
		if (!call) {
			call = afs_alloc_call(net, &afs_RXCMxxxx, GFP_KERNEL);
			if (!call)
				break;

			call->async = true;
			call->state = AFS_CALL_AWAIT_OP_ID;
			init_waitqueue_head(&call->waitq);
		}

		/* rxrpc takes ownership of the call ID on success. */
		if (rxrpc_kernel_charge_accept(net->socket,
					       afs_wake_up_async_call,
					       afs_rx_attach,
					       (unsigned long)call,
					       GFP_KERNEL) < 0)
			break;
		call = NULL;
	}
	net->spare_incoming_call = call;
}

/*
 * Discard a preallocated call when a socket is shut down.
 */
static void afs_rx_discard_new_call(struct rxrpc_call *rxcall,
				    unsigned long user_call_ID)
{
	struct afs_call *spare = (struct afs_call *)user_call_ID;

	/* Detach from rxrpc and drop the preallocation's reference. */
	spare->rxcall = NULL;
	afs_put_call(spare);
}

715 716 717
/*
 * Notification of an incoming call.
 */
718 719
static void afs_rx_new_call(struct sock *sk, struct rxrpc_call *rxcall,
			    unsigned long user_call_ID)
720
{
721 722 723
	struct afs_net *net = afs_sock2net(sk);

	queue_work(afs_wq, &net->charge_preallocation_work);
724 725
}

726
/*
 * Grab the operation ID from an incoming cache manager call.  The socket
 * buffer is discarded on error or if we don't yet have sufficient data.
 */
static int afs_deliver_cm_op_id(struct afs_call *call)
{
	int ret;

	_enter("{%zu}", call->offset);

	ASSERTCMP(call->offset, <, 4);

	/* the operation ID forms the first four bytes of the request data */
	ret = afs_extract_data(call, &call->tmp, 4, true);
	if (ret < 0)
		return ret;

	call->operation_ID = ntohl(call->tmp);
	call->state = AFS_CALL_AWAIT_REQUEST;
	call->offset = 0;

	/* ask the cache manager to route the call (it'll change the call type
	 * if successful) */
	if (!afs_cm_incoming_call(call))
		return -ENOTSUPP;

	trace_afs_cb_call(call);

	/* pass responsibility for the remainder of this message off to the
	 * cache manager op */
	return call->type->deliver(call);
}

759 760 761 762 763 764 765 766 767 768 769 770 771 772
/*
 * Advance the AFS call state when an RxRPC service call ends the transmit
 * phase.
 */
static void afs_notify_end_reply_tx(struct sock *sock,
				    struct rxrpc_call *rxcall,
				    unsigned long call_user_ID)
{
	struct afs_call *call = (struct afs_call *)call_user_ID;

	switch (call->state) {
	case AFS_CALL_REPLYING:
		/* Reply fully transmitted - now await the final ACK. */
		call->state = AFS_CALL_AWAIT_ACK;
		break;
	default:
		break;
	}
}

773 774 775 776 777
/*
 * send an empty reply
 *
 * Transmits a zero-length reply on a service call.  On -ENOMEM the call is
 * aborted; other send errors are left for rxrpc to surface.
 */
void afs_send_empty_reply(struct afs_call *call)
{
	struct afs_net *net = call->net;
	struct msghdr msg;

	_enter("");

	rxrpc_kernel_set_tx_length(net->socket, call->rxcall, 0);

	msg.msg_name		= NULL;
	msg.msg_namelen		= 0;
	iov_iter_kvec(&msg.msg_iter, WRITE | ITER_KVEC, NULL, 0, 0);
	msg.msg_control		= NULL;
	msg.msg_controllen	= 0;
	msg.msg_flags		= 0;

	/* State must change before sending: the ACK may beat the return. */
	call->state = AFS_CALL_AWAIT_ACK;
	switch (rxrpc_kernel_send_data(net->socket, call->rxcall, &msg, 0,
				       afs_notify_end_reply_tx)) {
	case 0:
		_leave(" [replied]");
		return;

	case -ENOMEM:
		_debug("oom");
		rxrpc_kernel_abort_call(net->socket, call->rxcall,
					RX_USER_ABORT, -ENOMEM, "KOO");
		/* Fall through */
	default:
		_leave(" [error]");
		return;
	}
}

809 810 811 812 813
/*
 * send a simple reply
 *
 * Transmits a single flat buffer as the reply on a service call.  On -ENOMEM
 * the call is aborted; other send errors are left for rxrpc to surface.
 */
void afs_send_simple_reply(struct afs_call *call, const void *buf, size_t len)
{
	struct afs_net *net = call->net;
	struct kvec iov[1];
	struct msghdr msg;
	int ret;

	_enter("");

	/* Tell rxrpc up front how much reply data to expect. */
	rxrpc_kernel_set_tx_length(net->socket, call->rxcall, len);

	iov[0].iov_base		= (void *) buf;
	iov[0].iov_len		= len;
	msg.msg_name		= NULL;
	msg.msg_namelen		= 0;
	iov_iter_kvec(&msg.msg_iter, WRITE | ITER_KVEC, iov, 1, len);
	msg.msg_control		= NULL;
	msg.msg_controllen	= 0;
	msg.msg_flags		= 0;

	/* State must change before sending: the ACK may beat the return. */
	call->state = AFS_CALL_AWAIT_ACK;
	ret = rxrpc_kernel_send_data(net->socket, call->rxcall, &msg, len,
				     afs_notify_end_reply_tx);
	if (ret < 0) {
		if (ret == -ENOMEM) {
			_debug("oom");
			rxrpc_kernel_abort_call(net->socket, call->rxcall,
						RX_USER_ABORT, -ENOMEM, "KOO");
		}
		_leave(" [error]");
		return;
	}

	/* Success */
	_leave(" [replied]");
}

849
/*
 * Extract a piece of data from the received data socket buffers.
 *
 * Returns 0 when the requested count has been fully extracted (advancing the
 * call state if the message thereby completed), -EAGAIN when more data must
 * arrive first, or a negative error - in which case the call is marked
 * complete with call->error set.
 */
int afs_extract_data(struct afs_call *call, void *buf, size_t count,
		     bool want_more)
{
	struct afs_net *net = call->net;
	int ret;

	_enter("{%s,%zu},,%zu,%d",
	       call->type->name, call->offset, count, want_more);

	ASSERTCMP(call->offset, <=, count);

	ret = rxrpc_kernel_recv_data(net->socket, call->rxcall,
				     buf, count, &call->offset,
				     want_more, &call->abort_code,
				     &call->service_id);
	trace_afs_recv_data(call, count, call->offset, want_more, ret);
	if (ret == 0 || ret == -EAGAIN)
		return ret;

	/* ret == 1: the last data has been received; move the call on. */
	if (ret == 1) {
		switch (call->state) {
		case AFS_CALL_AWAIT_REPLY:
			call->state = AFS_CALL_COMPLETE;
			break;
		case AFS_CALL_AWAIT_REQUEST:
			call->state = AFS_CALL_REPLYING;
			break;
		default:
			break;
		}
		return 0;
	}

	/* Hard error: record it and complete the call. */
	if (ret == -ECONNABORTED)
		call->error = afs_abort_to_error(call->abort_code);
	else
		call->error = ret;
	call->state = AFS_CALL_COMPLETE;
	return ret;
}