// SPDX-License-Identifier: GPL-2.0 OR BSD-3-Clause
/*
 * Copyright (c) 2014-2017 Oracle.  All rights reserved.
 * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
 *
 * This software is available to you under a choice of one of two
 * licenses.  You may choose to be licensed under the terms of the GNU
 * General Public License (GPL) Version 2, available from the file
 * COPYING in the main directory of this source tree, or the BSD-type
 * license below:
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 *      Redistributions of source code must retain the above copyright
 *      notice, this list of conditions and the following disclaimer.
 *
 *      Redistributions in binary form must reproduce the above
 *      copyright notice, this list of conditions and the following
 *      disclaimer in the documentation and/or other materials provided
 *      with the distribution.
 *
 *      Neither the name of the Network Appliance, Inc. nor the names of
 *      its contributors may be used to endorse or promote products
 *      derived from this software without specific prior written
 *      permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

/*
 * verbs.c
 *
 * Encapsulates the major functions managing:
 *  o adapters
 *  o endpoints
 *  o connections
 *  o buffer memory
 */

#include <linux/interrupt.h>
#include <linux/slab.h>
#include <linux/sunrpc/addr.h>
#include <linux/sunrpc/svc_rdma.h>

#include <asm-generic/barrier.h>
#include <asm/bitops.h>

#include <rdma/ib_cm.h>

#include "xprt_rdma.h"
#include <trace/events/rpcrdma.h>

/*
 * Globals/Macros
 */

#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
# define RPCDBG_FACILITY	RPCDBG_TRANS
#endif

/*
 * internal functions
 */
static void rpcrdma_sendctx_put_locked(struct rpcrdma_sendctx *sc);
static void rpcrdma_mrs_create(struct rpcrdma_xprt *r_xprt);
static void rpcrdma_mrs_destroy(struct rpcrdma_buffer *buf);
static int rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt, bool temp);
static void rpcrdma_dma_unmap_regbuf(struct rpcrdma_regbuf *rb);
static void rpcrdma_post_recvs(struct rpcrdma_xprt *r_xprt, bool temp);

/* Wait for outstanding transport work to finish.
 */
static void rpcrdma_xprt_drain(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	struct rpcrdma_ia *ia = &r_xprt->rx_ia;

	/* Flush Receives, then wait for deferred Reply work
	 * to complete.
	 */
	ib_drain_qp(ia->ri_id->qp);
	drain_workqueue(buf->rb_completion_wq);

	/* Deferred Reply processing might have scheduled
	 * local invalidations.
	 */
	ib_drain_sq(ia->ri_id->qp);
}

/**
 * rpcrdma_qp_event_handler - Handle one QP event (error notification)
 * @event: details of the event
 * @context: ep that owns QP where event occurred
 *
 * Called from the RDMA provider (device driver) possibly in an interrupt
 * context.
 */
static void
rpcrdma_qp_event_handler(struct ib_event *event, void *context)
{
	struct rpcrdma_ep *ep = context;
	struct rpcrdma_xprt *r_xprt = container_of(ep, struct rpcrdma_xprt,
						   rx_ep);

	trace_xprtrdma_qp_event(r_xprt, event);
}

/**
 * rpcrdma_wc_send - Invoked by RDMA provider for each polled Send WC
 * @cq:	completion queue (ignored)
 * @wc:	completed WR
 *
 */
static void
rpcrdma_wc_send(struct ib_cq *cq, struct ib_wc *wc)
{
	struct ib_cqe *cqe = wc->wr_cqe;
	struct rpcrdma_sendctx *sc =
		container_of(cqe, struct rpcrdma_sendctx, sc_cqe);

	/* WARNING: Only wr_cqe and status are reliable at this point */
	trace_xprtrdma_wc_send(sc, wc);
	if (wc->status != IB_WC_SUCCESS && wc->status != IB_WC_WR_FLUSH_ERR)
		pr_err("rpcrdma: Send: %s (%u/0x%x)\n",
		       ib_wc_status_msg(wc->status),
		       wc->status, wc->vendor_err);

	rpcrdma_sendctx_put_locked(sc);
}

/**
 * rpcrdma_wc_receive - Invoked by RDMA provider for each polled Receive WC
 * @cq:	completion queue (ignored)
 * @wc:	completed WR
 *
 */
static void
rpcrdma_wc_receive(struct ib_cq *cq, struct ib_wc *wc)
{
	struct ib_cqe *cqe = wc->wr_cqe;
	struct rpcrdma_rep *rep = container_of(cqe, struct rpcrdma_rep,
					       rr_cqe);
	struct rpcrdma_xprt *r_xprt = rep->rr_rxprt;

	/* WARNING: Only wr_cqe and status are reliable at this point */
	trace_xprtrdma_wc_receive(wc);
	--r_xprt->rx_ep.rep_receive_count;
	if (wc->status != IB_WC_SUCCESS)
		goto out_flushed;

	/* status == SUCCESS means all fields in wc are trustworthy */
	rpcrdma_set_xdrlen(&rep->rr_hdrbuf, wc->byte_len);
	rep->rr_wc_flags = wc->wc_flags;
	rep->rr_inv_rkey = wc->ex.invalidate_rkey;

	ib_dma_sync_single_for_cpu(rdmab_device(rep->rr_rdmabuf),
				   rdmab_addr(rep->rr_rdmabuf),
				   wc->byte_len, DMA_FROM_DEVICE);

	rpcrdma_post_recvs(r_xprt, false);
	rpcrdma_reply_handler(rep);
	return;

out_flushed:
	if (wc->status != IB_WC_WR_FLUSH_ERR)
		pr_err("rpcrdma: Recv: %s (%u/0x%x)\n",
		       ib_wc_status_msg(wc->status),
		       wc->status, wc->vendor_err);
	rpcrdma_recv_buffer_put(rep);
}

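/* Apply the connection parameters advertised by the server in its
 * CM private message, falling back to RPC-over-RDMA Version One
 * defaults, and clamp the transport's inline thresholds accordingly.
 */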
static void
rpcrdma_update_connect_private(struct rpcrdma_xprt *r_xprt,
			       struct rdma_conn_param *param)
{
	struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
	const struct rpcrdma_connect_private *pmsg = param->private_data;
	unsigned int rsize, wsize;

	/* Default settings for RPC-over-RDMA Version One */
	r_xprt->rx_ia.ri_implicit_roundup = xprt_rdma_pad_optimize;
	rsize = RPCRDMA_V1_DEF_INLINE_SIZE;
	wsize = RPCRDMA_V1_DEF_INLINE_SIZE;

	if (pmsg &&
	    pmsg->cp_magic == rpcrdma_cmp_magic &&
	    pmsg->cp_version == RPCRDMA_CMP_VERSION) {
		r_xprt->rx_ia.ri_implicit_roundup = true;
		rsize = rpcrdma_decode_buffer_size(pmsg->cp_send_size);
		wsize = rpcrdma_decode_buffer_size(pmsg->cp_recv_size);
	}

	if (rsize < cdata->inline_rsize)
		cdata->inline_rsize = rsize;
	if (wsize < cdata->inline_wsize)
		cdata->inline_wsize = wsize;
	dprintk("RPC:       %s: max send %u, max recv %u\n",
		__func__, cdata->inline_wsize, cdata->inline_rsize);
	rpcrdma_set_max_header_sizes(r_xprt);
}

/**
 * rpcrdma_cm_event_handler - Handle RDMA CM events
 * @id: rdma_cm_id on which an event has occurred
 * @event: details of the event
 *
 * Called with @id's mutex held. Returns 1 if caller should
 * destroy @id, otherwise 0.
 */
static int
rpcrdma_cm_event_handler(struct rdma_cm_id *id, struct rdma_cm_event *event)
{
	struct rpcrdma_xprt *r_xprt = id->context;
	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
	struct rpcrdma_ep *ep = &r_xprt->rx_ep;
	struct rpc_xprt *xprt = &r_xprt->rx_xprt;

	might_sleep();

	trace_xprtrdma_cm_event(r_xprt, event);
	switch (event->event) {
	case RDMA_CM_EVENT_ADDR_RESOLVED:
	case RDMA_CM_EVENT_ROUTE_RESOLVED:
		ia->ri_async_rc = 0;
		complete(&ia->ri_done);
		return 0;
	case RDMA_CM_EVENT_ADDR_ERROR:
		ia->ri_async_rc = -EPROTO;
		complete(&ia->ri_done);
		return 0;
	case RDMA_CM_EVENT_ROUTE_ERROR:
		ia->ri_async_rc = -ENETUNREACH;
		complete(&ia->ri_done);
		return 0;
	case RDMA_CM_EVENT_DEVICE_REMOVAL:
#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
		pr_info("rpcrdma: removing device %s for %s:%s\n",
			ia->ri_device->name,
			rpcrdma_addrstr(r_xprt), rpcrdma_portstr(r_xprt));
#endif
		set_bit(RPCRDMA_IAF_REMOVING, &ia->ri_flags);
		ep->rep_connected = -ENODEV;
		xprt_force_disconnect(xprt);
		wait_for_completion(&ia->ri_remove_done);

		ia->ri_id = NULL;
		ia->ri_device = NULL;
		/* Return 1 to ensure the core destroys the id. */
		return 1;
	case RDMA_CM_EVENT_ESTABLISHED:
		++xprt->connect_cookie;
		ep->rep_connected = 1;
		rpcrdma_update_connect_private(r_xprt, &event->param.conn);
		wake_up_all(&ep->rep_connect_wait);
		break;
	case RDMA_CM_EVENT_CONNECT_ERROR:
		ep->rep_connected = -ENOTCONN;
		goto disconnected;
	case RDMA_CM_EVENT_UNREACHABLE:
		ep->rep_connected = -ENETUNREACH;
		goto disconnected;
	case RDMA_CM_EVENT_REJECTED:
		dprintk("rpcrdma: connection to %s:%s rejected: %s\n",
			rpcrdma_addrstr(r_xprt), rpcrdma_portstr(r_xprt),
			rdma_reject_msg(id, event->status));
		ep->rep_connected = -ECONNREFUSED;
		if (event->status == IB_CM_REJ_STALE_CONN)
			ep->rep_connected = -EAGAIN;
		goto disconnected;
	case RDMA_CM_EVENT_DISCONNECTED:
		ep->rep_connected = -ECONNABORTED;
disconnected:
		xprt_force_disconnect(xprt);
		wake_up_all(&ep->rep_connect_wait);
		break;
	default:
		break;
	}

	dprintk("RPC:       %s: %s:%s on %s/frwr: %s\n", __func__,
		rpcrdma_addrstr(r_xprt), rpcrdma_portstr(r_xprt),
		ia->ri_device->name, rdma_event_msg(event->event));
	return 0;
}

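/* Create an rdma_cm_id for this transport, then resolve the server's
 * address and route, waiting up to RDMA_RESOLVE_TIMEOUT for each step.
 */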
static struct rdma_cm_id *
rpcrdma_create_id(struct rpcrdma_xprt *xprt, struct rpcrdma_ia *ia)
{
	unsigned long wtimeout = msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT) + 1;
	struct rdma_cm_id *id;
	int rc;

	trace_xprtrdma_conn_start(xprt);

	init_completion(&ia->ri_done);
	init_completion(&ia->ri_remove_done);

	id = rdma_create_id(xprt->rx_xprt.xprt_net, rpcrdma_cm_event_handler,
			    xprt, RDMA_PS_TCP, IB_QPT_RC);
	if (IS_ERR(id))
		return id;

	ia->ri_async_rc = -ETIMEDOUT;
	rc = rdma_resolve_addr(id, NULL,
			       (struct sockaddr *)&xprt->rx_xprt.addr,
			       RDMA_RESOLVE_TIMEOUT);
	if (rc)
		goto out;
	rc = wait_for_completion_interruptible_timeout(&ia->ri_done, wtimeout);
	if (rc < 0) {
		trace_xprtrdma_conn_tout(xprt);
		goto out;
	}

	rc = ia->ri_async_rc;
	if (rc)
		goto out;

	ia->ri_async_rc = -ETIMEDOUT;
	rc = rdma_resolve_route(id, RDMA_RESOLVE_TIMEOUT);
	if (rc)
		goto out;
	rc = wait_for_completion_interruptible_timeout(&ia->ri_done, wtimeout);
	if (rc < 0) {
		trace_xprtrdma_conn_tout(xprt);
		goto out;
	}
	rc = ia->ri_async_rc;
	if (rc)
		goto out;

	return id;

out:
	rdma_destroy_id(id);
	return ERR_PTR(rc);
}

/*
 * Exported functions.
 */

/**
 * rpcrdma_ia_open - Open and initialize an Interface Adapter.
 * @xprt: transport with IA to (re)initialize
 *
 * Returns 0 on success, negative errno if an appropriate
 * Interface Adapter could not be found and opened.
 */
int
rpcrdma_ia_open(struct rpcrdma_xprt *xprt)
{
	struct rpcrdma_ia *ia = &xprt->rx_ia;
	int rc;

	ia->ri_id = rpcrdma_create_id(xprt, ia);
	if (IS_ERR(ia->ri_id)) {
		rc = PTR_ERR(ia->ri_id);
		goto out_err;
	}
	ia->ri_device = ia->ri_id->device;

	ia->ri_pd = ib_alloc_pd(ia->ri_device, 0);
	if (IS_ERR(ia->ri_pd)) {
		rc = PTR_ERR(ia->ri_pd);
		pr_err("rpcrdma: ib_alloc_pd() returned %d\n", rc);
		goto out_err;
	}

	switch (xprt_rdma_memreg_strategy) {
	case RPCRDMA_FRWR:
		if (frwr_is_supported(ia))
			break;
		/*FALLTHROUGH*/
	default:
		pr_err("rpcrdma: Device %s does not support memreg mode %d\n",
		       ia->ri_device->name, xprt_rdma_memreg_strategy);
		rc = -EINVAL;
		goto out_err;
	}

	return 0;

out_err:
	rpcrdma_ia_close(ia);
	return rc;
}

/**
 * rpcrdma_ia_remove - Handle device driver unload
 * @ia: interface adapter being removed
 *
 * Divest transport H/W resources associated with this adapter,
 * but allow it to be restored later.
 */
void
rpcrdma_ia_remove(struct rpcrdma_ia *ia)
{
	struct rpcrdma_xprt *r_xprt = container_of(ia, struct rpcrdma_xprt,
						   rx_ia);
	struct rpcrdma_ep *ep = &r_xprt->rx_ep;
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	struct rpcrdma_req *req;
	struct rpcrdma_rep *rep;

	cancel_delayed_work_sync(&buf->rb_refresh_worker);

	/* This is similar to rpcrdma_ep_destroy, but:
	 * - Don't cancel the connect worker.
	 * - Don't call rpcrdma_ep_disconnect, which waits
	 *   for another conn upcall, which will deadlock.
	 * - rdma_disconnect is unneeded, the underlying
	 *   connection is already gone.
	 */
	if (ia->ri_id->qp) {
		rpcrdma_xprt_drain(r_xprt);
		rdma_destroy_qp(ia->ri_id);
		ia->ri_id->qp = NULL;
	}
	ib_free_cq(ep->rep_attr.recv_cq);
	ep->rep_attr.recv_cq = NULL;
	ib_free_cq(ep->rep_attr.send_cq);
	ep->rep_attr.send_cq = NULL;

	/* The ULP is responsible for ensuring all DMA
	 * mappings and MRs are gone.
	 */
	list_for_each_entry(rep, &buf->rb_recv_bufs, rr_list)
		rpcrdma_dma_unmap_regbuf(rep->rr_rdmabuf);
	list_for_each_entry(req, &buf->rb_allreqs, rl_all) {
		rpcrdma_dma_unmap_regbuf(req->rl_rdmabuf);
		rpcrdma_dma_unmap_regbuf(req->rl_sendbuf);
		rpcrdma_dma_unmap_regbuf(req->rl_recvbuf);
	}
	rpcrdma_mrs_destroy(buf);
	ib_dealloc_pd(ia->ri_pd);
	ia->ri_pd = NULL;

	/* Allow waiters to continue */
	complete(&ia->ri_remove_done);

	trace_xprtrdma_remove(r_xprt);
}

/**
 * rpcrdma_ia_close - Clean up/close an IA.
 * @ia: interface adapter to close
 *
 */
void
rpcrdma_ia_close(struct rpcrdma_ia *ia)
{
	if (ia->ri_id != NULL && !IS_ERR(ia->ri_id)) {
		if (ia->ri_id->qp)
			rdma_destroy_qp(ia->ri_id);
		rdma_destroy_id(ia->ri_id);
	}
	ia->ri_id = NULL;
	ia->ri_device = NULL;

	/* If the pd is still busy, xprtrdma missed freeing a resource */
	if (ia->ri_pd && !IS_ERR(ia->ri_pd))
		ib_dealloc_pd(ia->ri_pd);
	ia->ri_pd = NULL;
}

/*
 * Create unconnected endpoint.
 */
int
rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
		  struct rpcrdma_create_data_internal *cdata)
{
	struct rpcrdma_connect_private *pmsg = &ep->rep_cm_private;
	struct ib_cq *sendcq, *recvcq;
	unsigned int max_sge;
	int rc;

	max_sge = min_t(unsigned int, ia->ri_device->attrs.max_send_sge,
			RPCRDMA_MAX_SEND_SGES);
	if (max_sge < RPCRDMA_MIN_SEND_SGES) {
		pr_warn("rpcrdma: HCA provides only %d send SGEs\n", max_sge);
		return -ENOMEM;
	}
	ia->ri_max_send_sges = max_sge;

	rc = frwr_open(ia, ep, cdata);
	if (rc)
		return rc;

	ep->rep_attr.event_handler = rpcrdma_qp_event_handler;
	ep->rep_attr.qp_context = ep;
	ep->rep_attr.srq = NULL;
	ep->rep_attr.cap.max_send_sge = max_sge;
	ep->rep_attr.cap.max_recv_sge = 1;
	ep->rep_attr.cap.max_inline_data = 0;
	ep->rep_attr.sq_sig_type = IB_SIGNAL_REQ_WR;
	ep->rep_attr.qp_type = IB_QPT_RC;
	ep->rep_attr.port_num = ~0;

	dprintk("RPC:       %s: requested max: dtos: send %d recv %d; "
		"iovs: send %d recv %d\n",
		__func__,
		ep->rep_attr.cap.max_send_wr,
		ep->rep_attr.cap.max_recv_wr,
		ep->rep_attr.cap.max_send_sge,
		ep->rep_attr.cap.max_recv_sge);

	/* set trigger for requesting send completion */
	ep->rep_send_batch = min_t(unsigned int, RPCRDMA_MAX_SEND_BATCH,
				   cdata->max_requests >> 2);
	ep->rep_send_count = ep->rep_send_batch;
	init_waitqueue_head(&ep->rep_connect_wait);
	ep->rep_receive_count = 0;

	sendcq = ib_alloc_cq(ia->ri_device, NULL,
			     ep->rep_attr.cap.max_send_wr + 1,
			     ia->ri_device->num_comp_vectors > 1 ? 1 : 0,
			     IB_POLL_WORKQUEUE);
	if (IS_ERR(sendcq)) {
		rc = PTR_ERR(sendcq);
		goto out1;
	}

	recvcq = ib_alloc_cq(ia->ri_device, NULL,
			     ep->rep_attr.cap.max_recv_wr + 1,
			     0, IB_POLL_WORKQUEUE);
	if (IS_ERR(recvcq)) {
		rc = PTR_ERR(recvcq);
		goto out2;
	}

	ep->rep_attr.send_cq = sendcq;
	ep->rep_attr.recv_cq = recvcq;

	/* Initialize cma parameters */
	memset(&ep->rep_remote_cma, 0, sizeof(ep->rep_remote_cma));

	/* Prepare RDMA-CM private message */
	pmsg->cp_magic = rpcrdma_cmp_magic;
	pmsg->cp_version = RPCRDMA_CMP_VERSION;
	pmsg->cp_flags |= RPCRDMA_CMP_F_SND_W_INV_OK;
	pmsg->cp_send_size = rpcrdma_encode_buffer_size(cdata->inline_wsize);
	pmsg->cp_recv_size = rpcrdma_encode_buffer_size(cdata->inline_rsize);
	ep->rep_remote_cma.private_data = pmsg;
	ep->rep_remote_cma.private_data_len = sizeof(*pmsg);

	/* Client offers RDMA Read but does not initiate */
	ep->rep_remote_cma.initiator_depth = 0;
	ep->rep_remote_cma.responder_resources =
		min_t(int, U8_MAX, ia->ri_device->attrs.max_qp_rd_atom);

	/* Limit transport retries so client can detect server
	 * GID changes quickly. RPC layer handles re-establishing
	 * transport connection and retransmission.
	 */
	ep->rep_remote_cma.retry_count = 6;

	/* RPC-over-RDMA handles its own flow control. In addition,
	 * make all RNR NAKs visible so we know that RPC-over-RDMA
	 * flow control is working correctly (no NAKs should be seen).
	 */
	ep->rep_remote_cma.flow_control = 0;
	ep->rep_remote_cma.rnr_retry_count = 0;

	return 0;

out2:
	ib_free_cq(sendcq);
out1:
	return rc;
}

/*
 * rpcrdma_ep_destroy
 *
 * Disconnect and destroy endpoint. After this, the only
 * valid operations on the ep are to free it (if dynamically
 * allocated) or re-create it.
 */
void
rpcrdma_ep_destroy(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
{
	if (ia->ri_id && ia->ri_id->qp) {
		rpcrdma_ep_disconnect(ep, ia);
		rdma_destroy_qp(ia->ri_id);
		ia->ri_id->qp = NULL;
	}

	if (ep->rep_attr.recv_cq)
		ib_free_cq(ep->rep_attr.recv_cq);
	if (ep->rep_attr.send_cq)
		ib_free_cq(ep->rep_attr.send_cq);
}

/* Re-establish a connection after a device removal event.
 * Unlike a normal reconnection, a fresh PD and a new set
 * of MRs and buffers is needed.
 */
static int
rpcrdma_ep_recreate_xprt(struct rpcrdma_xprt *r_xprt,
			 struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
{
	int rc, err;

	trace_xprtrdma_reinsert(r_xprt);

	rc = -EHOSTUNREACH;
	if (rpcrdma_ia_open(r_xprt))
		goto out1;

	rc = -ENOMEM;
	err = rpcrdma_ep_create(ep, ia, &r_xprt->rx_data);
	if (err) {
		pr_err("rpcrdma: rpcrdma_ep_create returned %d\n", err);
		goto out2;
	}

	rc = -ENETUNREACH;
	err = rdma_create_qp(ia->ri_id, ia->ri_pd, &ep->rep_attr);
	if (err) {
		pr_err("rpcrdma: rdma_create_qp returned %d\n", err);
		goto out3;
	}

	rpcrdma_mrs_create(r_xprt);
	return 0;

out3:
	rpcrdma_ep_destroy(ep, ia);
out2:
	rpcrdma_ia_close(ia);
out1:
	return rc;
}

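/* Reconnect to the same server by creating a fresh rdma_cm_id. The
 * existing PD, MRs, and DMA mappings are reused as long as the new
 * ID resolves to the same device.
 */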
static int
rpcrdma_ep_reconnect(struct rpcrdma_xprt *r_xprt, struct rpcrdma_ep *ep,
		     struct rpcrdma_ia *ia)
{
	struct rdma_cm_id *id, *old;
	int err, rc;

	trace_xprtrdma_reconnect(r_xprt);

	rpcrdma_ep_disconnect(ep, ia);

	rc = -EHOSTUNREACH;
	id = rpcrdma_create_id(r_xprt, ia);
	if (IS_ERR(id))
		goto out;

	/* As long as the new ID points to the same device as the
	 * old ID, we can reuse the transport's existing PD and all
	 * previously allocated MRs. Also, the same device means
	 * the transport's previous DMA mappings are still valid.
	 *
	 * This is a sanity check only. There should be no way these
	 * point to two different devices here.
	 */
	old = id;
	rc = -ENETUNREACH;
	if (ia->ri_device != id->device) {
		pr_err("rpcrdma: can't reconnect on different device!\n");
		goto out_destroy;
	}

	err = rdma_create_qp(id, ia->ri_pd, &ep->rep_attr);
	if (err)
		goto out_destroy;

	/* Atomically replace the transport's ID and QP. */
	rc = 0;
	old = ia->ri_id;
	ia->ri_id = id;
	rdma_destroy_qp(old);

out_destroy:
	rdma_destroy_id(old);
out:
	return rc;
}

/*
 * Connect unconnected endpoint.
 */
int
rpcrdma_ep_connect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
{
	struct rpcrdma_xprt *r_xprt = container_of(ia, struct rpcrdma_xprt,
						   rx_ia);
	struct rpc_xprt *xprt = &r_xprt->rx_xprt;
	int rc;

retry:
	switch (ep->rep_connected) {
	case 0:
		dprintk("RPC:       %s: connecting...\n", __func__);
		rc = rdma_create_qp(ia->ri_id, ia->ri_pd, &ep->rep_attr);
		if (rc) {
			rc = -ENETUNREACH;
			goto out_noupdate;
		}
		break;
	case -ENODEV:
		rc = rpcrdma_ep_recreate_xprt(r_xprt, ep, ia);
		if (rc)
			goto out_noupdate;
		break;
	default:
		rc = rpcrdma_ep_reconnect(r_xprt, ep, ia);
		if (rc)
			goto out;
	}

	ep->rep_connected = 0;
	xprt_clear_connected(xprt);

	rpcrdma_post_recvs(r_xprt, true);

	rc = rdma_connect(ia->ri_id, &ep->rep_remote_cma);
	if (rc)
		goto out;

	wait_event_interruptible(ep->rep_connect_wait, ep->rep_connected != 0);
	if (ep->rep_connected <= 0) {
		if (ep->rep_connected == -EAGAIN)
			goto retry;
		rc = ep->rep_connected;
		goto out;
	}

	dprintk("RPC:       %s: connected\n", __func__);

out:
	if (rc)
		ep->rep_connected = rc;

out_noupdate:
	return rc;
}

/**
 * rpcrdma_ep_disconnect - Disconnect underlying transport
 * @ep: endpoint to disconnect
 * @ia: associated interface adapter
 *
 * This is separate from destroy to facilitate the ability
 * to reconnect without recreating the endpoint.
 *
 * This call is not reentrant, and must not be made in parallel
 * on the same endpoint.
 */
void
rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
{
	struct rpcrdma_xprt *r_xprt = container_of(ep, struct rpcrdma_xprt,
						   rx_ep);
	int rc;

	/* returns without wait if ID is not connected */
	rc = rdma_disconnect(ia->ri_id);
	if (!rc)
		wait_event_interruptible(ep->rep_connect_wait,
							ep->rep_connected != 1);
	else
		ep->rep_connected = rc;
	trace_xprtrdma_disconnect(r_xprt, rc);

	rpcrdma_xprt_drain(r_xprt);
}

/* Fixed-size circular FIFO queue. This implementation is wait-free and
 * lock-free.
 *
 * Consumer is the code path that posts Sends. This path dequeues a
 * sendctx for use by a Send operation. Multiple consumer threads
 * are serialized by the RPC transport lock, which allows only one
 * ->send_request call at a time.
 *
 * Producer is the code path that handles Send completions. This path
 * enqueues a sendctx that has been completed. Multiple producer
 * threads are serialized by the ib_poll_cq() function.
 */

/* rpcrdma_sendctxs_destroy() assumes caller has already quiesced
 * queue activity, and ib_drain_qp has flushed all remaining Send
 * requests.
 */
static void rpcrdma_sendctxs_destroy(struct rpcrdma_buffer *buf)
{
	unsigned long i;

	for (i = 0; i <= buf->rb_sc_last; i++)
		kfree(buf->rb_sc_ctxs[i]);
	kfree(buf->rb_sc_ctxs);
}

static struct rpcrdma_sendctx *rpcrdma_sendctx_create(struct rpcrdma_ia *ia)
{
	struct rpcrdma_sendctx *sc;

	sc = kzalloc(sizeof(*sc) +
		     ia->ri_max_send_sges * sizeof(struct ib_sge),
		     GFP_KERNEL);
	if (!sc)
		return NULL;

	sc->sc_wr.wr_cqe = &sc->sc_cqe;
	sc->sc_wr.sg_list = sc->sc_sges;
	sc->sc_wr.opcode = IB_WR_SEND;
	sc->sc_cqe.done = rpcrdma_wc_send;
	return sc;
}

static int rpcrdma_sendctxs_create(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	struct rpcrdma_sendctx *sc;
	unsigned long i;

	/* Maximum number of concurrent outstanding Send WRs. Capping
	 * the circular queue size stops Send Queue overflow by causing
	 * the ->send_request call to fail temporarily before too many
	 * Sends are posted.
	 */
	i = buf->rb_max_requests + RPCRDMA_MAX_BC_REQUESTS;
	dprintk("RPC:       %s: allocating %lu send_ctxs\n", __func__, i);
	buf->rb_sc_ctxs = kcalloc(i, sizeof(sc), GFP_KERNEL);
	if (!buf->rb_sc_ctxs)
		return -ENOMEM;

	buf->rb_sc_last = i - 1;
	for (i = 0; i <= buf->rb_sc_last; i++) {
		sc = rpcrdma_sendctx_create(&r_xprt->rx_ia);
		if (!sc)
			return -ENOMEM;

		sc->sc_xprt = r_xprt;
		buf->rb_sc_ctxs[i] = sc;
	}

	return 0;
}

/* The sendctx queue is not guaranteed to have a size that is a
 * power of two, thus the helpers in circ_buf.h cannot be used.
 * The other option is to use modulus (%), which can be expensive.
 */
static unsigned long rpcrdma_sendctx_next(struct rpcrdma_buffer *buf,
					  unsigned long item)
{
	return likely(item < buf->rb_sc_last) ? item + 1 : 0;
}

/**
 * rpcrdma_sendctx_get_locked - Acquire a send context
 * @buf: transport buffers from which to acquire an unused context
 *
 * Returns pointer to a free send completion context; or NULL if
 * the queue is empty.
 *
 * Usage: Called to acquire an SGE array before preparing a Send WR.
 *
 * The caller serializes calls to this function (per rpcrdma_buffer),
 * and provides an effective memory barrier that flushes the new value
 * of rb_sc_head.
 */
struct rpcrdma_sendctx *rpcrdma_sendctx_get_locked(struct rpcrdma_buffer *buf)
{
	struct rpcrdma_xprt *r_xprt;
	struct rpcrdma_sendctx *sc;
	unsigned long next_head;

	next_head = rpcrdma_sendctx_next(buf, buf->rb_sc_head);

	if (next_head == READ_ONCE(buf->rb_sc_tail))
		goto out_emptyq;

	/* ORDER: item must be accessed _before_ head is updated */
	sc = buf->rb_sc_ctxs[next_head];

	/* Releasing the lock in the caller acts as a memory
	 * barrier that flushes rb_sc_head.
	 */
	buf->rb_sc_head = next_head;

	return sc;

out_emptyq:
	/* The queue is "empty" if there have not been enough Send
	 * completions recently. This is a sign the Send Queue is
	 * backing up. Cause the caller to pause and try again.
	 */
	set_bit(RPCRDMA_BUF_F_EMPTY_SCQ, &buf->rb_flags);
	r_xprt = container_of(buf, struct rpcrdma_xprt, rx_buf);
	r_xprt->rx_stats.empty_sendctx_q++;
	return NULL;
}

/**
 * rpcrdma_sendctx_put_locked - Release a send context
 * @sc: send context to release
 *
 * Usage: Called from Send completion to return a sendctxt
 * to the queue.
 *
 * The caller serializes calls to this function (per rpcrdma_buffer).
 */
static void
rpcrdma_sendctx_put_locked(struct rpcrdma_sendctx *sc)
{
	struct rpcrdma_buffer *buf = &sc->sc_xprt->rx_buf;
	unsigned long next_tail;

	/* Unmap SGEs of previously completed by unsignaled
	 * Sends by walking up the queue until @sc is found.
	 */
	next_tail = buf->rb_sc_tail;
	do {
		next_tail = rpcrdma_sendctx_next(buf, next_tail);

		/* ORDER: item must be accessed _before_ tail is updated */
		rpcrdma_unmap_sendctx(buf->rb_sc_ctxs[next_tail]);

	} while (buf->rb_sc_ctxs[next_tail] != sc);

	/* Paired with READ_ONCE */
	smp_store_release(&buf->rb_sc_tail, next_tail);

	if (test_and_clear_bit(RPCRDMA_BUF_F_EMPTY_SCQ, &buf->rb_flags)) {
		smp_mb__after_atomic();
		xprt_write_space(&sc->sc_xprt->rx_xprt);
	}
}

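/* Allocate and initialize a batch of MRs, then make them available
 * to the transport's send path.
 */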
static void
rpcrdma_mrs_create(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
	unsigned int count;
	LIST_HEAD(free);
	LIST_HEAD(all);

	for (count = 0; count < ia->ri_max_segs; count++) {
		struct rpcrdma_mr *mr;
		int rc;

		mr = kzalloc(sizeof(*mr), GFP_KERNEL);
		if (!mr)
			break;

		rc = frwr_init_mr(ia, mr);
		if (rc) {
			kfree(mr);
			break;
		}

		mr->mr_xprt = r_xprt;

		list_add(&mr->mr_list, &free);
		list_add(&mr->mr_all, &all);
	}

	spin_lock(&buf->rb_mrlock);
	list_splice(&free, &buf->rb_mrs);
	list_splice(&all, &buf->rb_all);
	r_xprt->rx_stats.mrs_allocated += count;
	spin_unlock(&buf->rb_mrlock);
	trace_xprtrdma_createmrs(r_xprt, count);

	xprt_write_space(&r_xprt->rx_xprt);
}

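/* Worker that replenishes the transport's MR free list after it
 * has been exhausted.
 */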
static void
rpcrdma_mr_refresh_worker(struct work_struct *work)
{
	struct rpcrdma_buffer *buf = container_of(work, struct rpcrdma_buffer,
						  rb_refresh_worker.work);
	struct rpcrdma_xprt *r_xprt = container_of(buf, struct rpcrdma_xprt,
						   rx_buf);

	rpcrdma_mrs_create(r_xprt);
}

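/* Allocate an rpcrdma_req and its DMA-mappable header buffer, and
 * add the new req to the transport's rb_allreqs list.
 */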
struct rpcrdma_req *
rpcrdma_create_req(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_buffer *buffer = &r_xprt->rx_buf;
	struct rpcrdma_regbuf *rb;
	struct rpcrdma_req *req;

	req = kzalloc(sizeof(*req), GFP_KERNEL);
	if (req == NULL)
		return ERR_PTR(-ENOMEM);

	rb = rpcrdma_alloc_regbuf(RPCRDMA_HDRBUF_SIZE,
				  DMA_TO_DEVICE, GFP_KERNEL);
	if (IS_ERR(rb)) {
		kfree(req);
		return ERR_PTR(-ENOMEM);
	}
	req->rl_rdmabuf = rb;
	xdr_buf_init(&req->rl_hdrbuf, rb->rg_base, rdmab_length(rb));
	req->rl_buffer = buffer;
	INIT_LIST_HEAD(&req->rl_registered);

	spin_lock(&buffer->rb_lock);
	list_add(&req->rl_all, &buffer->rb_allreqs);
	spin_unlock(&buffer->rb_lock);
	return req;
}

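/* Allocate an rpcrdma_rep and its DMA-mappable receive buffer, and
 * add the new rep to the transport's rb_recv_bufs list.
 */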
static int
rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt, bool temp)
{
	struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	struct rpcrdma_rep *rep;
	int rc;

	rc = -ENOMEM;
	rep = kzalloc(sizeof(*rep), GFP_KERNEL);
	if (rep == NULL)
		goto out;

	rep->rr_rdmabuf = rpcrdma_alloc_regbuf(cdata->inline_rsize,
					       DMA_FROM_DEVICE, GFP_KERNEL);
	if (IS_ERR(rep->rr_rdmabuf)) {
		rc = PTR_ERR(rep->rr_rdmabuf);
		goto out_free;
	}
	xdr_buf_init(&rep->rr_hdrbuf, rep->rr_rdmabuf->rg_base,
		     rdmab_length(rep->rr_rdmabuf));

	rep->rr_cqe.done = rpcrdma_wc_receive;
	rep->rr_rxprt = r_xprt;
	INIT_WORK(&rep->rr_work, rpcrdma_deferred_completion);
	rep->rr_recv_wr.next = NULL;
	rep->rr_recv_wr.wr_cqe = &rep->rr_cqe;
	rep->rr_recv_wr.sg_list = &rep->rr_rdmabuf->rg_iov;
	rep->rr_recv_wr.num_sge = 1;
	rep->rr_temp = temp;

	spin_lock(&buf->rb_lock);
	list_add(&rep->rr_list, &buf->rb_recv_bufs);
	spin_unlock(&buf->rb_lock);
	return 0;

out_free:
	kfree(rep);
out:
	return rc;
}

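/**
 * rpcrdma_buffer_create - Create initial set of req/rep objects
 * @r_xprt: transport instance (re)initialized
 *
 * Returns zero on success, otherwise a negative errno.
 */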
int
rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	int i, rc;

	buf->rb_flags = 0;
	buf->rb_max_requests = r_xprt->rx_data.max_requests;
	buf->rb_bc_srv_max_requests = 0;
	spin_lock_init(&buf->rb_mrlock);
	spin_lock_init(&buf->rb_lock);
	INIT_LIST_HEAD(&buf->rb_mrs);
	INIT_LIST_HEAD(&buf->rb_all);
	INIT_DELAYED_WORK(&buf->rb_refresh_worker,
			  rpcrdma_mr_refresh_worker);

	rpcrdma_mrs_create(r_xprt);

	INIT_LIST_HEAD(&buf->rb_send_bufs);
	INIT_LIST_HEAD(&buf->rb_allreqs);
	for (i = 0; i < buf->rb_max_requests; i++) {
		struct rpcrdma_req *req;

		req = rpcrdma_create_req(r_xprt);
		if (IS_ERR(req)) {
			dprintk("RPC:       %s: request buffer %d alloc"
				" failed\n", __func__, i);
			rc = PTR_ERR(req);
			goto out;
		}
		list_add(&req->rl_list, &buf->rb_send_bufs);
	}

	buf->rb_credits = 1;
	INIT_LIST_HEAD(&buf->rb_recv_bufs);

	rc = rpcrdma_sendctxs_create(r_xprt);
	if (rc)
		goto out;

	buf->rb_completion_wq = alloc_workqueue("rpcrdma-%s",
						WQ_MEM_RECLAIM | WQ_HIGHPRI,
						0,
			r_xprt->rx_xprt.address_strings[RPC_DISPLAY_ADDR]);
	if (!buf->rb_completion_wq) {
		rc = -ENOMEM;
		goto out;
	}

	return 0;
out:
	rpcrdma_buffer_destroy(buf);
	return rc;
}

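/* Release an rpcrdma_rep and its receive buffer.
 */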
static void
rpcrdma_destroy_rep(struct rpcrdma_rep *rep)
{
	rpcrdma_free_regbuf(rep->rr_rdmabuf);
	kfree(rep);
}

/**
 * rpcrdma_req_destroy - Destroy an rpcrdma_req object
 * @req: unused object to be destroyed
 *
 * This function assumes that the caller prevents concurrent device
 * unload and transport tear-down.
 */
void
rpcrdma_req_destroy(struct rpcrdma_req *req)
{
	list_del(&req->rl_all);

	rpcrdma_free_regbuf(req->rl_recvbuf);
	rpcrdma_free_regbuf(req->rl_sendbuf);
	rpcrdma_free_regbuf(req->rl_rdmabuf);
	kfree(req);
}

static void
rpcrdma_mrs_destroy(struct rpcrdma_buffer *buf)
{
	struct rpcrdma_xprt *r_xprt = container_of(buf, struct rpcrdma_xprt,
						   rx_buf);
	struct rpcrdma_mr *mr;
	unsigned int count;

	count = 0;
	spin_lock(&buf->rb_mrlock);
	while (!list_empty(&buf->rb_all)) {
		mr = list_entry(buf->rb_all.next, struct rpcrdma_mr, mr_all);
		list_del(&mr->mr_all);

		spin_unlock(&buf->rb_mrlock);

		/* Ensure MW is not on any rl_registered list */
		if (!list_empty(&mr->mr_list))
			list_del(&mr->mr_list);

		frwr_release_mr(mr);
		count++;
		spin_lock(&buf->rb_mrlock);
	}
	spin_unlock(&buf->rb_mrlock);
	r_xprt->rx_stats.mrs_allocated = 0;

	dprintk("RPC:       %s: released %u MRs\n", __func__, count);
}

/**
 * rpcrdma_buffer_destroy - Release all hw resources
 * @buf: root control block for resources
 *
 * ORDERING: relies on a prior ib_drain_qp :
 * - No more Send or Receive completions can occur
 * - All MRs, reps, and reqs are returned to their free lists
 */
void
rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
{
	cancel_delayed_work_sync(&buf->rb_refresh_worker);

	if (buf->rb_completion_wq) {
		destroy_workqueue(buf->rb_completion_wq);
		buf->rb_completion_wq = NULL;
	}

	rpcrdma_sendctxs_destroy(buf);

	while (!list_empty(&buf->rb_recv_bufs)) {
		struct rpcrdma_rep *rep;

		rep = list_first_entry(&buf->rb_recv_bufs,
				       struct rpcrdma_rep, rr_list);
		list_del(&rep->rr_list);
		rpcrdma_destroy_rep(rep);
	}

	while (!list_empty(&buf->rb_send_bufs)) {
		struct rpcrdma_req *req;

		req = list_first_entry(&buf->rb_send_bufs,
				       struct rpcrdma_req, rl_list);
		list_del(&req->rl_list);
		rpcrdma_req_destroy(req);
	}

	rpcrdma_mrs_destroy(buf);
}

/**
 * rpcrdma_mr_get - Allocate an rpcrdma_mr object
 * @r_xprt: controlling transport
 *
 * Returns an initialized rpcrdma_mr or NULL if no free
 * rpcrdma_mr objects are available.
 */
struct rpcrdma_mr *
rpcrdma_mr_get(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	struct rpcrdma_mr *mr = NULL;

	spin_lock(&buf->rb_mrlock);
	if (!list_empty(&buf->rb_mrs))
		mr = rpcrdma_mr_pop(&buf->rb_mrs);
	spin_unlock(&buf->rb_mrlock);

	if (!mr)
		goto out_nomrs;
	return mr;

out_nomrs:
	trace_xprtrdma_nomrs(r_xprt);
	if (r_xprt->rx_ep.rep_connected != -ENODEV)
		schedule_delayed_work(&buf->rb_refresh_worker, 0);

	/* Allow the reply handler and refresh worker to run */
	cond_resched();

	return NULL;
}

static void
__rpcrdma_mr_put(struct rpcrdma_buffer *buf, struct rpcrdma_mr *mr)
{
	spin_lock(&buf->rb_mrlock);
	rpcrdma_mr_push(mr, &buf->rb_mrs);
	spin_unlock(&buf->rb_mrlock);
}

/**
 * rpcrdma_mr_put - Release an rpcrdma_mr object
 * @mr: object to release
 *
 */
void
rpcrdma_mr_put(struct rpcrdma_mr *mr)
{
	__rpcrdma_mr_put(&mr->mr_xprt->rx_buf, mr);
}

/**
 * rpcrdma_mr_unmap_and_put - DMA unmap an MR and release it
 * @mr: object to release
 *
 */
void
rpcrdma_mr_unmap_and_put(struct rpcrdma_mr *mr)
{
	struct rpcrdma_xprt *r_xprt = mr->mr_xprt;

	if (mr->mr_dir != DMA_NONE) {
		trace_xprtrdma_mr_unmap(mr);
		ib_dma_unmap_sg(r_xprt->rx_ia.ri_device,
				mr->mr_sg, mr->mr_nents, mr->mr_dir);
		mr->mr_dir = DMA_NONE;
	}
	__rpcrdma_mr_put(&r_xprt->rx_buf, mr);
}

/**
 * rpcrdma_buffer_get - Get a request buffer
 * @buffers: Buffer pool from which to obtain a buffer
 *
 * Returns a fresh rpcrdma_req, or NULL if none are available.
 */
struct rpcrdma_req *
rpcrdma_buffer_get(struct rpcrdma_buffer *buffers)
{
	struct rpcrdma_req *req;

	spin_lock(&buffers->rb_lock);
	req = list_first_entry_or_null(&buffers->rb_send_bufs,
				       struct rpcrdma_req, rl_list);
	if (req)
		list_del_init(&req->rl_list);
	spin_unlock(&buffers->rb_lock);
	return req;
}

/**
 * rpcrdma_buffer_put - Put request/reply buffers back into pool
 * @req: object to return
 *
 */
void
rpcrdma_buffer_put(struct rpcrdma_req *req)
{
	struct rpcrdma_buffer *buffers = req->rl_buffer;
	struct rpcrdma_rep *rep = req->rl_reply;

	req->rl_reply = NULL;

	spin_lock(&buffers->rb_lock);
	list_add(&req->rl_list, &buffers->rb_send_bufs);
	if (rep) {
		if (!rep->rr_temp) {
			list_add(&rep->rr_list, &buffers->rb_recv_bufs);
			rep = NULL;
		}
	}
	spin_unlock(&buffers->rb_lock);
	if (rep)
		rpcrdma_destroy_rep(rep);
}

/*
 * Put reply buffers back into pool when not attached to
 * request. This happens in error conditions.
 */
void
rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep)
{
	struct rpcrdma_buffer *buffers = &rep->rr_rxprt->rx_buf;

	if (!rep->rr_temp) {
		spin_lock(&buffers->rb_lock);
		list_add(&rep->rr_list, &buffers->rb_recv_bufs);
		spin_unlock(&buffers->rb_lock);
	} else {
		rpcrdma_destroy_rep(rep);
	}
}

/**
 * rpcrdma_alloc_regbuf - allocate and DMA-map memory for SEND/RECV buffers
 * @size: size of buffer to be allocated, in bytes
 * @direction: direction of data movement
 * @flags: GFP flags
 *
 * Returns an ERR_PTR, or a pointer to a regbuf, a buffer that
 * can be persistently DMA-mapped for I/O.
 *
 * xprtrdma uses a regbuf for posting an outgoing RDMA SEND, or for
 * receiving the payload of RDMA RECV operations. During Long Calls
 * or Replies they may be registered externally via frwr_map.
 */
struct rpcrdma_regbuf *
rpcrdma_alloc_regbuf(size_t size, enum dma_data_direction direction,
		     gfp_t flags)
{
	struct rpcrdma_regbuf *rb;

	rb = kmalloc(sizeof(*rb) + size, flags);
	if (rb == NULL)
		return ERR_PTR(-ENOMEM);

	rb->rg_device = NULL;
	rb->rg_direction = direction;
	rb->rg_iov.length = size;

	return rb;
}

/**
 * __rpcrdma_map_regbuf - DMA-map a regbuf
 * @ia: controlling rpcrdma_ia
 * @rb: regbuf to be mapped
 */
bool
__rpcrdma_dma_map_regbuf(struct rpcrdma_ia *ia, struct rpcrdma_regbuf *rb)
{
	struct ib_device *device = ia->ri_device;

	if (rb->rg_direction == DMA_NONE)
		return false;

	rb->rg_iov.addr = ib_dma_map_single(device,
					    (void *)rb->rg_base,
					    rdmab_length(rb),
					    rb->rg_direction);
	if (ib_dma_mapping_error(device, rdmab_addr(rb))) {
		trace_xprtrdma_dma_maperr(rdmab_addr(rb));
		return false;
	}

	rb->rg_device = device;
	rb->rg_iov.lkey = ia->ri_pd->local_dma_lkey;
	return true;
}

static void
rpcrdma_dma_unmap_regbuf(struct rpcrdma_regbuf *rb)
{
	if (!rb)
		return;

	if (!rpcrdma_regbuf_is_mapped(rb))
		return;

	ib_dma_unmap_single(rb->rg_device, rdmab_addr(rb),
			    rdmab_length(rb), rb->rg_direction);
	rb->rg_device = NULL;
}

/**
 * rpcrdma_free_regbuf - deregister and free registered buffer
 * @rb: regbuf to be deregistered and freed
 */
void
rpcrdma_free_regbuf(struct rpcrdma_regbuf *rb)
{
	rpcrdma_dma_unmap_regbuf(rb);
	kfree(rb);
}

/**
 * rpcrdma_ep_post - Post WRs to a transport's Send Queue
 * @ia: transport's device information
 * @ep: transport's RDMA endpoint information
 * @req: rpcrdma_req containing the Send WR to post
 *
 * Returns 0 if the post was successful, otherwise -ENOTCONN
 * is returned.
 */
int
rpcrdma_ep_post(struct rpcrdma_ia *ia,
		struct rpcrdma_ep *ep,
		struct rpcrdma_req *req)
{
	struct ib_send_wr *send_wr = &req->rl_sendctx->sc_wr;
	int rc;

	if (!ep->rep_send_count ||
	    test_bit(RPCRDMA_REQ_F_TX_RESOURCES, &req->rl_flags)) {
		send_wr->send_flags |= IB_SEND_SIGNALED;
		ep->rep_send_count = ep->rep_send_batch;
	} else {
		send_wr->send_flags &= ~IB_SEND_SIGNALED;
		--ep->rep_send_count;
	}

	rc = frwr_send(ia, req);
	trace_xprtrdma_post_send(req, rc);
	if (rc)
		return -ENOTCONN;
	return 0;
}

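/* Post enough Receive WRs to bring the Receive Queue up to its
 * target depth, allocating fresh rpcrdma_rep objects as needed.
 */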
static void
rpcrdma_post_recvs(struct rpcrdma_xprt *r_xprt, bool temp)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	struct rpcrdma_ep *ep = &r_xprt->rx_ep;
	struct ib_recv_wr *wr, *bad_wr;
	int needed, count, rc;

	rc = 0;
	count = 0;
	needed = buf->rb_credits + (buf->rb_bc_srv_max_requests << 1);
	if (ep->rep_receive_count > needed)
		goto out;
	needed -= ep->rep_receive_count;
	if (!temp)
		needed += RPCRDMA_MAX_RECV_BATCH;

	count = 0;
	wr = NULL;
	while (needed) {
		struct rpcrdma_regbuf *rb;
		struct rpcrdma_rep *rep;

		spin_lock(&buf->rb_lock);
		rep = list_first_entry_or_null(&buf->rb_recv_bufs,
					       struct rpcrdma_rep, rr_list);
		if (likely(rep))
			list_del(&rep->rr_list);
		spin_unlock(&buf->rb_lock);
		if (!rep) {
			if (rpcrdma_create_rep(r_xprt, temp))
				break;
			continue;
		}

		rb = rep->rr_rdmabuf;
		if (!rpcrdma_regbuf_is_mapped(rb)) {
			if (!__rpcrdma_dma_map_regbuf(&r_xprt->rx_ia, rb)) {
				rpcrdma_recv_buffer_put(rep);
				break;
			}
		}

		trace_xprtrdma_post_recv(rep->rr_recv_wr.wr_cqe);
		rep->rr_recv_wr.next = wr;
		wr = &rep->rr_recv_wr;
		++count;
		--needed;
	}
	if (!count)
		goto out;

	rc = ib_post_recv(r_xprt->rx_ia.ri_id->qp, wr,
			  (const struct ib_recv_wr **)&bad_wr);
	if (rc) {
		for (wr = bad_wr; wr; wr = wr->next) {
			struct rpcrdma_rep *rep;

			rep = container_of(wr, struct rpcrdma_rep, rr_recv_wr);
			rpcrdma_recv_buffer_put(rep);
			--count;
		}
	}
	ep->rep_receive_count += count;
out:
	trace_xprtrdma_post_recvs(r_xprt, count, rc);
}