frwr_ops.c
// SPDX-License-Identifier: GPL-2.0
/*
 * Copyright (c) 2015, 2017 Oracle.  All rights reserved.
 * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
 */

/* Lightweight memory registration using Fast Registration Work
 * Requests (FRWR).
 *
 * FRWR features ordered asynchronous registration and invalidation
 * of arbitrarily-sized memory regions. This is the fastest and safest
 * but most complex memory registration mode.
 */

/* Normal operation
 *
 * A Memory Region is prepared for RDMA Read or Write using a FAST_REG
 * Work Request (frwr_map). When the RDMA operation is finished, this
 * Memory Region is invalidated using a LOCAL_INV Work Request
 * (frwr_unmap_async and frwr_unmap_sync).
 *
 * Typically FAST_REG Work Requests are not signaled, and neither are
 * RDMA Send Work Requests (with the exception of signaling occasionally
 * to prevent provider work queue overflows). This greatly reduces HCA
 * interrupt workload.
 */

/* Transport recovery
 *
 * frwr_map and frwr_unmap_* cannot run at the same time the transport
 * connect worker is running. The connect worker holds the transport
 * send lock, just as ->send_request does. This prevents frwr_map and
 * the connect worker from running concurrently. When a connection is
 * closed, the Receive completion queue is drained before allowing
 * the connect worker to get control. This prevents frwr_unmap and the
 * connect worker from running concurrently.
 *
 * When the underlying transport disconnects, MRs that are in flight
 * are flushed and are likely unusable. Thus all MRs are destroyed.
 * New MRs are created on demand.
 */

#include <linux/sunrpc/rpc_rdma.h>
#include <linux/sunrpc/svc_rdma.h>

#include "xprt_rdma.h"
#include <trace/events/rpcrdma.h>

#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
# define RPCDBG_FACILITY	RPCDBG_TRANS
#endif

/**
 * frwr_release_mr - Destroy one MR
 * @mr: MR allocated by frwr_init_mr
 *
 */
void frwr_release_mr(struct rpcrdma_mr *mr)
{
	int rc;

	rc = ib_dereg_mr(mr->frwr.fr_mr);
	if (rc)
		trace_xprtrdma_frwr_dereg(mr, rc);
	kfree(mr->mr_sg);
	kfree(mr);
}

static void frwr_mr_recycle(struct rpcrdma_mr *mr)
{
	struct rpcrdma_xprt *r_xprt = mr->mr_xprt;

	trace_xprtrdma_mr_recycle(mr);

	if (mr->mr_dir != DMA_NONE) {
		trace_xprtrdma_mr_unmap(mr);
		ib_dma_unmap_sg(r_xprt->rx_ia.ri_id->device,
				mr->mr_sg, mr->mr_nents, mr->mr_dir);
		mr->mr_dir = DMA_NONE;
	}

	spin_lock(&r_xprt->rx_buf.rb_lock);
	list_del(&mr->mr_all);
	r_xprt->rx_stats.mrs_recycled++;
	spin_unlock(&r_xprt->rx_buf.rb_lock);

	frwr_release_mr(mr);
}

/* frwr_reset - Place MRs back on the free list
 * @req: request to reset
 *
 * Used after a failed marshal. For FRWR, this means the MRs
 * don't have to be fully released and recreated.
 *
 * NB: This is safe only as long as none of @req's MRs are
 * involved with an ongoing asynchronous FAST_REG or LOCAL_INV
 * Work Request.
 */
void frwr_reset(struct rpcrdma_req *req)
{
	struct rpcrdma_mr *mr;

	while ((mr = rpcrdma_mr_pop(&req->rl_registered)))
		rpcrdma_mr_put(mr);
}

/**
 * frwr_init_mr - Initialize one MR
 * @ia: interface adapter
 * @mr: generic MR to prepare for FRWR
 *
 * Returns zero if successful. Otherwise a negative errno
 * is returned.
 */
int frwr_init_mr(struct rpcrdma_ia *ia, struct rpcrdma_mr *mr)
{
	unsigned int depth = ia->ri_max_frwr_depth;
	struct scatterlist *sg;
	struct ib_mr *frmr;
	int rc;

	frmr = ib_alloc_mr(ia->ri_pd, ia->ri_mrtype, depth);
	if (IS_ERR(frmr))
		goto out_mr_err;

	sg = kcalloc(depth, sizeof(*sg), GFP_NOFS);
	if (!sg)
		goto out_list_err;

	mr->frwr.fr_mr = frmr;
	mr->mr_dir = DMA_NONE;
	INIT_LIST_HEAD(&mr->mr_list);
	init_completion(&mr->frwr.fr_linv_done);

	sg_init_table(sg, depth);
	mr->mr_sg = sg;
	return 0;

out_mr_err:
	rc = PTR_ERR(frmr);
	trace_xprtrdma_frwr_alloc(mr, rc);
	return rc;

out_list_err:
	ib_dereg_mr(frmr);
	return -ENOMEM;
}

/**
 * frwr_query_device - Prepare a transport for use with FRWR
 * @r_xprt: controlling transport instance
 * @device: RDMA device to query
 *
 * On success, sets:
 *	ep->rep_attr
 *	ep->rep_max_requests
 *	ia->ri_max_rdma_segs
 *
 * And these FRWR-related fields:
 *	ia->ri_max_frwr_depth
 *	ia->ri_mrtype
 *
 * Return values:
 *   On success, returns zero.
 *   %-EINVAL - the device does not support FRWR memory registration
 *   %-ENOMEM - the device is not sufficiently capable for NFS/RDMA
 */
int frwr_query_device(struct rpcrdma_xprt *r_xprt,
		      const struct ib_device *device)
{
	const struct ib_device_attr *attrs = &device->attrs;
	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
	struct rpcrdma_ep *ep = &r_xprt->rx_ep;
	int max_qp_wr, depth, delta;
	unsigned int max_sge;

	if (!(attrs->device_cap_flags & IB_DEVICE_MEM_MGT_EXTENSIONS) ||
	    attrs->max_fast_reg_page_list_len == 0) {
		pr_err("rpcrdma: 'frwr' mode is not supported by device %s\n",
		       device->name);
		return -EINVAL;
	}

	max_sge = min_t(unsigned int, attrs->max_send_sge,
			RPCRDMA_MAX_SEND_SGES);
	if (max_sge < RPCRDMA_MIN_SEND_SGES) {
		pr_err("rpcrdma: HCA provides only %u send SGEs\n", max_sge);
		return -ENOMEM;
	}
	ep->rep_attr.cap.max_send_sge = max_sge;
	ep->rep_attr.cap.max_recv_sge = 1;

	ia->ri_mrtype = IB_MR_TYPE_MEM_REG;
	if (attrs->device_cap_flags & IB_DEVICE_SG_GAPS_REG)
		ia->ri_mrtype = IB_MR_TYPE_SG_GAPS;

	/* Quirk: Some devices advertise a large max_fast_reg_page_list_len
	 * capability, but perform optimally when the MRs are not larger
	 * than a page.
	 */
	if (attrs->max_sge_rd > RPCRDMA_MAX_HDR_SEGS)
		ia->ri_max_frwr_depth = attrs->max_sge_rd;
	else
		ia->ri_max_frwr_depth = attrs->max_fast_reg_page_list_len;
	if (ia->ri_max_frwr_depth > RPCRDMA_MAX_DATA_SEGS)
		ia->ri_max_frwr_depth = RPCRDMA_MAX_DATA_SEGS;

	/* Add room for frwr register and invalidate WRs.
	 * 1. FRWR reg WR for head
	 * 2. FRWR invalidate WR for head
	 * 3. N FRWR reg WRs for pagelist
	 * 4. N FRWR invalidate WRs for pagelist
	 * 5. FRWR reg WR for tail
	 * 6. FRWR invalidate WR for tail
	 * 7. The RDMA_SEND WR
	 */
	depth = 7;

	/* Calculate N if the device max FRWR depth is smaller than
	 * RPCRDMA_MAX_DATA_SEGS.
	 */
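	/* Illustrative example (hypothetical values, not taken from this
	 * file): with a device FRWR depth of 16 and a 64-segment maximum,
	 * the pagelist needs four chunks; the loop below then runs three
	 * times and grows depth from 7 to 13.
	 */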
	if (ia->ri_max_frwr_depth < RPCRDMA_MAX_DATA_SEGS) {
		delta = RPCRDMA_MAX_DATA_SEGS - ia->ri_max_frwr_depth;
		do {
			depth += 2; /* FRWR reg + invalidate */
			delta -= ia->ri_max_frwr_depth;
		} while (delta > 0);
	}

	max_qp_wr = attrs->max_qp_wr;
	max_qp_wr -= RPCRDMA_BACKWARD_WRS;
	max_qp_wr -= 1;
	if (max_qp_wr < RPCRDMA_MIN_SLOT_TABLE)
		return -ENOMEM;
	if (ep->rep_max_requests > max_qp_wr)
		ep->rep_max_requests = max_qp_wr;
	ep->rep_attr.cap.max_send_wr = ep->rep_max_requests * depth;
	if (ep->rep_attr.cap.max_send_wr > max_qp_wr) {
		ep->rep_max_requests = max_qp_wr / depth;
		if (!ep->rep_max_requests)
			return -ENOMEM;
		ep->rep_attr.cap.max_send_wr = ep->rep_max_requests * depth;
	}
	ep->rep_attr.cap.max_send_wr += RPCRDMA_BACKWARD_WRS;
	ep->rep_attr.cap.max_send_wr += 1; /* for ib_drain_sq */
	ep->rep_attr.cap.max_recv_wr = ep->rep_max_requests;
	ep->rep_attr.cap.max_recv_wr += RPCRDMA_BACKWARD_WRS;
	ep->rep_attr.cap.max_recv_wr += 1; /* for ib_drain_rq */

	ia->ri_max_rdma_segs =
		DIV_ROUND_UP(RPCRDMA_MAX_DATA_SEGS, ia->ri_max_frwr_depth);
	/* Reply chunks require segments for head and tail buffers */
	ia->ri_max_rdma_segs += 2;
	if (ia->ri_max_rdma_segs > RPCRDMA_MAX_HDR_SEGS)
		ia->ri_max_rdma_segs = RPCRDMA_MAX_HDR_SEGS;

	/* Ensure the underlying device is capable of conveying the
	 * largest r/wsize NFS will ask for. This guarantees that
	 * failing over from one RDMA device to another will not
	 * break NFS I/O.
	 */
	if ((ia->ri_max_rdma_segs * ia->ri_max_frwr_depth) < RPCRDMA_MAX_SEGS)
		return -ENOMEM;

	return 0;
}

/**
 * frwr_map - Register a memory region
 * @r_xprt: controlling transport
 * @seg: memory region co-ordinates
 * @nsegs: number of segments remaining
 * @writing: true when RDMA Write will be used
 * @xid: XID of RPC using the registered memory
 * @mr: MR to fill in
 *
 * Prepare a REG_MR Work Request to register a memory region
 * for remote access via RDMA READ or RDMA WRITE.
 *
 * Returns the next segment or a negative errno pointer.
 * On success, @mr is filled in.
 */
struct rpcrdma_mr_seg *frwr_map(struct rpcrdma_xprt *r_xprt,
				struct rpcrdma_mr_seg *seg,
				int nsegs, bool writing, __be32 xid,
				struct rpcrdma_mr *mr)
{
	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
	struct ib_reg_wr *reg_wr;
	struct ib_mr *ibmr;
	int i, n;
	u8 key;

	if (nsegs > ia->ri_max_frwr_depth)
		nsegs = ia->ri_max_frwr_depth;
	for (i = 0; i < nsegs;) {
		if (seg->mr_page)
			sg_set_page(&mr->mr_sg[i],
				    seg->mr_page,
				    seg->mr_len,
				    offset_in_page(seg->mr_offset));
		else
			sg_set_buf(&mr->mr_sg[i], seg->mr_offset,
				   seg->mr_len);

		++seg;
		++i;
		if (ia->ri_mrtype == IB_MR_TYPE_SG_GAPS)
			continue;
		if ((i < nsegs && offset_in_page(seg->mr_offset)) ||
		    offset_in_page((seg-1)->mr_offset + (seg-1)->mr_len))
			break;
	}
	mr->mr_dir = rpcrdma_data_dir(writing);

	mr->mr_nents =
		ib_dma_map_sg(ia->ri_id->device, mr->mr_sg, i, mr->mr_dir);
	if (!mr->mr_nents)
		goto out_dmamap_err;

	ibmr = mr->frwr.fr_mr;
	n = ib_map_mr_sg(ibmr, mr->mr_sg, mr->mr_nents, NULL, PAGE_SIZE);
	if (unlikely(n != mr->mr_nents))
		goto out_mapmr_err;

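	/* Note: the upper 32 bits of the MR's IOVA are replaced with the
	 * RPC XID below, which makes it easier to match RDMA segments in
	 * a wire capture with their RPC. Bumping the low-order "key" octet
	 * of the rkey ensures a stale rkey from an earlier registration of
	 * this MR is not honored.
	 */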
	ibmr->iova &= 0x00000000ffffffff;
	ibmr->iova |= ((u64)be32_to_cpu(xid)) << 32;
	key = (u8)(ibmr->rkey & 0x000000FF);
	ib_update_fast_reg_key(ibmr, ++key);

	reg_wr = &mr->frwr.fr_regwr;
	reg_wr->mr = ibmr;
	reg_wr->key = ibmr->rkey;
	reg_wr->access = writing ?
			 IB_ACCESS_REMOTE_WRITE | IB_ACCESS_LOCAL_WRITE :
			 IB_ACCESS_REMOTE_READ;

	mr->mr_handle = ibmr->rkey;
	mr->mr_length = ibmr->length;
	mr->mr_offset = ibmr->iova;
	trace_xprtrdma_mr_map(mr);

	return seg;

out_dmamap_err:
	mr->mr_dir = DMA_NONE;
	trace_xprtrdma_frwr_sgerr(mr, i);
	return ERR_PTR(-EIO);

out_mapmr_err:
	trace_xprtrdma_frwr_maperr(mr, n);
	return ERR_PTR(-EIO);
}

/**
 * frwr_wc_fastreg - Invoked by RDMA provider for a flushed FastReg WC
 * @cq:	completion queue (ignored)
 * @wc:	completed WR
 *
 */
static void frwr_wc_fastreg(struct ib_cq *cq, struct ib_wc *wc)
{
	struct ib_cqe *cqe = wc->wr_cqe;
	struct rpcrdma_frwr *frwr =
		container_of(cqe, struct rpcrdma_frwr, fr_cqe);

	/* WARNING: Only wr_cqe and status are reliable at this point */
	trace_xprtrdma_wc_fastreg(wc, frwr);
	/* The MR will get recycled when the associated req is retransmitted */
}

/**
 * frwr_send - post Send WR containing the RPC Call message
 * @ia: interface adapter
 * @req: Prepared RPC Call
 *
 * For FRWR, chain any FastReg WRs to the Send WR. Only a
 * single ib_post_send call is needed to register memory
 * and then post the Send WR.
 *
 * Returns the result of ib_post_send.
 */
int frwr_send(struct rpcrdma_ia *ia, struct rpcrdma_req *req)
{
	struct ib_send_wr *post_wr;
	struct rpcrdma_mr *mr;

	post_wr = &req->rl_wr;
	list_for_each_entry(mr, &req->rl_registered, mr_list) {
		struct rpcrdma_frwr *frwr;

		frwr = &mr->frwr;

		frwr->fr_cqe.done = frwr_wc_fastreg;
		frwr->fr_regwr.wr.next = post_wr;
		frwr->fr_regwr.wr.wr_cqe = &frwr->fr_cqe;
		frwr->fr_regwr.wr.num_sge = 0;
		frwr->fr_regwr.wr.opcode = IB_WR_REG_MR;
		frwr->fr_regwr.wr.send_flags = 0;

		post_wr = &frwr->fr_regwr.wr;
	}

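	/* The chain built above puts the unsignaled FastReg WRs ahead of
	 * the Send WR. Send Queue WRs are processed in order, so a single
	 * post here registers the memory before the Send that advertises
	 * it goes on the wire.
	 */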
	return ib_post_send(ia->ri_id->qp, post_wr, NULL);
}

/**
 * frwr_reminv - handle a remotely invalidated mr on the @mrs list
 * @rep: Received reply
 * @mrs: list of MRs to check
 *
 */
void frwr_reminv(struct rpcrdma_rep *rep, struct list_head *mrs)
{
	struct rpcrdma_mr *mr;

	list_for_each_entry(mr, mrs, mr_list)
		if (mr->mr_handle == rep->rr_inv_rkey) {
			list_del_init(&mr->mr_list);
			trace_xprtrdma_mr_remoteinv(mr);
			rpcrdma_mr_put(mr);
			break;	/* only one invalidated MR per RPC */
		}
}

static void __frwr_release_mr(struct ib_wc *wc, struct rpcrdma_mr *mr)
{
	if (wc->status != IB_WC_SUCCESS)
		frwr_mr_recycle(mr);
	else
		rpcrdma_mr_put(mr);
}

/**
 * frwr_wc_localinv - Invoked by RDMA provider for a LOCAL_INV WC
 * @cq:	completion queue (ignored)
 * @wc:	completed WR
 *
 */
static void frwr_wc_localinv(struct ib_cq *cq, struct ib_wc *wc)
{
	struct ib_cqe *cqe = wc->wr_cqe;
	struct rpcrdma_frwr *frwr =
		container_of(cqe, struct rpcrdma_frwr, fr_cqe);
	struct rpcrdma_mr *mr = container_of(frwr, struct rpcrdma_mr, frwr);

	/* WARNING: Only wr_cqe and status are reliable at this point */
	trace_xprtrdma_wc_li(wc, frwr);
	__frwr_release_mr(wc, mr);
}

/**
 * frwr_wc_localinv_wake - Invoked by RDMA provider for a LOCAL_INV WC
 * @cq:	completion queue (ignored)
 * @wc:	completed WR
 *
 * Awaken anyone waiting for an MR to finish being fenced.
 */
static void frwr_wc_localinv_wake(struct ib_cq *cq, struct ib_wc *wc)
{
	struct ib_cqe *cqe = wc->wr_cqe;
	struct rpcrdma_frwr *frwr =
		container_of(cqe, struct rpcrdma_frwr, fr_cqe);
	struct rpcrdma_mr *mr = container_of(frwr, struct rpcrdma_mr, frwr);

	/* WARNING: Only wr_cqe and status are reliable at this point */
	trace_xprtrdma_wc_li_wake(wc, frwr);
	__frwr_release_mr(wc, mr);
	complete(&frwr->fr_linv_done);
}

/**
 * frwr_unmap_sync - invalidate memory regions that were registered for @req
 * @r_xprt: controlling transport instance
 * @req: rpcrdma_req with a non-empty list of MRs to process
 *
 * Sleeps until it is safe for the host CPU to access the previously mapped
 * memory regions. This guarantees that registered MRs are properly fenced
 * from the server before the RPC consumer accesses the data in them. It
 * also ensures proper Send flow control: waking the next RPC waits until
 * this RPC has relinquished all its Send Queue entries.
 */
void frwr_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
{
	struct ib_send_wr *first, **prev, *last;
	const struct ib_send_wr *bad_wr;
	struct rpcrdma_frwr *frwr;
	struct rpcrdma_mr *mr;
	int rc;

	/* ORDER: Invalidate all of the MRs first
	 *
	 * Chain the LOCAL_INV Work Requests and post them with
	 * a single ib_post_send() call.
	 */
	frwr = NULL;
	prev = &first;
	while ((mr = rpcrdma_mr_pop(&req->rl_registered))) {

		trace_xprtrdma_mr_localinv(mr);
		r_xprt->rx_stats.local_inv_needed++;

		frwr = &mr->frwr;
		frwr->fr_cqe.done = frwr_wc_localinv;
		last = &frwr->fr_invwr;
		last->next = NULL;
		last->wr_cqe = &frwr->fr_cqe;
		last->sg_list = NULL;
		last->num_sge = 0;
		last->opcode = IB_WR_LOCAL_INV;
		last->send_flags = IB_SEND_SIGNALED;
		last->ex.invalidate_rkey = mr->mr_handle;

		*prev = last;
		prev = &last->next;
	}

	/* Strong send queue ordering guarantees that when the
	 * last WR in the chain completes, all WRs in the chain
	 * are complete.
	 */
	frwr->fr_cqe.done = frwr_wc_localinv_wake;
	reinit_completion(&frwr->fr_linv_done);

	/* Transport disconnect drains the receive CQ before it
	 * replaces the QP. The RPC reply handler won't call us
	 * unless ri_id->qp is a valid pointer.
	 */
	bad_wr = NULL;
	rc = ib_post_send(r_xprt->rx_ia.ri_id->qp, first, &bad_wr);

	/* The final LOCAL_INV WR in the chain is supposed to
	 * do the wake. If it was never posted, the wake will
	 * not happen, so don't wait in that case.
	 */
	if (bad_wr != first)
		wait_for_completion(&frwr->fr_linv_done);
	if (!rc)
		return;

	/* Recycle MRs in the LOCAL_INV chain that did not get posted.
	 */
	trace_xprtrdma_post_linv(req, rc);
	while (bad_wr) {
		frwr = container_of(bad_wr, struct rpcrdma_frwr,
				    fr_invwr);
		mr = container_of(frwr, struct rpcrdma_mr, frwr);
		bad_wr = bad_wr->next;

		list_del_init(&mr->mr_list);
		frwr_mr_recycle(mr);
	}
}

/**
 * frwr_wc_localinv_done - Invoked by RDMA provider for a signaled LOCAL_INV WC
 * @cq:	completion queue (ignored)
 * @wc:	completed WR
 *
 */
static void frwr_wc_localinv_done(struct ib_cq *cq, struct ib_wc *wc)
{
	struct ib_cqe *cqe = wc->wr_cqe;
	struct rpcrdma_frwr *frwr =
		container_of(cqe, struct rpcrdma_frwr, fr_cqe);
	struct rpcrdma_mr *mr = container_of(frwr, struct rpcrdma_mr, frwr);
	struct rpcrdma_rep *rep = mr->mr_req->rl_reply;

	/* WARNING: Only wr_cqe and status are reliable at this point */
	trace_xprtrdma_wc_li_done(wc, frwr);
	__frwr_release_mr(wc, mr);

	/* Ensure @rep is generated before __frwr_release_mr */
	smp_rmb();
	rpcrdma_complete_rqst(rep);
}

/**
 * frwr_unmap_async - invalidate memory regions that were registered for @req
 * @r_xprt: controlling transport instance
 * @req: rpcrdma_req with a non-empty list of MRs to process
 *
 * This guarantees that registered MRs are properly fenced from the
 * server before the RPC consumer accesses the data in them. It also
 * ensures proper Send flow control: waking the next RPC waits until
 * this RPC has relinquished all its Send Queue entries.
 */
void frwr_unmap_async(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
{
	struct ib_send_wr *first, *last, **prev;
	const struct ib_send_wr *bad_wr;
	struct rpcrdma_frwr *frwr;
	struct rpcrdma_mr *mr;
	int rc;

	/* Chain the LOCAL_INV Work Requests and post them with
	 * a single ib_post_send() call.
	 */
	frwr = NULL;
	prev = &first;
	while ((mr = rpcrdma_mr_pop(&req->rl_registered))) {

		trace_xprtrdma_mr_localinv(mr);
		r_xprt->rx_stats.local_inv_needed++;

		frwr = &mr->frwr;
		frwr->fr_cqe.done = frwr_wc_localinv;
		last = &frwr->fr_invwr;
		last->next = NULL;
		last->wr_cqe = &frwr->fr_cqe;
		last->sg_list = NULL;
		last->num_sge = 0;
		last->opcode = IB_WR_LOCAL_INV;
		last->send_flags = IB_SEND_SIGNALED;
		last->ex.invalidate_rkey = mr->mr_handle;

		*prev = last;
		prev = &last->next;
	}

	/* Strong send queue ordering guarantees that when the
	 * last WR in the chain completes, all WRs in the chain
	 * are complete. The last completion will wake up the
	 * RPC waiter.
	 */
	frwr->fr_cqe.done = frwr_wc_localinv_done;

	/* Transport disconnect drains the receive CQ before it
	 * replaces the QP. The RPC reply handler won't call us
	 * unless ri_id->qp is a valid pointer.
	 */
	bad_wr = NULL;
	rc = ib_post_send(r_xprt->rx_ia.ri_id->qp, first, &bad_wr);
	if (!rc)
		return;

	/* Recycle MRs in the LOCAL_INV chain that did not get posted.
	 */
	trace_xprtrdma_post_linv(req, rc);
	while (bad_wr) {
		frwr = container_of(bad_wr, struct rpcrdma_frwr, fr_invwr);
		mr = container_of(frwr, struct rpcrdma_mr, frwr);
		bad_wr = bad_wr->next;

		frwr_mr_recycle(mr);
	}

	/* The final LOCAL_INV WR in the chain is supposed to
	 * do the wake. If it was never posted, the wake will
	 * not happen, so wake here in that case.
	 */
	rpcrdma_complete_rqst(req->rl_reply);
}