/* RxRPC individual remote procedure call handling
 *
 * Copyright (C) 2007 Red Hat, Inc. All Rights Reserved.
 * Written by David Howells (dhowells@redhat.com)
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version
 * 2 of the License, or (at your option) any later version.
 */

#define pr_fmt(fmt) KBUILD_MODNAME ": " fmt

#include <linux/slab.h>
#include <linux/module.h>
#include <linux/circ_buf.h>
#include <linux/spinlock_types.h>
#include <net/sock.h>
#include <net/af_rxrpc.h>
#include "ar-internal.h"

const char *const rxrpc_call_states[NR__RXRPC_CALL_STATES] = {
	[RXRPC_CALL_UNINITIALISED]		= "Uninit  ",
	[RXRPC_CALL_CLIENT_AWAIT_CONN]		= "ClWtConn",
	[RXRPC_CALL_CLIENT_SEND_REQUEST]	= "ClSndReq",
	[RXRPC_CALL_CLIENT_AWAIT_REPLY]		= "ClAwtRpl",
	[RXRPC_CALL_CLIENT_RECV_REPLY]		= "ClRcvRpl",
	[RXRPC_CALL_SERVER_PREALLOC]		= "SvPrealc",
	[RXRPC_CALL_SERVER_SECURING]		= "SvSecure",
	[RXRPC_CALL_SERVER_ACCEPTING]		= "SvAccept",
	[RXRPC_CALL_SERVER_RECV_REQUEST]	= "SvRcvReq",
	[RXRPC_CALL_SERVER_ACK_REQUEST]		= "SvAckReq",
	[RXRPC_CALL_SERVER_SEND_REPLY]		= "SvSndRpl",
	[RXRPC_CALL_SERVER_AWAIT_ACK]		= "SvAwtACK",
	[RXRPC_CALL_COMPLETE]			= "Complete",
};

const char *const rxrpc_call_completions[NR__RXRPC_CALL_COMPLETIONS] = {
	[RXRPC_CALL_SUCCEEDED]			= "Complete",
	[RXRPC_CALL_REMOTELY_ABORTED]		= "RmtAbort",
	[RXRPC_CALL_LOCALLY_ABORTED]		= "LocAbort",
	[RXRPC_CALL_LOCAL_ERROR]		= "LocError",
	[RXRPC_CALL_NETWORK_ERROR]		= "NetError",
};

struct kmem_cache *rxrpc_call_jar;

static void rxrpc_call_timer_expired(unsigned long _call)
{
	struct rxrpc_call *call = (struct rxrpc_call *)_call;

	_enter("%d", call->debug_id);

	if (call->state < RXRPC_CALL_COMPLETE)
		rxrpc_set_timer(call, rxrpc_timer_expired, ktime_get_real());
}

/*
 * Find an extant call by user ID.
 * - Called in process context with IRQs enabled.
 */
struct rxrpc_call *rxrpc_find_call_by_user_ID(struct rxrpc_sock *rx,
					      unsigned long user_call_ID)
{
	struct rxrpc_call *call;
	struct rb_node *p;

	_enter("%p,%lx", rx, user_call_ID);

	read_lock(&rx->call_lock);

	p = rx->calls.rb_node;
	while (p) {
		call = rb_entry(p, struct rxrpc_call, sock_node);

		if (user_call_ID < call->user_call_ID)
			p = p->rb_left;
		else if (user_call_ID > call->user_call_ID)
			p = p->rb_right;
		else
			goto found_extant_call;
	}

	read_unlock(&rx->call_lock);
	_leave(" = NULL");
	return NULL;

found_extant_call:
	rxrpc_get_call(call, rxrpc_call_got);
	read_unlock(&rx->call_lock);
	_leave(" = %p [%d]", call, atomic_read(&call->usage));
	return call;
}

/*
 * allocate a new call
 */
struct rxrpc_call *rxrpc_alloc_call(gfp_t gfp)
{
	struct rxrpc_call *call;

	call = kmem_cache_zalloc(rxrpc_call_jar, gfp);
	if (!call)
		return NULL;

	call->rxtx_buffer = kcalloc(RXRPC_RXTX_BUFF_SIZE,
				    sizeof(struct sk_buff *),
				    gfp);
	if (!call->rxtx_buffer)
		goto nomem;

	call->rxtx_annotations = kcalloc(RXRPC_RXTX_BUFF_SIZE, sizeof(u8), gfp);
	if (!call->rxtx_annotations)
		goto nomem_2;

	mutex_init(&call->user_mutex);
	setup_timer(&call->timer, rxrpc_call_timer_expired,
		    (unsigned long)call);
	INIT_WORK(&call->processor, &rxrpc_process_call);
	INIT_LIST_HEAD(&call->link);
	INIT_LIST_HEAD(&call->chan_wait_link);
	INIT_LIST_HEAD(&call->accept_link);
	INIT_LIST_HEAD(&call->recvmsg_link);
	INIT_LIST_HEAD(&call->sock_link);
	init_waitqueue_head(&call->waitq);
	spin_lock_init(&call->lock);
	rwlock_init(&call->state_lock);
	atomic_set(&call->usage, 1);
	call->debug_id = atomic_inc_return(&rxrpc_debug_id);
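	/* The total Tx data length is unknown as yet; -1 means unset */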
	call->tx_total_len = -1;
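
	/* Poison the sock_node so that accidental use before the call is
	 * inserted into the socket's call tree shows up clearly.
	 */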
	memset(&call->sock_node, 0xed, sizeof(call->sock_node));

	/* Leave space in the ring to handle a maxed-out jumbo packet */
	call->rx_winsize = rxrpc_rx_window_size;
	call->tx_winsize = 16;
	call->rx_expect_next = 1;

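	/* Start with a conservative congestion window and let slow-start run
	 * nearly to the ring capacity (cf. RFC 5681).
	 */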
	call->cong_cwnd = 2;
	call->cong_ssthresh = RXRPC_RXTX_BUFF_SIZE - 1;
	return call;

nomem_2:
	kfree(call->rxtx_buffer);
nomem:
	kmem_cache_free(rxrpc_call_jar, call);
	return NULL;
}

/*
 * Allocate a new client call.
 */
static struct rxrpc_call *rxrpc_alloc_client_call(struct sockaddr_rxrpc *srx,
						  gfp_t gfp)
{
	struct rxrpc_call *call;
	ktime_t now;

	_enter("");

	call = rxrpc_alloc_call(gfp);
	if (!call)
		return ERR_PTR(-ENOMEM);
	call->state = RXRPC_CALL_CLIENT_AWAIT_CONN;
	call->service_id = srx->srx_service;
	call->tx_phase = true;
	now = ktime_get_real();
	call->acks_latest_ts = now;
	call->cong_tstamp = now;

	_leave(" = %p", call);
	return call;
}

/*
 * Initiate the call ack/resend/expiry timer.
 */
static void rxrpc_start_call_timer(struct rxrpc_call *call)
{
	ktime_t now = ktime_get_real(), expire_at;

	expire_at = ktime_add_ms(now, rxrpc_max_call_lifetime);
	call->expire_at = expire_at;
	call->ack_at = expire_at;
	call->ping_at = expire_at;
	call->resend_at = expire_at;
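	/* Prime the timer with a far-off expiry; rxrpc_set_timer() then winds
	 * it back to the earliest of the event times set above.
	 */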
	call->timer.expires = jiffies + LONG_MAX / 2;
	rxrpc_set_timer(call, rxrpc_timer_begin, now);
}

/*
 * Set up a call for the given parameters.
 * - Called with the socket lock held, which it must release.
 * - If it returns a call, the call's lock will need releasing by the caller.
 */
struct rxrpc_call *rxrpc_new_client_call(struct rxrpc_sock *rx,
					 struct rxrpc_conn_parameters *cp,
					 struct sockaddr_rxrpc *srx,
					 unsigned long user_call_ID,
					 s64 tx_total_len,
					 gfp_t gfp)
	__releases(&rx->sk.sk_lock.slock)
{
	struct rxrpc_call *call, *xcall;
	struct rxrpc_net *rxnet = rxrpc_net(sock_net(&rx->sk));
	struct rb_node *parent, **pp;
	const void *here = __builtin_return_address(0);
	int ret;

	_enter("%p,%lx", rx, user_call_ID);

	call = rxrpc_alloc_client_call(srx, gfp);
	if (IS_ERR(call)) {
		release_sock(&rx->sk);
		_leave(" = %ld", PTR_ERR(call));
		return call;
	}

	call->tx_total_len = tx_total_len;
	trace_rxrpc_call(call, rxrpc_call_new_client, atomic_read(&call->usage),
			 here, (const void *)user_call_ID);

	/* We need to protect a partially set up call against the user as we
	 * will be acting outside the socket lock.
	 */
	mutex_lock(&call->user_mutex);

	/* Publish the call, even though it is incompletely set up as yet */
	write_lock(&rx->call_lock);

	pp = &rx->calls.rb_node;
	parent = NULL;
	while (*pp) {
		parent = *pp;
		xcall = rb_entry(parent, struct rxrpc_call, sock_node);

		if (user_call_ID < xcall->user_call_ID)
			pp = &(*pp)->rb_left;
		else if (user_call_ID > xcall->user_call_ID)
			pp = &(*pp)->rb_right;
		else
			goto error_dup_user_ID;
	}

	rcu_assign_pointer(call->socket, rx);
	call->user_call_ID = user_call_ID;
	__set_bit(RXRPC_CALL_HAS_USERID, &call->flags);
	rxrpc_get_call(call, rxrpc_call_got_userid);
	rb_link_node(&call->sock_node, parent, pp);
	rb_insert_color(&call->sock_node, &rx->calls);
	list_add(&call->sock_link, &rx->sock_calls);

	write_unlock(&rx->call_lock);

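	/* Also add the call to the per-netns list, which is used by /proc and
	 * by rxrpc_destroy_all_calls() on namespace teardown.
	 */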
	write_lock(&rxnet->call_lock);
	list_add_tail(&call->link, &rxnet->calls);
	write_unlock(&rxnet->call_lock);

	/* From this point on, the call is protected by its own lock. */
	release_sock(&rx->sk);

	/* Set up or get a connection record and set the protocol parameters,
	 * including channel number and call ID.
	 */
	ret = rxrpc_connect_call(call, cp, srx, gfp);
	if (ret < 0)
		goto error;

	trace_rxrpc_call(call, rxrpc_call_connected, atomic_read(&call->usage),
			 here, NULL);

	rxrpc_start_call_timer(call);

	_net("CALL new %d on CONN %d", call->debug_id, call->conn->debug_id);

	_leave(" = %p [new]", call);
	return call;

	/* We unexpectedly found the user ID in the list after taking
	 * the call_lock.  This shouldn't happen unless the user races
	 * with itself and tries to add the same user ID twice at the
	 * same time in different threads.
	 */
error_dup_user_ID:
	write_unlock(&rx->call_lock);
	release_sock(&rx->sk);
	ret = -EEXIST;

error:
	__rxrpc_set_call_completion(call, RXRPC_CALL_LOCAL_ERROR,
				    RX_CALL_DEAD, ret);
	trace_rxrpc_call(call, rxrpc_call_error, atomic_read(&call->usage),
			 here, ERR_PTR(ret));
	rxrpc_release_call(rx, call);
	mutex_unlock(&call->user_mutex);
	rxrpc_put_call(call, rxrpc_call_put);
	_leave(" = %d", ret);
	return ERR_PTR(ret);
}

/*
 * Retry a call to a new address.  It is expected that the Tx queue of the call
 * will contain data previously packaged for an old call.
 */
int rxrpc_retry_client_call(struct rxrpc_sock *rx,
			    struct rxrpc_call *call,
			    struct rxrpc_conn_parameters *cp,
			    struct sockaddr_rxrpc *srx,
			    gfp_t gfp)
{
	const void *here = __builtin_return_address(0);
	int ret;

	/* Set up or get a connection record and set the protocol parameters,
	 * including channel number and call ID.
	 */
	ret = rxrpc_connect_call(call, cp, srx, gfp);
	if (ret < 0)
		goto error;

	trace_rxrpc_call(call, rxrpc_call_connected, atomic_read(&call->usage),
			 here, NULL);

	rxrpc_start_call_timer(call);

	_net("CALL new %d on CONN %d", call->debug_id, call->conn->debug_id);

	if (!test_and_set_bit(RXRPC_CALL_EV_RESEND, &call->events))
		rxrpc_queue_call(call);

	_leave(" = 0");
	return 0;

error:
	rxrpc_set_call_completion(call, RXRPC_CALL_LOCAL_ERROR,
				  RX_CALL_DEAD, ret);
	trace_rxrpc_call(call, rxrpc_call_error, atomic_read(&call->usage),
			 here, ERR_PTR(ret));
	_leave(" = %d", ret);
	return ret;
}

/*
 * Set up an incoming call.  call->conn points to the connection.
 * This is called in BH context and isn't allowed to fail.
 */
void rxrpc_incoming_call(struct rxrpc_sock *rx,
			 struct rxrpc_call *call,
			 struct sk_buff *skb)
{
	struct rxrpc_connection *conn = call->conn;
	struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
	u32 chan;

	_enter(",%d", call->conn->debug_id);

	rcu_assign_pointer(call->socket, rx);
	call->call_id		= sp->hdr.callNumber;
	call->service_id	= sp->hdr.serviceId;
	call->cid		= sp->hdr.cid;
	call->state		= RXRPC_CALL_SERVER_ACCEPTING;
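	/* A connection with a security class must complete its
	 * challenge/response exchange before the call can be accepted.
	 */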
	if (sp->hdr.securityIndex > 0)
		call->state	= RXRPC_CALL_SERVER_SECURING;
	call->cong_tstamp	= skb->tstamp;

	/* Set the channel for this call.  We don't get channel_lock as we're
	 * only defending against the data_ready handler (which we're called
	 * from) and the RESPONSE packet parser (which is only really
	 * interested in call_counter and can cope with a disagreement with the
	 * call pointer).
	 */
	chan = sp->hdr.cid & RXRPC_CHANNELMASK;
	conn->channels[chan].call_counter = call->call_id;
	conn->channels[chan].call_id = call->call_id;
	rcu_assign_pointer(conn->channels[chan].call, call);

	spin_lock(&conn->params.peer->lock);
	hlist_add_head(&call->error_link, &conn->params.peer->error_targets);
	spin_unlock(&conn->params.peer->lock);

	_net("CALL incoming %d on CONN %d", call->debug_id, call->conn->debug_id);

	rxrpc_start_call_timer(call);
	_leave("");
}

/*
 * Queue a call's work processor, getting a ref to pass to the work queue.
 */
bool rxrpc_queue_call(struct rxrpc_call *call)
{
	const void *here = __builtin_return_address(0);
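	/* Take a ref only if the usage count is non-zero; a count of zero
	 * means the call is being destroyed and must not be resurrected.
	 */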
	int n = __atomic_add_unless(&call->usage, 1, 0);
	if (n == 0)
		return false;
	if (rxrpc_queue_work(&call->processor))
		trace_rxrpc_call(call, rxrpc_call_queued, n + 1, here, NULL);
	else
		rxrpc_put_call(call, rxrpc_call_put_noqueue);
	return true;
}

/*
 * Queue a call's work processor, passing the callers ref to the work queue.
 */
bool __rxrpc_queue_call(struct rxrpc_call *call)
{
	const void *here = __builtin_return_address(0);
	int n = atomic_read(&call->usage);
	ASSERTCMP(n, >=, 1);
	if (rxrpc_queue_work(&call->processor))
		trace_rxrpc_call(call, rxrpc_call_queued_ref, n, here, NULL);
	else
		rxrpc_put_call(call, rxrpc_call_put_noqueue);
	return true;
}

/*
 * Note the re-emergence of a call.
 */
void rxrpc_see_call(struct rxrpc_call *call)
{
	const void *here = __builtin_return_address(0);
	if (call) {
		int n = atomic_read(&call->usage);

		trace_rxrpc_call(call, rxrpc_call_seen, n, here, NULL);
	}
}

/*
 * Note the addition of a ref on a call.
 */
void rxrpc_get_call(struct rxrpc_call *call, enum rxrpc_call_trace op)
{
	const void *here = __builtin_return_address(0);
	int n = atomic_inc_return(&call->usage);

	trace_rxrpc_call(call, op, n, here, NULL);
}

/*
 * Detach a call from its owning socket.
 */
void rxrpc_release_call(struct rxrpc_sock *rx, struct rxrpc_call *call)
{
	const void *here = __builtin_return_address(0);
	struct rxrpc_connection *conn = call->conn;
	bool put = false;
	int i;

	_enter("{%d,%d}", call->debug_id, atomic_read(&call->usage));

	trace_rxrpc_call(call, rxrpc_call_release, atomic_read(&call->usage),
			 here, (const void *)call->flags);

	ASSERTCMP(call->state, ==, RXRPC_CALL_COMPLETE);

	spin_lock_bh(&call->lock);
	if (test_and_set_bit(RXRPC_CALL_RELEASED, &call->flags))
		BUG();
	spin_unlock_bh(&call->lock);

	del_timer_sync(&call->timer);

	/* Make sure we don't get any more notifications */
	write_lock_bh(&rx->recvmsg_lock);

	if (!list_empty(&call->recvmsg_link)) {
		_debug("unlinking once-pending call %p { e=%lx f=%lx }",
		       call, call->events, call->flags);
		list_del(&call->recvmsg_link);
		put = true;
	}

	/* list_empty() must return false in rxrpc_notify_socket() */
	call->recvmsg_link.next = NULL;
	call->recvmsg_link.prev = NULL;

	write_unlock_bh(&rx->recvmsg_lock);
	if (put)
		rxrpc_put_call(call, rxrpc_call_put);

	write_lock(&rx->call_lock);

	if (test_and_clear_bit(RXRPC_CALL_HAS_USERID, &call->flags)) {
		rb_erase(&call->sock_node, &rx->calls);
		memset(&call->sock_node, 0xdd, sizeof(call->sock_node));
		rxrpc_put_call(call, rxrpc_call_put_userid);
	}

	list_del(&call->sock_link);
	write_unlock(&rx->call_lock);

	_debug("RELEASE CALL %p (%d CONN %p)", call, call->debug_id, conn);

	if (conn)
		rxrpc_disconnect_call(call);

	for (i = 0; i < RXRPC_RXTX_BUFF_SIZE; i++) {
		rxrpc_free_skb(call->rxtx_buffer[i],
			       (call->tx_phase ? rxrpc_skb_tx_cleaned :
				rxrpc_skb_rx_cleaned));
		call->rxtx_buffer[i] = NULL;
	}

	_leave("");
}

/*
 * Prepare a kernel service call for retry.
 */
int rxrpc_prepare_call_for_retry(struct rxrpc_sock *rx, struct rxrpc_call *call)
{
	const void *here = __builtin_return_address(0);
	int i;
	u8 last = 0;

	_enter("{%d,%d}", call->debug_id, atomic_read(&call->usage));

	trace_rxrpc_call(call, rxrpc_call_release, atomic_read(&call->usage),
			 here, (const void *)call->flags);

	ASSERTCMP(call->state, ==, RXRPC_CALL_COMPLETE);
	ASSERTCMP(call->completion, !=, RXRPC_CALL_REMOTELY_ABORTED);
	ASSERTCMP(call->completion, !=, RXRPC_CALL_LOCALLY_ABORTED);
	ASSERT(list_empty(&call->recvmsg_link));

	del_timer_sync(&call->timer);

	_debug("RELEASE CALL %p (%d CONN %p)", call, call->debug_id, call->conn);

	if (call->conn)
		rxrpc_disconnect_call(call);

	if (rxrpc_is_service_call(call) ||
	    !call->tx_phase ||
	    call->tx_hard_ack != 0 ||
	    call->rx_hard_ack != 0 ||
	    call->rx_top != 0)
		return -EINVAL;

	call->state = RXRPC_CALL_UNINITIALISED;
	call->completion = RXRPC_CALL_SUCCEEDED;
	call->call_id = 0;
	call->cid = 0;
	call->cong_cwnd = 0;
	call->cong_extra = 0;
	call->cong_ssthresh = 0;
	call->cong_mode = 0;
	call->cong_dup_acks = 0;
	call->cong_cumul_acks = 0;
	call->acks_lowest_nak = 0;

	for (i = 0; i < RXRPC_RXTX_BUFF_SIZE; i++) {
		last |= call->rxtx_annotations[i];
		call->rxtx_annotations[i] &= RXRPC_TX_ANNO_LAST;
		call->rxtx_annotations[i] |= RXRPC_TX_ANNO_RETRANS;
	}

	_leave(" = 0");
	return 0;
}

/*
 * release all the calls associated with a socket
 */
void rxrpc_release_calls_on_socket(struct rxrpc_sock *rx)
{
	struct rxrpc_call *call;

	_enter("%p", rx);

	while (!list_empty(&rx->to_be_accepted)) {
		call = list_entry(rx->to_be_accepted.next,
				  struct rxrpc_call, accept_link);
		list_del(&call->accept_link);
		rxrpc_abort_call("SKR", call, 0, RX_CALL_DEAD, -ECONNRESET);
		rxrpc_put_call(call, rxrpc_call_put);
	}

	while (!list_empty(&rx->sock_calls)) {
		call = list_entry(rx->sock_calls.next,
				  struct rxrpc_call, sock_link);
		rxrpc_get_call(call, rxrpc_call_got);
		rxrpc_abort_call("SKT", call, 0, RX_CALL_DEAD, -ECONNRESET);
		rxrpc_send_abort_packet(call);
		rxrpc_release_call(rx, call);
		rxrpc_put_call(call, rxrpc_call_put);
	}

	_leave("");
}

/*
 * Drop a ref on a call; the call is cleaned up when the last ref is put.
 */
void rxrpc_put_call(struct rxrpc_call *call, enum rxrpc_call_trace op)
{
	struct rxrpc_net *rxnet;
	const void *here = __builtin_return_address(0);
	int n;

	ASSERT(call != NULL);

	n = atomic_dec_return(&call->usage);
	trace_rxrpc_call(call, op, n, here, NULL);
	ASSERTCMP(n, >=, 0);
	if (n == 0) {
		_debug("call %d dead", call->debug_id);
		ASSERTCMP(call->state, ==, RXRPC_CALL_COMPLETE);

		if (!list_empty(&call->link)) {
			rxnet = rxrpc_net(sock_net(&call->socket->sk));
			write_lock(&rxnet->call_lock);
			list_del_init(&call->link);
			write_unlock(&rxnet->call_lock);
		}

		rxrpc_cleanup_call(call);
	}
}

/*
 * Final call destruction under RCU.
 */
static void rxrpc_rcu_destroy_call(struct rcu_head *rcu)
{
	struct rxrpc_call *call = container_of(rcu, struct rxrpc_call, rcu);

	rxrpc_put_peer(call->peer);
	kfree(call->rxtx_buffer);
	kfree(call->rxtx_annotations);
	kmem_cache_free(rxrpc_call_jar, call);
}

/*
 * clean up a call
 */
void rxrpc_cleanup_call(struct rxrpc_call *call)
{
	int i;

	_net("DESTROY CALL %d", call->debug_id);

	memset(&call->sock_node, 0xcd, sizeof(call->sock_node));

	del_timer_sync(&call->timer);

	ASSERTCMP(call->state, ==, RXRPC_CALL_COMPLETE);
	ASSERT(test_bit(RXRPC_CALL_RELEASED, &call->flags));
	ASSERTCMP(call->conn, ==, NULL);

	/* Clean up the Rx/Tx buffer */
	for (i = 0; i < RXRPC_RXTX_BUFF_SIZE; i++)
		rxrpc_free_skb(call->rxtx_buffer[i],
			       (call->tx_phase ? rxrpc_skb_tx_cleaned :
				rxrpc_skb_rx_cleaned));

	rxrpc_free_skb(call->tx_pending, rxrpc_skb_tx_cleaned);

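	/* Defer the actual freeing to RCU as lookups may still be accessing
	 * the call through RCU-protected pointers.
	 */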
	call_rcu(&call->rcu, rxrpc_rcu_destroy_call);
}

/*
 * Make sure that all calls are gone from a network namespace.  To reach this
 * point, any open UDP sockets in that namespace must have been closed, so any
 * outstanding calls cannot be doing I/O.
 */
void rxrpc_destroy_all_calls(struct rxrpc_net *rxnet)
{
	struct rxrpc_call *call;

	_enter("");

	if (list_empty(&rxnet->calls))
		return;

	write_lock(&rxnet->call_lock);

	while (!list_empty(&rxnet->calls)) {
		call = list_entry(rxnet->calls.next, struct rxrpc_call, link);
		_debug("Zapping call %p", call);

		rxrpc_see_call(call);
		list_del_init(&call->link);

		pr_err("Call %p still in use (%d,%s,%lx,%lx)!\n",
		       call, atomic_read(&call->usage),
		       rxrpc_call_states[call->state],
		       call->flags, call->events);

		write_unlock(&rxnet->call_lock);
		cond_resched();
		write_lock(&rxnet->call_lock);
	}

	write_unlock(&rxnet->call_lock);
}