// SPDX-License-Identifier: GPL-2.0
/*
 * Copyright (c) 2015, 2017 Oracle.  All rights reserved.
 * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
 */

/* Lightweight memory registration using Fast Registration Work
 * Requests (FRWR).
 *
 * FRWR features ordered asynchronous registration and deregistration
 * of arbitrarily sized memory regions. This is the fastest and safest
 * but most complex memory registration mode.
 */

/* Normal operation
 *
 * A Memory Region is prepared for RDMA READ or WRITE using a FAST_REG
 * Work Request (frwr_map). When the RDMA operation is finished, this
 * Memory Region is invalidated using a LOCAL_INV Work Request
 * (frwr_unmap_sync).
 *
 * Typically these Work Requests are not signaled, and neither are RDMA
 * SEND Work Requests (with the exception of signaling occasionally to
 * prevent provider work queue overflows). This greatly reduces HCA
 * interrupt workload.
 *
 * As an optimization, frwr_unmap marks MRs INVALID before the
 * LOCAL_INV WR is posted. If posting succeeds, the MR is placed on
 * rb_mrs immediately so that no work (like managing a linked list
 * under a spinlock) is needed in the completion upcall.
 *
 * But this means that frwr_map() can occasionally encounter an MR
 * that is INVALID but the LOCAL_INV WR has not completed. Work Queue
 * ordering prevents a subsequent FAST_REG WR from executing against
 * that MR while it is still being invalidated.
 */

/* Transport recovery
 *
 * ->op_map and the transport connect worker cannot run at the same
 * time, but ->op_unmap can fire while the transport connect worker
 * is running. Thus MR recovery is handled in ->op_map, to guarantee
 * that recovered MRs are owned by a sending RPC, and not one where
 * ->op_unmap could fire at the same time transport reconnect is
 * being done.
 *
 * When the underlying transport disconnects, MRs are left in one of
 * four states:
 *
 * INVALID:	The MR was not in use before the QP entered ERROR state.
 *
 * VALID:	The MR was registered before the QP entered ERROR state.
 *
 * FLUSHED_FR:	The MR was being registered when the QP entered ERROR
 *		state, and the pending WR was flushed.
 *
 * FLUSHED_LI:	The MR was being invalidated when the QP entered ERROR
 *		state, and the pending WR was flushed.
 *
 * When frwr_map encounters FLUSHED and VALID MRs, they are recovered
 * with ib_dereg_mr and then are re-initialized. Because MR recovery
 * allocates fresh resources, it is deferred to a workqueue, and the
 * recovered MRs are placed back on the rb_mrs list when recovery is
 * complete. frwr_map allocates another MR for the current RPC while
 * the broken MR is reset.
 *
 * To ensure that frwr_map doesn't encounter an MR that is marked
 * INVALID but that is about to be flushed due to a previous transport
 * disconnect, the transport connect worker attempts to drain all
 * pending send queue WRs before the transport is reconnected.
 */

#include <linux/sunrpc/rpc_rdma.h>
#include <linux/sunrpc/svc_rdma.h>

#include "xprt_rdma.h"
#include <trace/events/rpcrdma.h>

#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
# define RPCDBG_FACILITY	RPCDBG_TRANS
#endif

/**
 * frwr_is_supported - Check if device supports FRWR
 * @device: interface adapter to check
 *
 * Returns true if device supports FRWR, otherwise false
 */
bool frwr_is_supported(struct ib_device *device)
{
	struct ib_device_attr *attrs = &device->attrs;

	if (!(attrs->device_cap_flags & IB_DEVICE_MEM_MGT_EXTENSIONS))
		goto out_not_supported;
	if (attrs->max_fast_reg_page_list_len == 0)
		goto out_not_supported;
	return true;

out_not_supported:
	pr_info("rpcrdma: 'frwr' mode is not supported by device %s\n",
		device->name);
	return false;
}

/**
 * frwr_release_mr - Destroy one MR
 * @mr: MR allocated by frwr_init_mr
 *
 */
void frwr_release_mr(struct rpcrdma_mr *mr)
{
	int rc;

	rc = ib_dereg_mr(mr->frwr.fr_mr);
	if (rc)
		trace_xprtrdma_frwr_dereg(mr, rc);
	kfree(mr->mr_sg);
	kfree(mr);
}

/* MRs are dynamically allocated, so simply clean up and release the MR.
 * A replacement MR will subsequently be allocated on demand.
 */
static void
frwr_mr_recycle_worker(struct work_struct *work)
{
	struct rpcrdma_mr *mr = container_of(work, struct rpcrdma_mr, mr_recycle);
	struct rpcrdma_xprt *r_xprt = mr->mr_xprt;

	trace_xprtrdma_mr_recycle(mr);

	if (mr->mr_dir != DMA_NONE) {
		trace_xprtrdma_mr_unmap(mr);
		ib_dma_unmap_sg(r_xprt->rx_ia.ri_id->device,
				mr->mr_sg, mr->mr_nents, mr->mr_dir);
		mr->mr_dir = DMA_NONE;
	}

	spin_lock(&r_xprt->rx_buf.rb_mrlock);
	list_del(&mr->mr_all);
	r_xprt->rx_stats.mrs_recycled++;
	spin_unlock(&r_xprt->rx_buf.rb_mrlock);

	frwr_release_mr(mr);
}

/* frwr_reset - Place MRs back on the free list
 * @req: request to reset
 *
 * Used after a failed marshal. For FRWR, this means the MRs
 * don't have to be fully released and recreated.
 *
 * NB: This is safe only as long as none of @req's MRs are
 * involved with an ongoing asynchronous FAST_REG or LOCAL_INV
 * Work Request.
 */
void frwr_reset(struct rpcrdma_req *req)
{
	while (!list_empty(&req->rl_registered)) {
		struct rpcrdma_mr *mr;

		mr = rpcrdma_mr_pop(&req->rl_registered);
		rpcrdma_mr_unmap_and_put(mr);
	}
}

/**
 * frwr_init_mr - Initialize one MR
 * @ia: interface adapter
 * @mr: generic MR to prepare for FRWR
 *
 * Returns zero if successful. Otherwise a negative errno
 * is returned.
 */
int frwr_init_mr(struct rpcrdma_ia *ia, struct rpcrdma_mr *mr)
{
	unsigned int depth = ia->ri_max_frwr_depth;
	struct scatterlist *sg;
	struct ib_mr *frmr;
	int rc;

	frmr = ib_alloc_mr(ia->ri_pd, ia->ri_mrtype, depth);
	if (IS_ERR(frmr))
		goto out_mr_err;

	sg = kcalloc(depth, sizeof(*sg), GFP_KERNEL);
	if (!sg)
		goto out_list_err;

	mr->frwr.fr_mr = frmr;
	mr->mr_dir = DMA_NONE;
	INIT_LIST_HEAD(&mr->mr_list);
	INIT_WORK(&mr->mr_recycle, frwr_mr_recycle_worker);
	init_completion(&mr->frwr.fr_linv_done);

	sg_init_table(sg, depth);
	mr->mr_sg = sg;
	return 0;

out_mr_err:
	rc = PTR_ERR(frmr);
	trace_xprtrdma_frwr_alloc(mr, rc);
	return rc;

out_list_err:
	dprintk("RPC:       %s: sg allocation failure\n",
		__func__);
	ib_dereg_mr(frmr);
	return -ENOMEM;
}

/**
 * frwr_open - Prepare an endpoint for use with FRWR
 * @ia: interface adapter this endpoint will use
 * @ep: endpoint to prepare
 *
 * On success, sets:
 *	ep->rep_attr.cap.max_send_wr
 *	ep->rep_attr.cap.max_recv_wr
 *	ep->rep_max_requests
 *	ia->ri_max_segs
 *
 * And these FRWR-related fields:
 *	ia->ri_max_frwr_depth
 *	ia->ri_mrtype
 *
 * On failure, a negative errno is returned.
 */
int frwr_open(struct rpcrdma_ia *ia, struct rpcrdma_ep *ep)
{
	struct ib_device_attr *attrs = &ia->ri_id->device->attrs;
	int max_qp_wr, depth, delta;

	ia->ri_mrtype = IB_MR_TYPE_MEM_REG;
	if (attrs->device_cap_flags & IB_DEVICE_SG_GAPS_REG)
		ia->ri_mrtype = IB_MR_TYPE_SG_GAPS;

	/* Quirk: Some devices advertise a large max_fast_reg_page_list_len
	 * capability, but perform optimally when the MRs are not larger
	 * than a page.
	 */
	if (attrs->max_sge_rd > 1)
		ia->ri_max_frwr_depth = attrs->max_sge_rd;
	else
		ia->ri_max_frwr_depth = attrs->max_fast_reg_page_list_len;
	if (ia->ri_max_frwr_depth > RPCRDMA_MAX_DATA_SEGS)
		ia->ri_max_frwr_depth = RPCRDMA_MAX_DATA_SEGS;
	dprintk("RPC:       %s: max FR page list depth = %u\n",
		__func__, ia->ri_max_frwr_depth);

	/* Add room for frwr register and invalidate WRs.
	 * 1. FRWR reg WR for head
	 * 2. FRWR invalidate WR for head
	 * 3. N FRWR reg WRs for pagelist
	 * 4. N FRWR invalidate WRs for pagelist
	 * 5. FRWR reg WR for tail
	 * 6. FRWR invalidate WR for tail
	 * 7. The RDMA_SEND WR
	 */
	depth = 7;

	/* Calculate N if the device max FRWR depth is smaller than
	 * RPCRDMA_MAX_DATA_SEGS.
	 */
	if (ia->ri_max_frwr_depth < RPCRDMA_MAX_DATA_SEGS) {
		delta = RPCRDMA_MAX_DATA_SEGS - ia->ri_max_frwr_depth;
		do {
			depth += 2; /* FRWR reg + invalidate */
			delta -= ia->ri_max_frwr_depth;
		} while (delta > 0);
	}
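	/* Illustrative example (hypothetical values, not taken from this
	 * file): if RPCRDMA_MAX_DATA_SEGS were 64 and ri_max_frwr_depth
	 * were 16, delta would start at 48 and the loop above would run
	 * three times, leaving depth at 7 + 3 * 2 = 13 WRs per RPC.
	 */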

	max_qp_wr = ia->ri_id->device->attrs.max_qp_wr;
	max_qp_wr -= RPCRDMA_BACKWARD_WRS;
	max_qp_wr -= 1;
	if (max_qp_wr < RPCRDMA_MIN_SLOT_TABLE)
		return -ENOMEM;
	if (ep->rep_max_requests > max_qp_wr)
		ep->rep_max_requests = max_qp_wr;
	ep->rep_attr.cap.max_send_wr = ep->rep_max_requests * depth;
	if (ep->rep_attr.cap.max_send_wr > max_qp_wr) {
		ep->rep_max_requests = max_qp_wr / depth;
		if (!ep->rep_max_requests)
			return -EINVAL;
		ep->rep_attr.cap.max_send_wr = ep->rep_max_requests * depth;
	}
	ep->rep_attr.cap.max_send_wr += RPCRDMA_BACKWARD_WRS;
	ep->rep_attr.cap.max_send_wr += 1; /* for ib_drain_sq */
	ep->rep_attr.cap.max_recv_wr = ep->rep_max_requests;
	ep->rep_attr.cap.max_recv_wr += RPCRDMA_BACKWARD_WRS;
	ep->rep_attr.cap.max_recv_wr += 1; /* for ib_drain_rq */

	ia->ri_max_segs = max_t(unsigned int, 1, RPCRDMA_MAX_DATA_SEGS /
				ia->ri_max_frwr_depth);
	/* Reply chunks require segments for head and tail buffers */
	ia->ri_max_segs += 2;
	if (ia->ri_max_segs > RPCRDMA_MAX_HDR_SEGS)
		ia->ri_max_segs = RPCRDMA_MAX_HDR_SEGS;
	return 0;
}

/**
 * frwr_maxpages - Compute size of largest payload
 * @r_xprt: transport
 *
 * Returns maximum size of an RPC message, in pages.
 *
 * FRWR mode conveys a list of pages per chunk segment. The
 * maximum length of that list is the FRWR page list depth.
 */
size_t frwr_maxpages(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_ia *ia = &r_xprt->rx_ia;

	return min_t(unsigned int, RPCRDMA_MAX_DATA_SEGS,
		     (ia->ri_max_segs - 2) * ia->ri_max_frwr_depth);
}

/**
 * frwr_map - Register a memory region
 * @r_xprt: controlling transport
 * @seg: memory region co-ordinates
 * @nsegs: number of segments remaining
 * @writing: true when RDMA Write will be used
 * @xid: XID of RPC using the registered memory
 * @out: initialized MR
 *
 * Prepare a REG_MR Work Request to register a memory region
 * for remote access via RDMA READ or RDMA WRITE.
 *
 * Returns the next segment or a negative errno pointer.
 * On success, the prepared MR is planted in @out.
 */
struct rpcrdma_mr_seg *frwr_map(struct rpcrdma_xprt *r_xprt,
				struct rpcrdma_mr_seg *seg,
				int nsegs, bool writing, __be32 xid,
				struct rpcrdma_mr **out)
{
	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
	bool holes_ok = ia->ri_mrtype == IB_MR_TYPE_SG_GAPS;
	struct rpcrdma_mr *mr;
	struct ib_mr *ibmr;
	struct ib_reg_wr *reg_wr;
	int i, n;
	u8 key;

	mr = rpcrdma_mr_get(r_xprt);
	if (!mr)
		goto out_getmr_err;

	if (nsegs > ia->ri_max_frwr_depth)
		nsegs = ia->ri_max_frwr_depth;
	for (i = 0; i < nsegs;) {
		if (seg->mr_page)
			sg_set_page(&mr->mr_sg[i],
				    seg->mr_page,
				    seg->mr_len,
				    offset_in_page(seg->mr_offset));
		else
			sg_set_buf(&mr->mr_sg[i], seg->mr_offset,
				   seg->mr_len);

		++seg;
		++i;
		if (holes_ok)
			continue;
		if ((i < nsegs && offset_in_page(seg->mr_offset)) ||
		    offset_in_page((seg-1)->mr_offset + (seg-1)->mr_len))
			break;
	}
	mr->mr_dir = rpcrdma_data_dir(writing);

	mr->mr_nents =
		ib_dma_map_sg(ia->ri_id->device, mr->mr_sg, i, mr->mr_dir);
	if (!mr->mr_nents)
		goto out_dmamap_err;

	ibmr = mr->frwr.fr_mr;
	n = ib_map_mr_sg(ibmr, mr->mr_sg, mr->mr_nents, NULL, PAGE_SIZE);
	if (unlikely(n != mr->mr_nents))
		goto out_mapmr_err;

	ibmr->iova &= 0x00000000ffffffff;
	ibmr->iova |= ((u64)be32_to_cpu(xid)) << 32;
	key = (u8)(ibmr->rkey & 0x000000FF);
	ib_update_fast_reg_key(ibmr, ++key);
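	/* The statements above appear to serve two purposes: the RPC's XID
	 * is planted in the upper 32 bits of the MR's IOVA so the
	 * on-the-wire RDMA offset can be correlated with its RPC, and the
	 * low-order "key" byte of the rkey is bumped so each registration
	 * advertises a fresh rkey.
	 */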

	reg_wr = &mr->frwr.fr_regwr;
	reg_wr->mr = ibmr;
	reg_wr->key = ibmr->rkey;
	reg_wr->access = writing ?
			 IB_ACCESS_REMOTE_WRITE | IB_ACCESS_LOCAL_WRITE :
			 IB_ACCESS_REMOTE_READ;

	mr->mr_handle = ibmr->rkey;
	mr->mr_length = ibmr->length;
	mr->mr_offset = ibmr->iova;
	trace_xprtrdma_mr_map(mr);

	*out = mr;
	return seg;

out_getmr_err:
	xprt_wait_for_buffer_space(&r_xprt->rx_xprt);
	return ERR_PTR(-EAGAIN);

out_dmamap_err:
	mr->mr_dir = DMA_NONE;
	trace_xprtrdma_frwr_sgerr(mr, i);
	rpcrdma_mr_put(mr);
	return ERR_PTR(-EIO);

out_mapmr_err:
	trace_xprtrdma_frwr_maperr(mr, n);
	rpcrdma_mr_recycle(mr);
	return ERR_PTR(-EIO);
}

/**
 * frwr_wc_fastreg - Invoked by RDMA provider for a flushed FastReg WC
 * @cq:	completion queue (ignored)
 * @wc:	completed WR
 *
 */
static void frwr_wc_fastreg(struct ib_cq *cq, struct ib_wc *wc)
{
	struct ib_cqe *cqe = wc->wr_cqe;
	struct rpcrdma_frwr *frwr =
		container_of(cqe, struct rpcrdma_frwr, fr_cqe);

	/* WARNING: Only wr_cqe and status are reliable at this point */
	trace_xprtrdma_wc_fastreg(wc, frwr);
	/* The MR will get recycled when the associated req is retransmitted */
}

/**
 * frwr_send - post Send WR containing the RPC Call message
 * @ia: interface adapter
 * @req: Prepared RPC Call
 *
 * For FRWR, chain any FastReg WRs to the Send WR. Only a
 * single ib_post_send call is needed to register memory
 * and then post the Send WR.
 *
 * Returns the result of ib_post_send.
 */
int frwr_send(struct rpcrdma_ia *ia, struct rpcrdma_req *req)
{
	struct ib_send_wr *post_wr;
	struct rpcrdma_mr *mr;

	post_wr = &req->rl_sendctx->sc_wr;
	list_for_each_entry(mr, &req->rl_registered, mr_list) {
		struct rpcrdma_frwr *frwr;

		frwr = &mr->frwr;

		frwr->fr_cqe.done = frwr_wc_fastreg;
		frwr->fr_regwr.wr.next = post_wr;
		frwr->fr_regwr.wr.wr_cqe = &frwr->fr_cqe;
		frwr->fr_regwr.wr.num_sge = 0;
		frwr->fr_regwr.wr.opcode = IB_WR_REG_MR;
		frwr->fr_regwr.wr.send_flags = 0;

		post_wr = &frwr->fr_regwr.wr;
	}
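	/* post_wr now heads a WR chain in which every REG_MR WR precedes
	 * the Send WR, so the single ib_post_send below registers all of
	 * @req's MRs and then posts the Send.
	 */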

	/* If ib_post_send fails, the next ->send_request for
	 * @req will queue these MRs for recovery.
	 */
	return ib_post_send(ia->ri_id->qp, post_wr, NULL);
}

/**
 * frwr_reminv - handle a remotely invalidated mr on the @mrs list
 * @rep: Received reply
 * @mrs: list of MRs to check
 *
 */
void frwr_reminv(struct rpcrdma_rep *rep, struct list_head *mrs)
{
	struct rpcrdma_mr *mr;

	list_for_each_entry(mr, mrs, mr_list)
		if (mr->mr_handle == rep->rr_inv_rkey) {
			list_del_init(&mr->mr_list);
			trace_xprtrdma_mr_remoteinv(mr);
			rpcrdma_mr_unmap_and_put(mr);
			break;	/* only one invalidated MR per RPC */
		}
}

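/* Helper for the LOCAL_INV completion handlers below: an MR whose
 * invalidation failed or was flushed is in an unknown state, so it is
 * recycled (DMA unmapped and released; a replacement MR is allocated on
 * demand) rather than returned to the free list.
 */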
static void __frwr_release_mr(struct ib_wc *wc, struct rpcrdma_mr *mr)
{
	if (wc->status != IB_WC_SUCCESS)
		rpcrdma_mr_recycle(mr);
	else
		rpcrdma_mr_unmap_and_put(mr);
}

/**
 * frwr_wc_localinv - Invoked by RDMA provider for a LOCAL_INV WC
 * @cq:	completion queue (ignored)
 * @wc:	completed WR
 *
 */
static void frwr_wc_localinv(struct ib_cq *cq, struct ib_wc *wc)
{
	struct ib_cqe *cqe = wc->wr_cqe;
	struct rpcrdma_frwr *frwr =
		container_of(cqe, struct rpcrdma_frwr, fr_cqe);
	struct rpcrdma_mr *mr = container_of(frwr, struct rpcrdma_mr, frwr);

	/* WARNING: Only wr_cqe and status are reliable at this point */
	trace_xprtrdma_wc_li(wc, frwr);
	__frwr_release_mr(wc, mr);
}

/**
 * frwr_wc_localinv_wake - Invoked by RDMA provider for a LOCAL_INV WC
 * @cq:	completion queue (ignored)
 * @wc:	completed WR
 *
 * Awaken anyone waiting for an MR to finish being fenced.
 */
static void frwr_wc_localinv_wake(struct ib_cq *cq, struct ib_wc *wc)
{
	struct ib_cqe *cqe = wc->wr_cqe;
	struct rpcrdma_frwr *frwr =
		container_of(cqe, struct rpcrdma_frwr, fr_cqe);
	struct rpcrdma_mr *mr = container_of(frwr, struct rpcrdma_mr, frwr);

	/* WARNING: Only wr_cqe and status are reliable at this point */
	trace_xprtrdma_wc_li_wake(wc, frwr);
	complete(&frwr->fr_linv_done);
	__frwr_release_mr(wc, mr);
}

/**
 * frwr_unmap_sync - invalidate memory regions that were registered for @req
 * @r_xprt: controlling transport instance
 * @req: rpcrdma_req with a non-empty list of MRs to process
 *
 * Sleeps until it is safe for the host CPU to access the previously mapped
 * memory regions. This guarantees that registered MRs are properly fenced
 * from the server before the RPC consumer accesses the data in them. It
 * also ensures proper Send flow control: waking the next RPC waits until
 * this RPC has relinquished all its Send Queue entries.
 */
void frwr_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
{
	struct ib_send_wr *first, **prev, *last;
	const struct ib_send_wr *bad_wr;
	struct rpcrdma_frwr *frwr;
	struct rpcrdma_mr *mr;
	int rc;

	/* ORDER: Invalidate all of the MRs first
	 *
	 * Chain the LOCAL_INV Work Requests and post them with
	 * a single ib_post_send() call.
	 */
	frwr = NULL;
	prev = &first;
	while (!list_empty(&req->rl_registered)) {
		mr = rpcrdma_mr_pop(&req->rl_registered);

		trace_xprtrdma_mr_localinv(mr);
		r_xprt->rx_stats.local_inv_needed++;

		frwr = &mr->frwr;
		frwr->fr_cqe.done = frwr_wc_localinv;
		last = &frwr->fr_invwr;
		last->next = NULL;
		last->wr_cqe = &frwr->fr_cqe;
		last->sg_list = NULL;
		last->num_sge = 0;
		last->opcode = IB_WR_LOCAL_INV;
		last->send_flags = IB_SEND_SIGNALED;
		last->ex.invalidate_rkey = mr->mr_handle;

		*prev = last;
		prev = &last->next;
	}

	/* Strong send queue ordering guarantees that when the
	 * last WR in the chain completes, all WRs in the chain
	 * are complete.
	 */
	frwr->fr_cqe.done = frwr_wc_localinv_wake;
	reinit_completion(&frwr->fr_linv_done);

	/* Transport disconnect drains the receive CQ before it
	 * replaces the QP. The RPC reply handler won't call us
	 * unless ri_id->qp is a valid pointer.
	 */
	bad_wr = NULL;
	rc = ib_post_send(r_xprt->rx_ia.ri_id->qp, first, &bad_wr);
	trace_xprtrdma_post_send(req, rc);

	/* The final LOCAL_INV WR in the chain is supposed to
	 * do the wake. If it was never posted, the wake will
	 * not happen, so don't wait in that case.
	 */
	if (bad_wr != first)
		wait_for_completion(&frwr->fr_linv_done);
	if (!rc)
		return;

	/* Recycle MRs in the LOCAL_INV chain that did not get posted.
	 */
	while (bad_wr) {
		frwr = container_of(bad_wr, struct rpcrdma_frwr,
				    fr_invwr);
		mr = container_of(frwr, struct rpcrdma_mr, frwr);
		bad_wr = bad_wr->next;

		list_del_init(&mr->mr_list);
		rpcrdma_mr_recycle(mr);
	}
}

/**
 * frwr_wc_localinv_done - Invoked by RDMA provider for a signaled LOCAL_INV WC
 * @cq:	completion queue (ignored)
 * @wc:	completed WR
 *
 */
static void frwr_wc_localinv_done(struct ib_cq *cq, struct ib_wc *wc)
{
	struct ib_cqe *cqe = wc->wr_cqe;
	struct rpcrdma_frwr *frwr =
		container_of(cqe, struct rpcrdma_frwr, fr_cqe);
	struct rpcrdma_mr *mr = container_of(frwr, struct rpcrdma_mr, frwr);

	/* WARNING: Only wr_cqe and status are reliable at this point */
	trace_xprtrdma_wc_li_done(wc, frwr);
	rpcrdma_complete_rqst(frwr->fr_req->rl_reply);
	__frwr_release_mr(wc, mr);
}

/**
 * frwr_unmap_async - invalidate memory regions that were registered for @req
 * @r_xprt: controlling transport instance
 * @req: rpcrdma_req with a non-empty list of MRs to process
 *
 * This guarantees that registered MRs are properly fenced from the
 * server before the RPC consumer accesses the data in them. It also
 * ensures proper Send flow control: waking the next RPC waits until
 * this RPC has relinquished all its Send Queue entries.
 */
void frwr_unmap_async(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
{
	struct ib_send_wr *first, *last, **prev;
	const struct ib_send_wr *bad_wr;
	struct rpcrdma_frwr *frwr;
	struct rpcrdma_mr *mr;
	int rc;

	/* Chain the LOCAL_INV Work Requests and post them with
	 * a single ib_post_send() call.
	 */
	frwr = NULL;
	prev = &first;
	while (!list_empty(&req->rl_registered)) {
		mr = rpcrdma_mr_pop(&req->rl_registered);

		trace_xprtrdma_mr_localinv(mr);
		r_xprt->rx_stats.local_inv_needed++;

		frwr = &mr->frwr;
		frwr->fr_cqe.done = frwr_wc_localinv;
		frwr->fr_req = req;
		last = &frwr->fr_invwr;
		last->next = NULL;
		last->wr_cqe = &frwr->fr_cqe;
		last->sg_list = NULL;
		last->num_sge = 0;
		last->opcode = IB_WR_LOCAL_INV;
		last->send_flags = IB_SEND_SIGNALED;
		last->ex.invalidate_rkey = mr->mr_handle;

		*prev = last;
		prev = &last->next;
	}

	/* Strong send queue ordering guarantees that when the
	 * last WR in the chain completes, all WRs in the chain
	 * are complete. The last completion will wake up the
	 * RPC waiter.
	 */
	frwr->fr_cqe.done = frwr_wc_localinv_done;

	/* Transport disconnect drains the receive CQ before it
	 * replaces the QP. The RPC reply handler won't call us
	 * unless ri_id->qp is a valid pointer.
	 */
	bad_wr = NULL;
	rc = ib_post_send(r_xprt->rx_ia.ri_id->qp, first, &bad_wr);
	trace_xprtrdma_post_send(req, rc);
	if (!rc)
		return;

	/* Recycle MRs in the LOCAL_INV chain that did not get posted.
	 */
	while (bad_wr) {
		frwr = container_of(bad_wr, struct rpcrdma_frwr, fr_invwr);
		mr = container_of(frwr, struct rpcrdma_mr, frwr);
		bad_wr = bad_wr->next;

		rpcrdma_mr_recycle(mr);
	}

	/* The final LOCAL_INV WR in the chain is supposed to
	 * do the wake. If it was never posted, the wake will
	 * not happen, so wake here in that case.
	 */
	rpcrdma_complete_rqst(req->rl_reply);
}