// SPDX-License-Identifier: GPL-2.0-or-later
/* Maintain an RxRPC server socket to do AFS communications through
 *
 * Copyright (C) 2007 Red Hat, Inc. All Rights Reserved.
 * Written by David Howells (dhowells@redhat.com)
 */

#include <linux/slab.h>
#include <linux/sched/signal.h>

#include <net/sock.h>
#include <net/af_rxrpc.h>
#include "internal.h"
#include "afs_cm.h"
#include "protocol_yfs.h"

struct workqueue_struct *afs_async_calls;

static void afs_wake_up_call_waiter(struct sock *, struct rxrpc_call *, unsigned long);
static void afs_wake_up_async_call(struct sock *, struct rxrpc_call *, unsigned long);
static void afs_process_async_call(struct work_struct *);
static void afs_rx_new_call(struct sock *, struct rxrpc_call *, unsigned long);
static void afs_rx_discard_new_call(struct rxrpc_call *, unsigned long);
static int afs_deliver_cm_op_id(struct afs_call *);

/* asynchronous incoming call initial processing */
static const struct afs_call_type afs_RXCMxxxx = {
	.name		= "CB.xxxx",
	.deliver	= afs_deliver_cm_op_id,
};

/*
 * open an RxRPC socket and bind it to be a server for callback notifications
 * - the socket is left in blocking mode and non-blocking ops use MSG_DONTWAIT
 */
int afs_open_socket(struct afs_net *net)
{
	struct sockaddr_rxrpc srx;
	struct socket *socket;
	int ret;

	_enter("");

	ret = sock_create_kern(net->net, AF_RXRPC, SOCK_DGRAM, PF_INET6, &socket);
	if (ret < 0)
		goto error_1;

	socket->sk->sk_allocation = GFP_NOFS;

	/* bind the callback manager's address to make this a server socket */
	memset(&srx, 0, sizeof(srx));
	srx.srx_family			= AF_RXRPC;
	srx.srx_service			= CM_SERVICE;
	srx.transport_type		= SOCK_DGRAM;
	srx.transport_len		= sizeof(srx.transport.sin6);
	srx.transport.sin6.sin6_family	= AF_INET6;
	srx.transport.sin6.sin6_port	= htons(AFS_CM_PORT);

	ret = rxrpc_sock_set_min_security_level(socket->sk,
						RXRPC_SECURITY_ENCRYPT);
	if (ret < 0)
		goto error_2;

	ret = kernel_bind(socket, (struct sockaddr *) &srx, sizeof(srx));
	if (ret == -EADDRINUSE) {
		srx.transport.sin6.sin6_port = 0;
		ret = kernel_bind(socket, (struct sockaddr *) &srx, sizeof(srx));
	}
	if (ret < 0)
		goto error_2;

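	/* Also bind the YFS callback service to the same socket so that one
	 * server socket handles both the AFS and YFS cache manager services;
	 * rxrpc directs each incoming call by the service ID in its packets.
	 */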
	srx.srx_service = YFS_CM_SERVICE;
	ret = kernel_bind(socket, (struct sockaddr *) &srx, sizeof(srx));
	if (ret < 0)
		goto error_2;

	/* Ideally, we'd turn on service upgrade here, but we can't because
	 * OpenAFS is buggy and leaks the userStatus field from packet to
	 * packet and between FS packets and CB packets - so if we try to do an
	 * upgrade on an FS packet, OpenAFS will leak that into the CB packet
	 * it sends back to us.
	 */

	rxrpc_kernel_new_call_notification(socket, afs_rx_new_call,
					   afs_rx_discard_new_call);

	ret = kernel_listen(socket, INT_MAX);
	if (ret < 0)
		goto error_2;

	net->socket = socket;
	afs_charge_preallocation(&net->charge_preallocation_work);
	_leave(" = 0");
	return 0;

error_2:
	sock_release(socket);
error_1:
	_leave(" = %d", ret);
	return ret;
}

/*
 * close the RxRPC socket AFS was using
 */
void afs_close_socket(struct afs_net *net)
{
	_enter("");

	kernel_listen(net->socket, 0);
	flush_workqueue(afs_async_calls);

	if (net->spare_incoming_call) {
		afs_put_call(net->spare_incoming_call);
		net->spare_incoming_call = NULL;
	}

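	/* Each afs_call holds a count in net->nr_outstanding_calls and the
	 * final afs_put_call() wakes this up, so wait here for all calls to
	 * go away before tearing the socket down.
	 */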
	_debug("outstanding %u", atomic_read(&net->nr_outstanding_calls));
	wait_var_event(&net->nr_outstanding_calls,
		       !atomic_read(&net->nr_outstanding_calls));
	_debug("no outstanding calls");

	kernel_sock_shutdown(net->socket, SHUT_RDWR);
	flush_workqueue(afs_async_calls);
	sock_release(net->socket);

	_debug("dework");
	_leave("");
}

/*
 * Allocate a call.
 */
static struct afs_call *afs_alloc_call(struct afs_net *net,
				       const struct afs_call_type *type,
				       gfp_t gfp)
{
	struct afs_call *call;
	int o;

	call = kzalloc(sizeof(*call), gfp);
	if (!call)
		return NULL;

	call->type = type;
	call->net = net;
	call->debug_id = atomic_inc_return(&rxrpc_debug_id);
	atomic_set(&call->usage, 1);
	INIT_WORK(&call->async_work, afs_process_async_call);
	init_waitqueue_head(&call->waitq);
	spin_lock_init(&call->state_lock);
	call->iter = &call->def_iter;

	o = atomic_inc_return(&net->nr_outstanding_calls);
	trace_afs_call(call, afs_call_trace_alloc, 1, o,
		       __builtin_return_address(0));
	return call;
}

/*
 * Dispose of a reference on a call.
 */
void afs_put_call(struct afs_call *call)
{
	struct afs_net *net = call->net;
	int n = atomic_dec_return(&call->usage);
	int o = atomic_read(&net->nr_outstanding_calls);

	trace_afs_call(call, afs_call_trace_put, n, o,
		       __builtin_return_address(0));

	ASSERTCMP(n, >=, 0);
	if (n == 0) {
		ASSERT(!work_pending(&call->async_work));
		ASSERT(call->type->name != NULL);

		if (call->rxcall) {
			rxrpc_kernel_end_call(net->socket, call->rxcall);
			call->rxcall = NULL;
		}
		if (call->type->destructor)
			call->type->destructor(call);

		afs_unuse_server_notime(call->net, call->server, afs_server_trace_put_call);
		afs_put_addrlist(call->alist);
		kfree(call->request);

		trace_afs_call(call, afs_call_trace_free, 0, o,
			       __builtin_return_address(0));
		kfree(call);

		o = atomic_dec_return(&net->nr_outstanding_calls);
		if (o == 0)
			wake_up_var(&net->nr_outstanding_calls);
	}
}

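/*
 * Take an additional reference on a call and note why in the trace log.
 */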
static struct afs_call *afs_get_call(struct afs_call *call,
				     enum afs_call_trace why)
{
	int u = atomic_inc_return(&call->usage);

	trace_afs_call(call, why, u,
		       atomic_read(&call->net->nr_outstanding_calls),
		       __builtin_return_address(0));
	return call;
}

/*
 * Queue the call for actual work.
 */
static void afs_queue_call_work(struct afs_call *call)
{
	if (call->type->work) {
		INIT_WORK(&call->work, call->type->work);

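		/* The work item carries its own ref on the call; if
		 * queue_work() refuses because the item is already queued,
		 * that ref must be dropped again.
		 */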
		afs_get_call(call, afs_call_trace_work);
		if (!queue_work(afs_wq, &call->work))
			afs_put_call(call);
	}
}

/*
 * allocate a call with flat request and reply buffers
 */
struct afs_call *afs_alloc_flat_call(struct afs_net *net,
				     const struct afs_call_type *type,
				     size_t request_size, size_t reply_max)
{
	struct afs_call *call;

	call = afs_alloc_call(net, type, GFP_NOFS);
	if (!call)
		goto nomem_call;

	if (request_size) {
		call->request_size = request_size;
		call->request = kmalloc(request_size, GFP_NOFS);
		if (!call->request)
			goto nomem_free;
	}

	if (reply_max) {
		call->reply_max = reply_max;
		call->buffer = kmalloc(reply_max, GFP_NOFS);
		if (!call->buffer)
			goto nomem_free;
	}

	afs_extract_to_buf(call, call->reply_max);
	call->operation_ID = type->op;
	init_waitqueue_head(&call->waitq);
	return call;

nomem_free:
	afs_put_call(call);
nomem_call:
	return NULL;
}

/*
 * clean up a call with flat buffer
 */
void afs_flat_call_destructor(struct afs_call *call)
{
	_enter("");

	kfree(call->request);
	call->request = NULL;
	kfree(call->buffer);
	call->buffer = NULL;
}

/*
 * Advance the AFS call state when the RxRPC call ends the transmit phase.
 */
static void afs_notify_end_request_tx(struct sock *sock,
				      struct rxrpc_call *rxcall,
				      unsigned long call_user_ID)
{
	struct afs_call *call = (struct afs_call *)call_user_ID;

	afs_set_call_state(call, AFS_CALL_CL_REQUESTING, AFS_CALL_CL_AWAIT_REPLY);
}

/*
 * Initiate a call and synchronously queue up the parameters for dispatch.  Any
 * error is stored into the call struct, which the caller must check for.
 */
void afs_make_call(struct afs_addr_cursor *ac, struct afs_call *call, gfp_t gfp)
{
	struct sockaddr_rxrpc *srx = &ac->alist->addrs[ac->index];
	struct rxrpc_call *rxcall;
	struct msghdr msg;
	struct kvec iov[1];
	size_t len;
	s64 tx_total_len;
	int ret;

	_enter(",{%pISp},", &srx->transport);

	ASSERT(call->type != NULL);
	ASSERT(call->type->name != NULL);

	_debug("____MAKE %p{%s,%x} [%d]____",
	       call, call->type->name, key_serial(call->key),
	       atomic_read(&call->net->nr_outstanding_calls));

	call->addr_ix = ac->index;
	call->alist = afs_get_addrlist(ac->alist);

	/* Work out the length we're going to transmit.  This is awkward for
	 * calls such as FS.StoreData where there's an extra injection of data
	 * after the initial fixed part.
	 */
	tx_total_len = call->request_size;
	if (call->write_iter)
		tx_total_len += iov_iter_count(call->write_iter);

	/* If the call is going to be asynchronous, we need an extra ref for
	 * the call to hold itself so the caller need not hang on to its ref.
	 */
	if (call->async) {
		afs_get_call(call, afs_call_trace_get);
		call->drop_ref = true;
	}

	/* create a call */
	rxcall = rxrpc_kernel_begin_call(call->net->socket, srx, call->key,
					 (unsigned long)call,
					 tx_total_len, gfp,
					 (call->async ?
					  afs_wake_up_async_call :
					  afs_wake_up_call_waiter),
					 call->upgrade,
					 (call->intr ? RXRPC_PREINTERRUPTIBLE :
					  RXRPC_UNINTERRUPTIBLE),
					 call->debug_id);
	if (IS_ERR(rxcall)) {
		ret = PTR_ERR(rxcall);
		call->error = ret;
		goto error_kill_call;
	}

	call->rxcall = rxcall;

	if (call->max_lifespan)
		rxrpc_kernel_set_max_life(call->net->socket, rxcall,
					  call->max_lifespan);

	/* send the request */
	iov[0].iov_base	= call->request;
	iov[0].iov_len	= call->request_size;

	msg.msg_name		= NULL;
	msg.msg_namelen		= 0;
	iov_iter_kvec(&msg.msg_iter, WRITE, iov, 1, call->request_size);
	msg.msg_control		= NULL;
	msg.msg_controllen	= 0;
	msg.msg_flags		= MSG_WAITALL | (call->write_iter ? MSG_MORE : 0);

	ret = rxrpc_kernel_send_data(call->net->socket, rxcall,
				     &msg, call->request_size,
				     afs_notify_end_request_tx);
	if (ret < 0)
		goto error_do_abort;

	if (call->write_iter) {
		msg.msg_iter = *call->write_iter;
		msg.msg_flags &= ~MSG_MORE;
		trace_afs_send_data(call, &msg);

		ret = rxrpc_kernel_send_data(call->net->socket,
					     call->rxcall, &msg,
					     iov_iter_count(&msg.msg_iter),
					     afs_notify_end_request_tx);
		*call->write_iter = msg.msg_iter;

		trace_afs_sent_data(call, &msg, ret);
		if (ret < 0)
			goto error_do_abort;
	}

	/* Note that at this point, we may have received the reply or an abort
	 * - and an asynchronous call may already have completed.
	 *
	 * afs_wait_for_call_to_complete(call, ac)
	 * must be called to synchronously clean up.
	 */
	return;

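/* On failure, abort our end of the call unless the peer has already aborted
 * it, in which case a final zero-length receive is used to collect the
 * remote abort code for the caller.
 */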
error_do_abort:
	if (ret != -ECONNABORTED) {
		rxrpc_kernel_abort_call(call->net->socket, rxcall,
					RX_USER_ABORT, ret, "KSD");
	} else {
		len = 0;
		iov_iter_kvec(&msg.msg_iter, READ, NULL, 0, 0);
		rxrpc_kernel_recv_data(call->net->socket, rxcall,
				       &msg.msg_iter, &len, false,
				       &call->abort_code, &call->service_id);
		ac->abort_code = call->abort_code;
		ac->responded = true;
	}
	call->error = ret;
	trace_afs_call_done(call);
error_kill_call:
	if (call->type->done)
		call->type->done(call);

	/* We need to dispose of the extra ref we grabbed for an async call.
	 * The call, however, might be queued on afs_async_calls and we need to
	 * make sure we don't get any more notifications that might requeue it.
	 */
	if (call->rxcall) {
		rxrpc_kernel_end_call(call->net->socket, call->rxcall);
		call->rxcall = NULL;
	}
	if (call->async) {
		if (cancel_work_sync(&call->async_work))
			afs_put_call(call);
		afs_put_call(call);
	}

	ac->error = ret;
	call->state = AFS_CALL_COMPLETE;
	_leave(" = %d", ret);
}

/*
 * Log remote abort codes that indicate that we have a protocol disagreement
 * with the server.
 */
static void afs_log_error(struct afs_call *call, s32 remote_abort)
{
	static int max = 0;
	const char *msg;
	int m;

	switch (remote_abort) {
	case RX_EOF:		 msg = "unexpected EOF";	break;
	case RXGEN_CC_MARSHAL:	 msg = "client marshalling";	break;
	case RXGEN_CC_UNMARSHAL: msg = "client unmarshalling";	break;
	case RXGEN_SS_MARSHAL:	 msg = "server marshalling";	break;
	case RXGEN_SS_UNMARSHAL: msg = "server unmarshalling";	break;
	case RXGEN_DECODE:	 msg = "opcode decode";		break;
	case RXGEN_SS_XDRFREE:	 msg = "server XDR cleanup";	break;
	case RXGEN_CC_XDRFREE:	 msg = "client XDR cleanup";	break;
	case -32:		 msg = "insufficient data";	break;
	default:
		return;
	}

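	/* Only log the first few of these failures to avoid a misbehaving
	 * server spamming the kernel log.
	 */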
	m = max;
	if (m < 3) {
		max = m + 1;
		pr_notice("kAFS: Peer reported %s failure on %s [%pISp]\n",
			  msg, call->type->name,
			  &call->alist->addrs[call->addr_ix].transport);
	}
}

/*
 * deliver messages to a call
 */
static void afs_deliver_to_call(struct afs_call *call)
{
	enum afs_call_state state;
	size_t len;
	u32 abort_code, remote_abort = 0;
	int ret;

	_enter("%s", call->type->name);

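	/* Keep passing data to the call type's delivery routine for as long
	 * as the call remains in a state that can still receive data.
	 */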
	while (state = READ_ONCE(call->state),
	       state == AFS_CALL_CL_AWAIT_REPLY ||
	       state == AFS_CALL_SV_AWAIT_OP_ID ||
	       state == AFS_CALL_SV_AWAIT_REQUEST ||
	       state == AFS_CALL_SV_AWAIT_ACK
	       ) {
		if (state == AFS_CALL_SV_AWAIT_ACK) {
			len = 0;
			iov_iter_kvec(&call->def_iter, READ, NULL, 0, 0);
			ret = rxrpc_kernel_recv_data(call->net->socket,
						     call->rxcall, &call->def_iter,
						     &len, false, &remote_abort,
						     &call->service_id);
			trace_afs_receive_data(call, &call->def_iter, false, ret);

			if (ret == -EINPROGRESS || ret == -EAGAIN)
				return;
			if (ret < 0 || ret == 1) {
				if (ret == 1)
					ret = 0;
				goto call_complete;
			}
			return;
		}

		if (!call->have_reply_time &&
		    rxrpc_kernel_get_reply_time(call->net->socket,
						call->rxcall,
						&call->reply_time))
			call->have_reply_time = true;

		ret = call->type->deliver(call);
		state = READ_ONCE(call->state);
		if (ret == 0 && call->unmarshalling_error)
			ret = -EBADMSG;
		switch (ret) {
		case 0:
			afs_queue_call_work(call);
			if (state == AFS_CALL_CL_PROC_REPLY) {
				if (call->op)
					set_bit(AFS_SERVER_FL_MAY_HAVE_CB,
						&call->op->server->flags);
				goto call_complete;
			}
			ASSERTCMP(state, >, AFS_CALL_CL_PROC_REPLY);
			goto done;
		case -EINPROGRESS:
		case -EAGAIN:
			goto out;
		case -ECONNABORTED:
			ASSERTCMP(state, ==, AFS_CALL_COMPLETE);
			afs_log_error(call, call->abort_code);
			goto done;
		case -ENOTSUPP:
			abort_code = RXGEN_OPCODE;
			rxrpc_kernel_abort_call(call->net->socket, call->rxcall,
						abort_code, ret, "KIV");
			goto local_abort;
		case -EIO:
			pr_err("kAFS: Call %u in bad state %u\n",
			       call->debug_id, state);
			fallthrough;
		case -ENODATA:
		case -EBADMSG:
		case -EMSGSIZE:
			abort_code = RXGEN_CC_UNMARSHAL;
			if (state != AFS_CALL_CL_AWAIT_REPLY)
				abort_code = RXGEN_SS_UNMARSHAL;
			rxrpc_kernel_abort_call(call->net->socket, call->rxcall,
						abort_code, ret, "KUM");
			goto local_abort;
		default:
			abort_code = RX_USER_ABORT;
			rxrpc_kernel_abort_call(call->net->socket, call->rxcall,
						abort_code, ret, "KER");
			goto local_abort;
		}
	}

done:
	if (call->type->done)
		call->type->done(call);
out:
	_leave("");
	return;

local_abort:
	abort_code = 0;
call_complete:
	afs_set_call_complete(call, ret, remote_abort);
	state = AFS_CALL_COMPLETE;
	goto done;
}

/*
 * Wait synchronously for a call to complete and clean up the call struct.
 */
long afs_wait_for_call_to_complete(struct afs_call *call,
				   struct afs_addr_cursor *ac)
{
	long ret;
	bool rxrpc_complete = false;

	DECLARE_WAITQUEUE(myself, current);

	_enter("");

	ret = call->error;
	if (ret < 0)
		goto out;

	add_wait_queue(&call->waitq, &myself);
	for (;;) {
		set_current_state(TASK_UNINTERRUPTIBLE);

		/* deliver any messages that are in the queue */
		if (!afs_check_call_state(call, AFS_CALL_COMPLETE) &&
		    call->need_attention) {
			call->need_attention = false;
			__set_current_state(TASK_RUNNING);
			afs_deliver_to_call(call);
			continue;
		}

		if (afs_check_call_state(call, AFS_CALL_COMPLETE))
			break;

		if (!rxrpc_kernel_check_life(call->net->socket, call->rxcall)) {
			/* rxrpc terminated the call. */
			rxrpc_complete = true;
			break;
		}

		schedule();
	}

	remove_wait_queue(&call->waitq, &myself);
	__set_current_state(TASK_RUNNING);

	if (!afs_check_call_state(call, AFS_CALL_COMPLETE)) {
		if (rxrpc_complete) {
			afs_set_call_complete(call, call->error, call->abort_code);
		} else {
			/* Kill off the call if it's still live. */
			_debug("call interrupted");
			if (rxrpc_kernel_abort_call(call->net->socket, call->rxcall,
						    RX_USER_ABORT, -EINTR, "KWI"))
				afs_set_call_complete(call, -EINTR, 0);
		}
	}

	spin_lock_bh(&call->state_lock);
	ac->abort_code = call->abort_code;
	ac->error = call->error;
	spin_unlock_bh(&call->state_lock);

	ret = ac->error;
	switch (ret) {
	case 0:
		ret = call->ret0;
		call->ret0 = 0;

		fallthrough;
	case -ECONNABORTED:
		ac->responded = true;
		break;
	}

out:
	_debug("call complete");
	afs_put_call(call);
	_leave(" = %p", (void *)ret);
	return ret;
}

/*
 * wake up a waiting call
 */
static void afs_wake_up_call_waiter(struct sock *sk, struct rxrpc_call *rxcall,
				    unsigned long call_user_ID)
{
	struct afs_call *call = (struct afs_call *)call_user_ID;

	call->need_attention = true;
	wake_up(&call->waitq);
}

/*
 * wake up an asynchronous call
 */
static void afs_wake_up_async_call(struct sock *sk, struct rxrpc_call *rxcall,
				   unsigned long call_user_ID)
{
	struct afs_call *call = (struct afs_call *)call_user_ID;
	int u;

	trace_afs_notify_call(rxcall, call);
	call->need_attention = true;

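	/* Only queue the work item if a ref can still be obtained - a usage
	 * count of zero means the call is already being destroyed and must
	 * not be resurrected from here.
	 */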
	u = atomic_fetch_add_unless(&call->usage, 1, 0);
	if (u != 0) {
		trace_afs_call(call, afs_call_trace_wake, u + 1,
			       atomic_read(&call->net->nr_outstanding_calls),
			       __builtin_return_address(0));

		if (!queue_work(afs_async_calls, &call->async_work))
			afs_put_call(call);
	}
}

/*
 * Perform I/O processing on an asynchronous call.  The work item carries a ref
 * to the call struct that we either need to release or to pass on.
 */
static void afs_process_async_call(struct work_struct *work)
{
	struct afs_call *call = container_of(work, struct afs_call, async_work);

	_enter("");

	if (call->state < AFS_CALL_COMPLETE && call->need_attention) {
		call->need_attention = false;
		afs_deliver_to_call(call);
	}

	afs_put_call(call);
	_leave("");
}

static void afs_rx_attach(struct rxrpc_call *rxcall, unsigned long user_call_ID)
{
	struct afs_call *call = (struct afs_call *)user_call_ID;

	call->rxcall = rxcall;
}

/*
 * Charge the incoming call preallocation.
 */
void afs_charge_preallocation(struct work_struct *work)
{
	struct afs_net *net =
		container_of(work, struct afs_net, charge_preallocation_work);
	struct afs_call *call = net->spare_incoming_call;

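	/* Keep rxrpc preloaded with call structures for incoming calls until
	 * it refuses the charge (the unused call is then stashed as the
	 * spare) or allocation fails.
	 */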
	for (;;) {
		if (!call) {
			call = afs_alloc_call(net, &afs_RXCMxxxx, GFP_KERNEL);
			if (!call)
				break;

			call->drop_ref = true;
			call->async = true;
			call->state = AFS_CALL_SV_AWAIT_OP_ID;
			init_waitqueue_head(&call->waitq);
			afs_extract_to_tmp(call);
		}

		if (rxrpc_kernel_charge_accept(net->socket,
					       afs_wake_up_async_call,
					       afs_rx_attach,
					       (unsigned long)call,
					       GFP_KERNEL,
					       call->debug_id) < 0)
			break;
		call = NULL;
	}
	net->spare_incoming_call = call;
}

/*
 * Discard a preallocated call when a socket is shut down.
 */
static void afs_rx_discard_new_call(struct rxrpc_call *rxcall,
				    unsigned long user_call_ID)
{
	struct afs_call *call = (struct afs_call *)user_call_ID;

	call->rxcall = NULL;
	afs_put_call(call);
}

/*
 * Notification of an incoming call.
 */
static void afs_rx_new_call(struct sock *sk, struct rxrpc_call *rxcall,
			    unsigned long user_call_ID)
{
	struct afs_net *net = afs_sock2net(sk);

	queue_work(afs_wq, &net->charge_preallocation_work);
}

/*
 * Grab the operation ID from an incoming cache manager call.  The socket
 * buffer is discarded on error or if we don't yet have sufficient data.
 */
static int afs_deliver_cm_op_id(struct afs_call *call)
{
	int ret;

	_enter("{%zu}", iov_iter_count(call->iter));

	/* the operation ID forms the first four bytes of the request data */
	ret = afs_extract_data(call, true);
	if (ret < 0)
		return ret;

	call->operation_ID = ntohl(call->tmp);
	afs_set_call_state(call, AFS_CALL_SV_AWAIT_OP_ID, AFS_CALL_SV_AWAIT_REQUEST);

	/* ask the cache manager to route the call (it'll change the call type
	 * if successful) */
	if (!afs_cm_incoming_call(call))
		return -ENOTSUPP;

	trace_afs_cb_call(call);

	/* pass responsibility for the remainder of this message off to the
	 * cache manager op */
	return call->type->deliver(call);
}

/*
 * Advance the AFS call state when an RxRPC service call ends the transmit
 * phase.
 */
static void afs_notify_end_reply_tx(struct sock *sock,
				    struct rxrpc_call *rxcall,
				    unsigned long call_user_ID)
{
	struct afs_call *call = (struct afs_call *)call_user_ID;

	afs_set_call_state(call, AFS_CALL_SV_REPLYING, AFS_CALL_SV_AWAIT_ACK);
}

/*
 * send an empty reply
 */
void afs_send_empty_reply(struct afs_call *call)
{
	struct afs_net *net = call->net;
	struct msghdr msg;

	_enter("");

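	/* Let rxrpc know up front that no reply payload will be transmitted. */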
	rxrpc_kernel_set_tx_length(net->socket, call->rxcall, 0);

	msg.msg_name		= NULL;
	msg.msg_namelen		= 0;
	iov_iter_kvec(&msg.msg_iter, WRITE, NULL, 0, 0);
	msg.msg_control		= NULL;
	msg.msg_controllen	= 0;
	msg.msg_flags		= 0;

	switch (rxrpc_kernel_send_data(net->socket, call->rxcall, &msg, 0,
				       afs_notify_end_reply_tx)) {
	case 0:
		_leave(" [replied]");
		return;

	case -ENOMEM:
		_debug("oom");
		rxrpc_kernel_abort_call(net->socket, call->rxcall,
					RX_USER_ABORT, -ENOMEM, "KOO");
		fallthrough;
	default:
		_leave(" [error]");
		return;
	}
}

/*
 * send a simple reply
 */
void afs_send_simple_reply(struct afs_call *call, const void *buf, size_t len)
{
	struct afs_net *net = call->net;
	struct msghdr msg;
	struct kvec iov[1];
	int n;

	_enter("");

	rxrpc_kernel_set_tx_length(net->socket, call->rxcall, len);

	iov[0].iov_base		= (void *) buf;
	iov[0].iov_len		= len;
	msg.msg_name		= NULL;
	msg.msg_namelen		= 0;
	iov_iter_kvec(&msg.msg_iter, WRITE, iov, 1, len);
	msg.msg_control		= NULL;
	msg.msg_controllen	= 0;
	msg.msg_flags		= 0;

	n = rxrpc_kernel_send_data(net->socket, call->rxcall, &msg, len,
				   afs_notify_end_reply_tx);
	if (n >= 0) {
		/* Success */
		_leave(" [replied]");
		return;
	}

	if (n == -ENOMEM) {
		_debug("oom");
		rxrpc_kernel_abort_call(net->socket, call->rxcall,
					RX_USER_ABORT, -ENOMEM, "KOO");
	}
	_leave(" [error]");
}

/*
 * Extract a piece of data from the received data socket buffers.
 */
int afs_extract_data(struct afs_call *call, bool want_more)
{
	struct afs_net *net = call->net;
	struct iov_iter *iter = call->iter;
	enum afs_call_state state;
	u32 remote_abort = 0;
	int ret;

	_enter("{%s,%zu,%zu},%d",
	       call->type->name, call->iov_len, iov_iter_count(iter), want_more);

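	/* A return of 0 means the requested data was extracted and the call
	 * is still receiving, 1 means the data transfer is complete (so the
	 * call state is advanced below), -EAGAIN means nothing is available
	 * yet and other negative values are errors.
	 */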
	ret = rxrpc_kernel_recv_data(net->socket, call->rxcall, iter,
				     &call->iov_len, want_more, &remote_abort,
				     &call->service_id);
	if (ret == 0 || ret == -EAGAIN)
		return ret;

	state = READ_ONCE(call->state);
	if (ret == 1) {
		switch (state) {
		case AFS_CALL_CL_AWAIT_REPLY:
			afs_set_call_state(call, state, AFS_CALL_CL_PROC_REPLY);
			break;
		case AFS_CALL_SV_AWAIT_REQUEST:
			afs_set_call_state(call, state, AFS_CALL_SV_REPLYING);
			break;
		case AFS_CALL_COMPLETE:
			kdebug("prem complete %d", call->error);
			return afs_io_error(call, afs_io_error_extract);
		default:
			break;
		}
		return 0;
	}

	afs_set_call_complete(call, ret, remote_abort);
	return ret;
}

/*
 * Log protocol error production.
 */
noinline int afs_protocol_error(struct afs_call *call,
				enum afs_eproto_cause cause)
{
	trace_afs_protocol_error(call, cause);
	if (call)
		call->unmarshalling_error = true;
	return -EBADMSG;
}