rxrpc.c 21.1 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11
/* Maintain an RxRPC server socket to do AFS communications through
 *
 * Copyright (C) 2007 Red Hat, Inc. All Rights Reserved.
 * Written by David Howells (dhowells@redhat.com)
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version
 * 2 of the License, or (at your option) any later version.
 */

12
#include <linux/slab.h>
13 14
#include <linux/sched/signal.h>

15 16 17 18 19
#include <net/sock.h>
#include <net/af_rxrpc.h>
#include "internal.h"
#include "afs_cm.h"

20
struct workqueue_struct *afs_async_calls;
21

22
static void afs_wake_up_call_waiter(struct sock *, struct rxrpc_call *, unsigned long);
23
static long afs_wait_for_call_to_complete(struct afs_call *, struct afs_addr_cursor *);
24 25
static void afs_wake_up_async_call(struct sock *, struct rxrpc_call *, unsigned long);
static void afs_process_async_call(struct work_struct *);
26 27
static void afs_rx_new_call(struct sock *, struct rxrpc_call *, unsigned long);
static void afs_rx_discard_new_call(struct rxrpc_call *, unsigned long);
28
static int afs_deliver_cm_op_id(struct afs_call *);
29 30 31

/* asynchronous incoming call initial processing
 *
 * Placeholder call type for a not-yet-identified incoming cache-manager
 * call; the deliver op extracts the operation ID and then switches the
 * call to the real type (see afs_deliver_cm_op_id()).
 */
static const struct afs_call_type afs_RXCMxxxx = {
	.name		= "CB.xxxx",
	.deliver	= afs_deliver_cm_op_id,
};

/*
 * open an RxRPC socket and bind it to be a server for callback notifications
 * - the socket is left in blocking mode and non-blocking ops use MSG_DONTWAIT
 *
 * Returns 0 on success or a negative errno; on failure no socket is left
 * attached to @net.
 */
int afs_open_socket(struct afs_net *net)
{
	struct sockaddr_rxrpc srx;
	struct socket *socket;
	int ret;

	_enter("");

	ret = sock_create_kern(&init_net, AF_RXRPC, SOCK_DGRAM, PF_INET6, &socket);
	if (ret < 0)
		goto error_1;

	/* GFP_NOFS: socket allocations must not recurse into the filesystem */
	socket->sk->sk_allocation = GFP_NOFS;

	/* bind the callback manager's address to make this a server socket */
	memset(&srx, 0, sizeof(srx));
	srx.srx_family			= AF_RXRPC;
	srx.srx_service			= CM_SERVICE;
	srx.transport_type		= SOCK_DGRAM;
	srx.transport_len		= sizeof(srx.transport.sin6);
	srx.transport.sin6.sin6_family	= AF_INET6;
	srx.transport.sin6.sin6_port	= htons(AFS_CM_PORT);

	ret = kernel_bind(socket, (struct sockaddr *) &srx, sizeof(srx));
	if (ret < 0)
		goto error_2;

	/* Hook up notification of new incoming calls before listening so
	 * nothing is missed. */
	rxrpc_kernel_new_call_notification(socket, afs_rx_new_call,
					   afs_rx_discard_new_call);

	ret = kernel_listen(socket, INT_MAX);
	if (ret < 0)
		goto error_2;

	net->socket = socket;
	/* Preload the incoming-call pool (called directly, not via the
	 * workqueue, during setup). */
	afs_charge_preallocation(&net->charge_preallocation_work);
	_leave(" = 0");
	return 0;

error_2:
	sock_release(socket);
error_1:
	_leave(" = %d", ret);
	return ret;
}

/*
 * close the RxRPC socket AFS was using
 *
 * Stops new incoming calls, waits for every outstanding afs_call to be
 * released, then shuts down and releases the socket.
 */
void afs_close_socket(struct afs_net *net)
{
	_enter("");

	/* Stop accepting new calls, then drain any async work already
	 * queued. */
	kernel_listen(net->socket, 0);
	flush_workqueue(afs_async_calls);

	/* Drop the spare preallocated incoming call, if any. */
	if (net->spare_incoming_call) {
		afs_put_call(net->spare_incoming_call);
		net->spare_incoming_call = NULL;
	}

	/* Sleep until the last outstanding call drops its ref (see
	 * afs_put_call(), which wakes this counter at zero). */
	_debug("outstanding %u", atomic_read(&net->nr_outstanding_calls));
	wait_on_atomic_t(&net->nr_outstanding_calls, atomic_t_wait,
			 TASK_UNINTERRUPTIBLE);
	_debug("no outstanding calls");

	kernel_sock_shutdown(net->socket, SHUT_RDWR);
	flush_workqueue(afs_async_calls);
	sock_release(net->socket);

	_debug("dework");
	_leave("");
}

D
David Howells 已提交
114
/*
 * Allocate a call.
 *
 * Returns a zeroed afs_call with one reference (the caller's) and the
 * net-wide outstanding-call counter incremented, or NULL on allocation
 * failure.
 */
static struct afs_call *afs_alloc_call(struct afs_net *net,
				       const struct afs_call_type *type,
				       gfp_t gfp)
{
	struct afs_call *call;
	int o;

	call = kzalloc(sizeof(*call), gfp);
	if (!call)
		return NULL;

	call->type = type;
	call->net = net;
	atomic_set(&call->usage, 1);	/* caller's reference */
	INIT_WORK(&call->async_work, afs_process_async_call);
	init_waitqueue_head(&call->waitq);

	o = atomic_inc_return(&net->nr_outstanding_calls);
	trace_afs_call(call, afs_call_trace_alloc, 1, o,
		       __builtin_return_address(0));
	return call;
}

140
/*
 * Dispose of a reference on a call.
 *
 * When the last reference goes, the rxrpc call is ended, the type-specific
 * destructor runs, the call is freed and the net-wide counter is decremented
 * (waking afs_close_socket() if it hits zero).
 */
void afs_put_call(struct afs_call *call)
{
	struct afs_net *net = call->net;
	int n = atomic_dec_return(&call->usage);
	int o = atomic_read(&net->nr_outstanding_calls);

	trace_afs_call(call, afs_call_trace_put, n + 1, o,
		       __builtin_return_address(0));

	ASSERTCMP(n, >=, 0);
	if (n == 0) {
		/* No async work may still be pending once the refcount hits
		 * zero. */
		ASSERT(!work_pending(&call->async_work));
		ASSERT(call->type->name != NULL);

		if (call->rxcall) {
			rxrpc_kernel_end_call(net->socket, call->rxcall);
			call->rxcall = NULL;
		}
		if (call->type->destructor)
			call->type->destructor(call);

		afs_put_server(call->net, call->cm_server);
		afs_put_cb_interest(call->net, call->cbi);
		kfree(call->request);
		kfree(call);

		o = atomic_dec_return(&net->nr_outstanding_calls);
		trace_afs_call(call, afs_call_trace_free, 0, o,
			       __builtin_return_address(0));
		if (o == 0)
			wake_up_atomic_t(&net->nr_outstanding_calls);
	}
}

/*
 * Queue the call for actual work.  Returns 0 unconditionally for convenience.
 */
int afs_queue_call_work(struct afs_call *call)
{
	/* Take a ref for the work item before queuing. */
	int u = atomic_inc_return(&call->usage);

	trace_afs_call(call, afs_call_trace_work, u,
		       atomic_read(&call->net->nr_outstanding_calls),
		       __builtin_return_address(0));

	INIT_WORK(&call->work, call->type->work);

	/* If the work item was already queued, drop the ref we just took. */
	if (!queue_work(afs_wq, &call->work))
		afs_put_call(call);
	return 0;
}

195 196 197
/*
 * allocate a call with flat request and reply buffers
 *
 * Either buffer is omitted when the corresponding size is 0.  Returns NULL
 * on allocation failure (any partial allocation is released via
 * afs_put_call()).
 */
struct afs_call *afs_alloc_flat_call(struct afs_net *net,
				     const struct afs_call_type *type,
				     size_t request_size, size_t reply_max)
{
	struct afs_call *call;

	call = afs_alloc_call(net, type, GFP_NOFS);
	if (!call)
		goto nomem_call;

	if (request_size) {
		call->request_size = request_size;
		call->request = kmalloc(request_size, GFP_NOFS);
		if (!call->request)
			goto nomem_free;
	}

	if (reply_max) {
		call->reply_max = reply_max;
		call->buffer = kmalloc(reply_max, GFP_NOFS);
		if (!call->buffer)
			goto nomem_free;
	}

	init_waitqueue_head(&call->waitq);
	return call;

nomem_free:
	/* Drops the sole ref, freeing the call and any request buffer. */
	afs_put_call(call);
nomem_call:
	return NULL;
}

/*
 * Destructor for a flat-buffer call: release the request and reply buffers
 * and clear the pointers so a double destruction is harmless.
 */
void afs_flat_call_destructor(struct afs_call *call)
{
	_enter("");

	kfree(call->buffer);
	call->buffer = NULL;
	kfree(call->request);
	call->request = NULL;
}

244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276
#define AFS_BVEC_MAX 8

/*
 * Load the given bvec with the next few pages.
 *
 * Takes a page reference on each page via find_get_pages_contig(); the
 * caller (afs_send_pages()) is responsible for putting them.  MSG_MORE is
 * cleared on the segment that covers the final page.
 */
static void afs_load_bvec(struct afs_call *call, struct msghdr *msg,
			  struct bio_vec *bv, pgoff_t first, pgoff_t last,
			  unsigned offset)
{
	struct page *pages[AFS_BVEC_MAX];
	unsigned int nr, n, i, to, bytes = 0;

	nr = min_t(pgoff_t, last - first + 1, AFS_BVEC_MAX);
	n = find_get_pages_contig(call->mapping, first, nr, pages);
	ASSERTCMP(n, ==, nr);

	msg->msg_flags |= MSG_MORE;
	for (i = 0; i < nr; i++) {
		to = PAGE_SIZE;
		if (first + i >= last) {
			/* Final page: send only up to last_to and signal the
			 * end of the data. */
			to = call->last_to;
			msg->msg_flags &= ~MSG_MORE;
		}
		bv[i].bv_page = pages[i];
		bv[i].bv_len = to - offset;
		bv[i].bv_offset = offset;
		bytes += to - offset;
		offset = 0;	/* only the first page starts mid-page */
	}

	iov_iter_bvec(&msg->msg_iter, WRITE | ITER_BVEC, bv, nr, bytes);
}

277 278 279 280 281 282 283 284 285 286 287 288 289
/*
 * Advance the AFS call state when the RxRPC call ends the transmit phase.
 */
static void afs_notify_end_request_tx(struct sock *sock,
				      struct rxrpc_call *rxcall,
				      unsigned long call_user_ID)
{
	struct afs_call *call = (struct afs_call *)call_user_ID;

	if (call->state != AFS_CALL_REQUESTING)
		return;
	call->state = AFS_CALL_AWAIT_REPLY;
}

290 291 292
/*
 * attach the data from a bunch of pages on an inode to a call
 *
 * Walks the page range [call->first, call->last], loading up to
 * AFS_BVEC_MAX pages per iteration and transmitting them.  Page refs taken
 * by afs_load_bvec() are dropped here after each send.  Returns the last
 * rxrpc_kernel_send_data() result (negative on error).
 */
static int afs_send_pages(struct afs_call *call, struct msghdr *msg)
{
	struct bio_vec bv[AFS_BVEC_MAX];
	unsigned int bytes, nr, loop, offset;
	pgoff_t first = call->first, last = call->last;
	int ret;

	/* Only the first chunk starts at a non-zero page offset. */
	offset = call->first_offset;
	call->first_offset = 0;

	do {
		afs_load_bvec(call, msg, bv, first, last, offset);
		offset = 0;
		bytes = msg->msg_iter.count;
		nr = msg->msg_iter.nr_segs;

		ret = rxrpc_kernel_send_data(call->net->socket, call->rxcall, msg,
					     bytes, afs_notify_end_request_tx);
		/* Release the page refs taken by afs_load_bvec() whether or
		 * not the send succeeded. */
		for (loop = 0; loop < nr; loop++)
			put_page(bv[loop].bv_page);
		if (ret < 0)
			break;

		first += nr;
	} while (first <= last);

	return ret;
}

322 323 324
/*
 * initiate a call
 *
 * Begins an rxrpc call to the address in @ac and transmits the flat request
 * (plus any attached pages).  For async calls, returns -EINPROGRESS once
 * transmission has started; otherwise waits for completion and returns the
 * result.  On error the call ref is dropped and ac->error is set.
 */
long afs_make_call(struct afs_addr_cursor *ac, struct afs_call *call,
		   gfp_t gfp, bool async)
{
	struct sockaddr_rxrpc *srx = ac->addr;
	struct rxrpc_call *rxcall;
	struct msghdr msg;
	struct kvec iov[1];
	size_t offset;
	s64 tx_total_len;
	int ret;

	_enter(",{%pISp},", &srx->transport);

	ASSERT(call->type != NULL);
	ASSERT(call->type->name != NULL);

	_debug("____MAKE %p{%s,%x} [%d]____",
	       call, call->type->name, key_serial(call->key),
	       atomic_read(&call->net->nr_outstanding_calls));

	call->async = async;

	/* Work out the length we're going to transmit.  This is awkward for
	 * calls such as FS.StoreData where there's an extra injection of data
	 * after the initial fixed part.
	 */
	tx_total_len = call->request_size;
	if (call->send_pages) {
		if (call->last == call->first) {
			tx_total_len += call->last_to - call->first_offset;
		} else {
			/* It looks mathematically like you should be able to
			 * combine the following lines with the ones above, but
			 * unsigned arithmetic is fun when it wraps...
			 */
			tx_total_len += PAGE_SIZE - call->first_offset;
			tx_total_len += call->last_to;
			tx_total_len += (call->last - call->first - 1) * PAGE_SIZE;
		}
	}

	/* create a call */
	rxcall = rxrpc_kernel_begin_call(call->net->socket, srx, call->key,
					 (unsigned long)call,
					 tx_total_len, gfp,
					 (async ?
					  afs_wake_up_async_call :
					  afs_wake_up_call_waiter),
					 call->upgrade);
	if (IS_ERR(rxcall)) {
		ret = PTR_ERR(rxcall);
		goto error_kill_call;
	}

	call->rxcall = rxcall;

	/* send the request */
	iov[0].iov_base	= call->request;
	iov[0].iov_len	= call->request_size;

	msg.msg_name		= NULL;
	msg.msg_namelen		= 0;
	iov_iter_kvec(&msg.msg_iter, WRITE | ITER_KVEC, iov, 1,
		      call->request_size);
	msg.msg_control		= NULL;
	msg.msg_controllen	= 0;
	/* MSG_MORE if page data will follow the flat request. */
	msg.msg_flags		= MSG_WAITALL | (call->send_pages ? MSG_MORE : 0);

	ret = rxrpc_kernel_send_data(call->net->socket, rxcall,
				     &msg, call->request_size,
				     afs_notify_end_request_tx);
	if (ret < 0)
		goto error_do_abort;

	if (call->send_pages) {
		ret = afs_send_pages(call, &msg);
		if (ret < 0)
			goto error_do_abort;
	}

	/* at this point, an async call may no longer exist as it may have
	 * already completed */
	if (call->async)
		return -EINPROGRESS;

	return afs_wait_for_call_to_complete(call, ac);

error_do_abort:
	call->state = AFS_CALL_COMPLETE;
	if (ret != -ECONNABORTED) {
		/* Local failure: abort the call ourselves. */
		rxrpc_kernel_abort_call(call->net->socket, rxcall,
					RX_USER_ABORT, ret, "KSD");
	} else {
		/* Remote abort: a zero-length recv picks up the peer's abort
		 * code for the address cursor. */
		offset = 0;
		rxrpc_kernel_recv_data(call->net->socket, rxcall, NULL,
				       0, &offset, false, &call->abort_code,
				       &call->service_id);
		ac->abort_code = call->abort_code;
		ac->responded = true;
	}
error_kill_call:
	afs_put_call(call);
	ac->error = ret;
	_leave(" = %d", ret);
	return ret;
}

/*
 * deliver messages to a call
 *
 * Drives the call's state machine: drains the final ACK when awaiting it,
 * otherwise runs the type's deliver op and maps its errors onto local
 * aborts.  Incoming calls drop their ref here on completion.
 */
static void afs_deliver_to_call(struct afs_call *call)
{
	u32 abort_code;
	int ret;

	_enter("%s", call->type->name);

	while (call->state == AFS_CALL_AWAIT_REPLY ||
	       call->state == AFS_CALL_AWAIT_OP_ID ||
	       call->state == AFS_CALL_AWAIT_REQUEST ||
	       call->state == AFS_CALL_AWAIT_ACK
	       ) {
		if (call->state == AFS_CALL_AWAIT_ACK) {
			/* Zero-length read to consume the terminal ACK. */
			size_t offset = 0;
			ret = rxrpc_kernel_recv_data(call->net->socket,
						     call->rxcall,
						     NULL, 0, &offset, false,
						     &call->abort_code,
						     &call->service_id);
			trace_afs_recv_data(call, 0, offset, false, ret);

			if (ret == -EINPROGRESS || ret == -EAGAIN)
				return;
			if (ret == 1 || ret < 0) {
				/* Call reached the end or failed. */
				call->state = AFS_CALL_COMPLETE;
				goto done;
			}
			return;
		}

		ret = call->type->deliver(call);
		switch (ret) {
		case 0:
			if (call->state == AFS_CALL_AWAIT_REPLY)
				call->state = AFS_CALL_COMPLETE;
			goto done;
		case -EINPROGRESS:
		case -EAGAIN:
			/* More data needed; come back later. */
			goto out;
		case -ECONNABORTED:
			goto save_error;
		case -ENOTCONN:
			abort_code = RX_CALL_DEAD;
			rxrpc_kernel_abort_call(call->net->socket, call->rxcall,
						abort_code, ret, "KNC");
			goto save_error;
		case -ENOTSUPP:
			abort_code = RXGEN_OPCODE;
			rxrpc_kernel_abort_call(call->net->socket, call->rxcall,
						abort_code, ret, "KIV");
			goto save_error;
		case -ENODATA:
		case -EBADMSG:
		case -EMSGSIZE:
		default:
			/* Unmarshalling failure: abort code depends on whether
			 * we were acting as client or server. */
			abort_code = RXGEN_CC_UNMARSHAL;
			if (call->state != AFS_CALL_AWAIT_REPLY)
				abort_code = RXGEN_SS_UNMARSHAL;
			rxrpc_kernel_abort_call(call->net->socket, call->rxcall,
						abort_code, -EBADMSG, "KUM");
			goto save_error;
		}
	}

done:
	if (call->state == AFS_CALL_COMPLETE && call->incoming)
		afs_put_call(call);
out:
	_leave("");
	return;

save_error:
	call->error = ret;
	call->state = AFS_CALL_COMPLETE;
	goto done;
}

/*
 * wait synchronously for a call to complete
 *
 * Sleeps on the call's waitqueue, delivering messages as they arrive.  A
 * pending signal only interrupts the wait once the call has shown no signs
 * of life for ~2x RTT.  Consumes the caller's ref on @call.
 */
static long afs_wait_for_call_to_complete(struct afs_call *call,
					  struct afs_addr_cursor *ac)
{
	signed long rtt2, timeout;
	long ret;
	u64 rtt;
	u32 life, last_life;

	DECLARE_WAITQUEUE(myself, current);

	_enter("");

	/* Use twice the estimated RTT (in jiffies, min 2) as the liveness
	 * check interval. */
	rtt = rxrpc_kernel_get_rtt(call->net->socket, call->rxcall);
	rtt2 = nsecs_to_jiffies64(rtt) * 2;
	if (rtt2 < 2)
		rtt2 = 2;

	timeout = rtt2;
	last_life = rxrpc_kernel_check_life(call->net->socket, call->rxcall);

	add_wait_queue(&call->waitq, &myself);
	for (;;) {
		set_current_state(TASK_UNINTERRUPTIBLE);

		/* deliver any messages that are in the queue */
		if (call->state < AFS_CALL_COMPLETE && call->need_attention) {
			call->need_attention = false;
			__set_current_state(TASK_RUNNING);
			afs_deliver_to_call(call);
			continue;
		}

		if (call->state == AFS_CALL_COMPLETE)
			break;

		/* Only give up on a signal if the call has made no progress
		 * for a full timeout interval. */
		life = rxrpc_kernel_check_life(call->net->socket, call->rxcall);
		if (timeout == 0 &&
		    life == last_life && signal_pending(current))
				break;

		if (life != last_life) {
			timeout = rtt2;
			last_life = life;
		}

		timeout = schedule_timeout(timeout);
	}

	remove_wait_queue(&call->waitq, &myself);
	__set_current_state(TASK_RUNNING);

	/* Kill off the call if it's still live. */
	if (call->state < AFS_CALL_COMPLETE) {
		_debug("call interrupted");
		if (rxrpc_kernel_abort_call(call->net->socket, call->rxcall,
					    RX_USER_ABORT, -EINTR, "KWI"))
			call->error = -ERESTARTSYS;
	}

	ac->abort_code = call->abort_code;
	ac->error = call->error;

	ret = ac->error;
	switch (ret) {
	case 0:
		/* Some call types hand back reply[0] as the return value. */
		if (call->ret_reply0) {
			ret = (long)call->reply[0];
			call->reply[0] = NULL;
		}
		/* Fall through */
	case -ECONNABORTED:
		ac->responded = true;
		break;
	}

	_debug("call complete");
	afs_put_call(call);
	_leave(" = %p", (void *)ret);
	return ret;
}

/*
 * wake up a waiting call
 */
599 600
static void afs_wake_up_call_waiter(struct sock *sk, struct rxrpc_call *rxcall,
				    unsigned long call_user_ID)
601
{
602 603 604
	struct afs_call *call = (struct afs_call *)call_user_ID;

	call->need_attention = true;
605 606 607 608 609 610
	wake_up(&call->waitq);
}

/*
 * wake up an asynchronous call
 *
 * Queues the async processor with a new ref on the call.  The ref is only
 * taken if the count is non-zero, to avoid resurrecting a call that is
 * already being destroyed.
 */
static void afs_wake_up_async_call(struct sock *sk, struct rxrpc_call *rxcall,
				   unsigned long call_user_ID)
{
	struct afs_call *call = (struct afs_call *)call_user_ID;
	int u;

	trace_afs_notify_call(rxcall, call);
	call->need_attention = true;

	/* Conditionally take a ref: add-unless-zero. */
	u = __atomic_add_unless(&call->usage, 1, 0);
	if (u != 0) {
		trace_afs_call(call, afs_call_trace_wake, u,
			       atomic_read(&call->net->nr_outstanding_calls),
			       __builtin_return_address(0));

		/* If already queued, drop the ref we just took. */
		if (!queue_work(afs_async_calls, &call->async_work))
			afs_put_call(call);
	}
}

/*
632 633
 * Delete an asynchronous call.  The work item carries a ref to the call struct
 * that we need to release.
634
 */
635
static void afs_delete_async_call(struct work_struct *work)
636
{
637 638
	struct afs_call *call = container_of(work, struct afs_call, async_work);

639 640
	_enter("");

641
	afs_put_call(call);
642 643 644 645 646

	_leave("");
}

/*
 * Perform I/O processing on an asynchronous call.  The work item carries a ref
 * to the call struct that we either need to release or to pass on.
 */
static void afs_process_async_call(struct work_struct *work)
{
	struct afs_call *call = container_of(work, struct afs_call, async_work);

	_enter("");

	if (call->state < AFS_CALL_COMPLETE && call->need_attention) {
		call->need_attention = false;
		afs_deliver_to_call(call);
	}

	if (call->state == AFS_CALL_COMPLETE) {
		call->reply[0] = NULL;

		/* We have two refs to release - one from the alloc and one
		 * queued with the work item - and we can't just deallocate the
		 * call because the work item may be queued again.
		 */
		call->async_work.func = afs_delete_async_call;
		if (!queue_work(afs_async_calls, &call->async_work))
			afs_put_call(call);
	}

	/* Drop the work item's ref. */
	afs_put_call(call);
	_leave("");
}

677 678 679 680 681 682 683 684 685 686
/*
 * Attach a freshly accepted rxrpc call to the afs_call preallocated for it.
 */
static void afs_rx_attach(struct rxrpc_call *rxcall, unsigned long user_call_ID)
{
	struct afs_call *call;

	call = (struct afs_call *)user_call_ID;
	call->rxcall = rxcall;
}

/*
 * Charge the incoming call preallocation.
 *
 * Keeps handing preallocated afs_calls to rxrpc until the backlog is full
 * or allocation fails; any unconsumed call is stashed as the spare for
 * next time.
 */
void afs_charge_preallocation(struct work_struct *work)
{
	struct afs_net *net =
		container_of(work, struct afs_net, charge_preallocation_work);
	struct afs_call *call = net->spare_incoming_call;

	for (;;) {
		if (!call) {
			call = afs_alloc_call(net, &afs_RXCMxxxx, GFP_KERNEL);
			if (!call)
				break;

			call->async = true;
			call->state = AFS_CALL_AWAIT_OP_ID;
			init_waitqueue_head(&call->waitq);
		}

		/* Hand the call to rxrpc's accept backlog; a negative return
		 * means the backlog is full, so keep the call as spare. */
		if (rxrpc_kernel_charge_accept(net->socket,
					       afs_wake_up_async_call,
					       afs_rx_attach,
					       (unsigned long)call,
					       GFP_KERNEL) < 0)
			break;
		call = NULL;
	}
	net->spare_incoming_call = call;
}

/*
 * Discard a preallocated call when a socket is shut down: detach the rxrpc
 * side and drop the preallocation's reference.
 */
static void afs_rx_discard_new_call(struct rxrpc_call *rxcall,
				    unsigned long user_call_ID)
{
	struct afs_call *call;

	call = (struct afs_call *)user_call_ID;
	call->rxcall = NULL;
	afs_put_call(call);
}

727 728 729
/*
 * Notification of an incoming call.
 */
730 731
static void afs_rx_new_call(struct sock *sk, struct rxrpc_call *rxcall,
			    unsigned long user_call_ID)
732
{
733 734 735
	struct afs_net *net = afs_sock2net(sk);

	queue_work(afs_wq, &net->charge_preallocation_work);
736 737
}

738
/*
 * Grab the operation ID from an incoming cache manager call.  The socket
 * buffer is discarded on error or if we don't yet have sufficient data.
 */
static int afs_deliver_cm_op_id(struct afs_call *call)
{
	int ret;

	_enter("{%zu}", call->offset);

	ASSERTCMP(call->offset, <, 4);

	/* the operation ID forms the first four bytes of the request data */
	ret = afs_extract_data(call, &call->tmp, 4, true);
	if (ret < 0)
		return ret;

	call->operation_ID = ntohl(call->tmp);
	call->state = AFS_CALL_AWAIT_REQUEST;
	call->offset = 0;

	/* ask the cache manager to route the call (it'll change the call type
	 * if successful) */
	if (!afs_cm_incoming_call(call))
		return -ENOTSUPP;

	trace_afs_cb_call(call);

	/* pass responsibility for the remainder of this message off to the
	 * cache manager op */
	return call->type->deliver(call);
}

771 772 773 774 775 776 777 778 779 780 781 782 783 784
/*
 * Advance the AFS call state when an RxRPC service call ends the transmit
 * phase.
 */
static void afs_notify_end_reply_tx(struct sock *sock,
				    struct rxrpc_call *rxcall,
				    unsigned long call_user_ID)
{
	struct afs_call *call = (struct afs_call *)call_user_ID;

	if (call->state != AFS_CALL_REPLYING)
		return;
	call->state = AFS_CALL_AWAIT_ACK;
}

785 786 787 788 789
/*
 * send an empty reply
 *
 * On ENOMEM the call is aborted with "KOO"; other errors are just logged.
 */
void afs_send_empty_reply(struct afs_call *call)
{
	struct afs_net *net = call->net;
	struct msghdr msg;

	_enter("");

	rxrpc_kernel_set_tx_length(net->socket, call->rxcall, 0);

	msg.msg_name		= NULL;
	msg.msg_namelen		= 0;
	iov_iter_kvec(&msg.msg_iter, WRITE | ITER_KVEC, NULL, 0, 0);
	msg.msg_control		= NULL;
	msg.msg_controllen	= 0;
	msg.msg_flags		= 0;

	switch (rxrpc_kernel_send_data(net->socket, call->rxcall, &msg, 0,
				       afs_notify_end_reply_tx)) {
	case 0:
		_leave(" [replied]");
		return;

	case -ENOMEM:
		_debug("oom");
		rxrpc_kernel_abort_call(net->socket, call->rxcall,
					RX_USER_ABORT, -ENOMEM, "KOO");
		/* Fall through */
	default:
		_leave(" [error]");
		return;
	}
}

820 821 822 823 824
/*
 * send a simple reply
 *
 * Transmits @len bytes from @buf as the entire reply.  On ENOMEM the call
 * is aborted with "KOO"; other errors are just logged.
 */
void afs_send_simple_reply(struct afs_call *call, const void *buf, size_t len)
{
	struct afs_net *net = call->net;
	struct msghdr msg;
	struct kvec iov[1];
	int n;

	_enter("");

	rxrpc_kernel_set_tx_length(net->socket, call->rxcall, len);

	iov[0].iov_base		= (void *) buf;
	iov[0].iov_len		= len;
	msg.msg_name		= NULL;
	msg.msg_namelen		= 0;
	iov_iter_kvec(&msg.msg_iter, WRITE | ITER_KVEC, iov, 1, len);
	msg.msg_control		= NULL;
	msg.msg_controllen	= 0;
	msg.msg_flags		= 0;

	n = rxrpc_kernel_send_data(net->socket, call->rxcall, &msg, len,
				   afs_notify_end_reply_tx);
	if (n >= 0) {
		/* Success */
		_leave(" [replied]");
		return;
	}

	if (n == -ENOMEM) {
		_debug("oom");
		rxrpc_kernel_abort_call(net->socket, call->rxcall,
					RX_USER_ABORT, -ENOMEM, "KOO");
	}
	_leave(" [error]");
}

859
/*
 * Extract a piece of data from the received data socket buffers.
 *
 * Returns 0 when @count bytes have been gathered (call->offset tracks
 * progress across partial reads), -EAGAIN if more data is awaited, or a
 * negative error (which also completes the call).  An rxrpc return of 1
 * (end of data) advances the call state and is reported as 0.
 */
int afs_extract_data(struct afs_call *call, void *buf, size_t count,
		     bool want_more)
{
	struct afs_net *net = call->net;
	int ret;

	_enter("{%s,%zu},,%zu,%d",
	       call->type->name, call->offset, count, want_more);

	ASSERTCMP(call->offset, <=, count);

	ret = rxrpc_kernel_recv_data(net->socket, call->rxcall,
				     buf, count, &call->offset,
				     want_more, &call->abort_code,
				     &call->service_id);
	trace_afs_recv_data(call, count, call->offset, want_more, ret);
	if (ret == 0 || ret == -EAGAIN)
		return ret;

	if (ret == 1) {
		/* End of data: advance the call's state machine. */
		switch (call->state) {
		case AFS_CALL_AWAIT_REPLY:
			call->state = AFS_CALL_COMPLETE;
			break;
		case AFS_CALL_AWAIT_REQUEST:
			call->state = AFS_CALL_REPLYING;
			break;
		default:
			break;
		}
		return 0;
	}

	/* Hard error: record it and complete the call. */
	call->error = ret;
	call->state = AFS_CALL_COMPLETE;
	return ret;
}