// SPDX-License-Identifier: GPL-2.0
/*
 * Copyright (c) 2015, 2017 Oracle.  All rights reserved.
 * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
 */

/* Lightweight memory registration using Fast Registration Work
 * Requests (FRWR).
 *
 * FRWR features ordered asynchronous registration and deregistration
 * of arbitrarily sized memory regions. This is the fastest and safest
 * but most complex memory registration mode.
 */

/* Normal operation
 *
 * A Memory Region is prepared for RDMA READ or WRITE using a FAST_REG
 * Work Request (frwr_op_map). When the RDMA operation is finished, this
 * Memory Region is invalidated using a LOCAL_INV Work Request
 * (frwr_op_unmap_sync).
 *
 * Typically these Work Requests are not signaled, and neither are RDMA
 * SEND Work Requests (with the exception of signaling occasionally to
 * prevent provider work queue overflows). This greatly reduces HCA
 * interrupt workload.
 *
 * As an optimization, frwr_op_unmap marks MRs INVALID before the
 * LOCAL_INV WR is posted. If posting succeeds, the MR is placed on
 * rb_mrs immediately so that no work (like managing a linked list
 * under a spinlock) is needed in the completion upcall.
 *
 * But this means that frwr_op_map() can occasionally encounter an MR
 * that is INVALID but the LOCAL_INV WR has not completed. Work Queue
 * ordering prevents a subsequent FAST_REG WR from executing against
 * that MR while it is still being invalidated.
 */

/* Transport recovery
 *
 * ->op_map and the transport connect worker cannot run at the same
 * time, but ->op_unmap can fire while the transport connect worker
 * is running. Thus MR recovery is handled in ->op_map, to guarantee
 * that recovered MRs are owned by a sending RPC, and not one where
 * ->op_unmap could fire at the same time transport reconnect is
 * being done.
 *
 * When the underlying transport disconnects, MRs are left in one of
 * four states:
 *
 * INVALID:	The MR was not in use before the QP entered ERROR state.
 *
 * VALID:	The MR was registered before the QP entered ERROR state.
 *
 * FLUSHED_FR:	The MR was being registered when the QP entered ERROR
 *		state, and the pending WR was flushed.
 *
 * FLUSHED_LI:	The MR was being invalidated when the QP entered ERROR
 *		state, and the pending WR was flushed.
 *
 * When frwr_op_map encounters FLUSHED and VALID MRs, they are recovered
 * with ib_dereg_mr and then are re-initialized. Because MR recovery
 * allocates fresh resources, it is deferred to a workqueue, and the
 * recovered MRs are placed back on the rb_mrs list when recovery is
 * complete. frwr_op_map allocates another MR for the current RPC while
 * the broken MR is reset.
 *
 * To ensure that frwr_op_map doesn't encounter an MR that is marked
 * INVALID but that is about to be flushed due to a previous transport
 * disconnect, the transport connect worker attempts to drain all
 * pending send queue WRs before the transport is reconnected.
 */

#include <linux/sunrpc/rpc_rdma.h>

#include "xprt_rdma.h"

#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
# define RPCDBG_FACILITY	RPCDBG_TRANS
#endif

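/* Answer whether the device can support FRWR: it must advertise the
 * memory management extensions and a non-zero fast registration page
 * list depth.
 */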
bool
frwr_is_supported(struct rpcrdma_ia *ia)
{
	struct ib_device_attr *attrs = &ia->ri_device->attrs;

	if (!(attrs->device_cap_flags & IB_DEVICE_MEM_MGT_EXTENSIONS))
		goto out_not_supported;
	if (attrs->max_fast_reg_page_list_len == 0)
		goto out_not_supported;
	return true;

out_not_supported:
	pr_info("rpcrdma: 'frwr' mode is not supported by device %s\n",
		ia->ri_device->name);
	return false;
}

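/* Allocate the ib_mr and scatterlist backing one rpcrdma_mr, and
 * initialize the completion used to wait for LOCAL_INV to finish.
 */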
static int
frwr_op_init_mr(struct rpcrdma_ia *ia, struct rpcrdma_mr *mr)
{
	unsigned int depth = ia->ri_max_frwr_depth;
	struct rpcrdma_frwr *frwr = &mr->frwr;
	int rc;

	frwr->fr_mr = ib_alloc_mr(ia->ri_pd, ia->ri_mrtype, depth);
	if (IS_ERR(frwr->fr_mr))
		goto out_mr_err;

	mr->mr_sg = kcalloc(depth, sizeof(*mr->mr_sg), GFP_KERNEL);
	if (!mr->mr_sg)
		goto out_list_err;

	sg_init_table(mr->mr_sg, depth);
	init_completion(&frwr->fr_linv_done);
	return 0;

out_mr_err:
	rc = PTR_ERR(frwr->fr_mr);
	dprintk("RPC:       %s: ib_alloc_mr status %i\n",
		__func__, rc);
	return rc;

out_list_err:
	rc = -ENOMEM;
	dprintk("RPC:       %s: sg allocation failure\n",
		__func__);
	ib_dereg_mr(frwr->fr_mr);
	return rc;
}

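/* Release all resources held by one rpcrdma_mr: remove it from any
 * rl_registered list, deregister its ib_mr, and free its scatterlist
 * and the rpcrdma_mr itself.
 */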
static void
frwr_op_release_mr(struct rpcrdma_mr *mr)
{
	int rc;

	/* Ensure MR is not on any rl_registered list */
	if (!list_empty(&mr->mr_list))
		list_del(&mr->mr_list);

	rc = ib_dereg_mr(mr->frwr.fr_mr);
	if (rc)
		pr_err("rpcrdma: final ib_dereg_mr for %p returned %i\n",
		       mr, rc);
	kfree(mr->mr_sg);
	kfree(mr);
}

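/* Replace the underlying ib_mr so that the rpcrdma_mr gets a fresh
 * rkey and returns to the INVALID state.
 */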
static int
__frwr_mr_reset(struct rpcrdma_ia *ia, struct rpcrdma_mr *mr)
{
	struct rpcrdma_frwr *frwr = &mr->frwr;
	int rc;

	rc = ib_dereg_mr(frwr->fr_mr);
	if (rc) {
		pr_warn("rpcrdma: ib_dereg_mr status %d, frwr %p orphaned\n",
			rc, mr);
		return rc;
	}

	frwr->fr_mr = ib_alloc_mr(ia->ri_pd, ia->ri_mrtype,
				  ia->ri_max_frwr_depth);
	if (IS_ERR(frwr->fr_mr)) {
		pr_warn("rpcrdma: ib_alloc_mr status %ld, frwr %p orphaned\n",
			PTR_ERR(frwr->fr_mr), mr);
		return PTR_ERR(frwr->fr_mr);
	}

	dprintk("RPC:       %s: recovered FRWR %p\n", __func__, frwr);
	frwr->fr_state = FRWR_IS_INVALID;
	return 0;
}

/* Reset of a single FRWR. Generate a fresh rkey by replacing the MR.
 */
static void
frwr_op_recover_mr(struct rpcrdma_mr *mr)
{
	enum rpcrdma_frwr_state state = mr->frwr.fr_state;
	struct rpcrdma_xprt *r_xprt = mr->mr_xprt;
	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
	int rc;

	rc = __frwr_mr_reset(ia, mr);
	if (state != FRWR_FLUSHED_LI) {
		trace_xprtrdma_dma_unmap(mr);
		ib_dma_unmap_sg(ia->ri_device,
				mr->mr_sg, mr->mr_nents, mr->mr_dir);
	}
	if (rc)
		goto out_release;

	rpcrdma_mr_put(mr);
	r_xprt->rx_stats.mrs_recovered++;
	return;

out_release:
	pr_err("rpcrdma: FRWR reset failed %d, %p release\n", rc, mr);
	r_xprt->rx_stats.mrs_orphaned++;

	spin_lock(&r_xprt->rx_buf.rb_mrlock);
	list_del(&mr->mr_all);
	spin_unlock(&r_xprt->rx_buf.rb_mrlock);

	frwr_op_release_mr(mr);
}

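/* Choose the MR type, cap the per-MR segment depth at what the device
 * supports, and size the Send Queue to fit the registration,
 * invalidation, and Send WRs needed by each RPC.
 */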
static int
frwr_op_open(struct rpcrdma_ia *ia, struct rpcrdma_ep *ep,
	     struct rpcrdma_create_data_internal *cdata)
{
	struct ib_device_attr *attrs = &ia->ri_device->attrs;
	int depth, delta;

	ia->ri_mrtype = IB_MR_TYPE_MEM_REG;
	if (attrs->device_cap_flags & IB_DEVICE_SG_GAPS_REG)
		ia->ri_mrtype = IB_MR_TYPE_SG_GAPS;

	ia->ri_max_frwr_depth =
			min_t(unsigned int, RPCRDMA_MAX_DATA_SEGS,
			      attrs->max_fast_reg_page_list_len);
	dprintk("RPC:       %s: device's max FR page list len = %u\n",
		__func__, ia->ri_max_frwr_depth);

	/* Add room for frwr register and invalidate WRs.
	 * 1. FRWR reg WR for head
	 * 2. FRWR invalidate WR for head
	 * 3. N FRWR reg WRs for pagelist
	 * 4. N FRWR invalidate WRs for pagelist
	 * 5. FRWR reg WR for tail
	 * 6. FRWR invalidate WR for tail
	 * 7. The RDMA_SEND WR
	 */
	depth = 7;

	/* Calculate N if the device max FRWR depth is smaller than
	 * RPCRDMA_MAX_DATA_SEGS.
	 */
	if (ia->ri_max_frwr_depth < RPCRDMA_MAX_DATA_SEGS) {
		delta = RPCRDMA_MAX_DATA_SEGS - ia->ri_max_frwr_depth;
		do {
			depth += 2; /* FRWR reg + invalidate */
			delta -= ia->ri_max_frwr_depth;
		} while (delta > 0);
	}

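	/* Each RPC consumes up to depth Send Queue entries, so scale
	 * the Send Queue by the request count. If the device cannot
	 * support that many Send WRs, reduce the number of concurrent
	 * requests instead.
	 */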
	ep->rep_attr.cap.max_send_wr *= depth;
	if (ep->rep_attr.cap.max_send_wr > attrs->max_qp_wr) {
		cdata->max_requests = attrs->max_qp_wr / depth;
		if (!cdata->max_requests)
			return -EINVAL;
		ep->rep_attr.cap.max_send_wr = cdata->max_requests *
					       depth;
	}

	ia->ri_max_segs = max_t(unsigned int, 1, RPCRDMA_MAX_DATA_SEGS /
				ia->ri_max_frwr_depth);
	return 0;
}

/* FRWR mode conveys a list of pages per chunk segment. The
 * maximum length of that list is the FRWR page list depth.
 */
static size_t
frwr_op_maxpages(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_ia *ia = &r_xprt->rx_ia;

	return min_t(unsigned int, RPCRDMA_MAX_DATA_SEGS,
		     RPCRDMA_MAX_HDR_SEGS * ia->ri_max_frwr_depth);
}

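/* Report a Send completion error unless the WR was simply flushed
 * when the QP entered the ERROR state.
 */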
static void
__frwr_sendcompletion_flush(struct ib_wc *wc, const char *wr)
{
	if (wc->status != IB_WC_WR_FLUSH_ERR)
		pr_err("rpcrdma: %s: %s (%u/0x%x)\n",
		       wr, ib_wc_status_msg(wc->status),
		       wc->status, wc->vendor_err);
}

/**
 * frwr_wc_fastreg - Invoked by RDMA provider for a flushed FastReg WC
 * @cq:	completion queue (ignored)
 * @wc:	completed WR
 *
 */
static void
frwr_wc_fastreg(struct ib_cq *cq, struct ib_wc *wc)
{
	struct ib_cqe *cqe = wc->wr_cqe;
	struct rpcrdma_frwr *frwr =
			container_of(cqe, struct rpcrdma_frwr, fr_cqe);

	/* WARNING: Only wr_cqe and status are reliable at this point */
	if (wc->status != IB_WC_SUCCESS) {
		frwr->fr_state = FRWR_FLUSHED_FR;
		__frwr_sendcompletion_flush(wc, "fastreg");
	}
	trace_xprtrdma_wc_fastreg(wc, frwr);
}

/**
 * frwr_wc_localinv - Invoked by RDMA provider for a flushed LocalInv WC
 * @cq:	completion queue (ignored)
 * @wc:	completed WR
 *
 */
static void
frwr_wc_localinv(struct ib_cq *cq, struct ib_wc *wc)
{
	struct ib_cqe *cqe = wc->wr_cqe;
	struct rpcrdma_frwr *frwr = container_of(cqe, struct rpcrdma_frwr,
						 fr_cqe);

	/* WARNING: Only wr_cqe and status are reliable at this point */
	if (wc->status != IB_WC_SUCCESS) {
		frwr->fr_state = FRWR_FLUSHED_LI;
		__frwr_sendcompletion_flush(wc, "localinv");
	}
	trace_xprtrdma_wc_li(wc, frwr);
}

/**
 * frwr_wc_localinv_wake - Invoked by RDMA provider for a signaled LocalInv WC
 * @cq:	completion queue (ignored)
 * @wc:	completed WR
 *
 * Awaken anyone waiting for an MR to finish being fenced.
 */
static void
frwr_wc_localinv_wake(struct ib_cq *cq, struct ib_wc *wc)
{
	struct ib_cqe *cqe = wc->wr_cqe;
	struct rpcrdma_frwr *frwr = container_of(cqe, struct rpcrdma_frwr,
						 fr_cqe);

	/* WARNING: Only wr_cqe and status are reliable at this point */
	if (wc->status != IB_WC_SUCCESS) {
		frwr->fr_state = FRWR_FLUSHED_LI;
		__frwr_sendcompletion_flush(wc, "localinv");
	}
	complete(&frwr->fr_linv_done);
	trace_xprtrdma_wc_li_wake(wc, frwr);
}

/* Post a REG_MR Work Request to register a memory region
 * for remote access via RDMA READ or RDMA WRITE.
 */
static struct rpcrdma_mr_seg *
frwr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
	    int nsegs, bool writing, struct rpcrdma_mr **out)
{
	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
	bool holes_ok = ia->ri_mrtype == IB_MR_TYPE_SG_GAPS;
	struct rpcrdma_frwr *frwr;
	struct rpcrdma_mr *mr;
	struct ib_mr *ibmr;
	struct ib_reg_wr *reg_wr;
	int i, n;
	u8 key;

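	/* Acquire an MR in the INVALID state. MRs found in any other
	 * state (for example, flushed by a transport disconnect) are
	 * handed to the recovery workqueue and another MR is tried.
	 */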
	mr = NULL;
	do {
		if (mr)
			rpcrdma_mr_defer_recovery(mr);
		mr = rpcrdma_mr_get(r_xprt);
		if (!mr)
			return ERR_PTR(-EAGAIN);
	} while (mr->frwr.fr_state != FRWR_IS_INVALID);
	frwr = &mr->frwr;
	frwr->fr_state = FRWR_IS_VALID;

	if (nsegs > ia->ri_max_frwr_depth)
		nsegs = ia->ri_max_frwr_depth;
	for (i = 0; i < nsegs;) {
		if (seg->mr_page)
			sg_set_page(&mr->mr_sg[i],
				    seg->mr_page,
				    seg->mr_len,
				    offset_in_page(seg->mr_offset));
		else
			sg_set_buf(&mr->mr_sg[i], seg->mr_offset,
				   seg->mr_len);

		++seg;
		++i;
		if (holes_ok)
			continue;
		if ((i < nsegs && offset_in_page(seg->mr_offset)) ||
		    offset_in_page((seg-1)->mr_offset + (seg-1)->mr_len))
			break;
	}
	mr->mr_dir = rpcrdma_data_dir(writing);

	mr->mr_nents = ib_dma_map_sg(ia->ri_device, mr->mr_sg, i, mr->mr_dir);
	if (!mr->mr_nents)
		goto out_dmamap_err;

	ibmr = frwr->fr_mr;
	n = ib_map_mr_sg(ibmr, mr->mr_sg, mr->mr_nents, NULL, PAGE_SIZE);
	if (unlikely(n != mr->mr_nents))
		goto out_mapmr_err;

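	/* Bump the key portion of the rkey so that each registration
	 * of this MR presents a fresh rkey to the peer.
	 */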
	key = (u8)(ibmr->rkey & 0x000000FF);
	ib_update_fast_reg_key(ibmr, ++key);

	reg_wr = &frwr->fr_regwr;
	reg_wr->mr = ibmr;
	reg_wr->key = ibmr->rkey;
	reg_wr->access = writing ?
			 IB_ACCESS_REMOTE_WRITE | IB_ACCESS_LOCAL_WRITE :
			 IB_ACCESS_REMOTE_READ;

	mr->mr_handle = ibmr->rkey;
	mr->mr_length = ibmr->length;
	mr->mr_offset = ibmr->iova;

	*out = mr;
	return seg;

out_dmamap_err:
	pr_err("rpcrdma: failed to DMA map sg %p sg_nents %d\n",
	       mr->mr_sg, i);
	frwr->fr_state = FRWR_IS_INVALID;
	rpcrdma_mr_put(mr);
	return ERR_PTR(-EIO);

out_mapmr_err:
	pr_err("rpcrdma: failed to map mr %p (%d/%d)\n",
	       frwr->fr_mr, n, mr->mr_nents);
	rpcrdma_mr_defer_recovery(mr);
	return ERR_PTR(-EIO);
}

/* Post Send WR containing the RPC Call message.
 *
 * For FRWR, chain any FastReg WRs to the Send WR. Only a
 * single ib_post_send call is needed to register memory
 * and then post the Send WR.
 */
static int
frwr_op_send(struct rpcrdma_ia *ia, struct rpcrdma_req *req)
{
	struct ib_send_wr *post_wr, *bad_wr;
	struct rpcrdma_mr *mr;

	post_wr = &req->rl_sendctx->sc_wr;
	list_for_each_entry(mr, &req->rl_registered, mr_list) {
		struct rpcrdma_frwr *frwr;

		frwr = &mr->frwr;

		frwr->fr_cqe.done = frwr_wc_fastreg;
		frwr->fr_regwr.wr.next = post_wr;
		frwr->fr_regwr.wr.wr_cqe = &frwr->fr_cqe;
		frwr->fr_regwr.wr.num_sge = 0;
		frwr->fr_regwr.wr.opcode = IB_WR_REG_MR;
		frwr->fr_regwr.wr.send_flags = 0;

		post_wr = &frwr->fr_regwr.wr;
	}

	/* If ib_post_send fails, the next ->send_request for
	 * @req will queue these MRs for recovery.
	 */
	return ib_post_send(ia->ri_id->qp, post_wr, &bad_wr);
}

/* Handle a remotely invalidated mr on the @mrs list
 */
static void
frwr_op_reminv(struct rpcrdma_rep *rep, struct list_head *mrs)
{
	struct rpcrdma_mr *mr;

	list_for_each_entry(mr, mrs, mr_list)
		if (mr->mr_handle == rep->rr_inv_rkey) {
			list_del(&mr->mr_list);
			trace_xprtrdma_remoteinv(mr);
			mr->frwr.fr_state = FRWR_IS_INVALID;
			rpcrdma_mr_unmap_and_put(mr);
			break;	/* only one invalidated MR per RPC */
		}
}

/* Invalidate all memory regions that were registered for "req".
 *
 * Sleeps until it is safe for the host CPU to access the
 * previously mapped memory regions.
 *
 * Caller ensures that @mrs is not empty before the call. This
 * function empties the list.
 */
static void
frwr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct list_head *mrs)
{
	struct ib_send_wr *first, **prev, *last, *bad_wr;
	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
	struct rpcrdma_frwr *frwr;
	struct rpcrdma_mr *mr;
	int count, rc;

	/* ORDER: Invalidate all of the MRs first
	 *
	 * Chain the LOCAL_INV Work Requests and post them with
	 * a single ib_post_send() call.
	 */
	frwr = NULL;
	count = 0;
	prev = &first;
	list_for_each_entry(mr, mrs, mr_list) {
		mr->frwr.fr_state = FRWR_IS_INVALID;

		frwr = &mr->frwr;
		trace_xprtrdma_localinv(mr);

		frwr->fr_cqe.done = frwr_wc_localinv;
		last = &frwr->fr_invwr;
		memset(last, 0, sizeof(*last));
		last->wr_cqe = &frwr->fr_cqe;
		last->opcode = IB_WR_LOCAL_INV;
		last->ex.invalidate_rkey = mr->mr_handle;
		count++;

		*prev = last;
		prev = &last->next;
	}
	if (!frwr)
		goto unmap;

	/* Strong send queue ordering guarantees that when the
	 * last WR in the chain completes, all WRs in the chain
	 * are complete.
	 */
	last->send_flags = IB_SEND_SIGNALED;
	frwr->fr_cqe.done = frwr_wc_localinv_wake;
	reinit_completion(&frwr->fr_linv_done);

	/* Transport disconnect drains the receive CQ before it
	 * replaces the QP. The RPC reply handler won't call us
	 * unless ri_id->qp is a valid pointer.
	 */
	r_xprt->rx_stats.local_inv_needed++;
	bad_wr = NULL;
	rc = ib_post_send(ia->ri_id->qp, first, &bad_wr);
	if (bad_wr != first)
		wait_for_completion(&frwr->fr_linv_done);
	if (rc)
		goto reset_mrs;

	/* ORDER: Now DMA unmap all of the MRs, and return
	 * them to the free MR list.
	 */
unmap:
	while (!list_empty(mrs)) {
		mr = rpcrdma_mr_pop(mrs);
		rpcrdma_mr_unmap_and_put(mr);
	}
	return;

reset_mrs:
	pr_err("rpcrdma: FRWR invalidate ib_post_send returned %i\n", rc);

	/* Find and reset the MRs in the LOCAL_INV WRs that did not
	 * get posted.
	 */
	while (bad_wr) {
		frwr = container_of(bad_wr, struct rpcrdma_frwr,
				    fr_invwr);
		mr = container_of(frwr, struct rpcrdma_mr, frwr);

		__frwr_mr_reset(ia, mr);

		bad_wr = bad_wr->next;
	}
	goto unmap;
}

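/* Memory registration operations vector for "frwr" mode.
 */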
const struct rpcrdma_memreg_ops rpcrdma_frwr_memreg_ops = {
	.ro_map				= frwr_op_map,
	.ro_send			= frwr_op_send,
	.ro_reminv			= frwr_op_reminv,
	.ro_unmap_sync			= frwr_op_unmap_sync,
	.ro_recover_mr			= frwr_op_recover_mr,
	.ro_open			= frwr_op_open,
	.ro_maxpages			= frwr_op_maxpages,
	.ro_init_mr			= frwr_op_init_mr,
	.ro_release_mr			= frwr_op_release_mr,
	.ro_displayname			= "frwr",
	.ro_send_w_inv_ok		= RPCRDMA_CMP_F_SND_W_INV_OK,
};