/* Maintain an RxRPC server socket to do AFS communications through
 *
 * Copyright (C) 2007 Red Hat, Inc. All Rights Reserved.
 * Written by David Howells (dhowells@redhat.com)
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version
 * 2 of the License, or (at your option) any later version.
 */

#include <linux/slab.h>
#include <linux/sched/signal.h>

#include <net/sock.h>
#include <net/af_rxrpc.h>
#include "internal.h"
#include "afs_cm.h"

struct workqueue_struct *afs_async_calls;

static void afs_wake_up_call_waiter(struct sock *, struct rxrpc_call *, unsigned long);
static long afs_wait_for_call_to_complete(struct afs_call *, struct afs_addr_cursor *);
static void afs_wake_up_async_call(struct sock *, struct rxrpc_call *, unsigned long);
static void afs_process_async_call(struct work_struct *);
static void afs_rx_new_call(struct sock *, struct rxrpc_call *, unsigned long);
static void afs_rx_discard_new_call(struct rxrpc_call *, unsigned long);
static int afs_deliver_cm_op_id(struct afs_call *);

/* asynchronous incoming call initial processing */
static const struct afs_call_type afs_RXCMxxxx = {
	.name		= "CB.xxxx",
	.deliver	= afs_deliver_cm_op_id,
};

/*
 * open an RxRPC socket and bind it to be a server for callback notifications
 * - the socket is left in blocking mode and non-blocking ops use MSG_DONTWAIT
 */
int afs_open_socket(struct afs_net *net)
{
	struct sockaddr_rxrpc srx;
	struct socket *socket;
	int ret;

	_enter("");

	ret = sock_create_kern(&init_net, AF_RXRPC, SOCK_DGRAM, PF_INET6, &socket);
	if (ret < 0)
		goto error_1;

	socket->sk->sk_allocation = GFP_NOFS;

	/* bind the callback manager's address to make this a server socket */
	memset(&srx, 0, sizeof(srx));
	srx.srx_family			= AF_RXRPC;
	srx.srx_service			= CM_SERVICE;
	srx.transport_type		= SOCK_DGRAM;
	srx.transport_len		= sizeof(srx.transport.sin6);
	srx.transport.sin6.sin6_family	= AF_INET6;
	srx.transport.sin6.sin6_port	= htons(AFS_CM_PORT);

	ret = kernel_bind(socket, (struct sockaddr *) &srx, sizeof(srx));
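	/* The cache manager port may already be bound, in which case fall
	 * back to an ephemeral port chosen by the kernel.
	 */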
	if (ret == -EADDRINUSE) {
		srx.transport.sin6.sin6_port = 0;
		ret = kernel_bind(socket, (struct sockaddr *) &srx, sizeof(srx));
	}
	if (ret < 0)
		goto error_2;

	rxrpc_kernel_new_call_notification(socket, afs_rx_new_call,
					   afs_rx_discard_new_call);

	ret = kernel_listen(socket, INT_MAX);
	if (ret < 0)
		goto error_2;

	net->socket = socket;
	afs_charge_preallocation(&net->charge_preallocation_work);
	_leave(" = 0");
	return 0;

error_2:
	sock_release(socket);
error_1:
	_leave(" = %d", ret);
	return ret;
}

/*
 * close the RxRPC socket AFS was using
 */
void afs_close_socket(struct afs_net *net)
{
	_enter("");

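	/* Stop listening for new incoming calls before tearing things down. */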
	kernel_listen(net->socket, 0);
	flush_workqueue(afs_async_calls);

	if (net->spare_incoming_call) {
		afs_put_call(net->spare_incoming_call);
		net->spare_incoming_call = NULL;
	}

	_debug("outstanding %u", atomic_read(&net->nr_outstanding_calls));
	wait_on_atomic_t(&net->nr_outstanding_calls, atomic_t_wait,
			 TASK_UNINTERRUPTIBLE);
	_debug("no outstanding calls");

	kernel_sock_shutdown(net->socket, SHUT_RDWR);
	flush_workqueue(afs_async_calls);
	sock_release(net->socket);

	_debug("dework");
	_leave("");
}

/*
 * Allocate a call.
 */
static struct afs_call *afs_alloc_call(struct afs_net *net,
				       const struct afs_call_type *type,
				       gfp_t gfp)
{
	struct afs_call *call;
	int o;

	call = kzalloc(sizeof(*call), gfp);
	if (!call)
		return NULL;

	call->type = type;
	call->net = net;
	atomic_set(&call->usage, 1);
	INIT_WORK(&call->async_work, afs_process_async_call);
	init_waitqueue_head(&call->waitq);
	spin_lock_init(&call->state_lock);

	o = atomic_inc_return(&net->nr_outstanding_calls);
	trace_afs_call(call, afs_call_trace_alloc, 1, o,
		       __builtin_return_address(0));
	return call;
}

/*
 * Dispose of a reference on a call.
 */
void afs_put_call(struct afs_call *call)
{
	struct afs_net *net = call->net;
	int n = atomic_dec_return(&call->usage);
	int o = atomic_read(&net->nr_outstanding_calls);

	trace_afs_call(call, afs_call_trace_put, n + 1, o,
		       __builtin_return_address(0));

	ASSERTCMP(n, >=, 0);
	if (n == 0) {
		ASSERT(!work_pending(&call->async_work));
		ASSERT(call->type->name != NULL);

		if (call->rxcall) {
			rxrpc_kernel_end_call(net->socket, call->rxcall);
			call->rxcall = NULL;
		}
		if (call->type->destructor)
			call->type->destructor(call);

		afs_put_server(call->net, call->cm_server);
		afs_put_cb_interest(call->net, call->cbi);
		kfree(call->request);
		kfree(call);

		o = atomic_dec_return(&net->nr_outstanding_calls);
		trace_afs_call(call, afs_call_trace_free, 0, o,
			       __builtin_return_address(0));
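		/* Wake anyone waiting for all outstanding calls to be released
		 * (see afs_close_socket()).
		 */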
		if (o == 0)
			wake_up_atomic_t(&net->nr_outstanding_calls);
	}
}

/*
 * Queue the call for actual work.  Returns 0 unconditionally for convenience.
 */
int afs_queue_call_work(struct afs_call *call)
{
	int u = atomic_inc_return(&call->usage);

	trace_afs_call(call, afs_call_trace_work, u,
		       atomic_read(&call->net->nr_outstanding_calls),
		       __builtin_return_address(0));

	INIT_WORK(&call->work, call->type->work);

	if (!queue_work(afs_wq, &call->work))
		afs_put_call(call);
	return 0;
}

/*
 * allocate a call with flat request and reply buffers
 */
struct afs_call *afs_alloc_flat_call(struct afs_net *net,
				     const struct afs_call_type *type,
				     size_t request_size, size_t reply_max)
{
	struct afs_call *call;

	call = afs_alloc_call(net, type, GFP_NOFS);
	if (!call)
		goto nomem_call;

	if (request_size) {
		call->request_size = request_size;
		call->request = kmalloc(request_size, GFP_NOFS);
		if (!call->request)
			goto nomem_free;
	}

	if (reply_max) {
		call->reply_max = reply_max;
		call->buffer = kmalloc(reply_max, GFP_NOFS);
		if (!call->buffer)
			goto nomem_free;
	}

	call->operation_ID = type->op;
	init_waitqueue_head(&call->waitq);
	return call;

nomem_free:
	afs_put_call(call);
nomem_call:
	return NULL;
}

/*
 * clean up a call with flat buffer
 */
void afs_flat_call_destructor(struct afs_call *call)
{
	_enter("");

	kfree(call->request);
	call->request = NULL;
	kfree(call->buffer);
	call->buffer = NULL;
}

#define AFS_BVEC_MAX 8

/*
 * Load the given bvec with the next few pages.
 */
static void afs_load_bvec(struct afs_call *call, struct msghdr *msg,
			  struct bio_vec *bv, pgoff_t first, pgoff_t last,
			  unsigned offset)
{
	struct page *pages[AFS_BVEC_MAX];
	unsigned int nr, n, i, to, bytes = 0;

	nr = min_t(pgoff_t, last - first + 1, AFS_BVEC_MAX);
	n = find_get_pages_contig(call->mapping, first, nr, pages);
	ASSERTCMP(n, ==, nr);

	msg->msg_flags |= MSG_MORE;
	for (i = 0; i < nr; i++) {
		to = PAGE_SIZE;
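		/* The final page may only be partially filled; it also ends
		 * the MSG_MORE run.
		 */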
		if (first + i >= last) {
			to = call->last_to;
			msg->msg_flags &= ~MSG_MORE;
		}
		bv[i].bv_page = pages[i];
		bv[i].bv_len = to - offset;
		bv[i].bv_offset = offset;
		bytes += to - offset;
		offset = 0;
	}

	iov_iter_bvec(&msg->msg_iter, WRITE | ITER_BVEC, bv, nr, bytes);
}

/*
 * Advance the AFS call state when the RxRPC call ends the transmit phase.
 */
static void afs_notify_end_request_tx(struct sock *sock,
				      struct rxrpc_call *rxcall,
				      unsigned long call_user_ID)
{
	struct afs_call *call = (struct afs_call *)call_user_ID;

	afs_set_call_state(call, AFS_CALL_CL_REQUESTING, AFS_CALL_CL_AWAIT_REPLY);
}

/*
 * attach the data from a bunch of pages on an inode to a call
 */
static int afs_send_pages(struct afs_call *call, struct msghdr *msg)
{
	struct bio_vec bv[AFS_BVEC_MAX];
	unsigned int bytes, nr, loop, offset;
	pgoff_t first = call->first, last = call->last;
	int ret;

	offset = call->first_offset;
	call->first_offset = 0;

	do {
		afs_load_bvec(call, msg, bv, first, last, offset);
		trace_afs_send_pages(call, msg, first, last, offset);

		offset = 0;
		bytes = msg->msg_iter.count;
		nr = msg->msg_iter.nr_segs;

		ret = rxrpc_kernel_send_data(call->net->socket, call->rxcall, msg,
					     bytes, afs_notify_end_request_tx);
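		/* Drop the page refs taken by find_get_pages_contig() in
		 * afs_load_bvec().
		 */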
		for (loop = 0; loop < nr; loop++)
			put_page(bv[loop].bv_page);
		if (ret < 0)
			break;

		first += nr;
	} while (first <= last);

	trace_afs_sent_pages(call, call->first, last, first, ret);
	return ret;
}

/*
 * initiate a call
 */
long afs_make_call(struct afs_addr_cursor *ac, struct afs_call *call,
		   gfp_t gfp, bool async)
{
	struct sockaddr_rxrpc *srx = ac->addr;
	struct rxrpc_call *rxcall;
	struct msghdr msg;
	struct kvec iov[1];
	size_t offset;
	s64 tx_total_len;
	int ret;

	_enter(",{%pISp},", &srx->transport);

	ASSERT(call->type != NULL);
	ASSERT(call->type->name != NULL);

	_debug("____MAKE %p{%s,%x} [%d]____",
	       call, call->type->name, key_serial(call->key),
	       atomic_read(&call->net->nr_outstanding_calls));

	call->async = async;

	/* Work out the length we're going to transmit.  This is awkward for
	 * calls such as FS.StoreData where there's an extra injection of data
	 * after the initial fixed part.
	 */
	tx_total_len = call->request_size;
	if (call->send_pages) {
		if (call->last == call->first) {
			tx_total_len += call->last_to - call->first_offset;
		} else {
			/* It looks mathematically like you should be able to
			 * combine the following lines with the ones above, but
			 * unsigned arithmetic is fun when it wraps...
			 */
			tx_total_len += PAGE_SIZE - call->first_offset;
			tx_total_len += call->last_to;
			tx_total_len += (call->last - call->first - 1) * PAGE_SIZE;
		}
	}

	/* create a call */
	rxcall = rxrpc_kernel_begin_call(call->net->socket, srx, call->key,
					 (unsigned long)call,
					 tx_total_len, gfp,
					 (async ?
					  afs_wake_up_async_call :
					  afs_wake_up_call_waiter),
					 call->upgrade);
	if (IS_ERR(rxcall)) {
		ret = PTR_ERR(rxcall);
		goto error_kill_call;
	}

	call->rxcall = rxcall;

	/* send the request */
	iov[0].iov_base	= call->request;
	iov[0].iov_len	= call->request_size;

	msg.msg_name		= NULL;
	msg.msg_namelen		= 0;
	iov_iter_kvec(&msg.msg_iter, WRITE | ITER_KVEC, iov, 1,
		      call->request_size);
	msg.msg_control		= NULL;
	msg.msg_controllen	= 0;
	msg.msg_flags		= MSG_WAITALL | (call->send_pages ? MSG_MORE : 0);

	ret = rxrpc_kernel_send_data(call->net->socket, rxcall,
				     &msg, call->request_size,
				     afs_notify_end_request_tx);
	if (ret < 0)
		goto error_do_abort;

	if (call->send_pages) {
		ret = afs_send_pages(call, &msg);
		if (ret < 0)
			goto error_do_abort;
	}

	/* at this point, an async call may no longer exist as it may have
	 * already completed */
	if (call->async)
		return -EINPROGRESS;

	return afs_wait_for_call_to_complete(call, ac);

error_do_abort:
	call->state = AFS_CALL_COMPLETE;
	if (ret != -ECONNABORTED) {
		rxrpc_kernel_abort_call(call->net->socket, rxcall,
					RX_USER_ABORT, ret, "KSD");
	} else {
		offset = 0;
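		/* The call was aborted by the peer, so do a zero-length
		 * receive to pick up the remote abort code.
		 */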
		rxrpc_kernel_recv_data(call->net->socket, rxcall, NULL,
				       0, &offset, false, &call->abort_code,
				       &call->service_id);
		ac->abort_code = call->abort_code;
		ac->responded = true;
	}
	call->error = ret;
	trace_afs_call_done(call);
error_kill_call:
	afs_put_call(call);
	ac->error = ret;
	_leave(" = %d", ret);
	return ret;
}

/*
 * deliver messages to a call
 */
static void afs_deliver_to_call(struct afs_call *call)
{
	enum afs_call_state state;
	u32 abort_code, remote_abort = 0;
	int ret;

	_enter("%s", call->type->name);

	while (state = READ_ONCE(call->state),
	       state == AFS_CALL_CL_AWAIT_REPLY ||
	       state == AFS_CALL_SV_AWAIT_OP_ID ||
	       state == AFS_CALL_SV_AWAIT_REQUEST ||
	       state == AFS_CALL_SV_AWAIT_ACK
	       ) {
		if (state == AFS_CALL_SV_AWAIT_ACK) {
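			/* The server call is waiting for the final ACK; poll
			 * for it with a zero-length receive.
			 */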
			size_t offset = 0;
			ret = rxrpc_kernel_recv_data(call->net->socket,
						     call->rxcall,
						     NULL, 0, &offset, false,
						     &remote_abort,
						     &call->service_id);
			trace_afs_recv_data(call, 0, offset, false, ret);

			if (ret == -EINPROGRESS || ret == -EAGAIN)
				return;
			if (ret < 0 || ret == 1) {
				if (ret == 1)
					ret = 0;
				goto call_complete;
			}
			return;
		}

		ret = call->type->deliver(call);
		state = READ_ONCE(call->state);
		switch (ret) {
		case 0:
			if (state == AFS_CALL_CL_PROC_REPLY)
				goto call_complete;
			ASSERTCMP(state, >, AFS_CALL_CL_PROC_REPLY);
			goto done;
		case -EINPROGRESS:
		case -EAGAIN:
			goto out;
		case -EIO:
		case -ECONNABORTED:
			ASSERTCMP(state, ==, AFS_CALL_COMPLETE);
			goto done;
		case -ENOTCONN:
			abort_code = RX_CALL_DEAD;
			rxrpc_kernel_abort_call(call->net->socket, call->rxcall,
						abort_code, ret, "KNC");
			goto local_abort;
		case -ENOTSUPP:
			abort_code = RXGEN_OPCODE;
			rxrpc_kernel_abort_call(call->net->socket, call->rxcall,
						abort_code, ret, "KIV");
			goto local_abort;
		case -ENODATA:
		case -EBADMSG:
		case -EMSGSIZE:
		default:
			abort_code = RXGEN_CC_UNMARSHAL;
			if (state != AFS_CALL_CL_AWAIT_REPLY)
				abort_code = RXGEN_SS_UNMARSHAL;
			rxrpc_kernel_abort_call(call->net->socket, call->rxcall,
						abort_code, -EBADMSG, "KUM");
			goto local_abort;
		}
	}

done:
	if (state == AFS_CALL_COMPLETE && call->incoming)
		afs_put_call(call);
out:
	_leave("");
	return;

local_abort:
	abort_code = 0;
call_complete:
	afs_set_call_complete(call, ret, remote_abort);
	state = AFS_CALL_COMPLETE;
	goto done;
}

/*
 * wait synchronously for a call to complete
 */
static long afs_wait_for_call_to_complete(struct afs_call *call,
					  struct afs_addr_cursor *ac)
{
	signed long rtt2, timeout;
	long ret;
	u64 rtt;
	u32 life, last_life;

	DECLARE_WAITQUEUE(myself, current);

	_enter("");

	rtt = rxrpc_kernel_get_rtt(call->net->socket, call->rxcall);
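	/* Allow roughly two round trips (in jiffies, at least 2) of apparent
	 * inactivity before a pending signal may break off the wait.
	 */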
	rtt2 = nsecs_to_jiffies64(rtt) * 2;
	if (rtt2 < 2)
		rtt2 = 2;

	timeout = rtt2;
	last_life = rxrpc_kernel_check_life(call->net->socket, call->rxcall);

	add_wait_queue(&call->waitq, &myself);
	for (;;) {
		set_current_state(TASK_UNINTERRUPTIBLE);

		/* deliver any messages that are in the queue */
		if (!afs_check_call_state(call, AFS_CALL_COMPLETE) &&
		    call->need_attention) {
			call->need_attention = false;
			__set_current_state(TASK_RUNNING);
			afs_deliver_to_call(call);
			continue;
		}

		if (afs_check_call_state(call, AFS_CALL_COMPLETE))
			break;

		life = rxrpc_kernel_check_life(call->net->socket, call->rxcall);
		if (timeout == 0 &&
		    life == last_life && signal_pending(current))
			break;

		if (life != last_life) {
			timeout = rtt2;
			last_life = life;
		}

		timeout = schedule_timeout(timeout);
	}

	remove_wait_queue(&call->waitq, &myself);
	__set_current_state(TASK_RUNNING);

	/* Kill off the call if it's still live. */
	if (!afs_check_call_state(call, AFS_CALL_COMPLETE)) {
		_debug("call interrupted");
		if (rxrpc_kernel_abort_call(call->net->socket, call->rxcall,
					    RX_USER_ABORT, -EINTR, "KWI"))
			afs_set_call_complete(call, -EINTR, 0);
	}

	spin_lock_bh(&call->state_lock);
	ac->abort_code = call->abort_code;
	ac->error = call->error;
	spin_unlock_bh(&call->state_lock);

	ret = ac->error;
	switch (ret) {
	case 0:
		if (call->ret_reply0) {
			ret = (long)call->reply[0];
			call->reply[0] = NULL;
		}
		/* Fall through */
	case -ECONNABORTED:
		ac->responded = true;
		break;
	}

	_debug("call complete");
	afs_put_call(call);
	_leave(" = %p", (void *)ret);
	return ret;
}

/*
 * wake up a waiting call
 */
static void afs_wake_up_call_waiter(struct sock *sk, struct rxrpc_call *rxcall,
				    unsigned long call_user_ID)
{
	struct afs_call *call = (struct afs_call *)call_user_ID;

	call->need_attention = true;
	wake_up(&call->waitq);
}

/*
 * wake up an asynchronous call
 */
static void afs_wake_up_async_call(struct sock *sk, struct rxrpc_call *rxcall,
				   unsigned long call_user_ID)
{
	struct afs_call *call = (struct afs_call *)call_user_ID;
	int u;

	trace_afs_notify_call(rxcall, call);
	call->need_attention = true;

	u = __atomic_add_unless(&call->usage, 1, 0);
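	/* Only queue the work item if we managed to get a ref on the call;
	 * if the usage count had already hit zero, the call is being freed.
	 */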
	if (u != 0) {
		trace_afs_call(call, afs_call_trace_wake, u,
			       atomic_read(&call->net->nr_outstanding_calls),
			       __builtin_return_address(0));

		if (!queue_work(afs_async_calls, &call->async_work))
			afs_put_call(call);
	}
}

/*
 * Delete an asynchronous call.  The work item carries a ref to the call struct
 * that we need to release.
 */
static void afs_delete_async_call(struct work_struct *work)
{
	struct afs_call *call = container_of(work, struct afs_call, async_work);

	_enter("");

	afs_put_call(call);

	_leave("");
}

/*
 * Perform I/O processing on an asynchronous call.  The work item carries a ref
 * to the call struct that we either need to release or to pass on.
 */
static void afs_process_async_call(struct work_struct *work)
{
	struct afs_call *call = container_of(work, struct afs_call, async_work);

	_enter("");

	if (call->state < AFS_CALL_COMPLETE && call->need_attention) {
		call->need_attention = false;
		afs_deliver_to_call(call);
	}

	if (call->state == AFS_CALL_COMPLETE) {
		call->reply[0] = NULL;

		/* We have two refs to release - one from the alloc and one
		 * queued with the work item - and we can't just deallocate the
		 * call because the work item may be queued again.
		 */
		call->async_work.func = afs_delete_async_call;
		if (!queue_work(afs_async_calls, &call->async_work))
			afs_put_call(call);
	}

	afs_put_call(call);
	_leave("");
}

static void afs_rx_attach(struct rxrpc_call *rxcall, unsigned long user_call_ID)
{
	struct afs_call *call = (struct afs_call *)user_call_ID;

	call->rxcall = rxcall;
}

/*
 * Charge the incoming call preallocation.
 */
void afs_charge_preallocation(struct work_struct *work)
{
	struct afs_net *net =
		container_of(work, struct afs_net, charge_preallocation_work);
	struct afs_call *call = net->spare_incoming_call;

	for (;;) {
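		/* Offer rxrpc preallocated calls until it declines to accept
		 * more; the last unused call is kept as a spare.
		 */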
		if (!call) {
			call = afs_alloc_call(net, &afs_RXCMxxxx, GFP_KERNEL);
			if (!call)
				break;

			call->async = true;
			call->state = AFS_CALL_SV_AWAIT_OP_ID;
			init_waitqueue_head(&call->waitq);
		}

		if (rxrpc_kernel_charge_accept(net->socket,
					       afs_wake_up_async_call,
					       afs_rx_attach,
					       (unsigned long)call,
					       GFP_KERNEL) < 0)
			break;
		call = NULL;
	}
	net->spare_incoming_call = call;
}

/*
 * Discard a preallocated call when a socket is shut down.
 */
static void afs_rx_discard_new_call(struct rxrpc_call *rxcall,
				    unsigned long user_call_ID)
{
	struct afs_call *call = (struct afs_call *)user_call_ID;

	call->rxcall = NULL;
	afs_put_call(call);
}

/*
 * Notification of an incoming call.
 */
static void afs_rx_new_call(struct sock *sk, struct rxrpc_call *rxcall,
			    unsigned long user_call_ID)
{
	struct afs_net *net = afs_sock2net(sk);

	queue_work(afs_wq, &net->charge_preallocation_work);
}

/*
 * Grab the operation ID from an incoming cache manager call.  The socket
 * buffer is discarded on error or if we don't yet have sufficient data.
 */
static int afs_deliver_cm_op_id(struct afs_call *call)
{
	int ret;

	_enter("{%zu}", call->offset);

	ASSERTCMP(call->offset, <, 4);

	/* the operation ID forms the first four bytes of the request data */
	ret = afs_extract_data(call, &call->tmp, 4, true);
	if (ret < 0)
		return ret;

	call->operation_ID = ntohl(call->tmp);
	afs_set_call_state(call, AFS_CALL_SV_AWAIT_OP_ID, AFS_CALL_SV_AWAIT_REQUEST);
	call->offset = 0;

	/* ask the cache manager to route the call (it'll change the call type
	 * if successful) */
	if (!afs_cm_incoming_call(call))
		return -ENOTSUPP;

	trace_afs_cb_call(call);

	/* pass responsibility for the remainder of this message off to the
	 * cache manager op */
	return call->type->deliver(call);
}

/*
 * Advance the AFS call state when an RxRPC service call ends the transmit
 * phase.
 */
static void afs_notify_end_reply_tx(struct sock *sock,
				    struct rxrpc_call *rxcall,
				    unsigned long call_user_ID)
{
	struct afs_call *call = (struct afs_call *)call_user_ID;

	afs_set_call_state(call, AFS_CALL_SV_REPLYING, AFS_CALL_SV_AWAIT_ACK);
}

/*
 * send an empty reply
 */
void afs_send_empty_reply(struct afs_call *call)
{
	struct afs_net *net = call->net;
	struct msghdr msg;

	_enter("");

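	/* Tell rxrpc that no reply data will be transmitted. */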
	rxrpc_kernel_set_tx_length(net->socket, call->rxcall, 0);

	msg.msg_name		= NULL;
	msg.msg_namelen		= 0;
	iov_iter_kvec(&msg.msg_iter, WRITE | ITER_KVEC, NULL, 0, 0);
	msg.msg_control		= NULL;
	msg.msg_controllen	= 0;
	msg.msg_flags		= 0;

	switch (rxrpc_kernel_send_data(net->socket, call->rxcall, &msg, 0,
				       afs_notify_end_reply_tx)) {
	case 0:
		_leave(" [replied]");
		return;

	case -ENOMEM:
		_debug("oom");
		rxrpc_kernel_abort_call(net->socket, call->rxcall,
					RX_USER_ABORT, -ENOMEM, "KOO");
	default:
		_leave(" [error]");
		return;
	}
}

/*
 * send a simple reply
 */
void afs_send_simple_reply(struct afs_call *call, const void *buf, size_t len)
{
	struct afs_net *net = call->net;
	struct msghdr msg;
	struct kvec iov[1];
	int n;

	_enter("");

	rxrpc_kernel_set_tx_length(net->socket, call->rxcall, len);

	iov[0].iov_base		= (void *) buf;
	iov[0].iov_len		= len;
	msg.msg_name		= NULL;
	msg.msg_namelen		= 0;
	iov_iter_kvec(&msg.msg_iter, WRITE | ITER_KVEC, iov, 1, len);
	msg.msg_control		= NULL;
	msg.msg_controllen	= 0;
	msg.msg_flags		= 0;

	n = rxrpc_kernel_send_data(net->socket, call->rxcall, &msg, len,
				   afs_notify_end_reply_tx);
	if (n >= 0) {
		/* Success */
		_leave(" [replied]");
		return;
	}

	if (n == -ENOMEM) {
		_debug("oom");
		rxrpc_kernel_abort_call(net->socket, call->rxcall,
					RX_USER_ABORT, -ENOMEM, "KOO");
	}
	_leave(" [error]");
}

/*
 * Extract a piece of data from the received data socket buffers.
 */
int afs_extract_data(struct afs_call *call, void *buf, size_t count,
		     bool want_more)
{
	struct afs_net *net = call->net;
	enum afs_call_state state;
	u32 remote_abort;
	int ret;

	_enter("{%s,%zu},,%zu,%d",
	       call->type->name, call->offset, count, want_more);

	ASSERTCMP(call->offset, <=, count);

	ret = rxrpc_kernel_recv_data(net->socket, call->rxcall,
				     buf, count, &call->offset,
				     want_more, &remote_abort,
				     &call->service_id);
	trace_afs_recv_data(call, count, call->offset, want_more, ret);
	if (ret == 0 || ret == -EAGAIN)
		return ret;

	state = READ_ONCE(call->state);
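	/* A return of 1 means that the last of the data has been received,
	 * so advance the call state accordingly.
	 */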
	if (ret == 1) {
		switch (state) {
		case AFS_CALL_CL_AWAIT_REPLY:
			afs_set_call_state(call, state, AFS_CALL_CL_PROC_REPLY);
			break;
		case AFS_CALL_SV_AWAIT_REQUEST:
			afs_set_call_state(call, state, AFS_CALL_SV_REPLYING);
			break;
		case AFS_CALL_COMPLETE:
			kdebug("prem complete %d", call->error);
			return -EIO;
		default:
			break;
		}
		return 0;
	}

	afs_set_call_complete(call, ret, remote_abort);
	return ret;
}