frwr_ops.c 16.6 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13
/*
 * Copyright (c) 2015 Oracle.  All rights reserved.
 * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
 */

/* Lightweight memory registration using Fast Registration Work
 * Requests (FRWR). Also referred to sometimes as FRMR mode.
 *
 * FRWR features ordered asynchronous registration and deregistration
 * of arbitrarily sized memory regions. This is the fastest and safest
 * but most complex memory registration mode.
 */

14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69
/* Normal operation
 *
 * A Memory Region is prepared for RDMA READ or WRITE using a FAST_REG
 * Work Request (frmr_op_map). When the RDMA operation is finished, this
 * Memory Region is invalidated using a LOCAL_INV Work Request
 * (frmr_op_unmap).
 *
 * Typically these Work Requests are not signaled, and neither are RDMA
 * SEND Work Requests (with the exception of signaling occasionally to
 * prevent provider work queue overflows). This greatly reduces HCA
 * interrupt workload.
 *
 * As an optimization, frwr_op_unmap marks MRs INVALID before the
 * LOCAL_INV WR is posted. If posting succeeds, the MR is placed on
 * rb_mws immediately so that no work (like managing a linked list
 * under a spinlock) is needed in the completion upcall.
 *
 * But this means that frwr_op_map() can occasionally encounter an MR
 * that is INVALID but the LOCAL_INV WR has not completed. Work Queue
 * ordering prevents a subsequent FAST_REG WR from executing against
 * that MR while it is still being invalidated.
 */

/* Transport recovery
 *
 * ->op_map and the transport connect worker cannot run at the same
 * time, but ->op_unmap can fire while the transport connect worker
 * is running. Thus MR recovery is handled in ->op_map, to guarantee
 * that recovered MRs are owned by a sending RPC, and not one where
 * ->op_unmap could fire at the same time transport reconnect is
 * being done.
 *
 * When the underlying transport disconnects, MRs are left in one of
 * three states:
 *
 * INVALID:	The MR was not in use before the QP entered ERROR state.
 *		(Or, the LOCAL_INV WR has not completed or flushed yet).
 *
 * STALE:	The MR was being registered or unregistered when the QP
 *		entered ERROR state, and the pending WR was flushed.
 *
 * VALID:	The MR was registered before the QP entered ERROR state.
 *
 * When frwr_op_map encounters STALE and VALID MRs, they are recovered
 * with ib_dereg_mr and then are re-initialized. Beause MR recovery
 * allocates fresh resources, it is deferred to a workqueue, and the
 * recovered MRs are placed back on the rb_mws list when recovery is
 * complete. frwr_op_map allocates another MR for the current RPC while
 * the broken MR is reset.
 *
 * To ensure that frwr_op_map doesn't encounter an MR that is marked
 * INVALID but that is about to be flushed due to a previous transport
 * disconnect, the transport connect worker attempts to drain all
 * pending send queue WRs before the transport is reconnected.
 */

70 71 72 73 74 75
#include "xprt_rdma.h"

#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
# define RPCDBG_FACILITY	RPCDBG_TRANS
#endif

76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92
bool
frwr_is_supported(struct rpcrdma_ia *ia)
{
	struct ib_device_attr *attrs = &ia->ri_device->attrs;

	if (!(attrs->device_cap_flags & IB_DEVICE_MEM_MGT_EXTENSIONS))
		goto out_not_supported;
	if (attrs->max_fast_reg_page_list_len == 0)
		goto out_not_supported;
	return true;

out_not_supported:
	pr_info("rpcrdma: 'frwr' mode is not supported by device %s\n",
		ia->ri_device->name);
	return false;
}

93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136
static int
__frwr_init(struct rpcrdma_mw *r, struct ib_pd *pd, unsigned int depth)
{
	struct rpcrdma_frmr *f = &r->frmr;
	int rc;

	f->fr_mr = ib_alloc_mr(pd, IB_MR_TYPE_MEM_REG, depth);
	if (IS_ERR(f->fr_mr))
		goto out_mr_err;

	r->mw_sg = kcalloc(depth, sizeof(*r->mw_sg), GFP_KERNEL);
	if (!r->mw_sg)
		goto out_list_err;

	sg_init_table(r->mw_sg, depth);
	init_completion(&f->fr_linv_done);
	return 0;

out_mr_err:
	rc = PTR_ERR(f->fr_mr);
	dprintk("RPC:       %s: ib_alloc_mr status %i\n",
		__func__, rc);
	return rc;

out_list_err:
	rc = -ENOMEM;
	dprintk("RPC:       %s: sg allocation failure\n",
		__func__);
	ib_dereg_mr(f->fr_mr);
	return rc;
}

static void
__frwr_release(struct rpcrdma_mw *r)
{
	int rc;

	rc = ib_dereg_mr(r->frmr.fr_mr);
	if (rc)
		pr_err("rpcrdma: final ib_dereg_mr for %p returned %i\n",
		       r, rc);
	kfree(r->mw_sg);
}

137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162
static int
__frwr_reset_mr(struct rpcrdma_ia *ia, struct rpcrdma_mw *r)
{
	struct rpcrdma_frmr *f = &r->frmr;
	int rc;

	rc = ib_dereg_mr(f->fr_mr);
	if (rc) {
		pr_warn("rpcrdma: ib_dereg_mr status %d, frwr %p orphaned\n",
			rc, r);
		return rc;
	}

	f->fr_mr = ib_alloc_mr(ia->ri_pd, IB_MR_TYPE_MEM_REG,
			       ia->ri_max_frmr_depth);
	if (IS_ERR(f->fr_mr)) {
		pr_warn("rpcrdma: ib_alloc_mr status %ld, frwr %p orphaned\n",
			PTR_ERR(f->fr_mr), r);
		return PTR_ERR(f->fr_mr);
	}

	dprintk("RPC:       %s: recovered FRMR %p\n", __func__, r);
	f->fr_state = FRMR_IS_INVALID;
	return 0;
}

163 164 165 166 167 168
/* Reset of a single FRMR. Generate a fresh rkey by replacing the MR.
 *
 * There's no recovery if this fails. The FRMR is abandoned, but
 * remains in rb_all. It will be cleaned up when the transport is
 * destroyed.
 */
169
static void
170
frwr_op_recover_mr(struct rpcrdma_mw *mw)
171
{
172
	struct rpcrdma_xprt *r_xprt = mw->mw_xprt;
173 174 175 176
	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
	int rc;

	rc = __frwr_reset_mr(ia, mw);
177
	ib_dma_unmap_sg(ia->ri_device, mw->mw_sg, mw->mw_nents, mw->mw_dir);
178 179 180 181
	if (rc) {
		pr_err("rpcrdma: FRMR reset status %d, %p orphaned\n",
		       rc, mw);
		r_xprt->rx_stats.mrs_orphaned++;
182
		return;
183
	}
184

185 186
	rpcrdma_put_mw(r_xprt, mw);
	r_xprt->rx_stats.mrs_recovered++;
187 188
}

C
Chuck Lever 已提交
189 190 191 192 193 194 195 196
static int
frwr_op_open(struct rpcrdma_ia *ia, struct rpcrdma_ep *ep,
	     struct rpcrdma_create_data_internal *cdata)
{
	int depth, delta;

	ia->ri_max_frmr_depth =
			min_t(unsigned int, RPCRDMA_MAX_DATA_SEGS,
197
			      ia->ri_device->attrs.max_fast_reg_page_list_len);
C
Chuck Lever 已提交
198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223
	dprintk("RPC:       %s: device's max FR page list len = %u\n",
		__func__, ia->ri_max_frmr_depth);

	/* Add room for frmr register and invalidate WRs.
	 * 1. FRMR reg WR for head
	 * 2. FRMR invalidate WR for head
	 * 3. N FRMR reg WRs for pagelist
	 * 4. N FRMR invalidate WRs for pagelist
	 * 5. FRMR reg WR for tail
	 * 6. FRMR invalidate WR for tail
	 * 7. The RDMA_SEND WR
	 */
	depth = 7;

	/* Calculate N if the device max FRMR depth is smaller than
	 * RPCRDMA_MAX_DATA_SEGS.
	 */
	if (ia->ri_max_frmr_depth < RPCRDMA_MAX_DATA_SEGS) {
		delta = RPCRDMA_MAX_DATA_SEGS - ia->ri_max_frmr_depth;
		do {
			depth += 2; /* FRMR reg + invalidate */
			delta -= ia->ri_max_frmr_depth;
		} while (delta > 0);
	}

	ep->rep_attr.cap.max_send_wr *= depth;
224 225
	if (ep->rep_attr.cap.max_send_wr > ia->ri_device->attrs.max_qp_wr) {
		cdata->max_requests = ia->ri_device->attrs.max_qp_wr / depth;
C
Chuck Lever 已提交
226 227 228 229 230 231
		if (!cdata->max_requests)
			return -EINVAL;
		ep->rep_attr.cap.max_send_wr = cdata->max_requests *
					       depth;
	}

C
Chuck Lever 已提交
232 233 234
	rpcrdma_set_max_header_sizes(ia, cdata, max_t(unsigned int, 1,
						      RPCRDMA_MAX_DATA_SEGS /
						      ia->ri_max_frmr_depth));
C
Chuck Lever 已提交
235 236 237
	return 0;
}

238 239 240 241 242 243 244 245 246
/* FRWR mode conveys a list of pages per chunk segment. The
 * maximum length of that list is the FRWR page list depth.
 */
static size_t
frwr_op_maxpages(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_ia *ia = &r_xprt->rx_ia;

	return min_t(unsigned int, RPCRDMA_MAX_DATA_SEGS,
247
		     RPCRDMA_MAX_HDR_SEGS * ia->ri_max_frmr_depth);
248 249
}

250 251 252 253 254 255 256 257 258 259 260 261 262 263 264
static void
__frwr_sendcompletion_flush(struct ib_wc *wc, struct rpcrdma_frmr *frmr,
			    const char *wr)
{
	frmr->fr_state = FRMR_IS_STALE;
	if (wc->status != IB_WC_WR_FLUSH_ERR)
		pr_err("rpcrdma: %s: %s (%u/0x%x)\n",
		       wr, ib_wc_status_msg(wc->status),
		       wc->status, wc->vendor_err);
}

/**
 * frwr_wc_fastreg - Invoked by RDMA provider for each polled FastReg WC
 * @cq:	completion queue (ignored)
 * @wc:	completed WR
265 266
 *
 */
267
static void
268
frwr_wc_fastreg(struct ib_cq *cq, struct ib_wc *wc)
269
{
270 271
	struct rpcrdma_frmr *frmr;
	struct ib_cqe *cqe;
272

273 274 275 276 277 278
	/* WARNING: Only wr_cqe and status are reliable at this point */
	if (wc->status != IB_WC_SUCCESS) {
		cqe = wc->wr_cqe;
		frmr = container_of(cqe, struct rpcrdma_frmr, fr_cqe);
		__frwr_sendcompletion_flush(wc, frmr, "fastreg");
	}
279 280
}

281 282 283 284 285 286
/**
 * frwr_wc_localinv - Invoked by RDMA provider for each polled LocalInv WC
 * @cq:	completion queue (ignored)
 * @wc:	completed WR
 *
 */
287
static void
288
frwr_wc_localinv(struct ib_cq *cq, struct ib_wc *wc)
289
{
290 291
	struct rpcrdma_frmr *frmr;
	struct ib_cqe *cqe;
292

293 294 295 296 297 298 299
	/* WARNING: Only wr_cqe and status are reliable at this point */
	if (wc->status != IB_WC_SUCCESS) {
		cqe = wc->wr_cqe;
		frmr = container_of(cqe, struct rpcrdma_frmr, fr_cqe);
		__frwr_sendcompletion_flush(wc, frmr, "localinv");
	}
}
300

301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319
/**
 * frwr_wc_localinv - Invoked by RDMA provider for each polled LocalInv WC
 * @cq:	completion queue (ignored)
 * @wc:	completed WR
 *
 * Awaken anyone waiting for an MR to finish being fenced.
 */
static void
frwr_wc_localinv_wake(struct ib_cq *cq, struct ib_wc *wc)
{
	struct rpcrdma_frmr *frmr;
	struct ib_cqe *cqe;

	/* WARNING: Only wr_cqe and status are reliable at this point */
	cqe = wc->wr_cqe;
	frmr = container_of(cqe, struct rpcrdma_frmr, fr_cqe);
	if (wc->status != IB_WC_SUCCESS)
		__frwr_sendcompletion_flush(wc, frmr, "localinv");
	complete_all(&frmr->fr_linv_done);
320 321
}

C
Chuck Lever 已提交
322 323 324 325 326 327 328 329
static int
frwr_op_init(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	unsigned int depth = r_xprt->rx_ia.ri_max_frmr_depth;
	struct ib_pd *pd = r_xprt->rx_ia.ri_pd;
	int i;

C
Chuck Lever 已提交
330
	spin_lock_init(&buf->rb_mwlock);
C
Chuck Lever 已提交
331 332 333
	INIT_LIST_HEAD(&buf->rb_mws);
	INIT_LIST_HEAD(&buf->rb_all);

334 335 336 337
	i = max_t(int, RPCRDMA_MAX_DATA_SEGS / depth, 1);
	i += 2;				/* head + tail */
	i *= buf->rb_max_requests;	/* one set for each RPC slot */
	dprintk("RPC:       %s: initalizing %d FRMRs\n", __func__, i);
C
Chuck Lever 已提交
338 339 340 341 342 343 344 345 346

	while (i--) {
		struct rpcrdma_mw *r;
		int rc;

		r = kzalloc(sizeof(*r), GFP_KERNEL);
		if (!r)
			return -ENOMEM;

347
		rc = __frwr_init(r, pd, depth);
C
Chuck Lever 已提交
348 349 350 351 352
		if (rc) {
			kfree(r);
			return rc;
		}

353
		r->mw_xprt = r_xprt;
C
Chuck Lever 已提交
354 355 356 357 358 359 360
		list_add(&r->mw_list, &buf->rb_mws);
		list_add(&r->mw_all, &buf->rb_all);
	}

	return 0;
}

361
/* Post a REG_MR Work Request to register a memory region
362 363 364 365 366 367 368 369
 * for remote access via RDMA READ or RDMA WRITE.
 */
static int
frwr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
	    int nsegs, bool writing)
{
	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
	struct rpcrdma_mr_seg *seg1 = seg;
370 371 372
	struct rpcrdma_mw *mw;
	struct rpcrdma_frmr *frmr;
	struct ib_mr *mr;
373
	struct ib_reg_wr *reg_wr;
C
Christoph Hellwig 已提交
374
	struct ib_send_wr *bad_wr;
375
	int rc, i, n, dma_nents;
376 377
	u8 key;

378 379 380 381
	mw = seg1->rl_mw;
	seg1->rl_mw = NULL;
	do {
		if (mw)
382
			rpcrdma_defer_mr_recovery(mw);
383 384
		mw = rpcrdma_get_mw(r_xprt);
		if (!mw)
385
			return -ENOBUFS;
386 387
	} while (mw->frmr.fr_state != FRMR_IS_INVALID);
	frmr = &mw->frmr;
388
	frmr->fr_state = FRMR_IS_VALID;
389
	mr = frmr->fr_mr;
390
	reg_wr = &frmr->fr_regwr;
391

392 393
	if (nsegs > ia->ri_max_frmr_depth)
		nsegs = ia->ri_max_frmr_depth;
394 395
	for (i = 0; i < nsegs;) {
		if (seg->mr_page)
396
			sg_set_page(&mw->mw_sg[i],
397 398 399 400
				    seg->mr_page,
				    seg->mr_len,
				    offset_in_page(seg->mr_offset));
		else
401
			sg_set_buf(&mw->mw_sg[i], seg->mr_offset,
402 403
				   seg->mr_len);

404 405
		++seg;
		++i;
406

407 408 409 410 411
		/* Check for holes */
		if ((i < nsegs && offset_in_page(seg->mr_offset)) ||
		    offset_in_page((seg-1)->mr_offset + (seg-1)->mr_len))
			break;
	}
412 413
	mw->mw_nents = i;
	mw->mw_dir = rpcrdma_data_dir(writing);
414

415 416 417 418 419 420 421 422
	dma_nents = ib_dma_map_sg(ia->ri_device,
				  mw->mw_sg, mw->mw_nents, mw->mw_dir);
	if (!dma_nents)
		goto out_dmamap_err;

	n = ib_map_mr_sg(mr, mw->mw_sg, mw->mw_nents, NULL, PAGE_SIZE);
	if (unlikely(n != mw->mw_nents))
		goto out_mapmr_err;
423 424

	dprintk("RPC:       %s: Using frmr %p to map %u segments (%u bytes)\n",
425
		__func__, mw, mw->mw_nents, mr->length);
426

427 428
	key = (u8)(mr->rkey & 0x000000FF);
	ib_update_fast_reg_key(mr, ++key);
429

430 431
	reg_wr->wr.next = NULL;
	reg_wr->wr.opcode = IB_WR_REG_MR;
432 433
	frmr->fr_cqe.done = frwr_wc_fastreg;
	reg_wr->wr.wr_cqe = &frmr->fr_cqe;
434 435 436 437 438 439 440
	reg_wr->wr.num_sge = 0;
	reg_wr->wr.send_flags = 0;
	reg_wr->mr = mr;
	reg_wr->key = mr->rkey;
	reg_wr->access = writing ?
			 IB_ACCESS_REMOTE_WRITE | IB_ACCESS_LOCAL_WRITE :
			 IB_ACCESS_REMOTE_READ;
441 442

	DECR_CQCOUNT(&r_xprt->rx_ep);
443
	rc = ib_post_send(ia->ri_id->qp, &reg_wr->wr, &bad_wr);
444 445 446
	if (rc)
		goto out_senderr;

447
	seg1->rl_mw = mw;
448
	seg1->mr_rkey = mr->rkey;
449
	seg1->mr_base = mr->iova;
450
	seg1->mr_nsegs = mw->mw_nents;
451 452
	seg1->mr_len = mr->length;

453 454 455 456 457
	return mw->mw_nents;

out_dmamap_err:
	pr_err("rpcrdma: failed to dma map sg %p sg_nents %u\n",
	       mw->mw_sg, mw->mw_nents);
458
	rpcrdma_defer_mr_recovery(mw);
459
	return -EIO;
460 461 462 463

out_mapmr_err:
	pr_err("rpcrdma: failed to map mr %p (%u/%u)\n",
	       frmr->fr_mr, n, mw->mw_nents);
464
	rpcrdma_defer_mr_recovery(mw);
465
	return -EIO;
466 467

out_senderr:
468
	pr_err("rpcrdma: FRMR registration ib_post_send returned %i\n", rc);
469
	rpcrdma_defer_mr_recovery(mw);
470
	return -ENOTCONN;
471 472
}

473 474 475 476
static struct ib_send_wr *
__frwr_prepare_linv_wr(struct rpcrdma_mr_seg *seg)
{
	struct rpcrdma_mw *mw = seg->rl_mw;
477
	struct rpcrdma_frmr *f = &mw->frmr;
478 479 480 481 482 483
	struct ib_send_wr *invalidate_wr;

	f->fr_state = FRMR_IS_INVALID;
	invalidate_wr = &f->fr_invwr;

	memset(invalidate_wr, 0, sizeof(*invalidate_wr));
484 485
	f->fr_cqe.done = frwr_wc_localinv;
	invalidate_wr->wr_cqe = &f->fr_cqe;
486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504
	invalidate_wr->opcode = IB_WR_LOCAL_INV;
	invalidate_wr->ex.invalidate_rkey = f->fr_mr->rkey;

	return invalidate_wr;
}

/* Invalidate all memory regions that were registered for "req".
 *
 * Sleeps until it is safe for the host CPU to access the
 * previously mapped memory regions.
 */
static void
frwr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
{
	struct ib_send_wr *invalidate_wrs, *pos, *prev, *bad_wr;
	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
	struct rpcrdma_mr_seg *seg;
	unsigned int i, nchunks;
	struct rpcrdma_frmr *f;
505
	struct rpcrdma_mw *mw;
506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529
	int rc;

	dprintk("RPC:       %s: req %p\n", __func__, req);

	/* ORDER: Invalidate all of the req's MRs first
	 *
	 * Chain the LOCAL_INV Work Requests and post them with
	 * a single ib_post_send() call.
	 */
	invalidate_wrs = pos = prev = NULL;
	seg = NULL;
	for (i = 0, nchunks = req->rl_nchunks; nchunks; nchunks--) {
		seg = &req->rl_segments[i];

		pos = __frwr_prepare_linv_wr(seg);

		if (!invalidate_wrs)
			invalidate_wrs = pos;
		else
			prev->next = pos;
		prev = pos;

		i += seg->mr_nsegs;
	}
530
	f = &seg->rl_mw->frmr;
531 532 533 534 535 536

	/* Strong send queue ordering guarantees that when the
	 * last WR in the chain completes, all WRs in the chain
	 * are complete.
	 */
	f->fr_invwr.send_flags = IB_SEND_SIGNALED;
537 538
	f->fr_cqe.done = frwr_wc_localinv_wake;
	reinit_completion(&f->fr_linv_done);
539 540 541 542 543 544 545
	INIT_CQCOUNT(&r_xprt->rx_ep);

	/* Transport disconnect drains the receive CQ before it
	 * replaces the QP. The RPC reply handler won't call us
	 * unless ri_id->qp is a valid pointer.
	 */
	rc = ib_post_send(ia->ri_id->qp, invalidate_wrs, &bad_wr);
546 547
	if (rc)
		goto reset_mrs;
548 549 550 551 552 553

	wait_for_completion(&f->fr_linv_done);

	/* ORDER: Now DMA unmap all of the req's MRs, and return
	 * them to the free MW list.
	 */
554
unmap:
555 556
	for (i = 0, nchunks = req->rl_nchunks; nchunks; nchunks--) {
		seg = &req->rl_segments[i];
557 558
		mw = seg->rl_mw;
		seg->rl_mw = NULL;
559

560 561
		ib_dma_unmap_sg(ia->ri_device,
				mw->mw_sg, mw->mw_nents, mw->mw_dir);
562
		rpcrdma_put_mw(r_xprt, mw);
563 564 565 566 567 568

		i += seg->mr_nsegs;
		seg->mr_nsegs = 0;
	}

	req->rl_nchunks = 0;
569
	return;
570

571
reset_mrs:
572 573
	pr_err("rpcrdma: FRMR invalidate ib_post_send returned %i\n", rc);
	rdma_disconnect(ia->ri_id);
574

575 576 577 578 579 580 581
	/* Find and reset the MRs in the LOCAL_INV WRs that did not
	 * get posted. This is synchronous, and slow.
	 */
	for (i = 0, nchunks = req->rl_nchunks; nchunks; nchunks--) {
		seg = &req->rl_segments[i];
		mw = seg->rl_mw;
		f = &mw->frmr;
582

583 584 585 586
		if (mw->frmr.fr_mr->rkey == bad_wr->ex.invalidate_rkey) {
			__frwr_reset_mr(ia, mw);
			bad_wr = bad_wr->next;
		}
587

588 589 590
		i += seg->mr_nsegs;
	}
	goto unmap;
591
}
592

593 594 595 596 597 598 599 600 601 602
/* Use a slow, safe mechanism to invalidate all memory regions
 * that were registered for "req".
 */
static void
frwr_op_unmap_safe(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
		   bool sync)
{
	struct rpcrdma_mr_seg *seg;
	struct rpcrdma_mw *mw;
	unsigned int i;
603

604 605 606
	for (i = 0; req->rl_nchunks; req->rl_nchunks--) {
		seg = &req->rl_segments[i];
		mw = seg->rl_mw;
607

608
		if (sync)
609
			frwr_op_recover_mr(mw);
610
		else
611
			rpcrdma_defer_mr_recovery(mw);
612 613 614 615 616

		i += seg->mr_nsegs;
		seg->mr_nsegs = 0;
		seg->rl_mw = NULL;
	}
617 618
}

619 620 621 622 623 624 625 626 627 628 629 630 631
static void
frwr_op_destroy(struct rpcrdma_buffer *buf)
{
	struct rpcrdma_mw *r;

	while (!list_empty(&buf->rb_all)) {
		r = list_entry(buf->rb_all.next, struct rpcrdma_mw, mw_all);
		list_del(&r->mw_all);
		__frwr_release(r);
		kfree(r);
	}
}

632
const struct rpcrdma_memreg_ops rpcrdma_frwr_memreg_ops = {
633
	.ro_map				= frwr_op_map,
634
	.ro_unmap_sync			= frwr_op_unmap_sync,
635
	.ro_unmap_safe			= frwr_op_unmap_safe,
636
	.ro_recover_mr			= frwr_op_recover_mr,
C
Chuck Lever 已提交
637
	.ro_open			= frwr_op_open,
638
	.ro_maxpages			= frwr_op_maxpages,
C
Chuck Lever 已提交
639
	.ro_init			= frwr_op_init,
640
	.ro_destroy			= frwr_op_destroy,
641 642
	.ro_displayname			= "frwr",
};