rxrpc.c 22.9 KB
Newer Older
1
// SPDX-License-Identifier: GPL-2.0-or-later
2 3 4 5 6 7
/* Maintain an RxRPC server socket to do AFS communications through
 *
 * Copyright (C) 2007 Red Hat, Inc. All Rights Reserved.
 * Written by David Howells (dhowells@redhat.com)
 */

8
#include <linux/slab.h>
9 10
#include <linux/sched/signal.h>

11 12 13 14
#include <net/sock.h>
#include <net/af_rxrpc.h>
#include "internal.h"
#include "afs_cm.h"
15
#include "protocol_yfs.h"
16

17
struct workqueue_struct *afs_async_calls;
18

19 20 21
static void afs_wake_up_call_waiter(struct sock *, struct rxrpc_call *, unsigned long);
static void afs_wake_up_async_call(struct sock *, struct rxrpc_call *, unsigned long);
static void afs_process_async_call(struct work_struct *);
22 23
static void afs_rx_new_call(struct sock *, struct rxrpc_call *, unsigned long);
static void afs_rx_discard_new_call(struct rxrpc_call *, unsigned long);
24
static int afs_deliver_cm_op_id(struct afs_call *);
25 26 27

/* asynchronous incoming call initial processing */
static const struct afs_call_type afs_RXCMxxxx = {
D
David Howells 已提交
28
	.name		= "CB.xxxx",
29 30 31 32 33 34 35
	.deliver	= afs_deliver_cm_op_id,
};

/*
 * open an RxRPC socket and bind it to be a server for callback notifications
 * - the socket is left in blocking mode and non-blocking ops use MSG_DONTWAIT
 */
36
int afs_open_socket(struct afs_net *net)
37 38 39 40 41 42 43
{
	struct sockaddr_rxrpc srx;
	struct socket *socket;
	int ret;

	_enter("");

44
	ret = sock_create_kern(net->net, AF_RXRPC, SOCK_DGRAM, PF_INET6, &socket);
45 46
	if (ret < 0)
		goto error_1;
47 48 49 50

	socket->sk->sk_allocation = GFP_NOFS;

	/* bind the callback manager's address to make this a server socket */
51
	memset(&srx, 0, sizeof(srx));
52 53 54
	srx.srx_family			= AF_RXRPC;
	srx.srx_service			= CM_SERVICE;
	srx.transport_type		= SOCK_DGRAM;
55 56 57
	srx.transport_len		= sizeof(srx.transport.sin6);
	srx.transport.sin6.sin6_family	= AF_INET6;
	srx.transport.sin6.sin6_port	= htons(AFS_CM_PORT);
58

59 60
	ret = rxrpc_sock_set_min_security_level(socket->sk,
						RXRPC_SECURITY_ENCRYPT);
61 62 63
	if (ret < 0)
		goto error_2;

64
	ret = kernel_bind(socket, (struct sockaddr *) &srx, sizeof(srx));
65 66 67 68
	if (ret == -EADDRINUSE) {
		srx.transport.sin6.sin6_port = 0;
		ret = kernel_bind(socket, (struct sockaddr *) &srx, sizeof(srx));
	}
69 70 71
	if (ret < 0)
		goto error_2;

72 73 74 75 76
	srx.srx_service = YFS_CM_SERVICE;
	ret = kernel_bind(socket, (struct sockaddr *) &srx, sizeof(srx));
	if (ret < 0)
		goto error_2;

77 78 79 80 81 82
	/* Ideally, we'd turn on service upgrade here, but we can't because
	 * OpenAFS is buggy and leaks the userStatus field from packet to
	 * packet and between FS packets and CB packets - so if we try to do an
	 * upgrade on an FS packet, OpenAFS will leak that into the CB packet
	 * it sends back to us.
	 */
83

84 85
	rxrpc_kernel_new_call_notification(socket, afs_rx_new_call,
					   afs_rx_discard_new_call);
86

87 88 89
	ret = kernel_listen(socket, INT_MAX);
	if (ret < 0)
		goto error_2;
90

91 92
	net->socket = socket;
	afs_charge_preallocation(&net->charge_preallocation_work);
93 94
	_leave(" = 0");
	return 0;
95 96 97 98 99 100

error_2:
	sock_release(socket);
error_1:
	_leave(" = %d", ret);
	return ret;
101 102 103 104 105
}

/*
 * close the RxRPC socket AFS was using
 */
106
void afs_close_socket(struct afs_net *net)
107 108 109
{
	_enter("");

110
	kernel_listen(net->socket, 0);
111 112
	flush_workqueue(afs_async_calls);

113 114 115
	if (net->spare_incoming_call) {
		afs_put_call(net->spare_incoming_call);
		net->spare_incoming_call = NULL;
116 117
	}

118
	_debug("outstanding %u", atomic_read(&net->nr_outstanding_calls));
119 120
	wait_var_event(&net->nr_outstanding_calls,
		       !atomic_read(&net->nr_outstanding_calls));
121 122
	_debug("no outstanding calls");

123
	kernel_sock_shutdown(net->socket, SHUT_RDWR);
124
	flush_workqueue(afs_async_calls);
125
	sock_release(net->socket);
126 127 128 129 130

	_debug("dework");
	_leave("");
}

D
David Howells 已提交
131
/*
132
 * Allocate a call.
D
David Howells 已提交
133
 */
134 135
static struct afs_call *afs_alloc_call(struct afs_net *net,
				       const struct afs_call_type *type,
136
				       gfp_t gfp)
D
David Howells 已提交
137
{
138 139
	struct afs_call *call;
	int o;
D
David Howells 已提交
140

141 142 143
	call = kzalloc(sizeof(*call), gfp);
	if (!call)
		return NULL;
D
David Howells 已提交
144

145
	call->type = type;
146
	call->net = net;
147
	call->debug_id = atomic_inc_return(&rxrpc_debug_id);
148 149 150
	atomic_set(&call->usage, 1);
	INIT_WORK(&call->async_work, afs_process_async_call);
	init_waitqueue_head(&call->waitq);
151
	spin_lock_init(&call->state_lock);
152
	call->iter = &call->def_iter;
153

154
	o = atomic_inc_return(&net->nr_outstanding_calls);
155 156 157
	trace_afs_call(call, afs_call_trace_alloc, 1, o,
		       __builtin_return_address(0));
	return call;
D
David Howells 已提交
158 159
}

160
/*
161
 * Dispose of a reference on a call.
162
 */
163
void afs_put_call(struct afs_call *call)
164
{
165
	struct afs_net *net = call->net;
166
	int n = atomic_dec_return(&call->usage);
167
	int o = atomic_read(&net->nr_outstanding_calls);
168

D
David Howells 已提交
169
	trace_afs_call(call, afs_call_trace_put, n, o,
170 171 172 173 174 175 176 177
		       __builtin_return_address(0));

	ASSERTCMP(n, >=, 0);
	if (n == 0) {
		ASSERT(!work_pending(&call->async_work));
		ASSERT(call->type->name != NULL);

		if (call->rxcall) {
178
			rxrpc_kernel_end_call(net->socket, call->rxcall);
179 180 181 182 183
			call->rxcall = NULL;
		}
		if (call->type->destructor)
			call->type->destructor(call);

184
		afs_unuse_server_notime(call->net, call->server, afs_server_trace_put_call);
185
		afs_put_addrlist(call->alist);
186 187 188 189
		kfree(call->request);

		trace_afs_call(call, afs_call_trace_free, 0, o,
			       __builtin_return_address(0));
190 191 192
		kfree(call);

		o = atomic_dec_return(&net->nr_outstanding_calls);
193
		if (o == 0)
194
			wake_up_var(&net->nr_outstanding_calls);
195
	}
196 197
}

198 199 200 201 202 203 204 205 206 207 208
static struct afs_call *afs_get_call(struct afs_call *call,
				     enum afs_call_trace why)
{
	int u = atomic_inc_return(&call->usage);

	trace_afs_call(call, why, u,
		       atomic_read(&call->net->nr_outstanding_calls),
		       __builtin_return_address(0));
	return call;
}

209
/*
210
 * Queue the call for actual work.
211
 */
212
static void afs_queue_call_work(struct afs_call *call)
213
{
214 215
	if (call->type->work) {
		INIT_WORK(&call->work, call->type->work);
216

217
		afs_get_call(call, afs_call_trace_work);
218 219 220
		if (!queue_work(afs_wq, &call->work))
			afs_put_call(call);
	}
221 222
}

223 224 225
/*
 * allocate a call with flat request and reply buffers
 */
226 227
struct afs_call *afs_alloc_flat_call(struct afs_net *net,
				     const struct afs_call_type *type,
228
				     size_t request_size, size_t reply_max)
229 230 231
{
	struct afs_call *call;

232
	call = afs_alloc_call(net, type, GFP_NOFS);
233 234 235 236
	if (!call)
		goto nomem_call;

	if (request_size) {
237
		call->request_size = request_size;
238 239
		call->request = kmalloc(request_size, GFP_NOFS);
		if (!call->request)
D
David Howells 已提交
240
			goto nomem_free;
241 242
	}

243
	if (reply_max) {
244
		call->reply_max = reply_max;
245
		call->buffer = kmalloc(reply_max, GFP_NOFS);
246
		if (!call->buffer)
D
David Howells 已提交
247
			goto nomem_free;
248 249
	}

250
	afs_extract_to_buf(call, call->reply_max);
251
	call->operation_ID = type->op;
252 253 254
	init_waitqueue_head(&call->waitq);
	return call;

D
David Howells 已提交
255
nomem_free:
256
	afs_put_call(call);
257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273
nomem_call:
	return NULL;
}

/*
 * clean up a call with flat buffer
 */
void afs_flat_call_destructor(struct afs_call *call)
{
	_enter("");

	kfree(call->request);
	call->request = NULL;
	kfree(call->buffer);
	call->buffer = NULL;
}

274 275 276 277 278 279 280 281 282
/*
 * Advance the AFS call state when the RxRPC call ends the transmit phase.
 */
static void afs_notify_end_request_tx(struct sock *sock,
				      struct rxrpc_call *rxcall,
				      unsigned long call_user_ID)
{
	struct afs_call *call = (struct afs_call *)call_user_ID;

283
	afs_set_call_state(call, AFS_CALL_CL_REQUESTING, AFS_CALL_CL_AWAIT_REPLY);
284 285
}

286
/*
287 288
 * Initiate a call and synchronously queue up the parameters for dispatch.  Any
 * error is stored into the call struct, which the caller must check for.
289
 */
290
void afs_make_call(struct afs_addr_cursor *ac, struct afs_call *call, gfp_t gfp)
291
{
292
	struct sockaddr_rxrpc *srx = &ac->alist->addrs[ac->index];
293 294 295
	struct rxrpc_call *rxcall;
	struct msghdr msg;
	struct kvec iov[1];
296
	size_t len;
297
	s64 tx_total_len;
298 299
	int ret;

300
	_enter(",{%pISp},", &srx->transport);
301

D
David Howells 已提交
302 303 304
	ASSERT(call->type != NULL);
	ASSERT(call->type->name != NULL);

305 306
	_debug("____MAKE %p{%s,%x} [%d]____",
	       call, call->type->name, key_serial(call->key),
307
	       atomic_read(&call->net->nr_outstanding_calls));
D
David Howells 已提交
308

309 310
	call->addr_ix = ac->index;
	call->alist = afs_get_addrlist(ac->alist);
311

312 313 314 315 316
	/* Work out the length we're going to transmit.  This is awkward for
	 * calls such as FS.StoreData where there's an extra injection of data
	 * after the initial fixed part.
	 */
	tx_total_len = call->request_size;
D
David Howells 已提交
317 318
	if (call->write_iter)
		tx_total_len += iov_iter_count(call->write_iter);
319

320 321 322
	/* If the call is going to be asynchronous, we need an extra ref for
	 * the call to hold itself so the caller need not hang on to its ref.
	 */
323
	if (call->async) {
324
		afs_get_call(call, afs_call_trace_get);
325 326
		call->drop_ref = true;
	}
327

328
	/* create a call */
329
	rxcall = rxrpc_kernel_begin_call(call->net->socket, srx, call->key,
330 331
					 (unsigned long)call,
					 tx_total_len, gfp,
332
					 (call->async ?
D
David Howells 已提交
333
					  afs_wake_up_async_call :
334
					  afs_wake_up_call_waiter),
335
					 call->upgrade,
336 337
					 (call->intr ? RXRPC_PREINTERRUPTIBLE :
					  RXRPC_UNINTERRUPTIBLE),
338
					 call->debug_id);
339 340
	if (IS_ERR(rxcall)) {
		ret = PTR_ERR(rxcall);
341
		call->error = ret;
342 343 344 345 346
		goto error_kill_call;
	}

	call->rxcall = rxcall;

347 348 349 350
	if (call->max_lifespan)
		rxrpc_kernel_set_max_life(call->net->socket, rxcall,
					  call->max_lifespan);

351 352 353 354 355 356
	/* send the request */
	iov[0].iov_base	= call->request;
	iov[0].iov_len	= call->request_size;

	msg.msg_name		= NULL;
	msg.msg_namelen		= 0;
357
	iov_iter_kvec(&msg.msg_iter, WRITE, iov, 1, call->request_size);
358 359
	msg.msg_control		= NULL;
	msg.msg_controllen	= 0;
D
David Howells 已提交
360
	msg.msg_flags		= MSG_WAITALL | (call->write_iter ? MSG_MORE : 0);
361

362
	ret = rxrpc_kernel_send_data(call->net->socket, rxcall,
363 364
				     &msg, call->request_size,
				     afs_notify_end_request_tx);
365 366 367
	if (ret < 0)
		goto error_do_abort;

D
David Howells 已提交
368 369 370 371 372 373 374 375 376 377 378 379
	if (call->write_iter) {
		msg.msg_iter = *call->write_iter;
		msg.msg_flags &= ~MSG_MORE;
		trace_afs_send_data(call, &msg);

		ret = rxrpc_kernel_send_data(call->net->socket,
					     call->rxcall, &msg,
					     iov_iter_count(&msg.msg_iter),
					     afs_notify_end_request_tx);
		*call->write_iter = msg.msg_iter;

		trace_afs_sent_data(call, &msg, ret);
380 381 382 383
		if (ret < 0)
			goto error_do_abort;
	}

384 385
	/* Note that at this point, we may have received the reply or an abort
	 * - and an asynchronous call may already have completed.
386 387 388
	 *
	 * afs_wait_for_call_to_complete(call, ac)
	 * must be called to synchronously clean up.
389
	 */
390
	return;
391 392

error_do_abort:
393
	if (ret != -ECONNABORTED) {
394 395
		rxrpc_kernel_abort_call(call->net->socket, rxcall,
					RX_USER_ABORT, ret, "KSD");
396
	} else {
397
		len = 0;
398
		iov_iter_kvec(&msg.msg_iter, READ, NULL, 0, 0);
399
		rxrpc_kernel_recv_data(call->net->socket, rxcall,
400
				       &msg.msg_iter, &len, false,
401
				       &call->abort_code, &call->service_id);
402 403
		ac->abort_code = call->abort_code;
		ac->responded = true;
404
	}
405 406
	call->error = ret;
	trace_afs_call_done(call);
407
error_kill_call:
408 409
	if (call->type->done)
		call->type->done(call);
410 411 412 413 414 415 416 417 418 419 420 421 422 423 424

	/* We need to dispose of the extra ref we grabbed for an async call.
	 * The call, however, might be queued on afs_async_calls and we need to
	 * make sure we don't get any more notifications that might requeue it.
	 */
	if (call->rxcall) {
		rxrpc_kernel_end_call(call->net->socket, call->rxcall);
		call->rxcall = NULL;
	}
	if (call->async) {
		if (cancel_work_sync(&call->async_work))
			afs_put_call(call);
		afs_put_call(call);
	}

425
	ac->error = ret;
426
	call->state = AFS_CALL_COMPLETE;
427 428 429
	_leave(" = %d", ret);
}

430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462
/*
 * Log remote abort codes that indicate that we have a protocol disagreement
 * with the server.
 */
static void afs_log_error(struct afs_call *call, s32 remote_abort)
{
	static int max = 0;
	const char *msg;
	int m;

	switch (remote_abort) {
	case RX_EOF:		 msg = "unexpected EOF";	break;
	case RXGEN_CC_MARSHAL:	 msg = "client marshalling";	break;
	case RXGEN_CC_UNMARSHAL: msg = "client unmarshalling";	break;
	case RXGEN_SS_MARSHAL:	 msg = "server marshalling";	break;
	case RXGEN_SS_UNMARSHAL: msg = "server unmarshalling";	break;
	case RXGEN_DECODE:	 msg = "opcode decode";		break;
	case RXGEN_SS_XDRFREE:	 msg = "server XDR cleanup";	break;
	case RXGEN_CC_XDRFREE:	 msg = "client XDR cleanup";	break;
	case -32:		 msg = "insufficient data";	break;
	default:
		return;
	}

	m = max;
	if (m < 3) {
		max = m + 1;
		pr_notice("kAFS: Peer reported %s failure on %s [%pISp]\n",
			  msg, call->type->name,
			  &call->alist->addrs[call->addr_ix].transport);
	}
}

463 464 465 466 467
/*
 * deliver messages to a call
 */
static void afs_deliver_to_call(struct afs_call *call)
{
468
	enum afs_call_state state;
469
	size_t len;
470
	u32 abort_code, remote_abort = 0;
471 472
	int ret;

473 474
	_enter("%s", call->type->name);

475 476 477 478 479
	while (state = READ_ONCE(call->state),
	       state == AFS_CALL_CL_AWAIT_REPLY ||
	       state == AFS_CALL_SV_AWAIT_OP_ID ||
	       state == AFS_CALL_SV_AWAIT_REQUEST ||
	       state == AFS_CALL_SV_AWAIT_ACK
480
	       ) {
481
		if (state == AFS_CALL_SV_AWAIT_ACK) {
482
			len = 0;
483
			iov_iter_kvec(&call->def_iter, READ, NULL, 0, 0);
484
			ret = rxrpc_kernel_recv_data(call->net->socket,
485
						     call->rxcall, &call->def_iter,
486
						     &len, false, &remote_abort,
487
						     &call->service_id);
488
			trace_afs_receive_data(call, &call->def_iter, false, ret);
D
David Howells 已提交
489

490 491
			if (ret == -EINPROGRESS || ret == -EAGAIN)
				return;
492 493 494
			if (ret < 0 || ret == 1) {
				if (ret == 1)
					ret = 0;
495
				goto call_complete;
496
			}
497
			return;
498 499
		}

D
David Howells 已提交
500
		if (!call->have_reply_time &&
501 502 503
		    rxrpc_kernel_get_reply_time(call->net->socket,
						call->rxcall,
						&call->reply_time))
D
David Howells 已提交
504
			call->have_reply_time = true;
505

506
		ret = call->type->deliver(call);
507
		state = READ_ONCE(call->state);
508 509
		if (ret == 0 && call->unmarshalling_error)
			ret = -EBADMSG;
510 511
		switch (ret) {
		case 0:
512
			afs_queue_call_work(call);
513
			if (state == AFS_CALL_CL_PROC_REPLY) {
514
				if (call->op)
515
					set_bit(AFS_SERVER_FL_MAY_HAVE_CB,
516
						&call->op->server->flags);
517
				goto call_complete;
518
			}
519
			ASSERTCMP(state, >, AFS_CALL_CL_PROC_REPLY);
520 521 522 523
			goto done;
		case -EINPROGRESS:
		case -EAGAIN:
			goto out;
524
		case -ECONNABORTED:
525
			ASSERTCMP(state, ==, AFS_CALL_COMPLETE);
526
			afs_log_error(call, call->abort_code);
527
			goto done;
528
		case -ENOTSUPP:
529
			abort_code = RXGEN_OPCODE;
530
			rxrpc_kernel_abort_call(call->net->socket, call->rxcall,
531
						abort_code, ret, "KIV");
532
			goto local_abort;
533 534 535
		case -EIO:
			pr_err("kAFS: Call %u in bad state %u\n",
			       call->debug_id, state);
536
			fallthrough;
537 538 539
		case -ENODATA:
		case -EBADMSG:
		case -EMSGSIZE:
540 541
		case -ENOMEM:
		case -EFAULT:
542
			abort_code = RXGEN_CC_UNMARSHAL;
543
			if (state != AFS_CALL_CL_AWAIT_REPLY)
544
				abort_code = RXGEN_SS_UNMARSHAL;
545
			rxrpc_kernel_abort_call(call->net->socket, call->rxcall,
546
						abort_code, ret, "KUM");
547
			goto local_abort;
548
		default:
549
			abort_code = RX_CALL_DEAD;
550 551 552
			rxrpc_kernel_abort_call(call->net->socket, call->rxcall,
						abort_code, ret, "KER");
			goto local_abort;
553
		}
554 555
	}

556
done:
557 558
	if (call->type->done)
		call->type->done(call);
559
out:
560
	_leave("");
561 562
	return;

563 564
local_abort:
	abort_code = 0;
565
call_complete:
566 567
	afs_set_call_complete(call, ret, remote_abort);
	state = AFS_CALL_COMPLETE;
568
	goto done;
569 570 571
}

/*
572
 * Wait synchronously for a call to complete and clean up the call struct.
573
 */
574 575
long afs_wait_for_call_to_complete(struct afs_call *call,
				   struct afs_addr_cursor *ac)
576
{
577
	long ret;
578
	bool rxrpc_complete = false;
579 580 581 582 583

	DECLARE_WAITQUEUE(myself, current);

	_enter("");

584 585 586 587
	ret = call->error;
	if (ret < 0)
		goto out;

588 589
	add_wait_queue(&call->waitq, &myself);
	for (;;) {
590
		set_current_state(TASK_UNINTERRUPTIBLE);
591 592

		/* deliver any messages that are in the queue */
593 594
		if (!afs_check_call_state(call, AFS_CALL_COMPLETE) &&
		    call->need_attention) {
595
			call->need_attention = false;
596 597 598 599 600
			__set_current_state(TASK_RUNNING);
			afs_deliver_to_call(call);
			continue;
		}

601
		if (afs_check_call_state(call, AFS_CALL_COMPLETE))
602
			break;
603

604
		if (!rxrpc_kernel_check_life(call->net->socket, call->rxcall)) {
605 606 607 608 609
			/* rxrpc terminated the call. */
			rxrpc_complete = true;
			break;
		}

610
		schedule();
611 612 613 614 615
	}

	remove_wait_queue(&call->waitq, &myself);
	__set_current_state(TASK_RUNNING);

616
	if (!afs_check_call_state(call, AFS_CALL_COMPLETE)) {
617 618 619 620 621 622 623 624 625
		if (rxrpc_complete) {
			afs_set_call_complete(call, call->error, call->abort_code);
		} else {
			/* Kill off the call if it's still live. */
			_debug("call interrupted");
			if (rxrpc_kernel_abort_call(call->net->socket, call->rxcall,
						    RX_USER_ABORT, -EINTR, "KWI"))
				afs_set_call_complete(call, -EINTR, 0);
		}
626 627
	}

628
	spin_lock_bh(&call->state_lock);
629 630
	ac->abort_code = call->abort_code;
	ac->error = call->error;
631
	spin_unlock_bh(&call->state_lock);
632 633 634 635

	ret = ac->error;
	switch (ret) {
	case 0:
636 637 638
		ret = call->ret0;
		call->ret0 = 0;

639
		fallthrough;
640 641 642
	case -ECONNABORTED:
		ac->responded = true;
		break;
643 644
	}

645
out:
646
	_debug("call complete");
647
	afs_put_call(call);
648
	_leave(" = %p", (void *)ret);
649 650 651 652 653 654
	return ret;
}

/*
 * wake up a waiting call
 */
655 656
static void afs_wake_up_call_waiter(struct sock *sk, struct rxrpc_call *rxcall,
				    unsigned long call_user_ID)
657
{
658 659 660
	struct afs_call *call = (struct afs_call *)call_user_ID;

	call->need_attention = true;
661 662 663 664 665 666
	wake_up(&call->waitq);
}

/*
 * wake up an asynchronous call
 */
667 668
static void afs_wake_up_async_call(struct sock *sk, struct rxrpc_call *rxcall,
				   unsigned long call_user_ID)
669
{
670
	struct afs_call *call = (struct afs_call *)call_user_ID;
671
	int u;
672

D
David Howells 已提交
673
	trace_afs_notify_call(rxcall, call);
674
	call->need_attention = true;
675

676
	u = atomic_fetch_add_unless(&call->usage, 1, 0);
677
	if (u != 0) {
D
David Howells 已提交
678
		trace_afs_call(call, afs_call_trace_wake, u + 1,
679
			       atomic_read(&call->net->nr_outstanding_calls),
680 681 682 683 684
			       __builtin_return_address(0));

		if (!queue_work(afs_async_calls, &call->async_work))
			afs_put_call(call);
	}
685 686 687
}

/*
688 689
 * Perform I/O processing on an asynchronous call.  The work item carries a ref
 * to the call struct that we either need to release or to pass on.
690
 */
691
static void afs_process_async_call(struct work_struct *work)
692
{
693 694
	struct afs_call *call = container_of(work, struct afs_call, async_work);

695 696
	_enter("");

697 698
	if (call->state < AFS_CALL_COMPLETE && call->need_attention) {
		call->need_attention = false;
699
		afs_deliver_to_call(call);
700
	}
701

702
	afs_put_call(call);
703 704 705
	_leave("");
}

706 707 708 709 710 711 712 713 714 715
static void afs_rx_attach(struct rxrpc_call *rxcall, unsigned long user_call_ID)
{
	struct afs_call *call = (struct afs_call *)user_call_ID;

	call->rxcall = rxcall;
}

/*
 * Charge the incoming call preallocation.
 */
716
void afs_charge_preallocation(struct work_struct *work)
717
{
718 719 720
	struct afs_net *net =
		container_of(work, struct afs_net, charge_preallocation_work);
	struct afs_call *call = net->spare_incoming_call;
721 722 723

	for (;;) {
		if (!call) {
724
			call = afs_alloc_call(net, &afs_RXCMxxxx, GFP_KERNEL);
725 726 727
			if (!call)
				break;

728
			call->drop_ref = true;
D
David Howells 已提交
729
			call->async = true;
730
			call->state = AFS_CALL_SV_AWAIT_OP_ID;
D
David Howells 已提交
731
			init_waitqueue_head(&call->waitq);
732
			afs_extract_to_tmp(call);
733 734
		}

735
		if (rxrpc_kernel_charge_accept(net->socket,
736 737 738
					       afs_wake_up_async_call,
					       afs_rx_attach,
					       (unsigned long)call,
739 740
					       GFP_KERNEL,
					       call->debug_id) < 0)
741 742 743
			break;
		call = NULL;
	}
744
	net->spare_incoming_call = call;
745 746 747 748 749 750 751 752 753 754 755
}

/*
 * Discard a preallocated call when a socket is shut down.
 */
static void afs_rx_discard_new_call(struct rxrpc_call *rxcall,
				    unsigned long user_call_ID)
{
	struct afs_call *call = (struct afs_call *)user_call_ID;

	call->rxcall = NULL;
	afs_put_call(call);
}

759 760 761
/*
 * Notification of an incoming call.
 */
762 763
static void afs_rx_new_call(struct sock *sk, struct rxrpc_call *rxcall,
			    unsigned long user_call_ID)
764
{
765 766 767
	struct afs_net *net = afs_sock2net(sk);

	queue_work(afs_wq, &net->charge_preallocation_work);
768 769
}

770
/*
771 772
 * Grab the operation ID from an incoming cache manager call.  The socket
 * buffer is discarded on error or if we don't yet have sufficient data.
773
 */
774
static int afs_deliver_cm_op_id(struct afs_call *call)
775
{
776
	int ret;
777

778
	_enter("{%zu}", iov_iter_count(call->iter));
779 780

	/* the operation ID forms the first four bytes of the request data */
781
	ret = afs_extract_data(call, true);
782 783
	if (ret < 0)
		return ret;
784

785
	call->operation_ID = ntohl(call->tmp);
786
	afs_set_call_state(call, AFS_CALL_SV_AWAIT_OP_ID, AFS_CALL_SV_AWAIT_REQUEST);
787 788 789 790 791 792

	/* ask the cache manager to route the call (it'll change the call type
	 * if successful) */
	if (!afs_cm_incoming_call(call))
		return -ENOTSUPP;

D
David Howells 已提交
793 794
	trace_afs_cb_call(call);

795 796
	/* pass responsibility for the remainer of this message off to the
	 * cache manager op */
797
	return call->type->deliver(call);
798 799
}

800 801 802 803 804 805 806 807 808 809
/*
 * Advance the AFS call state when an RxRPC service call ends the transmit
 * phase.
 */
static void afs_notify_end_reply_tx(struct sock *sock,
				    struct rxrpc_call *rxcall,
				    unsigned long call_user_ID)
{
	struct afs_call *call = (struct afs_call *)call_user_ID;

810
	afs_set_call_state(call, AFS_CALL_SV_REPLYING, AFS_CALL_SV_AWAIT_ACK);
811 812
}

813 814 815 816 817
/*
 * send an empty reply
 */
void afs_send_empty_reply(struct afs_call *call)
{
818
	struct afs_net *net = call->net;
819 820 821 822
	struct msghdr msg;

	_enter("");

823
	rxrpc_kernel_set_tx_length(net->socket, call->rxcall, 0);
824

825 826
	msg.msg_name		= NULL;
	msg.msg_namelen		= 0;
827
	iov_iter_kvec(&msg.msg_iter, WRITE, NULL, 0, 0);
828 829 830 831
	msg.msg_control		= NULL;
	msg.msg_controllen	= 0;
	msg.msg_flags		= 0;

832
	switch (rxrpc_kernel_send_data(net->socket, call->rxcall, &msg, 0,
833
				       afs_notify_end_reply_tx)) {
834 835 836 837 838 839
	case 0:
		_leave(" [replied]");
		return;

	case -ENOMEM:
		_debug("oom");
840
		rxrpc_kernel_abort_call(net->socket, call->rxcall,
841
					RXGEN_SS_MARSHAL, -ENOMEM, "KOO");
842
		fallthrough;
843 844 845 846 847 848
	default:
		_leave(" [error]");
		return;
	}
}

849 850 851 852 853
/*
 * send a simple reply
 */
void afs_send_simple_reply(struct afs_call *call, const void *buf, size_t len)
{
854
	struct afs_net *net = call->net;
855
	struct msghdr msg;
856
	struct kvec iov[1];
857
	int n;
858 859 860

	_enter("");

861
	rxrpc_kernel_set_tx_length(net->socket, call->rxcall, len);
862

863 864 865 866
	iov[0].iov_base		= (void *) buf;
	iov[0].iov_len		= len;
	msg.msg_name		= NULL;
	msg.msg_namelen		= 0;
867
	iov_iter_kvec(&msg.msg_iter, WRITE, iov, 1, len);
868 869 870 871
	msg.msg_control		= NULL;
	msg.msg_controllen	= 0;
	msg.msg_flags		= 0;

872
	n = rxrpc_kernel_send_data(net->socket, call->rxcall, &msg, len,
873
				   afs_notify_end_reply_tx);
874
	if (n >= 0) {
875
		/* Success */
876 877
		_leave(" [replied]");
		return;
878
	}
879

880
	if (n == -ENOMEM) {
881
		_debug("oom");
882
		rxrpc_kernel_abort_call(net->socket, call->rxcall,
883
					RXGEN_SS_MARSHAL, -ENOMEM, "KOO");
884
	}
885
	_leave(" [error]");
886 887
}

888
/*
889
 * Extract a piece of data from the received data socket buffers.
890
 */
891
int afs_extract_data(struct afs_call *call, bool want_more)
892
{
893
	struct afs_net *net = call->net;
894
	struct iov_iter *iter = call->iter;
895
	enum afs_call_state state;
896
	u32 remote_abort = 0;
897
	int ret;
898

899 900
	_enter("{%s,%zu,%zu},%d",
	       call->type->name, call->iov_len, iov_iter_count(iter), want_more);
901

902
	ret = rxrpc_kernel_recv_data(net->socket, call->rxcall, iter,
903
				     &call->iov_len, want_more, &remote_abort,
904
				     &call->service_id);
905 906
	if (ret == 0 || ret == -EAGAIN)
		return ret;
907

908
	state = READ_ONCE(call->state);
909
	if (ret == 1) {
910 911 912
		switch (state) {
		case AFS_CALL_CL_AWAIT_REPLY:
			afs_set_call_state(call, state, AFS_CALL_CL_PROC_REPLY);
913
			break;
914 915
		case AFS_CALL_SV_AWAIT_REQUEST:
			afs_set_call_state(call, state, AFS_CALL_SV_REPLYING);
916
			break;
917 918
		case AFS_CALL_COMPLETE:
			kdebug("prem complete %d", call->error);
919
			return afs_io_error(call, afs_io_error_extract);
920 921 922 923
		default:
			break;
		}
		return 0;
924
	}
925

926
	afs_set_call_complete(call, ret, remote_abort);
927
	return ret;
928
}
D
David Howells 已提交
929 930 931 932

/*
 * Log protocol error production.
 */
933
noinline int afs_protocol_error(struct afs_call *call,
934
				enum afs_eproto_cause cause)
D
David Howells 已提交
935
{
936
	trace_afs_protocol_error(call, cause);
937 938
	if (call)
		call->unmarshalling_error = true;
939
	return -EBADMSG;
D
David Howells 已提交
940
}