frwr_ops.c 16.6 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13
/*
 * Copyright (c) 2015 Oracle.  All rights reserved.
 * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
 */

/* Lightweight memory registration using Fast Registration Work
 * Requests (FRWR). Also referred to sometimes as FRMR mode.
 *
 * FRWR features ordered asynchronous registration and deregistration
 * of arbitrarily sized memory regions. This is the fastest and safest
 * but most complex memory registration mode.
 */

14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69
/* Normal operation
 *
 * A Memory Region is prepared for RDMA READ or WRITE using a FAST_REG
 * Work Request (frwr_op_map). When the RDMA operation is finished, this
 * Memory Region is invalidated using a LOCAL_INV Work Request
 * (frwr_op_unmap).
 *
 * Typically these Work Requests are not signaled, and neither are RDMA
 * SEND Work Requests (with the exception of signaling occasionally to
 * prevent provider work queue overflows). This greatly reduces HCA
 * interrupt workload.
 *
 * As an optimization, frwr_op_unmap marks MRs INVALID before the
 * LOCAL_INV WR is posted. If posting succeeds, the MR is placed on
 * rb_mws immediately so that no work (like managing a linked list
 * under a spinlock) is needed in the completion upcall.
 *
 * But this means that frwr_op_map() can occasionally encounter an MR
 * that is INVALID but the LOCAL_INV WR has not completed. Work Queue
 * ordering prevents a subsequent FAST_REG WR from executing against
 * that MR while it is still being invalidated.
 */

/* Transport recovery
 *
 * ->op_map and the transport connect worker cannot run at the same
 * time, but ->op_unmap can fire while the transport connect worker
 * is running. Thus MR recovery is handled in ->op_map, to guarantee
 * that recovered MRs are owned by a sending RPC, and not one where
 * ->op_unmap could fire at the same time transport reconnect is
 * being done.
 *
 * When the underlying transport disconnects, MRs are left in one of
 * three states:
 *
 * INVALID:	The MR was not in use before the QP entered ERROR state.
 *		(Or, the LOCAL_INV WR has not completed or flushed yet).
 *
 * STALE:	The MR was being registered or unregistered when the QP
 *		entered ERROR state, and the pending WR was flushed.
 *
 * VALID:	The MR was registered before the QP entered ERROR state.
 *
 * When frwr_op_map encounters STALE and VALID MRs, they are recovered
 * with ib_dereg_mr and then are re-initialized. Because MR recovery
 * allocates fresh resources, it is deferred to a workqueue, and the
 * recovered MRs are placed back on the rb_mws list when recovery is
 * complete. frwr_op_map allocates another MR for the current RPC while
 * the broken MR is reset.
 *
 * To ensure that frwr_op_map doesn't encounter an MR that is marked
 * INVALID but that is about to be flushed due to a previous transport
 * disconnect, the transport connect worker attempts to drain all
 * pending send queue WRs before the transport is reconnected.
 */

70 71 72 73 74 75
#include "xprt_rdma.h"

#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
# define RPCDBG_FACILITY	RPCDBG_TRANS
#endif

76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92
/* Report whether the device behind @ia can be used in FRWR mode.
 *
 * The device must set IB_DEVICE_MEM_MGT_EXTENSIONS in its capability
 * flags and report a non-zero max_fast_reg_page_list_len. If either
 * check fails, a notice naming the device is logged and false is
 * returned.
 */
bool
frwr_is_supported(struct rpcrdma_ia *ia)
{
	struct ib_device_attr *dev_attrs = &ia->ri_device->attrs;

	if ((dev_attrs->device_cap_flags & IB_DEVICE_MEM_MGT_EXTENSIONS) &&
	    dev_attrs->max_fast_reg_page_list_len != 0)
		return true;

	pr_info("rpcrdma: 'frwr' mode is not supported by device %s\n",
		ia->ri_device->name);
	return false;
}

93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136
/* Set up the per-MW resources for FRWR mode.
 *
 * Allocates an IB_MR_TYPE_MEM_REG MR of @depth entries from @pd and a
 * scatterlist of @depth entries, then initializes the scatterlist and
 * the LOCAL_INV completion.
 *
 * Returns zero on success. On failure, returns the ib_alloc_mr error
 * or -ENOMEM; an MR that was already allocated is deregistered before
 * returning.
 */
static int
__frwr_init(struct rpcrdma_mw *r, struct ib_pd *pd, unsigned int depth)
{
	struct rpcrdma_frmr *frmr = &r->frmr;
	int status;

	frmr->fr_mr = ib_alloc_mr(pd, IB_MR_TYPE_MEM_REG, depth);
	if (IS_ERR(frmr->fr_mr)) {
		status = PTR_ERR(frmr->fr_mr);
		dprintk("RPC:       %s: ib_alloc_mr status %i\n",
			__func__, status);
		return status;
	}

	r->mw_sg = kcalloc(depth, sizeof(*r->mw_sg), GFP_KERNEL);
	if (!r->mw_sg) {
		status = -ENOMEM;
		dprintk("RPC:       %s: sg allocation failure\n",
			__func__);
		ib_dereg_mr(frmr->fr_mr);
		return status;
	}

	sg_init_table(r->mw_sg, depth);
	init_completion(&frmr->fr_linv_done);
	return 0;
}

/* Release the resources held by one MW: deregister its MR (logging
 * an error if that fails) and free its scatterlist. The MW structure
 * itself is not freed here.
 */
static void
__frwr_release(struct rpcrdma_mw *r)
{
	int status = ib_dereg_mr(r->frmr.fr_mr);

	if (status != 0)
		pr_err("rpcrdma: final ib_dereg_mr for %p returned %i\n",
		       r, status);
	kfree(r->mw_sg);
}

137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162
/* Replace the MR backing @r with a freshly allocated one.
 *
 * The current MR is deregistered, a new IB_MR_TYPE_MEM_REG MR of
 * ri_max_frmr_depth entries is allocated from ia->ri_pd, and the
 * FRMR is marked FRMR_IS_INVALID.
 *
 * Returns zero on success, or the error from ib_dereg_mr or
 * ib_alloc_mr. On failure a warning is logged and fr_state is left
 * unchanged.
 */
static int
__frwr_reset_mr(struct rpcrdma_ia *ia, struct rpcrdma_mw *r)
{
	struct rpcrdma_frmr *frmr = &r->frmr;
	int status;

	status = ib_dereg_mr(frmr->fr_mr);
	if (status)
		goto out_dereg_err;

	frmr->fr_mr = ib_alloc_mr(ia->ri_pd, IB_MR_TYPE_MEM_REG,
				  ia->ri_max_frmr_depth);
	if (IS_ERR(frmr->fr_mr))
		goto out_alloc_err;

	dprintk("RPC:       %s: recovered FRMR %p\n", __func__, r);
	frmr->fr_state = FRMR_IS_INVALID;
	return 0;

out_dereg_err:
	pr_warn("rpcrdma: ib_dereg_mr status %d, frwr %p orphaned\n",
		status, r);
	return status;

out_alloc_err:
	pr_warn("rpcrdma: ib_alloc_mr status %ld, frwr %p orphaned\n",
		PTR_ERR(frmr->fr_mr), r);
	return PTR_ERR(frmr->fr_mr);
}

163 164 165 166 167 168
/* Reset of a single FRMR. Generate a fresh rkey by replacing the MR.
 *
 * There's no recovery if this fails. The FRMR is abandoned, but
 * remains in rb_all. It will be cleaned up when the transport is
 * destroyed.
 */
169
static void
170
frwr_op_recover_mr(struct rpcrdma_mw *mw)
171
{
172
	struct rpcrdma_xprt *r_xprt = mw->mw_xprt;
173 174 175 176
	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
	int rc;

	rc = __frwr_reset_mr(ia, mw);
177
	ib_dma_unmap_sg(ia->ri_device, mw->mw_sg, mw->mw_nents, mw->mw_dir);
178 179 180 181
	if (rc) {
		pr_err("rpcrdma: FRMR reset status %d, %p orphaned\n",
		       rc, mw);
		r_xprt->rx_stats.mrs_orphaned++;
182
		return;
183
	}
184

185 186
	rpcrdma_put_mw(r_xprt, mw);
	r_xprt->rx_stats.mrs_recovered++;
187 188
}

C
Chuck Lever 已提交
189 190 191 192 193 194 195 196
static int
frwr_op_open(struct rpcrdma_ia *ia, struct rpcrdma_ep *ep,
	     struct rpcrdma_create_data_internal *cdata)
{
	int depth, delta;

	ia->ri_max_frmr_depth =
			min_t(unsigned int, RPCRDMA_MAX_DATA_SEGS,
197
			      ia->ri_device->attrs.max_fast_reg_page_list_len);
C
Chuck Lever 已提交
198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223
	dprintk("RPC:       %s: device's max FR page list len = %u\n",
		__func__, ia->ri_max_frmr_depth);

	/* Add room for frmr register and invalidate WRs.
	 * 1. FRMR reg WR for head
	 * 2. FRMR invalidate WR for head
	 * 3. N FRMR reg WRs for pagelist
	 * 4. N FRMR invalidate WRs for pagelist
	 * 5. FRMR reg WR for tail
	 * 6. FRMR invalidate WR for tail
	 * 7. The RDMA_SEND WR
	 */
	depth = 7;

	/* Calculate N if the device max FRMR depth is smaller than
	 * RPCRDMA_MAX_DATA_SEGS.
	 */
	if (ia->ri_max_frmr_depth < RPCRDMA_MAX_DATA_SEGS) {
		delta = RPCRDMA_MAX_DATA_SEGS - ia->ri_max_frmr_depth;
		do {
			depth += 2; /* FRMR reg + invalidate */
			delta -= ia->ri_max_frmr_depth;
		} while (delta > 0);
	}

	ep->rep_attr.cap.max_send_wr *= depth;
224 225
	if (ep->rep_attr.cap.max_send_wr > ia->ri_device->attrs.max_qp_wr) {
		cdata->max_requests = ia->ri_device->attrs.max_qp_wr / depth;
C
Chuck Lever 已提交
226 227 228 229 230 231
		if (!cdata->max_requests)
			return -EINVAL;
		ep->rep_attr.cap.max_send_wr = cdata->max_requests *
					       depth;
	}

C
Chuck Lever 已提交
232 233 234
	rpcrdma_set_max_header_sizes(ia, cdata, max_t(unsigned int, 1,
						      RPCRDMA_MAX_DATA_SEGS /
						      ia->ri_max_frmr_depth));
C
Chuck Lever 已提交
235 236 237
	return 0;
}

238 239 240 241 242 243 244 245 246
/* FRWR mode conveys a list of pages per chunk segment, and the
 * length of that list is bounded by the FRWR page list depth.
 *
 * Returns RPCRDMA_MAX_HDR_SEGS times that depth, capped at
 * RPCRDMA_MAX_DATA_SEGS.
 */
static size_t
frwr_op_maxpages(struct rpcrdma_xprt *r_xprt)
{
	unsigned int depth = r_xprt->rx_ia.ri_max_frmr_depth;

	return min_t(unsigned int, RPCRDMA_MAX_DATA_SEGS,
		     RPCRDMA_MAX_HDR_SEGS * depth);
}

250 251 252 253 254 255 256 257 258 259 260 261 262 263 264
/* Common handling for an unsuccessful FastReg or LocalInv completion.
 *
 * The FRMR is marked FRMR_IS_STALE. Any status other than
 * IB_WC_WR_FLUSH_ERR is also reported, tagged with @wr.
 */
static void
__frwr_sendcompletion_flush(struct ib_wc *wc, struct rpcrdma_frmr *frmr,
			    const char *wr)
{
	frmr->fr_state = FRMR_IS_STALE;

	if (wc->status == IB_WC_WR_FLUSH_ERR)
		return;

	pr_err("rpcrdma: %s: %s (%u/0x%x)\n",
	       wr, ib_wc_status_msg(wc->status),
	       wc->status, wc->vendor_err);
}

/**
 * frwr_wc_fastreg - Invoked by RDMA provider for each polled FastReg WC
 * @cq:	completion queue (ignored)
 * @wc:	completed WR
265 266
 *
 */
267
static void
268
frwr_wc_fastreg(struct ib_cq *cq, struct ib_wc *wc)
269
{
270 271
	struct rpcrdma_frmr *frmr;
	struct ib_cqe *cqe;
272

273 274 275 276 277 278
	/* WARNING: Only wr_cqe and status are reliable at this point */
	if (wc->status != IB_WC_SUCCESS) {
		cqe = wc->wr_cqe;
		frmr = container_of(cqe, struct rpcrdma_frmr, fr_cqe);
		__frwr_sendcompletion_flush(wc, frmr, "fastreg");
	}
279 280
}

281 282 283 284 285 286
/**
 * frwr_wc_localinv - Invoked by RDMA provider for each polled LocalInv WC
 * @cq:	completion queue (ignored)
 * @wc:	completed WR
 *
 */
287
static void
288
frwr_wc_localinv(struct ib_cq *cq, struct ib_wc *wc)
289
{
290 291
	struct rpcrdma_frmr *frmr;
	struct ib_cqe *cqe;
292

293 294 295 296 297 298 299
	/* WARNING: Only wr_cqe and status are reliable at this point */
	if (wc->status != IB_WC_SUCCESS) {
		cqe = wc->wr_cqe;
		frmr = container_of(cqe, struct rpcrdma_frmr, fr_cqe);
		__frwr_sendcompletion_flush(wc, frmr, "localinv");
	}
}
300

301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319
/**
 * frwr_wc_localinv_wake - Invoked by RDMA provider for each polled
 *			   LocalInv WC that has a waiter
 * @cq:	completion queue (ignored)
 * @wc:	completed WR
 *
 * Awaken anyone waiting for an MR to finish being fenced. The waiter
 * is woken regardless of the completion status.
 */
static void
frwr_wc_localinv_wake(struct ib_cq *cq, struct ib_wc *wc)
{
	/* WARNING: Only wr_cqe and status are reliable at this point */
	struct rpcrdma_frmr *frmr =
		container_of(wc->wr_cqe, struct rpcrdma_frmr, fr_cqe);

	if (wc->status != IB_WC_SUCCESS)
		__frwr_sendcompletion_flush(wc, frmr, "localinv");
	complete_all(&frmr->fr_linv_done);
}

C
Chuck Lever 已提交
322 323 324 325 326 327 328 329
static int
frwr_op_init(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	unsigned int depth = r_xprt->rx_ia.ri_max_frmr_depth;
	struct ib_pd *pd = r_xprt->rx_ia.ri_pd;
	int i;

C
Chuck Lever 已提交
330
	spin_lock_init(&buf->rb_mwlock);
C
Chuck Lever 已提交
331 332 333
	INIT_LIST_HEAD(&buf->rb_mws);
	INIT_LIST_HEAD(&buf->rb_all);

334 335 336 337
	i = max_t(int, RPCRDMA_MAX_DATA_SEGS / depth, 1);
	i += 2;				/* head + tail */
	i *= buf->rb_max_requests;	/* one set for each RPC slot */
	dprintk("RPC:       %s: initalizing %d FRMRs\n", __func__, i);
C
Chuck Lever 已提交
338 339 340 341 342 343 344 345 346

	while (i--) {
		struct rpcrdma_mw *r;
		int rc;

		r = kzalloc(sizeof(*r), GFP_KERNEL);
		if (!r)
			return -ENOMEM;

347
		rc = __frwr_init(r, pd, depth);
C
Chuck Lever 已提交
348 349 350 351 352
		if (rc) {
			kfree(r);
			return rc;
		}

353
		r->mw_xprt = r_xprt;
C
Chuck Lever 已提交
354 355 356 357 358 359 360
		list_add(&r->mw_list, &buf->rb_mws);
		list_add(&r->mw_all, &buf->rb_all);
	}

	return 0;
}

361
/* Post a REG_MR Work Request to register a memory region
 * for remote access via RDMA READ or RDMA WRITE.
 *
 * @r_xprt:  transport whose MW pool and QP are used
 * @seg:     first of @nsegs segments to register; on success its
 *           rl_mw, mr_rkey, mr_base, mr_nsegs and mr_len are updated
 * @nsegs:   number of segments available, clamped to the FRMR depth
 * @writing: selects remote-write (plus local-write) access rather
 *           than remote-read
 *
 * Returns the number of segments covered by the registration, or a
 * negative errno: -ENOBUFS when no MW is available, -EIO when the
 * scatterlist cannot be DMA mapped or mapped into the MR, and
 * -ENOTCONN when the WR cannot be posted.
 */
static int
frwr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
	    int nsegs, bool writing)
{
	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
	struct rpcrdma_mr_seg *seg1 = seg;
	struct rpcrdma_mw *mw;
	struct rpcrdma_frmr *frmr;
	struct ib_mr *mr;
	struct ib_reg_wr *reg_wr;
	struct ib_send_wr *bad_wr;
	int rc, i, n, dma_nents;
	u8 key;

	/* Any MW still attached to the segment, and any MW taken from
	 * the pool that is not INVALID, is sent to deferred recovery.
	 * Keep pulling MWs until an INVALID one turns up.
	 */
	mw = seg1->rl_mw;
	seg1->rl_mw = NULL;
	do {
		if (mw)
			rpcrdma_defer_mr_recovery(mw);
		mw = rpcrdma_get_mw(r_xprt);
		if (!mw)
			return -ENOBUFS;
	} while (mw->frmr.fr_state != FRMR_IS_INVALID);
	frmr = &mw->frmr;
	frmr->fr_state = FRMR_IS_VALID;
	mr = frmr->fr_mr;
	reg_wr = &frmr->fr_regwr;

	if (nsegs > ia->ri_max_frmr_depth)
		nsegs = ia->ri_max_frmr_depth;
	for (i = 0; i < nsegs;) {
		if (seg->mr_page)
			sg_set_page(&mw->mw_sg[i],
				    seg->mr_page,
				    seg->mr_len,
				    offset_in_page(seg->mr_offset));
		else
			sg_set_buf(&mw->mw_sg[i], seg->mr_offset,
				   seg->mr_len);

		++seg;
		++i;

		/* Check for holes: stop if the next segment does not
		 * start on a page boundary, or the segment just added
		 * does not end on one.
		 */
		if ((i < nsegs && offset_in_page(seg->mr_offset)) ||
		    offset_in_page((seg-1)->mr_offset + (seg-1)->mr_len))
			break;
	}
	mw->mw_nents = i;
	mw->mw_dir = rpcrdma_data_dir(writing);
	if (i == 0)
		goto out_dmamap_err;

	dma_nents = ib_dma_map_sg(ia->ri_device,
				  mw->mw_sg, mw->mw_nents, mw->mw_dir);
	if (!dma_nents)
		goto out_dmamap_err;

	n = ib_map_mr_sg(mr, mw->mw_sg, mw->mw_nents, NULL, PAGE_SIZE);
	if (unlikely(n != mw->mw_nents))
		goto out_mapmr_err;

	dprintk("RPC:       %s: Using frmr %p to map %u segments (%u bytes)\n",
		__func__, mw, mw->mw_nents, mr->length);

	/* Advance the low-order byte of the rkey for this registration */
	key = (u8)(mr->rkey & 0x000000FF);
	ib_update_fast_reg_key(mr, ++key);

	reg_wr->wr.next = NULL;
	reg_wr->wr.opcode = IB_WR_REG_MR;
	frmr->fr_cqe.done = frwr_wc_fastreg;
	reg_wr->wr.wr_cqe = &frmr->fr_cqe;
	reg_wr->wr.num_sge = 0;
	reg_wr->wr.send_flags = 0;
	reg_wr->mr = mr;
	reg_wr->key = mr->rkey;
	reg_wr->access = writing ?
			 IB_ACCESS_REMOTE_WRITE | IB_ACCESS_LOCAL_WRITE :
			 IB_ACCESS_REMOTE_READ;

	DECR_CQCOUNT(&r_xprt->rx_ep);
	rc = ib_post_send(ia->ri_id->qp, &reg_wr->wr, &bad_wr);
	if (rc)
		goto out_senderr;

	seg1->rl_mw = mw;
	seg1->mr_rkey = mr->rkey;
	seg1->mr_base = mr->iova;
	seg1->mr_nsegs = mw->mw_nents;
	seg1->mr_len = mr->length;

	return mw->mw_nents;

out_dmamap_err:
	pr_err("rpcrdma: failed to dma map sg %p sg_nents %u\n",
	       mw->mw_sg, mw->mw_nents);
	/* Nothing has been DMA mapped and the MR itself has not been
	 * touched, so there is nothing to recover. Deferring recovery
	 * here would DMA unmap a scatterlist that was never mapped
	 * (frwr_op_recover_mr unmaps unconditionally). Put the MW
	 * back in its original INVALID state instead.
	 */
	frmr->fr_state = FRMR_IS_INVALID;
	rpcrdma_put_mw(r_xprt, mw);
	return -EIO;

out_mapmr_err:
	pr_err("rpcrdma: failed to map mr %p (%u/%u)\n",
	       frmr->fr_mr, n, mw->mw_nents);
	rpcrdma_defer_mr_recovery(mw);
	return -EIO;

out_senderr:
	pr_err("rpcrdma: FRMR registration ib_post_send returned %i\n", rc);
	rpcrdma_defer_mr_recovery(mw);
	return -ENOTCONN;
}

475 476 477 478
static struct ib_send_wr *
__frwr_prepare_linv_wr(struct rpcrdma_mr_seg *seg)
{
	struct rpcrdma_mw *mw = seg->rl_mw;
479
	struct rpcrdma_frmr *f = &mw->frmr;
480 481 482 483 484 485
	struct ib_send_wr *invalidate_wr;

	f->fr_state = FRMR_IS_INVALID;
	invalidate_wr = &f->fr_invwr;

	memset(invalidate_wr, 0, sizeof(*invalidate_wr));
486 487
	f->fr_cqe.done = frwr_wc_localinv;
	invalidate_wr->wr_cqe = &f->fr_cqe;
488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506
	invalidate_wr->opcode = IB_WR_LOCAL_INV;
	invalidate_wr->ex.invalidate_rkey = f->fr_mr->rkey;

	return invalidate_wr;
}

/* Invalidate all memory regions that were registered for "req".
 *
 * Sleeps until it is safe for the host CPU to access the
 * previously mapped memory regions.
 */
static void
frwr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
{
	struct ib_send_wr *invalidate_wrs, *pos, *prev, *bad_wr;
	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
	struct rpcrdma_mr_seg *seg;
	unsigned int i, nchunks;
	struct rpcrdma_frmr *f;
507
	struct rpcrdma_mw *mw;
508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531
	int rc;

	dprintk("RPC:       %s: req %p\n", __func__, req);

	/* ORDER: Invalidate all of the req's MRs first
	 *
	 * Chain the LOCAL_INV Work Requests and post them with
	 * a single ib_post_send() call.
	 */
	invalidate_wrs = pos = prev = NULL;
	seg = NULL;
	for (i = 0, nchunks = req->rl_nchunks; nchunks; nchunks--) {
		seg = &req->rl_segments[i];

		pos = __frwr_prepare_linv_wr(seg);

		if (!invalidate_wrs)
			invalidate_wrs = pos;
		else
			prev->next = pos;
		prev = pos;

		i += seg->mr_nsegs;
	}
532
	f = &seg->rl_mw->frmr;
533 534 535 536 537 538

	/* Strong send queue ordering guarantees that when the
	 * last WR in the chain completes, all WRs in the chain
	 * are complete.
	 */
	f->fr_invwr.send_flags = IB_SEND_SIGNALED;
539 540
	f->fr_cqe.done = frwr_wc_localinv_wake;
	reinit_completion(&f->fr_linv_done);
541 542 543 544 545 546 547
	INIT_CQCOUNT(&r_xprt->rx_ep);

	/* Transport disconnect drains the receive CQ before it
	 * replaces the QP. The RPC reply handler won't call us
	 * unless ri_id->qp is a valid pointer.
	 */
	rc = ib_post_send(ia->ri_id->qp, invalidate_wrs, &bad_wr);
548 549
	if (rc)
		goto reset_mrs;
550 551 552 553 554 555

	wait_for_completion(&f->fr_linv_done);

	/* ORDER: Now DMA unmap all of the req's MRs, and return
	 * them to the free MW list.
	 */
556
unmap:
557 558
	for (i = 0, nchunks = req->rl_nchunks; nchunks; nchunks--) {
		seg = &req->rl_segments[i];
559 560
		mw = seg->rl_mw;
		seg->rl_mw = NULL;
561

562 563
		ib_dma_unmap_sg(ia->ri_device,
				mw->mw_sg, mw->mw_nents, mw->mw_dir);
564
		rpcrdma_put_mw(r_xprt, mw);
565 566 567 568 569 570

		i += seg->mr_nsegs;
		seg->mr_nsegs = 0;
	}

	req->rl_nchunks = 0;
571
	return;
572

573
reset_mrs:
574 575
	pr_err("rpcrdma: FRMR invalidate ib_post_send returned %i\n", rc);
	rdma_disconnect(ia->ri_id);
576

577 578 579 580 581 582 583
	/* Find and reset the MRs in the LOCAL_INV WRs that did not
	 * get posted. This is synchronous, and slow.
	 */
	for (i = 0, nchunks = req->rl_nchunks; nchunks; nchunks--) {
		seg = &req->rl_segments[i];
		mw = seg->rl_mw;
		f = &mw->frmr;
584

585 586 587 588
		if (mw->frmr.fr_mr->rkey == bad_wr->ex.invalidate_rkey) {
			__frwr_reset_mr(ia, mw);
			bad_wr = bad_wr->next;
		}
589

590 591 592
		i += seg->mr_nsegs;
	}
	goto unmap;
593
}
594

595 596 597 598 599 600 601 602 603 604
/* Use a slow, safe mechanism to invalidate all memory regions
 * that were registered for "req".
 *
 * Each chunk's MW is either recovered immediately (@sync true) or
 * handed to deferred recovery (@sync false). The chunk's segment is
 * then detached from the MW. req->rl_nchunks is zero on return.
 */
static void
frwr_op_unmap_safe(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
		   bool sync)
{
	unsigned int idx = 0;

	while (req->rl_nchunks) {
		struct rpcrdma_mr_seg *seg = &req->rl_segments[idx];
		struct rpcrdma_mw *mw = seg->rl_mw;

		if (!sync)
			rpcrdma_defer_mr_recovery(mw);
		else
			frwr_op_recover_mr(mw);

		/* Step to the first segment of the next chunk */
		idx += seg->mr_nsegs;
		seg->mr_nsegs = 0;
		seg->rl_mw = NULL;

		req->rl_nchunks--;
	}
}

621 622 623 624 625 626 627 628 629 630 631 632 633
static void
frwr_op_destroy(struct rpcrdma_buffer *buf)
{
	struct rpcrdma_mw *r;

	while (!list_empty(&buf->rb_all)) {
		r = list_entry(buf->rb_all.next, struct rpcrdma_mw, mw_all);
		list_del(&r->mw_all);
		__frwr_release(r);
		kfree(r);
	}
}

634
const struct rpcrdma_memreg_ops rpcrdma_frwr_memreg_ops = {
635
	.ro_map				= frwr_op_map,
636
	.ro_unmap_sync			= frwr_op_unmap_sync,
637
	.ro_unmap_safe			= frwr_op_unmap_safe,
638
	.ro_recover_mr			= frwr_op_recover_mr,
C
Chuck Lever 已提交
639
	.ro_open			= frwr_op_open,
640
	.ro_maxpages			= frwr_op_maxpages,
C
Chuck Lever 已提交
641
	.ro_init			= frwr_op_init,
642
	.ro_destroy			= frwr_op_destroy,
643 644
	.ro_displayname			= "frwr",
};