/*
 * linux/net/sunrpc/xprtsock.c
 *
 * Client-side transport implementation for sockets.
 *
 * TCP callback races fixes (C) 1998 Red Hat Software <alan@redhat.com>
 * TCP send fixes (C) 1998 Red Hat Software <alan@redhat.com>
 * TCP NFS related read + write fixes
 *  (C) 1999 Dave Airlie, University of Limerick, Ireland <airlied@linux.ie>
 *
 * Rewrite of large parts of the code in order to stabilize TCP stuff.
 * Fix behaviour when socket buffer is full.
 *  (C) 1999 Trond Myklebust <trond.myklebust@fys.uio.no>
 *
 * IP socket transport implementation, (C) 2005 Chuck Lever <cel@netapp.com>
 */

#include <linux/types.h>
#include <linux/slab.h>
#include <linux/capability.h>
#include <linux/pagemap.h>
#include <linux/errno.h>
#include <linux/socket.h>
#include <linux/in.h>
#include <linux/net.h>
#include <linux/mm.h>
#include <linux/udp.h>
#include <linux/tcp.h>
#include <linux/sunrpc/clnt.h>
#include <linux/sunrpc/sched.h>
#include <linux/file.h>

#include <net/sock.h>
#include <net/checksum.h>
#include <net/udp.h>
#include <net/tcp.h>

/*
 * xprtsock tunables
 */
unsigned int xprt_udp_slot_table_entries = RPC_DEF_SLOT_TABLE;
unsigned int xprt_tcp_slot_table_entries = RPC_DEF_SLOT_TABLE;

unsigned int xprt_min_resvport = RPC_DEF_MIN_RESVPORT;
unsigned int xprt_max_resvport = RPC_DEF_MAX_RESVPORT;

/*
 * We can register our own files under /proc/sys/sunrpc by
 * calling register_sysctl_table() again.  The files in that
 * directory become the union of all files registered there.
 *
 * We simply need to make sure that we don't collide with
 * someone else's file names!
 */
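/*
 * For example, with the tables below registered, these tunables appear
 * as /proc/sys/sunrpc/udp_slot_table_entries and friends; an
 * illustrative (not prescriptive) invocation would be
 *
 *	sysctl -w sunrpc.tcp_slot_table_entries=16
 */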

#ifdef RPC_DEBUG

static unsigned int min_slot_table_size = RPC_MIN_SLOT_TABLE;
static unsigned int max_slot_table_size = RPC_MAX_SLOT_TABLE;
static unsigned int xprt_min_resvport_limit = RPC_MIN_RESVPORT;
static unsigned int xprt_max_resvport_limit = RPC_MAX_RESVPORT;

static struct ctl_table_header *sunrpc_table_header;

/*
 * FIXME: changing the UDP slot table size should also resize the UDP
 *        socket buffers for existing UDP transports
 */
static ctl_table xs_tunables_table[] = {
	{
		.ctl_name	= CTL_SLOTTABLE_UDP,
		.procname	= "udp_slot_table_entries",
		.data		= &xprt_udp_slot_table_entries,
		.maxlen		= sizeof(unsigned int),
		.mode		= 0644,
		.proc_handler	= &proc_dointvec_minmax,
		.strategy	= &sysctl_intvec,
		.extra1		= &min_slot_table_size,
		.extra2		= &max_slot_table_size
	},
	{
		.ctl_name	= CTL_SLOTTABLE_TCP,
		.procname	= "tcp_slot_table_entries",
		.data		= &xprt_tcp_slot_table_entries,
		.maxlen		= sizeof(unsigned int),
		.mode		= 0644,
		.proc_handler	= &proc_dointvec_minmax,
		.strategy	= &sysctl_intvec,
		.extra1		= &min_slot_table_size,
		.extra2		= &max_slot_table_size
	},
	{
		.ctl_name	= CTL_MIN_RESVPORT,
		.procname	= "min_resvport",
		.data		= &xprt_min_resvport,
		.maxlen		= sizeof(unsigned int),
		.mode		= 0644,
		.proc_handler	= &proc_dointvec_minmax,
		.strategy	= &sysctl_intvec,
		.extra1		= &xprt_min_resvport_limit,
		.extra2		= &xprt_max_resvport_limit
	},
	{
		.ctl_name	= CTL_MAX_RESVPORT,
		.procname	= "max_resvport",
		.data		= &xprt_max_resvport,
		.maxlen		= sizeof(unsigned int),
		.mode		= 0644,
		.proc_handler	= &proc_dointvec_minmax,
		.strategy	= &sysctl_intvec,
		.extra1		= &xprt_min_resvport_limit,
		.extra2		= &xprt_max_resvport_limit
	},
	{
		.ctl_name = 0,
	},
};

static ctl_table sunrpc_table[] = {
	{
		.ctl_name	= CTL_SUNRPC,
		.procname	= "sunrpc",
		.mode		= 0555,
		.child		= xs_tunables_table
	},
	{
		.ctl_name = 0,
	},
};

#endif

/*
 * How many times to try sending a request on a socket before waiting
 * for the socket buffer to clear.
 */
#define XS_SENDMSG_RETRY	(10U)

/*
 * Time out for an RPC UDP socket connect.  UDP socket connects are
 * synchronous, but we set a timeout anyway in case of resource
 * exhaustion on the local host.
 */
#define XS_UDP_CONN_TO		(5U * HZ)

/*
 * Wait duration for an RPC TCP connection to be established.  Solaris
 * NFS over TCP uses 60 seconds, for example, which is in line with how
 * long a server takes to reboot.
 */
#define XS_TCP_CONN_TO		(60U * HZ)

/*
 * Wait duration for a reply from the RPC portmapper.
 */
#define XS_BIND_TO		(60U * HZ)

/*
 * Delay if a UDP socket connect error occurs.  This is most likely some
 * kind of resource problem on the local host.
 */
#define XS_UDP_REEST_TO		(2U * HZ)

/*
 * The reestablish timeout allows clients to delay for a bit before attempting
 * to reconnect to a server that just dropped our connection.
 *
 * We implement an exponential backoff when trying to reestablish a TCP
 * transport connection with the server.  Some servers like to drop a TCP
 * connection when they are overworked, so we start with a short timeout and
 * increase over time if the server is down or not responding.
 */
#define XS_TCP_INIT_REEST_TO	(3U * HZ)
#define XS_TCP_MAX_REEST_TO	(5U * 60 * HZ)
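/*
 * With the doubling done in xs_connect(), successive TCP reconnect
 * delays run 3s, 6s, 12s, 24s, 48s, 96s, 192s, and are then clamped
 * to the 300 second maximum above.
 */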

/*
 * TCP idle timeout; client drops the transport socket if it is idle
 * for this long.  Note that we also timeout UDP sockets to prevent
 * holding port numbers when there is no RPC traffic.
 */
#define XS_IDLE_DISC_TO		(5U * 60 * HZ)

#ifdef RPC_DEBUG
# undef  RPC_DEBUG_DATA
# define RPCDBG_FACILITY	RPCDBG_TRANS
#endif

#ifdef RPC_DEBUG_DATA
static void xs_pktdump(char *msg, u32 *packet, unsigned int count)
{
	u8 *buf = (u8 *) packet;
	int j;

	dprintk("RPC:       %s\n", msg);
	for (j = 0; j < count && j < 128; j += 4) {
		if (!(j & 31)) {
			if (j)
				dprintk("\n");
			dprintk("0x%04x ", j);
		}
		dprintk("%02x%02x%02x%02x ",
			buf[j], buf[j+1], buf[j+2], buf[j+3]);
	}
	dprintk("\n");
}
#else
static inline void xs_pktdump(char *msg, u32 *packet, unsigned int count)
{
	/* NOP */
}
#endif

struct sock_xprt {
	struct rpc_xprt		xprt;

	/*
	 * Network layer
	 */
	struct socket *		sock;
	struct sock *		inet;

	/*
	 * State of TCP reply receive
	 */
	__be32			tcp_fraghdr,
				tcp_xid;

	u32			tcp_offset,
				tcp_reclen;

	unsigned long		tcp_copied,
				tcp_flags;

	/*
	 * Connection of transports
	 */
	struct delayed_work	connect_worker;
	struct sockaddr_storage	addr;
	unsigned short		port;

	/*
	 * UDP socket buffer size parameters
	 */
	size_t			rcvsize,
				sndsize;

	/*
	 * Saved socket callback addresses
	 */
	void			(*old_data_ready)(struct sock *, int);
	void			(*old_state_change)(struct sock *);
	void			(*old_write_space)(struct sock *);
};

/*
 * TCP receive state flags
 */
#define TCP_RCV_LAST_FRAG	(1UL << 0)
#define TCP_RCV_COPY_FRAGHDR	(1UL << 1)
#define TCP_RCV_COPY_XID	(1UL << 2)
#define TCP_RCV_COPY_DATA	(1UL << 3)
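/*
 * Together these flags drive the receive state machine in
 * xs_tcp_data_recv(): copy the 4-byte record marker (COPY_FRAGHDR),
 * then the XID (COPY_XID), then the reply data (COPY_DATA), with
 * LAST_FRAG noting that the current fragment ends the record.
 */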

static void xs_format_ipv4_peer_addresses(struct rpc_xprt *xprt)
{
	struct sockaddr_in *addr = (struct sockaddr_in *) &xprt->addr;
	char *buf;

	buf = kzalloc(20, GFP_KERNEL);
	if (buf) {
		snprintf(buf, 20, NIPQUAD_FMT,
				NIPQUAD(addr->sin_addr.s_addr));
	}
	xprt->address_strings[RPC_DISPLAY_ADDR] = buf;

	buf = kzalloc(8, GFP_KERNEL);
	if (buf) {
		snprintf(buf, 8, "%u",
				ntohs(addr->sin_port));
	}
	xprt->address_strings[RPC_DISPLAY_PORT] = buf;

	buf = kzalloc(8, GFP_KERNEL);
	if (buf) {
		if (xprt->prot == IPPROTO_UDP)
			snprintf(buf, 8, "udp");
		else
			snprintf(buf, 8, "tcp");
	}
	xprt->address_strings[RPC_DISPLAY_PROTO] = buf;

	buf = kzalloc(48, GFP_KERNEL);
	if (buf) {
		snprintf(buf, 48, "addr="NIPQUAD_FMT" port=%u proto=%s",
			NIPQUAD(addr->sin_addr.s_addr),
			ntohs(addr->sin_port),
			xprt->prot == IPPROTO_UDP ? "udp" : "tcp");
	}
	xprt->address_strings[RPC_DISPLAY_ALL] = buf;

	buf = kzalloc(10, GFP_KERNEL);
	if (buf) {
		snprintf(buf, 10, "%02x%02x%02x%02x",
				NIPQUAD(addr->sin_addr.s_addr));
	}
	xprt->address_strings[RPC_DISPLAY_HEX_ADDR] = buf;

	buf = kzalloc(8, GFP_KERNEL);
	if (buf) {
		snprintf(buf, 8, "%4hx",
				ntohs(addr->sin_port));
	}
	xprt->address_strings[RPC_DISPLAY_HEX_PORT] = buf;
}

static void xs_free_peer_addresses(struct rpc_xprt *xprt)
{
	int i;

	for (i = 0; i < RPC_DISPLAY_MAX; i++)
		kfree(xprt->address_strings[i]);
}

#define XS_SENDMSG_FLAGS	(MSG_DONTWAIT | MSG_NOSIGNAL)

static int xs_send_kvec(struct socket *sock, struct sockaddr *addr, int addrlen, struct kvec *vec, unsigned int base, int more)
{
	struct msghdr msg = {
		.msg_name	= addr,
		.msg_namelen	= addrlen,
		.msg_flags	= XS_SENDMSG_FLAGS | (more ? MSG_MORE : 0),
	};
	struct kvec iov = {
		.iov_base	= vec->iov_base + base,
		.iov_len	= vec->iov_len - base,
	};

	if (iov.iov_len != 0)
		return kernel_sendmsg(sock, &msg, &iov, 1, iov.iov_len);
	return kernel_sendmsg(sock, &msg, NULL, 0, 0);
}

static int xs_send_pagedata(struct socket *sock, struct xdr_buf *xdr, unsigned int base, int more)
{
	struct page **ppage;
	unsigned int remainder;
	int err, sent = 0;

	remainder = xdr->page_len - base;
	base += xdr->page_base;
	ppage = xdr->pages + (base >> PAGE_SHIFT);
	base &= ~PAGE_MASK;
	for(;;) {
		unsigned int len = min_t(unsigned int, PAGE_SIZE - base, remainder);
		int flags = XS_SENDMSG_FLAGS;

		remainder -= len;
		if (remainder != 0 || more)
			flags |= MSG_MORE;
		err = sock->ops->sendpage(sock, *ppage, base, len, flags);
		if (remainder == 0 || err != len)
			break;
		sent += err;
		ppage++;
		base = 0;
	}
	if (sent == 0)
		return err;
	if (err > 0)
		sent += err;
	return sent;
}

/**
 * xs_sendpages - write pages directly to a socket
 * @sock: socket to send on
 * @addr: UDP only -- address of destination
 * @addrlen: UDP only -- length of destination address
 * @xdr: buffer containing this request
 * @base: starting position in the buffer
 *
 * Returns the number of bytes sent, or a negative errno if nothing
 * could be sent.
 */
static int xs_sendpages(struct socket *sock, struct sockaddr *addr, int addrlen, struct xdr_buf *xdr, unsigned int base)
{
	unsigned int remainder = xdr->len - base;
	int err, sent = 0;

	if (unlikely(!sock))
		return -ENOTCONN;

	clear_bit(SOCK_ASYNC_NOSPACE, &sock->flags);
	if (base != 0) {
		addr = NULL;
		addrlen = 0;
	}

	if (base < xdr->head[0].iov_len || addr != NULL) {
		unsigned int len = xdr->head[0].iov_len - base;
		remainder -= len;
		err = xs_send_kvec(sock, addr, addrlen, &xdr->head[0], base, remainder != 0);
		if (remainder == 0 || err != len)
			goto out;
		sent += err;
		base = 0;
	} else
		base -= xdr->head[0].iov_len;

	if (base < xdr->page_len) {
		unsigned int len = xdr->page_len - base;
		remainder -= len;
		err = xs_send_pagedata(sock, xdr, base, remainder != 0);
		if (remainder == 0 || err != len)
			goto out;
		sent += err;
		base = 0;
	} else
		base -= xdr->page_len;

	if (base >= xdr->tail[0].iov_len)
		return sent;
	err = xs_send_kvec(sock, NULL, 0, &xdr->tail[0], base, 0);
out:
	if (sent == 0)
		return err;
	if (err > 0)
		sent += err;
	return sent;
}

/**
 * xs_nospace - place task on wait queue if transmit was incomplete
 * @task: task to put to sleep
 *
 */
static void xs_nospace(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;
	struct rpc_xprt *xprt = req->rq_xprt;
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);

	dprintk("RPC: %5u xmit incomplete (%u left of %u)\n",
			task->tk_pid, req->rq_slen - req->rq_bytes_sent,
			req->rq_slen);

	if (test_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags)) {
		/* Protect against races with write_space */
		spin_lock_bh(&xprt->transport_lock);

		/* Don't race with disconnect */
		if (!xprt_connected(xprt))
			task->tk_status = -ENOTCONN;
		else if (test_bit(SOCK_NOSPACE, &transport->sock->flags))
			xprt_wait_for_buffer_space(task);

		spin_unlock_bh(&xprt->transport_lock);
	} else
		/* Keep holding the socket if it is blocked */
		rpc_delay(task, HZ>>4);
}

/**
 * xs_udp_send_request - write an RPC request to a UDP socket
 * @task: address of RPC task that manages the state of an RPC request
 *
 * Return values:
 *        0:	The request has been sent
 *   EAGAIN:	The socket was blocked, please call again later to
 *		complete the request
 * ENOTCONN:	Caller needs to invoke connect logic then call again
 *    other:	Some other error occurred, the request was not sent
 */
static int xs_udp_send_request(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;
	struct rpc_xprt *xprt = req->rq_xprt;
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
	struct xdr_buf *xdr = &req->rq_snd_buf;
	int status;

	xs_pktdump("packet data:",
				req->rq_svec->iov_base,
				req->rq_svec->iov_len);

	req->rq_xtime = jiffies;
	status = xs_sendpages(transport->sock,
			      (struct sockaddr *) &xprt->addr,
			      xprt->addrlen, xdr,
			      req->rq_bytes_sent);

	dprintk("RPC:       xs_udp_send_request(%u) = %d\n",
			xdr->len - req->rq_bytes_sent, status);

	if (likely(status >= (int) req->rq_slen))
		return 0;

	/* Still some bytes left; set up for a retry later. */
	if (status > 0)
		status = -EAGAIN;

	switch (status) {
	case -ENETUNREACH:
	case -EPIPE:
	case -ECONNREFUSED:
		/* When the server has died, an ICMP port unreachable message
		 * prompts ECONNREFUSED. */
		break;
	case -EAGAIN:
		xs_nospace(task);
		break;
	default:
		dprintk("RPC:       sendmsg returned unrecognized error %d\n",
			-status);
		break;
	}

	return status;
}

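/*
 * Prepend the RPC-over-TCP record marker described in RFC 1831,
 * section 10: the top bit flags the last fragment of a record, and
 * the low 31 bits carry the fragment length.
 */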
static inline void xs_encode_tcp_record_marker(struct xdr_buf *buf)
{
	u32 reclen = buf->len - sizeof(rpc_fraghdr);
	rpc_fraghdr *base = buf->head[0].iov_base;
	*base = htonl(RPC_LAST_STREAM_FRAGMENT | reclen);
}

/**
 * xs_tcp_send_request - write an RPC request to a TCP socket
 * @task: address of RPC task that manages the state of an RPC request
 *
 * Return values:
 *        0:	The request has been sent
 *   EAGAIN:	The socket was blocked, please call again later to
 *		complete the request
 * ENOTCONN:	Caller needs to invoke connect logic then call again
 *    other:	Some other error occurred, the request was not sent
 *
 * XXX: In the case of soft timeouts, should we eventually give up
 *	if sendmsg is not able to make progress?
 */
static int xs_tcp_send_request(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;
	struct rpc_xprt *xprt = req->rq_xprt;
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
	struct xdr_buf *xdr = &req->rq_snd_buf;
	int status;
	unsigned int retry = 0;

	xs_encode_tcp_record_marker(&req->rq_snd_buf);

	xs_pktdump("packet data:",
				req->rq_svec->iov_base,
				req->rq_svec->iov_len);

	/* Continue transmitting the packet/record. We must be careful
	 * to cope with writespace callbacks arriving _after_ we have
	 * called sendmsg(). */
	while (1) {
		req->rq_xtime = jiffies;
		status = xs_sendpages(transport->sock,
					NULL, 0, xdr, req->rq_bytes_sent);

		dprintk("RPC:       xs_tcp_send_request(%u) = %d\n",
				xdr->len - req->rq_bytes_sent, status);

		if (unlikely(status < 0))
			break;

		/* If we've sent the entire packet, immediately
		 * reset the count of bytes sent. */
		req->rq_bytes_sent += status;
		task->tk_bytes_sent += status;
		if (likely(req->rq_bytes_sent >= req->rq_slen)) {
			req->rq_bytes_sent = 0;
			return 0;
		}

		status = -EAGAIN;
		if (retry++ > XS_SENDMSG_RETRY)
			break;
	}

	switch (status) {
	case -EAGAIN:
		xs_nospace(task);
		break;
	case -ECONNREFUSED:
	case -ECONNRESET:
	case -ENOTCONN:
	case -EPIPE:
		status = -ENOTCONN;
		break;
	default:
		dprintk("RPC:       sendmsg returned unrecognized error %d\n",
			-status);
		xprt_disconnect(xprt);
		break;
	}

	return status;
}

/**
 * xs_tcp_release_xprt - clean up after a tcp transmission
 * @xprt: transport
 * @task: rpc task
 *
 * This cleans up if an error causes us to abort the transmission of a request.
 * In this case, the socket may need to be reset in order to avoid confusing
 * the server.
 */
static void xs_tcp_release_xprt(struct rpc_xprt *xprt, struct rpc_task *task)
{
	struct rpc_rqst *req;

	if (task != xprt->snd_task)
		return;
	if (task == NULL)
		goto out_release;
	req = task->tk_rqstp;
	if (req->rq_bytes_sent == 0)
		goto out_release;
	if (req->rq_bytes_sent == req->rq_snd_buf.len)
		goto out_release;
	set_bit(XPRT_CLOSE_WAIT, &task->tk_xprt->state);
out_release:
	xprt_release_xprt(xprt, task);
}

/**
 * xs_close - close a socket
 * @xprt: transport
 *
 * This is used when all requests are complete; i.e., no DRC state remains
 * on the server we want to save.
 */
static void xs_close(struct rpc_xprt *xprt)
{
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
	struct socket *sock = transport->sock;
	struct sock *sk = transport->inet;

	if (!sk)
		goto clear_close_wait;

	dprintk("RPC:       xs_close xprt %p\n", xprt);

	write_lock_bh(&sk->sk_callback_lock);
	transport->inet = NULL;
	transport->sock = NULL;

	sk->sk_user_data = NULL;
	sk->sk_data_ready = transport->old_data_ready;
	sk->sk_state_change = transport->old_state_change;
	sk->sk_write_space = transport->old_write_space;
	write_unlock_bh(&sk->sk_callback_lock);

	sk->sk_no_check = 0;

	sock_release(sock);
clear_close_wait:
	smp_mb__before_clear_bit();
	clear_bit(XPRT_CLOSE_WAIT, &xprt->state);
	smp_mb__after_clear_bit();
}

/**
 * xs_destroy - prepare to shutdown a transport
 * @xprt: doomed transport
 *
 */
static void xs_destroy(struct rpc_xprt *xprt)
{
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);

	dprintk("RPC:       xs_destroy xprt %p\n", xprt);

	cancel_rearming_delayed_work(&transport->connect_worker);

	xprt_disconnect(xprt);
	xs_close(xprt);
	xs_free_peer_addresses(xprt);
	kfree(xprt->slot);
	kfree(xprt);
}

static inline struct rpc_xprt *xprt_from_sock(struct sock *sk)
{
	return (struct rpc_xprt *) sk->sk_user_data;
}

/**
 * xs_udp_data_ready - "data ready" callback for UDP sockets
 * @sk: socket with data to read
 * @len: how much data to read
 *
 */
static void xs_udp_data_ready(struct sock *sk, int len)
{
	struct rpc_task *task;
	struct rpc_xprt *xprt;
	struct rpc_rqst *rovr;
	struct sk_buff *skb;
	int err, repsize, copied;
	u32 _xid;
	__be32 *xp;

	read_lock(&sk->sk_callback_lock);
	dprintk("RPC:       xs_udp_data_ready...\n");
	if (!(xprt = xprt_from_sock(sk)))
		goto out;

	if ((skb = skb_recv_datagram(sk, 0, 1, &err)) == NULL)
		goto out;

	if (xprt->shutdown)
		goto dropit;

	repsize = skb->len - sizeof(struct udphdr);
	if (repsize < 4) {
		dprintk("RPC:       impossible RPC reply size %d!\n", repsize);
		goto dropit;
	}

	/* Copy the XID from the skb... */
	xp = skb_header_pointer(skb, sizeof(struct udphdr),
				sizeof(_xid), &_xid);
	if (xp == NULL)
		goto dropit;

	/* Look up and lock the request corresponding to the given XID */
	spin_lock(&xprt->transport_lock);
	rovr = xprt_lookup_rqst(xprt, *xp);
	if (!rovr)
		goto out_unlock;
	task = rovr->rq_task;

	if ((copied = rovr->rq_private_buf.buflen) > repsize)
		copied = repsize;

	/* Suck it into the iovec, verify checksum if not done by hw. */
	if (csum_partial_copy_to_xdr(&rovr->rq_private_buf, skb))
		goto out_unlock;

	/* Something worked... */
	dst_confirm(skb->dst);

	xprt_adjust_cwnd(task, copied);
	xprt_update_rtt(task);
	xprt_complete_rqst(task, copied);

 out_unlock:
	spin_unlock(&xprt->transport_lock);
 dropit:
	skb_free_datagram(sk, skb);
 out:
	read_unlock(&sk->sk_callback_lock);
}

static inline void xs_tcp_read_fraghdr(struct rpc_xprt *xprt, struct xdr_skb_reader *desc)
{
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
	size_t len, used;
	char *p;

	p = ((char *) &transport->tcp_fraghdr) + transport->tcp_offset;
	len = sizeof(transport->tcp_fraghdr) - transport->tcp_offset;
	used = xdr_skb_read_bits(desc, p, len);
	transport->tcp_offset += used;
	if (used != len)
		return;

	transport->tcp_reclen = ntohl(transport->tcp_fraghdr);
	if (transport->tcp_reclen & RPC_LAST_STREAM_FRAGMENT)
		transport->tcp_flags |= TCP_RCV_LAST_FRAG;
	else
		transport->tcp_flags &= ~TCP_RCV_LAST_FRAG;
	transport->tcp_reclen &= RPC_FRAGMENT_SIZE_MASK;

	transport->tcp_flags &= ~TCP_RCV_COPY_FRAGHDR;
	transport->tcp_offset = 0;

	/* Sanity check of the record length */
	if (unlikely(transport->tcp_reclen < 4)) {
		dprintk("RPC:       invalid TCP record fragment length\n");
		xprt_disconnect(xprt);
		return;
	}
	dprintk("RPC:       reading TCP record fragment of length %d\n",
			transport->tcp_reclen);
}

static void xs_tcp_check_fraghdr(struct sock_xprt *transport)
{
	if (transport->tcp_offset == transport->tcp_reclen) {
		transport->tcp_flags |= TCP_RCV_COPY_FRAGHDR;
		transport->tcp_offset = 0;
		if (transport->tcp_flags & TCP_RCV_LAST_FRAG) {
			transport->tcp_flags &= ~TCP_RCV_COPY_DATA;
			transport->tcp_flags |= TCP_RCV_COPY_XID;
			transport->tcp_copied = 0;
		}
	}
}

static inline void xs_tcp_read_xid(struct sock_xprt *transport, struct xdr_skb_reader *desc)
{
	size_t len, used;
	char *p;

	len = sizeof(transport->tcp_xid) - transport->tcp_offset;
	dprintk("RPC:       reading XID (%Zu bytes)\n", len);
	p = ((char *) &transport->tcp_xid) + transport->tcp_offset;
	used = xdr_skb_read_bits(desc, p, len);
	transport->tcp_offset += used;
	if (used != len)
		return;
	transport->tcp_flags &= ~TCP_RCV_COPY_XID;
	transport->tcp_flags |= TCP_RCV_COPY_DATA;
	transport->tcp_copied = 4;
	dprintk("RPC:       reading reply for XID %08x\n",
			ntohl(transport->tcp_xid));
	xs_tcp_check_fraghdr(transport);
}

static inline void xs_tcp_read_request(struct rpc_xprt *xprt, struct xdr_skb_reader *desc)
{
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
	struct rpc_rqst *req;
	struct xdr_buf *rcvbuf;
	size_t len;
	ssize_t r;

	/* Find and lock the request corresponding to this xid */
	spin_lock(&xprt->transport_lock);
	req = xprt_lookup_rqst(xprt, transport->tcp_xid);
	if (!req) {
		transport->tcp_flags &= ~TCP_RCV_COPY_DATA;
		dprintk("RPC:       XID %08x request not found!\n",
				ntohl(transport->tcp_xid));
		spin_unlock(&xprt->transport_lock);
		return;
	}

	rcvbuf = &req->rq_private_buf;
	len = desc->count;
	if (len > transport->tcp_reclen - transport->tcp_offset) {
		struct xdr_skb_reader my_desc;

		len = transport->tcp_reclen - transport->tcp_offset;
		memcpy(&my_desc, desc, sizeof(my_desc));
		my_desc.count = len;
		r = xdr_partial_copy_from_skb(rcvbuf, transport->tcp_copied,
					  &my_desc, xdr_skb_read_bits);
		desc->count -= r;
		desc->offset += r;
	} else
		r = xdr_partial_copy_from_skb(rcvbuf, transport->tcp_copied,
					  desc, xdr_skb_read_bits);

	if (r > 0) {
		transport->tcp_copied += r;
		transport->tcp_offset += r;
	}
	if (r != len) {
		/* Error when copying to the receive buffer,
		 * usually because we weren't able to allocate
		 * additional buffer pages. All we can do now
		 * is turn off TCP_RCV_COPY_DATA, so the request
		 * will not receive any additional updates,
		 * and time out.
		 * Any remaining data from this record will
		 * be discarded.
		 */
		transport->tcp_flags &= ~TCP_RCV_COPY_DATA;
		dprintk("RPC:       XID %08x truncated request\n",
				ntohl(transport->tcp_xid));
		dprintk("RPC:       xprt = %p, tcp_copied = %lu, "
				"tcp_offset = %u, tcp_reclen = %u\n",
				xprt, transport->tcp_copied,
				transport->tcp_offset, transport->tcp_reclen);
		goto out;
	}

	dprintk("RPC:       XID %08x read %Zd bytes\n",
			ntohl(transport->tcp_xid), r);
	dprintk("RPC:       xprt = %p, tcp_copied = %lu, tcp_offset = %u, "
			"tcp_reclen = %u\n", xprt, transport->tcp_copied,
			transport->tcp_offset, transport->tcp_reclen);

	if (transport->tcp_copied == req->rq_private_buf.buflen)
		transport->tcp_flags &= ~TCP_RCV_COPY_DATA;
	else if (transport->tcp_offset == transport->tcp_reclen) {
		if (transport->tcp_flags & TCP_RCV_LAST_FRAG)
			transport->tcp_flags &= ~TCP_RCV_COPY_DATA;
	}

out:
	if (!(transport->tcp_flags & TCP_RCV_COPY_DATA))
		xprt_complete_rqst(req->rq_task, transport->tcp_copied);
	spin_unlock(&xprt->transport_lock);
	xs_tcp_check_fraghdr(transport);
}

static inline void xs_tcp_read_discard(struct sock_xprt *transport, struct xdr_skb_reader *desc)
{
	size_t len;

	len = transport->tcp_reclen - transport->tcp_offset;
	if (len > desc->count)
		len = desc->count;
	desc->count -= len;
	desc->offset += len;
	transport->tcp_offset += len;
	dprintk("RPC:       discarded %Zu bytes\n", len);
	xs_tcp_check_fraghdr(transport);
}

static int xs_tcp_data_recv(read_descriptor_t *rd_desc, struct sk_buff *skb, unsigned int offset, size_t len)
{
	struct rpc_xprt *xprt = rd_desc->arg.data;
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
	struct xdr_skb_reader desc = {
		.skb	= skb,
		.offset	= offset,
		.count	= len,
	};

	dprintk("RPC:       xs_tcp_data_recv started\n");
	do {
		/* Read in a new fragment marker if necessary */
		/* Can we ever really expect to get completely empty fragments? */
		if (transport->tcp_flags & TCP_RCV_COPY_FRAGHDR) {
			xs_tcp_read_fraghdr(xprt, &desc);
			continue;
		}
		/* Read in the xid if necessary */
		if (transport->tcp_flags & TCP_RCV_COPY_XID) {
			xs_tcp_read_xid(transport, &desc);
			continue;
		}
		/* Read in the request data */
		if (transport->tcp_flags & TCP_RCV_COPY_DATA) {
			xs_tcp_read_request(xprt, &desc);
			continue;
		}
		/* Skip over any trailing bytes on short reads */
		xs_tcp_read_discard(transport, &desc);
	} while (desc.count);
	dprintk("RPC:       xs_tcp_data_recv done\n");
	return len - desc.count;
}

/**
 * xs_tcp_data_ready - "data ready" callback for TCP sockets
 * @sk: socket with data to read
 * @bytes: how much data to read
 *
 */
static void xs_tcp_data_ready(struct sock *sk, int bytes)
{
	struct rpc_xprt *xprt;
	read_descriptor_t rd_desc;

	dprintk("RPC:       xs_tcp_data_ready...\n");

	read_lock(&sk->sk_callback_lock);
	if (!(xprt = xprt_from_sock(sk)))
		goto out;
	if (xprt->shutdown)
		goto out;

	/* We use rd_desc to pass struct xprt to xs_tcp_data_recv */
	rd_desc.arg.data = xprt;
	rd_desc.count = 65536;
	tcp_read_sock(sk, &rd_desc, xs_tcp_data_recv);
out:
	read_unlock(&sk->sk_callback_lock);
}

/**
 * xs_tcp_state_change - callback to handle TCP socket state changes
 * @sk: socket whose state has changed
 *
 */
static void xs_tcp_state_change(struct sock *sk)
{
	struct rpc_xprt *xprt;

	read_lock(&sk->sk_callback_lock);
	if (!(xprt = xprt_from_sock(sk)))
		goto out;
	dprintk("RPC:       xs_tcp_state_change client %p...\n", xprt);
	dprintk("RPC:       state %x conn %d dead %d zapped %d\n",
			sk->sk_state, xprt_connected(xprt),
			sock_flag(sk, SOCK_DEAD),
			sock_flag(sk, SOCK_ZAPPED));

	switch (sk->sk_state) {
	case TCP_ESTABLISHED:
		spin_lock_bh(&xprt->transport_lock);
		if (!xprt_test_and_set_connected(xprt)) {
			struct sock_xprt *transport = container_of(xprt,
					struct sock_xprt, xprt);

			/* Reset TCP record info */
			transport->tcp_offset = 0;
			transport->tcp_reclen = 0;
			transport->tcp_copied = 0;
			transport->tcp_flags =
				TCP_RCV_COPY_FRAGHDR | TCP_RCV_COPY_XID;

			xprt->reestablish_timeout = XS_TCP_INIT_REEST_TO;
			xprt_wake_pending_tasks(xprt, 0);
		}
		spin_unlock_bh(&xprt->transport_lock);
		break;
	case TCP_SYN_SENT:
	case TCP_SYN_RECV:
		break;
	case TCP_CLOSE_WAIT:
		/* Try to schedule an autoclose RPC call */
		set_bit(XPRT_CLOSE_WAIT, &xprt->state);
		if (test_and_set_bit(XPRT_LOCKED, &xprt->state) == 0)
			queue_work(rpciod_workqueue, &xprt->task_cleanup);
	default:
		xprt_disconnect(xprt);
	}
 out:
	read_unlock(&sk->sk_callback_lock);
}

/**
 * xs_udp_write_space - callback invoked when socket buffer space
 *                             becomes available
 * @sk: socket whose state has changed
 *
 * Called when more output buffer space is available for this socket.
 * We try not to wake our writers until they can make "significant"
 * progress, otherwise we'll waste resources thrashing kernel_sendmsg
 * with a bunch of small requests.
 */
static void xs_udp_write_space(struct sock *sk)
{
	read_lock(&sk->sk_callback_lock);

	/* from net/core/sock.c:sock_def_write_space */
	if (sock_writeable(sk)) {
		struct socket *sock;
		struct rpc_xprt *xprt;

		if (unlikely(!(sock = sk->sk_socket)))
			goto out;
		if (unlikely(!(xprt = xprt_from_sock(sk))))
			goto out;
		if (unlikely(!test_and_clear_bit(SOCK_NOSPACE, &sock->flags)))
			goto out;

		xprt_write_space(xprt);
	}

 out:
	read_unlock(&sk->sk_callback_lock);
}

/**
 * xs_tcp_write_space - callback invoked when socket buffer space
 *                             becomes available
 * @sk: socket whose state has changed
 *
 * Called when more output buffer space is available for this socket.
 * We try not to wake our writers until they can make "significant"
 * progress, otherwise we'll waste resources thrashing kernel_sendmsg
 * with a bunch of small requests.
 */
static void xs_tcp_write_space(struct sock *sk)
{
	read_lock(&sk->sk_callback_lock);

	/* from net/core/stream.c:sk_stream_write_space */
	if (sk_stream_wspace(sk) >= sk_stream_min_wspace(sk)) {
		struct socket *sock;
		struct rpc_xprt *xprt;

		if (unlikely(!(sock = sk->sk_socket)))
			goto out;
		if (unlikely(!(xprt = xprt_from_sock(sk))))
			goto out;
		if (unlikely(!test_and_clear_bit(SOCK_NOSPACE, &sock->flags)))
			goto out;

		xprt_write_space(xprt);
	}

 out:
	read_unlock(&sk->sk_callback_lock);
}

static void xs_udp_do_set_buffer_size(struct rpc_xprt *xprt)
{
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
	struct sock *sk = transport->inet;

	if (transport->rcvsize) {
		sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
		sk->sk_rcvbuf = transport->rcvsize * xprt->max_reqs * 2;
	}
	if (transport->sndsize) {
		sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
		sk->sk_sndbuf = transport->sndsize * xprt->max_reqs * 2;
		sk->sk_write_space(sk);
	}
}

/**
 * xs_udp_set_buffer_size - set send and receive limits
 * @xprt: generic transport
 * @sndsize: requested size of send buffer, in bytes
 * @rcvsize: requested size of receive buffer, in bytes
 *
 * Set socket send and receive buffer size limits.
 */
static void xs_udp_set_buffer_size(struct rpc_xprt *xprt, size_t sndsize, size_t rcvsize)
{
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);

	transport->sndsize = 0;
	if (sndsize)
		transport->sndsize = sndsize + 1024;
	transport->rcvsize = 0;
	if (rcvsize)
		transport->rcvsize = rcvsize + 1024;

	xs_udp_do_set_buffer_size(xprt);
}

/**
 * xs_udp_timer - called when a retransmit timeout occurs on a UDP transport
 * @task: task that timed out
 *
 * Adjust the congestion window after a retransmit timeout has occurred.
 */
static void xs_udp_timer(struct rpc_task *task)
{
	xprt_adjust_cwnd(task, -ETIMEDOUT);
}

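/*
 * Pick a random port in the reserved range.  Note that because the
 * modulus below is (max - min), the result lies in
 * [xprt_min_resvport, xprt_max_resvport - 1]; xprt_max_resvport
 * itself is never chosen.
 */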
static unsigned short xs_get_random_port(void)
{
	unsigned short range = xprt_max_resvport - xprt_min_resvport;
	unsigned short rand = (unsigned short) net_random() % range;
	return rand + xprt_min_resvport;
}

/**
 * xs_set_port - reset the port number in the remote endpoint address
 * @xprt: generic transport
 * @port: new port number
 *
 */
static void xs_set_port(struct rpc_xprt *xprt, unsigned short port)
{
	struct sockaddr_in *sap = (struct sockaddr_in *) &xprt->addr;

	dprintk("RPC:       setting port for xprt %p to %u\n", xprt, port);

	sap->sin_port = htons(port);
}

static int xs_bind(struct sock_xprt *transport, struct socket *sock)
{
	struct sockaddr_in myaddr = {
		.sin_family = AF_INET,
	};
	struct sockaddr_in *sa;
	int err;
	unsigned short port = transport->port;

	if (!transport->xprt.resvport)
		port = 0;
	sa = (struct sockaddr_in *)&transport->addr;
	myaddr.sin_addr = sa->sin_addr;
	do {
		myaddr.sin_port = htons(port);
		err = kernel_bind(sock, (struct sockaddr *) &myaddr,
						sizeof(myaddr));
		if (!transport->xprt.resvport)
			break;
		if (err == 0) {
			transport->port = port;
			break;
		}
		if (port <= xprt_min_resvport)
			port = xprt_max_resvport;
		else
			port--;
	} while (err == -EADDRINUSE && port != transport->port);
	dprintk("RPC:       xs_bind "NIPQUAD_FMT":%u: %s (%d)\n",
		NIPQUAD(myaddr.sin_addr), port, err ? "failed" : "ok", err);
	return err;
}

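/*
 * The following puts the socket locks of NFS transports into their own
 * lockdep classes (separately for AF_INET and AF_INET6), presumably so
 * that NFS traffic to a server mounted over loopback does not make
 * lockdep see the same lock class taken twice and report a false
 * deadlock.
 */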
#ifdef CONFIG_DEBUG_LOCK_ALLOC
static struct lock_class_key xs_key[2];
static struct lock_class_key xs_slock_key[2];

static inline void xs_reclassify_socket(struct socket *sock)
{
	struct sock *sk = sock->sk;
	BUG_ON(sk->sk_lock.owner != NULL);
	switch (sk->sk_family) {
	case AF_INET:
		sock_lock_init_class_and_name(sk, "slock-AF_INET-NFS",
			&xs_slock_key[0], "sk_lock-AF_INET-NFS", &xs_key[0]);
		break;

	case AF_INET6:
		sock_lock_init_class_and_name(sk, "slock-AF_INET6-NFS",
			&xs_slock_key[1], "sk_lock-AF_INET6-NFS", &xs_key[1]);
		break;

	default:
		BUG();
	}
}
#else
static inline void xs_reclassify_socket(struct socket *sock)
{
}
#endif

/**
 * xs_udp_connect_worker - set up a UDP socket
 * @work: RPC transport to connect
 *
 * Invoked by a work queue tasklet.
 */
static void xs_udp_connect_worker(struct work_struct *work)
{
	struct sock_xprt *transport =
		container_of(work, struct sock_xprt, connect_worker.work);
	struct rpc_xprt *xprt = &transport->xprt;
	struct socket *sock = transport->sock;
	int err, status = -EIO;

	if (xprt->shutdown || !xprt_bound(xprt))
		goto out;

	/* Start by resetting any existing state */
	xs_close(xprt);

	if ((err = sock_create_kern(PF_INET, SOCK_DGRAM, IPPROTO_UDP, &sock)) < 0) {
		dprintk("RPC:       can't create UDP transport socket (%d).\n", -err);
		goto out;
	}
	xs_reclassify_socket(sock);

	if (xs_bind(transport, sock)) {
		sock_release(sock);
		goto out;
	}

	dprintk("RPC:       worker connecting xprt %p to address: %s\n",
			xprt, xprt->address_strings[RPC_DISPLAY_ALL]);

	if (!transport->inet) {
		struct sock *sk = sock->sk;

		write_lock_bh(&sk->sk_callback_lock);

		sk->sk_user_data = xprt;
		transport->old_data_ready = sk->sk_data_ready;
		transport->old_state_change = sk->sk_state_change;
		transport->old_write_space = sk->sk_write_space;
		sk->sk_data_ready = xs_udp_data_ready;
		sk->sk_write_space = xs_udp_write_space;
		sk->sk_no_check = UDP_CSUM_NORCV;
		sk->sk_allocation = GFP_ATOMIC;

		xprt_set_connected(xprt);

		/* Reset to new socket */
		transport->sock = sock;
		transport->inet = sk;

		write_unlock_bh(&sk->sk_callback_lock);
	}
	xs_udp_do_set_buffer_size(xprt);
	status = 0;
out:
	xprt_wake_pending_tasks(xprt, status);
	xprt_clear_connecting(xprt);
}

/*
 * We need to preserve the port number so the reply cache on the server can
 * find our cached RPC replies when we get around to reconnecting.
 */
static void xs_tcp_reuse_connection(struct rpc_xprt *xprt)
{
	int result;
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
	struct sockaddr any;

	dprintk("RPC:       disconnecting xprt %p to reuse port\n", xprt);

	/*
	 * Disconnect the transport socket by doing a connect operation
	 * with AF_UNSPEC.  This should return immediately...
	 */
	memset(&any, 0, sizeof(any));
	any.sa_family = AF_UNSPEC;
	result = kernel_connect(transport->sock, &any, sizeof(any), 0);
	if (result)
		dprintk("RPC:       AF_UNSPEC connect return code %d\n",
				result);
}

/**
 * xs_tcp_connect_worker - connect a TCP socket to a remote endpoint
 * @work: RPC transport to connect
 *
 * Invoked by a work queue tasklet.
 */
static void xs_tcp_connect_worker(struct work_struct *work)
{
	struct sock_xprt *transport =
		container_of(work, struct sock_xprt, connect_worker.work);
	struct rpc_xprt *xprt = &transport->xprt;
	struct socket *sock = transport->sock;
	int err, status = -EIO;

	if (xprt->shutdown || !xprt_bound(xprt))
		goto out;

	if (!sock) {
		/* start from scratch */
		if ((err = sock_create_kern(PF_INET, SOCK_STREAM, IPPROTO_TCP, &sock)) < 0) {
			dprintk("RPC:       can't create TCP transport "
					"socket (%d).\n", -err);
			goto out;
		}
		xs_reclassify_socket(sock);

		if (xs_bind(transport, sock)) {
			sock_release(sock);
			goto out;
		}
	} else
		/* "close" the socket, preserving the local port */
		xs_tcp_reuse_connection(xprt);

	dprintk("RPC:       worker connecting xprt %p to address: %s\n",
			xprt, xprt->address_strings[RPC_DISPLAY_ALL]);

	if (!transport->inet) {
		struct sock *sk = sock->sk;

		write_lock_bh(&sk->sk_callback_lock);

		sk->sk_user_data = xprt;
		transport->old_data_ready = sk->sk_data_ready;
		transport->old_state_change = sk->sk_state_change;
		transport->old_write_space = sk->sk_write_space;
		sk->sk_data_ready = xs_tcp_data_ready;
		sk->sk_state_change = xs_tcp_state_change;
		sk->sk_write_space = xs_tcp_write_space;
		sk->sk_allocation = GFP_ATOMIC;

		/* socket options */
		sk->sk_userlocks |= SOCK_BINDPORT_LOCK;
		sock_reset_flag(sk, SOCK_LINGER);
		tcp_sk(sk)->linger2 = 0;
		tcp_sk(sk)->nonagle |= TCP_NAGLE_OFF;

		xprt_clear_connected(xprt);

		/* Reset to new socket */
		transport->sock = sock;
		transport->inet = sk;

		write_unlock_bh(&sk->sk_callback_lock);
	}

	/* Tell the socket layer to start connecting... */
	xprt->stat.connect_count++;
	xprt->stat.connect_start = jiffies;
	status = kernel_connect(sock, (struct sockaddr *) &xprt->addr,
			xprt->addrlen, O_NONBLOCK);
	dprintk("RPC:       %p connect status %d connected %d sock state %d\n",
			xprt, -status, xprt_connected(xprt),
			sock->sk->sk_state);
	if (status < 0) {
		switch (status) {
			case -EINPROGRESS:
			case -EALREADY:
				goto out_clear;
			case -ECONNREFUSED:
			case -ECONNRESET:
				/* retry with existing socket, after a delay */
				break;
			default:
				/* get rid of existing socket, and retry */
				xs_close(xprt);
				break;
		}
	}
out:
	xprt_wake_pending_tasks(xprt, status);
out_clear:
	xprt_clear_connecting(xprt);
}

/**
 * xs_connect - connect a socket to a remote endpoint
 * @task: address of RPC task that manages state of connect request
 *
 * TCP: If the remote end dropped the connection, delay reconnecting.
 *
 * UDP socket connects are synchronous, but we use a work queue anyway
 * to guarantee that even unprivileged user processes can set up a
 * socket on a privileged port.
 *
 * If a UDP socket connect fails, the delay behavior here prevents
 * retry floods (hard mounts).
 */
static void xs_connect(struct rpc_task *task)
{
	struct rpc_xprt *xprt = task->tk_xprt;
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);

	if (xprt_test_and_set_connecting(xprt))
		return;

	if (transport->sock != NULL) {
		dprintk("RPC:       xs_connect delayed xprt %p for %lu "
				"seconds\n",
				xprt, xprt->reestablish_timeout / HZ);
		queue_delayed_work(rpciod_workqueue,
				   &transport->connect_worker,
				   xprt->reestablish_timeout);
		xprt->reestablish_timeout <<= 1;
		if (xprt->reestablish_timeout > XS_TCP_MAX_REEST_TO)
			xprt->reestablish_timeout = XS_TCP_MAX_REEST_TO;
	} else {
		dprintk("RPC:       xs_connect scheduled xprt %p\n", xprt);
		queue_delayed_work(rpciod_workqueue,
				   &transport->connect_worker, 0);
	}
}

/**
 * xs_udp_print_stats - display UDP socket-specific stats
 * @xprt: rpc_xprt struct containing statistics
 * @seq: output file
 *
 */
static void xs_udp_print_stats(struct rpc_xprt *xprt, struct seq_file *seq)
{
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);

	seq_printf(seq, "\txprt:\tudp %u %lu %lu %lu %lu %Lu %Lu\n",
			transport->port,
			xprt->stat.bind_count,
			xprt->stat.sends,
			xprt->stat.recvs,
			xprt->stat.bad_xids,
			xprt->stat.req_u,
			xprt->stat.bklog_u);
}

/**
 * xs_tcp_print_stats - display TCP socket-specific stats
 * @xprt: rpc_xprt struct containing statistics
 * @seq: output file
 *
 */
static void xs_tcp_print_stats(struct rpc_xprt *xprt, struct seq_file *seq)
{
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
	long idle_time = 0;

	if (xprt_connected(xprt))
		idle_time = (long)(jiffies - xprt->last_used) / HZ;

	seq_printf(seq, "\txprt:\ttcp %u %lu %lu %lu %ld %lu %lu %lu %Lu %Lu\n",
			transport->port,
			xprt->stat.bind_count,
			xprt->stat.connect_count,
			xprt->stat.connect_time,
			idle_time,
			xprt->stat.sends,
			xprt->stat.recvs,
			xprt->stat.bad_xids,
			xprt->stat.req_u,
			xprt->stat.bklog_u);
}

static struct rpc_xprt_ops xs_udp_ops = {
	.set_buffer_size	= xs_udp_set_buffer_size,
	.reserve_xprt		= xprt_reserve_xprt_cong,
	.release_xprt		= xprt_release_xprt_cong,
	.rpcbind		= rpcb_getport_async,
	.set_port		= xs_set_port,
	.connect		= xs_connect,
	.buf_alloc		= rpc_malloc,
	.buf_free		= rpc_free,
	.send_request		= xs_udp_send_request,
	.set_retrans_timeout	= xprt_set_retrans_timeout_rtt,
	.timer			= xs_udp_timer,
	.release_request	= xprt_release_rqst_cong,
	.close			= xs_close,
	.destroy		= xs_destroy,
	.print_stats		= xs_udp_print_stats,
};

static struct rpc_xprt_ops xs_tcp_ops = {
	.reserve_xprt		= xprt_reserve_xprt,
	.release_xprt		= xs_tcp_release_xprt,
	.rpcbind		= rpcb_getport_async,
	.set_port		= xs_set_port,
	.connect		= xs_connect,
	.buf_alloc		= rpc_malloc,
	.buf_free		= rpc_free,
	.send_request		= xs_tcp_send_request,
	.set_retrans_timeout	= xprt_set_retrans_timeout_def,
	.close			= xs_close,
	.destroy		= xs_destroy,
	.print_stats		= xs_tcp_print_stats,
};

static struct rpc_xprt *xs_setup_xprt(struct rpc_xprtsock_create *args, unsigned int slot_table_size)
{
	struct rpc_xprt *xprt;
	struct sock_xprt *new;

	if (args->addrlen > sizeof(xprt->addr)) {
		dprintk("RPC:       xs_setup_xprt: address too large\n");
		return ERR_PTR(-EBADF);
	}

	new = kzalloc(sizeof(*new), GFP_KERNEL);
	if (new == NULL) {
		dprintk("RPC:       xs_setup_xprt: couldn't allocate "
				"rpc_xprt\n");
		return ERR_PTR(-ENOMEM);
	}
	xprt = &new->xprt;

	xprt->max_reqs = slot_table_size;
	xprt->slot = kcalloc(xprt->max_reqs, sizeof(struct rpc_rqst), GFP_KERNEL);
	if (xprt->slot == NULL) {
		kfree(xprt);
		dprintk("RPC:       xs_setup_xprt: couldn't allocate slot "
				"table\n");
		return ERR_PTR(-ENOMEM);
	}

	memcpy(&xprt->addr, args->dstaddr, args->addrlen);
	xprt->addrlen = args->addrlen;
	if (args->srcaddr)
		memcpy(&new->addr, args->srcaddr, args->addrlen);
	new->port = xs_get_random_port();

	return xprt;
}

/**
 * xs_setup_udp - Set up transport to use a UDP socket
 * @args: rpc transport creation arguments
 *
 */
struct rpc_xprt *xs_setup_udp(struct rpc_xprtsock_create *args)
{
	struct rpc_xprt *xprt;
	struct sock_xprt *transport;

	xprt = xs_setup_xprt(args, xprt_udp_slot_table_entries);
	if (IS_ERR(xprt))
		return xprt;
	transport = container_of(xprt, struct sock_xprt, xprt);

	if (ntohs(((struct sockaddr_in *)args->dstaddr)->sin_port) != 0)
		xprt_set_bound(xprt);

	xprt->prot = IPPROTO_UDP;
	xprt->tsh_size = 0;
	/* XXX: header size can vary due to auth type, IPv6, etc. */
	xprt->max_payload = (1U << 16) - (MAX_HEADER << 3);

	INIT_DELAYED_WORK(&transport->connect_worker, xs_udp_connect_worker);
	xprt->bind_timeout = XS_BIND_TO;
	xprt->connect_timeout = XS_UDP_CONN_TO;
	xprt->reestablish_timeout = XS_UDP_REEST_TO;
	xprt->idle_timeout = XS_IDLE_DISC_TO;

	xprt->ops = &xs_udp_ops;

	if (args->timeout)
		xprt->timeout = *args->timeout;
	else
		xprt_set_timeout(&xprt->timeout, 5, 5 * HZ);

	xs_format_ipv4_peer_addresses(xprt);
	dprintk("RPC:       set up transport to address %s\n",
			xprt->address_strings[RPC_DISPLAY_ALL]);

	return xprt;
}

/**
 * xs_setup_tcp - Set up transport to use a TCP socket
 * @args: rpc transport creation arguments
 *
 */
struct rpc_xprt *xs_setup_tcp(struct rpc_xprtsock_create *args)
{
	struct rpc_xprt *xprt;
	struct sock_xprt *transport;

	xprt = xs_setup_xprt(args, xprt_tcp_slot_table_entries);
	if (IS_ERR(xprt))
		return xprt;
	transport = container_of(xprt, struct sock_xprt, xprt);

	if (ntohs(((struct sockaddr_in *)args->dstaddr)->sin_port) != 0)
		xprt_set_bound(xprt);

	xprt->prot = IPPROTO_TCP;
	xprt->tsh_size = sizeof(rpc_fraghdr) / sizeof(u32);
	xprt->max_payload = RPC_MAX_FRAGMENT_SIZE;

	INIT_DELAYED_WORK(&transport->connect_worker, xs_tcp_connect_worker);
	xprt->bind_timeout = XS_BIND_TO;
	xprt->connect_timeout = XS_TCP_CONN_TO;
	xprt->reestablish_timeout = XS_TCP_INIT_REEST_TO;
	xprt->idle_timeout = XS_IDLE_DISC_TO;

	xprt->ops = &xs_tcp_ops;

	if (args->timeout)
		xprt->timeout = *args->timeout;
	else
		xprt_set_timeout(&xprt->timeout, 2, 60 * HZ);

	xs_format_ipv4_peer_addresses(xprt);
	dprintk("RPC:       set up transport to address %s\n",
			xprt->address_strings[RPC_DISPLAY_ALL]);

	return xprt;
}

/**
 * init_socket_xprt - set up xprtsock's sysctls
 *
 */
int init_socket_xprt(void)
{
#ifdef RPC_DEBUG
	if (!sunrpc_table_header)
		sunrpc_table_header = register_sysctl_table(sunrpc_table);
#endif

	return 0;
}

/**
 * cleanup_socket_xprt - remove xprtsock's sysctls
 *
 */
void cleanup_socket_xprt(void)
{
#ifdef RPC_DEBUG
	if (sunrpc_table_header) {
		unregister_sysctl_table(sunrpc_table_header);
		sunrpc_table_header = NULL;
	}
#endif
}