/*
 * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
 *
 * This software is available to you under a choice of one of two
 * licenses.  You may choose to be licensed under the terms of the GNU
 * General Public License (GPL) Version 2, available from the file
 * COPYING in the main directory of this source tree, or the BSD-type
 * license below:
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 *      Redistributions of source code must retain the above copyright
 *      notice, this list of conditions and the following disclaimer.
 *
 *      Redistributions in binary form must reproduce the above
 *      copyright notice, this list of conditions and the following
 *      disclaimer in the documentation and/or other materials provided
 *      with the distribution.
 *
 *      Neither the name of the Network Appliance, Inc. nor the names of
 *      its contributors may be used to endorse or promote products
 *      derived from this software without specific prior written
 *      permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

/*
 * rpc_rdma.c
 *
 * This file contains the guts of the RPC RDMA protocol, and
 * does marshaling/unmarshaling, etc. It is also where interfacing
 * to the Linux RPC framework lives.
 */

#include "xprt_rdma.h"

#include <linux/highmem.h>

#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
# define RPCDBG_FACILITY	RPCDBG_TRANS
#endif

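/* How the argument (rtype) and result (wtype) payloads of an RPC are
 * conveyed: inline, or via a Read list, Write list, or Reply chunk.
 * Selected per-request in rpcrdma_marshal_req().
 */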
enum rpcrdma_chunktype {
	rpcrdma_noch = 0,
	rpcrdma_readch,
	rpcrdma_areadch,
	rpcrdma_writech,
	rpcrdma_replych
};

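/* Human-readable names for each rpcrdma_chunktype, used in debugging
 * and error messages.
 */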
static const char transfertypes[][12] = {
	"inline",	/* no chunks */
	"read list",	/* some argument via rdma read */
	"*read list",	/* entire request via rdma read */
	"write list",	/* some result via rdma write */
	"reply chunk"	/* entire reply via rdma write */
};

/* Returns size of largest RPC-over-RDMA header in a Call message
 *
 * The largest Call header contains a full-size Read list and a
 * minimal Reply chunk.
 */
static unsigned int rpcrdma_max_call_header_size(unsigned int maxsegs)
{
	unsigned int size;

	/* Fixed header fields and list discriminators */
	size = RPCRDMA_HDRLEN_MIN;

	/* Maximum Read list size */
	maxsegs += 2;	/* segment for head and tail buffers */
	size += maxsegs * sizeof(struct rpcrdma_read_chunk);

	/* Minimal Read chunk size */
	size += sizeof(__be32);	/* segment count */
	size += sizeof(struct rpcrdma_segment);
	size += sizeof(__be32);	/* list discriminator */

	dprintk("RPC:       %s: max call header size = %u\n",
		__func__, size);
	return size;
}

/* Returns size of largest RPC-over-RDMA header in a Reply message
 *
 * There is only one Write list or one Reply chunk per Reply
 * message.  The larger list is the Write list.
 */
static unsigned int rpcrdma_max_reply_header_size(unsigned int maxsegs)
{
	unsigned int size;

	/* Fixed header fields and list discriminators */
	size = RPCRDMA_HDRLEN_MIN;

	/* Maximum Write list size */
	maxsegs += 2;	/* segment for head and tail buffers */
	size += sizeof(__be32);		/* segment count */
	size += maxsegs * sizeof(struct rpcrdma_segment);
	size += sizeof(__be32);	/* list discriminator */

	dprintk("RPC:       %s: max reply header size = %u\n",
		__func__, size);
	return size;
}

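/* Cache the largest inline payload each direction can carry: the
 * negotiated inline threshold less the largest possible transport
 * header for that direction.
 */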
void rpcrdma_set_max_header_sizes(struct rpcrdma_ia *ia,
				  struct rpcrdma_create_data_internal *cdata,
				  unsigned int maxsegs)
{
	ia->ri_max_inline_write = cdata->inline_wsize -
				  rpcrdma_max_call_header_size(maxsegs);
	ia->ri_max_inline_read = cdata->inline_rsize -
				 rpcrdma_max_reply_header_size(maxsegs);
}

/* The client can send a request inline as long as the RPCRDMA header
 * plus the RPC call fit under the transport's inline limit. If the
 * combined call message size exceeds that limit, the client must use
 * the read chunk list for this operation.
 */
static bool rpcrdma_args_inline(struct rpcrdma_xprt *r_xprt,
				struct rpc_rqst *rqst)
{
	struct rpcrdma_ia *ia = &r_xprt->rx_ia;

	return rqst->rq_snd_buf.len <= ia->ri_max_inline_write;
}

/* The client can't know how large the actual reply will be. Thus it
 * plans for the largest possible reply for that particular ULP
 * operation. If the maximum combined reply message size exceeds that
 * limit, the client must provide a write list or a reply chunk for
 * this request.
 */
static bool rpcrdma_results_inline(struct rpcrdma_xprt *r_xprt,
				   struct rpc_rqst *rqst)
{
	struct rpcrdma_ia *ia = &r_xprt->rx_ia;

	return rqst->rq_rcv_buf.buflen <= ia->ri_max_inline_read;
}

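/* When the send buffer's tail begins with an XDR pad, shift the tail's
 * real content so that it directly follows the head. Returns the number
 * of tail bytes to transmit inline, or zero if the tail holds nothing
 * but padding (which the protocol lets the sender omit).
 */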
static int
rpcrdma_tail_pullup(struct xdr_buf *buf)
{
	size_t tlen = buf->tail[0].iov_len;
	size_t skip = tlen & 3;

	/* Do not include the tail if it is only an XDR pad */
	if (tlen < 4)
		return 0;

	/* xdr_write_pages() adds a pad at the beginning of the tail
	 * if the content in "buf->pages" is unaligned. Force the
	 * tail's actual content to land at the next XDR position
	 * after the head instead.
	 */
	if (skip) {
		unsigned char *src, *dst;
		unsigned int count;

		src = buf->tail[0].iov_base;
		dst = buf->head[0].iov_base;
		dst += buf->head[0].iov_len;

		src += skip;
		tlen -= skip;

		dprintk("RPC:       %s: skip=%zu, memmove(%p, %p, %zu)\n",
			__func__, skip, dst, src, tlen);

		for (count = tlen; count; count--)
			*dst++ = *src++;
	}

	return tlen;
}

/* Split "vec" on page boundaries into segments. FMR registers pages,
 * not a byte range. Other modes coalesce these segments into a single
 * MR when they can.
 */
static int
rpcrdma_convert_kvec(struct kvec *vec, struct rpcrdma_mr_seg *seg, int n)
{
	size_t page_offset;
	u32 remaining;
	char *base;

	base = vec->iov_base;
	page_offset = offset_in_page(base);
	remaining = vec->iov_len;
	while (remaining && n < RPCRDMA_MAX_SEGS) {
		seg[n].mr_page = NULL;
		seg[n].mr_offset = base;
		seg[n].mr_len = min_t(u32, PAGE_SIZE - page_offset, remaining);
		remaining -= seg[n].mr_len;
		base += seg[n].mr_len;
		++n;
		page_offset = 0;
	}
	return n;
}

/*
 * Chunk assembly from upper layer xdr_buf.
 *
 * Prepare the passed-in xdr_buf into representation as RPC/RDMA chunk
 * elements. Segments are then coalesced when registered, if possible
 * within the selected memreg mode.
 *
 * Returns positive number of segments converted, or a negative errno.
 */

static int
rpcrdma_convert_iovs(struct xdr_buf *xdrbuf, unsigned int pos,
	enum rpcrdma_chunktype type, struct rpcrdma_mr_seg *seg)
{
	int len, n, p, page_base;
	struct page **ppages;

	n = 0;
	if (pos == 0) {
		n = rpcrdma_convert_kvec(&xdrbuf->head[0], seg, n);
		if (n == RPCRDMA_MAX_SEGS)
			goto out_overflow;
	}

	len = xdrbuf->page_len;
	ppages = xdrbuf->pages + (xdrbuf->page_base >> PAGE_SHIFT);
	page_base = xdrbuf->page_base & ~PAGE_MASK;
	p = 0;
	while (len && n < RPCRDMA_MAX_SEGS) {
		if (!ppages[p]) {
			/* alloc the pagelist for receiving buffer */
			ppages[p] = alloc_page(GFP_ATOMIC);
			if (!ppages[p])
				return -EAGAIN;
		}
		seg[n].mr_page = ppages[p];
		seg[n].mr_offset = (void *)(unsigned long) page_base;
		seg[n].mr_len = min_t(u32, PAGE_SIZE - page_base, len);
		if (seg[n].mr_len > PAGE_SIZE)
			goto out_overflow;
		len -= seg[n].mr_len;
		++n;
		++p;
		page_base = 0;	/* page offset only applies to first page */
	}

	/* Message overflows the seg array */
	if (len && n == RPCRDMA_MAX_SEGS)
		goto out_overflow;

	/* When encoding the read list, the tail is always sent inline */
	if (type == rpcrdma_readch)
		return n;

	if (xdrbuf->tail[0].iov_len) {
		/* the rpcrdma protocol allows us to omit any trailing
		 * xdr pad bytes, saving the server an RDMA operation. */
		if (xdrbuf->tail[0].iov_len < 4 && xprt_rdma_pad_optimize)
			return n;
		n = rpcrdma_convert_kvec(&xdrbuf->tail[0], seg, n);
		if (n == RPCRDMA_MAX_SEGS)
			goto out_overflow;
	}

	return n;

out_overflow:
	pr_err("rpcrdma: segment array overflow\n");
	return -EIO;
}

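/* Encode one RDMA segment: the handle, length, and offset (HLOO) of a
 * registered memory region. Returns a pointer just past the segment.
 */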
static inline __be32 *
xdr_encode_rdma_segment(__be32 *iptr, struct rpcrdma_mw *mw)
{
	*iptr++ = cpu_to_be32(mw->mw_handle);
	*iptr++ = cpu_to_be32(mw->mw_length);
	return xdr_encode_hyper(iptr, mw->mw_offset);
}

/* XDR-encode the Read list. Supports encoding a list of read
 * segments that belong to a single read chunk.
 *
 * Encoding key for single-list chunks (HLOO = Handle32 Length32 Offset64):
 *
 *  Read chunklist (a linked list):
 *   N elements, position P (same P for all chunks of same arg!):
 *    1 - PHLOO - 1 - PHLOO - ... - 1 - PHLOO - 0
 *
 * Returns a pointer to the XDR word in the RDMA header following
 * the end of the Read list, or an error pointer.
 */
static __be32 *
rpcrdma_encode_read_list(struct rpcrdma_xprt *r_xprt,
			 struct rpcrdma_req *req, struct rpc_rqst *rqst,
			 __be32 *iptr, enum rpcrdma_chunktype rtype)
{
	struct rpcrdma_mr_seg *seg;
	struct rpcrdma_mw *mw;
	unsigned int pos;
	int n, nsegs;

	if (rtype == rpcrdma_noch) {
		*iptr++ = xdr_zero;	/* item not present */
		return iptr;
	}

	pos = rqst->rq_snd_buf.head[0].iov_len;
	if (rtype == rpcrdma_areadch)
		pos = 0;
	seg = req->rl_segments;
	nsegs = rpcrdma_convert_iovs(&rqst->rq_snd_buf, pos, rtype, seg);
	if (nsegs < 0)
		return ERR_PTR(nsegs);

	do {
		n = r_xprt->rx_ia.ri_ops->ro_map(r_xprt, seg, nsegs,
						 false, &mw);
		if (n < 0)
			return ERR_PTR(n);
		list_add(&mw->mw_list, &req->rl_registered);

		*iptr++ = xdr_one;	/* item present */

		/* All read segments in this chunk
		 * have the same "position".
		 */
		*iptr++ = cpu_to_be32(pos);
		iptr = xdr_encode_rdma_segment(iptr, mw);

		dprintk("RPC: %5u %s: pos %u %u@0x%016llx:0x%08x (%s)\n",
			rqst->rq_task->tk_pid, __func__, pos,
			mw->mw_length, (unsigned long long)mw->mw_offset,
			mw->mw_handle, n < nsegs ? "more" : "last");

		r_xprt->rx_stats.read_chunk_count++;
		seg += n;
		nsegs -= n;
	} while (nsegs);

	/* Finish Read list */
	*iptr++ = xdr_zero;	/* Next item not present */
	return iptr;
}

/* XDR-encode the Write list. Supports encoding a list containing
 * one array of plain segments that belong to a single write chunk.
 *
 * Encoding key for single-list chunks (HLOO = Handle32 Length32 Offset64):
 *
 *  Write chunklist (a list of (one) counted array):
 *   N elements:
 *    1 - N - HLOO - HLOO - ... - HLOO - 0
 *
 * Returns a pointer to the XDR word in the RDMA header following
 * the end of the Write list, or an error pointer.
 */
static __be32 *
rpcrdma_encode_write_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
			  struct rpc_rqst *rqst, __be32 *iptr,
			  enum rpcrdma_chunktype wtype)
{
	struct rpcrdma_mr_seg *seg;
	struct rpcrdma_mw *mw;
	int n, nsegs, nchunks;
	__be32 *segcount;

	if (wtype != rpcrdma_writech) {
		*iptr++ = xdr_zero;	/* no Write list present */
		return iptr;
	}

	seg = req->rl_segments;
	nsegs = rpcrdma_convert_iovs(&rqst->rq_rcv_buf,
				     rqst->rq_rcv_buf.head[0].iov_len,
				     wtype, seg);
	if (nsegs < 0)
		return ERR_PTR(nsegs);

	*iptr++ = xdr_one;	/* Write list present */
	segcount = iptr++;	/* save location of segment count */

	nchunks = 0;
	do {
		n = r_xprt->rx_ia.ri_ops->ro_map(r_xprt, seg, nsegs,
						 true, &mw);
		if (n < 0)
			return ERR_PTR(n);
		list_add(&mw->mw_list, &req->rl_registered);

		iptr = xdr_encode_rdma_segment(iptr, mw);

		dprintk("RPC: %5u %s: %u@0x%016llx:0x%08x (%s)\n",
			rqst->rq_task->tk_pid, __func__,
			mw->mw_length, (unsigned long long)mw->mw_offset,
			mw->mw_handle, n < nsegs ? "more" : "last");

		r_xprt->rx_stats.write_chunk_count++;
		r_xprt->rx_stats.total_rdma_request += seg->mr_len;
		nchunks++;
		seg   += n;
		nsegs -= n;
	} while (nsegs);

	/* Update count of segments in this Write chunk */
	*segcount = cpu_to_be32(nchunks);

	/* Finish Write list */
	*iptr++ = xdr_zero;	/* Next item not present */
	return iptr;
}

/* XDR-encode the Reply chunk. Supports encoding an array of plain
 * segments that belong to a single write (reply) chunk.
 *
 * Encoding key for single-list chunks (HLOO = Handle32 Length32 Offset64):
 *
 *  Reply chunk (a counted array):
 *   N elements:
 *    1 - N - HLOO - HLOO - ... - HLOO
 *
 * Returns a pointer to the XDR word in the RDMA header following
 * the end of the Reply chunk, or an error pointer.
 */
static __be32 *
rpcrdma_encode_reply_chunk(struct rpcrdma_xprt *r_xprt,
			   struct rpcrdma_req *req, struct rpc_rqst *rqst,
			   __be32 *iptr, enum rpcrdma_chunktype wtype)
{
	struct rpcrdma_mr_seg *seg;
	struct rpcrdma_mw *mw;
	int n, nsegs, nchunks;
	__be32 *segcount;

	if (wtype != rpcrdma_replych) {
		*iptr++ = xdr_zero;	/* no Reply chunk present */
		return iptr;
	}

	seg = req->rl_segments;
	nsegs = rpcrdma_convert_iovs(&rqst->rq_rcv_buf, 0, wtype, seg);
	if (nsegs < 0)
		return ERR_PTR(nsegs);

	*iptr++ = xdr_one;	/* Reply chunk present */
	segcount = iptr++;	/* save location of segment count */

	nchunks = 0;
	do {
		n = r_xprt->rx_ia.ri_ops->ro_map(r_xprt, seg, nsegs,
						 true, &mw);
		if (n < 0)
			return ERR_PTR(n);
		list_add(&mw->mw_list, &req->rl_registered);

		iptr = xdr_encode_rdma_segment(iptr, mw);

		dprintk("RPC: %5u %s: %u@0x%016llx:0x%08x (%s)\n",
			rqst->rq_task->tk_pid, __func__,
			mw->mw_length, (unsigned long long)mw->mw_offset,
			mw->mw_handle, n < nsegs ? "more" : "last");

		r_xprt->rx_stats.reply_chunk_count++;
		r_xprt->rx_stats.total_rdma_request += seg->mr_len;
		nchunks++;
		seg   += n;
		nsegs -= n;
	} while (nsegs);

	/* Update count of segments in the Reply chunk */
	*segcount = cpu_to_be32(nchunks);

	return iptr;
}

/*
 * Copy write data inline.
 * This function is used for "small" requests. Data which is passed
 * to RPC via iovecs (or page list) is copied directly into the
 * pre-registered memory buffer for this request. For small amounts
 * of data, this is efficient. The cutoff value is tunable.
 */
static void rpcrdma_inline_pullup(struct rpc_rqst *rqst)
{
	int i, npages, curlen;
	int copy_len;
	unsigned char *srcp, *destp;
	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(rqst->rq_xprt);
	int page_base;
	struct page **ppages;

	destp = rqst->rq_svec[0].iov_base;
	curlen = rqst->rq_svec[0].iov_len;
	destp += curlen;

	dprintk("RPC:       %s: destp 0x%p len %d hdrlen %d\n",
		__func__, destp, rqst->rq_slen, curlen);

	copy_len = rqst->rq_snd_buf.page_len;

	if (rqst->rq_snd_buf.tail[0].iov_len) {
		curlen = rqst->rq_snd_buf.tail[0].iov_len;
		if (destp + copy_len != rqst->rq_snd_buf.tail[0].iov_base) {
			memmove(destp + copy_len,
				rqst->rq_snd_buf.tail[0].iov_base, curlen);
			r_xprt->rx_stats.pullup_copy_count += curlen;
		}
		dprintk("RPC:       %s: tail destp 0x%p len %d\n",
			__func__, destp + copy_len, curlen);
		rqst->rq_svec[0].iov_len += curlen;
	}
	r_xprt->rx_stats.pullup_copy_count += copy_len;

	page_base = rqst->rq_snd_buf.page_base;
	ppages = rqst->rq_snd_buf.pages + (page_base >> PAGE_SHIFT);
	page_base &= ~PAGE_MASK;
	npages = PAGE_ALIGN(page_base+copy_len) >> PAGE_SHIFT;
	for (i = 0; copy_len && i < npages; i++) {
		curlen = PAGE_SIZE - page_base;
		if (curlen > copy_len)
			curlen = copy_len;
		dprintk("RPC:       %s: page %d destp 0x%p len %d curlen %d\n",
			__func__, i, destp, copy_len, curlen);
		srcp = kmap_atomic(ppages[i]);
		memcpy(destp, srcp+page_base, curlen);
		kunmap_atomic(srcp);
		rqst->rq_svec[0].iov_len += curlen;
		destp += curlen;
		copy_len -= curlen;
		page_base = 0;
	}
	/* header now contains entire send message */
}

/*
 * Marshal a request: the primary job of this routine is to choose
 * the transfer modes. See comments below.
 *
 * Prepares up to two IOVs per Call message:
 *
 *  [0] -- RPC RDMA header
 *  [1] -- the RPC header/data
 *
 * Returns zero on success, otherwise a negative errno.
 */

int
rpcrdma_marshal_req(struct rpc_rqst *rqst)
{
	struct rpc_xprt *xprt = rqst->rq_xprt;
	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
	struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
	enum rpcrdma_chunktype rtype, wtype;
	struct rpcrdma_msg *headerp;
	ssize_t hdrlen;
	size_t rpclen;
	__be32 *iptr;

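	/* Replies to backchannel calls are marshaled by a separate path. */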
#if defined(CONFIG_SUNRPC_BACKCHANNEL)
	if (test_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state))
		return rpcrdma_bc_marshal_reply(rqst);
#endif

	headerp = rdmab_to_msg(req->rl_rdmabuf);
	/* don't byte-swap XID, it's already done in request */
	headerp->rm_xid = rqst->rq_xid;
	headerp->rm_vers = rpcrdma_version;
	headerp->rm_credit = cpu_to_be32(r_xprt->rx_buf.rb_max_requests);
	headerp->rm_type = rdma_msg;

	/*
	 * Chunks needed for results?
	 *
	 * o If the expected result is under the inline threshold, all ops
	 *   return as inline.
	 * o Large read ops return data as write chunk(s), header as
	 *   inline.
	 * o Large non-read ops return as a single reply chunk.
	 */
	if (rpcrdma_results_inline(r_xprt, rqst))
		wtype = rpcrdma_noch;
	else if (rqst->rq_rcv_buf.flags & XDRBUF_READ)
		wtype = rpcrdma_writech;
	else
		wtype = rpcrdma_replych;

	/*
	 * Chunks needed for arguments?
	 *
	 * o If the total request is under the inline threshold, all ops
	 *   are sent as inline.
	 * o Large write ops transmit data as read chunk(s), header as
	 *   inline.
	 * o Large non-write ops are sent with the entire message as a
	 *   single read chunk (protocol 0-position special case).
	 *
	 * This assumes that the upper layer does not present a request
	 * that both has a data payload, and whose non-data arguments
	 * by themselves are larger than the inline threshold.
	 */
	if (rpcrdma_args_inline(r_xprt, rqst)) {
		rtype = rpcrdma_noch;
		rpcrdma_inline_pullup(rqst);
		rpclen = rqst->rq_svec[0].iov_len;
	} else if (rqst->rq_snd_buf.flags & XDRBUF_WRITE) {
		rtype = rpcrdma_readch;
		rpclen = rqst->rq_svec[0].iov_len;
		rpclen += rpcrdma_tail_pullup(&rqst->rq_snd_buf);
	} else {
		r_xprt->rx_stats.nomsg_call_count++;
		headerp->rm_type = htonl(RDMA_NOMSG);
		rtype = rpcrdma_areadch;
		rpclen = 0;
	}

	/* This implementation supports the following combinations
	 * of chunk lists in one RPC-over-RDMA Call message:
	 *
	 *   - Read list
	 *   - Write list
	 *   - Reply chunk
	 *   - Read list + Reply chunk
	 *
	 * It might not yet support the following combinations:
	 *
	 *   - Read list + Write list
	 *
	 * It does not support the following combinations:
	 *
	 *   - Write list + Reply chunk
	 *   - Read list + Write list + Reply chunk
	 *
	 * This implementation supports only a single chunk in each
	 * Read or Write list. Thus for example the client cannot
	 * send a Call message with a Position Zero Read chunk and a
	 * regular Read chunk at the same time.
	 */
	iptr = headerp->rm_body.rm_chunks;
	iptr = rpcrdma_encode_read_list(r_xprt, req, rqst, iptr, rtype);
	if (IS_ERR(iptr))
		goto out_unmap;
	iptr = rpcrdma_encode_write_list(r_xprt, req, rqst, iptr, wtype);
	if (IS_ERR(iptr))
		goto out_unmap;
	iptr = rpcrdma_encode_reply_chunk(r_xprt, req, rqst, iptr, wtype);
	if (IS_ERR(iptr))
		goto out_unmap;
	hdrlen = (unsigned char *)iptr - (unsigned char *)headerp;

	if (hdrlen + rpclen > RPCRDMA_INLINE_WRITE_THRESHOLD(rqst))
		goto out_overflow;

	dprintk("RPC: %5u %s: %s/%s: hdrlen %zd rpclen %zd\n",
		rqst->rq_task->tk_pid, __func__,
		transfertypes[rtype], transfertypes[wtype],
		hdrlen, rpclen);

	req->rl_send_iov[0].addr = rdmab_addr(req->rl_rdmabuf);
	req->rl_send_iov[0].length = hdrlen;
	req->rl_send_iov[0].lkey = rdmab_lkey(req->rl_rdmabuf);

	req->rl_niovs = 1;
	if (rtype == rpcrdma_areadch)
		return 0;

	req->rl_send_iov[1].addr = rdmab_addr(req->rl_sendbuf);
	req->rl_send_iov[1].length = rpclen;
	req->rl_send_iov[1].lkey = rdmab_lkey(req->rl_sendbuf);

	req->rl_niovs = 2;
	return 0;

out_overflow:
	pr_err("rpcrdma: send overflow: hdrlen %zd rpclen %zu %s/%s\n",
		hdrlen, rpclen, transfertypes[rtype], transfertypes[wtype]);
	iptr = ERR_PTR(-EIO);

out_unmap:
	r_xprt->rx_ia.ri_ops->ro_unmap_safe(r_xprt, req, false);
	return PTR_ERR(iptr);
}

/*
 * Chase down a received write or reply chunklist to get length
 * RDMA'd by server. See map at rpcrdma_create_chunks()! :-)
 */
static int
rpcrdma_count_chunks(struct rpcrdma_rep *rep, int wrchunk, __be32 **iptrp)
{
	unsigned int i, total_len;
	struct rpcrdma_write_chunk *cur_wchunk;
	char *base = (char *)rdmab_to_msg(rep->rr_rdmabuf);

	i = be32_to_cpu(**iptrp);
	cur_wchunk = (struct rpcrdma_write_chunk *) (*iptrp + 1);
	total_len = 0;
	while (i--) {
		struct rpcrdma_segment *seg = &cur_wchunk->wc_target;
		ifdebug(FACILITY) {
			u64 off;
			xdr_decode_hyper((__be32 *)&seg->rs_offset, &off);
			dprintk("RPC:       %s: chunk %d@0x%llx:0x%x\n",
				__func__,
				be32_to_cpu(seg->rs_length),
				(unsigned long long)off,
				be32_to_cpu(seg->rs_handle));
		}
		total_len += be32_to_cpu(seg->rs_length);
		++cur_wchunk;
	}
	/* check and adjust for properly terminated write chunk */
	if (wrchunk) {
		__be32 *w = (__be32 *) cur_wchunk;
		if (*w++ != xdr_zero)
			return -1;
		cur_wchunk = (struct rpcrdma_write_chunk *) w;
	}
	if ((char *)cur_wchunk > base + rep->rr_len)
		return -1;

	*iptrp = (__be32 *) cur_wchunk;
	return total_len;
}

/*
 * Scatter inline received data back into provided iov's.
 */
static void
rpcrdma_inline_fixup(struct rpc_rqst *rqst, char *srcp, int copy_len, int pad)
{
	int i, npages, curlen, olen;
	char *destp;
	struct page **ppages;
	int page_base;

	curlen = rqst->rq_rcv_buf.head[0].iov_len;
	if (curlen > copy_len) {	/* write chunk header fixup */
		curlen = copy_len;
		rqst->rq_rcv_buf.head[0].iov_len = curlen;
	}

	dprintk("RPC:       %s: srcp 0x%p len %d hdrlen %d\n",
		__func__, srcp, copy_len, curlen);

	/* Shift pointer for first receive segment only */
	rqst->rq_rcv_buf.head[0].iov_base = srcp;
	srcp += curlen;
	copy_len -= curlen;

	olen = copy_len;
	i = 0;
	rpcx_to_rdmax(rqst->rq_xprt)->rx_stats.fixup_copy_count += olen;
	page_base = rqst->rq_rcv_buf.page_base;
	ppages = rqst->rq_rcv_buf.pages + (page_base >> PAGE_SHIFT);
	page_base &= ~PAGE_MASK;

	if (copy_len && rqst->rq_rcv_buf.page_len) {
		int pagelist_len;

		pagelist_len = rqst->rq_rcv_buf.page_len;
		if (pagelist_len > copy_len)
			pagelist_len = copy_len;
		npages = PAGE_ALIGN(page_base + pagelist_len) >> PAGE_SHIFT;
		for (; i < npages; i++) {
			curlen = PAGE_SIZE - page_base;
			if (curlen > pagelist_len)
				curlen = pagelist_len;

			dprintk("RPC:       %s: page %d"
				" srcp 0x%p len %d curlen %d\n",
				__func__, i, srcp, copy_len, curlen);
			destp = kmap_atomic(ppages[i]);
			memcpy(destp + page_base, srcp, curlen);
			flush_dcache_page(ppages[i]);
			kunmap_atomic(destp);
			srcp += curlen;
			copy_len -= curlen;
			pagelist_len -= curlen;
			if (!pagelist_len)
				break;
			page_base = 0;
		}
	}

	if (copy_len && rqst->rq_rcv_buf.tail[0].iov_len) {
		curlen = copy_len;
		if (curlen > rqst->rq_rcv_buf.tail[0].iov_len)
			curlen = rqst->rq_rcv_buf.tail[0].iov_len;
		if (rqst->rq_rcv_buf.tail[0].iov_base != srcp)
			memmove(rqst->rq_rcv_buf.tail[0].iov_base, srcp, curlen);
		dprintk("RPC:       %s: tail srcp 0x%p len %d curlen %d\n",
			__func__, srcp, copy_len, curlen);
		rqst->rq_rcv_buf.tail[0].iov_len = curlen;
		copy_len -= curlen; ++i;
	} else
		rqst->rq_rcv_buf.tail[0].iov_len = 0;

	if (pad) {
		/* implicit padding on terminal chunk */
		unsigned char *p = rqst->rq_rcv_buf.tail[0].iov_base;
		while (pad--)
			p[rqst->rq_rcv_buf.tail[0].iov_len++] = 0;
	}

	if (copy_len)
		dprintk("RPC:       %s: %d bytes in"
			" %d extra segments (%d lost)\n",
			__func__, olen, i, copy_len);

	/* TBD avoid a warning from call_decode() */
	rqst->rq_private_buf = rqst->rq_rcv_buf;
}

void
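/* Propagate the RDMA endpoint's connection state to the generic RPC
 * transport, and wake any tasks waiting on the connection.
 */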
rpcrdma_connect_worker(struct work_struct *work)
{
	struct rpcrdma_ep *ep =
		container_of(work, struct rpcrdma_ep, rep_connect_worker.work);
	struct rpcrdma_xprt *r_xprt =
		container_of(ep, struct rpcrdma_xprt, rx_ep);
	struct rpc_xprt *xprt = &r_xprt->rx_xprt;

	spin_lock_bh(&xprt->transport_lock);
	if (++xprt->connect_cookie == 0)	/* maintain a reserved value */
		++xprt->connect_cookie;
	if (ep->rep_connected > 0) {
		if (!xprt_test_and_set_connected(xprt))
			xprt_wake_pending_tasks(xprt, 0);
	} else {
		if (xprt_test_and_clear_connected(xprt))
			xprt_wake_pending_tasks(xprt, -ENOTCONN);
	}
	spin_unlock_bh(&xprt->transport_lock);
}

#if defined(CONFIG_SUNRPC_BACKCHANNEL)
/* By convention, backchannel calls arrive via rdma_msg type
 * messages, and never populate the chunk lists. This makes
 * the RPC/RDMA header small and fixed in size, so it is
 * straightforward to check the RPC header's direction field.
 */
static bool
rpcrdma_is_bcall(struct rpcrdma_msg *headerp)
{
	__be32 *p = (__be32 *)headerp;

	if (headerp->rm_type != rdma_msg)
		return false;
	if (headerp->rm_body.rm_chunks[0] != xdr_zero)
		return false;
	if (headerp->rm_body.rm_chunks[1] != xdr_zero)
		return false;
	if (headerp->rm_body.rm_chunks[2] != xdr_zero)
		return false;

	/* sanity */
	if (p[7] != headerp->rm_xid)
		return false;
	/* call direction */
	if (p[8] != cpu_to_be32(RPC_CALL))
		return false;

	return true;
}
#endif	/* CONFIG_SUNRPC_BACKCHANNEL */

/*
 * This function is called when an async event is posted to
 * the connection which changes the connection state. All it
 * does at this point is mark the connection up/down, the rpc
 * timers do the rest.
 */
void
rpcrdma_conn_func(struct rpcrdma_ep *ep)
{
	schedule_delayed_work(&ep->rep_connect_worker, 0);
}

/* Process received RPC/RDMA messages.
 *
 * Errors must result in the RPC task either being awakened, or
 * allowed to timeout, to discover the errors at that time.
 */
void
rpcrdma_reply_handler(struct rpcrdma_rep *rep)
{
	struct rpcrdma_msg *headerp;
	struct rpcrdma_req *req;
	struct rpc_rqst *rqst;
	struct rpcrdma_xprt *r_xprt = rep->rr_rxprt;
	struct rpc_xprt *xprt = &r_xprt->rx_xprt;
	__be32 *iptr;
	int rdmalen, status, rmerr;
	unsigned long cwnd;

	dprintk("RPC:       %s: incoming rep %p\n", __func__, rep);

	if (rep->rr_len == RPCRDMA_BAD_LEN)
		goto out_badstatus;
	if (rep->rr_len < RPCRDMA_HDRLEN_ERR)
		goto out_shortreply;

	headerp = rdmab_to_msg(rep->rr_rdmabuf);
#if defined(CONFIG_SUNRPC_BACKCHANNEL)
	if (rpcrdma_is_bcall(headerp))
		goto out_bcall;
#endif

	/* Match incoming rpcrdma_rep to an rpcrdma_req to
	 * get context for handling any incoming chunks.
	 */
	spin_lock_bh(&xprt->transport_lock);
	rqst = xprt_lookup_rqst(xprt, headerp->rm_xid);
	if (!rqst)
		goto out_nomatch;

	req = rpcr_to_rdmar(rqst);
	if (req->rl_reply)
		goto out_duplicate;

	/* Sanity checking has passed. We are now committed
	 * to complete this transaction.
	 */
	list_del_init(&rqst->rq_list);
	spin_unlock_bh(&xprt->transport_lock);
	dprintk("RPC:       %s: reply %p completes request %p (xid 0x%08x)\n",
		__func__, rep, req, be32_to_cpu(headerp->rm_xid));

	/* from here on, the reply is no longer an orphan */
	req->rl_reply = rep;
	xprt->reestablish_timeout = 0;

	if (headerp->rm_vers != rpcrdma_version)
		goto out_badversion;

	/* check for expected message types */
	/* The order of some of these tests is important. */
	switch (headerp->rm_type) {
	case rdma_msg:
		/* never expect read chunks */
		/* never expect reply chunks (two ways to check) */
		/* never expect write chunks without having offered RDMA */
		if (headerp->rm_body.rm_chunks[0] != xdr_zero ||
		    (headerp->rm_body.rm_chunks[1] == xdr_zero &&
		     headerp->rm_body.rm_chunks[2] != xdr_zero) ||
		    (headerp->rm_body.rm_chunks[1] != xdr_zero &&
		     list_empty(&req->rl_registered)))
			goto badheader;
		if (headerp->rm_body.rm_chunks[1] != xdr_zero) {
			/* count any expected write chunks in read reply */
			/* start at write chunk array count */
			iptr = &headerp->rm_body.rm_chunks[2];
			rdmalen = rpcrdma_count_chunks(rep, 1, &iptr);
			/* check for validity, and no reply chunk after */
			if (rdmalen < 0 || *iptr++ != xdr_zero)
				goto badheader;
			rep->rr_len -=
			    ((unsigned char *)iptr - (unsigned char *)headerp);
			status = rep->rr_len + rdmalen;
			r_xprt->rx_stats.total_rdma_reply += rdmalen;
			/* special case - last chunk may omit padding */
			if (rdmalen &= 3) {
				rdmalen = 4 - rdmalen;
				status += rdmalen;
			}
		} else {
			/* else ordinary inline */
			rdmalen = 0;
			iptr = (__be32 *)((unsigned char *)headerp +
							RPCRDMA_HDRLEN_MIN);
			rep->rr_len -= RPCRDMA_HDRLEN_MIN;
			status = rep->rr_len;
		}
		/* Fix up the rpc results for upper layer */
		rpcrdma_inline_fixup(rqst, (char *)iptr, rep->rr_len, rdmalen);
		break;

	case rdma_nomsg:
		/* never expect read or write chunks, always reply chunks */
		if (headerp->rm_body.rm_chunks[0] != xdr_zero ||
		    headerp->rm_body.rm_chunks[1] != xdr_zero ||
		    headerp->rm_body.rm_chunks[2] != xdr_one ||
		    list_empty(&req->rl_registered))
			goto badheader;
		iptr = (__be32 *)((unsigned char *)headerp +
							RPCRDMA_HDRLEN_MIN);
		rdmalen = rpcrdma_count_chunks(rep, 0, &iptr);
		if (rdmalen < 0)
			goto badheader;
		r_xprt->rx_stats.total_rdma_reply += rdmalen;
		/* Reply chunk buffer already is the reply vector - no fixup. */
		status = rdmalen;
		break;

	case rdma_error:
		goto out_rdmaerr;

badheader:
	default:
		dprintk("RPC: %5u %s: invalid rpcrdma reply (type %u)\n",
			rqst->rq_task->tk_pid, __func__,
			be32_to_cpu(headerp->rm_type));
		status = -EIO;
		r_xprt->rx_stats.bad_reply_count++;
		break;
	}

out:
	/* Invalidate and flush the data payloads before waking the
	 * waiting application. This guarantees the memory region is
	 * properly fenced from the server before the application
	 * accesses the data. It also ensures proper send flow
	 * control: waking the next RPC waits until this RPC has
	 * relinquished all its Send Queue entries.
	 */
	if (!list_empty(&req->rl_registered))
		r_xprt->rx_ia.ri_ops->ro_unmap_sync(r_xprt, req);

	spin_lock_bh(&xprt->transport_lock);
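	/* Refresh the RPC congestion window from the credits the server
	 * granted, and release requests waiting on congestion if it grew.
	 */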
	cwnd = xprt->cwnd;
	xprt->cwnd = atomic_read(&r_xprt->rx_buf.rb_credits) << RPC_CWNDSHIFT;
	if (xprt->cwnd > cwnd)
		xprt_release_rqst_cong(rqst->rq_task);

	xprt_complete_rqst(rqst->rq_task, status);
	spin_unlock_bh(&xprt->transport_lock);
	dprintk("RPC:       %s: xprt_complete_rqst(0x%p, 0x%p, %d)\n",
			__func__, xprt, rqst, status);
	return;

out_badstatus:
	rpcrdma_recv_buffer_put(rep);
	if (r_xprt->rx_ep.rep_connected == 1) {
		r_xprt->rx_ep.rep_connected = -EIO;
		rpcrdma_conn_func(&r_xprt->rx_ep);
	}
	return;

#if defined(CONFIG_SUNRPC_BACKCHANNEL)
out_bcall:
	rpcrdma_bc_receive_call(r_xprt, rep);
	return;
#endif

/* If the incoming reply terminated a pending RPC, the next
 * RPC call will post a replacement receive buffer as it is
 * being marshaled.
 */
out_badversion:
	dprintk("RPC:       %s: invalid version %d\n",
		__func__, be32_to_cpu(headerp->rm_vers));
	status = -EIO;
	r_xprt->rx_stats.bad_reply_count++;
	goto out;

out_rdmaerr:
	rmerr = be32_to_cpu(headerp->rm_body.rm_error.rm_err);
	switch (rmerr) {
	case ERR_VERS:
		pr_err("%s: server reports header version error (%u-%u)\n",
		       __func__,
		       be32_to_cpu(headerp->rm_body.rm_error.rm_vers_low),
		       be32_to_cpu(headerp->rm_body.rm_error.rm_vers_high));
		break;
	case ERR_CHUNK:
		pr_err("%s: server reports header decoding error\n",
		       __func__);
		break;
	default:
		pr_err("%s: server reports unknown error %d\n",
		       __func__, rmerr);
	}
	status = -EREMOTEIO;
	r_xprt->rx_stats.bad_reply_count++;
	goto out;

/* If no pending RPC transaction was matched, post a replacement
 * receive buffer before returning.
 */
out_shortreply:
	dprintk("RPC:       %s: short/invalid reply\n", __func__);
	goto repost;

out_nomatch:
	spin_unlock_bh(&xprt->transport_lock);
	dprintk("RPC:       %s: no match for incoming xid 0x%08x len %d\n",
		__func__, be32_to_cpu(headerp->rm_xid),
		rep->rr_len);
	goto repost;

out_duplicate:
	spin_unlock_bh(&xprt->transport_lock);
	dprintk("RPC:       %s: "
		"duplicate reply %p to RPC request %p: xid 0x%08x\n",
		__func__, rep, req, be32_to_cpu(headerp->rm_xid));

repost:
	r_xprt->rx_stats.bad_reply_count++;
	if (rpcrdma_ep_post_recv(&r_xprt->rx_ia, &r_xprt->rx_ep, rep))
		rpcrdma_recv_buffer_put(rep);
}