/* Maintain an RxRPC server socket to do AFS communications through
 *
 * Copyright (C) 2007 Red Hat, Inc. All Rights Reserved.
 * Written by David Howells (dhowells@redhat.com)
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version
 * 2 of the License, or (at your option) any later version.
 */

#include <linux/slab.h>
#include <net/sock.h>
#include <net/af_rxrpc.h>
#include <rxrpc/packet.h>
#include "internal.h"
#include "afs_cm.h"

struct socket *afs_socket; /* my RxRPC socket */
static struct workqueue_struct *afs_async_calls;
static struct afs_call *afs_spare_incoming_call;
static atomic_t afs_outstanding_calls;

static void afs_free_call(struct afs_call *);
static void afs_wake_up_call_waiter(struct sock *, struct rxrpc_call *, unsigned long);
static int afs_wait_for_call_to_complete(struct afs_call *);
static void afs_wake_up_async_call(struct sock *, struct rxrpc_call *, unsigned long);
static int afs_dont_wait_for_call_to_complete(struct afs_call *);
static void afs_process_async_call(struct work_struct *);
static void afs_rx_new_call(struct sock *, struct rxrpc_call *, unsigned long);
static void afs_rx_discard_new_call(struct rxrpc_call *, unsigned long);
static int afs_deliver_cm_op_id(struct afs_call *);

/* synchronous call management */
const struct afs_wait_mode afs_sync_call = {
	.notify_rx	= afs_wake_up_call_waiter,
	.wait		= afs_wait_for_call_to_complete,
};

/* asynchronous call management */
const struct afs_wait_mode afs_async_call = {
	.notify_rx	= afs_wake_up_async_call,
	.wait		= afs_dont_wait_for_call_to_complete,
};

/* asynchronous incoming call management */
static const struct afs_wait_mode afs_async_incoming_call = {
	.notify_rx	= afs_wake_up_async_call,
};

/* asynchronous incoming call initial processing */
static const struct afs_call_type afs_RXCMxxxx = {
	.name		= "CB.xxxx",
	.deliver	= afs_deliver_cm_op_id,
	.abort_to_error	= afs_abort_to_error,
};

static void afs_charge_preallocation(struct work_struct *);

static DECLARE_WORK(afs_charge_preallocation_work, afs_charge_preallocation);

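/*
 * Action routine passed to wait_on_atomic_t() when the socket is being closed:
 * just reschedule so that the closer sleeps until afs_outstanding_calls drops
 * to zero.
 */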
static int afs_wait_atomic_t(atomic_t *p)
{
	schedule();
	return 0;
}

/*
 * open an RxRPC socket and bind it to be a server for callback notifications
 * - the socket is left in blocking mode and non-blocking ops use MSG_DONTWAIT
 */
int afs_open_socket(void)
{
	struct sockaddr_rxrpc srx;
	struct socket *socket;
	int ret;

	_enter("");

	ret = -ENOMEM;
	afs_async_calls = alloc_workqueue("kafsd", WQ_MEM_RECLAIM, 0);
	if (!afs_async_calls)
		goto error_0;

	ret = sock_create_kern(&init_net, AF_RXRPC, SOCK_DGRAM, PF_INET, &socket);
	if (ret < 0)
		goto error_1;

	socket->sk->sk_allocation = GFP_NOFS;

	/* bind the callback manager's address to make this a server socket */
	srx.srx_family			= AF_RXRPC;
	srx.srx_service			= CM_SERVICE;
	srx.transport_type		= SOCK_DGRAM;
	srx.transport_len		= sizeof(srx.transport.sin);
	srx.transport.sin.sin_family	= AF_INET;
	srx.transport.sin.sin_port	= htons(AFS_CM_PORT);
	memset(&srx.transport.sin.sin_addr, 0,
	       sizeof(srx.transport.sin.sin_addr));

	ret = kernel_bind(socket, (struct sockaddr *) &srx, sizeof(srx));
	if (ret < 0)
		goto error_2;

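	/* Ask rxrpc to notify us of incoming calls (afs_rx_new_call) and of
	 * preallocated calls it discards at shutdown
	 * (afs_rx_discard_new_call). */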
	rxrpc_kernel_new_call_notification(socket, afs_rx_new_call,
					   afs_rx_discard_new_call);

	ret = kernel_listen(socket, INT_MAX);
	if (ret < 0)
		goto error_2;

	afs_socket = socket;
	afs_charge_preallocation(NULL);
	_leave(" = 0");
	return 0;

error_2:
	sock_release(socket);
error_1:
	destroy_workqueue(afs_async_calls);
error_0:
	_leave(" = %d", ret);
	return ret;
}

/*
 * close the RxRPC socket AFS was using
 */
void afs_close_socket(void)
{
	_enter("");

	if (afs_spare_incoming_call) {
		atomic_inc(&afs_outstanding_calls);
		afs_free_call(afs_spare_incoming_call);
		afs_spare_incoming_call = NULL;
	}

	_debug("outstanding %u", atomic_read(&afs_outstanding_calls));
	wait_on_atomic_t(&afs_outstanding_calls, afs_wait_atomic_t,
			 TASK_UNINTERRUPTIBLE);
	_debug("no outstanding calls");

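	/* Flush the async work queue both before and after shutting the
	 * socket down so that any call work queued by the shutdown itself
	 * runs before the socket is released. */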
	flush_workqueue(afs_async_calls);
	kernel_sock_shutdown(afs_socket, SHUT_RDWR);
	flush_workqueue(afs_async_calls);
	sock_release(afs_socket);

	_debug("dework");
	destroy_workqueue(afs_async_calls);
	_leave("");
}

/*
 * free a call
 */
static void afs_free_call(struct afs_call *call)
{
	_debug("DONE %p{%s} [%d]",
	       call, call->type->name, atomic_read(&afs_outstanding_calls));

	ASSERTCMP(call->rxcall, ==, NULL);
	ASSERT(!work_pending(&call->async_work));
	ASSERT(call->type->name != NULL);

	kfree(call->request);
	kfree(call);

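	/* Wake anyone waiting for all calls to finish (see afs_close_socket)
	 * if this was the last one. */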
	if (atomic_dec_and_test(&afs_outstanding_calls))
		wake_up_atomic_t(&afs_outstanding_calls);
}

/*
 * End a call but do not free it
 */
static void afs_end_call_nofree(struct afs_call *call)
{
	if (call->rxcall) {
		rxrpc_kernel_end_call(afs_socket, call->rxcall);
		call->rxcall = NULL;
	}
	if (call->type->destructor)
		call->type->destructor(call);
}

/*
 * End a call and free it
 */
static void afs_end_call(struct afs_call *call)
{
	afs_end_call_nofree(call);
	afs_free_call(call);
}

/*
 * allocate a call with flat request and reply buffers
 */
struct afs_call *afs_alloc_flat_call(const struct afs_call_type *type,
				     size_t request_size, size_t reply_max)
{
	struct afs_call *call;

	call = kzalloc(sizeof(*call), GFP_NOFS);
	if (!call)
		goto nomem_call;

	_debug("CALL %p{%s} [%d]",
	       call, type->name, atomic_read(&afs_outstanding_calls));
	atomic_inc(&afs_outstanding_calls);

	call->type = type;
	call->request_size = request_size;
	call->reply_max = reply_max;

	if (request_size) {
		call->request = kmalloc(request_size, GFP_NOFS);
		if (!call->request)
			goto nomem_free;
	}

	if (reply_max) {
		call->buffer = kmalloc(reply_max, GFP_NOFS);
		if (!call->buffer)
			goto nomem_free;
	}

	init_waitqueue_head(&call->waitq);
	return call;

nomem_free:
	afs_free_call(call);
nomem_call:
	return NULL;
}

/*
 * clean up a call with flat buffer
 */
void afs_flat_call_destructor(struct afs_call *call)
{
	_enter("");

	kfree(call->request);
	call->request = NULL;
	kfree(call->buffer);
	call->buffer = NULL;
}

/*
 * attach the data from a bunch of pages on an inode to a call
 */
static int afs_send_pages(struct afs_call *call, struct msghdr *msg,
			  struct kvec *iov)
{
	struct page *pages[8];
	unsigned count, n, loop, offset, to;
	pgoff_t first = call->first, last = call->last;
	int ret;

	_enter("");

	offset = call->first_offset;
	call->first_offset = 0;

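	/* Send the pages in batches of up to eight, kmapping each one in turn
	 * and setting MSG_MORE on every segment except the last. */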
	do {
		_debug("attach %lx-%lx", first, last);

		count = last - first + 1;
		if (count > ARRAY_SIZE(pages))
			count = ARRAY_SIZE(pages);
		n = find_get_pages_contig(call->mapping, first, count, pages);
		ASSERTCMP(n, ==, count);

		loop = 0;
		do {
			msg->msg_flags = 0;
			to = PAGE_SIZE;
			if (first + loop >= last)
				to = call->last_to;
			else
				msg->msg_flags = MSG_MORE;
			iov->iov_base = kmap(pages[loop]) + offset;
			iov->iov_len = to - offset;
			offset = 0;

			_debug("- range %u-%u%s",
			       offset, to, msg->msg_flags ? " [more]" : "");
			iov_iter_kvec(&msg->msg_iter, WRITE | ITER_KVEC,
				      iov, 1, to - offset);

			/* have to change the state *before* sending the last
			 * packet as RxRPC might give us the reply before it
			 * returns from sending the request */
			if (first + loop >= last)
				call->state = AFS_CALL_AWAIT_REPLY;
			ret = rxrpc_kernel_send_data(afs_socket, call->rxcall,
						     msg, to - offset);
			kunmap(pages[loop]);
			if (ret < 0)
				break;
		} while (++loop < count);
		first += count;

		for (loop = 0; loop < count; loop++)
			put_page(pages[loop]);
		if (ret < 0)
			break;
	} while (first <= last);

	_leave(" = %d", ret);
	return ret;
}

/*
 * initiate a call
 */
int afs_make_call(struct in_addr *addr, struct afs_call *call, gfp_t gfp,
		  const struct afs_wait_mode *wait_mode)
{
	struct sockaddr_rxrpc srx;
	struct rxrpc_call *rxcall;
	struct msghdr msg;
	struct kvec iov[1];
	int ret;

	_enter("%x,{%d},", addr->s_addr, ntohs(call->port));

	ASSERT(call->type != NULL);
	ASSERT(call->type->name != NULL);

	_debug("____MAKE %p{%s,%x} [%d]____",
	       call, call->type->name, key_serial(call->key),
	       atomic_read(&afs_outstanding_calls));

	call->wait_mode = wait_mode;
	INIT_WORK(&call->async_work, afs_process_async_call);

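	/* Marshal the target server's IPv4 address and port into an rxrpc
	 * sockaddr. */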
	memset(&srx, 0, sizeof(srx));
	srx.srx_family = AF_RXRPC;
	srx.srx_service = call->service_id;
	srx.transport_type = SOCK_DGRAM;
	srx.transport_len = sizeof(srx.transport.sin);
	srx.transport.sin.sin_family = AF_INET;
	srx.transport.sin.sin_port = call->port;
	memcpy(&srx.transport.sin.sin_addr, addr, 4);

	/* create a call */
	rxcall = rxrpc_kernel_begin_call(afs_socket, &srx, call->key,
					 (unsigned long) call, gfp,
					 wait_mode->notify_rx);
	call->key = NULL;
	if (IS_ERR(rxcall)) {
		ret = PTR_ERR(rxcall);
		goto error_kill_call;
	}

	call->rxcall = rxcall;

	/* send the request */
	iov[0].iov_base	= call->request;
	iov[0].iov_len	= call->request_size;

	msg.msg_name		= NULL;
	msg.msg_namelen		= 0;
	iov_iter_kvec(&msg.msg_iter, WRITE | ITER_KVEC, iov, 1,
		      call->request_size);
	msg.msg_control		= NULL;
	msg.msg_controllen	= 0;
	msg.msg_flags		= (call->send_pages ? MSG_MORE : 0);

	/* have to change the state *before* sending the last packet as RxRPC
	 * might give us the reply before it returns from sending the
	 * request */
	if (!call->send_pages)
		call->state = AFS_CALL_AWAIT_REPLY;
	ret = rxrpc_kernel_send_data(afs_socket, rxcall,
				     &msg, call->request_size);
	if (ret < 0)
		goto error_do_abort;

	if (call->send_pages) {
		ret = afs_send_pages(call, &msg, iov);
		if (ret < 0)
			goto error_do_abort;
	}

	/* at this point, an async call may no longer exist as it may have
	 * already completed */
	return wait_mode->wait(call);

error_do_abort:
	rxrpc_kernel_abort_call(afs_socket, rxcall, RX_USER_ABORT, -ret, "KSD");
error_kill_call:
	afs_end_call(call);
	_leave(" = %d", ret);
	return ret;
}

/*
 * deliver messages to a call
 */
static void afs_deliver_to_call(struct afs_call *call)
{
	u32 abort_code;
	int ret;

	_enter("%s", call->type->name);

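	/* Keep pulling data from rxrpc and passing it to the deliver routine
	 * until the call reaches a terminal state or we run out of received
	 * data. */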
	while (call->state == AFS_CALL_AWAIT_REPLY ||
	       call->state == AFS_CALL_AWAIT_OP_ID ||
	       call->state == AFS_CALL_AWAIT_REQUEST ||
	       call->state == AFS_CALL_AWAIT_ACK
	       ) {
		if (call->state == AFS_CALL_AWAIT_ACK) {
			size_t offset = 0;
			ret = rxrpc_kernel_recv_data(afs_socket, call->rxcall,
						     NULL, 0, &offset, false,
						     &call->abort_code);
			if (ret == -EINPROGRESS || ret == -EAGAIN)
				return;
			if (ret == 1) {
				call->state = AFS_CALL_COMPLETE;
				goto done;
			}
			return;
		}

		ret = call->type->deliver(call);
		switch (ret) {
		case 0:
			if (call->state == AFS_CALL_AWAIT_REPLY)
				call->state = AFS_CALL_COMPLETE;
			goto done;
		case -EINPROGRESS:
		case -EAGAIN:
			goto out;
		case -ENOTCONN:
			abort_code = RX_CALL_DEAD;
			rxrpc_kernel_abort_call(afs_socket, call->rxcall,
						abort_code, -ret, "KNC");
			goto do_abort;
		case -ENOTSUPP:
			abort_code = RX_INVALID_OPERATION;
			rxrpc_kernel_abort_call(afs_socket, call->rxcall,
						abort_code, -ret, "KIV");
			goto do_abort;
		case -ENODATA:
		case -EBADMSG:
		case -EMSGSIZE:
		default:
			abort_code = RXGEN_CC_UNMARSHAL;
			if (call->state != AFS_CALL_AWAIT_REPLY)
				abort_code = RXGEN_SS_UNMARSHAL;
			rxrpc_kernel_abort_call(afs_socket, call->rxcall,
						abort_code, EBADMSG, "KUM");
			goto do_abort;
		}
	}

done:
	if (call->state == AFS_CALL_COMPLETE && call->incoming)
		afs_end_call(call);
out:
	_leave("");
	return;

do_abort:
	call->error = ret;
	call->state = AFS_CALL_COMPLETE;
	goto done;
}

/*
 * wait synchronously for a call to complete
 */
static int afs_wait_for_call_to_complete(struct afs_call *call)
{
	const char *abort_why;
	int ret;

	DECLARE_WAITQUEUE(myself, current);

	_enter("");

	add_wait_queue(&call->waitq, &myself);
	for (;;) {
		set_current_state(TASK_INTERRUPTIBLE);

		/* deliver any messages that are in the queue */
		if (call->state < AFS_CALL_COMPLETE && call->need_attention) {
			call->need_attention = false;
			__set_current_state(TASK_RUNNING);
			afs_deliver_to_call(call);
			continue;
		}

		abort_why = "KWC";
		ret = call->error;
		if (call->state == AFS_CALL_COMPLETE)
			break;
		abort_why = "KWI";
		ret = -EINTR;
		if (signal_pending(current))
			break;
		schedule();
	}

	remove_wait_queue(&call->waitq, &myself);
	__set_current_state(TASK_RUNNING);

	/* kill the call */
	if (call->state < AFS_CALL_COMPLETE) {
		_debug("call incomplete");
		rxrpc_kernel_abort_call(afs_socket, call->rxcall,
					RX_CALL_DEAD, -ret, abort_why);
	}

	_debug("call complete");
	afs_end_call(call);
	_leave(" = %d", ret);
	return ret;
}

/*
 * wake up a waiting call
 */
static void afs_wake_up_call_waiter(struct sock *sk, struct rxrpc_call *rxcall,
				    unsigned long call_user_ID)
{
	struct afs_call *call = (struct afs_call *)call_user_ID;

	call->need_attention = true;
	wake_up(&call->waitq);
}

/*
 * wake up an asynchronous call
 */
static void afs_wake_up_async_call(struct sock *sk, struct rxrpc_call *rxcall,
				   unsigned long call_user_ID)
{
	struct afs_call *call = (struct afs_call *)call_user_ID;

	call->need_attention = true;
	queue_work(afs_async_calls, &call->async_work);
}

/*
 * put a call into asynchronous mode
 * - mustn't touch the call descriptor as the call may have completed by the
 *   time we get here
 */
static int afs_dont_wait_for_call_to_complete(struct afs_call *call)
{
	_enter("");
	return -EINPROGRESS;
}

/*
 * delete an asynchronous call
 */
static void afs_delete_async_call(struct work_struct *work)
{
	struct afs_call *call = container_of(work, struct afs_call, async_work);

	_enter("");

	afs_free_call(call);

	_leave("");
}

/*
 * perform processing on an asynchronous call
 */
static void afs_process_async_call(struct work_struct *work)
{
	struct afs_call *call = container_of(work, struct afs_call, async_work);

	_enter("");

	if (call->state < AFS_CALL_COMPLETE && call->need_attention) {
		call->need_attention = false;
		afs_deliver_to_call(call);
	}

	if (call->state == AFS_CALL_COMPLETE && call->wait_mode) {
		if (call->wait_mode->async_complete)
			call->wait_mode->async_complete(call->reply,
							call->error);
		call->reply = NULL;

		/* kill the call */
		afs_end_call_nofree(call);

		/* we can't just delete the call because the work item may be
		 * queued */
		call->async_work.func = afs_delete_async_call;
		queue_work(afs_async_calls, &call->async_work);
	}

	_leave("");
}

static void afs_rx_attach(struct rxrpc_call *rxcall, unsigned long user_call_ID)
{
	struct afs_call *call = (struct afs_call *)user_call_ID;

	call->rxcall = rxcall;
}

/*
 * Charge the incoming call preallocation.
 */
static void afs_charge_preallocation(struct work_struct *work)
{
	struct afs_call *call = afs_spare_incoming_call;

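	/* Keep offering preallocated call records to rxrpc until it declines
	 * to take any more; the leftover record is kept as a spare for the
	 * next charge attempt. */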
	for (;;) {
		if (!call) {
			call = kzalloc(sizeof(struct afs_call), GFP_KERNEL);
			if (!call)
				break;

			INIT_WORK(&call->async_work, afs_process_async_call);
			call->wait_mode = &afs_async_incoming_call;
			call->type = &afs_RXCMxxxx;
			init_waitqueue_head(&call->waitq);
			call->state = AFS_CALL_AWAIT_OP_ID;
		}

		if (rxrpc_kernel_charge_accept(afs_socket,
					       afs_wake_up_async_call,
					       afs_rx_attach,
					       (unsigned long)call,
					       GFP_KERNEL) < 0)
			break;
		call = NULL;
	}
	afs_spare_incoming_call = call;
}

/*
 * Discard a preallocated call when a socket is shut down.
 */
static void afs_rx_discard_new_call(struct rxrpc_call *rxcall,
				    unsigned long user_call_ID)
{
	struct afs_call *call = (struct afs_call *)user_call_ID;

	atomic_inc(&afs_outstanding_calls);
	call->rxcall = NULL;
	afs_free_call(call);
}

/*
 * Notification of an incoming call.
 */
static void afs_rx_new_call(struct sock *sk, struct rxrpc_call *rxcall,
			    unsigned long user_call_ID)
{
	atomic_inc(&afs_outstanding_calls);
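	/* Replenish the preallocation pool to replace the record just
	 * consumed by this call. */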
	queue_work(afs_wq, &afs_charge_preallocation_work);
}

/*
 * Grab the operation ID from an incoming cache manager call.  The socket
 * buffer is discarded on error or if we don't yet have sufficient data.
 */
static int afs_deliver_cm_op_id(struct afs_call *call)
{
	int ret;

	_enter("{%zu}", call->offset);

	ASSERTCMP(call->offset, <, 4);

	/* the operation ID forms the first four bytes of the request data */
	ret = afs_extract_data(call, &call->operation_ID, 4, true);
	if (ret < 0)
		return ret;

	call->state = AFS_CALL_AWAIT_REQUEST;
	call->offset = 0;

	/* ask the cache manager to route the call (it'll change the call type
	 * if successful) */
	if (!afs_cm_incoming_call(call))
		return -ENOTSUPP;

	/* pass responsibility for the remainder of this message off to the
	 * cache manager op */
	return call->type->deliver(call);
}

/*
 * send an empty reply
 */
void afs_send_empty_reply(struct afs_call *call)
{
	struct msghdr msg;

	_enter("");

	msg.msg_name		= NULL;
	msg.msg_namelen		= 0;
	iov_iter_kvec(&msg.msg_iter, WRITE | ITER_KVEC, NULL, 0, 0);
	msg.msg_control		= NULL;
	msg.msg_controllen	= 0;
	msg.msg_flags		= 0;

	call->state = AFS_CALL_AWAIT_ACK;
	switch (rxrpc_kernel_send_data(afs_socket, call->rxcall, &msg, 0)) {
	case 0:
		_leave(" [replied]");
		return;

	case -ENOMEM:
		_debug("oom");
		rxrpc_kernel_abort_call(afs_socket, call->rxcall,
					RX_USER_ABORT, ENOMEM, "KOO");
	default:
		afs_end_call(call);
		_leave(" [error]");
		return;
	}
}

/*
 * send a simple reply
 */
void afs_send_simple_reply(struct afs_call *call, const void *buf, size_t len)
{
	struct msghdr msg;
	struct kvec iov[1];
	int n;

	_enter("");

	iov[0].iov_base		= (void *) buf;
	iov[0].iov_len		= len;
	msg.msg_name		= NULL;
	msg.msg_namelen		= 0;
	iov_iter_kvec(&msg.msg_iter, WRITE | ITER_KVEC, iov, 1, len);
	msg.msg_control		= NULL;
	msg.msg_controllen	= 0;
	msg.msg_flags		= 0;

	call->state = AFS_CALL_AWAIT_ACK;
	n = rxrpc_kernel_send_data(afs_socket, call->rxcall, &msg, len);
	if (n >= 0) {
		/* Success */
		_leave(" [replied]");
		return;
	}

	if (n == -ENOMEM) {
		_debug("oom");
		rxrpc_kernel_abort_call(afs_socket, call->rxcall,
					RX_USER_ABORT, ENOMEM, "KOO");
	}
	afs_end_call(call);
	_leave(" [error]");
}

/*
 * Extract a piece of data from the received data socket buffers.
 */
int afs_extract_data(struct afs_call *call, void *buf, size_t count,
		     bool want_more)
{
	int ret;

	_enter("{%s,%zu},,%zu,%d",
	       call->type->name, call->offset, count, want_more);

	ASSERTCMP(call->offset, <=, count);

	ret = rxrpc_kernel_recv_data(afs_socket, call->rxcall,
				     buf, count, &call->offset,
				     want_more, &call->abort_code);
	if (ret == 0 || ret == -EAGAIN)
		return ret;

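	/* A return of 1 means rxrpc has handed over the last of the data, so
	 * advance the call state; anything else here is an error. */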
	if (ret == 1) {
		switch (call->state) {
		case AFS_CALL_AWAIT_REPLY:
			call->state = AFS_CALL_COMPLETE;
			break;
		case AFS_CALL_AWAIT_REQUEST:
			call->state = AFS_CALL_REPLYING;
			break;
		default:
			break;
		}
		return 0;
	}

	if (ret == -ECONNABORTED)
		call->error = call->type->abort_to_error(call->abort_code);
	else
		call->error = ret;
	call->state = AFS_CALL_COMPLETE;
	return ret;
}