// SPDX-License-Identifier: GPL-2.0 OR BSD-3-Clause
/*
 * Copyright (c) 2014-2017 Oracle.  All rights reserved.
 * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
 *
 * This software is available to you under a choice of one of two
 * licenses.  You may choose to be licensed under the terms of the GNU
 * General Public License (GPL) Version 2, available from the file
 * COPYING in the main directory of this source tree, or the BSD-type
 * license below:
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 *      Redistributions of source code must retain the above copyright
 *      notice, this list of conditions and the following disclaimer.
 *
 *      Redistributions in binary form must reproduce the above
 *      copyright notice, this list of conditions and the following
 *      disclaimer in the documentation and/or other materials provided
 *      with the distribution.
 *
 *      Neither the name of the Network Appliance, Inc. nor the names of
 *      its contributors may be used to endorse or promote products
 *      derived from this software without specific prior written
 *      permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

/*
 * verbs.c
 *
 * Encapsulates the major functions managing:
 *  o adapters
 *  o endpoints
 *  o connections
 *  o buffer memory
 */

#include <linux/interrupt.h>
#include <linux/slab.h>
#include <linux/sunrpc/addr.h>
#include <linux/sunrpc/svc_rdma.h>

#include <asm-generic/barrier.h>
#include <asm/bitops.h>

#include <rdma/ib_cm.h>

#include "xprt_rdma.h"
#include <trace/events/rpcrdma.h>

/*
 * Globals/Macros
 */

#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
# define RPCDBG_FACILITY	RPCDBG_TRANS
#endif

/*
 * internal functions
 */
static void rpcrdma_sendctx_put_locked(struct rpcrdma_sendctx *sc);
static void rpcrdma_mrs_create(struct rpcrdma_xprt *r_xprt);
static void rpcrdma_mrs_destroy(struct rpcrdma_buffer *buf);
static int rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt, bool temp);
static void rpcrdma_dma_unmap_regbuf(struct rpcrdma_regbuf *rb);

struct workqueue_struct *rpcrdma_receive_wq __read_mostly;

int
rpcrdma_alloc_wq(void)
{
	struct workqueue_struct *recv_wq;

	recv_wq = alloc_workqueue("xprtrdma_receive",
				  WQ_MEM_RECLAIM | WQ_HIGHPRI,
				  0);
	if (!recv_wq)
		return -ENOMEM;

	rpcrdma_receive_wq = recv_wq;
	return 0;
}

void
rpcrdma_destroy_wq(void)
{
	struct workqueue_struct *wq;

	if (rpcrdma_receive_wq) {
		wq = rpcrdma_receive_wq;
		rpcrdma_receive_wq = NULL;
		destroy_workqueue(wq);
	}
}
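
/* A minimal usage sketch (the actual callers live outside this file and
 * are assumed here): the transport module init path pairs these as
 *
 *	if (rpcrdma_alloc_wq())
 *		return -ENOMEM;
 *	...
 *	rpcrdma_destroy_wq();
 *
 * The queue is created WQ_MEM_RECLAIM | WQ_HIGHPRI so that deferred
 * Receive completion work (rpcrdma_deferred_completion, queued via
 * rep->rr_work below) can still run under memory pressure.
 */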

static void
rpcrdma_qp_async_error_upcall(struct ib_event *event, void *context)
{
	struct rpcrdma_ep *ep = context;
	struct rpcrdma_xprt *r_xprt = container_of(ep, struct rpcrdma_xprt,
						   rx_ep);

	trace_xprtrdma_qp_error(r_xprt, event);
	pr_err("rpcrdma: %s on device %s ep %p\n",
	       ib_event_msg(event->event), event->device->name, context);

	if (ep->rep_connected == 1) {
		ep->rep_connected = -EIO;
		rpcrdma_conn_func(ep);
		wake_up_all(&ep->rep_connect_wait);
	}
}

/**
 * rpcrdma_wc_send - Invoked by RDMA provider for each polled Send WC
 * @cq:	completion queue (ignored)
 * @wc:	completed WR
 *
 */
static void
rpcrdma_wc_send(struct ib_cq *cq, struct ib_wc *wc)
{
	struct ib_cqe *cqe = wc->wr_cqe;
	struct rpcrdma_sendctx *sc =
		container_of(cqe, struct rpcrdma_sendctx, sc_cqe);

	/* WARNING: Only wr_cqe and status are reliable at this point */
	trace_xprtrdma_wc_send(sc, wc);
	if (wc->status != IB_WC_SUCCESS && wc->status != IB_WC_WR_FLUSH_ERR)
		pr_err("rpcrdma: Send: %s (%u/0x%x)\n",
		       ib_wc_status_msg(wc->status),
		       wc->status, wc->vendor_err);

	rpcrdma_sendctx_put_locked(sc);
}

/**
 * rpcrdma_wc_receive - Invoked by RDMA provider for each polled Receive WC
 * @cq:	completion queue (ignored)
 * @wc:	completed WR
 *
 */
static void
rpcrdma_wc_receive(struct ib_cq *cq, struct ib_wc *wc)
{
	struct ib_cqe *cqe = wc->wr_cqe;
	struct rpcrdma_rep *rep = container_of(cqe, struct rpcrdma_rep,
					       rr_cqe);

	/* WARNING: Only wr_id and status are reliable at this point */
	trace_xprtrdma_wc_receive(wc);
	if (wc->status != IB_WC_SUCCESS)
		goto out_fail;

	/* status == SUCCESS means all fields in wc are trustworthy */
	rpcrdma_set_xdrlen(&rep->rr_hdrbuf, wc->byte_len);
	rep->rr_wc_flags = wc->wc_flags;
	rep->rr_inv_rkey = wc->ex.invalidate_rkey;

	ib_dma_sync_single_for_cpu(rdmab_device(rep->rr_rdmabuf),
				   rdmab_addr(rep->rr_rdmabuf),
				   wc->byte_len, DMA_FROM_DEVICE);

out_schedule:
	rpcrdma_reply_handler(rep);
	return;

out_fail:
	if (wc->status != IB_WC_WR_FLUSH_ERR)
		pr_err("rpcrdma: Recv: %s (%u/0x%x)\n",
		       ib_wc_status_msg(wc->status),
		       wc->status, wc->vendor_err);
	rpcrdma_set_xdrlen(&rep->rr_hdrbuf, 0);
	goto out_schedule;
}

static void
rpcrdma_update_connect_private(struct rpcrdma_xprt *r_xprt,
			       struct rdma_conn_param *param)
{
	struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
	const struct rpcrdma_connect_private *pmsg = param->private_data;
	unsigned int rsize, wsize;

	/* Default settings for RPC-over-RDMA Version One */
	r_xprt->rx_ia.ri_implicit_roundup = xprt_rdma_pad_optimize;
	rsize = RPCRDMA_V1_DEF_INLINE_SIZE;
	wsize = RPCRDMA_V1_DEF_INLINE_SIZE;

	if (pmsg &&
	    pmsg->cp_magic == rpcrdma_cmp_magic &&
	    pmsg->cp_version == RPCRDMA_CMP_VERSION) {
		r_xprt->rx_ia.ri_implicit_roundup = true;
		rsize = rpcrdma_decode_buffer_size(pmsg->cp_send_size);
		wsize = rpcrdma_decode_buffer_size(pmsg->cp_recv_size);
	}

	if (rsize < cdata->inline_rsize)
		cdata->inline_rsize = rsize;
	if (wsize < cdata->inline_wsize)
		cdata->inline_wsize = wsize;
	dprintk("RPC:       %s: max send %u, max recv %u\n",
		__func__, cdata->inline_wsize, cdata->inline_rsize);
	rpcrdma_set_max_header_sizes(r_xprt);
}
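
/* A sketch of the effective inline thresholds after the exchange above
 * (an illustrative reading of this code, not a protocol statement):
 *
 *	inline_wsize = min(local inline_wsize, peer cp_recv_size)
 *	inline_rsize = min(local inline_rsize, peer cp_send_size)
 *
 * and when no valid private message arrives, the peer values default to
 * RPCRDMA_V1_DEF_INLINE_SIZE.
 */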

static int
rpcrdma_conn_upcall(struct rdma_cm_id *id, struct rdma_cm_event *event)
{
	struct rpcrdma_xprt *xprt = id->context;
	struct rpcrdma_ia *ia = &xprt->rx_ia;
	struct rpcrdma_ep *ep = &xprt->rx_ep;
	int connstate = 0;

	trace_xprtrdma_conn_upcall(xprt, event);
	switch (event->event) {
	case RDMA_CM_EVENT_ADDR_RESOLVED:
	case RDMA_CM_EVENT_ROUTE_RESOLVED:
		ia->ri_async_rc = 0;
		complete(&ia->ri_done);
		break;
	case RDMA_CM_EVENT_ADDR_ERROR:
		ia->ri_async_rc = -EPROTO;
		complete(&ia->ri_done);
		break;
	case RDMA_CM_EVENT_ROUTE_ERROR:
		ia->ri_async_rc = -ENETUNREACH;
		complete(&ia->ri_done);
		break;
	case RDMA_CM_EVENT_DEVICE_REMOVAL:
#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
		pr_info("rpcrdma: removing device %s for %s:%s\n",
			ia->ri_device->name,
			rpcrdma_addrstr(xprt), rpcrdma_portstr(xprt));
#endif
		set_bit(RPCRDMA_IAF_REMOVING, &ia->ri_flags);
		ep->rep_connected = -ENODEV;
		xprt_force_disconnect(&xprt->rx_xprt);
		wait_for_completion(&ia->ri_remove_done);

		ia->ri_id = NULL;
		ia->ri_device = NULL;
		/* Return 1 to ensure the core destroys the id. */
		return 1;
	case RDMA_CM_EVENT_ESTABLISHED:
		++xprt->rx_xprt.connect_cookie;
		connstate = 1;
		rpcrdma_update_connect_private(xprt, &event->param.conn);
		goto connected;
	case RDMA_CM_EVENT_CONNECT_ERROR:
		connstate = -ENOTCONN;
		goto connected;
	case RDMA_CM_EVENT_UNREACHABLE:
		connstate = -ENETUNREACH;
		goto connected;
	case RDMA_CM_EVENT_REJECTED:
		dprintk("rpcrdma: connection to %s:%s rejected: %s\n",
			rpcrdma_addrstr(xprt), rpcrdma_portstr(xprt),
			rdma_reject_msg(id, event->status));
		connstate = -ECONNREFUSED;
		if (event->status == IB_CM_REJ_STALE_CONN)
			connstate = -EAGAIN;
		goto connected;
	case RDMA_CM_EVENT_DISCONNECTED:
		++xprt->rx_xprt.connect_cookie;
		connstate = -ECONNABORTED;
connected:
		ep->rep_connected = connstate;
		rpcrdma_conn_func(ep);
		wake_up_all(&ep->rep_connect_wait);
		/*FALLTHROUGH*/
	default:
		dprintk("RPC:       %s: %s:%s on %s/%s (ep 0x%p): %s\n",
			__func__,
			rpcrdma_addrstr(xprt), rpcrdma_portstr(xprt),
			ia->ri_device->name, ia->ri_ops->ro_displayname,
			ep, rdma_event_msg(event->event));
		break;
	}

	return 0;
}

static struct rdma_cm_id *
rpcrdma_create_id(struct rpcrdma_xprt *xprt, struct rpcrdma_ia *ia)
{
	unsigned long wtimeout = msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT) + 1;
	struct rdma_cm_id *id;
	int rc;

	trace_xprtrdma_conn_start(xprt);

	init_completion(&ia->ri_done);
	init_completion(&ia->ri_remove_done);

	id = rdma_create_id(xprt->rx_xprt.xprt_net, rpcrdma_conn_upcall,
			    xprt, RDMA_PS_TCP, IB_QPT_RC);
	if (IS_ERR(id)) {
		rc = PTR_ERR(id);
		dprintk("RPC:       %s: rdma_create_id() failed %i\n",
			__func__, rc);
		return id;
	}

	ia->ri_async_rc = -ETIMEDOUT;
	rc = rdma_resolve_addr(id, NULL,
			       (struct sockaddr *)&xprt->rx_xprt.addr,
			       RDMA_RESOLVE_TIMEOUT);
	if (rc) {
		dprintk("RPC:       %s: rdma_resolve_addr() failed %i\n",
			__func__, rc);
		goto out;
	}
	rc = wait_for_completion_interruptible_timeout(&ia->ri_done, wtimeout);
	if (rc < 0) {
		trace_xprtrdma_conn_tout(xprt);
		goto out;
	}

	rc = ia->ri_async_rc;
	if (rc)
		goto out;

	ia->ri_async_rc = -ETIMEDOUT;
	rc = rdma_resolve_route(id, RDMA_RESOLVE_TIMEOUT);
	if (rc) {
		dprintk("RPC:       %s: rdma_resolve_route() failed %i\n",
			__func__, rc);
		goto out;
	}
	rc = wait_for_completion_interruptible_timeout(&ia->ri_done, wtimeout);
	if (rc < 0) {
		trace_xprtrdma_conn_tout(xprt);
		goto out;
	}
	rc = ia->ri_async_rc;
	if (rc)
		goto out;

	return id;

out:
	rdma_destroy_id(id);
	return ERR_PTR(rc);
}

/*
 * Exported functions.
 */

/**
 * rpcrdma_ia_open - Open and initialize an Interface Adapter.
 * @xprt: transport with IA to (re)initialize
 *
 * Returns 0 on success, negative errno if an appropriate
 * Interface Adapter could not be found and opened.
 */
int
rpcrdma_ia_open(struct rpcrdma_xprt *xprt)
{
	struct rpcrdma_ia *ia = &xprt->rx_ia;
	int rc;

	ia->ri_id = rpcrdma_create_id(xprt, ia);
	if (IS_ERR(ia->ri_id)) {
		rc = PTR_ERR(ia->ri_id);
		goto out_err;
	}
	ia->ri_device = ia->ri_id->device;

	ia->ri_pd = ib_alloc_pd(ia->ri_device, 0);
	if (IS_ERR(ia->ri_pd)) {
		rc = PTR_ERR(ia->ri_pd);
		pr_err("rpcrdma: ib_alloc_pd() returned %d\n", rc);
		goto out_err;
	}

	switch (xprt_rdma_memreg_strategy) {
	case RPCRDMA_FRWR:
		if (frwr_is_supported(ia)) {
			ia->ri_ops = &rpcrdma_frwr_memreg_ops;
			break;
		}
		/*FALLTHROUGH*/
	case RPCRDMA_MTHCAFMR:
		if (fmr_is_supported(ia)) {
			ia->ri_ops = &rpcrdma_fmr_memreg_ops;
			break;
		}
		/*FALLTHROUGH*/
	default:
		pr_err("rpcrdma: Device %s does not support memreg mode %d\n",
		       ia->ri_device->name, xprt_rdma_memreg_strategy);
		rc = -EINVAL;
		goto out_err;
	}

	return 0;

out_err:
	rpcrdma_ia_close(ia);
	return rc;
}
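
/* A rough ordering sketch of how these pieces fit together, pieced
 * together from rpcrdma_ep_recreate_xprt() and rpcrdma_ep_connect()
 * below (the normal connect path is assumed to be equivalent):
 *
 *	rpcrdma_ia_open(r_xprt);			// cm_id, PD, memreg ops
 *	rpcrdma_ep_create(ep, ia, &r_xprt->rx_data);	// CQs, QP attributes
 *	rpcrdma_ep_connect(ep, ia);			// rdma_create_qp + rdma_connect
 */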

/**
 * rpcrdma_ia_remove - Handle device driver unload
 * @ia: interface adapter being removed
 *
 * Divest transport H/W resources associated with this adapter,
 * but allow it to be restored later.
 */
void
rpcrdma_ia_remove(struct rpcrdma_ia *ia)
{
	struct rpcrdma_xprt *r_xprt = container_of(ia, struct rpcrdma_xprt,
						   rx_ia);
	struct rpcrdma_ep *ep = &r_xprt->rx_ep;
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	struct rpcrdma_req *req;
	struct rpcrdma_rep *rep;

	cancel_delayed_work_sync(&buf->rb_refresh_worker);

	/* This is similar to rpcrdma_ep_destroy, but:
	 * - Don't cancel the connect worker.
	 * - Don't call rpcrdma_ep_disconnect, which waits
	 *   for another conn upcall, which will deadlock.
	 * - rdma_disconnect is unneeded, the underlying
	 *   connection is already gone.
	 */
	if (ia->ri_id->qp) {
		ib_drain_qp(ia->ri_id->qp);
		rdma_destroy_qp(ia->ri_id);
		ia->ri_id->qp = NULL;
	}
	ib_free_cq(ep->rep_attr.recv_cq);
	ep->rep_attr.recv_cq = NULL;
	ib_free_cq(ep->rep_attr.send_cq);
	ep->rep_attr.send_cq = NULL;

	/* The ULP is responsible for ensuring all DMA
	 * mappings and MRs are gone.
	 */
	list_for_each_entry(rep, &buf->rb_recv_bufs, rr_list)
		rpcrdma_dma_unmap_regbuf(rep->rr_rdmabuf);
	list_for_each_entry(req, &buf->rb_allreqs, rl_all) {
		rpcrdma_dma_unmap_regbuf(req->rl_rdmabuf);
		rpcrdma_dma_unmap_regbuf(req->rl_sendbuf);
		rpcrdma_dma_unmap_regbuf(req->rl_recvbuf);
	}
	rpcrdma_mrs_destroy(buf);
	ib_dealloc_pd(ia->ri_pd);
	ia->ri_pd = NULL;

	/* Allow waiters to continue */
	complete(&ia->ri_remove_done);

	trace_xprtrdma_remove(r_xprt);
}

/**
 * rpcrdma_ia_close - Clean up/close an IA.
 * @ia: interface adapter to close
 *
 */
void
rpcrdma_ia_close(struct rpcrdma_ia *ia)
{
	if (ia->ri_id != NULL && !IS_ERR(ia->ri_id)) {
		if (ia->ri_id->qp)
			rdma_destroy_qp(ia->ri_id);
		rdma_destroy_id(ia->ri_id);
	}
	ia->ri_id = NULL;
	ia->ri_device = NULL;

	/* If the pd is still busy, xprtrdma missed freeing a resource */
	if (ia->ri_pd && !IS_ERR(ia->ri_pd))
		ib_dealloc_pd(ia->ri_pd);
	ia->ri_pd = NULL;
}

/*
 * Create unconnected endpoint.
 */
int
rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
		  struct rpcrdma_create_data_internal *cdata)
{
	struct rpcrdma_connect_private *pmsg = &ep->rep_cm_private;
	struct ib_cq *sendcq, *recvcq;
	unsigned int max_sge;
	int rc;

	max_sge = min_t(unsigned int, ia->ri_device->attrs.max_send_sge,
			RPCRDMA_MAX_SEND_SGES);
	if (max_sge < RPCRDMA_MIN_SEND_SGES) {
		pr_warn("rpcrdma: HCA provides only %d send SGEs\n", max_sge);
		return -ENOMEM;
	}
	ia->ri_max_send_sges = max_sge;

	rc = ia->ri_ops->ro_open(ia, ep, cdata);
	if (rc)
		return rc;

	ep->rep_attr.event_handler = rpcrdma_qp_async_error_upcall;
	ep->rep_attr.qp_context = ep;
	ep->rep_attr.srq = NULL;
	ep->rep_attr.cap.max_send_sge = max_sge;
	ep->rep_attr.cap.max_recv_sge = 1;
	ep->rep_attr.cap.max_inline_data = 0;
	ep->rep_attr.sq_sig_type = IB_SIGNAL_REQ_WR;
	ep->rep_attr.qp_type = IB_QPT_RC;
	ep->rep_attr.port_num = ~0;

	dprintk("RPC:       %s: requested max: dtos: send %d recv %d; "
		"iovs: send %d recv %d\n",
		__func__,
		ep->rep_attr.cap.max_send_wr,
		ep->rep_attr.cap.max_recv_wr,
		ep->rep_attr.cap.max_send_sge,
		ep->rep_attr.cap.max_recv_sge);

	/* set trigger for requesting send completion */
	ep->rep_send_batch = min_t(unsigned int, RPCRDMA_MAX_SEND_BATCH,
				   cdata->max_requests >> 2);
	ep->rep_send_count = ep->rep_send_batch;
	init_waitqueue_head(&ep->rep_connect_wait);
	INIT_DELAYED_WORK(&ep->rep_connect_worker, rpcrdma_connect_worker);

	sendcq = ib_alloc_cq(ia->ri_device, NULL,
			     ep->rep_attr.cap.max_send_wr + 1,
			     1, IB_POLL_WORKQUEUE);
	if (IS_ERR(sendcq)) {
		rc = PTR_ERR(sendcq);
		dprintk("RPC:       %s: failed to create send CQ: %i\n",
			__func__, rc);
		goto out1;
	}

	recvcq = ib_alloc_cq(ia->ri_device, NULL,
			     ep->rep_attr.cap.max_recv_wr + 1,
			     0, IB_POLL_WORKQUEUE);
	if (IS_ERR(recvcq)) {
		rc = PTR_ERR(recvcq);
		dprintk("RPC:       %s: failed to create recv CQ: %i\n",
			__func__, rc);
		goto out2;
	}

	ep->rep_attr.send_cq = sendcq;
	ep->rep_attr.recv_cq = recvcq;

	/* Initialize cma parameters */
	memset(&ep->rep_remote_cma, 0, sizeof(ep->rep_remote_cma));

	/* Prepare RDMA-CM private message */
	pmsg->cp_magic = rpcrdma_cmp_magic;
	pmsg->cp_version = RPCRDMA_CMP_VERSION;
	pmsg->cp_flags |= ia->ri_ops->ro_send_w_inv_ok;
	pmsg->cp_send_size = rpcrdma_encode_buffer_size(cdata->inline_wsize);
	pmsg->cp_recv_size = rpcrdma_encode_buffer_size(cdata->inline_rsize);
	ep->rep_remote_cma.private_data = pmsg;
	ep->rep_remote_cma.private_data_len = sizeof(*pmsg);

	/* Client offers RDMA Read but does not initiate */
	ep->rep_remote_cma.initiator_depth = 0;
	ep->rep_remote_cma.responder_resources =
		min_t(int, U8_MAX, ia->ri_device->attrs.max_qp_rd_atom);

	/* Limit transport retries so client can detect server
	 * GID changes quickly. RPC layer handles re-establishing
	 * transport connection and retransmission.
	 */
	ep->rep_remote_cma.retry_count = 6;

	/* RPC-over-RDMA handles its own flow control. In addition,
	 * make all RNR NAKs visible so we know that RPC-over-RDMA
	 * flow control is working correctly (no NAKs should be seen).
	 */
	ep->rep_remote_cma.flow_control = 0;
	ep->rep_remote_cma.rnr_retry_count = 0;

	return 0;

out2:
	ib_free_cq(sendcq);
out1:
	return rc;
}

/*
 * rpcrdma_ep_destroy
 *
 * Disconnect and destroy endpoint. After this, the only
 * valid operations on the ep are to free it (if dynamically
 * allocated) or re-create it.
 */
void
rpcrdma_ep_destroy(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
{
	cancel_delayed_work_sync(&ep->rep_connect_worker);

	if (ia->ri_id && ia->ri_id->qp) {
		rpcrdma_ep_disconnect(ep, ia);
		rdma_destroy_qp(ia->ri_id);
		ia->ri_id->qp = NULL;
	}

	if (ep->rep_attr.recv_cq)
		ib_free_cq(ep->rep_attr.recv_cq);
	if (ep->rep_attr.send_cq)
		ib_free_cq(ep->rep_attr.send_cq);
}

/* Re-establish a connection after a device removal event.
 * Unlike a normal reconnection, a fresh PD and a new set
 * of MRs and buffers is needed.
 */
static int
rpcrdma_ep_recreate_xprt(struct rpcrdma_xprt *r_xprt,
			 struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
{
	int rc, err;

	trace_xprtrdma_reinsert(r_xprt);

	rc = -EHOSTUNREACH;
	if (rpcrdma_ia_open(r_xprt))
		goto out1;

	rc = -ENOMEM;
	err = rpcrdma_ep_create(ep, ia, &r_xprt->rx_data);
	if (err) {
		pr_err("rpcrdma: rpcrdma_ep_create returned %d\n", err);
		goto out2;
	}

	rc = -ENETUNREACH;
	err = rdma_create_qp(ia->ri_id, ia->ri_pd, &ep->rep_attr);
	if (err) {
		pr_err("rpcrdma: rdma_create_qp returned %d\n", err);
		goto out3;
	}

	rpcrdma_mrs_create(r_xprt);
	return 0;

out3:
	rpcrdma_ep_destroy(ep, ia);
out2:
	rpcrdma_ia_close(ia);
out1:
	return rc;
}

static int
rpcrdma_ep_reconnect(struct rpcrdma_xprt *r_xprt, struct rpcrdma_ep *ep,
		     struct rpcrdma_ia *ia)
{
	struct rdma_cm_id *id, *old;
	int err, rc;

	trace_xprtrdma_reconnect(r_xprt);

	rpcrdma_ep_disconnect(ep, ia);

	rc = -EHOSTUNREACH;
	id = rpcrdma_create_id(r_xprt, ia);
	if (IS_ERR(id))
		goto out;

	/* As long as the new ID points to the same device as the
	 * old ID, we can reuse the transport's existing PD and all
	 * previously allocated MRs. Also, the same device means
	 * the transport's previous DMA mappings are still valid.
	 *
	 * This is a sanity check only. There should be no way these
	 * point to two different devices here.
	 */
	old = id;
	rc = -ENETUNREACH;
	if (ia->ri_device != id->device) {
		pr_err("rpcrdma: can't reconnect on different device!\n");
		goto out_destroy;
	}

	err = rdma_create_qp(id, ia->ri_pd, &ep->rep_attr);
	if (err) {
		dprintk("RPC:       %s: rdma_create_qp returned %d\n",
			__func__, err);
		goto out_destroy;
	}

	/* Atomically replace the transport's ID and QP. */
	rc = 0;
	old = ia->ri_id;
	ia->ri_id = id;
	rdma_destroy_qp(old);

out_destroy:
	rdma_destroy_id(old);
out:
	return rc;
}

/*
 * Connect unconnected endpoint.
 */
int
rpcrdma_ep_connect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
{
	struct rpcrdma_xprt *r_xprt = container_of(ia, struct rpcrdma_xprt,
						   rx_ia);
	int rc;

retry:
	switch (ep->rep_connected) {
	case 0:
		dprintk("RPC:       %s: connecting...\n", __func__);
		rc = rdma_create_qp(ia->ri_id, ia->ri_pd, &ep->rep_attr);
		if (rc) {
			dprintk("RPC:       %s: rdma_create_qp failed %i\n",
				__func__, rc);
			rc = -ENETUNREACH;
			goto out_noupdate;
		}
		break;
	case -ENODEV:
		rc = rpcrdma_ep_recreate_xprt(r_xprt, ep, ia);
		if (rc)
			goto out_noupdate;
		break;
	default:
		rc = rpcrdma_ep_reconnect(r_xprt, ep, ia);
		if (rc)
			goto out;
	}

	ep->rep_connected = 0;
	rpcrdma_post_recvs(r_xprt, true);

	rc = rdma_connect(ia->ri_id, &ep->rep_remote_cma);
	if (rc) {
		dprintk("RPC:       %s: rdma_connect() failed with %i\n",
				__func__, rc);
		goto out;
	}

	wait_event_interruptible(ep->rep_connect_wait, ep->rep_connected != 0);
	if (ep->rep_connected <= 0) {
		if (ep->rep_connected == -EAGAIN)
			goto retry;
		rc = ep->rep_connected;
		goto out;
	}

	dprintk("RPC:       %s: connected\n", __func__);

out:
	if (rc)
		ep->rep_connected = rc;

out_noupdate:
	return rc;
}

/*
 * rpcrdma_ep_disconnect
 *
 * This is separate from destroy to facilitate the ability
 * to reconnect without recreating the endpoint.
 *
 * This call is not reentrant, and must not be made in parallel
 * on the same endpoint.
 */
void
rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
{
	int rc;

	rc = rdma_disconnect(ia->ri_id);
	if (!rc)
		/* returns without wait if not connected */
		wait_event_interruptible(ep->rep_connect_wait,
							ep->rep_connected != 1);
	else
		ep->rep_connected = rc;
	trace_xprtrdma_disconnect(container_of(ep, struct rpcrdma_xprt,
					       rx_ep), rc);

	ib_drain_qp(ia->ri_id->qp);
}
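
/* Note: rdma_disconnect() only initiates teardown; the wait above lets
 * the CM upcall move rep_connected off 1, and it is the ib_drain_qp()
 * call that forces any still-posted Send and Receive WRs to flush
 * before the caller reuses or destroys the QP.
 */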

/* Fixed-size circular FIFO queue. This implementation is wait-free and
 * lock-free.
 *
 * Consumer is the code path that posts Sends. This path dequeues a
 * sendctx for use by a Send operation. Multiple consumer threads
 * are serialized by the RPC transport lock, which allows only one
 * ->send_request call at a time.
 *
 * Producer is the code path that handles Send completions. This path
 * enqueues a sendctx that has been completed. Multiple producer
 * threads are serialized by the ib_poll_cq() function.
 */

/* rpcrdma_sendctxs_destroy() assumes caller has already quiesced
 * queue activity, and ib_drain_qp has flushed all remaining Send
 * requests.
 */
static void rpcrdma_sendctxs_destroy(struct rpcrdma_buffer *buf)
{
	unsigned long i;

	for (i = 0; i <= buf->rb_sc_last; i++)
		kfree(buf->rb_sc_ctxs[i]);
	kfree(buf->rb_sc_ctxs);
}

static struct rpcrdma_sendctx *rpcrdma_sendctx_create(struct rpcrdma_ia *ia)
{
	struct rpcrdma_sendctx *sc;

	sc = kzalloc(sizeof(*sc) +
		     ia->ri_max_send_sges * sizeof(struct ib_sge),
		     GFP_KERNEL);
	if (!sc)
		return NULL;

	sc->sc_wr.wr_cqe = &sc->sc_cqe;
	sc->sc_wr.sg_list = sc->sc_sges;
	sc->sc_wr.opcode = IB_WR_SEND;
	sc->sc_cqe.done = rpcrdma_wc_send;
	return sc;
}

static int rpcrdma_sendctxs_create(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	struct rpcrdma_sendctx *sc;
	unsigned long i;

	/* Maximum number of concurrent outstanding Send WRs. Capping
	 * the circular queue size stops Send Queue overflow by causing
	 * the ->send_request call to fail temporarily before too many
	 * Sends are posted.
	 */
	i = buf->rb_max_requests + RPCRDMA_MAX_BC_REQUESTS;
	dprintk("RPC:       %s: allocating %lu send_ctxs\n", __func__, i);
	buf->rb_sc_ctxs = kcalloc(i, sizeof(sc), GFP_KERNEL);
	if (!buf->rb_sc_ctxs)
		return -ENOMEM;

	buf->rb_sc_last = i - 1;
	for (i = 0; i <= buf->rb_sc_last; i++) {
		sc = rpcrdma_sendctx_create(&r_xprt->rx_ia);
		if (!sc)
			goto out_destroy;

		sc->sc_xprt = r_xprt;
		buf->rb_sc_ctxs[i] = sc;
	}
	buf->rb_flags = 0;

	return 0;

out_destroy:
	rpcrdma_sendctxs_destroy(buf);
	return -ENOMEM;
}

/* The sendctx queue is not guaranteed to have a size that is a
 * power of two, thus the helpers in circ_buf.h cannot be used.
 * The other option is to use modulus (%), which can be expensive.
 */
static unsigned long rpcrdma_sendctx_next(struct rpcrdma_buffer *buf,
					  unsigned long item)
{
	return likely(item < buf->rb_sc_last) ? item + 1 : 0;
}

/**
 * rpcrdma_sendctx_get_locked - Acquire a send context
 * @buf: transport buffers from which to acquire an unused context
 *
 * Returns pointer to a free send completion context; or NULL if
 * the queue is empty.
 *
 * Usage: Called to acquire an SGE array before preparing a Send WR.
 *
 * The caller serializes calls to this function (per rpcrdma_buffer),
 * and provides an effective memory barrier that flushes the new value
 * of rb_sc_head.
 */
struct rpcrdma_sendctx *rpcrdma_sendctx_get_locked(struct rpcrdma_buffer *buf)
{
	struct rpcrdma_xprt *r_xprt;
	struct rpcrdma_sendctx *sc;
	unsigned long next_head;

	next_head = rpcrdma_sendctx_next(buf, buf->rb_sc_head);

	if (next_head == READ_ONCE(buf->rb_sc_tail))
		goto out_emptyq;

	/* ORDER: item must be accessed _before_ head is updated */
	sc = buf->rb_sc_ctxs[next_head];

	/* Releasing the lock in the caller acts as a memory
	 * barrier that flushes rb_sc_head.
	 */
	buf->rb_sc_head = next_head;

	return sc;

out_emptyq:
	/* The queue is "empty" if there have not been enough Send
	 * completions recently. This is a sign the Send Queue is
	 * backing up. Cause the caller to pause and try again.
	 */
	set_bit(RPCRDMA_BUF_F_EMPTY_SCQ, &buf->rb_flags);
	r_xprt = container_of(buf, struct rpcrdma_xprt, rx_buf);
	r_xprt->rx_stats.empty_sendctx_q++;
	return NULL;
}

/**
 * rpcrdma_sendctx_put_locked - Release a send context
 * @sc: send context to release
 *
 * Usage: Called from Send completion to return a sendctx
 * to the queue.
 *
 * The caller serializes calls to this function (per rpcrdma_buffer).
 */
static void
rpcrdma_sendctx_put_locked(struct rpcrdma_sendctx *sc)
{
	struct rpcrdma_buffer *buf = &sc->sc_xprt->rx_buf;
	unsigned long next_tail;

	/* Unmap SGEs of previously completed but unsignaled
	 * Sends by walking up the queue until @sc is found.
	 */
	next_tail = buf->rb_sc_tail;
	do {
		next_tail = rpcrdma_sendctx_next(buf, next_tail);

		/* ORDER: item must be accessed _before_ tail is updated */
		rpcrdma_unmap_sendctx(buf->rb_sc_ctxs[next_tail]);

	} while (buf->rb_sc_ctxs[next_tail] != sc);

	/* Paired with READ_ONCE */
	smp_store_release(&buf->rb_sc_tail, next_tail);

	if (test_and_clear_bit(RPCRDMA_BUF_F_EMPTY_SCQ, &buf->rb_flags)) {
		smp_mb__after_atomic();
		xprt_write_space(&sc->sc_xprt->rx_xprt);
	}
}
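
/* A reading of the ring invariants above (illustrative): head and tail
 * both advance through rpcrdma_sendctx_next(), so with rb_sc_last == 3
 * the slot order is 0 -> 1 -> 2 -> 3 -> 0 -> ... The consumer sees no
 * free sendctx when next(head) == tail, and the producer retires every
 * unsignaled sendctx between the old tail and the one whose Send
 * completion just arrived.
 */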

static void
rpcrdma_mrs_create(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
	unsigned int count;
	LIST_HEAD(free);
	LIST_HEAD(all);

	for (count = 0; count < ia->ri_max_segs; count++) {
		struct rpcrdma_mr *mr;
		int rc;

		mr = kzalloc(sizeof(*mr), GFP_KERNEL);
		if (!mr)
			break;

		rc = ia->ri_ops->ro_init_mr(ia, mr);
		if (rc) {
			kfree(mr);
			break;
		}

		mr->mr_xprt = r_xprt;

		list_add(&mr->mr_list, &free);
		list_add(&mr->mr_all, &all);
	}

	spin_lock(&buf->rb_mrlock);
	list_splice(&free, &buf->rb_mrs);
	list_splice(&all, &buf->rb_all);
	r_xprt->rx_stats.mrs_allocated += count;
	spin_unlock(&buf->rb_mrlock);
	trace_xprtrdma_createmrs(r_xprt, count);

	xprt_write_space(&r_xprt->rx_xprt);
}

static void
rpcrdma_mr_refresh_worker(struct work_struct *work)
{
	struct rpcrdma_buffer *buf = container_of(work, struct rpcrdma_buffer,
						  rb_refresh_worker.work);
	struct rpcrdma_xprt *r_xprt = container_of(buf, struct rpcrdma_xprt,
						   rx_buf);

	rpcrdma_mrs_create(r_xprt);
}

struct rpcrdma_req *
rpcrdma_create_req(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_buffer *buffer = &r_xprt->rx_buf;
	struct rpcrdma_regbuf *rb;
	struct rpcrdma_req *req;

	req = kzalloc(sizeof(*req), GFP_KERNEL);
	if (req == NULL)
		return ERR_PTR(-ENOMEM);

	rb = rpcrdma_alloc_regbuf(RPCRDMA_HDRBUF_SIZE,
				  DMA_TO_DEVICE, GFP_KERNEL);
	if (IS_ERR(rb)) {
		kfree(req);
		return ERR_PTR(-ENOMEM);
	}
	req->rl_rdmabuf = rb;
	xdr_buf_init(&req->rl_hdrbuf, rb->rg_base, rdmab_length(rb));
	req->rl_buffer = buffer;
	INIT_LIST_HEAD(&req->rl_registered);

	spin_lock(&buffer->rb_reqslock);
	list_add(&req->rl_all, &buffer->rb_allreqs);
	spin_unlock(&buffer->rb_reqslock);
	return req;
}

static int
rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt, bool temp)
{
	struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	struct rpcrdma_rep *rep;
	int rc;

	rc = -ENOMEM;
	rep = kzalloc(sizeof(*rep), GFP_KERNEL);
	if (rep == NULL)
		goto out;

	rep->rr_rdmabuf = rpcrdma_alloc_regbuf(cdata->inline_rsize,
					       DMA_FROM_DEVICE, GFP_KERNEL);
	if (IS_ERR(rep->rr_rdmabuf)) {
		rc = PTR_ERR(rep->rr_rdmabuf);
		goto out_free;
	}
	xdr_buf_init(&rep->rr_hdrbuf, rep->rr_rdmabuf->rg_base,
		     rdmab_length(rep->rr_rdmabuf));

	rep->rr_cqe.done = rpcrdma_wc_receive;
	rep->rr_rxprt = r_xprt;
	INIT_WORK(&rep->rr_work, rpcrdma_deferred_completion);
	rep->rr_recv_wr.next = NULL;
	rep->rr_recv_wr.wr_cqe = &rep->rr_cqe;
	rep->rr_recv_wr.sg_list = &rep->rr_rdmabuf->rg_iov;
	rep->rr_recv_wr.num_sge = 1;
	rep->rr_temp = temp;

	spin_lock(&buf->rb_lock);
	list_add(&rep->rr_list, &buf->rb_recv_bufs);
	spin_unlock(&buf->rb_lock);
	return 0;

out_free:
	kfree(rep);
out:
	dprintk("RPC:       %s: reply buffer %d alloc failed\n",
		__func__, rc);
	return rc;
}

int
rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	int i, rc;

	buf->rb_max_requests = r_xprt->rx_data.max_requests;
	buf->rb_bc_srv_max_requests = 0;
	spin_lock_init(&buf->rb_mrlock);
	spin_lock_init(&buf->rb_lock);
	INIT_LIST_HEAD(&buf->rb_mrs);
	INIT_LIST_HEAD(&buf->rb_all);
	INIT_DELAYED_WORK(&buf->rb_refresh_worker,
			  rpcrdma_mr_refresh_worker);

	rpcrdma_mrs_create(r_xprt);

	INIT_LIST_HEAD(&buf->rb_send_bufs);
	INIT_LIST_HEAD(&buf->rb_allreqs);
	spin_lock_init(&buf->rb_reqslock);
	for (i = 0; i < buf->rb_max_requests; i++) {
		struct rpcrdma_req *req;

		req = rpcrdma_create_req(r_xprt);
		if (IS_ERR(req)) {
			dprintk("RPC:       %s: request buffer %d alloc"
				" failed\n", __func__, i);
			rc = PTR_ERR(req);
			goto out;
		}
		list_add(&req->rl_list, &buf->rb_send_bufs);
	}

	buf->rb_credits = 1;
	buf->rb_posted_receives = 0;
	INIT_LIST_HEAD(&buf->rb_recv_bufs);

	rc = rpcrdma_sendctxs_create(r_xprt);
	if (rc)
		goto out;

	return 0;
out:
	rpcrdma_buffer_destroy(buf);
	return rc;
}
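
/* To summarize what rpcrdma_buffer_create() assembles: a pool of MRs
 * (rb_mrs/rb_all, refilled by rb_refresh_worker), rb_max_requests
 * rpcrdma_req structures on rb_send_bufs, an initially empty
 * rb_recv_bufs list (reps are created on demand in rpcrdma_post_recvs),
 * and the sendctx ring. rpcrdma_buffer_destroy() below tears these down
 * in roughly the reverse order after cancelling the refresh worker.
 */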

static void
rpcrdma_destroy_rep(struct rpcrdma_rep *rep)
{
	rpcrdma_free_regbuf(rep->rr_rdmabuf);
	kfree(rep);
}

void
rpcrdma_destroy_req(struct rpcrdma_req *req)
{
	rpcrdma_free_regbuf(req->rl_recvbuf);
	rpcrdma_free_regbuf(req->rl_sendbuf);
	rpcrdma_free_regbuf(req->rl_rdmabuf);
	kfree(req);
}

static void
rpcrdma_mrs_destroy(struct rpcrdma_buffer *buf)
{
	struct rpcrdma_xprt *r_xprt = container_of(buf, struct rpcrdma_xprt,
						   rx_buf);
	struct rpcrdma_ia *ia = rdmab_to_ia(buf);
	struct rpcrdma_mr *mr;
	unsigned int count;

	count = 0;
	spin_lock(&buf->rb_mrlock);
	while (!list_empty(&buf->rb_all)) {
		mr = list_entry(buf->rb_all.next, struct rpcrdma_mr, mr_all);
		list_del(&mr->mr_all);

		spin_unlock(&buf->rb_mrlock);

		/* Ensure MW is not on any rl_registered list */
		if (!list_empty(&mr->mr_list))
			list_del(&mr->mr_list);

		ia->ri_ops->ro_release_mr(mr);
		count++;
		spin_lock(&buf->rb_mrlock);
	}
	spin_unlock(&buf->rb_mrlock);
	r_xprt->rx_stats.mrs_allocated = 0;

	dprintk("RPC:       %s: released %u MRs\n", __func__, count);
}

void
rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
{
	cancel_delayed_work_sync(&buf->rb_refresh_worker);

	rpcrdma_sendctxs_destroy(buf);

	while (!list_empty(&buf->rb_recv_bufs)) {
		struct rpcrdma_rep *rep;

		rep = list_first_entry(&buf->rb_recv_bufs,
				       struct rpcrdma_rep, rr_list);
		list_del(&rep->rr_list);
		rpcrdma_destroy_rep(rep);
	}

	spin_lock(&buf->rb_reqslock);
	while (!list_empty(&buf->rb_allreqs)) {
		struct rpcrdma_req *req;

		req = list_first_entry(&buf->rb_allreqs,
				       struct rpcrdma_req, rl_all);
		list_del(&req->rl_all);

		spin_unlock(&buf->rb_reqslock);
		rpcrdma_destroy_req(req);
		spin_lock(&buf->rb_reqslock);
	}
	spin_unlock(&buf->rb_reqslock);

	rpcrdma_mrs_destroy(buf);
}

/**
 * rpcrdma_mr_get - Allocate an rpcrdma_mr object
 * @r_xprt: controlling transport
 *
 * Returns an initialized rpcrdma_mr or NULL if no free
 * rpcrdma_mr objects are available.
 */
struct rpcrdma_mr *
rpcrdma_mr_get(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	struct rpcrdma_mr *mr = NULL;

	spin_lock(&buf->rb_mrlock);
	if (!list_empty(&buf->rb_mrs))
		mr = rpcrdma_mr_pop(&buf->rb_mrs);
	spin_unlock(&buf->rb_mrlock);

	if (!mr)
		goto out_nomrs;
	return mr;

out_nomrs:
	trace_xprtrdma_nomrs(r_xprt);
	if (r_xprt->rx_ep.rep_connected != -ENODEV)
		schedule_delayed_work(&buf->rb_refresh_worker, 0);

	/* Allow the reply handler and refresh worker to run */
	cond_resched();

	return NULL;
}
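
/* A minimal pairing sketch for the MR accessors (the callers are
 * assumed to be the memreg ops and the reply handler):
 *
 *	mr = rpcrdma_mr_get(r_xprt);
 *	if (!mr)
 *		...	// pool empty; the refresh worker was scheduled
 *	...
 *	rpcrdma_mr_unmap_and_put(mr);	// when the MR carries a DMA mapping
 *	rpcrdma_mr_put(mr);		// when it does not
 */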

static void
__rpcrdma_mr_put(struct rpcrdma_buffer *buf, struct rpcrdma_mr *mr)
{
	spin_lock(&buf->rb_mrlock);
	rpcrdma_mr_push(mr, &buf->rb_mrs);
	spin_unlock(&buf->rb_mrlock);
}

/**
 * rpcrdma_mr_put - Release an rpcrdma_mr object
 * @mr: object to release
 *
 */
void
rpcrdma_mr_put(struct rpcrdma_mr *mr)
{
	__rpcrdma_mr_put(&mr->mr_xprt->rx_buf, mr);
}

/**
 * rpcrdma_mr_unmap_and_put - DMA unmap an MR and release it
 * @mr: object to release
 *
 */
void
rpcrdma_mr_unmap_and_put(struct rpcrdma_mr *mr)
{
	struct rpcrdma_xprt *r_xprt = mr->mr_xprt;

	trace_xprtrdma_mr_unmap(mr);
	ib_dma_unmap_sg(r_xprt->rx_ia.ri_device,
			mr->mr_sg, mr->mr_nents, mr->mr_dir);
	__rpcrdma_mr_put(&r_xprt->rx_buf, mr);
}

/**
 * rpcrdma_buffer_get - Get a request buffer
 * @buffers: Buffer pool from which to obtain a buffer
 *
 * Returns a fresh rpcrdma_req, or NULL if none are available.
 */
struct rpcrdma_req *
rpcrdma_buffer_get(struct rpcrdma_buffer *buffers)
{
	struct rpcrdma_req *req;

	spin_lock(&buffers->rb_lock);
	req = list_first_entry_or_null(&buffers->rb_send_bufs,
				       struct rpcrdma_req, rl_list);
	if (req)
		list_del_init(&req->rl_list);
	spin_unlock(&buffers->rb_lock);
	return req;
}

/**
 * rpcrdma_buffer_put - Put request/reply buffers back into pool
 * @req: object to return
 *
 */
void
rpcrdma_buffer_put(struct rpcrdma_req *req)
{
	struct rpcrdma_buffer *buffers = req->rl_buffer;
	struct rpcrdma_rep *rep = req->rl_reply;

	req->rl_reply = NULL;

	spin_lock(&buffers->rb_lock);
	list_add(&req->rl_list, &buffers->rb_send_bufs);
	if (rep) {
		if (!rep->rr_temp) {
			list_add(&rep->rr_list, &buffers->rb_recv_bufs);
			rep = NULL;
		}
	}
	spin_unlock(&buffers->rb_lock);
	if (rep)
		rpcrdma_destroy_rep(rep);
}

/*
 * Put reply buffers back into the pool when not attached to a
 * request. This happens in error conditions.
 */
void
rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep)
{
	struct rpcrdma_buffer *buffers = &rep->rr_rxprt->rx_buf;

	if (!rep->rr_temp) {
		spin_lock(&buffers->rb_lock);
		list_add(&rep->rr_list, &buffers->rb_recv_bufs);
		spin_unlock(&buffers->rb_lock);
	} else {
		rpcrdma_destroy_rep(rep);
	}
}

/**
 * rpcrdma_alloc_regbuf - allocate and DMA-map memory for SEND/RECV buffers
 * @size: size of buffer to be allocated, in bytes
 * @direction: direction of data movement
 * @flags: GFP flags
 *
 * Returns an ERR_PTR, or a pointer to a regbuf, a buffer that
 * can be persistently DMA-mapped for I/O.
 *
 * xprtrdma uses a regbuf for posting an outgoing RDMA SEND, or for
 * receiving the payload of RDMA RECV operations. During Long Calls
 * or Replies they may be registered externally via ro_map.
 */
struct rpcrdma_regbuf *
rpcrdma_alloc_regbuf(size_t size, enum dma_data_direction direction,
		     gfp_t flags)
{
	struct rpcrdma_regbuf *rb;

	rb = kmalloc(sizeof(*rb) + size, flags);
	if (rb == NULL)
		return ERR_PTR(-ENOMEM);

	rb->rg_device = NULL;
	rb->rg_direction = direction;
	rb->rg_iov.length = size;

	return rb;
}
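
/* Regbuf lifecycle as used elsewhere in this file (a sketch, not a
 * complete API description):
 *
 *	rb = rpcrdma_alloc_regbuf(size, DMA_TO_DEVICE, GFP_KERNEL);
 *	if (IS_ERR(rb))
 *		return PTR_ERR(rb);
 *	if (!rpcrdma_regbuf_is_mapped(rb))
 *		__rpcrdma_dma_map_regbuf(ia, rb);	// mapped lazily, on first use
 *	...
 *	rpcrdma_free_regbuf(rb);			// unmaps, then frees
 */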

/**
 * __rpcrdma_dma_map_regbuf - DMA-map a regbuf
 * @ia: controlling rpcrdma_ia
 * @rb: regbuf to be mapped
 */
bool
__rpcrdma_dma_map_regbuf(struct rpcrdma_ia *ia, struct rpcrdma_regbuf *rb)
{
	struct ib_device *device = ia->ri_device;

	if (rb->rg_direction == DMA_NONE)
		return false;

	rb->rg_iov.addr = ib_dma_map_single(device,
					    (void *)rb->rg_base,
					    rdmab_length(rb),
					    rb->rg_direction);
	if (ib_dma_mapping_error(device, rdmab_addr(rb)))
		return false;

	rb->rg_device = device;
	rb->rg_iov.lkey = ia->ri_pd->local_dma_lkey;
	return true;
}

static void
rpcrdma_dma_unmap_regbuf(struct rpcrdma_regbuf *rb)
{
	if (!rb)
		return;

	if (!rpcrdma_regbuf_is_mapped(rb))
		return;

	ib_dma_unmap_single(rb->rg_device, rdmab_addr(rb),
			    rdmab_length(rb), rb->rg_direction);
	rb->rg_device = NULL;
}

/**
 * rpcrdma_free_regbuf - deregister and free registered buffer
 * @rb: regbuf to be deregistered and freed
 */
void
rpcrdma_free_regbuf(struct rpcrdma_regbuf *rb)
{
	rpcrdma_dma_unmap_regbuf(rb);
	kfree(rb);
}

/*
 * Prepost any receive buffer, then post send.
 *
 * Receive buffer is donated to hardware, reclaimed upon recv completion.
 */
int
rpcrdma_ep_post(struct rpcrdma_ia *ia,
		struct rpcrdma_ep *ep,
		struct rpcrdma_req *req)
{
	struct ib_send_wr *send_wr = &req->rl_sendctx->sc_wr;
	int rc;

	if (!ep->rep_send_count ||
	    test_bit(RPCRDMA_REQ_F_TX_RESOURCES, &req->rl_flags)) {
		send_wr->send_flags |= IB_SEND_SIGNALED;
		ep->rep_send_count = ep->rep_send_batch;
	} else {
		send_wr->send_flags &= ~IB_SEND_SIGNALED;
		--ep->rep_send_count;
	}

	rc = ia->ri_ops->ro_send(ia, req);
	trace_xprtrdma_post_send(req, rc);
	if (rc)
		return -ENOTCONN;
	return 0;
}
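
/* Note on the signaling heuristic above: only every rep_send_batch-th
 * Send WR (or one whose sendctx must release TX resources) is posted
 * with IB_SEND_SIGNALED, so Send completions arrive in batches and
 * rpcrdma_sendctx_put_locked() retires the intervening unsignaled
 * sendctxs when the signaled one completes.
 */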

/**
 * rpcrdma_post_recvs - Maybe post some Receive buffers
 * @r_xprt: controlling transport
 * @temp: when true, allocate temp rpcrdma_rep objects
 *
 */
void
rpcrdma_post_recvs(struct rpcrdma_xprt *r_xprt, bool temp)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	struct ib_recv_wr *wr, *bad_wr;
	int needed, count, rc;

	needed = buf->rb_credits + (buf->rb_bc_srv_max_requests << 1);
	if (buf->rb_posted_receives > needed)
		return;
	needed -= buf->rb_posted_receives;

	count = 0;
	wr = NULL;
	while (needed) {
		struct rpcrdma_regbuf *rb;
		struct rpcrdma_rep *rep;

		spin_lock(&buf->rb_lock);
		rep = list_first_entry_or_null(&buf->rb_recv_bufs,
					       struct rpcrdma_rep, rr_list);
		if (likely(rep))
			list_del(&rep->rr_list);
		spin_unlock(&buf->rb_lock);
		if (!rep) {
			if (rpcrdma_create_rep(r_xprt, temp))
				break;
			continue;
		}

		rb = rep->rr_rdmabuf;
		if (!rpcrdma_regbuf_is_mapped(rb)) {
			if (!__rpcrdma_dma_map_regbuf(&r_xprt->rx_ia, rb)) {
				rpcrdma_recv_buffer_put(rep);
				break;
			}
		}

		trace_xprtrdma_post_recv(rep->rr_recv_wr.wr_cqe);
		rep->rr_recv_wr.next = wr;
		wr = &rep->rr_recv_wr;
		++count;
		--needed;
	}
	if (!count)
		return;

	rc = ib_post_recv(r_xprt->rx_ia.ri_id->qp, wr,
			  (const struct ib_recv_wr **)&bad_wr);
	if (rc) {
		for (wr = bad_wr; wr; wr = wr->next) {
			struct rpcrdma_rep *rep;

			rep = container_of(wr, struct rpcrdma_rep, rr_recv_wr);
			rpcrdma_recv_buffer_put(rep);
			--count;
		}
	}
	buf->rb_posted_receives += count;
	trace_xprtrdma_post_recvs(r_xprt, count, rc);
}
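
/* Receive provisioning, as read from rpcrdma_post_recvs(): the transport
 * tries to keep roughly rb_credits (plus a backchannel allowance) worth
 * of Receives posted, creating rpcrdma_rep structures on demand and
 * DMA-mapping their regbufs lazily. It is invoked from
 * rpcrdma_ep_connect() above; other call sites are assumed to sit in the
 * reply-handling path, where the credit grant changes.
 */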