/*
 * linux/net/sunrpc/xprtsock.c
 *
 * Client-side transport implementation for sockets.
 *
 * TCP callback races fixes (C) 1998 Red Hat
 * TCP send fixes (C) 1998 Red Hat
 * TCP NFS related read + write fixes
 *  (C) 1999 Dave Airlie, University of Limerick, Ireland <airlied@linux.ie>
 *
 * Rewrite of larges part of the code in order to stabilize TCP stuff.
 * Fix behaviour when socket buffer is full.
 *  (C) 1999 Trond Myklebust <trond.myklebust@fys.uio.no>
 *
 * IP socket transport implementation, (C) 2005 Chuck Lever <cel@netapp.com>
 *
 * IPv6 support contributed by Gilles Quillard, Bull Open Source, 2005.
 *   <gilles.quillard@bull.net>
 */

#include <linux/types.h>
#include <linux/slab.h>
23
#include <linux/module.h>
24 25 26 27 28 29 30 31 32 33
#include <linux/capability.h>
#include <linux/pagemap.h>
#include <linux/errno.h>
#include <linux/socket.h>
#include <linux/in.h>
#include <linux/net.h>
#include <linux/mm.h>
#include <linux/udp.h>
#include <linux/tcp.h>
#include <linux/sunrpc/clnt.h>
34
#include <linux/sunrpc/sched.h>
35
#include <linux/sunrpc/xprtsock.h>
36 37 38 39 40 41 42
#include <linux/file.h>

#include <net/sock.h>
#include <net/checksum.h>
#include <net/udp.h>
#include <net/tcp.h>

43 44 45 46 47 48 49 50 51
/*
 * xprtsock tunables
 *
 * Defaults for the UDP/TCP request slot table sizes and for the range of
 * reserved ports this transport may bind to.  When RPC_DEBUG is defined
 * these are exported as sysctls under /proc/sys/sunrpc (see the ctl_table
 * definitions below).
 */
unsigned int xprt_udp_slot_table_entries = RPC_DEF_SLOT_TABLE;
unsigned int xprt_tcp_slot_table_entries = RPC_DEF_SLOT_TABLE;

unsigned int xprt_min_resvport = RPC_DEF_MIN_RESVPORT;
unsigned int xprt_max_resvport = RPC_DEF_MAX_RESVPORT;

52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137
/*
 * We can register our own files under /proc/sys/sunrpc by
 * calling register_sysctl_table() again.  The files in that
 * directory become the union of all files registered there.
 *
 * We simply need to make sure that we don't collide with
 * someone else's file names!
 */

#ifdef RPC_DEBUG

/* Bounds enforced by proc_dointvec_minmax on the sysctls below. */
static unsigned int min_slot_table_size = RPC_MIN_SLOT_TABLE;
static unsigned int max_slot_table_size = RPC_MAX_SLOT_TABLE;
static unsigned int xprt_min_resvport_limit = RPC_MIN_RESVPORT;
static unsigned int xprt_max_resvport_limit = RPC_MAX_RESVPORT;

static struct ctl_table_header *sunrpc_table_header;

/*
 * FIXME: changing the UDP slot table size should also resize the UDP
 *        socket buffers for existing UDP transports
 */
static ctl_table xs_tunables_table[] = {
	{
		.ctl_name	= CTL_SLOTTABLE_UDP,
		.procname	= "udp_slot_table_entries",
		.data		= &xprt_udp_slot_table_entries,
		.maxlen		= sizeof(unsigned int),
		.mode		= 0644,
		.proc_handler	= &proc_dointvec_minmax,
		.strategy	= &sysctl_intvec,
		.extra1		= &min_slot_table_size,
		.extra2		= &max_slot_table_size
	},
	{
		.ctl_name	= CTL_SLOTTABLE_TCP,
		.procname	= "tcp_slot_table_entries",
		.data		= &xprt_tcp_slot_table_entries,
		.maxlen		= sizeof(unsigned int),
		.mode		= 0644,
		.proc_handler	= &proc_dointvec_minmax,
		.strategy	= &sysctl_intvec,
		.extra1		= &min_slot_table_size,
		.extra2		= &max_slot_table_size
	},
	{
		.ctl_name	= CTL_MIN_RESVPORT,
		.procname	= "min_resvport",
		.data		= &xprt_min_resvport,
		.maxlen		= sizeof(unsigned int),
		.mode		= 0644,
		.proc_handler	= &proc_dointvec_minmax,
		.strategy	= &sysctl_intvec,
		.extra1		= &xprt_min_resvport_limit,
		.extra2		= &xprt_max_resvport_limit
	},
	{
		.ctl_name	= CTL_MAX_RESVPORT,
		.procname	= "max_resvport",
		.data		= &xprt_max_resvport,
		.maxlen		= sizeof(unsigned int),
		.mode		= 0644,
		.proc_handler	= &proc_dointvec_minmax,
		.strategy	= &sysctl_intvec,
		.extra1		= &xprt_min_resvport_limit,
		.extra2		= &xprt_max_resvport_limit
	},
	{
		/* sentinel terminating the table */
		.ctl_name = 0,
	},
};

/* Parent directory entry: /proc/sys/sunrpc */
static ctl_table sunrpc_table[] = {
	{
		.ctl_name	= CTL_SUNRPC,
		.procname	= "sunrpc",
		.mode		= 0555,
		.child		= xs_tunables_table
	},
	{
		.ctl_name = 0,
	},
};

#endif

138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181
/* All of the timeout values below are expressed in jiffies (N * HZ). */

/*
 * Time out for an RPC UDP socket connect.  UDP socket connects are
 * synchronous, but we set a timeout anyway in case of resource
 * exhaustion on the local host.
 */
#define XS_UDP_CONN_TO		(5U * HZ)

/*
 * Wait duration for an RPC TCP connection to be established.  Solaris
 * NFS over TCP uses 60 seconds, for example, which is in line with how
 * long a server takes to reboot.
 */
#define XS_TCP_CONN_TO		(60U * HZ)

/*
 * Wait duration for a reply from the RPC portmapper.
 */
#define XS_BIND_TO		(60U * HZ)

/*
 * Delay if a UDP socket connect error occurs.  This is most likely some
 * kind of resource problem on the local host.
 */
#define XS_UDP_REEST_TO		(2U * HZ)

/*
 * The reestablish timeout allows clients to delay for a bit before attempting
 * to reconnect to a server that just dropped our connection.
 *
 * We implement an exponential backoff when trying to reestablish a TCP
 * transport connection with the server.  Some servers like to drop a TCP
 * connection when they are overworked, so we start with a short timeout and
 * increase over time if the server is down or not responding.
 */
#define XS_TCP_INIT_REEST_TO	(3U * HZ)
#define XS_TCP_MAX_REEST_TO	(5U * 60 * HZ)

/*
 * TCP idle timeout; client drops the transport socket if it is idle
 * for this long.  Note that we also timeout UDP sockets to prevent
 * holding port numbers when there is no RPC traffic.
 */
#define XS_IDLE_DISC_TO		(5U * 60 * HZ)

182 183
#ifdef RPC_DEBUG
# undef  RPC_DEBUG_DATA
184
# define RPCDBG_FACILITY	RPCDBG_TRANS
185 186 187
#endif

#ifdef RPC_DEBUG_DATA
188
static void xs_pktdump(char *msg, u32 *packet, unsigned int count)
189
{
190 191
	u8 *buf = (u8 *) packet;
	int j;
192

193
	dprintk("RPC:       %s\n", msg);
194 195 196 197 198 199 200 201 202 203 204 205
	for (j = 0; j < count && j < 128; j += 4) {
		if (!(j & 31)) {
			if (j)
				dprintk("\n");
			dprintk("0x%04x ", j);
		}
		dprintk("%02x%02x%02x%02x ",
			buf[j], buf[j+1], buf[j+2], buf[j+3]);
	}
	dprintk("\n");
}
#else
206
static inline void xs_pktdump(char *msg, u32 *packet, unsigned int count)
207 208 209 210 211
{
	/* NOP */
}
#endif

212 213
struct sock_xprt {
	struct rpc_xprt		xprt;
214 215 216 217 218 219

	/*
	 * Network layer
	 */
	struct socket *		sock;
	struct sock *		inet;
220 221 222 223 224 225 226 227 228 229 230 231

	/*
	 * State of TCP reply receive
	 */
	__be32			tcp_fraghdr,
				tcp_xid;

	u32			tcp_offset,
				tcp_reclen;

	unsigned long		tcp_copied,
				tcp_flags;
232 233 234 235

	/*
	 * Connection of transports
	 */
236
	struct delayed_work	connect_worker;
237
	struct sockaddr_storage	addr;
238
	unsigned short		port;
239 240 241 242 243 244

	/*
	 * UDP socket buffer size parameters
	 */
	size_t			rcvsize,
				sndsize;
245 246 247 248 249 250 251

	/*
	 * Saved socket callback addresses
	 */
	void			(*old_data_ready)(struct sock *, int);
	void			(*old_state_change)(struct sock *);
	void			(*old_write_space)(struct sock *);
252
	void			(*old_error_report)(struct sock *);
253 254
};

255 256 257 258 259 260 261 262
/*
 * TCP receive state flags (kept in sock_xprt::tcp_flags)
 */
#define TCP_RCV_LAST_FRAG	(1UL << 0)	/* current fragment is the record's last */
#define TCP_RCV_COPY_FRAGHDR	(1UL << 1)	/* still reading the 4-byte record marker */
#define TCP_RCV_COPY_XID	(1UL << 2)	/* still reading the reply XID */
#define TCP_RCV_COPY_DATA	(1UL << 3)	/* still copying reply data into the request */

263 264 265 266 267 268
static inline struct sockaddr *xs_addr(struct rpc_xprt *xprt)
{
	return (struct sockaddr *) &xprt->addr;
}

static inline struct sockaddr_in *xs_addr_in(struct rpc_xprt *xprt)
269
{
270 271 272 273 274 275 276 277
	return (struct sockaddr_in *) &xprt->addr;
}

static inline struct sockaddr_in6 *xs_addr_in6(struct rpc_xprt *xprt)
{
	return (struct sockaddr_in6 *) &xprt->addr;
}

278 279 280
static void xs_format_ipv4_peer_addresses(struct rpc_xprt *xprt,
					  const char *protocol,
					  const char *netid)
281
{
282
	struct sockaddr_in *addr = xs_addr_in(xprt);
283 284 285 286
	char *buf;

	buf = kzalloc(20, GFP_KERNEL);
	if (buf) {
287
		snprintf(buf, 20, "%pI4", &addr->sin_addr.s_addr);
288 289 290 291 292 293 294 295 296 297
	}
	xprt->address_strings[RPC_DISPLAY_ADDR] = buf;

	buf = kzalloc(8, GFP_KERNEL);
	if (buf) {
		snprintf(buf, 8, "%u",
				ntohs(addr->sin_port));
	}
	xprt->address_strings[RPC_DISPLAY_PORT] = buf;

298
	xprt->address_strings[RPC_DISPLAY_PROTO] = protocol;
299 300 301

	buf = kzalloc(48, GFP_KERNEL);
	if (buf) {
H
Harvey Harrison 已提交
302 303
		snprintf(buf, 48, "addr=%pI4 port=%u proto=%s",
			&addr->sin_addr.s_addr,
304
			ntohs(addr->sin_port),
305
			protocol);
306 307
	}
	xprt->address_strings[RPC_DISPLAY_ALL] = buf;
308 309 310 311 312 313 314 315 316 317 318 319 320 321

	buf = kzalloc(10, GFP_KERNEL);
	if (buf) {
		snprintf(buf, 10, "%02x%02x%02x%02x",
				NIPQUAD(addr->sin_addr.s_addr));
	}
	xprt->address_strings[RPC_DISPLAY_HEX_ADDR] = buf;

	buf = kzalloc(8, GFP_KERNEL);
	if (buf) {
		snprintf(buf, 8, "%4hx",
				ntohs(addr->sin_port));
	}
	xprt->address_strings[RPC_DISPLAY_HEX_PORT] = buf;
322 323 324

	buf = kzalloc(30, GFP_KERNEL);
	if (buf) {
H
Harvey Harrison 已提交
325 326
		snprintf(buf, 30, "%pI4.%u.%u",
				&addr->sin_addr.s_addr,
327 328 329 330
				ntohs(addr->sin_port) >> 8,
				ntohs(addr->sin_port) & 0xff);
	}
	xprt->address_strings[RPC_DISPLAY_UNIVERSAL_ADDR] = buf;
331

332
	xprt->address_strings[RPC_DISPLAY_NETID] = netid;
333 334
}

335 336 337
static void xs_format_ipv6_peer_addresses(struct rpc_xprt *xprt,
					  const char *protocol,
					  const char *netid)
338
{
339
	struct sockaddr_in6 *addr = xs_addr_in6(xprt);
340 341 342 343
	char *buf;

	buf = kzalloc(40, GFP_KERNEL);
	if (buf) {
H
Harvey Harrison 已提交
344
		snprintf(buf, 40, "%pI6",&addr->sin6_addr);
345 346 347 348 349 350 351 352 353 354
	}
	xprt->address_strings[RPC_DISPLAY_ADDR] = buf;

	buf = kzalloc(8, GFP_KERNEL);
	if (buf) {
		snprintf(buf, 8, "%u",
				ntohs(addr->sin6_port));
	}
	xprt->address_strings[RPC_DISPLAY_PORT] = buf;

355
	xprt->address_strings[RPC_DISPLAY_PROTO] = protocol;
356 357 358

	buf = kzalloc(64, GFP_KERNEL);
	if (buf) {
H
Harvey Harrison 已提交
359
		snprintf(buf, 64, "addr=%pI6 port=%u proto=%s",
360
				&addr->sin6_addr,
361
				ntohs(addr->sin6_port),
362
				protocol);
363 364 365 366
	}
	xprt->address_strings[RPC_DISPLAY_ALL] = buf;

	buf = kzalloc(36, GFP_KERNEL);
367
	if (buf)
368
		snprintf(buf, 36, "%pi6", &addr->sin6_addr);
369

370 371 372 373 374 375 376 377
	xprt->address_strings[RPC_DISPLAY_HEX_ADDR] = buf;

	buf = kzalloc(8, GFP_KERNEL);
	if (buf) {
		snprintf(buf, 8, "%4hx",
				ntohs(addr->sin6_port));
	}
	xprt->address_strings[RPC_DISPLAY_HEX_PORT] = buf;
378 379 380

	buf = kzalloc(50, GFP_KERNEL);
	if (buf) {
H
Harvey Harrison 已提交
381
		snprintf(buf, 50, "%pI6.%u.%u",
382 383 384
			 &addr->sin6_addr,
			 ntohs(addr->sin6_port) >> 8,
			 ntohs(addr->sin6_port) & 0xff);
385 386
	}
	xprt->address_strings[RPC_DISPLAY_UNIVERSAL_ADDR] = buf;
387

388
	xprt->address_strings[RPC_DISPLAY_NETID] = netid;
389 390 391 392
}

/*
 * Free the kzalloc'd strings in xprt->address_strings[].  The PROTO and
 * NETID slots are skipped: the format routines store the caller's
 * pointers there without copying, so this transport does not own them.
 */
static void xs_free_peer_addresses(struct rpc_xprt *xprt)
{
	unsigned int i;

	for (i = 0; i < RPC_DISPLAY_MAX; i++) {
		if (i == RPC_DISPLAY_PROTO || i == RPC_DISPLAY_NETID)
			continue;
		kfree(xprt->address_strings[i]);
	}
}

405 406
#define XS_SENDMSG_FLAGS	(MSG_DONTWAIT | MSG_NOSIGNAL)

T
Trond Myklebust 已提交
407
static int xs_send_kvec(struct socket *sock, struct sockaddr *addr, int addrlen, struct kvec *vec, unsigned int base, int more)
408 409 410 411
{
	struct msghdr msg = {
		.msg_name	= addr,
		.msg_namelen	= addrlen,
T
Trond Myklebust 已提交
412 413 414 415 416
		.msg_flags	= XS_SENDMSG_FLAGS | (more ? MSG_MORE : 0),
	};
	struct kvec iov = {
		.iov_base	= vec->iov_base + base,
		.iov_len	= vec->iov_len - base,
417 418
	};

T
Trond Myklebust 已提交
419
	if (iov.iov_len != 0)
420 421 422 423
		return kernel_sendmsg(sock, &msg, &iov, 1, iov.iov_len);
	return kernel_sendmsg(sock, &msg, NULL, 0, 0);
}

T
Trond Myklebust 已提交
424
static int xs_send_pagedata(struct socket *sock, struct xdr_buf *xdr, unsigned int base, int more)
425
{
T
Trond Myklebust 已提交
426 427 428 429 430 431 432 433 434 435 436
	struct page **ppage;
	unsigned int remainder;
	int err, sent = 0;

	remainder = xdr->page_len - base;
	base += xdr->page_base;
	ppage = xdr->pages + (base >> PAGE_SHIFT);
	base &= ~PAGE_MASK;
	for(;;) {
		unsigned int len = min_t(unsigned int, PAGE_SIZE - base, remainder);
		int flags = XS_SENDMSG_FLAGS;
437

T
Trond Myklebust 已提交
438 439 440 441 442 443 444 445 446 447 448 449 450 451 452
		remainder -= len;
		if (remainder != 0 || more)
			flags |= MSG_MORE;
		err = sock->ops->sendpage(sock, *ppage, base, len, flags);
		if (remainder == 0 || err != len)
			break;
		sent += err;
		ppage++;
		base = 0;
	}
	if (sent == 0)
		return err;
	if (err > 0)
		sent += err;
	return sent;
453 454
}

455 456 457 458 459 460 461 462
/**
 * xs_sendpages - write pages directly to a socket
 * @sock: socket to send on
 * @addr: UDP only -- address of destination
 * @addrlen: UDP only -- length of destination address
 * @xdr: buffer containing this request
 * @base: starting position in the buffer
 *
463
 */
T
Trond Myklebust 已提交
464
static int xs_sendpages(struct socket *sock, struct sockaddr *addr, int addrlen, struct xdr_buf *xdr, unsigned int base)
465
{
T
Trond Myklebust 已提交
466 467
	unsigned int remainder = xdr->len - base;
	int err, sent = 0;
468

469
	if (unlikely(!sock))
470
		return -ENOTSOCK;
471 472

	clear_bit(SOCK_ASYNC_NOSPACE, &sock->flags);
T
Trond Myklebust 已提交
473 474 475 476
	if (base != 0) {
		addr = NULL;
		addrlen = 0;
	}
477

T
Trond Myklebust 已提交
478 479 480 481 482
	if (base < xdr->head[0].iov_len || addr != NULL) {
		unsigned int len = xdr->head[0].iov_len - base;
		remainder -= len;
		err = xs_send_kvec(sock, addr, addrlen, &xdr->head[0], base, remainder != 0);
		if (remainder == 0 || err != len)
483
			goto out;
T
Trond Myklebust 已提交
484
		sent += err;
485 486
		base = 0;
	} else
T
Trond Myklebust 已提交
487
		base -= xdr->head[0].iov_len;
488

T
Trond Myklebust 已提交
489 490 491 492 493
	if (base < xdr->page_len) {
		unsigned int len = xdr->page_len - base;
		remainder -= len;
		err = xs_send_pagedata(sock, xdr, base, remainder != 0);
		if (remainder == 0 || err != len)
494
			goto out;
T
Trond Myklebust 已提交
495
		sent += err;
496
		base = 0;
T
Trond Myklebust 已提交
497 498 499 500 501 502
	} else
		base -= xdr->page_len;

	if (base >= xdr->tail[0].iov_len)
		return sent;
	err = xs_send_kvec(sock, NULL, 0, &xdr->tail[0], base, 0);
503
out:
T
Trond Myklebust 已提交
504 505 506 507 508
	if (sent == 0)
		return err;
	if (err > 0)
		sent += err;
	return sent;
509 510
}

511 512 513 514 515 516 517 518
static void xs_nospace_callback(struct rpc_task *task)
{
	struct sock_xprt *transport = container_of(task->tk_rqstp->rq_xprt, struct sock_xprt, xprt);

	transport->inet->sk_write_pending--;
	clear_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags);
}

519
/**
520 521
 * xs_nospace - place task on wait queue if transmit was incomplete
 * @task: task to put to sleep
522
 *
523
 */
524
static void xs_nospace(struct rpc_task *task)
525
{
526 527
	struct rpc_rqst *req = task->tk_rqstp;
	struct rpc_xprt *xprt = req->rq_xprt;
528
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
529

530
	dprintk("RPC: %5u xmit incomplete (%u left of %u)\n",
531 532 533
			task->tk_pid, req->rq_slen - req->rq_bytes_sent,
			req->rq_slen);

534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552
	/* Protect against races with write_space */
	spin_lock_bh(&xprt->transport_lock);

	/* Don't race with disconnect */
	if (xprt_connected(xprt)) {
		if (test_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags)) {
			/*
			 * Notify TCP that we're limited by the application
			 * window size
			 */
			set_bit(SOCK_NOSPACE, &transport->sock->flags);
			transport->inet->sk_write_pending++;
			/* ...and wait for more buffer space */
			xprt_wait_for_buffer_space(task, xs_nospace_callback);
		}
	} else {
		clear_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags);
		task->tk_status = -ENOTCONN;
	}
553

554
	spin_unlock_bh(&xprt->transport_lock);
555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571
}

/**
 * xs_udp_send_request - write an RPC request to a UDP socket
 * @task: address of RPC task that manages the state of an RPC request
 *
 * Return values:
 *        0:	The request has been sent
 *   EAGAIN:	The socket was blocked, please call again later to
 *		complete the request
 * ENOTCONN:	Caller needs to invoke connect logic then call again
 *    other:	Some other error occured, the request was not sent
 */
static int xs_udp_send_request(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;
	struct rpc_xprt *xprt = req->rq_xprt;
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
	struct xdr_buf *xdr = &req->rq_snd_buf;
	int status;

	xs_pktdump("packet data:",
				req->rq_svec->iov_base,
				req->rq_svec->iov_len);

	if (!xprt_bound(xprt))
		return -ENOTCONN;
	status = xs_sendpages(transport->sock,
			      xs_addr(xprt),
			      xprt->addrlen, xdr,
			      req->rq_bytes_sent);

	dprintk("RPC:       xs_udp_send_request(%u) = %d\n",
			xdr->len - req->rq_bytes_sent, status);

	if (status >= 0) {
		task->tk_bytes_sent += status;
		if (status >= req->rq_slen)
			return 0;
		/* Still some bytes left; set up for a retry later. */
		status = -EAGAIN;
	}

	switch (status) {
	case -ENOTSOCK:
		status = -ENOTCONN;
		/* Should we call xs_close() here? */
		break;
	case -EAGAIN:
		xs_nospace(task);
		break;
	case -ENETUNREACH:
	case -EPIPE:
	case -ECONNREFUSED:
		/* When the server has died, an ICMP port unreachable message
		 * prompts ECONNREFUSED. */
		clear_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags);
		break;
	default:
		clear_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags);
		dprintk("RPC:       sendmsg returned unrecognized error %d\n",
			-status);
	}

	return status;
}

622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637
/**
 * xs_tcp_shutdown - gracefully shut down a TCP socket
 * @xprt: transport
 *
 * Initiates a graceful shutdown of the TCP socket by calling the
 * equivalent of shutdown(SHUT_WR);
 */
static void xs_tcp_shutdown(struct rpc_xprt *xprt)
{
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
	struct socket *sock = transport->sock;

	if (sock != NULL)
		kernel_sock_shutdown(sock, SHUT_WR);
}

638 639 640 641 642 643 644
static inline void xs_encode_tcp_record_marker(struct xdr_buf *buf)
{
	u32 reclen = buf->len - sizeof(rpc_fraghdr);
	rpc_fraghdr *base = buf->head[0].iov_base;
	*base = htonl(RPC_LAST_STREAM_FRAGMENT | reclen);
}

/**
 * xs_tcp_send_request - write an RPC request to a TCP socket
 * @task: address of RPC task that manages the state of an RPC request
 *
 * Return values:
 *        0:	The request has been sent
 *   EAGAIN:	The socket was blocked, please call again later to
 *		complete the request
 * ENOTCONN:	Caller needs to invoke connect logic then call again
 *    other:	Some other error occured, the request was not sent
 *
 * XXX: In the case of soft timeouts, should we eventually give up
 *	if sendmsg is not able to make progress?
 */
static int xs_tcp_send_request(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;
	struct rpc_xprt *xprt = req->rq_xprt;
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
	struct xdr_buf *xdr = &req->rq_snd_buf;
	int status;

	xs_encode_tcp_record_marker(&req->rq_snd_buf);

	xs_pktdump("packet data:",
				req->rq_svec->iov_base,
				req->rq_svec->iov_len);

	/* Continue transmitting the packet/record. We must be careful
	 * to cope with writespace callbacks arriving _after_ we have
	 * called sendmsg(). */
	while (1) {
		status = xs_sendpages(transport->sock,
					NULL, 0, xdr, req->rq_bytes_sent);

		dprintk("RPC:       xs_tcp_send_request(%u) = %d\n",
				xdr->len - req->rq_bytes_sent, status);

		if (unlikely(status < 0))
			break;

		/* If we've sent the entire packet, immediately
		 * reset the count of bytes sent. */
		req->rq_bytes_sent += status;
		task->tk_bytes_sent += status;
		if (likely(req->rq_bytes_sent >= req->rq_slen)) {
			req->rq_bytes_sent = 0;
			return 0;
		}

		/* Partial progress: keep pushing until the socket blocks */
		if (status != 0)
			continue;
		status = -EAGAIN;
		break;
	}

	switch (status) {
	case -ENOTSOCK:
		status = -ENOTCONN;
		/* Should we call xs_close() here? */
		break;
	case -EAGAIN:
		xs_nospace(task);
		break;
	case -ECONNRESET:
		xs_tcp_shutdown(xprt);
		/* fall through */
	case -ECONNREFUSED:
	case -ENOTCONN:
	case -EPIPE:
		status = -ENOTCONN;
		clear_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags);
		break;
	default:
		dprintk("RPC:       sendmsg returned unrecognized error %d\n",
			-status);
		clear_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags);
		xs_tcp_shutdown(xprt);
	}

	return status;
}

727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753
/**
 * xs_tcp_release_xprt - clean up after a tcp transmission
 * @xprt: transport
 * @task: rpc task
 *
 * This cleans up if an error causes us to abort the transmission of a request.
 * In this case, the socket may need to be reset in order to avoid confusing
 * the server.
 */
static void xs_tcp_release_xprt(struct rpc_xprt *xprt, struct rpc_task *task)
{
	struct rpc_rqst *req;

	if (task != xprt->snd_task)
		return;
	if (task == NULL)
		goto out_release;
	req = task->tk_rqstp;
	if (req->rq_bytes_sent == 0)
		goto out_release;
	if (req->rq_bytes_sent == req->rq_snd_buf.len)
		goto out_release;
	set_bit(XPRT_CLOSE_WAIT, &task->tk_xprt->state);
out_release:
	xprt_release_xprt(xprt, task);
}

/* Stash the socket's current callbacks so xs_close() can restore them. */
static void xs_save_old_callbacks(struct sock_xprt *transport, struct sock *sk)
{
	transport->old_data_ready = sk->sk_data_ready;
	transport->old_state_change = sk->sk_state_change;
	transport->old_write_space = sk->sk_write_space;
	transport->old_error_report = sk->sk_error_report;
}

/* Put back the callbacks saved by xs_save_old_callbacks(). */
static void xs_restore_old_callbacks(struct sock_xprt *transport, struct sock *sk)
{
	sk->sk_data_ready = transport->old_data_ready;
	sk->sk_state_change = transport->old_state_change;
	sk->sk_write_space = transport->old_write_space;
	sk->sk_error_report = transport->old_error_report;
}

770 771 772 773
/**
 * xs_close - close a socket
 * @xprt: transport
 *
774 775
 * This is used when all requests are complete; ie, no DRC state remains
 * on the server we want to save.
776
 */
777
static void xs_close(struct rpc_xprt *xprt)
778
{
779 780 781
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
	struct socket *sock = transport->sock;
	struct sock *sk = transport->inet;
782 783

	if (!sk)
784
		goto clear_close_wait;
785

786
	dprintk("RPC:       xs_close xprt %p\n", xprt);
787

788
	write_lock_bh(&sk->sk_callback_lock);
789 790
	transport->inet = NULL;
	transport->sock = NULL;
791

792
	sk->sk_user_data = NULL;
793 794

	xs_restore_old_callbacks(transport, sk);
795 796
	write_unlock_bh(&sk->sk_callback_lock);

797
	sk->sk_no_check = 0;
798 799

	sock_release(sock);
800 801 802
clear_close_wait:
	smp_mb__before_clear_bit();
	clear_bit(XPRT_CLOSE_WAIT, &xprt->state);
803
	clear_bit(XPRT_CLOSING, &xprt->state);
804
	smp_mb__after_clear_bit();
805
	xprt_disconnect_done(xprt);
806 807
}

808 809 810 811 812 813
/**
 * xs_destroy - prepare to shutdown a transport
 * @xprt: doomed transport
 *
 */
static void xs_destroy(struct rpc_xprt *xprt)
814
{
815 816
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);

817
	dprintk("RPC:       xs_destroy xprt %p\n", xprt);
818

819
	cancel_rearming_delayed_work(&transport->connect_worker);
820

821
	xs_close(xprt);
822
	xs_free_peer_addresses(xprt);
823
	kfree(xprt->slot);
824
	kfree(xprt);
825
	module_put(THIS_MODULE);
826 827
}

828 829 830 831 832 833 834 835 836 837
static inline struct rpc_xprt *xprt_from_sock(struct sock *sk)
{
	return (struct rpc_xprt *) sk->sk_user_data;
}

/**
 * xs_udp_data_ready - "data ready" callback for UDP sockets
 * @sk: socket with data to read
 * @len: how much data to read
 *
838
 */
839
static void xs_udp_data_ready(struct sock *sk, int len)
840
{
841 842
	struct rpc_task *task;
	struct rpc_xprt *xprt;
843
	struct rpc_rqst *rovr;
844
	struct sk_buff *skb;
845
	int err, repsize, copied;
846 847
	u32 _xid;
	__be32 *xp;
848 849

	read_lock(&sk->sk_callback_lock);
850
	dprintk("RPC:       xs_udp_data_ready...\n");
851
	if (!(xprt = xprt_from_sock(sk)))
852 853 854 855 856 857 858 859 860 861
		goto out;

	if ((skb = skb_recv_datagram(sk, 0, 1, &err)) == NULL)
		goto out;

	if (xprt->shutdown)
		goto dropit;

	repsize = skb->len - sizeof(struct udphdr);
	if (repsize < 4) {
862
		dprintk("RPC:       impossible RPC reply size %d!\n", repsize);
863 864 865 866 867 868 869 870 871 872
		goto dropit;
	}

	/* Copy the XID from the skb... */
	xp = skb_header_pointer(skb, sizeof(struct udphdr),
				sizeof(_xid), &_xid);
	if (xp == NULL)
		goto dropit;

	/* Look up and lock the request corresponding to the given XID */
C
Chuck Lever 已提交
873
	spin_lock(&xprt->transport_lock);
874 875 876 877 878 879 880 881 882
	rovr = xprt_lookup_rqst(xprt, *xp);
	if (!rovr)
		goto out_unlock;
	task = rovr->rq_task;

	if ((copied = rovr->rq_private_buf.buflen) > repsize)
		copied = repsize;

	/* Suck it into the iovec, verify checksum if not done by hw. */
883 884
	if (csum_partial_copy_to_xdr(&rovr->rq_private_buf, skb)) {
		UDPX_INC_STATS_BH(sk, UDP_MIB_INERRORS);
885
		goto out_unlock;
886 887 888
	}

	UDPX_INC_STATS_BH(sk, UDP_MIB_INDATAGRAMS);
889 890 891 892

	/* Something worked... */
	dst_confirm(skb->dst);

893 894 895
	xprt_adjust_cwnd(task, copied);
	xprt_update_rtt(task);
	xprt_complete_rqst(task, copied);
896 897

 out_unlock:
C
Chuck Lever 已提交
898
	spin_unlock(&xprt->transport_lock);
899 900 901 902 903 904
 dropit:
	skb_free_datagram(sk, skb);
 out:
	read_unlock(&sk->sk_callback_lock);
}

905
static inline void xs_tcp_read_fraghdr(struct rpc_xprt *xprt, struct xdr_skb_reader *desc)
906
{
907
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
908 909 910
	size_t len, used;
	char *p;

911 912
	p = ((char *) &transport->tcp_fraghdr) + transport->tcp_offset;
	len = sizeof(transport->tcp_fraghdr) - transport->tcp_offset;
913
	used = xdr_skb_read_bits(desc, p, len);
914
	transport->tcp_offset += used;
915 916
	if (used != len)
		return;
917

918 919
	transport->tcp_reclen = ntohl(transport->tcp_fraghdr);
	if (transport->tcp_reclen & RPC_LAST_STREAM_FRAGMENT)
920
		transport->tcp_flags |= TCP_RCV_LAST_FRAG;
921
	else
922
		transport->tcp_flags &= ~TCP_RCV_LAST_FRAG;
923
	transport->tcp_reclen &= RPC_FRAGMENT_SIZE_MASK;
924

925
	transport->tcp_flags &= ~TCP_RCV_COPY_FRAGHDR;
926
	transport->tcp_offset = 0;
927

928
	/* Sanity check of the record length */
929
	if (unlikely(transport->tcp_reclen < 4)) {
930
		dprintk("RPC:       invalid TCP record fragment length\n");
931
		xprt_force_disconnect(xprt);
932
		return;
933
	}
934
	dprintk("RPC:       reading TCP record fragment of length %d\n",
935
			transport->tcp_reclen);
936 937
}

938
static void xs_tcp_check_fraghdr(struct sock_xprt *transport)
939
{
940
	if (transport->tcp_offset == transport->tcp_reclen) {
941
		transport->tcp_flags |= TCP_RCV_COPY_FRAGHDR;
942
		transport->tcp_offset = 0;
943 944 945
		if (transport->tcp_flags & TCP_RCV_LAST_FRAG) {
			transport->tcp_flags &= ~TCP_RCV_COPY_DATA;
			transport->tcp_flags |= TCP_RCV_COPY_XID;
946
			transport->tcp_copied = 0;
947 948 949 950
		}
	}
}

951
static inline void xs_tcp_read_xid(struct sock_xprt *transport, struct xdr_skb_reader *desc)
952 953 954 955
{
	size_t len, used;
	char *p;

956
	len = sizeof(transport->tcp_xid) - transport->tcp_offset;
957
	dprintk("RPC:       reading XID (%Zu bytes)\n", len);
958
	p = ((char *) &transport->tcp_xid) + transport->tcp_offset;
959
	used = xdr_skb_read_bits(desc, p, len);
960
	transport->tcp_offset += used;
961 962
	if (used != len)
		return;
963 964
	transport->tcp_flags &= ~TCP_RCV_COPY_XID;
	transport->tcp_flags |= TCP_RCV_COPY_DATA;
965
	transport->tcp_copied = 4;
966
	dprintk("RPC:       reading reply for XID %08x\n",
967 968
			ntohl(transport->tcp_xid));
	xs_tcp_check_fraghdr(transport);
969 970
}

971
static inline void xs_tcp_read_request(struct rpc_xprt *xprt, struct xdr_skb_reader *desc)
972
{
973
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
974 975 976 977 978 979
	struct rpc_rqst *req;
	struct xdr_buf *rcvbuf;
	size_t len;
	ssize_t r;

	/* Find and lock the request corresponding to this xid */
C
Chuck Lever 已提交
980
	spin_lock(&xprt->transport_lock);
981
	req = xprt_lookup_rqst(xprt, transport->tcp_xid);
982
	if (!req) {
983
		transport->tcp_flags &= ~TCP_RCV_COPY_DATA;
984
		dprintk("RPC:       XID %08x request not found!\n",
985
				ntohl(transport->tcp_xid));
C
Chuck Lever 已提交
986
		spin_unlock(&xprt->transport_lock);
987 988 989 990 991
		return;
	}

	rcvbuf = &req->rq_private_buf;
	len = desc->count;
992
	if (len > transport->tcp_reclen - transport->tcp_offset) {
993
		struct xdr_skb_reader my_desc;
994

995
		len = transport->tcp_reclen - transport->tcp_offset;
996 997
		memcpy(&my_desc, desc, sizeof(my_desc));
		my_desc.count = len;
998
		r = xdr_partial_copy_from_skb(rcvbuf, transport->tcp_copied,
999
					  &my_desc, xdr_skb_read_bits);
1000 1001 1002
		desc->count -= r;
		desc->offset += r;
	} else
1003
		r = xdr_partial_copy_from_skb(rcvbuf, transport->tcp_copied,
1004
					  desc, xdr_skb_read_bits);
1005 1006

	if (r > 0) {
1007 1008
		transport->tcp_copied += r;
		transport->tcp_offset += r;
1009 1010 1011 1012 1013
	}
	if (r != len) {
		/* Error when copying to the receive buffer,
		 * usually because we weren't able to allocate
		 * additional buffer pages. All we can do now
1014
		 * is turn off TCP_RCV_COPY_DATA, so the request
1015 1016 1017 1018 1019
		 * will not receive any additional updates,
		 * and time out.
		 * Any remaining data from this record will
		 * be discarded.
		 */
1020
		transport->tcp_flags &= ~TCP_RCV_COPY_DATA;
1021
		dprintk("RPC:       XID %08x truncated request\n",
1022
				ntohl(transport->tcp_xid));
1023 1024 1025 1026
		dprintk("RPC:       xprt = %p, tcp_copied = %lu, "
				"tcp_offset = %u, tcp_reclen = %u\n",
				xprt, transport->tcp_copied,
				transport->tcp_offset, transport->tcp_reclen);
1027 1028 1029
		goto out;
	}

1030
	dprintk("RPC:       XID %08x read %Zd bytes\n",
1031
			ntohl(transport->tcp_xid), r);
1032 1033 1034
	dprintk("RPC:       xprt = %p, tcp_copied = %lu, tcp_offset = %u, "
			"tcp_reclen = %u\n", xprt, transport->tcp_copied,
			transport->tcp_offset, transport->tcp_reclen);
1035 1036

	if (transport->tcp_copied == req->rq_private_buf.buflen)
1037
		transport->tcp_flags &= ~TCP_RCV_COPY_DATA;
1038
	else if (transport->tcp_offset == transport->tcp_reclen) {
1039 1040
		if (transport->tcp_flags & TCP_RCV_LAST_FRAG)
			transport->tcp_flags &= ~TCP_RCV_COPY_DATA;
1041 1042 1043
	}

out:
1044
	if (!(transport->tcp_flags & TCP_RCV_COPY_DATA))
1045
		xprt_complete_rqst(req->rq_task, transport->tcp_copied);
C
Chuck Lever 已提交
1046
	spin_unlock(&xprt->transport_lock);
1047
	xs_tcp_check_fraghdr(transport);
1048 1049
}

1050
static inline void xs_tcp_read_discard(struct sock_xprt *transport, struct xdr_skb_reader *desc)
1051 1052 1053
{
	size_t len;

1054
	len = transport->tcp_reclen - transport->tcp_offset;
1055 1056 1057 1058
	if (len > desc->count)
		len = desc->count;
	desc->count -= len;
	desc->offset += len;
1059
	transport->tcp_offset += len;
1060
	dprintk("RPC:       discarded %Zu bytes\n", len);
1061
	xs_tcp_check_fraghdr(transport);
1062 1063
}

1064
/*
 * tcp_read_sock() actor: demultiplex incoming TCP stream data into the
 * record-marker, XID, payload and discard phases tracked in tcp_flags.
 * Returns the number of bytes consumed from the skb.
 */
static int xs_tcp_data_recv(read_descriptor_t *rd_desc, struct sk_buff *skb, unsigned int offset, size_t len)
{
	struct rpc_xprt *xprt = rd_desc->arg.data;
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
	struct xdr_skb_reader desc = {
		.skb	= skb,
		.offset	= offset,
		.count	= len,
	};

	dprintk("RPC:       xs_tcp_data_recv started\n");
	do {
		/* Read in a new fragment marker if necessary */
		/* Can we ever really expect to get completely empty fragments? */
		if (transport->tcp_flags & TCP_RCV_COPY_FRAGHDR) {
			xs_tcp_read_fraghdr(xprt, &desc);
			continue;
		}
		/* Read in the xid if necessary */
		if (transport->tcp_flags & TCP_RCV_COPY_XID) {
			xs_tcp_read_xid(transport, &desc);
			continue;
		}
		/* Read in the request data */
		if (transport->tcp_flags & TCP_RCV_COPY_DATA) {
			xs_tcp_read_request(xprt, &desc);
			continue;
		}
		/* Skip over any trailing bytes on short reads */
		xs_tcp_read_discard(transport, &desc);
	} while (desc.count);
	dprintk("RPC:       xs_tcp_data_recv done\n");
	return len - desc.count;
}

1099 1100 1101 1102 1103 1104 1105
/**
 * xs_tcp_data_ready - "data ready" callback for TCP sockets
 * @sk: socket with data to read
 * @bytes: how much data to read
 *
 */
static void xs_tcp_data_ready(struct sock *sk, int bytes)
1106 1107 1108
{
	struct rpc_xprt *xprt;
	read_descriptor_t rd_desc;
1109
	int read;
1110

1111 1112
	dprintk("RPC:       xs_tcp_data_ready...\n");

1113
	read_lock(&sk->sk_callback_lock);
1114
	if (!(xprt = xprt_from_sock(sk)))
1115 1116 1117 1118
		goto out;
	if (xprt->shutdown)
		goto out;

1119
	/* We use rd_desc to pass struct xprt to xs_tcp_data_recv */
1120
	rd_desc.arg.data = xprt;
1121 1122 1123 1124
	do {
		rd_desc.count = 65536;
		read = tcp_read_sock(sk, &rd_desc, xs_tcp_data_recv);
	} while (read > 0);
1125 1126 1127 1128
out:
	read_unlock(&sk->sk_callback_lock);
}

1129 1130 1131 1132 1133 1134
/**
 * xs_tcp_state_change - callback to handle TCP socket state changes
 * @sk: socket whose state has changed
 *
 */
static void xs_tcp_state_change(struct sock *sk)
1135
{
1136
	struct rpc_xprt *xprt;
1137 1138 1139 1140

	read_lock(&sk->sk_callback_lock);
	if (!(xprt = xprt_from_sock(sk)))
		goto out;
1141 1142 1143 1144 1145
	dprintk("RPC:       xs_tcp_state_change client %p...\n", xprt);
	dprintk("RPC:       state %x conn %d dead %d zapped %d\n",
			sk->sk_state, xprt_connected(xprt),
			sock_flag(sk, SOCK_DEAD),
			sock_flag(sk, SOCK_ZAPPED));
1146 1147 1148

	switch (sk->sk_state) {
	case TCP_ESTABLISHED:
C
Chuck Lever 已提交
1149
		spin_lock_bh(&xprt->transport_lock);
1150
		if (!xprt_test_and_set_connected(xprt)) {
1151 1152 1153
			struct sock_xprt *transport = container_of(xprt,
					struct sock_xprt, xprt);

1154
			/* Reset TCP record info */
1155 1156 1157
			transport->tcp_offset = 0;
			transport->tcp_reclen = 0;
			transport->tcp_copied = 0;
1158 1159
			transport->tcp_flags =
				TCP_RCV_COPY_FRAGHDR | TCP_RCV_COPY_XID;
1160

1161
			xprt_wake_pending_tasks(xprt, 0);
1162
		}
C
Chuck Lever 已提交
1163
		spin_unlock_bh(&xprt->transport_lock);
1164
		break;
1165 1166
	case TCP_FIN_WAIT1:
		/* The client initiated a shutdown of the socket */
1167
		xprt->connect_cookie++;
1168
		xprt->reestablish_timeout = 0;
1169 1170 1171
		set_bit(XPRT_CLOSING, &xprt->state);
		smp_mb__before_clear_bit();
		clear_bit(XPRT_CONNECTED, &xprt->state);
1172
		clear_bit(XPRT_CLOSE_WAIT, &xprt->state);
1173
		smp_mb__after_clear_bit();
1174
		break;
1175
	case TCP_CLOSE_WAIT:
1176 1177
		/* The server initiated a shutdown of the socket */
		set_bit(XPRT_CLOSING, &xprt->state);
1178
		xprt_force_disconnect(xprt);
1179
	case TCP_SYN_SENT:
1180
		xprt->connect_cookie++;
1181 1182 1183 1184 1185 1186 1187
	case TCP_CLOSING:
		/*
		 * If the server closed down the connection, make sure that
		 * we back off before reconnecting
		 */
		if (xprt->reestablish_timeout < XS_TCP_INIT_REEST_TO)
			xprt->reestablish_timeout = XS_TCP_INIT_REEST_TO;
1188 1189 1190 1191 1192 1193 1194 1195
		break;
	case TCP_LAST_ACK:
		smp_mb__before_clear_bit();
		clear_bit(XPRT_CONNECTED, &xprt->state);
		smp_mb__after_clear_bit();
		break;
	case TCP_CLOSE:
		smp_mb__before_clear_bit();
1196
		clear_bit(XPRT_CLOSE_WAIT, &xprt->state);
1197 1198 1199
		clear_bit(XPRT_CLOSING, &xprt->state);
		smp_mb__after_clear_bit();
		/* Mark transport as closed and wake up all pending tasks */
1200
		xprt_disconnect_done(xprt);
1201 1202 1203 1204 1205
	}
 out:
	read_unlock(&sk->sk_callback_lock);
}

1206 1207 1208 1209 1210 1211 1212 1213 1214 1215 1216 1217 1218 1219 1220 1221 1222 1223 1224 1225 1226 1227
/**
 * xs_tcp_error_report - callback mainly for catching RST events
 * @sk: socket
 *
 * Only reacts to an ECONNRESET arriving while the connection is
 * still in TCP_ESTABLISHED; everything else is left to the normal
 * state-change handling.
 */
static void xs_tcp_error_report(struct sock *sk)
{
	struct rpc_xprt *xprt;

	read_lock(&sk->sk_callback_lock);
	if (sk->sk_err != ECONNRESET || sk->sk_state != TCP_ESTABLISHED)
		goto out;
	if (!(xprt = xprt_from_sock(sk)))
		goto out;
	dprintk("RPC:       %s client %p...\n"
			"RPC:       error %d\n",
			__func__, xprt, sk->sk_err);

	xprt_force_disconnect(xprt);
out:
	read_unlock(&sk->sk_callback_lock);
}

1228
/**
1229 1230
 * xs_udp_write_space - callback invoked when socket buffer space
 *                             becomes available
1231 1232
 * @sk: socket whose state has changed
 *
1233 1234
 * Called when more output buffer space is available for this socket.
 * We try not to wake our writers until they can make "significant"
1235
 * progress, otherwise we'll waste resources thrashing kernel_sendmsg
1236 1237
 * with a bunch of small requests.
 */
1238
static void xs_udp_write_space(struct sock *sk)
1239 1240 1241
{
	read_lock(&sk->sk_callback_lock);

1242 1243 1244 1245 1246 1247
	/* from net/core/sock.c:sock_def_write_space */
	if (sock_writeable(sk)) {
		struct socket *sock;
		struct rpc_xprt *xprt;

		if (unlikely(!(sock = sk->sk_socket)))
1248
			goto out;
1249 1250
		clear_bit(SOCK_NOSPACE, &sock->flags);

1251 1252
		if (unlikely(!(xprt = xprt_from_sock(sk))))
			goto out;
1253
		if (test_and_clear_bit(SOCK_ASYNC_NOSPACE, &sock->flags) == 0)
1254
			goto out;
1255 1256

		xprt_write_space(xprt);
1257 1258
	}

1259 1260 1261
 out:
	read_unlock(&sk->sk_callback_lock);
}
1262

1263 1264 1265 1266 1267 1268 1269 1270 1271 1272 1273 1274 1275 1276 1277 1278 1279 1280 1281 1282 1283
/**
 * xs_tcp_write_space - callback invoked when socket buffer space
 *                             becomes available
 * @sk: socket whose state has changed
 *
 * Called when more output buffer space is available for this socket.
 * We try not to wake our writers until they can make "significant"
 * progress, otherwise we'll waste resources thrashing kernel_sendmsg
 * with a bunch of small requests.
 */
static void xs_tcp_write_space(struct sock *sk)
{
	read_lock(&sk->sk_callback_lock);

	/* from net/core/stream.c:sk_stream_write_space */
	if (sk_stream_wspace(sk) >= sk_stream_min_wspace(sk)) {
		struct socket *sock;
		struct rpc_xprt *xprt;

		if (unlikely(!(sock = sk->sk_socket)))
			goto out;
1284 1285
		clear_bit(SOCK_NOSPACE, &sock->flags);

1286 1287
		if (unlikely(!(xprt = xprt_from_sock(sk))))
			goto out;
1288
		if (test_and_clear_bit(SOCK_ASYNC_NOSPACE, &sock->flags) == 0)
1289 1290 1291 1292 1293 1294
			goto out;

		xprt_write_space(xprt);
	}

 out:
1295 1296 1297
	read_unlock(&sk->sk_callback_lock);
}

1298
static void xs_udp_do_set_buffer_size(struct rpc_xprt *xprt)
1299
{
1300 1301
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
	struct sock *sk = transport->inet;
1302

1303
	if (transport->rcvsize) {
1304
		sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
1305
		sk->sk_rcvbuf = transport->rcvsize * xprt->max_reqs * 2;
1306
	}
1307
	if (transport->sndsize) {
1308
		sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
1309
		sk->sk_sndbuf = transport->sndsize * xprt->max_reqs * 2;
1310 1311 1312 1313
		sk->sk_write_space(sk);
	}
}

1314
/**
1315
 * xs_udp_set_buffer_size - set send and receive limits
1316
 * @xprt: generic transport
1317 1318
 * @sndsize: requested size of send buffer, in bytes
 * @rcvsize: requested size of receive buffer, in bytes
1319
 *
1320
 * Set socket send and receive buffer size limits.
1321
 */
1322
static void xs_udp_set_buffer_size(struct rpc_xprt *xprt, size_t sndsize, size_t rcvsize)
1323
{
1324 1325 1326
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);

	transport->sndsize = 0;
1327
	if (sndsize)
1328 1329
		transport->sndsize = sndsize + 1024;
	transport->rcvsize = 0;
1330
	if (rcvsize)
1331
		transport->rcvsize = rcvsize + 1024;
1332 1333

	xs_udp_do_set_buffer_size(xprt);
1334 1335
}

1336 1337 1338 1339 1340 1341 1342 1343 1344 1345 1346
/**
 * xs_udp_timer - called when a retransmit timeout occurs on a UDP transport
 * @task: task that timed out
 *
 * Adjust the congestion window after a retransmit timeout has occurred.
 */
static void xs_udp_timer(struct rpc_task *task)
{
	xprt_adjust_cwnd(task, -ETIMEDOUT);
}

1347 1348 1349 1350 1351 1352 1353
/*
 * Pick a random source port inside [xprt_min_resvport, xprt_max_resvport).
 * Both bounds are sysctl-tunable, so guard against a zero-width range,
 * which would otherwise divide by zero in the modulo below.
 * NOTE(review): xprt_max_resvport itself is never chosen (range is
 * exclusive of the upper bound) — preserved as-is.
 */
static unsigned short xs_get_random_port(void)
{
	unsigned short range = xprt_max_resvport - xprt_min_resvport;
	unsigned short rand;

	if (range == 0)
		return xprt_min_resvport;
	rand = (unsigned short) net_random() % range;
	return rand + xprt_min_resvport;
}

1354 1355 1356 1357 1358 1359 1360 1361
/**
 * xs_set_port - reset the port number in the remote endpoint address
 * @xprt: generic transport
 * @port: new port number
 *
 */
static void xs_set_port(struct rpc_xprt *xprt, unsigned short port)
{
1362
	struct sockaddr *addr = xs_addr(xprt);
1363

1364
	dprintk("RPC:       setting port for xprt %p to %u\n", xprt, port);
1365

1366 1367 1368 1369 1370 1371 1372 1373 1374 1375
	switch (addr->sa_family) {
	case AF_INET:
		((struct sockaddr_in *)addr)->sin_port = htons(port);
		break;
	case AF_INET6:
		((struct sockaddr_in6 *)addr)->sin6_port = htons(port);
		break;
	default:
		BUG();
	}
1376 1377
}

1378 1379 1380 1381 1382 1383 1384 1385 1386 1387 1388 1389 1390 1391 1392 1393 1394 1395 1396 1397
/*
 * Choose the initial source port for a bind attempt: reuse the
 * transport's cached port if it has one, otherwise pick a random
 * reserved port (or 0 = "any" when resvport is off).
 */
static unsigned short xs_get_srcport(struct sock_xprt *transport, struct socket *sock)
{
	unsigned short port = transport->port;

	if (port == 0 && transport->xprt.resvport)
		port = xs_get_random_port();
	return port;
}

/*
 * Compute the next candidate source port after a failed bind:
 * walk downwards through the reserved range, wrapping back to
 * xprt_max_resvport.  Returns 0 ("any port") when resvport is off.
 */
static unsigned short xs_next_srcport(struct sock_xprt *transport, struct socket *sock, unsigned short port)
{
	/* Forget the cached port; it is evidently unusable */
	if (transport->port != 0)
		transport->port = 0;
	if (!transport->xprt.resvport)
		return 0;
	if (port <= xprt_min_resvport || port > xprt_max_resvport)
		return xprt_max_resvport;
	return --port;
}

1398
static int xs_bind4(struct sock_xprt *transport, struct socket *sock)
1399 1400 1401 1402
{
	struct sockaddr_in myaddr = {
		.sin_family = AF_INET,
	};
1403
	struct sockaddr_in *sa;
1404 1405 1406
	int err, nloop = 0;
	unsigned short port = xs_get_srcport(transport, sock);
	unsigned short last;
1407

1408 1409
	sa = (struct sockaddr_in *)&transport->addr;
	myaddr.sin_addr = sa->sin_addr;
1410 1411
	do {
		myaddr.sin_port = htons(port);
1412
		err = kernel_bind(sock, (struct sockaddr *) &myaddr,
1413
						sizeof(myaddr));
1414
		if (port == 0)
1415
			break;
1416
		if (err == 0) {
1417
			transport->port = port;
1418
			break;
1419
		}
1420 1421 1422 1423 1424
		last = port;
		port = xs_next_srcport(transport, sock, port);
		if (port > last)
			nloop++;
	} while (err == -EADDRINUSE && nloop != 2);
H
Harvey Harrison 已提交
1425 1426
	dprintk("RPC:       %s %pI4:%u: %s (%d)\n",
			__func__, &myaddr.sin_addr,
1427
			port, err ? "failed" : "ok", err);
1428 1429 1430
	return err;
}

1431 1432 1433 1434 1435 1436
/*
 * IPv6 counterpart of xs_bind4(): bind to a local source port,
 * retrying downwards through the reserved range on EADDRINUSE.
 */
static int xs_bind6(struct sock_xprt *transport, struct socket *sock)
{
	struct sockaddr_in6 myaddr = {
		.sin6_family = AF_INET6,
	};
	struct sockaddr_in6 *sa;
	int err, nloop = 0;
	unsigned short port = xs_get_srcport(transport, sock);
	unsigned short last;

	sa = (struct sockaddr_in6 *)&transport->addr;
	myaddr.sin6_addr = sa->sin6_addr;
	do {
		myaddr.sin6_port = htons(port);
		err = kernel_bind(sock, (struct sockaddr *) &myaddr,
						sizeof(myaddr));
		if (port == 0)
			break;
		if (err == 0) {
			transport->port = port;
			break;
		}
		last = port;
		port = xs_next_srcport(transport, sock, port);
		if (port > last)
			nloop++;	/* wrapped: one sweep completed */
	} while (err == -EADDRINUSE && nloop != 2);
	dprintk("RPC:       xs_bind6 %pI6:%u: %s (%d)\n",
		&myaddr.sin6_addr, port, err ? "failed" : "ok", err);
	return err;
}

1463 1464 1465 1466
#ifdef CONFIG_DEBUG_LOCK_ALLOC
/*
 * Give RPC client sockets their own lockdep classes so lock-dependency
 * tracking can tell them apart from ordinary userspace sockets.
 * Index 0 = AF_INET, index 1 = AF_INET6.
 */
static struct lock_class_key xs_key[2];
static struct lock_class_key xs_slock_key[2];

static inline void xs_reclassify_socket4(struct socket *sock)
{
	struct sock *sk = sock->sk;

	BUG_ON(sock_owned_by_user(sk));
	sock_lock_init_class_and_name(sk, "slock-AF_INET-RPC",
		&xs_slock_key[0], "sk_lock-AF_INET-RPC", &xs_key[0]);
}

static inline void xs_reclassify_socket6(struct socket *sock)
{
	struct sock *sk = sock->sk;

	BUG_ON(sock_owned_by_user(sk));
	sock_lock_init_class_and_name(sk, "slock-AF_INET6-RPC",
		&xs_slock_key[1], "sk_lock-AF_INET6-RPC", &xs_key[1]);
}
#else
static inline void xs_reclassify_socket4(struct socket *sock)
{
}

static inline void xs_reclassify_socket6(struct socket *sock)
{
}
#endif

1494 1495 1496 1497 1498 1499 1500 1501 1502
/*
 * Attach a freshly created UDP socket to the transport: install our
 * callbacks (saving the old ones for restore on close) and apply the
 * configured buffer sizes.  Idempotent once transport->inet is set.
 */
static void xs_udp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock)
{
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);

	if (!transport->inet) {
		struct sock *sk = sock->sk;

		write_lock_bh(&sk->sk_callback_lock);

		xs_save_old_callbacks(transport, sk);

		sk->sk_user_data = xprt;
		sk->sk_data_ready = xs_udp_data_ready;
		sk->sk_write_space = xs_udp_write_space;
		sk->sk_no_check = UDP_CSUM_NORCV;
		sk->sk_allocation = GFP_ATOMIC;

		xprt_set_connected(xprt);

		/* Reset to new socket */
		transport->sock = sock;
		transport->inet = sk;

		write_unlock_bh(&sk->sk_callback_lock);
	}
	xs_udp_do_set_buffer_size(xprt);
}

1522
/**
C
Chuck Lever 已提交
1523
 * xs_udp_connect_worker4 - set up a UDP socket
1524
 * @work: RPC transport to connect
1525 1526 1527
 *
 * Invoked by a work queue tasklet.
 */
C
Chuck Lever 已提交
1528
static void xs_udp_connect_worker4(struct work_struct *work)
1529
{
1530 1531
	struct sock_xprt *transport =
		container_of(work, struct sock_xprt, connect_worker.work);
1532
	struct rpc_xprt *xprt = &transport->xprt;
1533
	struct socket *sock = transport->sock;
1534
	int err, status = -EIO;
1535

1536
	if (xprt->shutdown)
1537
		goto out;
1538

1539 1540
	/* Start by resetting any existing state */
	xs_close(xprt);
1541

1542
	if ((err = sock_create_kern(PF_INET, SOCK_DGRAM, IPPROTO_UDP, &sock)) < 0) {
1543
		dprintk("RPC:       can't create UDP transport socket (%d).\n", -err);
1544 1545
		goto out;
	}
1546
	xs_reclassify_socket4(sock);
1547

1548
	if (xs_bind4(transport, sock)) {
1549 1550 1551
		sock_release(sock);
		goto out;
	}
1552

1553
	dprintk("RPC:       worker connecting xprt %p to address: %s\n",
1554
			xprt, xprt->address_strings[RPC_DISPLAY_ALL]);
1555

1556
	xs_udp_finish_connecting(xprt, sock);
1557 1558 1559 1560
	status = 0;
out:
	xprt_wake_pending_tasks(xprt, status);
	xprt_clear_connecting(xprt);
1561 1562
}

1563 1564 1565 1566 1567 1568 1569 1570 1571 1572 1573 1574 1575
/**
 * xs_udp_connect_worker6 - set up a UDP socket
 * @work: RPC transport to connect
 *
 * Invoked by a work queue tasklet.
 */
static void xs_udp_connect_worker6(struct work_struct *work)
{
	struct sock_xprt *transport =
		container_of(work, struct sock_xprt, connect_worker.work);
	struct rpc_xprt *xprt = &transport->xprt;
	struct socket *sock = transport->sock;
	int err, status = -EIO;
1576

1577
	if (xprt->shutdown)
1578
		goto out;
1579

1580 1581
	/* Start by resetting any existing state */
	xs_close(xprt);
1582

1583 1584 1585 1586
	if ((err = sock_create_kern(PF_INET6, SOCK_DGRAM, IPPROTO_UDP, &sock)) < 0) {
		dprintk("RPC:       can't create UDP transport socket (%d).\n", -err);
		goto out;
	}
1587
	xs_reclassify_socket6(sock);
1588

1589 1590 1591
	if (xs_bind6(transport, sock) < 0) {
		sock_release(sock);
		goto out;
1592
	}
1593 1594 1595 1596 1597

	dprintk("RPC:       worker connecting xprt %p to address: %s\n",
			xprt, xprt->address_strings[RPC_DISPLAY_ALL]);

	xs_udp_finish_connecting(xprt, sock);
1598 1599 1600 1601
	status = 0;
out:
	xprt_wake_pending_tasks(xprt, status);
	xprt_clear_connecting(xprt);
1602 1603
}

1604 1605 1606 1607 1608 1609 1610
/*
 * We need to preserve the port number so the reply cache on the server can
 * find our cached RPC replies when we get around to reconnecting.
 */
static void xs_tcp_reuse_connection(struct rpc_xprt *xprt)
{
	int result;
1611
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
1612 1613
	struct sockaddr any;

1614
	dprintk("RPC:       disconnecting xprt %p to reuse port\n", xprt);
1615 1616 1617 1618 1619 1620 1621

	/*
	 * Disconnect the transport socket by doing a connect operation
	 * with AF_UNSPEC.  This should return immediately...
	 */
	memset(&any, 0, sizeof(any));
	any.sa_family = AF_UNSPEC;
1622
	result = kernel_connect(transport->sock, &any, sizeof(any), 0);
1623
	if (result)
1624
		dprintk("RPC:       AF_UNSPEC connect return code %d\n",
1625 1626 1627
				result);
}

1628
static int xs_tcp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock)
1629
{
1630
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
1631

1632
	if (!transport->inet) {
1633 1634 1635 1636
		struct sock *sk = sock->sk;

		write_lock_bh(&sk->sk_callback_lock);

1637 1638
		xs_save_old_callbacks(transport, sk);

1639 1640 1641 1642
		sk->sk_user_data = xprt;
		sk->sk_data_ready = xs_tcp_data_ready;
		sk->sk_state_change = xs_tcp_state_change;
		sk->sk_write_space = xs_tcp_write_space;
1643
		sk->sk_error_report = xs_tcp_error_report;
1644
		sk->sk_allocation = GFP_ATOMIC;
1645 1646 1647 1648 1649 1650

		/* socket options */
		sk->sk_userlocks |= SOCK_BINDPORT_LOCK;
		sock_reset_flag(sk, SOCK_LINGER);
		tcp_sk(sk)->linger2 = 0;
		tcp_sk(sk)->nonagle |= TCP_NAGLE_OFF;
1651 1652 1653 1654

		xprt_clear_connected(xprt);

		/* Reset to new socket */
1655 1656
		transport->sock = sock;
		transport->inet = sk;
1657 1658 1659 1660

		write_unlock_bh(&sk->sk_callback_lock);
	}

1661 1662 1663
	if (!xprt_bound(xprt))
		return -ENOTCONN;

1664
	/* Tell the socket layer to start connecting... */
1665 1666
	xprt->stat.connect_count++;
	xprt->stat.connect_start = jiffies;
1667
	return kernel_connect(sock, xs_addr(xprt), xprt->addrlen, O_NONBLOCK);
1668 1669
}

1670
/**
 * xs_tcp_connect_worker4 - connect a TCP socket to a remote endpoint
 * @work: RPC transport to connect
 *
 * Invoked by a work queue tasklet.
 */
static void xs_tcp_connect_worker4(struct work_struct *work)
{
	struct sock_xprt *transport =
		container_of(work, struct sock_xprt, connect_worker.work);
	struct rpc_xprt *xprt = &transport->xprt;
	struct socket *sock = transport->sock;
	int err, status = -EIO;

	if (xprt->shutdown)
		goto out;

	if (!sock) {
		/* start from scratch */
		if ((err = sock_create_kern(PF_INET, SOCK_STREAM, IPPROTO_TCP, &sock)) < 0) {
			dprintk("RPC:       can't create TCP transport socket (%d).\n", -err);
			goto out;
		}
		xs_reclassify_socket4(sock);

		if (xs_bind4(transport, sock) < 0) {
			sock_release(sock);
			goto out;
		}
	} else
		/* "close" the socket, preserving the local port */
		xs_tcp_reuse_connection(xprt);

	dprintk("RPC:       worker connecting xprt %p to address: %s\n",
			xprt, xprt->address_strings[RPC_DISPLAY_ALL]);

	status = xs_tcp_finish_connecting(xprt, sock);
	dprintk("RPC:       %p connect status %d connected %d sock state %d\n",
			xprt, -status, xprt_connected(xprt),
			sock->sk->sk_state);
	if (status < 0) {
		switch (status) {
			case -EINPROGRESS:
			case -EALREADY:
				/* connect still pending: don't wake tasks */
				goto out_clear;
			case -ECONNREFUSED:
			case -ECONNRESET:
				/* retry with existing socket, after a delay */
				break;
			default:
				/* get rid of existing socket, and retry */
				xs_tcp_shutdown(xprt);
		}
	}
out:
	xprt_wake_pending_tasks(xprt, status);
out_clear:
	xprt_clear_connecting(xprt);
}
1729

1730 1731 1732 1733 1734 1735 1736 1737 1738 1739 1740 1741 1742
/**
 * xs_tcp_connect_worker6 - connect a TCP socket to a remote endpoint
 * @work: RPC transport to connect
 *
 * Invoked by a work queue tasklet.
 */
static void xs_tcp_connect_worker6(struct work_struct *work)
{
	struct sock_xprt *transport =
		container_of(work, struct sock_xprt, connect_worker.work);
	struct rpc_xprt *xprt = &transport->xprt;
	struct socket *sock = transport->sock;
	int err, status = -EIO;

	if (xprt->shutdown)
		goto out;

	if (!sock) {
		/* start from scratch */
		if ((err = sock_create_kern(PF_INET6, SOCK_STREAM, IPPROTO_TCP, &sock)) < 0) {
			dprintk("RPC:       can't create TCP transport socket (%d).\n", -err);
			goto out;
		}
		xs_reclassify_socket6(sock);

		if (xs_bind6(transport, sock) < 0) {
			sock_release(sock);
			goto out;
		}
	} else
		/* "close" the socket, preserving the local port */
		xs_tcp_reuse_connection(xprt);

	dprintk("RPC:       worker connecting xprt %p to address: %s\n",
			xprt, xprt->address_strings[RPC_DISPLAY_ALL]);

	status = xs_tcp_finish_connecting(xprt, sock);
	dprintk("RPC:       %p connect status %d connected %d sock state %d\n",
			xprt, -status, xprt_connected(xprt), sock->sk->sk_state);
	if (status < 0) {
		switch (status) {
			case -EINPROGRESS:
			case -EALREADY:
				/* connect still pending: don't wake tasks */
				goto out_clear;
			case -ECONNREFUSED:
			case -ECONNRESET:
				/* retry with existing socket, after a delay */
				break;
			default:
				/* get rid of existing socket, and retry */
				xs_tcp_shutdown(xprt);
		}
	}
out:
	xprt_wake_pending_tasks(xprt, status);
out_clear:
	xprt_clear_connecting(xprt);
}

1789 1790 1791 1792 1793
/**
 * xs_connect - connect a socket to a remote endpoint
 * @task: address of RPC task that manages state of connect request
 *
 * TCP: If the remote end dropped the connection, delay reconnecting.
1794 1795 1796 1797 1798 1799 1800
 *
 * UDP socket connects are synchronous, but we use a work queue anyway
 * to guarantee that even unprivileged user processes can set up a
 * socket on a privileged port.
 *
 * If a UDP socket connect fails, the delay behavior here prevents
 * retry floods (hard mounts).
1801 1802
 */
static void xs_connect(struct rpc_task *task)
1803 1804
{
	struct rpc_xprt *xprt = task->tk_xprt;
1805
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
1806

1807 1808 1809
	if (xprt_test_and_set_connecting(xprt))
		return;

1810
	if (transport->sock != NULL) {
1811 1812
		dprintk("RPC:       xs_connect delayed xprt %p for %lu "
				"seconds\n",
1813
				xprt, xprt->reestablish_timeout / HZ);
1814 1815 1816
		queue_delayed_work(rpciod_workqueue,
				   &transport->connect_worker,
				   xprt->reestablish_timeout);
1817 1818 1819
		xprt->reestablish_timeout <<= 1;
		if (xprt->reestablish_timeout > XS_TCP_MAX_REEST_TO)
			xprt->reestablish_timeout = XS_TCP_MAX_REEST_TO;
1820
	} else {
1821
		dprintk("RPC:       xs_connect scheduled xprt %p\n", xprt);
1822 1823
		queue_delayed_work(rpciod_workqueue,
				   &transport->connect_worker, 0);
1824 1825 1826
	}
}

1827 1828 1829 1830 1831 1832 1833 1834 1835 1836 1837 1838 1839
/*
 * TCP variant of the connect op: gracefully shut down any existing
 * connection first, and defer the reconnect until the socket has
 * finished closing (the state-change callback will re-drive us).
 */
static void xs_tcp_connect(struct rpc_task *task)
{
	struct rpc_xprt *xprt = task->tk_xprt;

	/* Initiate graceful shutdown of the socket if not already done */
	if (test_bit(XPRT_CONNECTED, &xprt->state))
		xs_tcp_shutdown(xprt);
	/* Exit if we need to wait for socket shutdown to complete */
	if (test_bit(XPRT_CLOSING, &xprt->state))
		return;
	xs_connect(task);
}

1840 1841 1842 1843 1844 1845 1846 1847
/**
 * xs_udp_print_stats - display UDP socket-specifc stats
 * @xprt: rpc_xprt struct containing statistics
 * @seq: output file
 *
 */
static void xs_udp_print_stats(struct rpc_xprt *xprt, struct seq_file *seq)
{
1848 1849
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);

1850
	seq_printf(seq, "\txprt:\tudp %u %lu %lu %lu %lu %Lu %Lu\n",
1851
			transport->port,
1852 1853 1854 1855 1856 1857 1858 1859 1860 1861 1862 1863 1864 1865 1866 1867
			xprt->stat.bind_count,
			xprt->stat.sends,
			xprt->stat.recvs,
			xprt->stat.bad_xids,
			xprt->stat.req_u,
			xprt->stat.bklog_u);
}

/**
 * xs_tcp_print_stats - display TCP socket-specifc stats
 * @xprt: rpc_xprt struct containing statistics
 * @seq: output file
 *
 */
static void xs_tcp_print_stats(struct rpc_xprt *xprt, struct seq_file *seq)
{
1868
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
1869 1870 1871 1872 1873 1874
	long idle_time = 0;

	if (xprt_connected(xprt))
		idle_time = (long)(jiffies - xprt->last_used) / HZ;

	seq_printf(seq, "\txprt:\ttcp %u %lu %lu %lu %ld %lu %lu %lu %Lu %Lu\n",
1875
			transport->port,
1876 1877 1878 1879 1880 1881 1882 1883 1884 1885 1886
			xprt->stat.bind_count,
			xprt->stat.connect_count,
			xprt->stat.connect_time,
			idle_time,
			xprt->stat.sends,
			xprt->stat.recvs,
			xprt->stat.bad_xids,
			xprt->stat.req_u,
			xprt->stat.bklog_u);
}

1887
static struct rpc_xprt_ops xs_udp_ops = {
1888
	.set_buffer_size	= xs_udp_set_buffer_size,
1889
	.reserve_xprt		= xprt_reserve_xprt_cong,
1890
	.release_xprt		= xprt_release_xprt_cong,
1891
	.rpcbind		= rpcb_getport_async,
1892
	.set_port		= xs_set_port,
1893
	.connect		= xs_connect,
1894 1895
	.buf_alloc		= rpc_malloc,
	.buf_free		= rpc_free,
1896
	.send_request		= xs_udp_send_request,
1897
	.set_retrans_timeout	= xprt_set_retrans_timeout_rtt,
1898
	.timer			= xs_udp_timer,
1899
	.release_request	= xprt_release_rqst_cong,
1900 1901
	.close			= xs_close,
	.destroy		= xs_destroy,
1902
	.print_stats		= xs_udp_print_stats,
1903 1904 1905
};

static struct rpc_xprt_ops xs_tcp_ops = {
1906
	.reserve_xprt		= xprt_reserve_xprt,
1907
	.release_xprt		= xs_tcp_release_xprt,
1908
	.rpcbind		= rpcb_getport_async,
1909
	.set_port		= xs_set_port,
1910
	.connect		= xs_tcp_connect,
1911 1912
	.buf_alloc		= rpc_malloc,
	.buf_free		= rpc_free,
1913
	.send_request		= xs_tcp_send_request,
1914
	.set_retrans_timeout	= xprt_set_retrans_timeout_def,
1915
	.close			= xs_tcp_shutdown,
1916
	.destroy		= xs_destroy,
1917
	.print_stats		= xs_tcp_print_stats,
1918 1919
};

1920
static struct rpc_xprt *xs_setup_xprt(struct xprt_create *args,
1921
				      unsigned int slot_table_size)
1922 1923
{
	struct rpc_xprt *xprt;
1924
	struct sock_xprt *new;
1925

1926
	if (args->addrlen > sizeof(xprt->addr)) {
1927
		dprintk("RPC:       xs_setup_xprt: address too large\n");
1928 1929 1930
		return ERR_PTR(-EBADF);
	}

1931 1932
	new = kzalloc(sizeof(*new), GFP_KERNEL);
	if (new == NULL) {
1933 1934
		dprintk("RPC:       xs_setup_xprt: couldn't allocate "
				"rpc_xprt\n");
1935 1936
		return ERR_PTR(-ENOMEM);
	}
1937
	xprt = &new->xprt;
1938 1939 1940 1941 1942

	xprt->max_reqs = slot_table_size;
	xprt->slot = kcalloc(xprt->max_reqs, sizeof(struct rpc_rqst), GFP_KERNEL);
	if (xprt->slot == NULL) {
		kfree(xprt);
1943 1944
		dprintk("RPC:       xs_setup_xprt: couldn't allocate slot "
				"table\n");
1945 1946 1947
		return ERR_PTR(-ENOMEM);
	}

1948 1949
	memcpy(&xprt->addr, args->dstaddr, args->addrlen);
	xprt->addrlen = args->addrlen;
1950 1951
	if (args->srcaddr)
		memcpy(&new->addr, args->srcaddr, args->addrlen);
1952 1953 1954 1955

	return xprt;
}

1956 1957 1958 1959 1960 1961 1962
/* Default retransmit schedule for UDP: 5s initial, +5s per retry, cap 30s */
static const struct rpc_timeout xs_udp_default_timeout = {
	.to_initval = 5 * HZ,
	.to_maxval = 30 * HZ,
	.to_increment = 5 * HZ,
	.to_retries = 5,
};

1963 1964
/**
 * xs_setup_udp - Set up transport to use a UDP socket
1965
 * @args: rpc transport creation arguments
1966 1967
 *
 */
1968
static struct rpc_xprt *xs_setup_udp(struct xprt_create *args)
1969
{
1970
	struct sockaddr *addr = args->dstaddr;
1971
	struct rpc_xprt *xprt;
1972
	struct sock_xprt *transport;
1973

1974
	xprt = xs_setup_xprt(args, xprt_udp_slot_table_entries);
1975 1976
	if (IS_ERR(xprt))
		return xprt;
1977
	transport = container_of(xprt, struct sock_xprt, xprt);
1978

1979
	xprt->prot = IPPROTO_UDP;
1980
	xprt->tsh_size = 0;
1981 1982 1983
	/* XXX: header size can vary due to auth type, IPv6, etc. */
	xprt->max_payload = (1U << 16) - (MAX_HEADER << 3);

1984 1985 1986 1987
	xprt->bind_timeout = XS_BIND_TO;
	xprt->connect_timeout = XS_UDP_CONN_TO;
	xprt->reestablish_timeout = XS_UDP_REEST_TO;
	xprt->idle_timeout = XS_IDLE_DISC_TO;
1988

1989
	xprt->ops = &xs_udp_ops;
1990

1991
	xprt->timeout = &xs_udp_default_timeout;
1992

1993 1994 1995 1996 1997 1998 1999
	switch (addr->sa_family) {
	case AF_INET:
		if (((struct sockaddr_in *)addr)->sin_port != htons(0))
			xprt_set_bound(xprt);

		INIT_DELAYED_WORK(&transport->connect_worker,
					xs_udp_connect_worker4);
2000
		xs_format_ipv4_peer_addresses(xprt, "udp", RPCBIND_NETID_UDP);
2001 2002 2003 2004 2005 2006 2007
		break;
	case AF_INET6:
		if (((struct sockaddr_in6 *)addr)->sin6_port != htons(0))
			xprt_set_bound(xprt);

		INIT_DELAYED_WORK(&transport->connect_worker,
					xs_udp_connect_worker6);
2008
		xs_format_ipv6_peer_addresses(xprt, "udp", RPCBIND_NETID_UDP6);
2009 2010 2011 2012 2013 2014
		break;
	default:
		kfree(xprt);
		return ERR_PTR(-EAFNOSUPPORT);
	}

2015
	dprintk("RPC:       set up transport to address %s\n",
2016
			xprt->address_strings[RPC_DISPLAY_ALL]);
2017

2018 2019 2020 2021 2022 2023
	if (try_module_get(THIS_MODULE))
		return xprt;

	kfree(xprt->slot);
	kfree(xprt);
	return ERR_PTR(-EINVAL);
2024 2025
}

2026 2027 2028 2029 2030 2031
static const struct rpc_timeout xs_tcp_default_timeout = {
	.to_initval = 60 * HZ,
	.to_maxval = 60 * HZ,
	.to_retries = 2,
};

2032 2033
/**
 * xs_setup_tcp - Set up transport to use a TCP socket
2034
 * @args: rpc transport creation arguments
2035 2036
 *
 */
2037
static struct rpc_xprt *xs_setup_tcp(struct xprt_create *args)
2038
{
2039
	struct sockaddr *addr = args->dstaddr;
2040
	struct rpc_xprt *xprt;
2041
	struct sock_xprt *transport;
2042

2043
	xprt = xs_setup_xprt(args, xprt_tcp_slot_table_entries);
2044 2045
	if (IS_ERR(xprt))
		return xprt;
2046
	transport = container_of(xprt, struct sock_xprt, xprt);
2047

2048
	xprt->prot = IPPROTO_TCP;
2049 2050
	xprt->tsh_size = sizeof(rpc_fraghdr) / sizeof(u32);
	xprt->max_payload = RPC_MAX_FRAGMENT_SIZE;
2051

2052 2053 2054 2055
	xprt->bind_timeout = XS_BIND_TO;
	xprt->connect_timeout = XS_TCP_CONN_TO;
	xprt->reestablish_timeout = XS_TCP_INIT_REEST_TO;
	xprt->idle_timeout = XS_IDLE_DISC_TO;
2056

2057
	xprt->ops = &xs_tcp_ops;
2058
	xprt->timeout = &xs_tcp_default_timeout;
2059

2060 2061 2062 2063 2064 2065
	switch (addr->sa_family) {
	case AF_INET:
		if (((struct sockaddr_in *)addr)->sin_port != htons(0))
			xprt_set_bound(xprt);

		INIT_DELAYED_WORK(&transport->connect_worker, xs_tcp_connect_worker4);
2066
		xs_format_ipv4_peer_addresses(xprt, "tcp", RPCBIND_NETID_TCP);
2067 2068 2069 2070 2071 2072
		break;
	case AF_INET6:
		if (((struct sockaddr_in6 *)addr)->sin6_port != htons(0))
			xprt_set_bound(xprt);

		INIT_DELAYED_WORK(&transport->connect_worker, xs_tcp_connect_worker6);
2073
		xs_format_ipv6_peer_addresses(xprt, "tcp", RPCBIND_NETID_TCP6);
2074 2075 2076 2077 2078 2079
		break;
	default:
		kfree(xprt);
		return ERR_PTR(-EAFNOSUPPORT);
	}

2080
	dprintk("RPC:       set up transport to address %s\n",
2081
			xprt->address_strings[RPC_DISPLAY_ALL]);
2082

2083 2084 2085 2086 2087 2088
	if (try_module_get(THIS_MODULE))
		return xprt;

	kfree(xprt->slot);
	kfree(xprt);
	return ERR_PTR(-EINVAL);
2089
}
2090

2091 2092 2093 2094
static struct xprt_class	xs_udp_transport = {
	.list		= LIST_HEAD_INIT(xs_udp_transport.list),
	.name		= "udp",
	.owner		= THIS_MODULE,
2095
	.ident		= IPPROTO_UDP,
2096 2097 2098 2099 2100 2101 2102
	.setup		= xs_setup_udp,
};

static struct xprt_class	xs_tcp_transport = {
	.list		= LIST_HEAD_INIT(xs_tcp_transport.list),
	.name		= "tcp",
	.owner		= THIS_MODULE,
2103
	.ident		= IPPROTO_TCP,
2104 2105 2106
	.setup		= xs_setup_tcp,
};

2107
/**
2108
 * init_socket_xprt - set up xprtsock's sysctls, register with RPC client
2109 2110 2111 2112
 *
 */
int init_socket_xprt(void)
{
2113
#ifdef RPC_DEBUG
2114
	if (!sunrpc_table_header)
2115
		sunrpc_table_header = register_sysctl_table(sunrpc_table);
2116 2117
#endif

2118 2119 2120
	xprt_register_transport(&xs_udp_transport);
	xprt_register_transport(&xs_tcp_transport);

2121 2122 2123 2124
	return 0;
}

/**
2125
 * cleanup_socket_xprt - remove xprtsock's sysctls, unregister
2126 2127 2128 2129
 *
 */
void cleanup_socket_xprt(void)
{
2130 2131 2132 2133 2134 2135
#ifdef RPC_DEBUG
	if (sunrpc_table_header) {
		unregister_sysctl_table(sunrpc_table_header);
		sunrpc_table_header = NULL;
	}
#endif
2136 2137 2138

	xprt_unregister_transport(&xs_udp_transport);
	xprt_unregister_transport(&xs_tcp_transport);
2139
}