xprtsock.c 31.9 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13
/*
 * linux/net/sunrpc/xprtsock.c
 *
 * Client-side transport implementation for sockets.
 *
 * TCP callback races fixes (C) 1998 Red Hat Software <alan@redhat.com>
 * TCP send fixes (C) 1998 Red Hat Software <alan@redhat.com>
 * TCP NFS related read + write fixes
 *  (C) 1999 Dave Airlie, University of Limerick, Ireland <airlied@linux.ie>
 *
 * Rewrite of large parts of the code in order to stabilize TCP stuff.
 * Fix behaviour when socket buffer is full.
 *  (C) 1999 Trond Myklebust <trond.myklebust@fys.uio.no>
 *
 * IP socket transport implementation, (C) 2005 Chuck Lever <cel@netapp.com>
16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
 */

#include <linux/types.h>
#include <linux/slab.h>
#include <linux/capability.h>
#include <linux/sched.h>
#include <linux/pagemap.h>
#include <linux/errno.h>
#include <linux/socket.h>
#include <linux/in.h>
#include <linux/net.h>
#include <linux/mm.h>
#include <linux/udp.h>
#include <linux/tcp.h>
#include <linux/sunrpc/clnt.h>
#include <linux/file.h>

#include <net/sock.h>
#include <net/checksum.h>
#include <net/udp.h>
#include <net/tcp.h>

38 39 40 41 42 43
/*
 * How many times to try sending a request on a socket before waiting
 * for the socket buffer to clear.
 */
#define XS_SENDMSG_RETRY	(10U)

44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87
/*
 * Time out for an RPC UDP socket connect.  UDP socket connects are
 * synchronous, but we set a timeout anyway in case of resource
 * exhaustion on the local host.
 */
#define XS_UDP_CONN_TO		(5U * HZ)

/*
 * Wait duration for an RPC TCP connection to be established.  Solaris
 * NFS over TCP uses 60 seconds, for example, which is in line with how
 * long a server takes to reboot.
 */
#define XS_TCP_CONN_TO		(60U * HZ)

/*
 * Wait duration for a reply from the RPC portmapper.
 */
#define XS_BIND_TO		(60U * HZ)

/*
 * Delay if a UDP socket connect error occurs.  This is most likely some
 * kind of resource problem on the local host.
 */
#define XS_UDP_REEST_TO		(2U * HZ)

/*
 * The reestablish timeout allows clients to delay for a bit before attempting
 * to reconnect to a server that just dropped our connection.
 *
 * We implement an exponential backoff when trying to reestablish a TCP
 * transport connection with the server.  Some servers like to drop a TCP
 * connection when they are overworked, so we start with a short timeout and
 * increase over time if the server is down or not responding.
 */
#define XS_TCP_INIT_REEST_TO	(3U * HZ)
#define XS_TCP_MAX_REEST_TO	(5U * 60 * HZ)

/*
 * TCP idle timeout; client drops the transport socket if it is idle
 * for this long.  Note that we also timeout UDP sockets to prevent
 * holding port numbers when there is no RPC traffic.
 */
#define XS_IDLE_DISC_TO		(5U * 60 * HZ)

88 89
#ifdef RPC_DEBUG
# undef  RPC_DEBUG_DATA
90
# define RPCDBG_FACILITY	RPCDBG_TRANS
91 92 93
#endif

#ifdef RPC_DEBUG_DATA
94
/*
 * Debug helper: print @msg, then hex-dump the leading bytes of @packet
 * through dprintk.  At most the first 128 bytes (or @count bytes, if
 * smaller) are walked, four bytes at a time; a new row prefixed with
 * the byte offset is started every 32 bytes.
 */
static void xs_pktdump(char *msg, u32 *packet, unsigned int count)
{
	u8 *buf = (u8 *) packet;
	/* unsigned: compared against @count and printed with %x */
	unsigned int j;

	dprintk("RPC:      %s\n", msg);
	for (j = 0; j < count && j < 128; j += 4) {
		if (!(j & 31)) {
			if (j)
				dprintk("\n");
			dprintk("0x%04x ", j);
		}
		dprintk("%02x%02x%02x%02x ",
			buf[j], buf[j+1], buf[j+2], buf[j+3]);
	}
	dprintk("\n");
}
#else
112
/* Stub used when RPC_DEBUG_DATA is not defined: packet dumping is compiled out. */
static inline void xs_pktdump(char *msg, u32 *packet, unsigned int count)
{
	/* NOP */
}
#endif

118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152
#define XS_SENDMSG_FLAGS	(MSG_DONTWAIT | MSG_NOSIGNAL)

static inline int xs_send_head(struct socket *sock, struct sockaddr *addr, int addrlen, struct xdr_buf *xdr, unsigned int base, unsigned int len)
{
	struct kvec iov = {
		.iov_base	= xdr->head[0].iov_base + base,
		.iov_len	= len - base,
	};
	struct msghdr msg = {
		.msg_name	= addr,
		.msg_namelen	= addrlen,
		.msg_flags	= XS_SENDMSG_FLAGS,
	};

	if (xdr->len > len)
		msg.msg_flags |= MSG_MORE;

	if (likely(iov.iov_len))
		return kernel_sendmsg(sock, &msg, &iov, 1, iov.iov_len);
	return kernel_sendmsg(sock, &msg, NULL, 0, 0);
}

static int xs_send_tail(struct socket *sock, struct xdr_buf *xdr, unsigned int base, unsigned int len)
{
	struct kvec iov = {
		.iov_base	= xdr->tail[0].iov_base + base,
		.iov_len	= len - base,
	};
	struct msghdr msg = {
		.msg_flags	= XS_SENDMSG_FLAGS,
	};

	return kernel_sendmsg(sock, &msg, &iov, 1, iov.iov_len);
}

153 154 155 156 157 158 159 160
/**
 * xs_sendpages - write pages directly to a socket
 * @sock: socket to send on
 * @addr: UDP only -- address of destination
 * @addrlen: UDP only -- length of destination address
 * @xdr: buffer containing this request
 * @base: starting position in the buffer
 *
161
 */
162
static inline int xs_sendpages(struct socket *sock, struct sockaddr *addr, int addrlen, struct xdr_buf *xdr, unsigned int base)
163 164 165 166 167 168
{
	struct page **ppage = xdr->pages;
	unsigned int len, pglen = xdr->page_len;
	int err, ret = 0;
	ssize_t (*sendpage)(struct socket *, struct page *, int, size_t, int);

169 170 171 172 173
	if (unlikely(!sock))
		return -ENOTCONN;

	clear_bit(SOCK_ASYNC_NOSPACE, &sock->flags);

174 175
	len = xdr->head[0].iov_len;
	if (base < len || (addr != NULL && base == 0)) {
176
		err = xs_send_head(sock, addr, addrlen, xdr, base, len);
177 178 179 180
		if (ret == 0)
			ret = err;
		else if (err > 0)
			ret += err;
181
		if (err != (len - base))
182 183 184 185 186
			goto out;
		base = 0;
	} else
		base -= len;

187
	if (unlikely(pglen == 0))
188
		goto copy_tail;
189
	if (unlikely(base >= pglen)) {
190 191 192 193 194
		base -= pglen;
		goto copy_tail;
	}
	if (base || xdr->page_base) {
		pglen -= base;
195
		base += xdr->page_base;
196 197 198 199 200 201
		ppage += base >> PAGE_CACHE_SHIFT;
		base &= ~PAGE_CACHE_MASK;
	}

	sendpage = sock->ops->sendpage ? : sock_no_sendpage;
	do {
202
		int flags = XS_SENDMSG_FLAGS;
203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228

		len = PAGE_CACHE_SIZE;
		if (base)
			len -= base;
		if (pglen < len)
			len = pglen;

		if (pglen != len || xdr->tail[0].iov_len != 0)
			flags |= MSG_MORE;

		/* Hmm... We might be dealing with highmem pages */
		if (PageHighMem(*ppage))
			sendpage = sock_no_sendpage;
		err = sendpage(sock, *ppage, base, len, flags);
		if (ret == 0)
			ret = err;
		else if (err > 0)
			ret += err;
		if (err != len)
			goto out;
		base = 0;
		ppage++;
	} while ((pglen -= len) != 0);
copy_tail:
	len = xdr->tail[0].iov_len;
	if (base < len) {
229
		err = xs_send_tail(sock, xdr, base, len);
230 231 232 233 234 235 236 237 238
		if (ret == 0)
			ret = err;
		else if (err > 0)
			ret += err;
	}
out:
	return ret;
}

239
/**
240 241
 * xs_nospace - place task on wait queue if transmit was incomplete
 * @task: task to put to sleep
242
 *
243
 */
244
static void xs_nospace(struct rpc_task *task)
245
{
246 247
	struct rpc_rqst *req = task->tk_rqstp;
	struct rpc_xprt *xprt = req->rq_xprt;
248

249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285
	dprintk("RPC: %4d xmit incomplete (%u left of %u)\n",
			task->tk_pid, req->rq_slen - req->rq_bytes_sent,
			req->rq_slen);

	if (test_bit(SOCK_ASYNC_NOSPACE, &xprt->sock->flags)) {
		/* Protect against races with write_space */
		spin_lock_bh(&xprt->transport_lock);

		/* Don't race with disconnect */
		if (!xprt_connected(xprt))
			task->tk_status = -ENOTCONN;
		else if (test_bit(SOCK_NOSPACE, &xprt->sock->flags))
			xprt_wait_for_buffer_space(task);

		spin_unlock_bh(&xprt->transport_lock);
	} else
		/* Keep holding the socket if it is blocked */
		rpc_delay(task, HZ>>4);
}

/**
 * xs_udp_send_request - write an RPC request to a UDP socket
 * @task: address of RPC task that manages the state of an RPC request
 *
 * Return values (errors are returned as negative errno values):
 *        0:	The request has been sent
 *   EAGAIN:	The socket was blocked, please call again later to
 *		complete the request
 * ENOTCONN:	Caller needs to invoke connect logic then call again
 *    other:	Some other error occurred, the request was not sent
 */
static int xs_udp_send_request(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;
	struct rpc_xprt *xprt = req->rq_xprt;
	struct xdr_buf *xdr = &req->rq_snd_buf;
	int status;

	xs_pktdump("packet data:",
				req->rq_svec->iov_base,
				req->rq_svec->iov_len);

	req->rq_xtime = jiffies;
	status = xs_sendpages(xprt->sock, (struct sockaddr *) &xprt->addr,
				sizeof(xprt->addr), xdr, req->rq_bytes_sent);

	dprintk("RPC:      xs_udp_send_request(%u) = %d\n",
			xdr->len - req->rq_bytes_sent, status);

	if (likely(status >= (int) req->rq_slen))
		return 0;

	/* Still some bytes left; set up for a retry later. */
	if (status > 0)
		status = -EAGAIN;

	switch (status) {
	case -ENETUNREACH:
	case -EPIPE:
	case -ECONNREFUSED:
		/* When the server has died, an ICMP port unreachable message
		 * prompts ECONNREFUSED. */
		break;
	case -EAGAIN:
		xs_nospace(task);
		break;
	default:
		dprintk("RPC:      sendmsg returned unrecognized error %d\n",
			-status);
		break;
	}

	return status;
}

324 325 326 327 328 329 330
static inline void xs_encode_tcp_record_marker(struct xdr_buf *buf)
{
	u32 reclen = buf->len - sizeof(rpc_fraghdr);
	rpc_fraghdr *base = buf->head[0].iov_base;
	*base = htonl(RPC_LAST_STREAM_FRAGMENT | reclen);
}

331
/**
 * xs_tcp_send_request - write an RPC request to a TCP socket
 * @task: address of RPC task that manages the state of an RPC request
 *
 * Return values (errors are returned as negative errno values):
 *        0:	The request has been sent
 *   EAGAIN:	The socket was blocked, please call again later to
 *		complete the request
 * ENOTCONN:	Caller needs to invoke connect logic then call again
 *    other:	Some other error occurred, the request was not sent
 *
 * XXX: In the case of soft timeouts, should we eventually give up
 *	if sendmsg is not able to make progress?
 */
static int xs_tcp_send_request(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;
	struct rpc_xprt *xprt = req->rq_xprt;
	struct xdr_buf *xdr = &req->rq_snd_buf;
	int status;
	/* unsigned to match XS_SENDMSG_RETRY, which is an unsigned constant */
	unsigned int retry = 0;

	xs_encode_tcp_record_marker(xdr);

	xs_pktdump("packet data:",
				req->rq_svec->iov_base,
				req->rq_svec->iov_len);

	/* Continue transmitting the packet/record. We must be careful
	 * to cope with writespace callbacks arriving _after_ we have
	 * called sendmsg(). */
	while (1) {
		req->rq_xtime = jiffies;
		status = xs_sendpages(xprt->sock, NULL, 0, xdr,
						req->rq_bytes_sent);

		dprintk("RPC:      xs_tcp_send_request(%u) = %d\n",
				xdr->len - req->rq_bytes_sent, status);

		if (unlikely(status < 0))
			break;

		/* If we've sent the entire packet, immediately
		 * reset the count of bytes sent. */
		req->rq_bytes_sent += status;
		if (likely(req->rq_bytes_sent >= req->rq_slen)) {
			req->rq_bytes_sent = 0;
			return 0;
		}

		/* Partial send: retry a bounded number of times */
		status = -EAGAIN;
		if (retry++ > XS_SENDMSG_RETRY)
			break;
	}

	switch (status) {
	case -EAGAIN:
		xs_nospace(task);
		break;
	case -ECONNREFUSED:
	case -ECONNRESET:
	case -ENOTCONN:
	case -EPIPE:
		/* All connection-level failures are reported as -ENOTCONN */
		status = -ENOTCONN;
		break;
	default:
		dprintk("RPC:      sendmsg returned unrecognized error %d\n",
			-status);
		xprt_disconnect(xprt);
		break;
	}

	return status;
}

405 406 407 408
/**
 * xs_close - close a socket
 * @xprt: transport
 *
409 410
 * This is used when all requests are complete; ie, no DRC state remains
 * on the server we want to save.
411
 */
412
static void xs_close(struct rpc_xprt *xprt)
413
{
414 415
	struct socket *sock = xprt->sock;
	struct sock *sk = xprt->inet;
416 417 418 419

	if (!sk)
		return;

420 421
	dprintk("RPC:      xs_close xprt %p\n", xprt);

422 423 424 425
	write_lock_bh(&sk->sk_callback_lock);
	xprt->inet = NULL;
	xprt->sock = NULL;

426 427
	sk->sk_user_data = NULL;
	sk->sk_data_ready = xprt->old_data_ready;
428
	sk->sk_state_change = xprt->old_state_change;
429
	sk->sk_write_space = xprt->old_write_space;
430 431
	write_unlock_bh(&sk->sk_callback_lock);

432
	sk->sk_no_check = 0;
433 434 435 436

	sock_release(sock);
}

437 438 439 440 441 442
/**
 * xs_destroy - prepare to shutdown a transport
 * @xprt: doomed transport
 *
 */
static void xs_destroy(struct rpc_xprt *xprt)
443
{
444 445
	dprintk("RPC:      xs_destroy xprt %p\n", xprt);

446
	cancel_delayed_work(&xprt->connect_worker);
447 448 449
	flush_scheduled_work();

	xprt_disconnect(xprt);
450
	xs_close(xprt);
451 452 453
	kfree(xprt->slot);
}

454 455 456 457 458 459 460 461 462 463
/* Return the RPC transport stored in the socket's sk_user_data field. */
static inline struct rpc_xprt *xprt_from_sock(struct sock *sk)
{
	struct rpc_xprt *owner = (struct rpc_xprt *) sk->sk_user_data;

	return owner;
}

/**
 * xs_udp_data_ready - "data ready" callback for UDP sockets
 * @sk: socket with data to read
 * @len: how much data to read
 *
464
 */
465
static void xs_udp_data_ready(struct sock *sk, int len)
466
{
467 468
	struct rpc_task *task;
	struct rpc_xprt *xprt;
469
	struct rpc_rqst *rovr;
470
	struct sk_buff *skb;
471 472 473 474
	int err, repsize, copied;
	u32 _xid, *xp;

	read_lock(&sk->sk_callback_lock);
475 476
	dprintk("RPC:      xs_udp_data_ready...\n");
	if (!(xprt = xprt_from_sock(sk)))
477 478 479 480 481 482 483 484 485 486
		goto out;

	if ((skb = skb_recv_datagram(sk, 0, 1, &err)) == NULL)
		goto out;

	if (xprt->shutdown)
		goto dropit;

	repsize = skb->len - sizeof(struct udphdr);
	if (repsize < 4) {
487
		dprintk("RPC:      impossible RPC reply size %d!\n", repsize);
488 489 490 491 492 493 494 495 496 497
		goto dropit;
	}

	/* Copy the XID from the skb... */
	xp = skb_header_pointer(skb, sizeof(struct udphdr),
				sizeof(_xid), &_xid);
	if (xp == NULL)
		goto dropit;

	/* Look up and lock the request corresponding to the given XID */
C
Chuck Lever 已提交
498
	spin_lock(&xprt->transport_lock);
499 500 501 502 503 504 505 506 507 508 509 510 511 512 513
	rovr = xprt_lookup_rqst(xprt, *xp);
	if (!rovr)
		goto out_unlock;
	task = rovr->rq_task;

	if ((copied = rovr->rq_private_buf.buflen) > repsize)
		copied = repsize;

	/* Suck it into the iovec, verify checksum if not done by hw. */
	if (csum_partial_copy_to_xdr(&rovr->rq_private_buf, skb))
		goto out_unlock;

	/* Something worked... */
	dst_confirm(skb->dst);

514 515 516
	xprt_adjust_cwnd(task, copied);
	xprt_update_rtt(task);
	xprt_complete_rqst(task, copied);
517 518

 out_unlock:
C
Chuck Lever 已提交
519
	spin_unlock(&xprt->transport_lock);
520 521 522 523 524 525
 dropit:
	skb_free_datagram(sk, skb);
 out:
	read_unlock(&sk->sk_callback_lock);
}

526
/*
 * Copy up to @len bytes from the skb described by @desc into @p,
 * clamped to the bytes remaining in @desc.  On success the descriptor's
 * offset/count are advanced and the number of bytes copied is returned;
 * if skb_copy_bits() fails, 0 is returned and @desc is left untouched.
 */
static inline size_t xs_tcp_copy_data(skb_reader_t *desc, void *p, size_t len)
{
	if (len > desc->count)
		len = desc->count;
	if (skb_copy_bits(desc->skb, desc->offset, p, len)) {
		dprintk("RPC:      failed to copy %zu bytes from skb. %zu bytes remain\n",
				len, desc->count);
		return 0;
	}
	desc->offset += len;
	desc->count -= len;
	dprintk("RPC:      copied %zu bytes from skb. %zu bytes remain\n",
			len, desc->count);
	return len;
}

542
/*
 * Read (possibly the remainder of) the record marker into
 * xprt->tcp_recm.  Once the marker is complete, decode it into
 * tcp_reclen and the XPRT_LAST_FRAG flag, clear XPRT_COPY_RECM and
 * reset tcp_offset.  A fragment length below 4 disconnects the
 * transport.
 */
static inline void xs_tcp_read_fraghdr(struct rpc_xprt *xprt, skb_reader_t *desc)
{
	size_t len, used;
	char *p;

	/* Resume where a previous partial read of the marker stopped */
	p = ((char *) &xprt->tcp_recm) + xprt->tcp_offset;
	len = sizeof(xprt->tcp_recm) - xprt->tcp_offset;
	used = xs_tcp_copy_data(desc, p, len);
	xprt->tcp_offset += used;
	if (used != len)
		return;

	xprt->tcp_reclen = ntohl(xprt->tcp_recm);
	if (xprt->tcp_reclen & RPC_LAST_STREAM_FRAGMENT)
		xprt->tcp_flags |= XPRT_LAST_FRAG;
	else
		xprt->tcp_flags &= ~XPRT_LAST_FRAG;
	xprt->tcp_reclen &= RPC_FRAGMENT_SIZE_MASK;

	xprt->tcp_flags &= ~XPRT_COPY_RECM;
	xprt->tcp_offset = 0;

	/* Sanity check of the record length */
	if (unlikely(xprt->tcp_reclen < 4)) {
		dprintk("RPC:      invalid TCP record fragment length\n");
		xprt_disconnect(xprt);
		return;
	}
	dprintk("RPC:      reading TCP record fragment of length %d\n",
			xprt->tcp_reclen);
}

574
static void xs_tcp_check_recm(struct rpc_xprt *xprt)
575 576 577 578 579 580 581 582 583 584 585 586 587 588
{
	dprintk("RPC:      xprt = %p, tcp_copied = %lu, tcp_offset = %u, tcp_reclen = %u, tcp_flags = %lx\n",
			xprt, xprt->tcp_copied, xprt->tcp_offset, xprt->tcp_reclen, xprt->tcp_flags);
	if (xprt->tcp_offset == xprt->tcp_reclen) {
		xprt->tcp_flags |= XPRT_COPY_RECM;
		xprt->tcp_offset = 0;
		if (xprt->tcp_flags & XPRT_LAST_FRAG) {
			xprt->tcp_flags &= ~XPRT_COPY_DATA;
			xprt->tcp_flags |= XPRT_COPY_XID;
			xprt->tcp_copied = 0;
		}
	}
}

589
/*
 * Read (possibly the remainder of) the XID into xprt->tcp_xid.  Once
 * complete, switch from XPRT_COPY_XID to XPRT_COPY_DATA, account for
 * the 4 bytes already consumed in tcp_copied, and check whether the
 * fragment is exhausted.
 */
static inline void xs_tcp_read_xid(struct rpc_xprt *xprt, skb_reader_t *desc)
{
	size_t len, used;
	char *p;

	len = sizeof(xprt->tcp_xid) - xprt->tcp_offset;
	dprintk("RPC:      reading XID (%Zu bytes)\n", len);
	p = ((char *) &xprt->tcp_xid) + xprt->tcp_offset;
	used = xs_tcp_copy_data(desc, p, len);
	xprt->tcp_offset += used;
	if (used != len)
		return;
	xprt->tcp_flags &= ~XPRT_COPY_XID;
	xprt->tcp_flags |= XPRT_COPY_DATA;
	xprt->tcp_copied = 4;
	dprintk("RPC:      reading reply for XID %08x\n",
						ntohl(xprt->tcp_xid));
	xs_tcp_check_recm(xprt);
}

609
/*
 * Copy reply data for the request matching xprt->tcp_xid from the skb
 * into the request's private receive buffer, limited to what remains
 * of the current fragment.  If no request matches, XPRT_COPY_DATA is
 * cleared so the rest of the record gets discarded.  The request is
 * completed once XPRT_COPY_DATA has been cleared: buffer full, last
 * fragment finished, or a short/failed copy.
 */
static inline void xs_tcp_read_request(struct rpc_xprt *xprt, skb_reader_t *desc)
{
	struct rpc_rqst *req;
	struct xdr_buf *rcvbuf;
	size_t len;
	ssize_t r;

	/* Find and lock the request corresponding to this xid */
	spin_lock(&xprt->transport_lock);
	req = xprt_lookup_rqst(xprt, xprt->tcp_xid);
	if (!req) {
		xprt->tcp_flags &= ~XPRT_COPY_DATA;
		dprintk("RPC:      XID %08x request not found!\n",
				ntohl(xprt->tcp_xid));
		spin_unlock(&xprt->transport_lock);
		return;
	}

	rcvbuf = &req->rq_private_buf;
	len = desc->count;
	if (len > xprt->tcp_reclen - xprt->tcp_offset) {
		skb_reader_t my_desc;

		/* The skb holds more than this fragment: copy through a
		 * private descriptor clamped to the fragment boundary. */
		len = xprt->tcp_reclen - xprt->tcp_offset;
		memcpy(&my_desc, desc, sizeof(my_desc));
		my_desc.count = len;
		r = xdr_partial_copy_from_skb(rcvbuf, xprt->tcp_copied,
					  &my_desc, xs_tcp_copy_data);
		/* r is signed; only advance the caller's descriptor by
		 * bytes actually consumed, never by an error value. */
		if (r > 0) {
			desc->count -= r;
			desc->offset += r;
		}
	} else
		r = xdr_partial_copy_from_skb(rcvbuf, xprt->tcp_copied,
					  desc, xs_tcp_copy_data);

	if (r > 0) {
		xprt->tcp_copied += r;
		xprt->tcp_offset += r;
	}
	if (r != len) {
		/* Error when copying to the receive buffer,
		 * usually because we weren't able to allocate
		 * additional buffer pages. All we can do now
		 * is turn off XPRT_COPY_DATA, so the request
		 * will not receive any additional updates,
		 * and time out.
		 * Any remaining data from this record will
		 * be discarded.
		 */
		xprt->tcp_flags &= ~XPRT_COPY_DATA;
		dprintk("RPC:      XID %08x truncated request\n",
				ntohl(xprt->tcp_xid));
		dprintk("RPC:      xprt = %p, tcp_copied = %lu, tcp_offset = %u, tcp_reclen = %u\n",
				xprt, xprt->tcp_copied, xprt->tcp_offset, xprt->tcp_reclen);
		goto out;
	}

	dprintk("RPC:      XID %08x read %Zd bytes\n",
			ntohl(xprt->tcp_xid), r);
	dprintk("RPC:      xprt = %p, tcp_copied = %lu, tcp_offset = %u, tcp_reclen = %u\n",
			xprt, xprt->tcp_copied, xprt->tcp_offset, xprt->tcp_reclen);

	/* Done when the buffer is full or the last fragment is finished */
	if (xprt->tcp_copied == req->rq_private_buf.buflen)
		xprt->tcp_flags &= ~XPRT_COPY_DATA;
	else if (xprt->tcp_offset == xprt->tcp_reclen) {
		if (xprt->tcp_flags & XPRT_LAST_FRAG)
			xprt->tcp_flags &= ~XPRT_COPY_DATA;
	}

out:
	if (!(xprt->tcp_flags & XPRT_COPY_DATA))
		xprt_complete_rqst(req->rq_task, xprt->tcp_copied);
	spin_unlock(&xprt->transport_lock);
	xs_tcp_check_recm(xprt);
}

684
/*
 * Skip over the rest of the current fragment (or as much of it as the
 * skb holds) without copying it anywhere, then check whether the
 * fragment is exhausted.
 */
static inline void xs_tcp_read_discard(struct rpc_xprt *xprt, skb_reader_t *desc)
{
	size_t len;

	len = xprt->tcp_reclen - xprt->tcp_offset;
	if (len > desc->count)
		len = desc->count;
	desc->count -= len;
	desc->offset += len;
	xprt->tcp_offset += len;
	dprintk("RPC:      discarded %Zu bytes\n", len);
	xs_tcp_check_recm(xprt);
}

698
/*
 * Receive actor passed to tcp_read_sock(): consume @len bytes of @skb
 * starting at @offset, dispatching on the transport's tcp_flags to
 * read a record marker, an XID, reply data, or to discard bytes.
 * Returns the number of bytes consumed.
 */
static int xs_tcp_data_recv(read_descriptor_t *rd_desc, struct sk_buff *skb, unsigned int offset, size_t len)
{
	struct rpc_xprt *xprt = rd_desc->arg.data;
	skb_reader_t desc = {
		.skb	= skb,
		.offset	= offset,
		.count	= len,
		.csum	= 0
	};

	dprintk("RPC:      xs_tcp_data_recv started\n");
	do {
		/* Read in a new fragment marker if necessary */
		/* Can we ever really expect to get completely empty fragments? */
		if (xprt->tcp_flags & XPRT_COPY_RECM) {
			xs_tcp_read_fraghdr(xprt, &desc);
			continue;
		}
		/* Read in the xid if necessary */
		if (xprt->tcp_flags & XPRT_COPY_XID) {
			xs_tcp_read_xid(xprt, &desc);
			continue;
		}
		/* Read in the request data */
		if (xprt->tcp_flags & XPRT_COPY_DATA) {
			xs_tcp_read_request(xprt, &desc);
			continue;
		}
		/* Skip over any trailing bytes on short reads */
		xs_tcp_read_discard(xprt, &desc);
	} while (desc.count);
	dprintk("RPC:      xs_tcp_data_recv done\n");
	return len - desc.count;
}

733 734 735 736 737 738 739
/**
 * xs_tcp_data_ready - "data ready" callback for TCP sockets
 * @sk: socket with data to read
 * @bytes: how much data to read
 *
 */
static void xs_tcp_data_ready(struct sock *sk, int bytes)
740 741 742 743 744
{
	struct rpc_xprt *xprt;
	read_descriptor_t rd_desc;

	read_lock(&sk->sk_callback_lock);
745 746
	dprintk("RPC:      xs_tcp_data_ready...\n");
	if (!(xprt = xprt_from_sock(sk)))
747 748 749 750
		goto out;
	if (xprt->shutdown)
		goto out;

751
	/* We use rd_desc to pass struct xprt to xs_tcp_data_recv */
752 753
	rd_desc.arg.data = xprt;
	rd_desc.count = 65536;
754
	tcp_read_sock(sk, &rd_desc, xs_tcp_data_recv);
755 756 757 758
out:
	read_unlock(&sk->sk_callback_lock);
}

759 760 761 762 763 764
/**
 * xs_tcp_state_change - callback to handle TCP socket state changes
 * @sk: socket whose state has changed
 *
 */
static void xs_tcp_state_change(struct sock *sk)
765
{
766
	struct rpc_xprt *xprt;
767 768 769 770

	read_lock(&sk->sk_callback_lock);
	if (!(xprt = xprt_from_sock(sk)))
		goto out;
771
	dprintk("RPC:      xs_tcp_state_change client %p...\n", xprt);
772 773 774 775 776 777 778
	dprintk("RPC:      state %x conn %d dead %d zapped %d\n",
				sk->sk_state, xprt_connected(xprt),
				sock_flag(sk, SOCK_DEAD),
				sock_flag(sk, SOCK_ZAPPED));

	switch (sk->sk_state) {
	case TCP_ESTABLISHED:
C
Chuck Lever 已提交
779
		spin_lock_bh(&xprt->transport_lock);
780 781 782 783 784 785
		if (!xprt_test_and_set_connected(xprt)) {
			/* Reset TCP record info */
			xprt->tcp_offset = 0;
			xprt->tcp_reclen = 0;
			xprt->tcp_copied = 0;
			xprt->tcp_flags = XPRT_COPY_RECM | XPRT_COPY_XID;
786
			xprt->reestablish_timeout = XS_TCP_INIT_REEST_TO;
787
			xprt_wake_pending_tasks(xprt, 0);
788
		}
C
Chuck Lever 已提交
789
		spin_unlock_bh(&xprt->transport_lock);
790 791 792 793 794 795 796 797 798 799 800 801
		break;
	case TCP_SYN_SENT:
	case TCP_SYN_RECV:
		break;
	default:
		xprt_disconnect(xprt);
		break;
	}
 out:
	read_unlock(&sk->sk_callback_lock);
}

802
/**
803 804
 * xs_udp_write_space - callback invoked when socket buffer space
 *                             becomes available
805 806
 * @sk: socket whose state has changed
 *
807 808
 * Called when more output buffer space is available for this socket.
 * We try not to wake our writers until they can make "significant"
809
 * progress, otherwise we'll waste resources thrashing kernel_sendmsg
810 811
 * with a bunch of small requests.
 */
812
static void xs_udp_write_space(struct sock *sk)
813 814 815
{
	read_lock(&sk->sk_callback_lock);

816 817 818 819 820 821
	/* from net/core/sock.c:sock_def_write_space */
	if (sock_writeable(sk)) {
		struct socket *sock;
		struct rpc_xprt *xprt;

		if (unlikely(!(sock = sk->sk_socket)))
822
			goto out;
823 824 825
		if (unlikely(!(xprt = xprt_from_sock(sk))))
			goto out;
		if (unlikely(!test_and_clear_bit(SOCK_NOSPACE, &sock->flags)))
826
			goto out;
827 828

		xprt_write_space(xprt);
829 830
	}

831 832 833
 out:
	read_unlock(&sk->sk_callback_lock);
}
834

835 836 837 838 839 840 841 842 843 844 845 846 847 848 849 850 851 852 853 854 855 856 857 858 859 860 861 862 863 864
/**
 * xs_tcp_write_space - callback invoked when socket buffer space
 *                             becomes available
 * @sk: socket whose state has changed
 *
 * Called when more output buffer space is available for this socket.
 * We try not to wake our writers until they can make "significant"
 * progress, otherwise we'll waste resources thrashing kernel_sendmsg
 * with a bunch of small requests.
 *
 * xprt_write_space() is only called if the free stream write space has
 * reached the socket's minimum, the socket still has a struct socket
 * and a transport attached, and SOCK_NOSPACE was set (it is cleared
 * here).
 */
static void xs_tcp_write_space(struct sock *sk)
{
	read_lock(&sk->sk_callback_lock);

	/* from net/core/stream.c:sk_stream_write_space */
	if (sk_stream_wspace(sk) >= sk_stream_min_wspace(sk)) {
		struct socket *sock;
		struct rpc_xprt *xprt;

		if (unlikely(!(sock = sk->sk_socket)))
			goto out;
		if (unlikely(!(xprt = xprt_from_sock(sk))))
			goto out;
		if (unlikely(!test_and_clear_bit(SOCK_NOSPACE, &sock->flags)))
			goto out;

		xprt_write_space(xprt);
	}

 out:
	read_unlock(&sk->sk_callback_lock);
}

868
/**
869
 * xs_udp_set_buffer_size - set send and receive limits
870 871 872 873
 * @xprt: generic transport
 *
 * Set socket send and receive limits based on the
 * sndsize and rcvsize fields in the generic transport
874
 * structure.
875
 */
876
static void xs_udp_set_buffer_size(struct rpc_xprt *xprt)
877 878 879 880 881 882 883 884 885 886 887 888 889 890
{
	struct sock *sk = xprt->inet;

	if (xprt->rcvsize) {
		sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
		sk->sk_rcvbuf = xprt->rcvsize * xprt->max_reqs *  2;
	}
	if (xprt->sndsize) {
		sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
		sk->sk_sndbuf = xprt->sndsize * xprt->max_reqs * 2;
		sk->sk_write_space(sk);
	}
}

891 892 893 894 895 896 897 898 899 900 901
/**
 * xs_tcp_set_buffer_size - set send and receive limits
 * @xprt: generic transport
 *
 * TCP transports need no adjustment here, so this is a no-op.
 */
static void xs_tcp_set_buffer_size(struct rpc_xprt *xprt)
{
	/* intentionally empty */
}

902 903 904 905 906 907 908 909 910 911 912
/**
 * xs_udp_timer - called when a retransmit timeout occurs on a UDP transport
 * @task: task that timed out
 *
 * Reports the timeout to the congestion window logic by passing
 * -ETIMEDOUT as the result for @task.
 */
static void xs_udp_timer(struct rpc_task *task)
{
	const int timed_out = -ETIMEDOUT;

	xprt_adjust_cwnd(task, timed_out);
}

913
static int xs_bindresvport(struct rpc_xprt *xprt, struct socket *sock)
914 915 916 917
{
	struct sockaddr_in myaddr = {
		.sin_family = AF_INET,
	};
918 919
	int err;
	unsigned short port = xprt->port;
920 921 922 923 924 925 926

	do {
		myaddr.sin_port = htons(port);
		err = sock->ops->bind(sock, (struct sockaddr *) &myaddr,
						sizeof(myaddr));
		if (err == 0) {
			xprt->port = port;
927 928
			dprintk("RPC:      xs_bindresvport bound to port %u\n",
					port);
929 930
			return 0;
		}
931 932 933 934
		if (port <= xprt_min_resvport)
			port = xprt_max_resvport;
		else
			port--;
935 936
	} while (err == -EADDRINUSE && port != xprt->port);

937
	dprintk("RPC:      can't bind to reserved port (%d).\n", -err);
938 939 940
	return err;
}

941 942 943 944 945 946 947
/**
 * xs_udp_connect_worker - set up a UDP socket
 * @args: RPC transport to connect
 *
 * Invoked by a work queue tasklet.
 */
static void xs_udp_connect_worker(void *args)
948
{
949 950 951
	struct rpc_xprt *xprt = (struct rpc_xprt *) args;
	struct socket *sock = xprt->sock;
	int err, status = -EIO;
952

953 954
	if (xprt->shutdown || xprt->addr.sin_port == 0)
		goto out;
955

956
	dprintk("RPC:      xs_udp_connect_worker for xprt %p\n", xprt);
957

958 959
	/* Start by resetting any existing state */
	xs_close(xprt);
960

961 962 963 964
	if ((err = sock_create_kern(PF_INET, SOCK_DGRAM, IPPROTO_UDP, &sock)) < 0) {
		dprintk("RPC:      can't create UDP transport socket (%d).\n", -err);
		goto out;
	}
965

966 967 968 969
	if (xprt->resvport && xs_bindresvport(xprt, sock) < 0) {
		sock_release(sock);
		goto out;
	}
970

971 972
	if (!xprt->inet) {
		struct sock *sk = sock->sk;
973

974
		write_lock_bh(&sk->sk_callback_lock);
975

976 977 978 979
		sk->sk_user_data = xprt;
		xprt->old_data_ready = sk->sk_data_ready;
		xprt->old_state_change = sk->sk_state_change;
		xprt->old_write_space = sk->sk_write_space;
980
		sk->sk_data_ready = xs_udp_data_ready;
981
		sk->sk_write_space = xs_udp_write_space;
982
		sk->sk_no_check = UDP_CSUM_NORCV;
983

984 985
		xprt_set_connected(xprt);

986 987 988
		/* Reset to new socket */
		xprt->sock = sock;
		xprt->inet = sk;
989

990 991
		write_unlock_bh(&sk->sk_callback_lock);
	}
992
	xs_udp_set_buffer_size(xprt);
993 994 995 996
	status = 0;
out:
	xprt_wake_pending_tasks(xprt, status);
	xprt_clear_connecting(xprt);
997 998
}

999 1000 1001 1002 1003 1004 1005 1006 1007 1008 1009 1010 1011 1012 1013 1014 1015 1016 1017 1018 1019 1020 1021 1022
/*
 * We need to preserve the port number so the reply cache on the server can
 * find our cached RPC replies when we get around to reconnecting.
 */
static void xs_tcp_reuse_connection(struct rpc_xprt *xprt)
{
	int result;
	struct socket *sock = xprt->sock;
	struct sockaddr any;

	dprintk("RPC:      disconnecting xprt %p to reuse port\n", xprt);

	/*
	 * Disconnect the transport socket by doing a connect operation
	 * with AF_UNSPEC.  This should return immediately...
	 */
	memset(&any, 0, sizeof(any));
	any.sa_family = AF_UNSPEC;
	result = sock->ops->connect(sock, &any, sizeof(any), 0);
	if (result)
		dprintk("RPC:      AF_UNSPEC connect return code %d\n",
				result);
}

1023
/**
1024
 * xs_tcp_connect_worker - connect a TCP socket to a remote endpoint
1025 1026 1027
 * @args: RPC transport to connect
 *
 * Invoked by a work queue tasklet.
1028
 */
1029
static void xs_tcp_connect_worker(void *args)
1030 1031 1032
{
	struct rpc_xprt *xprt = (struct rpc_xprt *)args;
	struct socket *sock = xprt->sock;
1033
	int err, status = -EIO;
1034 1035 1036 1037

	if (xprt->shutdown || xprt->addr.sin_port == 0)
		goto out;

1038
	dprintk("RPC:      xs_tcp_connect_worker for xprt %p\n", xprt);
1039

1040 1041 1042 1043 1044 1045
	if (!xprt->sock) {
		/* start from scratch */
		if ((err = sock_create_kern(PF_INET, SOCK_STREAM, IPPROTO_TCP, &sock)) < 0) {
			dprintk("RPC:      can't create TCP transport socket (%d).\n", -err);
			goto out;
		}
1046

1047 1048 1049 1050 1051 1052 1053
		if (xprt->resvport && xs_bindresvport(xprt, sock) < 0) {
			sock_release(sock);
			goto out;
		}
	} else
		/* "close" the socket, preserving the local port */
		xs_tcp_reuse_connection(xprt);
1054

1055 1056 1057 1058 1059 1060 1061 1062 1063 1064 1065 1066
	if (!xprt->inet) {
		struct sock *sk = sock->sk;

		write_lock_bh(&sk->sk_callback_lock);

		sk->sk_user_data = xprt;
		xprt->old_data_ready = sk->sk_data_ready;
		xprt->old_state_change = sk->sk_state_change;
		xprt->old_write_space = sk->sk_write_space;
		sk->sk_data_ready = xs_tcp_data_ready;
		sk->sk_state_change = xs_tcp_state_change;
		sk->sk_write_space = xs_tcp_write_space;
1067 1068 1069 1070 1071 1072

		/* socket options */
		sk->sk_userlocks |= SOCK_BINDPORT_LOCK;
		sock_reset_flag(sk, SOCK_LINGER);
		tcp_sk(sk)->linger2 = 0;
		tcp_sk(sk)->nonagle |= TCP_NAGLE_OFF;
1073 1074 1075 1076 1077 1078 1079 1080 1081 1082 1083

		xprt_clear_connected(xprt);

		/* Reset to new socket */
		xprt->sock = sock;
		xprt->inet = sk;

		write_unlock_bh(&sk->sk_callback_lock);
	}

	/* Tell the socket layer to start connecting... */
1084 1085 1086 1087 1088 1089 1090 1091 1092
	status = sock->ops->connect(sock, (struct sockaddr *) &xprt->addr,
			sizeof(xprt->addr), O_NONBLOCK);
	dprintk("RPC: %p  connect status %d connected %d sock state %d\n",
			xprt, -status, xprt_connected(xprt), sock->sk->sk_state);
	if (status < 0) {
		switch (status) {
			case -EINPROGRESS:
			case -EALREADY:
				goto out_clear;
1093 1094 1095 1096 1097 1098 1099 1100
			case -ECONNREFUSED:
			case -ECONNRESET:
				/* retry with existing socket, after a delay */
				break;
			default:
				/* get rid of existing socket, and retry */
				xs_close(xprt);
				break;
1101 1102 1103
		}
	}
out:
1104
	xprt_wake_pending_tasks(xprt, status);
1105
out_clear:
1106
	xprt_clear_connecting(xprt);
1107 1108
}

1109 1110 1111 1112 1113
/**
 * xs_connect - connect a socket to a remote endpoint
 * @task: address of RPC task that manages state of connect request
 *
 * TCP: If the remote end dropped the connection, delay reconnecting.
1114 1115 1116 1117 1118 1119 1120
 *
 * UDP socket connects are synchronous, but we use a work queue anyway
 * to guarantee that even unprivileged user processes can set up a
 * socket on a privileged port.
 *
 * If a UDP socket connect fails, the delay behavior here prevents
 * retry floods (hard mounts).
1121 1122
 */
static void xs_connect(struct rpc_task *task)
1123 1124 1125
{
	struct rpc_xprt *xprt = task->tk_xprt;

1126 1127 1128 1129
	if (xprt_test_and_set_connecting(xprt))
		return;

	if (xprt->sock != NULL) {
1130 1131
		dprintk("RPC:      xs_connect delayed xprt %p for %lu seconds\n",
				xprt, xprt->reestablish_timeout / HZ);
1132
		schedule_delayed_work(&xprt->connect_worker,
1133 1134 1135 1136
					xprt->reestablish_timeout);
		xprt->reestablish_timeout <<= 1;
		if (xprt->reestablish_timeout > XS_TCP_MAX_REEST_TO)
			xprt->reestablish_timeout = XS_TCP_MAX_REEST_TO;
1137 1138 1139 1140 1141 1142 1143
	} else {
		dprintk("RPC:      xs_connect scheduled xprt %p\n", xprt);
		schedule_work(&xprt->connect_worker);

		/* flush_scheduled_work can sleep... */
		if (!RPC_IS_ASYNC(task))
			flush_scheduled_work();
1144 1145 1146
	}
}

1147
static struct rpc_xprt_ops xs_udp_ops = {
1148
	.set_buffer_size	= xs_udp_set_buffer_size,
1149
	.reserve_xprt		= xprt_reserve_xprt_cong,
1150
	.release_xprt		= xprt_release_xprt_cong,
1151 1152
	.connect		= xs_connect,
	.send_request		= xs_udp_send_request,
1153
	.set_retrans_timeout	= xprt_set_retrans_timeout_rtt,
1154
	.timer			= xs_udp_timer,
1155
	.release_request	= xprt_release_rqst_cong,
1156 1157 1158 1159 1160
	.close			= xs_close,
	.destroy		= xs_destroy,
};

static struct rpc_xprt_ops xs_tcp_ops = {
1161
	.set_buffer_size	= xs_tcp_set_buffer_size,
1162
	.reserve_xprt		= xprt_reserve_xprt,
1163
	.release_xprt		= xprt_release_xprt,
1164
	.connect		= xs_connect,
1165
	.send_request		= xs_tcp_send_request,
1166
	.set_retrans_timeout	= xprt_set_retrans_timeout_def,
1167 1168
	.close			= xs_close,
	.destroy		= xs_destroy,
1169 1170
};

1171 1172 1173 1174 1175 1176
/**
 * xs_setup_udp - Set up transport to use a UDP socket
 * @xprt: transport to set up
 * @to:   timeout parameters
 *
 */
1177 1178 1179 1180 1181 1182 1183 1184 1185 1186 1187 1188 1189 1190
int xs_setup_udp(struct rpc_xprt *xprt, struct rpc_timeout *to)
{
	size_t slot_table_size;

	dprintk("RPC:      setting up udp-ipv4 transport...\n");

	xprt->max_reqs = xprt_udp_slot_table_entries;
	slot_table_size = xprt->max_reqs * sizeof(xprt->slot[0]);
	xprt->slot = kmalloc(slot_table_size, GFP_KERNEL);
	if (xprt->slot == NULL)
		return -ENOMEM;
	memset(xprt->slot, 0, slot_table_size);

	xprt->prot = IPPROTO_UDP;
1191
	xprt->port = xprt_max_resvport;
1192
	xprt->tsh_size = 0;
1193 1194 1195 1196
	xprt->resvport = capable(CAP_NET_BIND_SERVICE) ? 1 : 0;
	/* XXX: header size can vary due to auth type, IPv6, etc. */
	xprt->max_payload = (1U << 16) - (MAX_HEADER << 3);

1197
	INIT_WORK(&xprt->connect_worker, xs_udp_connect_worker, xprt);
1198 1199 1200 1201
	xprt->bind_timeout = XS_BIND_TO;
	xprt->connect_timeout = XS_UDP_CONN_TO;
	xprt->reestablish_timeout = XS_UDP_REEST_TO;
	xprt->idle_timeout = XS_IDLE_DISC_TO;
1202

1203
	xprt->ops = &xs_udp_ops;
1204 1205 1206 1207

	if (to)
		xprt->timeout = *to;
	else
1208
		xprt_set_timeout(&xprt->timeout, 5, 5 * HZ);
1209 1210 1211 1212

	return 0;
}

1213 1214 1215 1216 1217 1218
/**
 * xs_setup_tcp - Set up transport to use a TCP socket
 * @xprt: transport to set up
 * @to: timeout parameters
 *
 */
1219 1220 1221 1222 1223 1224 1225 1226 1227 1228 1229 1230 1231 1232
int xs_setup_tcp(struct rpc_xprt *xprt, struct rpc_timeout *to)
{
	size_t slot_table_size;

	dprintk("RPC:      setting up tcp-ipv4 transport...\n");

	xprt->max_reqs = xprt_tcp_slot_table_entries;
	slot_table_size = xprt->max_reqs * sizeof(xprt->slot[0]);
	xprt->slot = kmalloc(slot_table_size, GFP_KERNEL);
	if (xprt->slot == NULL)
		return -ENOMEM;
	memset(xprt->slot, 0, slot_table_size);

	xprt->prot = IPPROTO_TCP;
1233
	xprt->port = xprt_max_resvport;
1234
	xprt->tsh_size = sizeof(rpc_fraghdr) / sizeof(u32);
1235
	xprt->resvport = capable(CAP_NET_BIND_SERVICE) ? 1 : 0;
1236
	xprt->max_payload = RPC_MAX_FRAGMENT_SIZE;
1237

1238
	INIT_WORK(&xprt->connect_worker, xs_tcp_connect_worker, xprt);
1239 1240 1241 1242
	xprt->bind_timeout = XS_BIND_TO;
	xprt->connect_timeout = XS_TCP_CONN_TO;
	xprt->reestablish_timeout = XS_TCP_INIT_REEST_TO;
	xprt->idle_timeout = XS_IDLE_DISC_TO;
1243

1244
	xprt->ops = &xs_tcp_ops;
1245 1246 1247 1248

	if (to)
		xprt->timeout = *to;
	else
1249
		xprt_set_timeout(&xprt->timeout, 2, 60 * HZ);
1250 1251 1252

	return 0;
}