/*
 * linux/net/sunrpc/xprtsock.c
 *
 * Client-side transport implementation for sockets.
 *
 * TCP callback races fixes (C) 1998 Red Hat Software <alan@redhat.com>
 * TCP send fixes (C) 1998 Red Hat Software <alan@redhat.com>
 * TCP NFS related read + write fixes
 *  (C) 1999 Dave Airlie, University of Limerick, Ireland <airlied@linux.ie>
 *
 * Rewrite of large parts of the code in order to stabilize TCP stuff.
 * Fix behaviour when socket buffer is full.
 *  (C) 1999 Trond Myklebust <trond.myklebust@fys.uio.no>
 *
 * IP socket transport implementation, (C) 2005 Chuck Lever <cel@netapp.com>
 */

#include <linux/types.h>
#include <linux/slab.h>
#include <linux/capability.h>
#include <linux/pagemap.h>
#include <linux/errno.h>
#include <linux/socket.h>
#include <linux/in.h>
#include <linux/net.h>
#include <linux/mm.h>
#include <linux/udp.h>
#include <linux/tcp.h>
#include <linux/sunrpc/clnt.h>
30
#include <linux/sunrpc/sched.h>
31 32 33 34 35 36 37
#include <linux/file.h>

#include <net/sock.h>
#include <net/checksum.h>
#include <net/udp.h>
#include <net/tcp.h>

38 39 40 41 42 43 44 45 46
/*
 * xprtsock tunables
 *
 * These are exported read/write through the sysctl table below
 * (when RPC_DEBUG is enabled); the *_slot_table_entries values size
 * each transport's request slot table, and the resvport pair bounds
 * the reserved-port range used for client sockets.
 */
unsigned int xprt_udp_slot_table_entries = RPC_DEF_SLOT_TABLE;
unsigned int xprt_tcp_slot_table_entries = RPC_DEF_SLOT_TABLE;

unsigned int xprt_min_resvport = RPC_DEF_MIN_RESVPORT;
unsigned int xprt_max_resvport = RPC_DEF_MAX_RESVPORT;

47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132
/*
 * We can register our own files under /proc/sys/sunrpc by
 * calling register_sysctl_table() again.  The files in that
 * directory become the union of all files registered there.
 *
 * We simply need to make sure that we don't collide with
 * someone else's file names!
 */

#ifdef RPC_DEBUG

/* Clamp values handed to proc_dointvec_minmax via .extra1/.extra2 below */
static unsigned int min_slot_table_size = RPC_MIN_SLOT_TABLE;
static unsigned int max_slot_table_size = RPC_MAX_SLOT_TABLE;
static unsigned int xprt_min_resvport_limit = RPC_MIN_RESVPORT;
static unsigned int xprt_max_resvport_limit = RPC_MAX_RESVPORT;

/* Handle returned by register_sysctl_table(); needed for unregistration */
static struct ctl_table_header *sunrpc_table_header;

/*
 * FIXME: changing the UDP slot table size should also resize the UDP
 *        socket buffers for existing UDP transports
 */
/*
 * Entries appear under /proc/sys/sunrpc/.  Each value is clamped to
 * [extra1, extra2] by proc_dointvec_minmax.  The table is terminated
 * by the ctl_name == 0 sentinel entry.
 */
static ctl_table xs_tunables_table[] = {
	{
		.ctl_name	= CTL_SLOTTABLE_UDP,
		.procname	= "udp_slot_table_entries",
		.data		= &xprt_udp_slot_table_entries,
		.maxlen		= sizeof(unsigned int),
		.mode		= 0644,
		.proc_handler	= &proc_dointvec_minmax,
		.strategy	= &sysctl_intvec,
		.extra1		= &min_slot_table_size,
		.extra2		= &max_slot_table_size
	},
	{
		.ctl_name	= CTL_SLOTTABLE_TCP,
		.procname	= "tcp_slot_table_entries",
		.data		= &xprt_tcp_slot_table_entries,
		.maxlen		= sizeof(unsigned int),
		.mode		= 0644,
		.proc_handler	= &proc_dointvec_minmax,
		.strategy	= &sysctl_intvec,
		.extra1		= &min_slot_table_size,
		.extra2		= &max_slot_table_size
	},
	{
		.ctl_name	= CTL_MIN_RESVPORT,
		.procname	= "min_resvport",
		.data		= &xprt_min_resvport,
		.maxlen		= sizeof(unsigned int),
		.mode		= 0644,
		.proc_handler	= &proc_dointvec_minmax,
		.strategy	= &sysctl_intvec,
		.extra1		= &xprt_min_resvport_limit,
		.extra2		= &xprt_max_resvport_limit
	},
	{
		.ctl_name	= CTL_MAX_RESVPORT,
		.procname	= "max_resvport",
		.data		= &xprt_max_resvport,
		.maxlen		= sizeof(unsigned int),
		.mode		= 0644,
		.proc_handler	= &proc_dointvec_minmax,
		.strategy	= &sysctl_intvec,
		.extra1		= &xprt_min_resvport_limit,
		.extra2		= &xprt_max_resvport_limit
	},
	{
		.ctl_name = 0,
	},
};

/* Parent "sunrpc" directory; merges with any other registrations there */
static ctl_table sunrpc_table[] = {
	{
		.ctl_name	= CTL_SUNRPC,
		.procname	= "sunrpc",
		.mode		= 0555,
		.child		= xs_tunables_table
	},
	{
		.ctl_name = 0,
	},
};

#endif

133 134 135 136 137 138
/*
 * How many times to try sending a request on a socket before waiting
 * for the socket buffer to clear.
 */
#define XS_SENDMSG_RETRY	(10U)

139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182
/*
 * Time out for an RPC UDP socket connect.  UDP socket connects are
 * synchronous, but we set a timeout anyway in case of resource
 * exhaustion on the local host.
 */
#define XS_UDP_CONN_TO		(5U * HZ)

/*
 * Wait duration for an RPC TCP connection to be established.  Solaris
 * NFS over TCP uses 60 seconds, for example, which is in line with how
 * long a server takes to reboot.
 */
#define XS_TCP_CONN_TO		(60U * HZ)

/*
 * Wait duration for a reply from the RPC portmapper.
 */
#define XS_BIND_TO		(60U * HZ)

/*
 * Delay if a UDP socket connect error occurs.  This is most likely some
 * kind of resource problem on the local host.
 */
#define XS_UDP_REEST_TO		(2U * HZ)

/*
 * The reestablish timeout allows clients to delay for a bit before attempting
 * to reconnect to a server that just dropped our connection.
 *
 * We implement an exponential backoff when trying to reestablish a TCP
 * transport connection with the server.  Some servers like to drop a TCP
 * connection when they are overworked, so we start with a short timeout and
 * increase over time if the server is down or not responding.
 */
#define XS_TCP_INIT_REEST_TO	(3U * HZ)
#define XS_TCP_MAX_REEST_TO	(5U * 60 * HZ)

/*
 * TCP idle timeout; client drops the transport socket if it is idle
 * for this long.  Note that we also timeout UDP sockets to prevent
 * holding port numbers when there is no RPC traffic.
 */
#define XS_IDLE_DISC_TO		(5U * 60 * HZ)

183 184
#ifdef RPC_DEBUG
# undef  RPC_DEBUG_DATA
185
# define RPCDBG_FACILITY	RPCDBG_TRANS
186 187 188
#endif

#ifdef RPC_DEBUG_DATA
189
/*
 * Dump up to the first 128 bytes of @packet to the debug log, four
 * bytes per group, 32 bytes per line, each line prefixed with its
 * hex offset.  Only compiled when RPC_DEBUG_DATA is defined.
 */
static void xs_pktdump(char *msg, u32 *packet, unsigned int count)
{
	u8 *bytes = (u8 *) packet;
	int pos;

	dprintk("RPC:       %s\n", msg);
	for (pos = 0; pos < count && pos < 128; pos += 4) {
		if (!(pos & 31)) {
			/* start of a new output line */
			if (pos != 0)
				dprintk("\n");
			dprintk("0x%04x ", pos);
		}
		dprintk("%02x%02x%02x%02x ",
			bytes[pos], bytes[pos + 1],
			bytes[pos + 2], bytes[pos + 3]);
	}
	dprintk("\n");
}
#else
207
/* Stub used when RPC_DEBUG_DATA is not defined; compiles away entirely */
static inline void xs_pktdump(char *msg, u32 *packet, unsigned int count)
{
	/* NOP */
}
#endif

213 214
/*
 * Socket-based transport private state.  Embeds the generic rpc_xprt,
 * which is how container_of() maps from the generic transport back to
 * this structure throughout the file.
 */
struct sock_xprt {
	struct rpc_xprt		xprt;

	/*
	 * Network layer
	 */
	struct socket *		sock;	/* kernel socket, NULL when closed */
	struct sock *		inet;	/* underlying sock, NULL when closed */

	/*
	 * State of TCP reply receive
	 */
	__be32			tcp_fraghdr,	/* record marker being assembled */
				tcp_xid;	/* XID of reply being assembled */

	u32			tcp_offset,	/* bytes consumed of current item */
				tcp_reclen;	/* current fragment length */

	unsigned long		tcp_copied,	/* bytes copied to rq_private_buf */
				tcp_flags;	/* TCP_RCV_* state machine flags */

	/*
	 * Connection of transports
	 */
	struct delayed_work	connect_worker;
	struct sockaddr_storage	addr;	/* NOTE(review): appears to hold the
					 * local bind address — confirm
					 * against the connect worker */
	unsigned short		port;

	/*
	 * UDP socket buffer size parameters
	 */
	size_t			rcvsize,
				sndsize;

	/*
	 * Saved socket callback addresses
	 */
	void			(*old_data_ready)(struct sock *, int);
	void			(*old_state_change)(struct sock *);
	void			(*old_write_space)(struct sock *);
};

255 256 257 258 259 260 261 262
/*
 * TCP receive state flags
 */
#define TCP_RCV_LAST_FRAG	(1UL << 0)
#define TCP_RCV_COPY_FRAGHDR	(1UL << 1)
#define TCP_RCV_COPY_XID	(1UL << 2)
#define TCP_RCV_COPY_DATA	(1UL << 3)

263 264 265 266 267 268 269
/*
 * Pre-format printable forms of the peer's address for later display.
 * Buffer sizes: 20 covers a dotted quad plus NUL, 8 covers a 16-bit
 * port or "udp"/"tcp", 48 covers the combined string.  A failed
 * allocation simply leaves the slot NULL; readers must tolerate that.
 */
static void xs_format_peer_addresses(struct rpc_xprt *xprt)
{
	struct sockaddr_in *addr = (struct sockaddr_in *) &xprt->addr;
	char *buf;

	buf = kzalloc(20, GFP_KERNEL);
	if (buf) {
		snprintf(buf, 20, NIPQUAD_FMT,
				NIPQUAD(addr->sin_addr.s_addr));
	}
	xprt->address_strings[RPC_DISPLAY_ADDR] = buf;

	buf = kzalloc(8, GFP_KERNEL);
	if (buf) {
		snprintf(buf, 8, "%u",
				ntohs(addr->sin_port));
	}
	xprt->address_strings[RPC_DISPLAY_PORT] = buf;

	buf = kzalloc(8, GFP_KERNEL);
	if (buf) {
		/* anything that is not UDP is displayed as "tcp" */
		if (xprt->prot == IPPROTO_UDP)
			snprintf(buf, 8, "udp");
		else
			snprintf(buf, 8, "tcp");
	}
	xprt->address_strings[RPC_DISPLAY_PROTO] = buf;

	buf = kzalloc(48, GFP_KERNEL);
	if (buf) {
		snprintf(buf, 48, "addr="NIPQUAD_FMT" port=%u proto=%s",
			NIPQUAD(addr->sin_addr.s_addr),
			ntohs(addr->sin_port),
			xprt->prot == IPPROTO_UDP ? "udp" : "tcp");
	}
	xprt->address_strings[RPC_DISPLAY_ALL] = buf;
}

/*
 * Release every display string built by xs_format_peer_addresses().
 * Slots that failed to allocate are NULL, which kfree() accepts.
 */
static void xs_free_peer_addresses(struct rpc_xprt *xprt)
{
	int slot;

	for (slot = 0; slot < RPC_DISPLAY_MAX; slot++)
		kfree(xprt->address_strings[slot]);
}

309 310
#define XS_SENDMSG_FLAGS	(MSG_DONTWAIT | MSG_NOSIGNAL)

T
Trond Myklebust 已提交
311
/*
 * Send one kvec segment, starting @base bytes in, on @sock.
 * @addr/@addrlen are passed through for unconnected (UDP) sends;
 * @more sets MSG_MORE so the stack can coalesce with following data.
 * Returns the kernel_sendmsg() result (bytes sent or -errno).
 */
static int xs_send_kvec(struct socket *sock, struct sockaddr *addr, int addrlen, struct kvec *vec, unsigned int base, int more)
{
	struct msghdr msg = {
		.msg_name	= addr,
		.msg_namelen	= addrlen,
		.msg_flags	= XS_SENDMSG_FLAGS | (more ? MSG_MORE : 0),
	};
	struct kvec iov = {
		.iov_base	= vec->iov_base + base,
		.iov_len	= vec->iov_len - base,
	};

	/* A zero-length send still goes out (e.g. to carry the address) */
	if (iov.iov_len != 0)
		return kernel_sendmsg(sock, &msg, &iov, 1, iov.iov_len);
	return kernel_sendmsg(sock, &msg, NULL, 0, 0);
}

T
Trond Myklebust 已提交
328
/*
 * Transmit the page-array portion of @xdr via ->sendpage(), starting
 * @base bytes into the page data.  @more indicates further data
 * follows, so MSG_MORE stays set on the final page as well.
 * Returns total bytes sent, or -errno if nothing was sent.
 */
static int xs_send_pagedata(struct socket *sock, struct xdr_buf *xdr, unsigned int base, int more)
{
	struct page **ppage;
	unsigned int remainder;
	int err, sent = 0;

	remainder = xdr->page_len - base;
	base += xdr->page_base;
	/* Locate the page containing offset 'base', and the offset within it */
	ppage = xdr->pages + (base >> PAGE_SHIFT);
	base &= ~PAGE_MASK;
	for(;;) {
		unsigned int len = min_t(unsigned int, PAGE_SIZE - base, remainder);
		int flags = XS_SENDMSG_FLAGS;

		remainder -= len;
		if (remainder != 0 || more)
			flags |= MSG_MORE;
		err = sock->ops->sendpage(sock, *ppage, base, len, flags);
		/* Stop on a short or failed send, or when done */
		if (remainder == 0 || err != len)
			break;
		sent += err;
		ppage++;
		base = 0;	/* subsequent pages start at offset 0 */
	}
	/* If the very first sendpage failed, propagate its error */
	if (sent == 0)
		return err;
	if (err > 0)
		sent += err;
	return sent;
}

359 360 361 362 363 364 365 366
/**
 * xs_sendpages - write pages directly to a socket
 * @sock: socket to send on
 * @addr: UDP only -- address of destination
 * @addrlen: UDP only -- length of destination address
 * @xdr: buffer containing this request
 * @base: starting position in the buffer
 *
 * Sends the head kvec, the page array, then the tail kvec of @xdr in
 * order, resuming @base bytes in (for retransmits after a short send).
 * Returns total bytes sent, or -errno if nothing went out.
 */
static int xs_sendpages(struct socket *sock, struct sockaddr *addr, int addrlen, struct xdr_buf *xdr, unsigned int base)
{
	unsigned int remainder = xdr->len - base;
	int err, sent = 0;

	if (unlikely(!sock))
		return -ENOTCONN;

	clear_bit(SOCK_ASYNC_NOSPACE, &sock->flags);
	/* Only the first fragment of a datagram carries the address */
	if (base != 0) {
		addr = NULL;
		addrlen = 0;
	}

	/* Head kvec (also entered for a pure address-carrying send) */
	if (base < xdr->head[0].iov_len || addr != NULL) {
		unsigned int len = xdr->head[0].iov_len - base;
		remainder -= len;
		err = xs_send_kvec(sock, addr, addrlen, &xdr->head[0], base, remainder != 0);
		if (remainder == 0 || err != len)
			goto out;
		sent += err;
		base = 0;
	} else
		base -= xdr->head[0].iov_len;

	/* Page data */
	if (base < xdr->page_len) {
		unsigned int len = xdr->page_len - base;
		remainder -= len;
		err = xs_send_pagedata(sock, xdr, base, remainder != 0);
		if (remainder == 0 || err != len)
			goto out;
		sent += err;
		base = 0;
	} else
		base -= xdr->page_len;

	/* Tail kvec */
	if (base >= xdr->tail[0].iov_len)
		return sent;
	err = xs_send_kvec(sock, NULL, 0, &xdr->tail[0], base, 0);
out:
	/* Nothing sent at all: report the error from the failing segment */
	if (sent == 0)
		return err;
	if (err > 0)
		sent += err;
	return sent;
}

415
/**
 * xs_nospace - place task on wait queue if transmit was incomplete
 * @task: task to put to sleep
 *
 * Called after a send returned -EAGAIN.  If the socket reported
 * no buffer space, park the task on the transport's buffer-space
 * wait queue; otherwise just back off briefly and retry.
 */
static void xs_nospace(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;
	struct rpc_xprt *xprt = req->rq_xprt;
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);

	dprintk("RPC: %5u xmit incomplete (%u left of %u)\n",
			task->tk_pid, req->rq_slen - req->rq_bytes_sent,
			req->rq_slen);

	if (test_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags)) {
		/* Protect against races with write_space */
		spin_lock_bh(&xprt->transport_lock);

		/* Don't race with disconnect */
		if (!xprt_connected(xprt))
			task->tk_status = -ENOTCONN;
		else if (test_bit(SOCK_NOSPACE, &transport->sock->flags))
			xprt_wait_for_buffer_space(task);

		spin_unlock_bh(&xprt->transport_lock);
	} else
		/* Keep holding the socket if it is blocked */
		rpc_delay(task, HZ>>4);
}

/**
 * xs_udp_send_request - write an RPC request to a UDP socket
 * @task: address of RPC task that manages the state of an RPC request
 *
 * Return values:
 *        0:	The request has been sent
 *   EAGAIN:	The socket was blocked, please call again later to
 *		complete the request
 * ENOTCONN:	Caller needs to invoke connect logic then call again
 *    other:	Some other error occurred, the request was not sent
 */
static int xs_udp_send_request(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;
	struct rpc_xprt *xprt = req->rq_xprt;
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
	struct xdr_buf *xdr = &req->rq_snd_buf;
	int status;

	xs_pktdump("packet data:",
				req->rq_svec->iov_base,
				req->rq_svec->iov_len);

	req->rq_xtime = jiffies;	/* timestamp for RTT estimation */
	status = xs_sendpages(transport->sock,
			      (struct sockaddr *) &xprt->addr,
			      xprt->addrlen, xdr,
			      req->rq_bytes_sent);

	dprintk("RPC:       xs_udp_send_request(%u) = %d\n",
			xdr->len - req->rq_bytes_sent, status);

	/* Whole datagram went out */
	if (likely(status >= (int) req->rq_slen))
		return 0;

	/* Still some bytes left; set up for a retry later. */
	if (status > 0)
		status = -EAGAIN;

	switch (status) {
	case -ENETUNREACH:
	case -EPIPE:
	case -ECONNREFUSED:
		/* When the server has died, an ICMP port unreachable message
		 * prompts ECONNREFUSED. */
		break;
	case -EAGAIN:
		xs_nospace(task);
		break;
	default:
		dprintk("RPC:       sendmsg returned unrecognized error %d\n",
			-status);
		break;
	}

	return status;
}

504 505 506 507 508 509 510
static inline void xs_encode_tcp_record_marker(struct xdr_buf *buf)
{
	u32 reclen = buf->len - sizeof(rpc_fraghdr);
	rpc_fraghdr *base = buf->head[0].iov_base;
	*base = htonl(RPC_LAST_STREAM_FRAGMENT | reclen);
}

511
/**
 * xs_tcp_send_request - write an RPC request to a TCP socket
 * @task: address of RPC task that manages the state of an RPC request
 *
 * Return values:
 *        0:	The request has been sent
 *   EAGAIN:	The socket was blocked, please call again later to
 *		complete the request
 * ENOTCONN:	Caller needs to invoke connect logic then call again
 *    other:	Some other error occurred, the request was not sent
 *
 * XXX: In the case of soft timeouts, should we eventually give up
 *	if sendmsg is not able to make progress?
 */
static int xs_tcp_send_request(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;
	struct rpc_xprt *xprt = req->rq_xprt;
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
	struct xdr_buf *xdr = &req->rq_snd_buf;
	int status;
	unsigned int retry = 0;

	xs_encode_tcp_record_marker(&req->rq_snd_buf);

	xs_pktdump("packet data:",
				req->rq_svec->iov_base,
				req->rq_svec->iov_len);

	/* Continue transmitting the packet/record. We must be careful
	 * to cope with writespace callbacks arriving _after_ we have
	 * called sendmsg(). */
	while (1) {
		req->rq_xtime = jiffies;
		status = xs_sendpages(transport->sock,
					NULL, 0, xdr, req->rq_bytes_sent);

		dprintk("RPC:       xs_tcp_send_request(%u) = %d\n",
				xdr->len - req->rq_bytes_sent, status);

		if (unlikely(status < 0))
			break;

		/* If we've sent the entire packet, immediately
		 * reset the count of bytes sent. */
		req->rq_bytes_sent += status;
		task->tk_bytes_sent += status;
		if (likely(req->rq_bytes_sent >= req->rq_slen)) {
			req->rq_bytes_sent = 0;
			return 0;
		}

		/* Partial send: retry a bounded number of times */
		status = -EAGAIN;
		if (retry++ > XS_SENDMSG_RETRY)
			break;
	}

	switch (status) {
	case -EAGAIN:
		xs_nospace(task);
		break;
	case -ECONNREFUSED:
	case -ECONNRESET:
	case -ENOTCONN:
	case -EPIPE:
		/* All connection-loss flavours ask the caller to reconnect */
		status = -ENOTCONN;
		break;
	default:
		dprintk("RPC:       sendmsg returned unrecognized error %d\n",
			-status);
		xprt_disconnect(xprt);
		break;
	}

	return status;
}

588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614
/**
 * xs_tcp_release_xprt - clean up after a tcp transmission
 * @xprt: transport
 * @task: rpc task
 *
 * This cleans up if an error causes us to abort the transmission of a
 * request.  If the request was partially transmitted (some bytes sent,
 * but not the whole record), the stream is out of sync and the socket
 * must be closed to avoid confusing the server.
 */
static void xs_tcp_release_xprt(struct rpc_xprt *xprt, struct rpc_task *task)
{
	if (task != xprt->snd_task)
		return;

	if (task != NULL) {
		struct rpc_rqst *req = task->tk_rqstp;

		/* Mark for close only on a partial transmission */
		if (req->rq_bytes_sent != 0 &&
		    req->rq_bytes_sent != req->rq_snd_buf.len)
			set_bit(XPRT_CLOSE_WAIT, &task->tk_xprt->state);
	}
	xprt_release_xprt(xprt, task);
}

615 616 617 618
/**
 * xs_close - close a socket
 * @xprt: transport
 *
 * This is used when all requests are complete; ie, no DRC state remains
 * on the server we want to save.
 *
 * Detaches the transport from the socket, restores the saved socket
 * callbacks under the callback lock, releases the socket, and clears
 * the transport's CLOSE_WAIT flag.
 */
static void xs_close(struct rpc_xprt *xprt)
{
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
	struct socket *sock = transport->sock;
	struct sock *sk = transport->inet;

	/* Already closed; still make sure CLOSE_WAIT is cleared */
	if (!sk)
		goto clear_close_wait;

	dprintk("RPC:       xs_close xprt %p\n", xprt);

	write_lock_bh(&sk->sk_callback_lock);
	transport->inet = NULL;
	transport->sock = NULL;

	sk->sk_user_data = NULL;
	/* Restore the callbacks saved when the socket was set up */
	sk->sk_data_ready = transport->old_data_ready;
	sk->sk_state_change = transport->old_state_change;
	sk->sk_write_space = transport->old_write_space;
	write_unlock_bh(&sk->sk_callback_lock);

	sk->sk_no_check = 0;

	sock_release(sock);
clear_close_wait:
	smp_mb__before_clear_bit();
	clear_bit(XPRT_CLOSE_WAIT, &xprt->state);
	smp_mb__after_clear_bit();
}

652 653 654 655 656 657
/**
 * xs_destroy - prepare to shutdown a transport
 * @xprt: doomed transport
 *
 * Stops the pending connect worker, disconnects and closes the
 * socket, then frees everything the transport owns, including
 * @xprt itself.
 */
static void xs_destroy(struct rpc_xprt *xprt)
{
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);

	dprintk("RPC:       xs_destroy xprt %p\n", xprt);

	/* Must not race with a connect attempt still in flight */
	cancel_rearming_delayed_work(&transport->connect_worker);

	xprt_disconnect(xprt);
	xs_close(xprt);
	xs_free_peer_addresses(xprt);
	kfree(xprt->slot);
	kfree(xprt);
}

672 673 674 675 676 677 678 679 680 681
/* Recover the transport stashed in sk_user_data when the socket was set up;
 * NULL once xs_close() has detached the socket. */
static inline struct rpc_xprt *xprt_from_sock(struct sock *sk)
{
	return (struct rpc_xprt *) sk->sk_user_data;
}

/**
 * xs_udp_data_ready - "data ready" callback for UDP sockets
 * @sk: socket with data to read
 * @len: how much data to read
 *
 * Pulls one datagram off the socket, matches its XID to a pending
 * request under the transport lock, copies the reply into that
 * request's receive buffer, and completes it.  Runt or unmatched
 * datagrams are silently dropped.
 */
static void xs_udp_data_ready(struct sock *sk, int len)
{
	struct rpc_task *task;
	struct rpc_xprt *xprt;
	struct rpc_rqst *rovr;
	struct sk_buff *skb;
	int err, repsize, copied;
	u32 _xid;
	__be32 *xp;

	read_lock(&sk->sk_callback_lock);
	dprintk("RPC:       xs_udp_data_ready...\n");
	if (!(xprt = xprt_from_sock(sk)))
		goto out;

	if ((skb = skb_recv_datagram(sk, 0, 1, &err)) == NULL)
		goto out;

	if (xprt->shutdown)
		goto dropit;

	/* Need at least a 4-byte XID past the UDP header */
	repsize = skb->len - sizeof(struct udphdr);
	if (repsize < 4) {
		dprintk("RPC:       impossible RPC reply size %d!\n", repsize);
		goto dropit;
	}

	/* Copy the XID from the skb... */
	xp = skb_header_pointer(skb, sizeof(struct udphdr),
				sizeof(_xid), &_xid);
	if (xp == NULL)
		goto dropit;

	/* Look up and lock the request corresponding to the given XID */
	spin_lock(&xprt->transport_lock);
	rovr = xprt_lookup_rqst(xprt, *xp);
	if (!rovr)
		goto out_unlock;
	task = rovr->rq_task;

	/* Never copy more than the receive buffer can hold */
	if ((copied = rovr->rq_private_buf.buflen) > repsize)
		copied = repsize;

	/* Suck it into the iovec, verify checksum if not done by hw. */
	if (csum_partial_copy_to_xdr(&rovr->rq_private_buf, skb))
		goto out_unlock;

	/* Something worked... */
	dst_confirm(skb->dst);

	xprt_adjust_cwnd(task, copied);
	xprt_update_rtt(task);
	xprt_complete_rqst(task, copied);

 out_unlock:
	spin_unlock(&xprt->transport_lock);
 dropit:
	skb_free_datagram(sk, skb);
 out:
	read_unlock(&sk->sk_callback_lock);
}

745
/*
 * Assemble the 4-byte RPC record marker from the stream, possibly
 * across multiple calls (tcp_offset tracks how much we have so far).
 * Once complete, latch the fragment length and last-fragment flag and
 * advance the receive state machine past TCP_RCV_COPY_FRAGHDR.
 */
static inline void xs_tcp_read_fraghdr(struct rpc_xprt *xprt, struct xdr_skb_reader *desc)
{
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
	size_t len, used;
	char *p;

	p = ((char *) &transport->tcp_fraghdr) + transport->tcp_offset;
	len = sizeof(transport->tcp_fraghdr) - transport->tcp_offset;
	used = xdr_skb_read_bits(desc, p, len);
	transport->tcp_offset += used;
	/* Marker still incomplete; wait for more data */
	if (used != len)
		return;

	transport->tcp_reclen = ntohl(transport->tcp_fraghdr);
	if (transport->tcp_reclen & RPC_LAST_STREAM_FRAGMENT)
		transport->tcp_flags |= TCP_RCV_LAST_FRAG;
	else
		transport->tcp_flags &= ~TCP_RCV_LAST_FRAG;
	transport->tcp_reclen &= RPC_FRAGMENT_SIZE_MASK;

	transport->tcp_flags &= ~TCP_RCV_COPY_FRAGHDR;
	transport->tcp_offset = 0;

	/* Sanity check of the record length */
	if (unlikely(transport->tcp_reclen < 4)) {
		dprintk("RPC:       invalid TCP record fragment length\n");
		xprt_disconnect(xprt);
		return;
	}
	dprintk("RPC:       reading TCP record fragment of length %d\n",
			transport->tcp_reclen);
}

778
/*
 * If the current fragment has been fully consumed, rearm the state
 * machine to read the next record marker; when that fragment was the
 * last of its record, also prepare for a fresh XID and reset the
 * copied-bytes count.
 */
static void xs_tcp_check_fraghdr(struct sock_xprt *transport)
{
	if (transport->tcp_offset == transport->tcp_reclen) {
		transport->tcp_flags |= TCP_RCV_COPY_FRAGHDR;
		transport->tcp_offset = 0;
		if (transport->tcp_flags & TCP_RCV_LAST_FRAG) {
			transport->tcp_flags &= ~TCP_RCV_COPY_DATA;
			transport->tcp_flags |= TCP_RCV_COPY_XID;
			transport->tcp_copied = 0;
		}
	}
}

791
/*
 * Assemble the reply's 4-byte XID from the stream, possibly across
 * multiple calls.  Once complete, switch the state machine from
 * XID-copy to data-copy mode; tcp_copied starts at 4 because the XID
 * itself counts toward the bytes delivered to the request.
 */
static inline void xs_tcp_read_xid(struct sock_xprt *transport, struct xdr_skb_reader *desc)
{
	size_t len, used;
	char *p;

	len = sizeof(transport->tcp_xid) - transport->tcp_offset;
	dprintk("RPC:       reading XID (%Zu bytes)\n", len);
	p = ((char *) &transport->tcp_xid) + transport->tcp_offset;
	used = xdr_skb_read_bits(desc, p, len);
	transport->tcp_offset += used;
	/* XID still incomplete; wait for more data */
	if (used != len)
		return;
	transport->tcp_flags &= ~TCP_RCV_COPY_XID;
	transport->tcp_flags |= TCP_RCV_COPY_DATA;
	transport->tcp_copied = 4;
	dprintk("RPC:       reading reply for XID %08x\n",
			ntohl(transport->tcp_xid));
	xs_tcp_check_fraghdr(transport);
}

811
/*
 * Copy reply payload from the stream into the matching request's
 * private receive buffer.  Runs under the transport lock while the
 * request is looked up and completed.  If no request matches the XID,
 * or the copy fails (e.g. buffer page allocation), the rest of the
 * record is left to be discarded by the caller's state machine.
 */
static inline void xs_tcp_read_request(struct rpc_xprt *xprt, struct xdr_skb_reader *desc)
{
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
	struct rpc_rqst *req;
	struct xdr_buf *rcvbuf;
	size_t len;
	ssize_t r;

	/* Find and lock the request corresponding to this xid */
	spin_lock(&xprt->transport_lock);
	req = xprt_lookup_rqst(xprt, transport->tcp_xid);
	if (!req) {
		transport->tcp_flags &= ~TCP_RCV_COPY_DATA;
		dprintk("RPC:       XID %08x request not found!\n",
				ntohl(transport->tcp_xid));
		spin_unlock(&xprt->transport_lock);
		return;
	}

	rcvbuf = &req->rq_private_buf;
	len = desc->count;
	if (len > transport->tcp_reclen - transport->tcp_offset) {
		/* skb contains more than this record: copy through a
		 * clamped descriptor so we stop at the record boundary */
		struct xdr_skb_reader my_desc;

		len = transport->tcp_reclen - transport->tcp_offset;
		memcpy(&my_desc, desc, sizeof(my_desc));
		my_desc.count = len;
		r = xdr_partial_copy_from_skb(rcvbuf, transport->tcp_copied,
					  &my_desc, xdr_skb_read_bits);
		desc->count -= r;
		desc->offset += r;
	} else
		r = xdr_partial_copy_from_skb(rcvbuf, transport->tcp_copied,
					  desc, xdr_skb_read_bits);

	if (r > 0) {
		transport->tcp_copied += r;
		transport->tcp_offset += r;
	}
	if (r != len) {
		/* Error when copying to the receive buffer,
		 * usually because we weren't able to allocate
		 * additional buffer pages. All we can do now
		 * is turn off TCP_RCV_COPY_DATA, so the request
		 * will not receive any additional updates,
		 * and time out.
		 * Any remaining data from this record will
		 * be discarded.
		 */
		transport->tcp_flags &= ~TCP_RCV_COPY_DATA;
		dprintk("RPC:       XID %08x truncated request\n",
				ntohl(transport->tcp_xid));
		dprintk("RPC:       xprt = %p, tcp_copied = %lu, "
				"tcp_offset = %u, tcp_reclen = %u\n",
				xprt, transport->tcp_copied,
				transport->tcp_offset, transport->tcp_reclen);
		goto out;
	}

	dprintk("RPC:       XID %08x read %Zd bytes\n",
			ntohl(transport->tcp_xid), r);
	dprintk("RPC:       xprt = %p, tcp_copied = %lu, tcp_offset = %u, "
			"tcp_reclen = %u\n", xprt, transport->tcp_copied,
			transport->tcp_offset, transport->tcp_reclen);

	/* Reply buffer full, or record fully consumed on its last
	 * fragment: stop copying data for this XID */
	if (transport->tcp_copied == req->rq_private_buf.buflen)
		transport->tcp_flags &= ~TCP_RCV_COPY_DATA;
	else if (transport->tcp_offset == transport->tcp_reclen) {
		if (transport->tcp_flags & TCP_RCV_LAST_FRAG)
			transport->tcp_flags &= ~TCP_RCV_COPY_DATA;
	}

out:
	if (!(transport->tcp_flags & TCP_RCV_COPY_DATA))
		xprt_complete_rqst(req->rq_task, transport->tcp_copied);
	spin_unlock(&xprt->transport_lock);
	xs_tcp_check_fraghdr(transport);
}

890
static inline void xs_tcp_read_discard(struct sock_xprt *transport, struct xdr_skb_reader *desc)
891 892 893
{
	size_t len;

894
	len = transport->tcp_reclen - transport->tcp_offset;
895 896 897 898
	if (len > desc->count)
		len = desc->count;
	desc->count -= len;
	desc->offset += len;
899
	transport->tcp_offset += len;
900
	dprintk("RPC:       discarded %Zu bytes\n", len);
901
	xs_tcp_check_fraghdr(transport);
902 903
}

904
/*
 * tcp_read_sock() callback: drive the receive state machine over one
 * skb.  Each iteration dispatches on the TCP_RCV_* flags to consume
 * the record marker, the XID, or reply data; anything left over in
 * the current record is discarded.  Returns bytes consumed.
 */
static int xs_tcp_data_recv(read_descriptor_t *rd_desc, struct sk_buff *skb, unsigned int offset, size_t len)
{
	struct rpc_xprt *xprt = rd_desc->arg.data;
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
	struct xdr_skb_reader desc = {
		.skb	= skb,
		.offset	= offset,
		.count	= len,
	};

	dprintk("RPC:       xs_tcp_data_recv started\n");
	do {
		/* Read in a new fragment marker if necessary */
		/* Can we ever really expect to get completely empty fragments? */
		if (transport->tcp_flags & TCP_RCV_COPY_FRAGHDR) {
			xs_tcp_read_fraghdr(xprt, &desc);
			continue;
		}
		/* Read in the xid if necessary */
		if (transport->tcp_flags & TCP_RCV_COPY_XID) {
			xs_tcp_read_xid(transport, &desc);
			continue;
		}
		/* Read in the request data */
		if (transport->tcp_flags & TCP_RCV_COPY_DATA) {
			xs_tcp_read_request(xprt, &desc);
			continue;
		}
		/* Skip over any trailing bytes on short reads */
		xs_tcp_read_discard(transport, &desc);
	} while (desc.count);
	dprintk("RPC:       xs_tcp_data_recv done\n");
	return len - desc.count;
}

939 940 941 942 943 944 945
/**
 * xs_tcp_data_ready - "data ready" callback for TCP sockets
 * @sk: socket with data to read
 * @bytes: how much data to read
 *
 * Hands the socket's pending data to xs_tcp_data_recv() via
 * tcp_read_sock(), unless the transport is gone or shutting down.
 */
static void xs_tcp_data_ready(struct sock *sk, int bytes)
{
	struct rpc_xprt *xprt;
	read_descriptor_t rd_desc;

	dprintk("RPC:       xs_tcp_data_ready...\n");

	read_lock(&sk->sk_callback_lock);
	if (!(xprt = xprt_from_sock(sk)))
		goto out;
	if (xprt->shutdown)
		goto out;

	/* We use rd_desc to pass struct xprt to xs_tcp_data_recv */
	rd_desc.arg.data = xprt;
	rd_desc.count = 65536;
	tcp_read_sock(sk, &rd_desc, xs_tcp_data_recv);
out:
	read_unlock(&sk->sk_callback_lock);
}

966 967 968 969 970 971
/**
 * xs_tcp_state_change - callback to handle TCP socket state changes
 * @sk: socket whose state has changed
 *
 * On ESTABLISHED, reset the receive state machine and wake waiting
 * tasks; on CLOSE_WAIT, schedule an autoclose and then (deliberately
 * falling through) mark the transport disconnected, as for every
 * other closing state.
 */
static void xs_tcp_state_change(struct sock *sk)
{
	struct rpc_xprt *xprt;

	read_lock(&sk->sk_callback_lock);
	if (!(xprt = xprt_from_sock(sk)))
		goto out;
	dprintk("RPC:       xs_tcp_state_change client %p...\n", xprt);
	dprintk("RPC:       state %x conn %d dead %d zapped %d\n",
			sk->sk_state, xprt_connected(xprt),
			sock_flag(sk, SOCK_DEAD),
			sock_flag(sk, SOCK_ZAPPED));

	switch (sk->sk_state) {
	case TCP_ESTABLISHED:
		spin_lock_bh(&xprt->transport_lock);
		if (!xprt_test_and_set_connected(xprt)) {
			struct sock_xprt *transport = container_of(xprt,
					struct sock_xprt, xprt);

			/* Reset TCP record info */
			transport->tcp_offset = 0;
			transport->tcp_reclen = 0;
			transport->tcp_copied = 0;
			transport->tcp_flags =
				TCP_RCV_COPY_FRAGHDR | TCP_RCV_COPY_XID;

			xprt->reestablish_timeout = XS_TCP_INIT_REEST_TO;
			xprt_wake_pending_tasks(xprt, 0);
		}
		spin_unlock_bh(&xprt->transport_lock);
		break;
	case TCP_SYN_SENT:
	case TCP_SYN_RECV:
		break;
	case TCP_CLOSE_WAIT:
		/* Try to schedule an autoclose RPC calls */
		set_bit(XPRT_CLOSE_WAIT, &xprt->state);
		if (test_and_set_bit(XPRT_LOCKED, &xprt->state) == 0)
			queue_work(rpciod_workqueue, &xprt->task_cleanup);
		/* fall through: CLOSE_WAIT also means disconnected */
	default:
		xprt_disconnect(xprt);
	}
 out:
	read_unlock(&sk->sk_callback_lock);
}

1018
/**
 * xs_udp_write_space - callback invoked when socket buffer space
 *                             becomes available
 * @sk: socket whose state has changed
 *
 * Called when more output buffer space is available for this socket.
 * We try not to wake our writers until they can make "significant"
 * progress, otherwise we'll waste resources thrashing kernel_sendmsg
 * with a bunch of small requests.
 */
static void xs_udp_write_space(struct sock *sk)
{
	read_lock(&sk->sk_callback_lock);

	/* from net/core/sock.c:sock_def_write_space */
	if (sock_writeable(sk)) {
		struct socket *sock;
		struct rpc_xprt *xprt;

		if (unlikely(!(sock = sk->sk_socket)))
			goto out;
		if (unlikely(!(xprt = xprt_from_sock(sk))))
			goto out;
		/* Only wake writers that were actually blocked on space */
		if (unlikely(!test_and_clear_bit(SOCK_NOSPACE, &sock->flags)))
			goto out;

		xprt_write_space(xprt);
	}

 out:
	read_unlock(&sk->sk_callback_lock);
}
1050

1051 1052 1053 1054 1055 1056 1057 1058 1059 1060 1061 1062 1063 1064 1065 1066 1067 1068 1069 1070 1071 1072 1073 1074 1075 1076 1077 1078 1079 1080
/**
 * xs_tcp_write_space - callback invoked when socket buffer space
 *                             becomes available
 * @sk: socket whose state has changed
 *
 * Called when more output buffer space is available for this socket.
 * We try not to wake our writers until they can make "significant"
 * progress, otherwise we'll waste resources thrashing kernel_sendmsg
 * with a bunch of small requests.
 */
static void xs_tcp_write_space(struct sock *sk)
{
	read_lock(&sk->sk_callback_lock);

	/* from net/core/stream.c:sk_stream_write_space */
	if (sk_stream_wspace(sk) >= sk_stream_min_wspace(sk)) {
		struct socket *sock;
		struct rpc_xprt *xprt;

		if (unlikely(!(sock = sk->sk_socket)))
			goto out;
		if (unlikely(!(xprt = xprt_from_sock(sk))))
			goto out;
		if (unlikely(!test_and_clear_bit(SOCK_NOSPACE, &sock->flags)))
			goto out;

		xprt_write_space(xprt);
	}

 out:
1081 1082 1083
	read_unlock(&sk->sk_callback_lock);
}

1084
static void xs_udp_do_set_buffer_size(struct rpc_xprt *xprt)
1085
{
1086 1087
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
	struct sock *sk = transport->inet;
1088

1089
	if (transport->rcvsize) {
1090
		sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
1091
		sk->sk_rcvbuf = transport->rcvsize * xprt->max_reqs * 2;
1092
	}
1093
	if (transport->sndsize) {
1094
		sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
1095
		sk->sk_sndbuf = transport->sndsize * xprt->max_reqs * 2;
1096 1097 1098 1099
		sk->sk_write_space(sk);
	}
}

1100
/**
1101
 * xs_udp_set_buffer_size - set send and receive limits
1102
 * @xprt: generic transport
1103 1104
 * @sndsize: requested size of send buffer, in bytes
 * @rcvsize: requested size of receive buffer, in bytes
1105
 *
1106
 * Set socket send and receive buffer size limits.
1107
 */
1108
static void xs_udp_set_buffer_size(struct rpc_xprt *xprt, size_t sndsize, size_t rcvsize)
1109
{
1110 1111 1112
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);

	transport->sndsize = 0;
1113
	if (sndsize)
1114 1115
		transport->sndsize = sndsize + 1024;
	transport->rcvsize = 0;
1116
	if (rcvsize)
1117
		transport->rcvsize = rcvsize + 1024;
1118 1119

	xs_udp_do_set_buffer_size(xprt);
1120 1121
}

1122 1123 1124 1125 1126 1127 1128 1129 1130 1131 1132
/**
 * xs_udp_timer - called when a retransmit timeout occurs on a UDP transport
 * @task: task that timed out
 *
 * Adjust the congestion window after a retransmit timeout has occurred.
 */
static void xs_udp_timer(struct rpc_task *task)
{
	/* -ETIMEDOUT signals the cwnd logic that this was a timeout,
	 * not a normal completion */
	xprt_adjust_cwnd(task, -ETIMEDOUT);
}

1133 1134 1135 1136 1137 1138 1139
/*
 * Pick a random port in [xprt_min_resvport, xprt_max_resvport].
 *
 * The bounds are sysctl-tunable, so guard against a degenerate range:
 * the previous code computed "max - min" and took a modulo, which
 * divides by zero when min == max (undefined behavior) and could never
 * return the maximum port.  Using an inclusive range width fixes both.
 */
static unsigned short xs_get_random_port(void)
{
	unsigned short min = xprt_min_resvport;
	unsigned short max = xprt_max_resvport;
	unsigned int range;

	/* Degenerate or inverted range: only one sane choice */
	if (max <= min)
		return min;
	range = (unsigned int)max - min + 1;
	return (unsigned short)(net_random() % range) + min;
}

1140 1141 1142 1143 1144 1145 1146 1147
/**
 * xs_set_port - reset the port number in the remote endpoint address
 * @xprt: generic transport
 * @port: new port number
 *
 */
static void xs_set_port(struct rpc_xprt *xprt, unsigned short port)
{
1148 1149
	struct sockaddr_in *sap = (struct sockaddr_in *) &xprt->addr;

1150
	dprintk("RPC:       setting port for xprt %p to %u\n", xprt, port);
1151 1152

	sap->sin_port = htons(port);
1153 1154
}

1155
static int xs_bind(struct sock_xprt *transport, struct socket *sock)
1156 1157 1158 1159
{
	struct sockaddr_in myaddr = {
		.sin_family = AF_INET,
	};
1160
	struct sockaddr_in *sa;
1161
	int err;
1162
	unsigned short port = transport->port;
1163

1164 1165 1166 1167
	if (!transport->xprt.resvport)
		port = 0;
	sa = (struct sockaddr_in *)&transport->addr;
	myaddr.sin_addr = sa->sin_addr;
1168 1169
	do {
		myaddr.sin_port = htons(port);
1170
		err = kernel_bind(sock, (struct sockaddr *) &myaddr,
1171
						sizeof(myaddr));
1172 1173
		if (!transport->xprt.resvport)
			break;
1174
		if (err == 0) {
1175
			transport->port = port;
1176
			break;
1177
		}
1178 1179 1180 1181
		if (port <= xprt_min_resvport)
			port = xprt_max_resvport;
		else
			port--;
1182
	} while (err == -EADDRINUSE && port != transport->port);
1183 1184
	dprintk("RPC:       xs_bind "NIPQUAD_FMT":%u: %s (%d)\n",
		NIPQUAD(myaddr.sin_addr), port, err ? "failed" : "ok", err);
1185 1186 1187
	return err;
}

1188 1189 1190 1191 1192 1193 1194 1195 1196 1197 1198 1199 1200 1201 1202 1203 1204 1205 1206 1207 1208 1209 1210 1211 1212 1213 1214 1215 1216
#ifdef CONFIG_DEBUG_LOCK_ALLOC
/* Separate lockdep class keys per address family: [0] = IPv4, [1] = IPv6 */
static struct lock_class_key xs_key[2];
static struct lock_class_key xs_slock_key[2];

/*
 * Give this socket's sk_lock and slock their own lockdep class names so
 * lock-dependency tracking can distinguish NFS transport sockets from
 * ordinary sockets of the same family.
 */
static inline void xs_reclassify_socket(struct socket *sock)
{
	struct sock *sk = sock->sk;
	/* Reclassifying a lock that is currently held would be a bug */
	BUG_ON(sk->sk_lock.owner != NULL);
	switch (sk->sk_family) {
	case AF_INET:
		sock_lock_init_class_and_name(sk, "slock-AF_INET-NFS",
			&xs_slock_key[0], "sk_lock-AF_INET-NFS", &xs_key[0]);
		break;

	case AF_INET6:
		sock_lock_init_class_and_name(sk, "slock-AF_INET6-NFS",
			&xs_slock_key[1], "sk_lock-AF_INET6-NFS", &xs_key[1]);
		break;

	default:
		BUG();
	}
}
#else
/* Without lockdep there is nothing to reclassify */
static inline void xs_reclassify_socket(struct socket *sock)
{
}
#endif

1217 1218
/**
 * xs_udp_connect_worker - set up a UDP socket
1219
 * @work: RPC transport to connect
1220 1221 1222
 *
 * Invoked by a work queue tasklet.
 */
1223
static void xs_udp_connect_worker(struct work_struct *work)
1224
{
1225 1226
	struct sock_xprt *transport =
		container_of(work, struct sock_xprt, connect_worker.work);
1227
	struct rpc_xprt *xprt = &transport->xprt;
1228
	struct socket *sock = transport->sock;
1229
	int err, status = -EIO;
1230

1231
	if (xprt->shutdown || !xprt_bound(xprt))
1232
		goto out;
1233

1234 1235
	/* Start by resetting any existing state */
	xs_close(xprt);
1236

1237
	if ((err = sock_create_kern(PF_INET, SOCK_DGRAM, IPPROTO_UDP, &sock)) < 0) {
1238
		dprintk("RPC:       can't create UDP transport socket (%d).\n", -err);
1239 1240
		goto out;
	}
1241
	xs_reclassify_socket(sock);
1242

1243
	if (xs_bind(transport, sock)) {
1244 1245 1246
		sock_release(sock);
		goto out;
	}
1247

1248
	dprintk("RPC:       worker connecting xprt %p to address: %s\n",
1249
			xprt, xprt->address_strings[RPC_DISPLAY_ALL]);
1250

1251
	if (!transport->inet) {
1252
		struct sock *sk = sock->sk;
1253

1254
		write_lock_bh(&sk->sk_callback_lock);
1255

1256
		sk->sk_user_data = xprt;
1257 1258 1259
		transport->old_data_ready = sk->sk_data_ready;
		transport->old_state_change = sk->sk_state_change;
		transport->old_write_space = sk->sk_write_space;
1260
		sk->sk_data_ready = xs_udp_data_ready;
1261
		sk->sk_write_space = xs_udp_write_space;
1262
		sk->sk_no_check = UDP_CSUM_NORCV;
1263
		sk->sk_allocation = GFP_ATOMIC;
1264

1265 1266
		xprt_set_connected(xprt);

1267
		/* Reset to new socket */
1268 1269
		transport->sock = sock;
		transport->inet = sk;
1270

1271 1272
		write_unlock_bh(&sk->sk_callback_lock);
	}
1273
	xs_udp_do_set_buffer_size(xprt);
1274 1275 1276 1277
	status = 0;
out:
	xprt_wake_pending_tasks(xprt, status);
	xprt_clear_connecting(xprt);
1278 1279
}

1280 1281 1282 1283 1284 1285 1286
/*
 * We need to preserve the port number so the reply cache on the server can
 * find our cached RPC replies when we get around to reconnecting.
 */
static void xs_tcp_reuse_connection(struct rpc_xprt *xprt)
{
	int result;
1287
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
1288 1289
	struct sockaddr any;

1290
	dprintk("RPC:       disconnecting xprt %p to reuse port\n", xprt);
1291 1292 1293 1294 1295 1296 1297

	/*
	 * Disconnect the transport socket by doing a connect operation
	 * with AF_UNSPEC.  This should return immediately...
	 */
	memset(&any, 0, sizeof(any));
	any.sa_family = AF_UNSPEC;
1298
	result = kernel_connect(transport->sock, &any, sizeof(any), 0);
1299
	if (result)
1300
		dprintk("RPC:       AF_UNSPEC connect return code %d\n",
1301 1302 1303
				result);
}

1304
/**
1305
 * xs_tcp_connect_worker - connect a TCP socket to a remote endpoint
1306
 * @work: RPC transport to connect
1307 1308
 *
 * Invoked by a work queue tasklet.
1309
 */
1310
static void xs_tcp_connect_worker(struct work_struct *work)
1311
{
1312 1313
	struct sock_xprt *transport =
		container_of(work, struct sock_xprt, connect_worker.work);
1314
	struct rpc_xprt *xprt = &transport->xprt;
1315
	struct socket *sock = transport->sock;
1316
	int err, status = -EIO;
1317

1318
	if (xprt->shutdown || !xprt_bound(xprt))
1319 1320
		goto out;

1321
	if (!sock) {
1322 1323
		/* start from scratch */
		if ((err = sock_create_kern(PF_INET, SOCK_STREAM, IPPROTO_TCP, &sock)) < 0) {
1324 1325
			dprintk("RPC:       can't create TCP transport "
					"socket (%d).\n", -err);
1326 1327
			goto out;
		}
1328
		xs_reclassify_socket(sock);
1329

1330
		if (xs_bind(transport, sock)) {
1331 1332 1333 1334 1335 1336
			sock_release(sock);
			goto out;
		}
	} else
		/* "close" the socket, preserving the local port */
		xs_tcp_reuse_connection(xprt);
1337

1338
	dprintk("RPC:       worker connecting xprt %p to address: %s\n",
1339
			xprt, xprt->address_strings[RPC_DISPLAY_ALL]);
1340

1341
	if (!transport->inet) {
1342 1343 1344 1345 1346
		struct sock *sk = sock->sk;

		write_lock_bh(&sk->sk_callback_lock);

		sk->sk_user_data = xprt;
1347 1348 1349
		transport->old_data_ready = sk->sk_data_ready;
		transport->old_state_change = sk->sk_state_change;
		transport->old_write_space = sk->sk_write_space;
1350 1351 1352
		sk->sk_data_ready = xs_tcp_data_ready;
		sk->sk_state_change = xs_tcp_state_change;
		sk->sk_write_space = xs_tcp_write_space;
1353
		sk->sk_allocation = GFP_ATOMIC;
1354 1355 1356 1357 1358 1359

		/* socket options */
		sk->sk_userlocks |= SOCK_BINDPORT_LOCK;
		sock_reset_flag(sk, SOCK_LINGER);
		tcp_sk(sk)->linger2 = 0;
		tcp_sk(sk)->nonagle |= TCP_NAGLE_OFF;
1360 1361 1362 1363

		xprt_clear_connected(xprt);

		/* Reset to new socket */
1364 1365
		transport->sock = sock;
		transport->inet = sk;
1366 1367 1368 1369 1370

		write_unlock_bh(&sk->sk_callback_lock);
	}

	/* Tell the socket layer to start connecting... */
1371 1372
	xprt->stat.connect_count++;
	xprt->stat.connect_start = jiffies;
1373
	status = kernel_connect(sock, (struct sockaddr *) &xprt->addr,
1374
			xprt->addrlen, O_NONBLOCK);
1375 1376 1377
	dprintk("RPC:       %p connect status %d connected %d sock state %d\n",
			xprt, -status, xprt_connected(xprt),
			sock->sk->sk_state);
1378 1379 1380 1381 1382
	if (status < 0) {
		switch (status) {
			case -EINPROGRESS:
			case -EALREADY:
				goto out_clear;
1383 1384 1385 1386 1387 1388 1389 1390
			case -ECONNREFUSED:
			case -ECONNRESET:
				/* retry with existing socket, after a delay */
				break;
			default:
				/* get rid of existing socket, and retry */
				xs_close(xprt);
				break;
1391 1392 1393
		}
	}
out:
1394
	xprt_wake_pending_tasks(xprt, status);
1395
out_clear:
1396
	xprt_clear_connecting(xprt);
1397 1398
}

1399 1400 1401 1402 1403
/**
 * xs_connect - connect a socket to a remote endpoint
 * @task: address of RPC task that manages state of connect request
 *
 * TCP: If the remote end dropped the connection, delay reconnecting.
1404 1405 1406 1407 1408 1409 1410
 *
 * UDP socket connects are synchronous, but we use a work queue anyway
 * to guarantee that even unprivileged user processes can set up a
 * socket on a privileged port.
 *
 * If a UDP socket connect fails, the delay behavior here prevents
 * retry floods (hard mounts).
1411 1412
 */
static void xs_connect(struct rpc_task *task)
1413 1414
{
	struct rpc_xprt *xprt = task->tk_xprt;
1415
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
1416

1417 1418 1419
	if (xprt_test_and_set_connecting(xprt))
		return;

1420
	if (transport->sock != NULL) {
1421 1422
		dprintk("RPC:       xs_connect delayed xprt %p for %lu "
				"seconds\n",
1423
				xprt, xprt->reestablish_timeout / HZ);
1424 1425 1426
		queue_delayed_work(rpciod_workqueue,
				   &transport->connect_worker,
				   xprt->reestablish_timeout);
1427 1428 1429
		xprt->reestablish_timeout <<= 1;
		if (xprt->reestablish_timeout > XS_TCP_MAX_REEST_TO)
			xprt->reestablish_timeout = XS_TCP_MAX_REEST_TO;
1430
	} else {
1431
		dprintk("RPC:       xs_connect scheduled xprt %p\n", xprt);
1432 1433
		queue_delayed_work(rpciod_workqueue,
				   &transport->connect_worker, 0);
1434 1435 1436
	}
}

1437 1438 1439 1440 1441 1442 1443 1444
/**
 * xs_udp_print_stats - display UDP socket-specifc stats
 * @xprt: rpc_xprt struct containing statistics
 * @seq: output file
 *
 */
static void xs_udp_print_stats(struct rpc_xprt *xprt, struct seq_file *seq)
{
1445 1446
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);

1447
	seq_printf(seq, "\txprt:\tudp %u %lu %lu %lu %lu %Lu %Lu\n",
1448
			transport->port,
1449 1450 1451 1452 1453 1454 1455 1456 1457 1458 1459 1460 1461 1462 1463 1464
			xprt->stat.bind_count,
			xprt->stat.sends,
			xprt->stat.recvs,
			xprt->stat.bad_xids,
			xprt->stat.req_u,
			xprt->stat.bklog_u);
}

/**
 * xs_tcp_print_stats - display TCP socket-specifc stats
 * @xprt: rpc_xprt struct containing statistics
 * @seq: output file
 *
 */
static void xs_tcp_print_stats(struct rpc_xprt *xprt, struct seq_file *seq)
{
1465
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
1466 1467 1468 1469 1470 1471
	long idle_time = 0;

	if (xprt_connected(xprt))
		idle_time = (long)(jiffies - xprt->last_used) / HZ;

	seq_printf(seq, "\txprt:\ttcp %u %lu %lu %lu %ld %lu %lu %lu %Lu %Lu\n",
1472
			transport->port,
1473 1474 1475 1476 1477 1478 1479 1480 1481 1482 1483
			xprt->stat.bind_count,
			xprt->stat.connect_count,
			xprt->stat.connect_time,
			idle_time,
			xprt->stat.sends,
			xprt->stat.recvs,
			xprt->stat.bad_xids,
			xprt->stat.req_u,
			xprt->stat.bklog_u);
}

1484
static struct rpc_xprt_ops xs_udp_ops = {
1485
	.set_buffer_size	= xs_udp_set_buffer_size,
1486
	.reserve_xprt		= xprt_reserve_xprt_cong,
1487
	.release_xprt		= xprt_release_xprt_cong,
1488
	.rpcbind		= rpcb_getport_async,
1489
	.set_port		= xs_set_port,
1490
	.connect		= xs_connect,
1491 1492
	.buf_alloc		= rpc_malloc,
	.buf_free		= rpc_free,
1493
	.send_request		= xs_udp_send_request,
1494
	.set_retrans_timeout	= xprt_set_retrans_timeout_rtt,
1495
	.timer			= xs_udp_timer,
1496
	.release_request	= xprt_release_rqst_cong,
1497 1498
	.close			= xs_close,
	.destroy		= xs_destroy,
1499
	.print_stats		= xs_udp_print_stats,
1500 1501 1502
};

static struct rpc_xprt_ops xs_tcp_ops = {
1503
	.reserve_xprt		= xprt_reserve_xprt,
1504
	.release_xprt		= xs_tcp_release_xprt,
1505
	.rpcbind		= rpcb_getport_async,
1506
	.set_port		= xs_set_port,
1507
	.connect		= xs_connect,
1508 1509
	.buf_alloc		= rpc_malloc,
	.buf_free		= rpc_free,
1510
	.send_request		= xs_tcp_send_request,
1511
	.set_retrans_timeout	= xprt_set_retrans_timeout_def,
1512 1513
	.close			= xs_close,
	.destroy		= xs_destroy,
1514
	.print_stats		= xs_tcp_print_stats,
1515 1516
};

1517
static struct rpc_xprt *xs_setup_xprt(struct rpc_xprtsock_create *args, unsigned int slot_table_size)
1518 1519
{
	struct rpc_xprt *xprt;
1520
	struct sock_xprt *new;
1521

1522
	if (args->addrlen > sizeof(xprt->addr)) {
1523
		dprintk("RPC:       xs_setup_xprt: address too large\n");
1524 1525 1526
		return ERR_PTR(-EBADF);
	}

1527 1528
	new = kzalloc(sizeof(*new), GFP_KERNEL);
	if (new == NULL) {
1529 1530
		dprintk("RPC:       xs_setup_xprt: couldn't allocate "
				"rpc_xprt\n");
1531 1532
		return ERR_PTR(-ENOMEM);
	}
1533
	xprt = &new->xprt;
1534 1535 1536 1537 1538

	xprt->max_reqs = slot_table_size;
	xprt->slot = kcalloc(xprt->max_reqs, sizeof(struct rpc_rqst), GFP_KERNEL);
	if (xprt->slot == NULL) {
		kfree(xprt);
1539 1540
		dprintk("RPC:       xs_setup_xprt: couldn't allocate slot "
				"table\n");
1541 1542 1543
		return ERR_PTR(-ENOMEM);
	}

1544 1545
	memcpy(&xprt->addr, args->dstaddr, args->addrlen);
	xprt->addrlen = args->addrlen;
1546 1547
	if (args->srcaddr)
		memcpy(&new->addr, args->srcaddr, args->addrlen);
1548
	new->port = xs_get_random_port();
1549 1550 1551 1552

	return xprt;
}

1553 1554
/**
 * xs_setup_udp - Set up transport to use a UDP socket
1555
 * @args: rpc transport creation arguments
1556 1557
 *
 */
1558
struct rpc_xprt *xs_setup_udp(struct rpc_xprtsock_create *args)
1559
{
1560
	struct rpc_xprt *xprt;
1561
	struct sock_xprt *transport;
1562

1563
	xprt = xs_setup_xprt(args, xprt_udp_slot_table_entries);
1564 1565
	if (IS_ERR(xprt))
		return xprt;
1566
	transport = container_of(xprt, struct sock_xprt, xprt);
1567

1568
	if (ntohs(((struct sockaddr_in *)args->dstaddr)->sin_port) != 0)
1569 1570 1571
		xprt_set_bound(xprt);

	xprt->prot = IPPROTO_UDP;
1572
	xprt->tsh_size = 0;
1573 1574 1575
	/* XXX: header size can vary due to auth type, IPv6, etc. */
	xprt->max_payload = (1U << 16) - (MAX_HEADER << 3);

1576
	INIT_DELAYED_WORK(&transport->connect_worker, xs_udp_connect_worker);
1577 1578 1579 1580
	xprt->bind_timeout = XS_BIND_TO;
	xprt->connect_timeout = XS_UDP_CONN_TO;
	xprt->reestablish_timeout = XS_UDP_REEST_TO;
	xprt->idle_timeout = XS_IDLE_DISC_TO;
1581

1582
	xprt->ops = &xs_udp_ops;
1583

1584 1585
	if (args->timeout)
		xprt->timeout = *args->timeout;
1586
	else
1587
		xprt_set_timeout(&xprt->timeout, 5, 5 * HZ);
1588

1589
	xs_format_peer_addresses(xprt);
1590
	dprintk("RPC:       set up transport to address %s\n",
1591
			xprt->address_strings[RPC_DISPLAY_ALL]);
1592

1593
	return xprt;
1594 1595
}

1596 1597
/**
 * xs_setup_tcp - Set up transport to use a TCP socket
1598
 * @args: rpc transport creation arguments
1599 1600
 *
 */
1601
struct rpc_xprt *xs_setup_tcp(struct rpc_xprtsock_create *args)
1602
{
1603
	struct rpc_xprt *xprt;
1604
	struct sock_xprt *transport;
1605

1606
	xprt = xs_setup_xprt(args, xprt_tcp_slot_table_entries);
1607 1608
	if (IS_ERR(xprt))
		return xprt;
1609
	transport = container_of(xprt, struct sock_xprt, xprt);
1610

1611
	if (ntohs(((struct sockaddr_in *)args->dstaddr)->sin_port) != 0)
1612 1613 1614
		xprt_set_bound(xprt);

	xprt->prot = IPPROTO_TCP;
1615 1616
	xprt->tsh_size = sizeof(rpc_fraghdr) / sizeof(u32);
	xprt->max_payload = RPC_MAX_FRAGMENT_SIZE;
1617

1618
	INIT_DELAYED_WORK(&transport->connect_worker, xs_tcp_connect_worker);
1619 1620 1621 1622
	xprt->bind_timeout = XS_BIND_TO;
	xprt->connect_timeout = XS_TCP_CONN_TO;
	xprt->reestablish_timeout = XS_TCP_INIT_REEST_TO;
	xprt->idle_timeout = XS_IDLE_DISC_TO;
1623

1624
	xprt->ops = &xs_tcp_ops;
1625

1626 1627
	if (args->timeout)
		xprt->timeout = *args->timeout;
1628
	else
1629
		xprt_set_timeout(&xprt->timeout, 2, 60 * HZ);
1630

1631
	xs_format_peer_addresses(xprt);
1632
	dprintk("RPC:       set up transport to address %s\n",
1633
			xprt->address_strings[RPC_DISPLAY_ALL]);
1634

1635
	return xprt;
1636
}
1637 1638

/**
 * init_socket_xprt - set up xprtsock's sysctls
 *
 */
int init_socket_xprt(void)
{
#ifdef RPC_DEBUG
	/* Register the sunrpc sysctl table once; skip if already present */
	if (!sunrpc_table_header)
		sunrpc_table_header = register_sysctl_table(sunrpc_table);
#endif

	return 0;
}

/**
 * cleanup_socket_xprt - remove xprtsock's sysctls
 *
 */
void cleanup_socket_xprt(void)
{
#ifdef RPC_DEBUG
	/* Unregister and clear the handle so a reload can re-register */
	if (sunrpc_table_header) {
		unregister_sysctl_table(sunrpc_table_header);
		sunrpc_table_header = NULL;
	}
#endif
}