// SPDX-License-Identifier: GPL-2.0
/*
 * Copyright (c) 2015, 2017 Oracle.  All rights reserved.
 * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
 */

/* Lightweight memory registration using Fast Registration Work
 * Requests (FRWR).
 *
 * FRWR features ordered asynchronous registration and invalidation
 * of arbitrarily-sized memory regions. This is the fastest and safest
 * but most complex memory registration mode.
 */

/* Normal operation
 *
 * A Memory Region is prepared for RDMA Read or Write using a FAST_REG
 * Work Request (frwr_map). When the RDMA operation is finished, this
 * Memory Region is invalidated using a LOCAL_INV Work Request
 * (frwr_unmap_async and frwr_unmap_sync).
 *
 * Typically FAST_REG Work Requests are not signaled, and neither are
 * RDMA Send Work Requests (with the exception of signaling occasionally
 * to prevent provider work queue overflows). This greatly reduces HCA
 * interrupt workload.
 */
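
/* For example, an RPC Call that registers a single chunk is typically
 * posted as one chain of two unsignaled Work Requests:
 *
 *	FAST_REG (built by frwr_map, posted by frwr_send) -> RDMA Send
 *
 * and is later fenced by a single signaled LOCAL_INV Work Request
 * posted by frwr_unmap_async or frwr_unmap_sync.
 */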

/* Transport recovery
 *
 * frwr_map and frwr_unmap_* cannot run at the same time the transport
 * connect worker is running. The connect worker holds the transport
 * send lock, just as ->send_request does. This prevents frwr_map and
 * the connect worker from running concurrently. When a connection is
 * closed, the Receive completion queue is drained before allowing
 * the connect worker to get control. This prevents frwr_unmap and the
 * connect worker from running concurrently.
 *
 * When the underlying transport disconnects, MRs that are in flight
 * are flushed and are likely unusable. Thus all flushed MRs are
 * destroyed. New MRs are created on demand.
 */

#include <linux/sunrpc/rpc_rdma.h>
#include <linux/sunrpc/svc_rdma.h>

#include "xprt_rdma.h"
#include <trace/events/rpcrdma.h>

#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
# define RPCDBG_FACILITY	RPCDBG_TRANS
#endif

/**
 * frwr_is_supported - Check if device supports FRWR
 * @device: interface adapter to check
 *
 * Returns true if device supports FRWR, otherwise false
 */
bool frwr_is_supported(struct ib_device *device)
{
	struct ib_device_attr *attrs = &device->attrs;

	if (!(attrs->device_cap_flags & IB_DEVICE_MEM_MGT_EXTENSIONS))
		goto out_not_supported;
	if (attrs->max_fast_reg_page_list_len == 0)
		goto out_not_supported;
	return true;

out_not_supported:
	pr_info("rpcrdma: 'frwr' mode is not supported by device %s\n",
		device->name);
	return false;
}

/**
 * frwr_release_mr - Destroy one MR
 * @mr: MR allocated by frwr_init_mr
 *
 */
void frwr_release_mr(struct rpcrdma_mr *mr)
{
	int rc;

	rc = ib_dereg_mr(mr->frwr.fr_mr);
	if (rc)
		trace_xprtrdma_frwr_dereg(mr, rc);
	kfree(mr->mr_sg);
	kfree(mr);
}

/* MRs are dynamically allocated, so simply clean up and release the MR.
 * A replacement MR will subsequently be allocated on demand.
 */
static void
frwr_mr_recycle_worker(struct work_struct *work)
{
	struct rpcrdma_mr *mr = container_of(work, struct rpcrdma_mr, mr_recycle);
	struct rpcrdma_xprt *r_xprt = mr->mr_xprt;

	trace_xprtrdma_mr_recycle(mr);

	if (mr->mr_dir != DMA_NONE) {
		trace_xprtrdma_mr_unmap(mr);
		ib_dma_unmap_sg(r_xprt->rx_ia.ri_id->device,
				mr->mr_sg, mr->mr_nents, mr->mr_dir);
		mr->mr_dir = DMA_NONE;
	}

	spin_lock(&r_xprt->rx_buf.rb_mrlock);
	list_del(&mr->mr_all);
	r_xprt->rx_stats.mrs_recycled++;
	spin_unlock(&r_xprt->rx_buf.rb_mrlock);

	frwr_release_mr(mr);
}

/* frwr_reset - Place MRs back on the free list
 * @req: request to reset
 *
 * Used after a failed marshal. For FRWR, this means the MRs
 * don't have to be fully released and recreated.
 *
 * NB: This is safe only as long as none of @req's MRs are
 * involved with an ongoing asynchronous FAST_REG or LOCAL_INV
 * Work Request.
 */
void frwr_reset(struct rpcrdma_req *req)
{
	struct rpcrdma_mr *mr;

	while ((mr = rpcrdma_mr_pop(&req->rl_registered)))
		rpcrdma_mr_put(mr);
}

/**
 * frwr_init_mr - Initialize one MR
 * @ia: interface adapter
 * @mr: generic MR to prepare for FRWR
 *
 * Returns zero if successful. Otherwise a negative errno
 * is returned.
 */
int frwr_init_mr(struct rpcrdma_ia *ia, struct rpcrdma_mr *mr)
{
	unsigned int depth = ia->ri_max_frwr_depth;
	struct scatterlist *sg;
	struct ib_mr *frmr;
	int rc;

	/* NB: ib_alloc_mr and device drivers typically allocate
	 *     memory with GFP_KERNEL.
	 */
	frmr = ib_alloc_mr(ia->ri_pd, ia->ri_mrtype, depth);
	if (IS_ERR(frmr))
		goto out_mr_err;

	sg = kcalloc(depth, sizeof(*sg), GFP_NOFS);
	if (!sg)
		goto out_list_err;

	mr->frwr.fr_mr = frmr;
	mr->mr_dir = DMA_NONE;
	INIT_LIST_HEAD(&mr->mr_list);
	INIT_WORK(&mr->mr_recycle, frwr_mr_recycle_worker);
	init_completion(&mr->frwr.fr_linv_done);

	sg_init_table(sg, depth);
	mr->mr_sg = sg;
	return 0;

out_mr_err:
	rc = PTR_ERR(frmr);
	trace_xprtrdma_frwr_alloc(mr, rc);
	return rc;

out_list_err:
	ib_dereg_mr(frmr);
	return -ENOMEM;
}

/**
 * frwr_open - Prepare an endpoint for use with FRWR
 * @ia: interface adapter this endpoint will use
 * @ep: endpoint to prepare
 *
 * On success, sets:
 *	ep->rep_attr.cap.max_send_wr
 *	ep->rep_attr.cap.max_recv_wr
 *	ep->rep_max_requests
 *	ia->ri_max_segs
 *
 * And these FRWR-related fields:
 *	ia->ri_max_frwr_depth
 *	ia->ri_mrtype
 *
 * On failure, a negative errno is returned.
 */
int frwr_open(struct rpcrdma_ia *ia, struct rpcrdma_ep *ep)
{
	struct ib_device_attr *attrs = &ia->ri_id->device->attrs;
	int max_qp_wr, depth, delta;

	ia->ri_mrtype = IB_MR_TYPE_MEM_REG;
	if (attrs->device_cap_flags & IB_DEVICE_SG_GAPS_REG)
		ia->ri_mrtype = IB_MR_TYPE_SG_GAPS;

	/* Quirk: Some devices advertise a large max_fast_reg_page_list_len
	 * capability, but perform optimally when the MRs are not larger
	 * than a page.
	 */
	if (attrs->max_sge_rd > 1)
		ia->ri_max_frwr_depth = attrs->max_sge_rd;
	else
		ia->ri_max_frwr_depth = attrs->max_fast_reg_page_list_len;
	if (ia->ri_max_frwr_depth > RPCRDMA_MAX_DATA_SEGS)
		ia->ri_max_frwr_depth = RPCRDMA_MAX_DATA_SEGS;
	dprintk("RPC:       %s: max FR page list depth = %u\n",
		__func__, ia->ri_max_frwr_depth);

	/* Add room for frwr register and invalidate WRs.
	 * 1. FRWR reg WR for head
	 * 2. FRWR invalidate WR for head
	 * 3. N FRWR reg WRs for pagelist
	 * 4. N FRWR invalidate WRs for pagelist
	 * 5. FRWR reg WR for tail
	 * 6. FRWR invalidate WR for tail
	 * 7. The RDMA_SEND WR
	 */
	depth = 7;

	/* Calculate N if the device max FRWR depth is smaller than
	 * RPCRDMA_MAX_DATA_SEGS.
	 */
	if (ia->ri_max_frwr_depth < RPCRDMA_MAX_DATA_SEGS) {
		delta = RPCRDMA_MAX_DATA_SEGS - ia->ri_max_frwr_depth;
		do {
			depth += 2; /* FRWR reg + invalidate */
			delta -= ia->ri_max_frwr_depth;
		} while (delta > 0);
	}
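	/* Example with illustrative values: if the device caps
	 * ri_max_frwr_depth at 16 segments while RPCRDMA_MAX_DATA_SEGS
	 * is 64, delta starts at 48 and the loop above runs three
	 * times, so depth grows from 7 to 13.
	 */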

	max_qp_wr = ia->ri_id->device->attrs.max_qp_wr;
	max_qp_wr -= RPCRDMA_BACKWARD_WRS;
	max_qp_wr -= 1;
	if (max_qp_wr < RPCRDMA_MIN_SLOT_TABLE)
		return -ENOMEM;
	if (ep->rep_max_requests > max_qp_wr)
		ep->rep_max_requests = max_qp_wr;
	ep->rep_attr.cap.max_send_wr = ep->rep_max_requests * depth;
	if (ep->rep_attr.cap.max_send_wr > max_qp_wr) {
		ep->rep_max_requests = max_qp_wr / depth;
		if (!ep->rep_max_requests)
			return -EINVAL;
		ep->rep_attr.cap.max_send_wr = ep->rep_max_requests * depth;
	}
	ep->rep_attr.cap.max_send_wr += RPCRDMA_BACKWARD_WRS;
	ep->rep_attr.cap.max_send_wr += 1; /* for ib_drain_sq */
	ep->rep_attr.cap.max_recv_wr = ep->rep_max_requests;
	ep->rep_attr.cap.max_recv_wr += RPCRDMA_BACKWARD_WRS;
	ep->rep_attr.cap.max_recv_wr += 1; /* for ib_drain_rq */

	ia->ri_max_segs =
		DIV_ROUND_UP(RPCRDMA_MAX_DATA_SEGS, ia->ri_max_frwr_depth);
	/* Reply chunks require segments for head and tail buffers */
	ia->ri_max_segs += 2;
	if (ia->ri_max_segs > RPCRDMA_MAX_HDR_SEGS)
		ia->ri_max_segs = RPCRDMA_MAX_HDR_SEGS;
	return 0;
}

/**
 * frwr_maxpages - Compute size of largest payload
 * @r_xprt: transport
 *
 * Returns maximum size of an RPC message, in pages.
 *
 * FRWR mode conveys a list of pages per chunk segment. The
 * maximum length of that list is the FRWR page list depth.
 */
size_t frwr_maxpages(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_ia *ia = &r_xprt->rx_ia;

	return min_t(unsigned int, RPCRDMA_MAX_DATA_SEGS,
		     (ia->ri_max_segs - 2) * ia->ri_max_frwr_depth);
}

/**
 * frwr_map - Register a memory region
 * @r_xprt: controlling transport
 * @seg: memory region co-ordinates
 * @nsegs: number of segments remaining
 * @writing: true when RDMA Write will be used
 * @xid: XID of RPC using the registered memory
 * @mr: MR to fill in
 *
 * Prepare a REG_MR Work Request to register a memory region
 * for remote access via RDMA READ or RDMA WRITE.
 *
 * Returns the next segment or a negative errno pointer.
 * On success, @mr is filled in.
 */
struct rpcrdma_mr_seg *frwr_map(struct rpcrdma_xprt *r_xprt,
				struct rpcrdma_mr_seg *seg,
				int nsegs, bool writing, __be32 xid,
				struct rpcrdma_mr *mr)
{
	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
	struct ib_reg_wr *reg_wr;
	struct ib_mr *ibmr;
	int i, n;
	u8 key;

	if (nsegs > ia->ri_max_frwr_depth)
		nsegs = ia->ri_max_frwr_depth;
	for (i = 0; i < nsegs;) {
		if (seg->mr_page)
			sg_set_page(&mr->mr_sg[i],
				    seg->mr_page,
				    seg->mr_len,
				    offset_in_page(seg->mr_offset));
		else
			sg_set_buf(&mr->mr_sg[i], seg->mr_offset,
				   seg->mr_len);

		++seg;
		++i;
		if (ia->ri_mrtype == IB_MR_TYPE_SG_GAPS)
			continue;
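		/* Unless the device supports SG_GAPS registration (handled
		 * by the continue above), stop when the segment just added
		 * does not end on a page boundary or the next segment does
		 * not start on one.
		 */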
		if ((i < nsegs && offset_in_page(seg->mr_offset)) ||
		    offset_in_page((seg-1)->mr_offset + (seg-1)->mr_len))
			break;
	}
	mr->mr_dir = rpcrdma_data_dir(writing);

	mr->mr_nents =
		ib_dma_map_sg(ia->ri_id->device, mr->mr_sg, i, mr->mr_dir);
	if (!mr->mr_nents)
		goto out_dmamap_err;

	ibmr = mr->frwr.fr_mr;
	n = ib_map_mr_sg(ibmr, mr->mr_sg, mr->mr_nents, NULL, PAGE_SIZE);
	if (unlikely(n != mr->mr_nents))
		goto out_mapmr_err;

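	/* The upper 32 bits of the MR's iova are replaced with the RPC's
	 * XID, which helps match this registration with its RPC (e.g. in
	 * traces or wire captures), and the low-order byte of the rkey is
	 * bumped so each registration uses a fresh key.
	 */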
	ibmr->iova &= 0x00000000ffffffff;
	ibmr->iova |= ((u64)be32_to_cpu(xid)) << 32;
	key = (u8)(ibmr->rkey & 0x000000FF);
	ib_update_fast_reg_key(ibmr, ++key);

	reg_wr = &mr->frwr.fr_regwr;
	reg_wr->mr = ibmr;
	reg_wr->key = ibmr->rkey;
	reg_wr->access = writing ?
			 IB_ACCESS_REMOTE_WRITE | IB_ACCESS_LOCAL_WRITE :
			 IB_ACCESS_REMOTE_READ;

	mr->mr_handle = ibmr->rkey;
	mr->mr_length = ibmr->length;
	mr->mr_offset = ibmr->iova;
	trace_xprtrdma_mr_map(mr);

	return seg;

out_dmamap_err:
	mr->mr_dir = DMA_NONE;
	trace_xprtrdma_frwr_sgerr(mr, i);
	return ERR_PTR(-EIO);

out_mapmr_err:
	trace_xprtrdma_frwr_maperr(mr, n);
	return ERR_PTR(-EIO);
}

/**
 * frwr_wc_fastreg - Invoked by RDMA provider for a flushed FastReg WC
 * @cq:	completion queue (ignored)
 * @wc:	completed WR
 *
 */
static void frwr_wc_fastreg(struct ib_cq *cq, struct ib_wc *wc)
{
	struct ib_cqe *cqe = wc->wr_cqe;
	struct rpcrdma_frwr *frwr =
		container_of(cqe, struct rpcrdma_frwr, fr_cqe);

	/* WARNING: Only wr_cqe and status are reliable at this point */
	trace_xprtrdma_wc_fastreg(wc, frwr);
	/* The MR will get recycled when the associated req is retransmitted */
}

/**
 * frwr_send - post Send WR containing the RPC Call message
 * @ia: interface adapter
 * @req: Prepared RPC Call
 *
 * For FRWR, chain any FastReg WRs to the Send WR. Only a
 * single ib_post_send call is needed to register memory
 * and then post the Send WR.
 *
 * Returns the result of ib_post_send.
 */
int frwr_send(struct rpcrdma_ia *ia, struct rpcrdma_req *req)
{
	struct ib_send_wr *post_wr;
	struct rpcrdma_mr *mr;

	post_wr = &req->rl_sendctx->sc_wr;
	list_for_each_entry(mr, &req->rl_registered, mr_list) {
		struct rpcrdma_frwr *frwr;

		frwr = &mr->frwr;

		frwr->fr_cqe.done = frwr_wc_fastreg;
		frwr->fr_regwr.wr.next = post_wr;
		frwr->fr_regwr.wr.wr_cqe = &frwr->fr_cqe;
		frwr->fr_regwr.wr.num_sge = 0;
		frwr->fr_regwr.wr.opcode = IB_WR_REG_MR;
		frwr->fr_regwr.wr.send_flags = 0;

		post_wr = &frwr->fr_regwr.wr;
	}
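
	/* Each iteration above links a REG_MR WR ahead of the chain, so
	 * the chain handed to ib_post_send below begins with the FastReg
	 * WRs and ends with the Send WR that carries the RPC Call.
	 */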

	/* If ib_post_send fails, the next ->send_request for
	 * @req will queue these MRs for recovery.
	 */
	return ib_post_send(ia->ri_id->qp, post_wr, NULL);
}

/**
 * frwr_reminv - handle a remotely invalidated mr on the @mrs list
 * @rep: Received reply
 * @mrs: list of MRs to check
 *
 */
void frwr_reminv(struct rpcrdma_rep *rep, struct list_head *mrs)
{
	struct rpcrdma_mr *mr;

	list_for_each_entry(mr, mrs, mr_list)
		if (mr->mr_handle == rep->rr_inv_rkey) {
			list_del_init(&mr->mr_list);
			trace_xprtrdma_mr_remoteinv(mr);
			rpcrdma_mr_put(mr);
			break;	/* only one invalidated MR per RPC */
		}
}

static void __frwr_release_mr(struct ib_wc *wc, struct rpcrdma_mr *mr)
{
	if (wc->status != IB_WC_SUCCESS)
		rpcrdma_mr_recycle(mr);
	else
		rpcrdma_mr_put(mr);
}

/**
 * frwr_wc_localinv - Invoked by RDMA provider for a LOCAL_INV WC
 * @cq:	completion queue (ignored)
 * @wc:	completed WR
 *
 */
static void frwr_wc_localinv(struct ib_cq *cq, struct ib_wc *wc)
{
	struct ib_cqe *cqe = wc->wr_cqe;
	struct rpcrdma_frwr *frwr =
		container_of(cqe, struct rpcrdma_frwr, fr_cqe);
	struct rpcrdma_mr *mr = container_of(frwr, struct rpcrdma_mr, frwr);

	/* WARNING: Only wr_cqe and status are reliable at this point */
	trace_xprtrdma_wc_li(wc, frwr);
	__frwr_release_mr(wc, mr);
}

/**
 * frwr_wc_localinv_wake - Invoked by RDMA provider for a LOCAL_INV WC
 * @cq:	completion queue (ignored)
 * @wc:	completed WR
 *
 * Awaken anyone waiting for an MR to finish being fenced.
 */
static void frwr_wc_localinv_wake(struct ib_cq *cq, struct ib_wc *wc)
{
	struct ib_cqe *cqe = wc->wr_cqe;
	struct rpcrdma_frwr *frwr =
		container_of(cqe, struct rpcrdma_frwr, fr_cqe);
	struct rpcrdma_mr *mr = container_of(frwr, struct rpcrdma_mr, frwr);

	/* WARNING: Only wr_cqe and status are reliable at this point */
	trace_xprtrdma_wc_li_wake(wc, frwr);
	__frwr_release_mr(wc, mr);
	complete(&frwr->fr_linv_done);
}

/**
 * frwr_unmap_sync - invalidate memory regions that were registered for @req
 * @r_xprt: controlling transport instance
 * @req: rpcrdma_req with a non-empty list of MRs to process
 *
 * Sleeps until it is safe for the host CPU to access the previously mapped
 * memory regions. This guarantees that registered MRs are properly fenced
 * from the server before the RPC consumer accesses the data in them. It
 * also ensures proper Send flow control: waking the next RPC waits until
 * this RPC has relinquished all its Send Queue entries.
 */
void frwr_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
{
	struct ib_send_wr *first, **prev, *last;
	const struct ib_send_wr *bad_wr;
	struct rpcrdma_frwr *frwr;
	struct rpcrdma_mr *mr;
	int rc;

	/* ORDER: Invalidate all of the MRs first
	 *
	 * Chain the LOCAL_INV Work Requests and post them with
	 * a single ib_post_send() call.
	 */
	frwr = NULL;
	prev = &first;
	while ((mr = rpcrdma_mr_pop(&req->rl_registered))) {

		trace_xprtrdma_mr_localinv(mr);
		r_xprt->rx_stats.local_inv_needed++;

		frwr = &mr->frwr;
		frwr->fr_cqe.done = frwr_wc_localinv;
		last = &frwr->fr_invwr;
		last->next = NULL;
		last->wr_cqe = &frwr->fr_cqe;
		last->sg_list = NULL;
		last->num_sge = 0;
		last->opcode = IB_WR_LOCAL_INV;
		last->send_flags = IB_SEND_SIGNALED;
		last->ex.invalidate_rkey = mr->mr_handle;

		*prev = last;
		prev = &last->next;
	}

	/* Strong send queue ordering guarantees that when the
	 * last WR in the chain completes, all WRs in the chain
	 * are complete.
	 */
	frwr->fr_cqe.done = frwr_wc_localinv_wake;
	reinit_completion(&frwr->fr_linv_done);

	/* Transport disconnect drains the receive CQ before it
	 * replaces the QP. The RPC reply handler won't call us
	 * unless ri_id->qp is a valid pointer.
	 */
	bad_wr = NULL;
	rc = ib_post_send(r_xprt->rx_ia.ri_id->qp, first, &bad_wr);
	trace_xprtrdma_post_send(req, rc);

	/* The final LOCAL_INV WR in the chain is supposed to
	 * do the wake. If it was never posted, the wake will
	 * not happen, so don't wait in that case.
	 */
	if (bad_wr != first)
		wait_for_completion(&frwr->fr_linv_done);
	if (!rc)
		return;

	/* Recycle MRs in the LOCAL_INV chain that did not get posted.
	 */
	while (bad_wr) {
		frwr = container_of(bad_wr, struct rpcrdma_frwr,
				    fr_invwr);
		mr = container_of(frwr, struct rpcrdma_mr, frwr);
		bad_wr = bad_wr->next;

		list_del_init(&mr->mr_list);
		rpcrdma_mr_recycle(mr);
	}
}

/**
 * frwr_wc_localinv_done - Invoked by RDMA provider for a signaled LOCAL_INV WC
 * @cq:	completion queue (ignored)
 * @wc:	completed WR
 *
 */
static void frwr_wc_localinv_done(struct ib_cq *cq, struct ib_wc *wc)
{
	struct ib_cqe *cqe = wc->wr_cqe;
	struct rpcrdma_frwr *frwr =
		container_of(cqe, struct rpcrdma_frwr, fr_cqe);
	struct rpcrdma_mr *mr = container_of(frwr, struct rpcrdma_mr, frwr);
	struct rpcrdma_rep *rep = mr->mr_req->rl_reply;

	/* WARNING: Only wr_cqe and status are reliable at this point */
	trace_xprtrdma_wc_li_done(wc, frwr);
	__frwr_release_mr(wc, mr);

	/* Ensure @rep is generated before __frwr_release_mr */
	smp_rmb();
	rpcrdma_complete_rqst(rep);
}

/**
 * frwr_unmap_async - invalidate memory regions that were registered for @req
 * @r_xprt: controlling transport instance
 * @req: rpcrdma_req with a non-empty list of MRs to process
 *
 * This guarantees that registered MRs are properly fenced from the
 * server before the RPC consumer accesses the data in them. It also
 * ensures proper Send flow control: waking the next RPC waits until
 * this RPC has relinquished all its Send Queue entries.
 */
void frwr_unmap_async(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
{
	struct ib_send_wr *first, *last, **prev;
	const struct ib_send_wr *bad_wr;
	struct rpcrdma_frwr *frwr;
	struct rpcrdma_mr *mr;
	int rc;

	/* Chain the LOCAL_INV Work Requests and post them with
	 * a single ib_post_send() call.
	 */
	frwr = NULL;
	prev = &first;
	while ((mr = rpcrdma_mr_pop(&req->rl_registered))) {

		trace_xprtrdma_mr_localinv(mr);
		r_xprt->rx_stats.local_inv_needed++;

		frwr = &mr->frwr;
		frwr->fr_cqe.done = frwr_wc_localinv;
		last = &frwr->fr_invwr;
		last->next = NULL;
		last->wr_cqe = &frwr->fr_cqe;
		last->sg_list = NULL;
		last->num_sge = 0;
		last->opcode = IB_WR_LOCAL_INV;
		last->send_flags = IB_SEND_SIGNALED;
		last->ex.invalidate_rkey = mr->mr_handle;

		*prev = last;
		prev = &last->next;
	}

	/* Strong send queue ordering guarantees that when the
	 * last WR in the chain completes, all WRs in the chain
	 * are complete. The last completion will wake up the
	 * RPC waiter.
	 */
	frwr->fr_cqe.done = frwr_wc_localinv_done;

	/* Transport disconnect drains the receive CQ before it
	 * replaces the QP. The RPC reply handler won't call us
	 * unless ri_id->qp is a valid pointer.
	 */
	bad_wr = NULL;
	rc = ib_post_send(r_xprt->rx_ia.ri_id->qp, first, &bad_wr);
	trace_xprtrdma_post_send(req, rc);
	if (!rc)
		return;

	/* Recycle MRs in the LOCAL_INV chain that did not get posted.
	 */
	while (bad_wr) {
		frwr = container_of(bad_wr, struct rpcrdma_frwr, fr_invwr);
		mr = container_of(frwr, struct rpcrdma_mr, frwr);
		bad_wr = bad_wr->next;

		rpcrdma_mr_recycle(mr);
	}

	/* The final LOCAL_INV WR in the chain is supposed to
	 * do the wake. If it was never posted, the wake will
	 * not happen, so wake here in that case.
	 */
	rpcrdma_complete_rqst(req->rl_reply);
}