rxrpc.c 20.9 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11
/* Maintain an RxRPC server socket to do AFS communications through
 *
 * Copyright (C) 2007 Red Hat, Inc. All Rights Reserved.
 * Written by David Howells (dhowells@redhat.com)
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version
 * 2 of the License, or (at your option) any later version.
 */

12
#include <linux/slab.h>
13 14
#include <linux/sched/signal.h>

15 16 17 18 19
#include <net/sock.h>
#include <net/af_rxrpc.h>
#include "internal.h"
#include "afs_cm.h"

20
struct workqueue_struct *afs_async_calls;
21

22
static void afs_wake_up_call_waiter(struct sock *, struct rxrpc_call *, unsigned long);
23
static int afs_wait_for_call_to_complete(struct afs_call *);
24 25
static void afs_wake_up_async_call(struct sock *, struct rxrpc_call *, unsigned long);
static void afs_process_async_call(struct work_struct *);
26 27
static void afs_rx_new_call(struct sock *, struct rxrpc_call *, unsigned long);
static void afs_rx_discard_new_call(struct rxrpc_call *, unsigned long);
28
static int afs_deliver_cm_op_id(struct afs_call *);
29 30 31

/* Asynchronous incoming call initial processing.  Incoming calls arrive with
 * this type until afs_deliver_cm_op_id() has read the operation ID and asked
 * the cache manager to route the call (which replaces call->type).
 */
static const struct afs_call_type afs_RXCMxxxx = {
	.name		= "CB.xxxx",
	.deliver	= afs_deliver_cm_op_id,
	.abort_to_error	= afs_abort_to_error,
};

/*
 * open an RxRPC socket and bind it to be a server for callback notifications
 * - the socket is left in blocking mode and non-blocking ops use MSG_DONTWAIT
 *
 * Returns 0 on success or a negative errno; on failure no socket is left
 * attached to *net.
 */
int afs_open_socket(struct afs_net *net)
{
	struct sockaddr_rxrpc srx;
	struct socket *socket;
	int ret;

	_enter("");

	ret = sock_create_kern(&init_net, AF_RXRPC, SOCK_DGRAM, PF_INET, &socket);
	if (ret < 0)
		goto error_1;

	/* GFP_NOFS: this socket is used from filesystem paths, so its
	 * allocations must not recurse into the FS */
	socket->sk->sk_allocation = GFP_NOFS;

	/* bind the callback manager's address to make this a server socket */
	srx.srx_family			= AF_RXRPC;
	srx.srx_service			= CM_SERVICE;
	srx.transport_type		= SOCK_DGRAM;
	srx.transport_len		= sizeof(srx.transport.sin);
	srx.transport.sin.sin_family	= AF_INET;
	srx.transport.sin.sin_port	= htons(AFS_CM_PORT);
	memset(&srx.transport.sin.sin_addr, 0,
	       sizeof(srx.transport.sin.sin_addr));

	ret = kernel_bind(socket, (struct sockaddr *) &srx, sizeof(srx));
	if (ret < 0)
		goto error_2;

	/* Ask rxrpc to tell us about new incoming calls and about
	 * preallocated calls it has to discard */
	rxrpc_kernel_new_call_notification(socket, afs_rx_new_call,
					   afs_rx_discard_new_call);

	ret = kernel_listen(socket, INT_MAX);
	if (ret < 0)
		goto error_2;

	net->socket = socket;
	/* Prime the incoming-call preallocation pool (called directly here,
	 * not via the workqueue) */
	afs_charge_preallocation(&net->charge_preallocation_work);
	_leave(" = 0");
	return 0;

error_2:
	sock_release(socket);
error_1:
	_leave(" = %d", ret);
	return ret;
}

/*
 * close the RxRPC socket AFS was using
 *
 * Stops accepting new calls, releases the spare preallocated call, waits for
 * all outstanding calls to drain, then shuts down and releases the socket.
 */
void afs_close_socket(struct afs_net *net)
{
	_enter("");

	/* Stop accepting new incoming calls first */
	kernel_listen(net->socket, 0);
	flush_workqueue(afs_async_calls);

	if (net->spare_incoming_call) {
		afs_put_call(net->spare_incoming_call);
		net->spare_incoming_call = NULL;
	}

	/* Block until every outstanding call has dropped its last ref
	 * (afs_put_call() wakes this when the counter hits zero) */
	_debug("outstanding %u", atomic_read(&net->nr_outstanding_calls));
	wait_on_atomic_t(&net->nr_outstanding_calls, atomic_t_wait,
			 TASK_UNINTERRUPTIBLE);
	_debug("no outstanding calls");

	kernel_sock_shutdown(net->socket, SHUT_RDWR);
	/* Flush again in case the shutdown queued further async work */
	flush_workqueue(afs_async_calls);
	sock_release(net->socket);

	_debug("dework");
	_leave("");
}

D
David Howells 已提交
116
/*
 * Allocate a call.
 *
 * The call starts with a usage count of 1 (owned by the caller) and counts
 * against net->nr_outstanding_calls until freed by afs_put_call().
 * Returns NULL on allocation failure.
 */
static struct afs_call *afs_alloc_call(struct afs_net *net,
				       const struct afs_call_type *type,
				       gfp_t gfp)
{
	struct afs_call *call;
	int o;

	call = kzalloc(sizeof(*call), gfp);
	if (!call)
		return NULL;

	call->type = type;
	call->net = net;
	atomic_set(&call->usage, 1);
	INIT_WORK(&call->async_work, afs_process_async_call);
	init_waitqueue_head(&call->waitq);

	o = atomic_inc_return(&net->nr_outstanding_calls);
	trace_afs_call(call, afs_call_trace_alloc, 1, o,
		       __builtin_return_address(0));
	return call;
}

142
/*
 * Dispose of a reference on a call.
 *
 * When the last reference goes, the rxrpc call is ended, the type-specific
 * destructor is run and the memory is freed.  The last call to go also wakes
 * anyone waiting on net->nr_outstanding_calls (see afs_close_socket()).
 */
void afs_put_call(struct afs_call *call)
{
	struct afs_net *net = call->net;
	int n = atomic_dec_return(&call->usage);
	int o = atomic_read(&net->nr_outstanding_calls);

	trace_afs_call(call, afs_call_trace_put, n + 1, o,
		       __builtin_return_address(0));

	ASSERTCMP(n, >=, 0);
	if (n == 0) {
		ASSERT(!work_pending(&call->async_work));
		ASSERT(call->type->name != NULL);

		if (call->rxcall) {
			rxrpc_kernel_end_call(net->socket, call->rxcall);
			call->rxcall = NULL;
		}
		if (call->type->destructor)
			call->type->destructor(call);

		kfree(call->request);
		kfree(call);

		o = atomic_dec_return(&net->nr_outstanding_calls);
		/* NOTE(review): this trace fires after kfree(call); the
		 * pointer is used only as an identifier, never dereferenced */
		trace_afs_call(call, afs_call_trace_free, 0, o,
			       __builtin_return_address(0));
		if (o == 0)
			wake_up_atomic_t(&net->nr_outstanding_calls);
	}
}

/*
 * Queue the call for actual work.  Returns 0 unconditionally for convenience.
 *
 * Takes an extra ref on the call for the work item; if the work was already
 * queued (queue_work() returns false) that ref is dropped again.
 */
int afs_queue_call_work(struct afs_call *call)
{
	int u = atomic_inc_return(&call->usage);

	trace_afs_call(call, afs_call_trace_work, u,
		       atomic_read(&call->net->nr_outstanding_calls),
		       __builtin_return_address(0));

	INIT_WORK(&call->work, call->type->work);

	if (!queue_work(afs_wq, &call->work))
		afs_put_call(call);
	return 0;
}

195 196 197
/*
 * allocate a call with flat request and reply buffers
 *
 * Either buffer may be omitted by passing a zero size.  Returns NULL on
 * allocation failure; any partially-built call is released via
 * afs_put_call(), whose destructor path frees the buffers.
 */
struct afs_call *afs_alloc_flat_call(struct afs_net *net,
				     const struct afs_call_type *type,
				     size_t request_size, size_t reply_max)
{
	struct afs_call *call;

	call = afs_alloc_call(net, type, GFP_NOFS);
	if (!call)
		goto nomem_call;

	if (request_size) {
		call->request_size = request_size;
		call->request = kmalloc(request_size, GFP_NOFS);
		if (!call->request)
			goto nomem_free;
	}

	if (reply_max) {
		call->reply_max = reply_max;
		call->buffer = kmalloc(reply_max, GFP_NOFS);
		if (!call->buffer)
			goto nomem_free;
	}

	init_waitqueue_head(&call->waitq);
	return call;

nomem_free:
	afs_put_call(call);
nomem_call:
	return NULL;
}

/*
 * clean up a call with flat buffer
 *
 * Frees the flat request and reply buffers and clears the pointers so a
 * repeated destruction is harmless.
 */
void afs_flat_call_destructor(struct afs_call *call)
{
	_enter("");

	kfree(call->request);
	kfree(call->buffer);
	call->request = NULL;
	call->buffer = NULL;
}

244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276
#define AFS_BVEC_MAX 8

/*
 * Load the given bvec with the next few pages.
 *
 * Grabs up to AFS_BVEC_MAX page refs from call->mapping starting at @first
 * (the caller drops them after transmission) and points msg->msg_iter at
 * them.  @offset applies only to the first page; the final page is truncated
 * to call->last_to and clears MSG_MORE to mark the end of the data.
 */
static void afs_load_bvec(struct afs_call *call, struct msghdr *msg,
			  struct bio_vec *bv, pgoff_t first, pgoff_t last,
			  unsigned offset)
{
	struct page *pages[AFS_BVEC_MAX];
	unsigned int nr, n, i, to, bytes = 0;

	nr = min_t(pgoff_t, last - first + 1, AFS_BVEC_MAX);
	n = find_get_pages_contig(call->mapping, first, nr, pages);
	ASSERTCMP(n, ==, nr);

	msg->msg_flags |= MSG_MORE;
	for (i = 0; i < nr; i++) {
		to = PAGE_SIZE;
		if (first + i >= last) {
			/* Last page of the whole transmission */
			to = call->last_to;
			msg->msg_flags &= ~MSG_MORE;
		}
		bv[i].bv_page = pages[i];
		bv[i].bv_len = to - offset;
		bv[i].bv_offset = offset;
		bytes += to - offset;
		offset = 0;	/* only the first page starts mid-page */
	}

	iov_iter_bvec(&msg->msg_iter, WRITE | ITER_BVEC, bv, nr, bytes);
}

277 278 279 280 281 282 283 284 285 286 287 288 289
/*
 * Advance the AFS call state when the RxRPC call ends the transmit phase.
 * Invoked by rxrpc; call_user_ID is the afs_call pointer we registered.
 */
static void afs_notify_end_request_tx(struct sock *sock,
				      struct rxrpc_call *rxcall,
				      unsigned long call_user_ID)
{
	struct afs_call *call = (struct afs_call *)call_user_ID;

	if (call->state != AFS_CALL_REQUESTING)
		return;

	call->state = AFS_CALL_AWAIT_REPLY;
}

290 291 292
/*
 * attach the data from a bunch of pages on an inode to a call
 *
 * Transmits the pages [call->first, call->last] in batches of up to
 * AFS_BVEC_MAX, dropping the page refs taken by afs_load_bvec() after each
 * send.  Returns the last rxrpc_kernel_send_data() result (< 0 on error).
 */
static int afs_send_pages(struct afs_call *call, struct msghdr *msg)
{
	struct bio_vec bv[AFS_BVEC_MAX];
	unsigned int bytes, nr, loop, offset;
	pgoff_t first = call->first, last = call->last;
	int ret;

	/* Only the very first batch starts at a non-zero page offset */
	offset = call->first_offset;
	call->first_offset = 0;

	do {
		afs_load_bvec(call, msg, bv, first, last, offset);
		offset = 0;
		bytes = msg->msg_iter.count;
		nr = msg->msg_iter.nr_segs;

		ret = rxrpc_kernel_send_data(call->net->socket, call->rxcall, msg,
					     bytes, afs_notify_end_request_tx);
		/* Release the page refs even if the send failed */
		for (loop = 0; loop < nr; loop++)
			put_page(bv[loop].bv_page);
		if (ret < 0)
			break;

		first += nr;
	} while (first <= last);

	return ret;
}

322 323 324
/*
 * initiate a call
 *
 * Begins an rxrpc client call to @srx and transmits the flat request (plus
 * any attached pages).  Consumes the caller's ref on @call on every path:
 * for async calls returns -EINPROGRESS once transmission is under way; for
 * sync calls waits for completion and returns the call's error code.
 */
int afs_make_call(struct sockaddr_rxrpc *srx, struct afs_call *call,
		  gfp_t gfp, bool async)
{
	struct rxrpc_call *rxcall;
	struct msghdr msg;
	struct kvec iov[1];
	size_t offset;
	s64 tx_total_len;
	u32 abort_code;
	int ret;

	_enter(",{%pISp},", &srx->transport);

	ASSERT(call->type != NULL);
	ASSERT(call->type->name != NULL);

	_debug("____MAKE %p{%s,%x} [%d]____",
	       call, call->type->name, key_serial(call->key),
	       atomic_read(&call->net->nr_outstanding_calls));

	call->async = async;

	/* Work out the length we're going to transmit.  This is awkward for
	 * calls such as FS.StoreData where there's an extra injection of data
	 * after the initial fixed part.
	 */
	tx_total_len = call->request_size;
	if (call->send_pages) {
		tx_total_len += call->last_to - call->first_offset;
		tx_total_len += (call->last - call->first) * PAGE_SIZE;
	}

	/* create a call */
	rxcall = rxrpc_kernel_begin_call(call->net->socket, srx, call->key,
					 (unsigned long)call,
					 tx_total_len, gfp,
					 (async ?
					  afs_wake_up_async_call :
					  afs_wake_up_call_waiter),
					 call->upgrade);
	/* The key ref is passed to (or consumed by) rxrpc - don't keep a
	 * stale pointer */
	call->key = NULL;
	if (IS_ERR(rxcall)) {
		ret = PTR_ERR(rxcall);
		goto error_kill_call;
	}

	call->rxcall = rxcall;

	/* send the request */
	iov[0].iov_base	= call->request;
	iov[0].iov_len	= call->request_size;

	msg.msg_name		= NULL;
	msg.msg_namelen		= 0;
	iov_iter_kvec(&msg.msg_iter, WRITE | ITER_KVEC, iov, 1,
		      call->request_size);
	msg.msg_control		= NULL;
	msg.msg_controllen	= 0;
	msg.msg_flags		= MSG_WAITALL | (call->send_pages ? MSG_MORE : 0);

	/* We have to change the state *before* sending the last packet as
	 * rxrpc might give us the reply before it returns from sending the
	 * request.  Further, if the send fails, we may already have been given
	 * a notification and may have collected it.
	 */
	if (!call->send_pages)
		call->state = AFS_CALL_AWAIT_REPLY;
	ret = rxrpc_kernel_send_data(call->net->socket, rxcall,
				     &msg, call->request_size,
				     afs_notify_end_request_tx);
	if (ret < 0)
		goto error_do_abort;

	if (call->send_pages) {
		ret = afs_send_pages(call, &msg);
		if (ret < 0)
			goto error_do_abort;
	}

	/* at this point, an async call may no longer exist as it may have
	 * already completed */
	if (call->async)
		return -EINPROGRESS;

	return afs_wait_for_call_to_complete(call);

error_do_abort:
	call->state = AFS_CALL_COMPLETE;
	if (ret != -ECONNABORTED) {
		rxrpc_kernel_abort_call(call->net->socket, rxcall,
					RX_USER_ABORT, ret, "KSD");
	} else {
		/* The peer aborted us: fetch the abort code so we can
		 * translate it into a meaningful error */
		abort_code = 0;
		offset = 0;
		rxrpc_kernel_recv_data(call->net->socket, rxcall, NULL,
				       0, &offset, false, &abort_code,
				       &call->service_id);
		ret = call->type->abort_to_error(abort_code);
	}
error_kill_call:
	afs_put_call(call);
	_leave(" = %d", ret);
	return ret;
}

/*
 * deliver messages to a call
 *
 * Drives the call's delivery state machine: drains the final ACK when
 * awaiting it, otherwise hands received data to the type's ->deliver() op
 * and translates its errors into rxrpc aborts.  An incoming call that
 * reaches AFS_CALL_COMPLETE here has its ref dropped.
 */
static void afs_deliver_to_call(struct afs_call *call)
{
	u32 abort_code;
	int ret;

	_enter("%s", call->type->name);

	while (call->state == AFS_CALL_AWAIT_REPLY ||
	       call->state == AFS_CALL_AWAIT_OP_ID ||
	       call->state == AFS_CALL_AWAIT_REQUEST ||
	       call->state == AFS_CALL_AWAIT_ACK
	       ) {
		if (call->state == AFS_CALL_AWAIT_ACK) {
			size_t offset = 0;
			/* Zero-length read just to observe the call's fate */
			ret = rxrpc_kernel_recv_data(call->net->socket,
						     call->rxcall,
						     NULL, 0, &offset, false,
						     &call->abort_code,
						     &call->service_id);
			trace_afs_recv_data(call, 0, offset, false, ret);

			if (ret == -EINPROGRESS || ret == -EAGAIN)
				return;
			if (ret == 1 || ret < 0) {
				/* 1 == call ended; < 0 == error/abort */
				call->state = AFS_CALL_COMPLETE;
				goto done;
			}
			return;
		}

		ret = call->type->deliver(call);
		switch (ret) {
		case 0:
			if (call->state == AFS_CALL_AWAIT_REPLY)
				call->state = AFS_CALL_COMPLETE;
			goto done;
		case -EINPROGRESS:
		case -EAGAIN:
			/* More data needed; come back later */
			goto out;
		case -ECONNABORTED:
			goto call_complete;
		case -ENOTCONN:
			abort_code = RX_CALL_DEAD;
			rxrpc_kernel_abort_call(call->net->socket, call->rxcall,
						abort_code, ret, "KNC");
			goto save_error;
		case -ENOTSUPP:
			abort_code = RXGEN_OPCODE;
			rxrpc_kernel_abort_call(call->net->socket, call->rxcall,
						abort_code, ret, "KIV");
			goto save_error;
		case -ENODATA:
		case -EBADMSG:
		case -EMSGSIZE:
		default:
			/* Unmarshalling failure: pick client or server
			 * abort code depending on which side we are */
			abort_code = RXGEN_CC_UNMARSHAL;
			if (call->state != AFS_CALL_AWAIT_REPLY)
				abort_code = RXGEN_SS_UNMARSHAL;
			rxrpc_kernel_abort_call(call->net->socket, call->rxcall,
						abort_code, -EBADMSG, "KUM");
			goto save_error;
		}
	}

done:
	if (call->state == AFS_CALL_COMPLETE && call->incoming)
		afs_put_call(call);
out:
	_leave("");
	return;

save_error:
	call->error = ret;
call_complete:
	call->state = AFS_CALL_COMPLETE;
	goto done;
}

/*
 * wait synchronously for a call to complete
 *
 * Sleeps on call->waitq, delivering messages whenever the call is flagged as
 * needing attention.  Uses rxrpc_kernel_check_life() so a signal only
 * interrupts the wait once the call has shown no sign of life for roughly
 * two RTTs.  Consumes the caller's ref on @call and returns call->error.
 */
static int afs_wait_for_call_to_complete(struct afs_call *call)
{
	signed long rtt2, timeout;
	int ret;
	u64 rtt;
	u32 life, last_life;

	DECLARE_WAITQUEUE(myself, current);

	_enter("");

	/* Liveness check interval: twice the RTT, minimum 2 jiffies */
	rtt = rxrpc_kernel_get_rtt(call->net->socket, call->rxcall);
	rtt2 = nsecs_to_jiffies64(rtt) * 2;
	if (rtt2 < 2)
		rtt2 = 2;

	timeout = rtt2;
	last_life = rxrpc_kernel_check_life(call->net->socket, call->rxcall);

	add_wait_queue(&call->waitq, &myself);
	for (;;) {
		set_current_state(TASK_UNINTERRUPTIBLE);

		/* deliver any messages that are in the queue */
		if (call->state < AFS_CALL_COMPLETE && call->need_attention) {
			call->need_attention = false;
			__set_current_state(TASK_RUNNING);
			afs_deliver_to_call(call);
			continue;
		}

		if (call->state == AFS_CALL_COMPLETE)
			break;

		/* Only give up on a signal if the call has been lifeless
		 * for a whole timeout period */
		life = rxrpc_kernel_check_life(call->net->socket, call->rxcall);
		if (timeout == 0 &&
		    life == last_life && signal_pending(current))
				break;

		if (life != last_life) {
			timeout = rtt2;
			last_life = life;
		}

		timeout = schedule_timeout(timeout);
	}

	remove_wait_queue(&call->waitq, &myself);
	__set_current_state(TASK_RUNNING);

	/* Kill off the call if it's still live. */
	if (call->state < AFS_CALL_COMPLETE) {
		_debug("call interrupted");
		rxrpc_kernel_abort_call(call->net->socket, call->rxcall,
					RX_USER_ABORT, -EINTR, "KWI");
	}

	ret = call->error;
	_debug("call complete");
	afs_put_call(call);
	_leave(" = %d", ret);
	return ret;
}

/*
 * wake up a waiting call
 */
581 582
static void afs_wake_up_call_waiter(struct sock *sk, struct rxrpc_call *rxcall,
				    unsigned long call_user_ID)
583
{
584 585 586
	struct afs_call *call = (struct afs_call *)call_user_ID;

	call->need_attention = true;
587 588 589 590 591 592
	wake_up(&call->waitq);
}

/*
 * wake up an asynchronous call
 *
 * rxrpc notification handler for async calls.  Takes a ref for the work item
 * only if the call isn't already on its way to destruction (usage > 0), then
 * queues afs_process_async_call() via call->async_work.
 */
static void afs_wake_up_async_call(struct sock *sk, struct rxrpc_call *rxcall,
				   unsigned long call_user_ID)
{
	struct afs_call *call = (struct afs_call *)call_user_ID;
	int u;

	trace_afs_notify_call(rxcall, call);
	call->need_attention = true;

	/* Conditionally get a ref: don't resurrect a call whose usage has
	 * already dropped to zero */
	u = __atomic_add_unless(&call->usage, 1, 0);
	if (u != 0) {
		trace_afs_call(call, afs_call_trace_wake, u,
			       atomic_read(&call->net->nr_outstanding_calls),
			       __builtin_return_address(0));

		/* If the work was already queued, drop the ref we just took */
		if (!queue_work(afs_async_calls, &call->async_work))
			afs_put_call(call);
	}
}

/*
614 615
 * Delete an asynchronous call.  The work item carries a ref to the call struct
 * that we need to release.
616
 */
617
static void afs_delete_async_call(struct work_struct *work)
618
{
619 620
	struct afs_call *call = container_of(work, struct afs_call, async_work);

621 622
	_enter("");

623
	afs_put_call(call);
624 625 626 627 628

	_leave("");
}

/*
 * Perform I/O processing on an asynchronous call.  The work item carries a ref
 * to the call struct that we either need to release or to pass on.
 */
static void afs_process_async_call(struct work_struct *work)
{
	struct afs_call *call = container_of(work, struct afs_call, async_work);

	_enter("");

	if (call->state < AFS_CALL_COMPLETE && call->need_attention) {
		call->need_attention = false;
		afs_deliver_to_call(call);
	}

	if (call->state == AFS_CALL_COMPLETE) {
		call->reply = NULL;

		/* We have two refs to release - one from the alloc and one
		 * queued with the work item - and we can't just deallocate the
		 * call because the work item may be queued again.
		 */
		call->async_work.func = afs_delete_async_call;
		if (!queue_work(afs_async_calls, &call->async_work))
			afs_put_call(call);
	}

	/* Drop the work item's ref (the alloc ref, if any, is handled above
	 * via afs_delete_async_call) */
	afs_put_call(call);
	_leave("");
}

659 660 661 662 663 664 665 666 667 668
/* Attach an incoming rxrpc call to the preallocated afs_call whose address
 * we registered as the user call ID. */
static void afs_rx_attach(struct rxrpc_call *rxcall, unsigned long user_call_ID)
{
	((struct afs_call *)user_call_ID)->rxcall = rxcall;
}

/*
 * Charge the incoming call preallocation.
 *
 * Keeps handing preallocated afs_call records to rxrpc until it declines
 * another one or allocation fails; a call that rxrpc wouldn't take is kept
 * as net->spare_incoming_call for next time.  Runs as
 * net->charge_preallocation_work (and directly from afs_open_socket()).
 */
void afs_charge_preallocation(struct work_struct *work)
{
	struct afs_net *net =
		container_of(work, struct afs_net, charge_preallocation_work);
	struct afs_call *call = net->spare_incoming_call;

	for (;;) {
		if (!call) {
			call = afs_alloc_call(net, &afs_RXCMxxxx, GFP_KERNEL);
			if (!call)
				break;

			call->async = true;
			call->state = AFS_CALL_AWAIT_OP_ID;
			init_waitqueue_head(&call->waitq);
		}

		if (rxrpc_kernel_charge_accept(net->socket,
					       afs_wake_up_async_call,
					       afs_rx_attach,
					       (unsigned long)call,
					       GFP_KERNEL) < 0)
			break;
		call = NULL;
	}
	net->spare_incoming_call = call;
}

/*
 * Discard a preallocated call when a socket is shut down.
 *
 * rxrpc no longer holds the rx call, so clear the pointer before dropping
 * the preallocation's ref.
 */
static void afs_rx_discard_new_call(struct rxrpc_call *rxcall,
				    unsigned long user_call_ID)
{
	struct afs_call *call = (struct afs_call *)user_call_ID;

	call->rxcall = NULL;
	afs_put_call(call);
}

709 710 711
/*
 * Notification of an incoming call.
 */
712 713
static void afs_rx_new_call(struct sock *sk, struct rxrpc_call *rxcall,
			    unsigned long user_call_ID)
714
{
715 716 717
	struct afs_net *net = afs_sock2net(sk);

	queue_work(afs_wq, &net->charge_preallocation_work);
718 719
}

720
/*
 * Grab the operation ID from an incoming cache manager call.  The socket
 * buffer is discarded on error or if we don't yet have sufficient data.
 */
static int afs_deliver_cm_op_id(struct afs_call *call)
{
	int ret;

	_enter("{%zu}", call->offset);

	ASSERTCMP(call->offset, <, 4);

	/* the operation ID forms the first four bytes of the request data */
	ret = afs_extract_data(call, &call->tmp, 4, true);
	if (ret < 0)
		return ret;

	call->operation_ID = ntohl(call->tmp);
	call->state = AFS_CALL_AWAIT_REQUEST;
	call->offset = 0;

	/* ask the cache manager to route the call (it'll change the call type
	 * if successful) */
	if (!afs_cm_incoming_call(call))
		return -ENOTSUPP;

	trace_afs_cb_call(call);

	/* pass responsibility for the remainder of this message off to the
	 * cache manager op */
	return call->type->deliver(call);
}

753 754 755 756 757 758 759 760 761 762 763 764 765 766
/*
 * Advance the AFS call state when an RxRPC service call ends the transmit
 * phase.
 */
static void afs_notify_end_reply_tx(struct sock *sock,
				    struct rxrpc_call *rxcall,
				    unsigned long call_user_ID)
{
	struct afs_call *call = (struct afs_call *)call_user_ID;

	if (call->state != AFS_CALL_REPLYING)
		return;

	call->state = AFS_CALL_AWAIT_ACK;
}

767 768 769 770 771
/*
 * send an empty reply
 *
 * Transmits a zero-length reply on a service call and moves the call to
 * AFS_CALL_AWAIT_ACK.  On -ENOMEM the call is aborted; other send errors
 * are left for the normal notification path to report.
 */
void afs_send_empty_reply(struct afs_call *call)
{
	struct afs_net *net = call->net;
	struct msghdr msg;

	_enter("");

	rxrpc_kernel_set_tx_length(net->socket, call->rxcall, 0);

	msg.msg_name		= NULL;
	msg.msg_namelen		= 0;
	iov_iter_kvec(&msg.msg_iter, WRITE | ITER_KVEC, NULL, 0, 0);
	msg.msg_control		= NULL;
	msg.msg_controllen	= 0;
	msg.msg_flags		= 0;

	/* State must change before the send - see afs_make_call() */
	call->state = AFS_CALL_AWAIT_ACK;
	switch (rxrpc_kernel_send_data(net->socket, call->rxcall, &msg, 0,
				       afs_notify_end_reply_tx)) {
	case 0:
		_leave(" [replied]");
		return;

	case -ENOMEM:
		_debug("oom");
		rxrpc_kernel_abort_call(net->socket, call->rxcall,
					RX_USER_ABORT, -ENOMEM, "KOO");
		/* fall through */
	default:
		_leave(" [error]");
		return;
	}
}

803 804 805 806 807
/*
 * send a simple reply
 *
 * Transmits @len bytes from @buf as the complete reply on a service call
 * and moves the call to AFS_CALL_AWAIT_ACK.  On -ENOMEM the call is
 * aborted; other send errors are reported via the notification path.
 */
void afs_send_simple_reply(struct afs_call *call, const void *buf, size_t len)
{
	struct afs_net *net = call->net;
	struct msghdr msg;
	struct kvec iov[1];
	int n;

	_enter("");

	rxrpc_kernel_set_tx_length(net->socket, call->rxcall, len);

	iov[0].iov_base		= (void *) buf;
	iov[0].iov_len		= len;
	msg.msg_name		= NULL;
	msg.msg_namelen		= 0;
	iov_iter_kvec(&msg.msg_iter, WRITE | ITER_KVEC, iov, 1, len);
	msg.msg_control		= NULL;
	msg.msg_controllen	= 0;
	msg.msg_flags		= 0;

	/* State must change before the send - see afs_make_call() */
	call->state = AFS_CALL_AWAIT_ACK;
	n = rxrpc_kernel_send_data(net->socket, call->rxcall, &msg, len,
				   afs_notify_end_reply_tx);
	if (n >= 0) {
		/* Success */
		_leave(" [replied]");
		return;
	}

	if (n == -ENOMEM) {
		_debug("oom");
		rxrpc_kernel_abort_call(net->socket, call->rxcall,
					RX_USER_ABORT, -ENOMEM, "KOO");
	}
	_leave(" [error]");
}

843
/*
 * Extract a piece of data from the received data socket buffers.
 *
 * Reads up to @count bytes into @buf, tracking progress in call->offset.
 * Returns 0 when the requested data is complete, -EAGAIN if more data must
 * arrive first, or a negative error.  A "last data" indication (rxrpc
 * returning 1) advances the call state; -ECONNABORTED is translated to the
 * call type's error code.
 */
int afs_extract_data(struct afs_call *call, void *buf, size_t count,
		     bool want_more)
{
	struct afs_net *net = call->net;
	int ret;

	_enter("{%s,%zu},,%zu,%d",
	       call->type->name, call->offset, count, want_more);

	ASSERTCMP(call->offset, <=, count);

	ret = rxrpc_kernel_recv_data(net->socket, call->rxcall,
				     buf, count, &call->offset,
				     want_more, &call->abort_code,
				     &call->service_id);
	trace_afs_recv_data(call, count, call->offset, want_more, ret);
	if (ret == 0 || ret == -EAGAIN)
		return ret;

	if (ret == 1) {
		/* That was the last data for the call: advance the state */
		switch (call->state) {
		case AFS_CALL_AWAIT_REPLY:
			call->state = AFS_CALL_COMPLETE;
			break;
		case AFS_CALL_AWAIT_REQUEST:
			call->state = AFS_CALL_REPLYING;
			break;
		default:
			break;
		}
		return 0;
	}

	if (ret == -ECONNABORTED)
		call->error = call->type->abort_to_error(call->abort_code);
	else
		call->error = ret;
	call->state = AFS_CALL_COMPLETE;
	return ret;
}