/* RxRPC individual remote procedure call handling
 *
 * Copyright (C) 2007 Red Hat, Inc. All Rights Reserved.
 * Written by David Howells (dhowells@redhat.com)
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version
 * 2 of the License, or (at your option) any later version.
 */

#define pr_fmt(fmt) KBUILD_MODNAME ": " fmt

#include <linux/slab.h>
#include <linux/module.h>
#include <linux/circ_buf.h>
#include <linux/spinlock_types.h>
#include <net/sock.h>
#include <net/af_rxrpc.h>
#include "ar-internal.h"

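/*
 * Textual names for the call states.
 */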
const char *const rxrpc_call_states[NR__RXRPC_CALL_STATES] = {
	[RXRPC_CALL_UNINITIALISED]		= "Uninit  ",
	[RXRPC_CALL_CLIENT_AWAIT_CONN]		= "ClWtConn",
	[RXRPC_CALL_CLIENT_SEND_REQUEST]	= "ClSndReq",
	[RXRPC_CALL_CLIENT_AWAIT_REPLY]		= "ClAwtRpl",
	[RXRPC_CALL_CLIENT_RECV_REPLY]		= "ClRcvRpl",
	[RXRPC_CALL_SERVER_PREALLOC]		= "SvPrealc",
	[RXRPC_CALL_SERVER_SECURING]		= "SvSecure",
	[RXRPC_CALL_SERVER_ACCEPTING]		= "SvAccept",
	[RXRPC_CALL_SERVER_RECV_REQUEST]	= "SvRcvReq",
	[RXRPC_CALL_SERVER_ACK_REQUEST]		= "SvAckReq",
	[RXRPC_CALL_SERVER_SEND_REPLY]		= "SvSndRpl",
	[RXRPC_CALL_SERVER_AWAIT_ACK]		= "SvAwtACK",
	[RXRPC_CALL_COMPLETE]			= "Complete",
};

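/*
 * Textual names for the ways in which a call can complete.
 */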
const char *const rxrpc_call_completions[NR__RXRPC_CALL_COMPLETIONS] = {
	[RXRPC_CALL_SUCCEEDED]			= "Complete",
	[RXRPC_CALL_REMOTELY_ABORTED]		= "RmtAbort",
	[RXRPC_CALL_LOCALLY_ABORTED]		= "LocAbort",
	[RXRPC_CALL_LOCAL_ERROR]		= "LocError",
	[RXRPC_CALL_NETWORK_ERROR]		= "NetError",
};

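/*
 * Slab cache from which rxrpc_call structures are allocated.
 */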
struct kmem_cache *rxrpc_call_jar;

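/*
 * Handle expiry of a call's timer by queueing the call's work processor,
 * unless the call has already completed.
 */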
static void rxrpc_call_timer_expired(struct timer_list *t)
{
	struct rxrpc_call *call = from_timer(call, t, timer);

	_enter("%d", call->debug_id);

	if (call->state < RXRPC_CALL_COMPLETE) {
		trace_rxrpc_timer(call, rxrpc_timer_expired, jiffies);
		rxrpc_queue_call(call);
	}
}

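/*
 * Lockdep class applied to the user_mutex of calls made on kernel sockets
 * (see rxrpc_alloc_call()).
 */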
static struct lock_class_key rxrpc_call_user_mutex_lock_class_key;

/*
 * find an extant call with the given user ID
 * - called in process context with IRQs enabled
 */
struct rxrpc_call *rxrpc_find_call_by_user_ID(struct rxrpc_sock *rx,
					      unsigned long user_call_ID)
{
	struct rxrpc_call *call;
	struct rb_node *p;

	_enter("%p,%lx", rx, user_call_ID);

	read_lock(&rx->call_lock);

	p = rx->calls.rb_node;
	while (p) {
		call = rb_entry(p, struct rxrpc_call, sock_node);

		if (user_call_ID < call->user_call_ID)
			p = p->rb_left;
		else if (user_call_ID > call->user_call_ID)
			p = p->rb_right;
		else
			goto found_extant_call;
	}

	read_unlock(&rx->call_lock);
	_leave(" = NULL");
	return NULL;

found_extant_call:
	rxrpc_get_call(call, rxrpc_call_got);
	read_unlock(&rx->call_lock);
	_leave(" = %p [%d]", call, atomic_read(&call->usage));
	return call;
}

/*
 * allocate a new call
 */
struct rxrpc_call *rxrpc_alloc_call(struct rxrpc_sock *rx, gfp_t gfp,
				    unsigned int debug_id)
{
	struct rxrpc_call *call;
	struct rxrpc_net *rxnet = rxrpc_net(sock_net(&rx->sk));

	call = kmem_cache_zalloc(rxrpc_call_jar, gfp);
	if (!call)
		return NULL;

	call->rxtx_buffer = kcalloc(RXRPC_RXTX_BUFF_SIZE,
				    sizeof(struct sk_buff *),
				    gfp);
	if (!call->rxtx_buffer)
		goto nomem;

	call->rxtx_annotations = kcalloc(RXRPC_RXTX_BUFF_SIZE, sizeof(u8), gfp);
	if (!call->rxtx_annotations)
		goto nomem_2;

	mutex_init(&call->user_mutex);

	/* Prevent lockdep reporting a deadlock false positive between the afs
	 * filesystem and sys_sendmsg() via the mmap sem.
	 */
	if (rx->sk.sk_kern_sock)
		lockdep_set_class(&call->user_mutex,
				  &rxrpc_call_user_mutex_lock_class_key);

	timer_setup(&call->timer, rxrpc_call_timer_expired, 0);
	INIT_WORK(&call->processor, &rxrpc_process_call);
	INIT_LIST_HEAD(&call->link);
	INIT_LIST_HEAD(&call->chan_wait_link);
	INIT_LIST_HEAD(&call->accept_link);
	INIT_LIST_HEAD(&call->recvmsg_link);
	INIT_LIST_HEAD(&call->sock_link);
	init_waitqueue_head(&call->waitq);
	spin_lock_init(&call->lock);
	spin_lock_init(&call->notify_lock);
	rwlock_init(&call->state_lock);
	atomic_set(&call->usage, 1);
	call->debug_id = debug_id;
	call->tx_total_len = -1;
	call->next_rx_timo = 20 * HZ;
	call->next_req_timo = 1 * HZ;

	memset(&call->sock_node, 0xed, sizeof(call->sock_node));

	/* Leave space in the ring to handle a maxed-out jumbo packet */
	call->rx_winsize = rxrpc_rx_window_size;
	call->tx_winsize = 16;
	call->rx_expect_next = 1;

	call->cong_cwnd = 2;
	call->cong_ssthresh = RXRPC_RXTX_BUFF_SIZE - 1;

	call->rxnet = rxnet;
	atomic_inc(&rxnet->nr_calls);
	return call;

nomem_2:
	kfree(call->rxtx_buffer);
nomem:
	kmem_cache_free(rxrpc_call_jar, call);
	return NULL;
}

/*
 * Allocate a new client call.
 */
static struct rxrpc_call *rxrpc_alloc_client_call(struct rxrpc_sock *rx,
						  struct sockaddr_rxrpc *srx,
						  gfp_t gfp,
						  unsigned int debug_id)
{
	struct rxrpc_call *call;
	ktime_t now;

	_enter("");

	call = rxrpc_alloc_call(rx, gfp, debug_id);
	if (!call)
		return ERR_PTR(-ENOMEM);
	call->state = RXRPC_CALL_CLIENT_AWAIT_CONN;
	call->service_id = srx->srx_service;
	call->tx_phase = true;
	now = ktime_get_real();
	call->acks_latest_ts = now;
	call->cong_tstamp = now;

	_leave(" = %p", call);
	return call;
}

/*
 * Initiate the call ack/resend/expiry timer.
 */
static void rxrpc_start_call_timer(struct rxrpc_call *call)
{
	unsigned long now = jiffies;
	unsigned long j = now + MAX_JIFFY_OFFSET;

	call->ack_at = j;
	call->ack_lost_at = j;
	call->resend_at = j;
	call->ping_at = j;
	call->expect_rx_by = j;
	call->expect_req_by = j;
	call->expect_term_by = j;
	call->timer.expires = now;
}

/*
 * Set up a call for the given parameters.
 * - Called with the socket lock held, which it must release.
 * - If it returns a call, the call's lock will need releasing by the caller.
 */
struct rxrpc_call *rxrpc_new_client_call(struct rxrpc_sock *rx,
					 struct rxrpc_conn_parameters *cp,
					 struct sockaddr_rxrpc *srx,
					 struct rxrpc_call_params *p,
					 gfp_t gfp,
					 unsigned int debug_id)
	__releases(&rx->sk.sk_lock.slock)
	__acquires(&call->user_mutex)
{
	struct rxrpc_call *call, *xcall;
	struct rxrpc_net *rxnet;
	struct rb_node *parent, **pp;
	const void *here = __builtin_return_address(0);
	int ret;

	_enter("%p,%lx", rx, p->user_call_ID);

	call = rxrpc_alloc_client_call(rx, srx, gfp, debug_id);
	if (IS_ERR(call)) {
		release_sock(&rx->sk);
		_leave(" = %ld", PTR_ERR(call));
		return call;
	}

	call->tx_total_len = p->tx_total_len;
	trace_rxrpc_call(call, rxrpc_call_new_client, atomic_read(&call->usage),
			 here, (const void *)p->user_call_ID);

	/* We need to protect a partially set up call against the user as we
	 * will be acting outside the socket lock.
	 */
	mutex_lock(&call->user_mutex);

	/* Publish the call, even though it is incompletely set up as yet */
	write_lock(&rx->call_lock);

	pp = &rx->calls.rb_node;
	parent = NULL;
	while (*pp) {
		parent = *pp;
		xcall = rb_entry(parent, struct rxrpc_call, sock_node);

		if (p->user_call_ID < xcall->user_call_ID)
			pp = &(*pp)->rb_left;
		else if (p->user_call_ID > xcall->user_call_ID)
			pp = &(*pp)->rb_right;
		else
			goto error_dup_user_ID;
	}

	rcu_assign_pointer(call->socket, rx);
	call->user_call_ID = p->user_call_ID;
	__set_bit(RXRPC_CALL_HAS_USERID, &call->flags);
	rxrpc_get_call(call, rxrpc_call_got_userid);
	rb_link_node(&call->sock_node, parent, pp);
	rb_insert_color(&call->sock_node, &rx->calls);
	list_add(&call->sock_link, &rx->sock_calls);

	write_unlock(&rx->call_lock);

	rxnet = call->rxnet;
	write_lock(&rxnet->call_lock);
	list_add_tail(&call->link, &rxnet->calls);
	write_unlock(&rxnet->call_lock);

	/* From this point on, the call is protected by its own lock. */
	release_sock(&rx->sk);

	/* Set up or get a connection record and set the protocol parameters,
	 * including channel number and call ID.
	 */
	ret = rxrpc_connect_call(call, cp, srx, gfp);
	if (ret < 0)
		goto error;

	trace_rxrpc_call(call, rxrpc_call_connected, atomic_read(&call->usage),
			 here, NULL);

	rxrpc_start_call_timer(call);

	_net("CALL new %d on CONN %d", call->debug_id, call->conn->debug_id);

	_leave(" = %p [new]", call);
	return call;

	/* We unexpectedly found the user ID in the list after taking
	 * the call_lock.  This shouldn't happen unless the user races
	 * with itself and tries to add the same user ID twice at the
	 * same time in different threads.
	 */
error_dup_user_ID:
	write_unlock(&rx->call_lock);
	release_sock(&rx->sk);
	ret = -EEXIST;

error:
	__rxrpc_set_call_completion(call, RXRPC_CALL_LOCAL_ERROR,
				    RX_CALL_DEAD, ret);
	trace_rxrpc_call(call, rxrpc_call_error, atomic_read(&call->usage),
			 here, ERR_PTR(ret));
	rxrpc_release_call(rx, call);
	mutex_unlock(&call->user_mutex);
	rxrpc_put_call(call, rxrpc_call_put);
	_leave(" = %d", ret);
	return ERR_PTR(ret);
}
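
/*
 * Illustrative calling pattern for rxrpc_new_client_call() (a sketch only;
 * the identifiers below stand in for a real caller):
 *
 *	lock_sock(&rx->sk);
 *	call = rxrpc_new_client_call(rx, &cp, &srx, &p, GFP_KERNEL, debug_id);
 *	// The socket lock has been released by this point.
 *	if (IS_ERR(call))
 *		return PTR_ERR(call);
 *	// On success, call->user_mutex is held; drop it when setup is done.
 *	mutex_unlock(&call->user_mutex);
 */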

/*
 * Retry a call to a new address.  It is expected that the Tx queue of the call
 * will contain data previously packaged for an old call.
 */
int rxrpc_retry_client_call(struct rxrpc_sock *rx,
			    struct rxrpc_call *call,
			    struct rxrpc_conn_parameters *cp,
			    struct sockaddr_rxrpc *srx,
			    gfp_t gfp)
{
	const void *here = __builtin_return_address(0);
	int ret;

	/* Set up or get a connection record and set the protocol parameters,
	 * including channel number and call ID.
	 */
	ret = rxrpc_connect_call(call, cp, srx, gfp);
	if (ret < 0)
		goto error;

	trace_rxrpc_call(call, rxrpc_call_connected, atomic_read(&call->usage),
			 here, NULL);

	rxrpc_start_call_timer(call);

	_net("CALL new %d on CONN %d", call->debug_id, call->conn->debug_id);

	if (!test_and_set_bit(RXRPC_CALL_EV_RESEND, &call->events))
		rxrpc_queue_call(call);

	_leave(" = 0");
	return 0;

error:
	rxrpc_set_call_completion(call, RXRPC_CALL_LOCAL_ERROR,
				  RX_CALL_DEAD, ret);
	trace_rxrpc_call(call, rxrpc_call_error, atomic_read(&call->usage),
			 here, ERR_PTR(ret));
	_leave(" = %d", ret);
	return ret;
}

/*
 * Set up an incoming call.  call->conn points to the connection.
 * This is called in BH context and isn't allowed to fail.
 */
void rxrpc_incoming_call(struct rxrpc_sock *rx,
			 struct rxrpc_call *call,
			 struct sk_buff *skb)
{
	struct rxrpc_connection *conn = call->conn;
	struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
	u32 chan;

	_enter(",%d", call->conn->debug_id);

	rcu_assign_pointer(call->socket, rx);
	call->call_id		= sp->hdr.callNumber;
	call->service_id	= sp->hdr.serviceId;
	call->cid		= sp->hdr.cid;
	call->state		= RXRPC_CALL_SERVER_ACCEPTING;
	if (sp->hdr.securityIndex > 0)
		call->state	= RXRPC_CALL_SERVER_SECURING;
	call->cong_tstamp	= skb->tstamp;

	/* Set the channel for this call.  We don't get channel_lock as we're
	 * only defending against the data_ready handler (which we're called
	 * from) and the RESPONSE packet parser (which is only really
	 * interested in call_counter and can cope with a disagreement with the
	 * call pointer).
	 */
	chan = sp->hdr.cid & RXRPC_CHANNELMASK;
	conn->channels[chan].call_counter = call->call_id;
	conn->channels[chan].call_id = call->call_id;
	rcu_assign_pointer(conn->channels[chan].call, call);

	spin_lock(&conn->params.peer->lock);
	hlist_add_head(&call->error_link, &conn->params.peer->error_targets);
	spin_unlock(&conn->params.peer->lock);

	_net("CALL incoming %d on CONN %d", call->debug_id, call->conn->debug_id);

	rxrpc_start_call_timer(call);
	_leave("");
}

/*
 * Queue a call's work processor, getting a ref to pass to the work queue.
 */
bool rxrpc_queue_call(struct rxrpc_call *call)
{
	const void *here = __builtin_return_address(0);
	int n = __atomic_add_unless(&call->usage, 1, 0);
	if (n == 0)
		return false;
	if (rxrpc_queue_work(&call->processor))
		trace_rxrpc_call(call, rxrpc_call_queued, n + 1, here, NULL);
	else
		rxrpc_put_call(call, rxrpc_call_put_noqueue);
	return true;
}

/*
 * Queue a call's work processor, passing the caller's ref to the work queue.
 */
bool __rxrpc_queue_call(struct rxrpc_call *call)
{
	const void *here = __builtin_return_address(0);
	int n = atomic_read(&call->usage);
	ASSERTCMP(n, >=, 1);
	if (rxrpc_queue_work(&call->processor))
		trace_rxrpc_call(call, rxrpc_call_queued_ref, n, here, NULL);
	else
		rxrpc_put_call(call, rxrpc_call_put_noqueue);
	return true;
}

/*
 * Note the re-emergence of a call.
 */
void rxrpc_see_call(struct rxrpc_call *call)
{
	const void *here = __builtin_return_address(0);
	if (call) {
		int n = atomic_read(&call->usage);

		trace_rxrpc_call(call, rxrpc_call_seen, n, here, NULL);
	}
}

/*
 * Note the addition of a ref on a call.
 */
void rxrpc_get_call(struct rxrpc_call *call, enum rxrpc_call_trace op)
{
	const void *here = __builtin_return_address(0);
	int n = atomic_inc_return(&call->usage);

	trace_rxrpc_call(call, op, n, here, NULL);
}

/*
 * Detach a call from its owning socket.
 */
void rxrpc_release_call(struct rxrpc_sock *rx, struct rxrpc_call *call)
{
	const void *here = __builtin_return_address(0);
	struct rxrpc_connection *conn = call->conn;
	bool put = false;
	int i;

	_enter("{%d,%d}", call->debug_id, atomic_read(&call->usage));

	trace_rxrpc_call(call, rxrpc_call_release, atomic_read(&call->usage),
			 here, (const void *)call->flags);

	ASSERTCMP(call->state, ==, RXRPC_CALL_COMPLETE);

	spin_lock_bh(&call->lock);
	if (test_and_set_bit(RXRPC_CALL_RELEASED, &call->flags))
		BUG();
	spin_unlock_bh(&call->lock);

	del_timer_sync(&call->timer);

	/* Make sure we don't get any more notifications */
	write_lock_bh(&rx->recvmsg_lock);

	if (!list_empty(&call->recvmsg_link)) {
		_debug("unlinking once-pending call %p { e=%lx f=%lx }",
		       call, call->events, call->flags);
		list_del(&call->recvmsg_link);
		put = true;
	}

	/* list_empty() must return false in rxrpc_notify_socket() */
	call->recvmsg_link.next = NULL;
	call->recvmsg_link.prev = NULL;

	write_unlock_bh(&rx->recvmsg_lock);
	if (put)
		rxrpc_put_call(call, rxrpc_call_put);

	write_lock(&rx->call_lock);

	if (test_and_clear_bit(RXRPC_CALL_HAS_USERID, &call->flags)) {
		rb_erase(&call->sock_node, &rx->calls);
		memset(&call->sock_node, 0xdd, sizeof(call->sock_node));
		rxrpc_put_call(call, rxrpc_call_put_userid);
	}

	list_del(&call->sock_link);
	write_unlock(&rx->call_lock);

	_debug("RELEASE CALL %p (%d CONN %p)", call, call->debug_id, conn);

	if (conn)
		rxrpc_disconnect_call(call);

	for (i = 0; i < RXRPC_RXTX_BUFF_SIZE; i++) {
		rxrpc_free_skb(call->rxtx_buffer[i],
			       (call->tx_phase ? rxrpc_skb_tx_cleaned :
				rxrpc_skb_rx_cleaned));
		call->rxtx_buffer[i] = NULL;
	}

	_leave("");
}

/*
 * Prepare a kernel service call for retry.
 */
int rxrpc_prepare_call_for_retry(struct rxrpc_sock *rx, struct rxrpc_call *call)
{
	const void *here = __builtin_return_address(0);
	int i;
	u8 last = 0;

	_enter("{%d,%d}", call->debug_id, atomic_read(&call->usage));

	trace_rxrpc_call(call, rxrpc_call_release, atomic_read(&call->usage),
			 here, (const void *)call->flags);

	ASSERTCMP(call->state, ==, RXRPC_CALL_COMPLETE);
	ASSERTCMP(call->completion, !=, RXRPC_CALL_REMOTELY_ABORTED);
	ASSERTCMP(call->completion, !=, RXRPC_CALL_LOCALLY_ABORTED);
	ASSERT(list_empty(&call->recvmsg_link));

	del_timer_sync(&call->timer);

	_debug("RELEASE CALL %p (%d CONN %p)", call, call->debug_id, call->conn);

	if (call->conn)
		rxrpc_disconnect_call(call);

	if (rxrpc_is_service_call(call) ||
	    !call->tx_phase ||
	    call->tx_hard_ack != 0 ||
	    call->rx_hard_ack != 0 ||
	    call->rx_top != 0)
		return -EINVAL;

	call->state = RXRPC_CALL_UNINITIALISED;
	call->completion = RXRPC_CALL_SUCCEEDED;
	call->call_id = 0;
	call->cid = 0;
	call->cong_cwnd = 0;
	call->cong_extra = 0;
	call->cong_ssthresh = 0;
	call->cong_mode = 0;
	call->cong_dup_acks = 0;
	call->cong_cumul_acks = 0;
	call->acks_lowest_nak = 0;

	for (i = 0; i < RXRPC_RXTX_BUFF_SIZE; i++) {
		last |= call->rxtx_annotations[i];
		call->rxtx_annotations[i] &= RXRPC_TX_ANNO_LAST;
		call->rxtx_annotations[i] |= RXRPC_TX_ANNO_RETRANS;
	}

	_leave(" = 0");
	return 0;
}

/*
 * release all the calls associated with a socket
 */
void rxrpc_release_calls_on_socket(struct rxrpc_sock *rx)
{
	struct rxrpc_call *call;

	_enter("%p", rx);

	while (!list_empty(&rx->to_be_accepted)) {
		call = list_entry(rx->to_be_accepted.next,
				  struct rxrpc_call, accept_link);
		list_del(&call->accept_link);
		rxrpc_abort_call("SKR", call, 0, RX_CALL_DEAD, -ECONNRESET);
		rxrpc_put_call(call, rxrpc_call_put);
	}

	while (!list_empty(&rx->sock_calls)) {
		call = list_entry(rx->sock_calls.next,
				  struct rxrpc_call, sock_link);
		rxrpc_get_call(call, rxrpc_call_got);
		rxrpc_abort_call("SKT", call, 0, RX_CALL_DEAD, -ECONNRESET);
		rxrpc_send_abort_packet(call);
		rxrpc_release_call(rx, call);
		rxrpc_put_call(call, rxrpc_call_put);
	}

	_leave("");
}

/*
 * release a call
 */
void rxrpc_put_call(struct rxrpc_call *call, enum rxrpc_call_trace op)
{
	struct rxrpc_net *rxnet = call->rxnet;
	const void *here = __builtin_return_address(0);
	int n;

	ASSERT(call != NULL);

	n = atomic_dec_return(&call->usage);
	trace_rxrpc_call(call, op, n, here, NULL);
	ASSERTCMP(n, >=, 0);
	if (n == 0) {
		_debug("call %d dead", call->debug_id);
		ASSERTCMP(call->state, ==, RXRPC_CALL_COMPLETE);

		if (!list_empty(&call->link)) {
			write_lock(&rxnet->call_lock);
			list_del_init(&call->link);
			write_unlock(&rxnet->call_lock);
		}

		rxrpc_cleanup_call(call);
	}
}

/*
 * Final call destruction under RCU.
 */
static void rxrpc_rcu_destroy_call(struct rcu_head *rcu)
{
	struct rxrpc_call *call = container_of(rcu, struct rxrpc_call, rcu);
	struct rxrpc_net *rxnet = call->rxnet;

	rxrpc_put_peer(call->peer);
	kfree(call->rxtx_buffer);
	kfree(call->rxtx_annotations);
	kmem_cache_free(rxrpc_call_jar, call);
	if (atomic_dec_and_test(&rxnet->nr_calls))
		wake_up_atomic_t(&rxnet->nr_calls);
}

/*
 * clean up a call
 */
void rxrpc_cleanup_call(struct rxrpc_call *call)
{
	int i;

	_net("DESTROY CALL %d", call->debug_id);

	memset(&call->sock_node, 0xcd, sizeof(call->sock_node));

	del_timer_sync(&call->timer);

	ASSERTCMP(call->state, ==, RXRPC_CALL_COMPLETE);
	ASSERT(test_bit(RXRPC_CALL_RELEASED, &call->flags));
	ASSERTCMP(call->conn, ==, NULL);

	/* Clean up the Rx/Tx buffer */
	for (i = 0; i < RXRPC_RXTX_BUFF_SIZE; i++)
		rxrpc_free_skb(call->rxtx_buffer[i],
			       (call->tx_phase ? rxrpc_skb_tx_cleaned :
				rxrpc_skb_rx_cleaned));

	rxrpc_free_skb(call->tx_pending, rxrpc_skb_tx_cleaned);

	call_rcu(&call->rcu, rxrpc_rcu_destroy_call);
}

/*
 * Make sure that all calls are gone from a network namespace.  To reach this
 * point, any open UDP sockets in that namespace must have been closed, so any
 * outstanding calls cannot be doing I/O.
 */
void rxrpc_destroy_all_calls(struct rxrpc_net *rxnet)
{
	struct rxrpc_call *call;

	_enter("");

	if (list_empty(&rxnet->calls))
		return;

	write_lock(&rxnet->call_lock);

	while (!list_empty(&rxnet->calls)) {
		call = list_entry(rxnet->calls.next, struct rxrpc_call, link);
		_debug("Zapping call %p", call);

		rxrpc_see_call(call);
		list_del_init(&call->link);

		pr_err("Call %p still in use (%d,%s,%lx,%lx)!\n",
		       call, atomic_read(&call->usage),
		       rxrpc_call_states[call->state],
		       call->flags, call->events);

		write_unlock(&rxnet->call_lock);
		cond_resched();
		write_lock(&rxnet->call_lock);
	}

	write_unlock(&rxnet->call_lock);

	atomic_dec(&rxnet->nr_calls);
	wait_on_atomic_t(&rxnet->nr_calls, atomic_t_wait, TASK_UNINTERRUPTIBLE);
}