// SPDX-License-Identifier: GPL-2.0-or-later
/* Maintain an RxRPC server socket to do AFS communications through
 *
 * Copyright (C) 2007 Red Hat, Inc. All Rights Reserved.
 * Written by David Howells (dhowells@redhat.com)
 */

#include <linux/slab.h>
#include <linux/sched/signal.h>

#include <net/sock.h>
#include <net/af_rxrpc.h>
#include "internal.h"
#include "afs_cm.h"
#include "protocol_yfs.h"

17
struct workqueue_struct *afs_async_calls;
18

19 20 21
static void afs_wake_up_call_waiter(struct sock *, struct rxrpc_call *, unsigned long);
static void afs_wake_up_async_call(struct sock *, struct rxrpc_call *, unsigned long);
static void afs_process_async_call(struct work_struct *);
22 23
static void afs_rx_new_call(struct sock *, struct rxrpc_call *, unsigned long);
static void afs_rx_discard_new_call(struct rxrpc_call *, unsigned long);
24
static int afs_deliver_cm_op_id(struct afs_call *);
25 26 27

/* asynchronous incoming call initial processing */
static const struct afs_call_type afs_RXCMxxxx = {
D
David Howells 已提交
28
	.name		= "CB.xxxx",
29 30 31 32 33 34 35
	.deliver	= afs_deliver_cm_op_id,
};

/*
 * open an RxRPC socket and bind it to be a server for callback notifications
 * - the socket is left in blocking mode and non-blocking ops use MSG_DONTWAIT
 */
36
int afs_open_socket(struct afs_net *net)
37 38 39 40 41 42 43
{
	struct sockaddr_rxrpc srx;
	struct socket *socket;
	int ret;

	_enter("");

44
	ret = sock_create_kern(net->net, AF_RXRPC, SOCK_DGRAM, PF_INET6, &socket);
45 46
	if (ret < 0)
		goto error_1;
47 48 49 50

	socket->sk->sk_allocation = GFP_NOFS;

	/* bind the callback manager's address to make this a server socket */
51
	memset(&srx, 0, sizeof(srx));
52 53 54
	srx.srx_family			= AF_RXRPC;
	srx.srx_service			= CM_SERVICE;
	srx.transport_type		= SOCK_DGRAM;
55 56 57
	srx.transport_len		= sizeof(srx.transport.sin6);
	srx.transport.sin6.sin6_family	= AF_INET6;
	srx.transport.sin6.sin6_port	= htons(AFS_CM_PORT);
58

59 60
	ret = rxrpc_sock_set_min_security_level(socket->sk,
						RXRPC_SECURITY_ENCRYPT);
61 62 63
	if (ret < 0)
		goto error_2;

64
	ret = kernel_bind(socket, (struct sockaddr *) &srx, sizeof(srx));
65 66 67 68
	if (ret == -EADDRINUSE) {
		srx.transport.sin6.sin6_port = 0;
		ret = kernel_bind(socket, (struct sockaddr *) &srx, sizeof(srx));
	}
69 70 71
	if (ret < 0)
		goto error_2;

72 73 74 75 76
	srx.srx_service = YFS_CM_SERVICE;
	ret = kernel_bind(socket, (struct sockaddr *) &srx, sizeof(srx));
	if (ret < 0)
		goto error_2;

77 78 79 80 81 82
	/* Ideally, we'd turn on service upgrade here, but we can't because
	 * OpenAFS is buggy and leaks the userStatus field from packet to
	 * packet and between FS packets and CB packets - so if we try to do an
	 * upgrade on an FS packet, OpenAFS will leak that into the CB packet
	 * it sends back to us.
	 */
83

84 85
	rxrpc_kernel_new_call_notification(socket, afs_rx_new_call,
					   afs_rx_discard_new_call);
86

87 88 89
	ret = kernel_listen(socket, INT_MAX);
	if (ret < 0)
		goto error_2;
90

91 92
	net->socket = socket;
	afs_charge_preallocation(&net->charge_preallocation_work);
93 94
	_leave(" = 0");
	return 0;
95 96 97 98 99 100

error_2:
	sock_release(socket);
error_1:
	_leave(" = %d", ret);
	return ret;
101 102 103 104 105
}

/*
 * close the RxRPC socket AFS was using
 */
106
void afs_close_socket(struct afs_net *net)
107 108 109
{
	_enter("");

110
	kernel_listen(net->socket, 0);
111 112
	flush_workqueue(afs_async_calls);

113 114 115
	if (net->spare_incoming_call) {
		afs_put_call(net->spare_incoming_call);
		net->spare_incoming_call = NULL;
116 117
	}

118
	_debug("outstanding %u", atomic_read(&net->nr_outstanding_calls));
119 120
	wait_var_event(&net->nr_outstanding_calls,
		       !atomic_read(&net->nr_outstanding_calls));
121 122
	_debug("no outstanding calls");

123
	kernel_sock_shutdown(net->socket, SHUT_RDWR);
124
	flush_workqueue(afs_async_calls);
125
	sock_release(net->socket);
126 127 128 129 130

	_debug("dework");
	_leave("");
}

D
David Howells 已提交
131
/*
132
 * Allocate a call.
D
David Howells 已提交
133
 */
134 135
static struct afs_call *afs_alloc_call(struct afs_net *net,
				       const struct afs_call_type *type,
136
				       gfp_t gfp)
D
David Howells 已提交
137
{
138 139
	struct afs_call *call;
	int o;
D
David Howells 已提交
140

141 142 143
	call = kzalloc(sizeof(*call), gfp);
	if (!call)
		return NULL;
D
David Howells 已提交
144

145
	call->type = type;
146
	call->net = net;
147
	call->debug_id = atomic_inc_return(&rxrpc_debug_id);
148
	refcount_set(&call->ref, 1);
149 150
	INIT_WORK(&call->async_work, afs_process_async_call);
	init_waitqueue_head(&call->waitq);
151
	spin_lock_init(&call->state_lock);
152
	call->iter = &call->def_iter;
153

154
	o = atomic_inc_return(&net->nr_outstanding_calls);
155
	trace_afs_call(call->debug_id, afs_call_trace_alloc, 1, o,
156 157
		       __builtin_return_address(0));
	return call;
D
David Howells 已提交
158 159
}

160
/*
161
 * Dispose of a reference on a call.
162
 */
163
void afs_put_call(struct afs_call *call)
164
{
165
	struct afs_net *net = call->net;
166
	unsigned int debug_id = call->debug_id;
167 168
	bool zero;
	int r, o;
169

170 171
	zero = __refcount_dec_and_test(&call->ref, &r);
	o = atomic_read(&net->nr_outstanding_calls);
172
	trace_afs_call(debug_id, afs_call_trace_put, r - 1, o,
173 174
		       __builtin_return_address(0));

175
	if (zero) {
176 177 178 179
		ASSERT(!work_pending(&call->async_work));
		ASSERT(call->type->name != NULL);

		if (call->rxcall) {
180
			rxrpc_kernel_end_call(net->socket, call->rxcall);
181 182 183 184 185
			call->rxcall = NULL;
		}
		if (call->type->destructor)
			call->type->destructor(call);

186
		afs_unuse_server_notime(call->net, call->server, afs_server_trace_put_call);
187
		afs_put_addrlist(call->alist);
188 189
		kfree(call->request);

190
		trace_afs_call(call->debug_id, afs_call_trace_free, 0, o,
191
			       __builtin_return_address(0));
192 193 194
		kfree(call);

		o = atomic_dec_return(&net->nr_outstanding_calls);
195
		if (o == 0)
196
			wake_up_var(&net->nr_outstanding_calls);
197
	}
198 199
}

200 201 202
static struct afs_call *afs_get_call(struct afs_call *call,
				     enum afs_call_trace why)
{
203
	int r;
204

205 206
	__refcount_inc(&call->ref, &r);

207
	trace_afs_call(call->debug_id, why, r + 1,
208 209 210 211 212
		       atomic_read(&call->net->nr_outstanding_calls),
		       __builtin_return_address(0));
	return call;
}

213
/*
214
 * Queue the call for actual work.
215
 */
216
static void afs_queue_call_work(struct afs_call *call)
217
{
218 219
	if (call->type->work) {
		INIT_WORK(&call->work, call->type->work);
220

221
		afs_get_call(call, afs_call_trace_work);
222 223 224
		if (!queue_work(afs_wq, &call->work))
			afs_put_call(call);
	}
225 226
}

227 228 229
/*
 * allocate a call with flat request and reply buffers
 */
230 231
struct afs_call *afs_alloc_flat_call(struct afs_net *net,
				     const struct afs_call_type *type,
232
				     size_t request_size, size_t reply_max)
233 234 235
{
	struct afs_call *call;

236
	call = afs_alloc_call(net, type, GFP_NOFS);
237 238 239 240
	if (!call)
		goto nomem_call;

	if (request_size) {
241
		call->request_size = request_size;
242 243
		call->request = kmalloc(request_size, GFP_NOFS);
		if (!call->request)
D
David Howells 已提交
244
			goto nomem_free;
245 246
	}

247
	if (reply_max) {
248
		call->reply_max = reply_max;
249
		call->buffer = kmalloc(reply_max, GFP_NOFS);
250
		if (!call->buffer)
D
David Howells 已提交
251
			goto nomem_free;
252 253
	}

254
	afs_extract_to_buf(call, call->reply_max);
255
	call->operation_ID = type->op;
256 257 258
	init_waitqueue_head(&call->waitq);
	return call;

D
David Howells 已提交
259
nomem_free:
260
	afs_put_call(call);
261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277
nomem_call:
	return NULL;
}

/*
 * clean up a call with flat buffer
 */
void afs_flat_call_destructor(struct afs_call *call)
{
	_enter("");

	/* NULL the pointers so that a later afs_put_call() cannot free them
	 * a second time.
	 */
	kfree(call->request);
	call->request = NULL;
	kfree(call->buffer);
	call->buffer = NULL;
}

278 279 280 281 282 283 284 285 286
/*
 * Advance the AFS call state when the RxRPC call ends the transmit phase.
 */
static void afs_notify_end_request_tx(struct sock *sock,
				      struct rxrpc_call *rxcall,
				      unsigned long call_user_ID)
{
	struct afs_call *call = (struct afs_call *)call_user_ID;

287
	afs_set_call_state(call, AFS_CALL_CL_REQUESTING, AFS_CALL_CL_AWAIT_REPLY);
288 289
}

290
/*
291 292
 * Initiate a call and synchronously queue up the parameters for dispatch.  Any
 * error is stored into the call struct, which the caller must check for.
293
 */
294
void afs_make_call(struct afs_addr_cursor *ac, struct afs_call *call, gfp_t gfp)
295
{
296
	struct sockaddr_rxrpc *srx = &ac->alist->addrs[ac->index];
297 298 299
	struct rxrpc_call *rxcall;
	struct msghdr msg;
	struct kvec iov[1];
300
	size_t len;
301
	s64 tx_total_len;
302 303
	int ret;

304
	_enter(",{%pISp},", &srx->transport);
305

D
David Howells 已提交
306 307 308
	ASSERT(call->type != NULL);
	ASSERT(call->type->name != NULL);

309 310
	_debug("____MAKE %p{%s,%x} [%d]____",
	       call, call->type->name, key_serial(call->key),
311
	       atomic_read(&call->net->nr_outstanding_calls));
D
David Howells 已提交
312

313 314
	call->addr_ix = ac->index;
	call->alist = afs_get_addrlist(ac->alist);
315

316 317 318 319 320
	/* Work out the length we're going to transmit.  This is awkward for
	 * calls such as FS.StoreData where there's an extra injection of data
	 * after the initial fixed part.
	 */
	tx_total_len = call->request_size;
D
David Howells 已提交
321 322
	if (call->write_iter)
		tx_total_len += iov_iter_count(call->write_iter);
323

324 325 326
	/* If the call is going to be asynchronous, we need an extra ref for
	 * the call to hold itself so the caller need not hang on to its ref.
	 */
327
	if (call->async) {
328
		afs_get_call(call, afs_call_trace_get);
329 330
		call->drop_ref = true;
	}
331

332
	/* create a call */
333
	rxcall = rxrpc_kernel_begin_call(call->net->socket, srx, call->key,
334 335
					 (unsigned long)call,
					 tx_total_len, gfp,
336
					 (call->async ?
D
David Howells 已提交
337
					  afs_wake_up_async_call :
338
					  afs_wake_up_call_waiter),
339
					 call->upgrade,
340 341
					 (call->intr ? RXRPC_PREINTERRUPTIBLE :
					  RXRPC_UNINTERRUPTIBLE),
342
					 call->debug_id);
343 344
	if (IS_ERR(rxcall)) {
		ret = PTR_ERR(rxcall);
345
		call->error = ret;
346 347 348 349 350
		goto error_kill_call;
	}

	call->rxcall = rxcall;

351 352 353 354
	if (call->max_lifespan)
		rxrpc_kernel_set_max_life(call->net->socket, rxcall,
					  call->max_lifespan);

355 356 357 358 359 360
	/* send the request */
	iov[0].iov_base	= call->request;
	iov[0].iov_len	= call->request_size;

	msg.msg_name		= NULL;
	msg.msg_namelen		= 0;
361
	iov_iter_kvec(&msg.msg_iter, WRITE, iov, 1, call->request_size);
362 363
	msg.msg_control		= NULL;
	msg.msg_controllen	= 0;
D
David Howells 已提交
364
	msg.msg_flags		= MSG_WAITALL | (call->write_iter ? MSG_MORE : 0);
365

366
	ret = rxrpc_kernel_send_data(call->net->socket, rxcall,
367 368
				     &msg, call->request_size,
				     afs_notify_end_request_tx);
369 370 371
	if (ret < 0)
		goto error_do_abort;

D
David Howells 已提交
372 373 374 375 376 377 378 379 380 381 382 383
	if (call->write_iter) {
		msg.msg_iter = *call->write_iter;
		msg.msg_flags &= ~MSG_MORE;
		trace_afs_send_data(call, &msg);

		ret = rxrpc_kernel_send_data(call->net->socket,
					     call->rxcall, &msg,
					     iov_iter_count(&msg.msg_iter),
					     afs_notify_end_request_tx);
		*call->write_iter = msg.msg_iter;

		trace_afs_sent_data(call, &msg, ret);
384 385 386 387
		if (ret < 0)
			goto error_do_abort;
	}

388 389
	/* Note that at this point, we may have received the reply or an abort
	 * - and an asynchronous call may already have completed.
390 391 392
	 *
	 * afs_wait_for_call_to_complete(call, ac)
	 * must be called to synchronously clean up.
393
	 */
394
	return;
395 396

error_do_abort:
397
	if (ret != -ECONNABORTED) {
398 399
		rxrpc_kernel_abort_call(call->net->socket, rxcall,
					RX_USER_ABORT, ret, "KSD");
400
	} else {
401
		len = 0;
402
		iov_iter_kvec(&msg.msg_iter, READ, NULL, 0, 0);
403
		rxrpc_kernel_recv_data(call->net->socket, rxcall,
404
				       &msg.msg_iter, &len, false,
405
				       &call->abort_code, &call->service_id);
406 407
		ac->abort_code = call->abort_code;
		ac->responded = true;
408
	}
409 410
	call->error = ret;
	trace_afs_call_done(call);
411
error_kill_call:
412 413
	if (call->type->done)
		call->type->done(call);
414 415 416 417 418 419 420 421 422 423 424 425 426 427 428

	/* We need to dispose of the extra ref we grabbed for an async call.
	 * The call, however, might be queued on afs_async_calls and we need to
	 * make sure we don't get any more notifications that might requeue it.
	 */
	if (call->rxcall) {
		rxrpc_kernel_end_call(call->net->socket, call->rxcall);
		call->rxcall = NULL;
	}
	if (call->async) {
		if (cancel_work_sync(&call->async_work))
			afs_put_call(call);
		afs_put_call(call);
	}

429
	ac->error = ret;
430
	call->state = AFS_CALL_COMPLETE;
431 432 433
	_leave(" = %d", ret);
}

434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466
/*
 * Log remote abort codes that indicate that we have a protocol disagreement
 * with the server.
 */
static void afs_log_error(struct afs_call *call, s32 remote_abort)
{
	static int max = 0;
	const char *msg;
	int m;

	switch (remote_abort) {
	case RX_EOF:		 msg = "unexpected EOF";	break;
	case RXGEN_CC_MARSHAL:	 msg = "client marshalling";	break;
	case RXGEN_CC_UNMARSHAL: msg = "client unmarshalling";	break;
	case RXGEN_SS_MARSHAL:	 msg = "server marshalling";	break;
	case RXGEN_SS_UNMARSHAL: msg = "server unmarshalling";	break;
	case RXGEN_DECODE:	 msg = "opcode decode";		break;
	case RXGEN_SS_XDRFREE:	 msg = "server XDR cleanup";	break;
	case RXGEN_CC_XDRFREE:	 msg = "client XDR cleanup";	break;
	case -32:		 msg = "insufficient data";	break;
	default:
		return;
	}

	m = max;
	if (m < 3) {
		max = m + 1;
		pr_notice("kAFS: Peer reported %s failure on %s [%pISp]\n",
			  msg, call->type->name,
			  &call->alist->addrs[call->addr_ix].transport);
	}
}

467 468 469 470 471
/*
 * deliver messages to a call
 */
static void afs_deliver_to_call(struct afs_call *call)
{
472
	enum afs_call_state state;
473
	size_t len;
474
	u32 abort_code, remote_abort = 0;
475 476
	int ret;

477 478
	_enter("%s", call->type->name);

479 480 481 482 483
	while (state = READ_ONCE(call->state),
	       state == AFS_CALL_CL_AWAIT_REPLY ||
	       state == AFS_CALL_SV_AWAIT_OP_ID ||
	       state == AFS_CALL_SV_AWAIT_REQUEST ||
	       state == AFS_CALL_SV_AWAIT_ACK
484
	       ) {
485
		if (state == AFS_CALL_SV_AWAIT_ACK) {
486
			len = 0;
487
			iov_iter_kvec(&call->def_iter, READ, NULL, 0, 0);
488
			ret = rxrpc_kernel_recv_data(call->net->socket,
489
						     call->rxcall, &call->def_iter,
490
						     &len, false, &remote_abort,
491
						     &call->service_id);
492
			trace_afs_receive_data(call, &call->def_iter, false, ret);
D
David Howells 已提交
493

494 495
			if (ret == -EINPROGRESS || ret == -EAGAIN)
				return;
496 497 498
			if (ret < 0 || ret == 1) {
				if (ret == 1)
					ret = 0;
499
				goto call_complete;
500
			}
501
			return;
502 503
		}

D
David Howells 已提交
504
		if (!call->have_reply_time &&
505 506 507
		    rxrpc_kernel_get_reply_time(call->net->socket,
						call->rxcall,
						&call->reply_time))
D
David Howells 已提交
508
			call->have_reply_time = true;
509

510
		ret = call->type->deliver(call);
511
		state = READ_ONCE(call->state);
512 513
		if (ret == 0 && call->unmarshalling_error)
			ret = -EBADMSG;
514 515
		switch (ret) {
		case 0:
516
			afs_queue_call_work(call);
517
			if (state == AFS_CALL_CL_PROC_REPLY) {
518
				if (call->op)
519
					set_bit(AFS_SERVER_FL_MAY_HAVE_CB,
520
						&call->op->server->flags);
521
				goto call_complete;
522
			}
523
			ASSERTCMP(state, >, AFS_CALL_CL_PROC_REPLY);
524 525 526 527
			goto done;
		case -EINPROGRESS:
		case -EAGAIN:
			goto out;
528
		case -ECONNABORTED:
529
			ASSERTCMP(state, ==, AFS_CALL_COMPLETE);
530
			afs_log_error(call, call->abort_code);
531
			goto done;
532
		case -ENOTSUPP:
533
			abort_code = RXGEN_OPCODE;
534
			rxrpc_kernel_abort_call(call->net->socket, call->rxcall,
535
						abort_code, ret, "KIV");
536
			goto local_abort;
537 538 539
		case -EIO:
			pr_err("kAFS: Call %u in bad state %u\n",
			       call->debug_id, state);
540
			fallthrough;
541 542 543
		case -ENODATA:
		case -EBADMSG:
		case -EMSGSIZE:
544 545
		case -ENOMEM:
		case -EFAULT:
546
			abort_code = RXGEN_CC_UNMARSHAL;
547
			if (state != AFS_CALL_CL_AWAIT_REPLY)
548
				abort_code = RXGEN_SS_UNMARSHAL;
549
			rxrpc_kernel_abort_call(call->net->socket, call->rxcall,
550
						abort_code, ret, "KUM");
551
			goto local_abort;
552
		default:
553
			abort_code = RX_CALL_DEAD;
554 555 556
			rxrpc_kernel_abort_call(call->net->socket, call->rxcall,
						abort_code, ret, "KER");
			goto local_abort;
557
		}
558 559
	}

560
done:
561 562
	if (call->type->done)
		call->type->done(call);
563
out:
564
	_leave("");
565 566
	return;

567 568
local_abort:
	abort_code = 0;
569
call_complete:
570 571
	afs_set_call_complete(call, ret, remote_abort);
	state = AFS_CALL_COMPLETE;
572
	goto done;
573 574 575
}

/*
576
 * Wait synchronously for a call to complete and clean up the call struct.
577
 */
578 579
long afs_wait_for_call_to_complete(struct afs_call *call,
				   struct afs_addr_cursor *ac)
580
{
581
	long ret;
582
	bool rxrpc_complete = false;
583 584 585 586 587

	DECLARE_WAITQUEUE(myself, current);

	_enter("");

588 589 590 591
	ret = call->error;
	if (ret < 0)
		goto out;

592 593
	add_wait_queue(&call->waitq, &myself);
	for (;;) {
594
		set_current_state(TASK_UNINTERRUPTIBLE);
595 596

		/* deliver any messages that are in the queue */
597 598
		if (!afs_check_call_state(call, AFS_CALL_COMPLETE) &&
		    call->need_attention) {
599
			call->need_attention = false;
600 601 602 603 604
			__set_current_state(TASK_RUNNING);
			afs_deliver_to_call(call);
			continue;
		}

605
		if (afs_check_call_state(call, AFS_CALL_COMPLETE))
606
			break;
607

608
		if (!rxrpc_kernel_check_life(call->net->socket, call->rxcall)) {
609 610 611 612 613
			/* rxrpc terminated the call. */
			rxrpc_complete = true;
			break;
		}

614
		schedule();
615 616 617 618 619
	}

	remove_wait_queue(&call->waitq, &myself);
	__set_current_state(TASK_RUNNING);

620
	if (!afs_check_call_state(call, AFS_CALL_COMPLETE)) {
621 622 623 624 625 626 627 628 629
		if (rxrpc_complete) {
			afs_set_call_complete(call, call->error, call->abort_code);
		} else {
			/* Kill off the call if it's still live. */
			_debug("call interrupted");
			if (rxrpc_kernel_abort_call(call->net->socket, call->rxcall,
						    RX_USER_ABORT, -EINTR, "KWI"))
				afs_set_call_complete(call, -EINTR, 0);
		}
630 631
	}

632
	spin_lock_bh(&call->state_lock);
633 634
	ac->abort_code = call->abort_code;
	ac->error = call->error;
635
	spin_unlock_bh(&call->state_lock);
636 637 638 639

	ret = ac->error;
	switch (ret) {
	case 0:
640 641 642
		ret = call->ret0;
		call->ret0 = 0;

643
		fallthrough;
644 645 646
	case -ECONNABORTED:
		ac->responded = true;
		break;
647 648
	}

649
out:
650
	_debug("call complete");
651
	afs_put_call(call);
652
	_leave(" = %p", (void *)ret);
653 654 655 656 657 658
	return ret;
}

/*
 * wake up a waiting call
 */
659 660
static void afs_wake_up_call_waiter(struct sock *sk, struct rxrpc_call *rxcall,
				    unsigned long call_user_ID)
661
{
662 663 664
	struct afs_call *call = (struct afs_call *)call_user_ID;

	call->need_attention = true;
665 666 667 668 669 670
	wake_up(&call->waitq);
}

/*
 * wake up an asynchronous call
 */
671 672
static void afs_wake_up_async_call(struct sock *sk, struct rxrpc_call *rxcall,
				   unsigned long call_user_ID)
673
{
674
	struct afs_call *call = (struct afs_call *)call_user_ID;
675
	int r;
676

D
David Howells 已提交
677
	trace_afs_notify_call(rxcall, call);
678
	call->need_attention = true;
679

680
	if (__refcount_inc_not_zero(&call->ref, &r)) {
681
		trace_afs_call(call->debug_id, afs_call_trace_wake, r + 1,
682
			       atomic_read(&call->net->nr_outstanding_calls),
683 684 685 686 687
			       __builtin_return_address(0));

		if (!queue_work(afs_async_calls, &call->async_work))
			afs_put_call(call);
	}
688 689 690
}

/*
691 692
 * Perform I/O processing on an asynchronous call.  The work item carries a ref
 * to the call struct that we either need to release or to pass on.
693
 */
694
static void afs_process_async_call(struct work_struct *work)
695
{
696 697
	struct afs_call *call = container_of(work, struct afs_call, async_work);

698 699
	_enter("");

700 701
	if (call->state < AFS_CALL_COMPLETE && call->need_attention) {
		call->need_attention = false;
702
		afs_deliver_to_call(call);
703
	}
704

705
	afs_put_call(call);
706 707 708
	_leave("");
}

709 710 711 712 713 714 715 716 717 718
static void afs_rx_attach(struct rxrpc_call *rxcall, unsigned long user_call_ID)
{
	struct afs_call *call = (struct afs_call *)user_call_ID;

	call->rxcall = rxcall;
}

/*
 * Charge the incoming call preallocation.
 */
719
void afs_charge_preallocation(struct work_struct *work)
720
{
721 722 723
	struct afs_net *net =
		container_of(work, struct afs_net, charge_preallocation_work);
	struct afs_call *call = net->spare_incoming_call;
724 725 726

	for (;;) {
		if (!call) {
727
			call = afs_alloc_call(net, &afs_RXCMxxxx, GFP_KERNEL);
728 729 730
			if (!call)
				break;

731
			call->drop_ref = true;
D
David Howells 已提交
732
			call->async = true;
733
			call->state = AFS_CALL_SV_AWAIT_OP_ID;
D
David Howells 已提交
734
			init_waitqueue_head(&call->waitq);
735
			afs_extract_to_tmp(call);
736 737
		}

738
		if (rxrpc_kernel_charge_accept(net->socket,
739 740 741
					       afs_wake_up_async_call,
					       afs_rx_attach,
					       (unsigned long)call,
742 743
					       GFP_KERNEL,
					       call->debug_id) < 0)
744 745 746
			break;
		call = NULL;
	}
747
	net->spare_incoming_call = call;
748 749 750 751 752 753 754 755 756 757 758
}

/*
 * Discard a preallocated call when a socket is shut down.
 */
static void afs_rx_discard_new_call(struct rxrpc_call *rxcall,
				    unsigned long user_call_ID)
{
	struct afs_call *call = (struct afs_call *)user_call_ID;

	call->rxcall = NULL;
	afs_put_call(call);
}

762 763 764
/*
 * Notification of an incoming call.
 */
765 766
static void afs_rx_new_call(struct sock *sk, struct rxrpc_call *rxcall,
			    unsigned long user_call_ID)
767
{
768 769 770
	struct afs_net *net = afs_sock2net(sk);

	queue_work(afs_wq, &net->charge_preallocation_work);
771 772
}

773
/*
774 775
 * Grab the operation ID from an incoming cache manager call.  The socket
 * buffer is discarded on error or if we don't yet have sufficient data.
776
 */
777
static int afs_deliver_cm_op_id(struct afs_call *call)
778
{
779
	int ret;
780

781
	_enter("{%zu}", iov_iter_count(call->iter));
782 783

	/* the operation ID forms the first four bytes of the request data */
784
	ret = afs_extract_data(call, true);
785 786
	if (ret < 0)
		return ret;
787

788
	call->operation_ID = ntohl(call->tmp);
789
	afs_set_call_state(call, AFS_CALL_SV_AWAIT_OP_ID, AFS_CALL_SV_AWAIT_REQUEST);
790 791 792 793 794 795

	/* ask the cache manager to route the call (it'll change the call type
	 * if successful) */
	if (!afs_cm_incoming_call(call))
		return -ENOTSUPP;

D
David Howells 已提交
796 797
	trace_afs_cb_call(call);

798 799
	/* pass responsibility for the remainer of this message off to the
	 * cache manager op */
800
	return call->type->deliver(call);
801 802
}

803 804 805 806 807 808 809 810 811 812
/*
 * Advance the AFS call state when an RxRPC service call ends the transmit
 * phase.
 */
static void afs_notify_end_reply_tx(struct sock *sock,
				    struct rxrpc_call *rxcall,
				    unsigned long call_user_ID)
{
	struct afs_call *call = (struct afs_call *)call_user_ID;

813
	afs_set_call_state(call, AFS_CALL_SV_REPLYING, AFS_CALL_SV_AWAIT_ACK);
814 815
}

816 817 818 819 820
/*
 * send an empty reply
 */
void afs_send_empty_reply(struct afs_call *call)
{
821
	struct afs_net *net = call->net;
822 823 824 825
	struct msghdr msg;

	_enter("");

826
	rxrpc_kernel_set_tx_length(net->socket, call->rxcall, 0);
827

828 829
	msg.msg_name		= NULL;
	msg.msg_namelen		= 0;
830
	iov_iter_kvec(&msg.msg_iter, WRITE, NULL, 0, 0);
831 832 833 834
	msg.msg_control		= NULL;
	msg.msg_controllen	= 0;
	msg.msg_flags		= 0;

835
	switch (rxrpc_kernel_send_data(net->socket, call->rxcall, &msg, 0,
836
				       afs_notify_end_reply_tx)) {
837 838 839 840 841 842
	case 0:
		_leave(" [replied]");
		return;

	case -ENOMEM:
		_debug("oom");
843
		rxrpc_kernel_abort_call(net->socket, call->rxcall,
844
					RXGEN_SS_MARSHAL, -ENOMEM, "KOO");
845
		fallthrough;
846 847 848 849 850 851
	default:
		_leave(" [error]");
		return;
	}
}

852 853 854 855 856
/*
 * send a simple reply
 */
void afs_send_simple_reply(struct afs_call *call, const void *buf, size_t len)
{
857
	struct afs_net *net = call->net;
858
	struct msghdr msg;
859
	struct kvec iov[1];
860
	int n;
861 862 863

	_enter("");

864
	rxrpc_kernel_set_tx_length(net->socket, call->rxcall, len);
865

866 867 868 869
	iov[0].iov_base		= (void *) buf;
	iov[0].iov_len		= len;
	msg.msg_name		= NULL;
	msg.msg_namelen		= 0;
870
	iov_iter_kvec(&msg.msg_iter, WRITE, iov, 1, len);
871 872 873 874
	msg.msg_control		= NULL;
	msg.msg_controllen	= 0;
	msg.msg_flags		= 0;

875
	n = rxrpc_kernel_send_data(net->socket, call->rxcall, &msg, len,
876
				   afs_notify_end_reply_tx);
877
	if (n >= 0) {
878
		/* Success */
879 880
		_leave(" [replied]");
		return;
881
	}
882

883
	if (n == -ENOMEM) {
884
		_debug("oom");
885
		rxrpc_kernel_abort_call(net->socket, call->rxcall,
886
					RXGEN_SS_MARSHAL, -ENOMEM, "KOO");
887
	}
888
	_leave(" [error]");
889 890
}

891
/*
892
 * Extract a piece of data from the received data socket buffers.
893
 */
894
int afs_extract_data(struct afs_call *call, bool want_more)
895
{
896
	struct afs_net *net = call->net;
897
	struct iov_iter *iter = call->iter;
898
	enum afs_call_state state;
899
	u32 remote_abort = 0;
900
	int ret;
901

902 903
	_enter("{%s,%zu,%zu},%d",
	       call->type->name, call->iov_len, iov_iter_count(iter), want_more);
904

905
	ret = rxrpc_kernel_recv_data(net->socket, call->rxcall, iter,
906
				     &call->iov_len, want_more, &remote_abort,
907
				     &call->service_id);
908 909
	if (ret == 0 || ret == -EAGAIN)
		return ret;
910

911
	state = READ_ONCE(call->state);
912
	if (ret == 1) {
913 914 915
		switch (state) {
		case AFS_CALL_CL_AWAIT_REPLY:
			afs_set_call_state(call, state, AFS_CALL_CL_PROC_REPLY);
916
			break;
917 918
		case AFS_CALL_SV_AWAIT_REQUEST:
			afs_set_call_state(call, state, AFS_CALL_SV_REPLYING);
919
			break;
920 921
		case AFS_CALL_COMPLETE:
			kdebug("prem complete %d", call->error);
922
			return afs_io_error(call, afs_io_error_extract);
923 924 925 926
		default:
			break;
		}
		return 0;
927
	}
928

929
	afs_set_call_complete(call, ret, remote_abort);
930
	return ret;
931
}
D
David Howells 已提交
932 933 934 935

/*
 * Log protocol error production.
 */
936
noinline int afs_protocol_error(struct afs_call *call,
937
				enum afs_eproto_cause cause)
D
David Howells 已提交
938
{
939
	trace_afs_protocol_error(call, cause);
940 941
	if (call)
		call->unmarshalling_error = true;
942
	return -EBADMSG;
D
David Howells 已提交
943
}