verbs.c 35.0 KB
Newer Older
1
/*
2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
 * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
 *
 * This software is available to you under a choice of one of two
 * licenses.  You may choose to be licensed under the terms of the GNU
 * General Public License (GPL) Version 2, available from the file
 * COPYING in the main directory of this source tree, or the BSD-type
 * license below:
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 *      Redistributions of source code must retain the above copyright
 *      notice, this list of conditions and the following disclaimer.
 *
 *      Redistributions in binary form must reproduce the above
 *      copyright notice, this list of conditions and the following
 *      disclaimer in the documentation and/or other materials provided
 *      with the distribution.
 *
 *      Neither the name of the Network Appliance, Inc. nor the names of
 *      its contributors may be used to endorse or promote products
 *      derived from this software without specific prior written
 *      permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
38 39
 */

40 41 42 43 44 45 46 47 48 49
/*
 * verbs.c
 *
 * Encapsulates the major functions managing:
 *  o adapters
 *  o endpoints
 *  o connections
 *  o buffer memory
 */

50
#include <linux/interrupt.h>
51
#include <linux/slab.h>
52
#include <linux/prefetch.h>
53
#include <linux/sunrpc/addr.h>
54
#include <linux/sunrpc/svc_rdma.h>
55
#include <asm/bitops.h>
56
#include <linux/module.h> /* try_module_get()/module_put() */
57

58 59
#include "xprt_rdma.h"

60 61 62 63
/*
 * Globals/Macros
 */

#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
# define RPCDBG_FACILITY	RPCDBG_TRANS
#endif

/*
 * internal functions
 */

/* Workqueue on which completed Receives are processed: filled by
 * rpcrdma_alloc_wq(), consumed by rpcrdma_receive_wc() which queues
 * rep->rr_work (rpcrdma_receive_worker) on it.
 */
static struct workqueue_struct *rpcrdma_receive_wq;
73

74 75
int
rpcrdma_alloc_wq(void)
76
{
77
	struct workqueue_struct *recv_wq;
78

79 80 81 82 83
	recv_wq = alloc_workqueue("xprtrdma_receive",
				  WQ_MEM_RECLAIM | WQ_UNBOUND | WQ_HIGHPRI,
				  0);
	if (!recv_wq)
		return -ENOMEM;
84

85 86
	rpcrdma_receive_wq = recv_wq;
	return 0;
87 88
}

89 90
void
rpcrdma_destroy_wq(void)
91
{
92
	struct workqueue_struct *wq;
93

94 95 96 97 98
	if (rpcrdma_receive_wq) {
		wq = rpcrdma_receive_wq;
		rpcrdma_receive_wq = NULL;
		destroy_workqueue(wq);
	}
99 100
}

101 102 103 104 105
static void
rpcrdma_qp_async_error_upcall(struct ib_event *event, void *context)
{
	struct rpcrdma_ep *ep = context;

C
Chuck Lever 已提交
106
	pr_err("RPC:       %s: %s on device %s ep %p\n",
107
	       __func__, ib_event_msg(event->event),
C
Chuck Lever 已提交
108
		event->device->name, context);
109 110
	if (ep->rep_connected == 1) {
		ep->rep_connected = -EIO;
111
		rpcrdma_conn_func(ep);
112 113 114 115
		wake_up_all(&ep->rep_connect_wait);
	}
}

116 117 118 119 120
/**
 * rpcrdma_wc_send - Invoked by RDMA provider for each polled Send WC
 * @cq:	completion queue (ignored)
 * @wc:	completed WR
 *
121 122
 */
static void
123
rpcrdma_wc_send(struct ib_cq *cq, struct ib_wc *wc)
124
{
125 126 127 128 129
	/* WARNING: Only wr_cqe and status are reliable at this point */
	if (wc->status != IB_WC_SUCCESS && wc->status != IB_WC_WR_FLUSH_ERR)
		pr_err("rpcrdma: Send: %s (%u/0x%x)\n",
		       ib_wc_status_msg(wc->status),
		       wc->status, wc->vendor_err);
130
}
131

132
static void
133 134 135 136
rpcrdma_receive_worker(struct work_struct *work)
{
	struct rpcrdma_rep *rep =
			container_of(work, struct rpcrdma_rep, rr_work);
137

138
	rpcrdma_reply_handler(rep);
139 140
}

141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162
/* Perform basic sanity checking to avoid using garbage
 * to update the credit grant value.
 */
static void
rpcrdma_update_granted_credits(struct rpcrdma_rep *rep)
{
	struct rpcrdma_msg *rmsgp = rdmab_to_msg(rep->rr_rdmabuf);
	struct rpcrdma_buffer *buffer = &rep->rr_rxprt->rx_buf;
	u32 credits;

	/* A reply too short to carry a credit field cannot be trusted */
	if (rep->rr_len < RPCRDMA_HDRLEN_ERR)
		return;

	/* Clamp the server's grant to [1, rb_max_requests] */
	credits = be32_to_cpu(rmsgp->rm_credit);
	if (credits == 0)
		credits = 1;	/* don't deadlock */
	else if (credits > buffer->rb_max_requests)
		credits = buffer->rb_max_requests;

	atomic_set(&buffer->rb_credits, credits);
}

163 164 165 166 167 168
/**
 * rpcrdma_receive_wc - Invoked by RDMA provider for each polled Receive WC
 * @cq:	completion queue (ignored)
 * @wc:	completed WR
 *
 */
static void
rpcrdma_receive_wc(struct ib_cq *cq, struct ib_wc *wc)
{
	struct ib_cqe *cqe = wc->wr_cqe;
	struct rpcrdma_rep *rep = container_of(cqe, struct rpcrdma_rep,
					       rr_cqe);

	/* WARNING: Only wr_id and status are reliable at this point */
	if (wc->status != IB_WC_SUCCESS)
		goto out_fail;

	/* status == SUCCESS means all fields in wc are trustworthy */
	if (wc->opcode != IB_WC_RECV)
		return;

	dprintk("RPC:       %s: rep %p opcode 'recv', length %u: success\n",
		__func__, rep, wc->byte_len);

	/* Make the received payload visible to the CPU before the
	 * reply handler parses it.
	 */
	rep->rr_len = wc->byte_len;
	ib_dma_sync_single_for_cpu(rep->rr_device,
				   rdmab_addr(rep->rr_rdmabuf),
				   rep->rr_len, DMA_FROM_DEVICE);

	rpcrdma_update_granted_credits(rep);

out_schedule:
	/* Defer reply processing to process context */
	queue_work(rpcrdma_receive_wq, &rep->rr_work);
	return;

out_fail:
	/* Flushes are expected during disconnect; anything else is an error */
	if (wc->status != IB_WC_WR_FLUSH_ERR)
		pr_err("rpcrdma: Recv: %s (%u/0x%x)\n",
		       ib_wc_status_msg(wc->status),
		       wc->status, wc->vendor_err);
	/* Mark the rep bad, but still schedule it so it is recycled */
	rep->rr_len = RPCRDMA_BAD_LEN;
	goto out_schedule;
}

207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233
/* Absorb the connection parameters carried in the RDMA-CM private
 * message, when a valid one was exchanged. Negotiation can only
 * shrink the configured inline send/receive sizes, never grow them;
 * afterwards the maximum header sizes are recomputed.
 */
static void
rpcrdma_update_connect_private(struct rpcrdma_xprt *r_xprt,
			       struct rdma_conn_param *param)
{
	struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
	const struct rpcrdma_connect_private *pmsg = param->private_data;
	unsigned int rsize, wsize;

	/* Defaults used when no valid private message is present */
	rsize = RPCRDMA_V1_DEF_INLINE_SIZE;
	wsize = RPCRDMA_V1_DEF_INLINE_SIZE;

	if (pmsg &&
	    pmsg->cp_magic == rpcrdma_cmp_magic &&
	    pmsg->cp_version == RPCRDMA_CMP_VERSION) {
		/* Peer's send size bounds our receive, and vice versa */
		rsize = rpcrdma_decode_buffer_size(pmsg->cp_send_size);
		wsize = rpcrdma_decode_buffer_size(pmsg->cp_recv_size);
	}

	if (rsize < cdata->inline_rsize)
		cdata->inline_rsize = rsize;
	if (wsize < cdata->inline_wsize)
		cdata->inline_wsize = wsize;
	pr_info("rpcrdma: max send %u, max recv %u\n",
		cdata->inline_wsize, cdata->inline_rsize);
	rpcrdma_set_max_header_sizes(r_xprt);
}

234 235 236 237 238 239
/* RDMA-CM event handler for this transport's cm_id.
 *
 * Address/route resolution events record their outcome in
 * ia->ri_async_rc and complete ia->ri_done, unblocking
 * rpcrdma_create_id(). Connection-state events set a nonzero
 * connstate, record it in ep->rep_connected, and wake waiters in
 * rpcrdma_ep_connect()/rpcrdma_ep_disconnect().
 */
static int
rpcrdma_conn_upcall(struct rdma_cm_id *id, struct rdma_cm_event *event)
{
	struct rpcrdma_xprt *xprt = id->context;
	struct rpcrdma_ia *ia = &xprt->rx_ia;
	struct rpcrdma_ep *ep = &xprt->rx_ep;
#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
	/* sap is referenced only from dprintk/pr_info, which are
	 * compiled out when SUNRPC debugging is disabled.
	 */
	struct sockaddr *sap = (struct sockaddr *)&ep->rep_remote_addr;
#endif
	struct ib_qp_attr *attr = &ia->ri_qp_attr;
	struct ib_qp_init_attr *iattr = &ia->ri_qp_init_attr;
	int connstate = 0;

	switch (event->event) {
	case RDMA_CM_EVENT_ADDR_RESOLVED:
	case RDMA_CM_EVENT_ROUTE_RESOLVED:
		ia->ri_async_rc = 0;
		complete(&ia->ri_done);
		break;
	case RDMA_CM_EVENT_ADDR_ERROR:
		ia->ri_async_rc = -EHOSTUNREACH;
		dprintk("RPC:       %s: CM address resolution error, ep 0x%p\n",
			__func__, ep);
		complete(&ia->ri_done);
		break;
	case RDMA_CM_EVENT_ROUTE_ERROR:
		ia->ri_async_rc = -ENETUNREACH;
		dprintk("RPC:       %s: CM route resolution error, ep 0x%p\n",
			__func__, ep);
		complete(&ia->ri_done);
		break;
	case RDMA_CM_EVENT_ESTABLISHED:
		connstate = 1;
		/* Snapshot the negotiated RD atomic limits for logging */
		ib_query_qp(ia->ri_id->qp, attr,
			    IB_QP_MAX_QP_RD_ATOMIC | IB_QP_MAX_DEST_RD_ATOMIC,
			    iattr);
		dprintk("RPC:       %s: %d responder resources"
			" (%d initiator)\n",
			__func__, attr->max_dest_rd_atomic,
			attr->max_rd_atomic);
		rpcrdma_update_connect_private(xprt, &event->param.conn);
		goto connected;
	case RDMA_CM_EVENT_CONNECT_ERROR:
		connstate = -ENOTCONN;
		goto connected;
	case RDMA_CM_EVENT_UNREACHABLE:
		connstate = -ENETDOWN;
		goto connected;
	case RDMA_CM_EVENT_REJECTED:
		connstate = -ECONNREFUSED;
		goto connected;
	case RDMA_CM_EVENT_DISCONNECTED:
		connstate = -ECONNABORTED;
		goto connected;
	case RDMA_CM_EVENT_DEVICE_REMOVAL:
		connstate = -ENODEV;
connected:
		dprintk("RPC:       %s: %sconnected\n",
					__func__, connstate > 0 ? "" : "dis");
		/* Start each new connection with a minimal credit grant */
		atomic_set(&xprt->rx_buf.rb_credits, 1);
		ep->rep_connected = connstate;
		rpcrdma_conn_func(ep);
		wake_up_all(&ep->rep_connect_wait);
		/*FALLTHROUGH*/
	default:
		dprintk("RPC:       %s: %pIS:%u (ep 0x%p): %s\n",
			__func__, sap, rpc_get_port(sap), ep,
			rdma_event_msg(event->event));
		break;
	}

#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
	if (connstate == 1) {
		int ird = attr->max_dest_rd_atomic;
		int tird = ep->rep_remote_cma.responder_resources;

		pr_info("rpcrdma: connection to %pIS:%u on %s, memreg '%s', %d credits, %d responders%s\n",
			sap, rpc_get_port(sap),
			ia->ri_device->name,
			ia->ri_ops->ro_displayname,
			xprt->rx_buf.rb_max_requests,
			ird, ird < 4 && ird < tird / 2 ? " (low!)" : "");
	} else if (connstate < 0) {
		pr_info("rpcrdma: connection to %pIS:%u closed (%d)\n",
			sap, rpc_get_port(sap), connstate);
	}
#endif

	return 0;
}

325 326 327 328 329 330 331 332
/* Destroy a cm_id, dropping the device-module reference that
 * rpcrdma_create_id() pinned for it. Safe to call with NULL.
 */
static void rpcrdma_destroy_id(struct rdma_cm_id *id)
{
	if (!id)
		return;

	module_put(id->device->owner);
	rdma_destroy_id(id);
}

333 334 335 336 337 338 339
/* Create a cm_id and resolve the server's address and route.
 *
 * Resolution is asynchronous: each rdma_resolve_* call is followed
 * by a bounded wait on ia->ri_done, which rpcrdma_conn_upcall()
 * completes after storing the outcome in ia->ri_async_rc. The
 * pre-set -ETIMEDOUT value is what remains if the wait expires
 * before the upcall runs.
 *
 * On success the returned cm_id holds a reference on the device
 * module (dropped by rpcrdma_destroy_id()). On failure returns an
 * ERR_PTR and destroys the cm_id.
 */
static struct rdma_cm_id *
rpcrdma_create_id(struct rpcrdma_xprt *xprt,
			struct rpcrdma_ia *ia, struct sockaddr *addr)
{
	struct rdma_cm_id *id;
	int rc;

	init_completion(&ia->ri_done);

	id = rdma_create_id(&init_net, rpcrdma_conn_upcall, xprt, RDMA_PS_TCP,
			    IB_QPT_RC);
	if (IS_ERR(id)) {
		rc = PTR_ERR(id);
		dprintk("RPC:       %s: rdma_create_id() failed %i\n",
			__func__, rc);
		return id;
	}

	ia->ri_async_rc = -ETIMEDOUT;
	rc = rdma_resolve_addr(id, NULL, addr, RDMA_RESOLVE_TIMEOUT);
	if (rc) {
		dprintk("RPC:       %s: rdma_resolve_addr() failed %i\n",
			__func__, rc);
		goto out;
	}
	wait_for_completion_interruptible_timeout(&ia->ri_done,
				msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT) + 1);

	/* FIXME:
	 * Until xprtrdma supports DEVICE_REMOVAL, the provider must
	 * be pinned while there are active NFS/RDMA mounts to prevent
	 * hangs and crashes at umount time.
	 */
	if (!ia->ri_async_rc && !try_module_get(id->device->owner)) {
		dprintk("RPC:       %s: Failed to get device module\n",
			__func__);
		ia->ri_async_rc = -ENODEV;
	}
	rc = ia->ri_async_rc;
	if (rc)
		goto out;

	ia->ri_async_rc = -ETIMEDOUT;
	rc = rdma_resolve_route(id, RDMA_RESOLVE_TIMEOUT);
	if (rc) {
		dprintk("RPC:       %s: rdma_resolve_route() failed %i\n",
			__func__, rc);
		/* module reference already taken: release it on failure */
		goto put;
	}
	wait_for_completion_interruptible_timeout(&ia->ri_done,
				msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT) + 1);
	rc = ia->ri_async_rc;
	if (rc)
		goto put;

	return id;
put:
	module_put(id->device->owner);
out:
	rdma_destroy_id(id);
	return ERR_PTR(rc);
}

/*
 * Exported functions.
 */

/*
 * Open and initialize an Interface Adapter.
 *  o initializes fields of struct rpcrdma_ia, including
 *    interface and provider attributes and protection zone.
 *
 * @xprt: transport being set up
 * @addr: server address to resolve
 * @memreg: requested memory registration mode (RPCRDMA_FRMR or
 *	    RPCRDMA_MTHCAFMR); each mode falls back to the next if
 *	    the device does not support it
 *
 * Returns 0, or a negative errno; on failure all acquired
 * resources (cm_id, PD) are released.
 */
int
rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr, int memreg)
{
	struct rpcrdma_ia *ia = &xprt->rx_ia;
	int rc;

	ia->ri_id = rpcrdma_create_id(xprt, ia, addr);
	if (IS_ERR(ia->ri_id)) {
		rc = PTR_ERR(ia->ri_id);
		goto out1;
	}
	ia->ri_device = ia->ri_id->device;

	ia->ri_pd = ib_alloc_pd(ia->ri_device);
	if (IS_ERR(ia->ri_pd)) {
		rc = PTR_ERR(ia->ri_pd);
		pr_err("rpcrdma: ib_alloc_pd() returned %d\n", rc);
		goto out2;
	}

	/* Select a registration strategy the device supports,
	 * preferring FRWR over FMR.
	 */
	switch (memreg) {
	case RPCRDMA_FRMR:
		if (frwr_is_supported(ia)) {
			ia->ri_ops = &rpcrdma_frwr_memreg_ops;
			break;
		}
		/*FALLTHROUGH*/
	case RPCRDMA_MTHCAFMR:
		if (fmr_is_supported(ia)) {
			ia->ri_ops = &rpcrdma_fmr_memreg_ops;
			break;
		}
		/*FALLTHROUGH*/
	default:
		pr_err("rpcrdma: Unsupported memory registration mode: %d\n",
		       memreg);
		rc = -EINVAL;
		goto out3;
	}

	return 0;

out3:
	ib_dealloc_pd(ia->ri_pd);
	ia->ri_pd = NULL;
out2:
	rpcrdma_destroy_id(ia->ri_id);
	ia->ri_id = NULL;
out1:
	return rc;
}

/*
 * Clean up/close an IA.
 *   o if event handles and PD have been initialized, free them.
 *   o close the IA
 */
void
rpcrdma_ia_close(struct rpcrdma_ia *ia)
{
	dprintk("RPC:       %s: entering\n", __func__);
466 467 468
	if (ia->ri_id != NULL && !IS_ERR(ia->ri_id)) {
		if (ia->ri_id->qp)
			rdma_destroy_qp(ia->ri_id);
469
		rpcrdma_destroy_id(ia->ri_id);
470 471
		ia->ri_id = NULL;
	}
472 473 474

	/* If the pd is still busy, xprtrdma missed freeing a resource */
	if (ia->ri_pd && !IS_ERR(ia->ri_pd))
475
		ib_dealloc_pd(ia->ri_pd);
476 477 478 479 480 481 482 483 484
}

/*
 * Create unconnected endpoint.
 *
 * Sizes the QP attributes from the device limits and cdata, creates
 * the send and receive completion queues, and prepares the RDMA-CM
 * connection parameters (including the version-negotiation private
 * message). Returns 0 or a negative errno.
 */
int
rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
				struct rpcrdma_create_data_internal *cdata)
{
	struct rpcrdma_connect_private *pmsg = &ep->rep_cm_private;
	struct ib_cq *sendcq, *recvcq;
	unsigned int max_qp_wr;
	int rc;

	if (ia->ri_device->attrs.max_sge < RPCRDMA_MAX_IOVS) {
		dprintk("RPC:       %s: insufficient sge's available\n",
			__func__);
		return -ENOMEM;
	}

	if (ia->ri_device->attrs.max_qp_wr <= RPCRDMA_BACKWARD_WRS) {
		dprintk("RPC:       %s: insufficient wqe's available\n",
			__func__);
		return -ENOMEM;
	}
	/* Reserve room for backchannel WRs and one drain cqe */
	max_qp_wr = ia->ri_device->attrs.max_qp_wr - RPCRDMA_BACKWARD_WRS - 1;

	/* check provider's send/recv wr limits */
	if (cdata->max_requests > max_qp_wr)
		cdata->max_requests = max_qp_wr;

	ep->rep_attr.event_handler = rpcrdma_qp_async_error_upcall;
	ep->rep_attr.qp_context = ep;
	ep->rep_attr.srq = NULL;
	ep->rep_attr.cap.max_send_wr = cdata->max_requests;
	ep->rep_attr.cap.max_send_wr += RPCRDMA_BACKWARD_WRS;
	ep->rep_attr.cap.max_send_wr += 1;	/* drain cqe */
	/* The registration strategy may further adjust the attrs */
	rc = ia->ri_ops->ro_open(ia, ep, cdata);
	if (rc)
		return rc;
	ep->rep_attr.cap.max_recv_wr = cdata->max_requests;
	ep->rep_attr.cap.max_recv_wr += RPCRDMA_BACKWARD_WRS;
	ep->rep_attr.cap.max_recv_wr += 1;	/* drain cqe */
	ep->rep_attr.cap.max_send_sge = RPCRDMA_MAX_IOVS;
	ep->rep_attr.cap.max_recv_sge = 1;
	ep->rep_attr.cap.max_inline_data = 0;
	ep->rep_attr.sq_sig_type = IB_SIGNAL_REQ_WR;
	ep->rep_attr.qp_type = IB_QPT_RC;
	ep->rep_attr.port_num = ~0;

	dprintk("RPC:       %s: requested max: dtos: send %d recv %d; "
		"iovs: send %d recv %d\n",
		__func__,
		ep->rep_attr.cap.max_send_wr,
		ep->rep_attr.cap.max_recv_wr,
		ep->rep_attr.cap.max_send_sge,
		ep->rep_attr.cap.max_recv_sge);

	/* set trigger for requesting send completion */
	ep->rep_cqinit = ep->rep_attr.cap.max_send_wr/2 - 1;
	if (ep->rep_cqinit <= 2)
		ep->rep_cqinit = 0;	/* always signal? */
	INIT_CQCOUNT(ep);
	init_waitqueue_head(&ep->rep_connect_wait);
	INIT_DELAYED_WORK(&ep->rep_connect_worker, rpcrdma_connect_worker);

	sendcq = ib_alloc_cq(ia->ri_device, NULL,
			     ep->rep_attr.cap.max_send_wr + 1,
			     0, IB_POLL_SOFTIRQ);
	if (IS_ERR(sendcq)) {
		rc = PTR_ERR(sendcq);
		dprintk("RPC:       %s: failed to create send CQ: %i\n",
			__func__, rc);
		goto out1;
	}

	recvcq = ib_alloc_cq(ia->ri_device, NULL,
			     ep->rep_attr.cap.max_recv_wr + 1,
			     0, IB_POLL_SOFTIRQ);
	if (IS_ERR(recvcq)) {
		rc = PTR_ERR(recvcq);
		dprintk("RPC:       %s: failed to create recv CQ: %i\n",
			__func__, rc);
		goto out2;
	}

	ep->rep_attr.send_cq = sendcq;
	ep->rep_attr.recv_cq = recvcq;

	/* Initialize cma parameters */
	memset(&ep->rep_remote_cma, 0, sizeof(ep->rep_remote_cma));

	/* Prepare RDMA-CM private message */
	pmsg->cp_magic = rpcrdma_cmp_magic;
	pmsg->cp_version = RPCRDMA_CMP_VERSION;
	pmsg->cp_flags = 0;
	pmsg->cp_send_size = rpcrdma_encode_buffer_size(cdata->inline_wsize);
	pmsg->cp_recv_size = rpcrdma_encode_buffer_size(cdata->inline_rsize);
	ep->rep_remote_cma.private_data = pmsg;
	ep->rep_remote_cma.private_data_len = sizeof(*pmsg);

	/* Client offers RDMA Read but does not initiate */
	ep->rep_remote_cma.initiator_depth = 0;
	if (ia->ri_device->attrs.max_qp_rd_atom > 32)	/* arbitrary but <= 255 */
		ep->rep_remote_cma.responder_resources = 32;
	else
		ep->rep_remote_cma.responder_resources =
						ia->ri_device->attrs.max_qp_rd_atom;

	/* Limit transport retries so client can detect server
	 * GID changes quickly. RPC layer handles re-establishing
	 * transport connection and retransmission.
	 */
	ep->rep_remote_cma.retry_count = 6;

	/* RPC-over-RDMA handles its own flow control. In addition,
	 * make all RNR NAKs visible so we know that RPC-over-RDMA
	 * flow control is working correctly (no NAKs should be seen).
	 */
	ep->rep_remote_cma.flow_control = 0;
	ep->rep_remote_cma.rnr_retry_count = 0;

	return 0;

out2:
	ib_free_cq(sendcq);
out1:
	return rc;
}

/*
 * rpcrdma_ep_destroy
 *
 * Disconnect and destroy endpoint. After this, the only
 * valid operations on the ep are to free it (if dynamically
 * allocated) or re-create it.
 */
void
rpcrdma_ep_destroy(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
{
	dprintk("RPC:       %s: entering, connected is %d\n",
		__func__, ep->rep_connected);

	/* Stop any pending connect work before tearing down */
	cancel_delayed_work_sync(&ep->rep_connect_worker);

	if (ia->ri_id->qp) {
		rpcrdma_ep_disconnect(ep, ia);
		rdma_destroy_qp(ia->ri_id);
		ia->ri_id->qp = NULL;
	}

	/* CQs are freed only after the QP that used them is gone */
	ib_free_cq(ep->rep_attr.recv_cq);
	ib_free_cq(ep->rep_attr.send_cq);
}

/*
 * Connect unconnected endpoint.
 *
 * Handles both the initial connect and reconnect: on reconnect a
 * fresh cm_id and QP are created and swapped in for the old ones.
 * Retries on non-peer rejects and on ORD/IRD mismatches, up to
 * RDMA_CONNECT_RETRY_MAX attempts. Returns 0 when connected, or a
 * negative errno (also recorded in ep->rep_connected).
 */
int
rpcrdma_ep_connect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
{
	struct rdma_cm_id *id, *old;
	int rc = 0;
	int retry_count = 0;

	if (ep->rep_connected != 0) {
		struct rpcrdma_xprt *xprt;
retry:
		dprintk("RPC:       %s: reconnecting...\n", __func__);

		rpcrdma_ep_disconnect(ep, ia);

		xprt = container_of(ia, struct rpcrdma_xprt, rx_ia);
		id = rpcrdma_create_id(xprt, ia,
				(struct sockaddr *)&xprt->rx_data.addr);
		if (IS_ERR(id)) {
			rc = -EHOSTUNREACH;
			goto out;
		}
		/* TEMP TEMP TEMP - fail if new device:
		 * Deregister/remarshal *all* requests!
		 * Close and recreate adapter, pd, etc!
		 * Re-determine all attributes still sane!
		 * More stuff I haven't thought of!
		 * Rrrgh!
		 */
		if (ia->ri_device != id->device) {
			printk("RPC:       %s: can't reconnect on "
				"different device!\n", __func__);
			rpcrdma_destroy_id(id);
			rc = -ENETUNREACH;
			goto out;
		}
		/* END TEMP */
		rc = rdma_create_qp(id, ia->ri_pd, &ep->rep_attr);
		if (rc) {
			dprintk("RPC:       %s: rdma_create_qp failed %i\n",
				__func__, rc);
			rpcrdma_destroy_id(id);
			rc = -ENETUNREACH;
			goto out;
		}

		/* Swap in the new cm_id, then release the old one */
		old = ia->ri_id;
		ia->ri_id = id;

		rdma_destroy_qp(old);
		rpcrdma_destroy_id(old);
	} else {
		dprintk("RPC:       %s: connecting...\n", __func__);
		rc = rdma_create_qp(ia->ri_id, ia->ri_pd, &ep->rep_attr);
		if (rc) {
			dprintk("RPC:       %s: rdma_create_qp failed %i\n",
				__func__, rc);
			/* do not update ep->rep_connected */
			return -ENETUNREACH;
		}
	}

	ep->rep_connected = 0;

	rc = rdma_connect(ia->ri_id, &ep->rep_remote_cma);
	if (rc) {
		dprintk("RPC:       %s: rdma_connect() failed with %i\n",
				__func__, rc);
		goto out;
	}

	/* rpcrdma_conn_upcall() sets rep_connected and wakes us */
	wait_event_interruptible(ep->rep_connect_wait, ep->rep_connected != 0);

	/*
	 * Check state. A non-peer reject indicates no listener
	 * (ECONNREFUSED), which may be a transient state. All
	 * others indicate a transport condition which has already
	 * undergone a best-effort.
	 */
	if (ep->rep_connected == -ECONNREFUSED &&
	    ++retry_count <= RDMA_CONNECT_RETRY_MAX) {
		dprintk("RPC:       %s: non-peer_reject, retry\n", __func__);
		goto retry;
	}
	if (ep->rep_connected <= 0) {
		/* Sometimes, the only way to reliably connect to remote
		 * CMs is to use same nonzero values for ORD and IRD. */
		if (retry_count++ <= RDMA_CONNECT_RETRY_MAX + 1 &&
		    (ep->rep_remote_cma.responder_resources == 0 ||
		     ep->rep_remote_cma.initiator_depth !=
				ep->rep_remote_cma.responder_resources)) {
			if (ep->rep_remote_cma.responder_resources == 0)
				ep->rep_remote_cma.responder_resources = 1;
			ep->rep_remote_cma.initiator_depth =
				ep->rep_remote_cma.responder_resources;
			goto retry;
		}
		rc = ep->rep_connected;
	} else {
		struct rpcrdma_xprt *r_xprt;
		unsigned int extras;

		dprintk("RPC:       %s: connected\n", __func__);

		/* Post extra Receives for backchannel requests, if any.
		 * A failure here is logged but does not fail the connect.
		 */
		r_xprt = container_of(ia, struct rpcrdma_xprt, rx_ia);
		extras = r_xprt->rx_buf.rb_bc_srv_max_requests;

		if (extras) {
			rc = rpcrdma_ep_post_extra_recv(r_xprt, extras);
			if (rc) {
				pr_warn("%s: rpcrdma_ep_post_extra_recv: %i\n",
					__func__, rc);
				rc = 0;
			}
		}
	}

out:
	if (rc)
		ep->rep_connected = rc;
	return rc;
}

/*
 * rpcrdma_ep_disconnect
 *
 * This is separate from destroy to facilitate the ability
 * to reconnect without recreating the endpoint.
 *
 * This call is not reentrant, and must not be made in parallel
 * on the same endpoint.
 */
void
rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
{
	int rc;

	rc = rdma_disconnect(ia->ri_id);
	if (!rc) {
		/* returns without wait if not connected */
		wait_event_interruptible(ep->rep_connect_wait,
							ep->rep_connected != 1);
		dprintk("RPC:       %s: after wait, %sconnected\n", __func__,
			(ep->rep_connected == 1) ? "still " : "dis");
	} else {
		dprintk("RPC:       %s: rdma_disconnect %i\n", __func__, rc);
		ep->rep_connected = rc;
	}

	/* Flush all outstanding WRs so their completions are reaped
	 * before the caller reuses or tears down the endpoint.
	 */
	ib_drain_qp(ia->ri_id->qp);
}

785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802
/* Recover MRs that were deferred via rpcrdma_defer_mr_recovery().
 *
 * The recovery lock is dropped around each ro_recover_mr() call
 * (which may sleep or perform verbs work), then retaken to pick
 * the next stale MR off the list.
 */
static void
rpcrdma_mr_recovery_worker(struct work_struct *work)
{
	struct rpcrdma_buffer *buf = container_of(work, struct rpcrdma_buffer,
						  rb_recovery_worker.work);
	struct rpcrdma_mw *mw;

	spin_lock(&buf->rb_recovery_lock);
	while (!list_empty(&buf->rb_stale_mrs)) {
		mw = list_first_entry(&buf->rb_stale_mrs,
				      struct rpcrdma_mw, mw_list);
		list_del_init(&mw->mw_list);
		spin_unlock(&buf->rb_recovery_lock);

		dprintk("RPC:       %s: recovering MR %p\n", __func__, mw);
		mw->mw_xprt->rx_ia.ri_ops->ro_recover_mr(mw);

		spin_lock(&buf->rb_recovery_lock);
	}
	spin_unlock(&buf->rb_recovery_lock);
}

/* Queue a damaged MR on the stale list and kick the recovery
 * worker to repair it asynchronously.
 */
void
rpcrdma_defer_mr_recovery(struct rpcrdma_mw *mw)
{
	struct rpcrdma_buffer *buf = &mw->mw_xprt->rx_buf;

	spin_lock(&buf->rb_recovery_lock);
	list_add(&mw->mw_list, &buf->rb_stale_mrs);
	spin_unlock(&buf->rb_recovery_lock);

	schedule_delayed_work(&buf->rb_recovery_worker, 0);
}

C
Chuck Lever 已提交
820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842 843 844 845 846 847 848 849 850 851 852 853 854 855 856 857 858 859 860 861 862 863 864 865 866 867 868
/* Allocate and initialize a batch of up to 32 MWs, then splice them
 * onto the buffer's free (rb_mws) and all-MR (rb_all) lists under
 * rb_mwlock. Allocation or init failure simply ends the batch early;
 * whatever was built so far is still added.
 */
static void
rpcrdma_create_mrs(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
	unsigned int count;
	LIST_HEAD(free);
	LIST_HEAD(all);

	for (count = 0; count < 32; count++) {
		struct rpcrdma_mw *mw;
		int rc;

		mw = kzalloc(sizeof(*mw), GFP_KERNEL);
		if (!mw)
			break;

		rc = ia->ri_ops->ro_init_mr(ia, mw);
		if (rc) {
			kfree(mw);
			break;
		}

		mw->mw_xprt = r_xprt;

		list_add(&mw->mw_list, &free);
		list_add(&mw->mw_all, &all);
	}

	spin_lock(&buf->rb_mwlock);
	list_splice(&free, &buf->rb_mws);
	list_splice(&all, &buf->rb_all);
	r_xprt->rx_stats.mrs_allocated += count;
	spin_unlock(&buf->rb_mwlock);

	dprintk("RPC:       %s: created %u MRs\n", __func__, count);
}

/* Work item that replenishes the MR free list for a transport. */
static void
rpcrdma_mr_refresh_worker(struct work_struct *work)
{
	struct rpcrdma_buffer *buffer = container_of(work,
						     struct rpcrdma_buffer,
						     rb_refresh_worker.work);
	struct rpcrdma_xprt *xprt = container_of(buffer,
						 struct rpcrdma_xprt, rx_buf);

	rpcrdma_create_mrs(xprt);
}

869
struct rpcrdma_req *
870 871
rpcrdma_create_req(struct rpcrdma_xprt *r_xprt)
{
872
	struct rpcrdma_buffer *buffer = &r_xprt->rx_buf;
873 874
	struct rpcrdma_req *req;

875
	req = kzalloc(sizeof(*req), GFP_KERNEL);
876
	if (req == NULL)
877
		return ERR_PTR(-ENOMEM);
878

879 880 881 882
	INIT_LIST_HEAD(&req->rl_free);
	spin_lock(&buffer->rb_reqslock);
	list_add(&req->rl_all, &buffer->rb_allreqs);
	spin_unlock(&buffer->rb_reqslock);
883
	req->rl_cqe.done = rpcrdma_wc_send;
884
	req->rl_buffer = &r_xprt->rx_buf;
885
	INIT_LIST_HEAD(&req->rl_registered);
886 887 888 889
	req->rl_send_wr.next = NULL;
	req->rl_send_wr.wr_cqe = &req->rl_cqe;
	req->rl_send_wr.sg_list = req->rl_send_iov;
	req->rl_send_wr.opcode = IB_WR_SEND;
890 891 892
	return req;
}

893
struct rpcrdma_rep *
894 895 896 897 898 899 900 901
rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
	struct rpcrdma_rep *rep;
	int rc;

	rc = -ENOMEM;
902
	rep = kzalloc(sizeof(*rep), GFP_KERNEL);
903 904 905
	if (rep == NULL)
		goto out;

906
	rep->rr_rdmabuf = rpcrdma_alloc_regbuf(cdata->inline_rsize,
907
					       DMA_FROM_DEVICE, GFP_KERNEL);
908 909
	if (IS_ERR(rep->rr_rdmabuf)) {
		rc = PTR_ERR(rep->rr_rdmabuf);
910
		goto out_free;
911
	}
912

913
	rep->rr_device = ia->ri_device;
914
	rep->rr_cqe.done = rpcrdma_receive_wc;
915
	rep->rr_rxprt = r_xprt;
916
	INIT_WORK(&rep->rr_work, rpcrdma_receive_worker);
917 918 919 920
	rep->rr_recv_wr.next = NULL;
	rep->rr_recv_wr.wr_cqe = &rep->rr_cqe;
	rep->rr_recv_wr.sg_list = &rep->rr_rdmabuf->rg_iov;
	rep->rr_recv_wr.num_sge = 1;
921 922 923 924 925 926 927 928
	return rep;

out_free:
	kfree(rep);
out:
	return ERR_PTR(rc);
}

929
/* Initialize the transport's buffer pool: locks, lists, workers,
 * an initial batch of MRs, rb_max_requests request buffers, and
 * rb_max_requests + RPCRDMA_MAX_BC_REQUESTS reply buffers.
 *
 * Returns 0, or a negative errno after destroying everything built
 * so far.
 */
int
rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	int i, rc;

	buf->rb_max_requests = r_xprt->rx_data.max_requests;
	buf->rb_bc_srv_max_requests = 0;
	atomic_set(&buf->rb_credits, 1);
	spin_lock_init(&buf->rb_mwlock);
	spin_lock_init(&buf->rb_lock);
	spin_lock_init(&buf->rb_recovery_lock);
	INIT_LIST_HEAD(&buf->rb_mws);
	INIT_LIST_HEAD(&buf->rb_all);
	INIT_LIST_HEAD(&buf->rb_stale_mrs);
	INIT_DELAYED_WORK(&buf->rb_refresh_worker,
			  rpcrdma_mr_refresh_worker);
	INIT_DELAYED_WORK(&buf->rb_recovery_worker,
			  rpcrdma_mr_recovery_worker);

	rpcrdma_create_mrs(r_xprt);

	/* Request buffers: each is also tracked on rb_allreqs */
	INIT_LIST_HEAD(&buf->rb_send_bufs);
	INIT_LIST_HEAD(&buf->rb_allreqs);
	spin_lock_init(&buf->rb_reqslock);
	for (i = 0; i < buf->rb_max_requests; i++) {
		struct rpcrdma_req *req;

		req = rpcrdma_create_req(r_xprt);
		if (IS_ERR(req)) {
			dprintk("RPC:       %s: request buffer %d alloc"
				" failed\n", __func__, i);
			rc = PTR_ERR(req);
			goto out;
		}
		req->rl_backchannel = false;
		list_add(&req->rl_free, &buf->rb_send_bufs);
	}

	/* Reply buffers, with extras reserved for the backchannel */
	INIT_LIST_HEAD(&buf->rb_recv_bufs);
	for (i = 0; i < buf->rb_max_requests + RPCRDMA_MAX_BC_REQUESTS; i++) {
		struct rpcrdma_rep *rep;

		rep = rpcrdma_create_rep(r_xprt);
		if (IS_ERR(rep)) {
			dprintk("RPC:       %s: reply buffer %d alloc failed\n",
				__func__, i);
			rc = PTR_ERR(rep);
			goto out;
		}
		list_add(&rep->rr_list, &buf->rb_recv_bufs);
	}

	return 0;
out:
	rpcrdma_buffer_destroy(buf);
	return rc;
}

988 989 990 991 992 993 994 995 996 997 998 999 1000 1001 1002 1003 1004 1005 1006 1007 1008 1009
/* Pop the first free request buffer. Caller must hold the
 * appropriate buffer lock, and the list must not be empty.
 */
static struct rpcrdma_req *
rpcrdma_buffer_get_req_locked(struct rpcrdma_buffer *buf)
{
	struct rpcrdma_req *req = list_first_entry(&buf->rb_send_bufs,
						   struct rpcrdma_req,
						   rl_free);

	list_del(&req->rl_free);
	return req;
}

/* Pop the first free reply buffer. Caller must hold the
 * appropriate buffer lock, and the list must not be empty.
 */
static struct rpcrdma_rep *
rpcrdma_buffer_get_rep_locked(struct rpcrdma_buffer *buf)
{
	struct rpcrdma_rep *rep = list_first_entry(&buf->rb_recv_bufs,
						   struct rpcrdma_rep,
						   rr_list);

	list_del(&rep->rr_list);
	return rep;
}

1010
/* Free one reply buffer: release its registered receive buffer,
 * then the rep structure itself.
 */
static void
rpcrdma_destroy_rep(struct rpcrdma_rep *rep)
{
	rpcrdma_free_regbuf(rep->rr_rdmabuf);
	kfree(rep);
}

1017
/* Free one request buffer: release its three registered buffers
 * (receive, send, and RDMA header), then the req itself.
 */
void
rpcrdma_destroy_req(struct rpcrdma_req *req)
{
	rpcrdma_free_regbuf(req->rl_recvbuf);
	rpcrdma_free_regbuf(req->rl_sendbuf);
	rpcrdma_free_regbuf(req->rl_rdmabuf);
	kfree(req);
}

C
Chuck Lever 已提交
1026 1027 1028 1029 1030 1031 1032 1033 1034 1035 1036 1037 1038 1039 1040 1041 1042 1043 1044 1045 1046 1047 1048 1049 1050 1051
/* Release every MR on the transport's rb_all list.
 *
 * rb_mwlock is dropped around each ro_release_mr callout and the
 * list head is re-checked after the lock is retaken — presumably
 * because releasing an MR can block; confirm against the
 * ro_release_mr implementations.
 */
static void
rpcrdma_destroy_mrs(struct rpcrdma_buffer *buf)
{
	struct rpcrdma_xprt *r_xprt = container_of(buf, struct rpcrdma_xprt,
						   rx_buf);
	struct rpcrdma_ia *ia = rdmab_to_ia(buf);
	unsigned int released = 0;

	spin_lock(&buf->rb_mwlock);
	while (!list_empty(&buf->rb_all)) {
		struct rpcrdma_mw *mw;

		mw = list_first_entry(&buf->rb_all, struct rpcrdma_mw,
				      mw_all);
		list_del(&mw->mw_all);

		spin_unlock(&buf->rb_mwlock);
		ia->ri_ops->ro_release_mr(mw);
		released++;
		spin_lock(&buf->rb_mwlock);
	}
	spin_unlock(&buf->rb_mwlock);
	r_xprt->rx_stats.mrs_allocated = 0;

	dprintk("RPC:       %s: released %u MRs\n", __func__, released);
}

1052 1053 1054
void
rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
{
1055 1056
	cancel_delayed_work_sync(&buf->rb_recovery_worker);

1057 1058
	while (!list_empty(&buf->rb_recv_bufs)) {
		struct rpcrdma_rep *rep;
1059

1060
		rep = rpcrdma_buffer_get_rep_locked(buf);
1061
		rpcrdma_destroy_rep(rep);
1062
	}
1063
	buf->rb_send_count = 0;
1064

1065 1066
	spin_lock(&buf->rb_reqslock);
	while (!list_empty(&buf->rb_allreqs)) {
1067
		struct rpcrdma_req *req;
A
Allen Andrews 已提交
1068

1069 1070 1071 1072 1073
		req = list_first_entry(&buf->rb_allreqs,
				       struct rpcrdma_req, rl_all);
		list_del(&req->rl_all);

		spin_unlock(&buf->rb_reqslock);
1074
		rpcrdma_destroy_req(req);
1075
		spin_lock(&buf->rb_reqslock);
1076
	}
1077
	spin_unlock(&buf->rb_reqslock);
1078
	buf->rb_recv_count = 0;
A
Allen Andrews 已提交
1079

C
Chuck Lever 已提交
1080
	rpcrdma_destroy_mrs(buf);
1081 1082
}

1083 1084
struct rpcrdma_mw *
rpcrdma_get_mw(struct rpcrdma_xprt *r_xprt)
1085
{
1086 1087 1088
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	struct rpcrdma_mw *mw = NULL;

C
Chuck Lever 已提交
1089
	spin_lock(&buf->rb_mwlock);
1090 1091 1092 1093
	if (!list_empty(&buf->rb_mws)) {
		mw = list_first_entry(&buf->rb_mws,
				      struct rpcrdma_mw, mw_list);
		list_del_init(&mw->mw_list);
1094
	}
C
Chuck Lever 已提交
1095
	spin_unlock(&buf->rb_mwlock);
1096 1097

	if (!mw)
C
Chuck Lever 已提交
1098
		goto out_nomws;
1099
	return mw;
C
Chuck Lever 已提交
1100 1101 1102 1103 1104 1105 1106 1107 1108

out_nomws:
	dprintk("RPC:       %s: no MWs available\n", __func__);
	schedule_delayed_work(&buf->rb_refresh_worker, 0);

	/* Allow the reply handler and refresh worker to run */
	cond_resched();

	return NULL;
1109 1110
}

1111 1112
void
rpcrdma_put_mw(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mw *mw)
1113
{
1114
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
1115

C
Chuck Lever 已提交
1116
	spin_lock(&buf->rb_mwlock);
1117
	list_add_tail(&mw->mw_list, &buf->rb_mws);
C
Chuck Lever 已提交
1118
	spin_unlock(&buf->rb_mwlock);
1119 1120
}

1121 1122 1123 1124 1125 1126 1127 1128 1129 1130 1131 1132 1133 1134 1135 1136 1137
static struct rpcrdma_rep *
rpcrdma_buffer_get_rep(struct rpcrdma_buffer *buffers)
{
	/* If an RPC previously completed without a reply (say, a
	 * credential problem or a soft timeout occurs) then hold off
	 * on supplying more Receive buffers until the number of new
	 * pending RPCs catches up to the number of posted Receives.
	 */
	if (unlikely(buffers->rb_send_count < buffers->rb_recv_count))
		return NULL;

	if (unlikely(list_empty(&buffers->rb_recv_bufs)))
		return NULL;
	buffers->rb_recv_count++;
	return rpcrdma_buffer_get_rep_locked(buffers);
}

1138 1139
/*
 * Get a set of request/reply buffers.
1140 1141
 *
 * Reply buffer (if available) is attached to send buffer upon return.
1142 1143 1144 1145 1146
 */
struct rpcrdma_req *
rpcrdma_buffer_get(struct rpcrdma_buffer *buffers)
{
	struct rpcrdma_req *req;
1147

1148
	spin_lock(&buffers->rb_lock);
1149 1150
	if (list_empty(&buffers->rb_send_bufs))
		goto out_reqbuf;
1151
	buffers->rb_send_count++;
1152
	req = rpcrdma_buffer_get_req_locked(buffers);
1153
	req->rl_reply = rpcrdma_buffer_get_rep(buffers);
1154
	spin_unlock(&buffers->rb_lock);
1155
	return req;
1156

1157
out_reqbuf:
1158
	spin_unlock(&buffers->rb_lock);
1159
	pr_warn("RPC:       %s: out of request buffers\n", __func__);
1160
	return NULL;
1161 1162 1163 1164 1165 1166 1167 1168 1169 1170
}

/*
 * Put request/reply buffers back into pool.
 * Pre-decrement counter/array index.
 */
void
rpcrdma_buffer_put(struct rpcrdma_req *req)
{
	struct rpcrdma_buffer *buffers = req->rl_buffer;
1171
	struct rpcrdma_rep *rep = req->rl_reply;
1172

1173
	req->rl_send_wr.num_sge = 0;
1174 1175
	req->rl_reply = NULL;

1176
	spin_lock(&buffers->rb_lock);
1177
	buffers->rb_send_count--;
1178
	list_add_tail(&req->rl_free, &buffers->rb_send_bufs);
1179 1180
	if (rep) {
		buffers->rb_recv_count--;
1181
		list_add_tail(&rep->rr_list, &buffers->rb_recv_bufs);
1182
	}
1183
	spin_unlock(&buffers->rb_lock);
1184 1185 1186 1187
}

/*
 * Recover reply buffers from pool.
1188
 * This happens when recovering from disconnect.
1189 1190 1191 1192 1193 1194
 */
void
rpcrdma_recv_buffer_get(struct rpcrdma_req *req)
{
	struct rpcrdma_buffer *buffers = req->rl_buffer;

1195
	spin_lock(&buffers->rb_lock);
1196
	req->rl_reply = rpcrdma_buffer_get_rep(buffers);
1197
	spin_unlock(&buffers->rb_lock);
1198 1199 1200 1201
}

/*
 * Put reply buffers back into pool when not attached to
1202
 * request. This happens in error conditions.
1203 1204 1205 1206
 */
void
rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep)
{
1207
	struct rpcrdma_buffer *buffers = &rep->rr_rxprt->rx_buf;
1208

1209
	spin_lock(&buffers->rb_lock);
1210
	buffers->rb_recv_count--;
1211
	list_add_tail(&rep->rr_list, &buffers->rb_recv_bufs);
1212
	spin_unlock(&buffers->rb_lock);
1213 1214
}

1215
/**
1216
 * rpcrdma_alloc_regbuf - allocate and DMA-map memory for SEND/RECV buffers
1217
 * @size: size of buffer to be allocated, in bytes
1218
 * @direction: direction of data movement
1219 1220
 * @flags: GFP flags
 *
1221 1222
 * Returns an ERR_PTR, or a pointer to a regbuf, a buffer that
 * can be persistently DMA-mapped for I/O.
1223 1224
 *
 * xprtrdma uses a regbuf for posting an outgoing RDMA SEND, or for
1225 1226
 * receiving the payload of RDMA RECV operations. During Long Calls
 * or Replies they may be registered externally via ro_map.
1227 1228
 */
struct rpcrdma_regbuf *
1229 1230
rpcrdma_alloc_regbuf(size_t size, enum dma_data_direction direction,
		     gfp_t flags)
1231 1232 1233 1234 1235
{
	struct rpcrdma_regbuf *rb;

	rb = kmalloc(sizeof(*rb) + size, flags);
	if (rb == NULL)
1236
		return ERR_PTR(-ENOMEM);
1237

1238
	rb->rg_device = NULL;
1239
	rb->rg_direction = direction;
1240
	rb->rg_iov.length = size;
1241

1242
	return rb;
1243
}
1244

1245 1246 1247 1248 1249 1250 1251 1252 1253 1254 1255 1256 1257 1258 1259 1260 1261 1262 1263 1264 1265 1266 1267 1268 1269 1270 1271 1272 1273 1274 1275 1276
/**
 * __rpcrdma_dma_map_regbuf - DMA-map a regbuf
 * @ia: controlling rpcrdma_ia
 * @rb: regbuf to be mapped
 *
 * Returns true if @rb is now mapped (rg_device and the iov's
 * addr/lkey are filled in), or false if @rb's direction is
 * DMA_NONE or the mapping attempt failed.
 */
bool
__rpcrdma_dma_map_regbuf(struct rpcrdma_ia *ia, struct rpcrdma_regbuf *rb)
{
	if (rb->rg_direction == DMA_NONE)
		return false;

	rb->rg_iov.addr = ib_dma_map_single(ia->ri_device,
					    (void *)rb->rg_base,
					    rdmab_length(rb),
					    rb->rg_direction);
	if (ib_dma_mapping_error(ia->ri_device, rdmab_addr(rb)))
		return false;

	/* A non-NULL rg_device marks the regbuf as mapped */
	rb->rg_device = ia->ri_device;
	rb->rg_iov.lkey = ia->ri_pd->local_dma_lkey;
	return true;
}

/* Undo __rpcrdma_dma_map_regbuf(); a no-op for unmapped regbufs.
 * Clearing rg_device marks the regbuf as unmapped again.
 */
static void
rpcrdma_dma_unmap_regbuf(struct rpcrdma_regbuf *rb)
{
	if (!rpcrdma_regbuf_is_mapped(rb))
		return;

	ib_dma_unmap_single(rb->rg_device, rdmab_addr(rb),
			    rdmab_length(rb), rb->rg_direction);
	rb->rg_device = NULL;
}

/**
 * rpcrdma_free_regbuf - deregister and free registered buffer
 * @rb: regbuf to be deregistered and freed; may be NULL
 */
void
rpcrdma_free_regbuf(struct rpcrdma_regbuf *rb)
{
	if (!rb)
		return;

	rpcrdma_dma_unmap_regbuf(rb);
	kfree(rb);
}

1293 1294 1295 1296 1297 1298 1299 1300 1301 1302
/*
 * Prepost any receive buffer, then post send.
 *
 * Receive buffer is donated to hardware, reclaimed upon recv completion.
 */
int
rpcrdma_ep_post(struct rpcrdma_ia *ia,
		struct rpcrdma_ep *ep,
		struct rpcrdma_req *req)
{
1303
	struct ib_device *device = ia->ri_device;
1304 1305 1306
	struct ib_send_wr *send_wr = &req->rl_send_wr;
	struct ib_send_wr *send_wr_fail;
	struct ib_sge *sge = req->rl_send_iov;
1307
	int i, rc;
1308

1309 1310
	if (req->rl_reply) {
		rc = rpcrdma_ep_post_recv(ia, req->rl_reply);
1311
		if (rc)
1312
			return rc;
1313 1314 1315
		req->rl_reply = NULL;
	}

1316 1317 1318
	for (i = 0; i < send_wr->num_sge; i++)
		ib_dma_sync_single_for_device(device, sge[i].addr,
					      sge[i].length, DMA_TO_DEVICE);
1319
	dprintk("RPC:       %s: posting %d s/g entries\n",
1320
		__func__, send_wr->num_sge);
1321 1322

	if (DECR_CQCOUNT(ep) > 0)
1323
		send_wr->send_flags = 0;
1324 1325
	else { /* Provider must take a send completion every now and then */
		INIT_CQCOUNT(ep);
1326
		send_wr->send_flags = IB_SEND_SIGNALED;
1327 1328
	}

1329
	rc = ib_post_send(ia->ri_id->qp, send_wr, &send_wr_fail);
1330
	if (rc)
1331 1332 1333 1334 1335 1336
		goto out_postsend_err;
	return 0;

out_postsend_err:
	pr_err("rpcrdma: RDMA Send ib_post_send returned %i\n", rc);
	return -ENOTCONN;
1337 1338 1339 1340 1341 1342
}

int
rpcrdma_ep_post_recv(struct rpcrdma_ia *ia,
		     struct rpcrdma_rep *rep)
{
1343
	struct ib_recv_wr *recv_wr_fail;
1344 1345
	int rc;

1346 1347
	if (!rpcrdma_dma_map_regbuf(ia, rep->rr_rdmabuf))
		goto out_map;
1348
	rc = ib_post_recv(ia->ri_id->qp, &rep->rr_recv_wr, &recv_wr_fail);
1349
	if (rc)
1350 1351 1352
		goto out_postrecv;
	return 0;

1353 1354 1355 1356
out_map:
	pr_err("rpcrdma: failed to DMA map the Receive buffer\n");
	return -EIO;

1357 1358 1359
out_postrecv:
	pr_err("rpcrdma: ib_post_recv returned %i\n", rc);
	return -ENOTCONN;
1360
}
1361

1362 1363 1364 1365 1366 1367 1368 1369 1370 1371 1372 1373 1374 1375 1376 1377
/**
 * rpcrdma_ep_post_extra_recv - Post buffers for incoming backchannel requests
 * @r_xprt: transport associated with these backchannel resources
 * @min_reqs: minimum number of incoming requests expected
 *
 * Returns zero if all requested buffers were posted, or a negative errno.
 */
int
rpcrdma_ep_post_extra_recv(struct rpcrdma_xprt *r_xprt, unsigned int count)
{
	struct rpcrdma_buffer *buffers = &r_xprt->rx_buf;
	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
	struct rpcrdma_rep *rep;
	int rc;

	while (count--) {
1378
		spin_lock(&buffers->rb_lock);
1379 1380 1381
		if (list_empty(&buffers->rb_recv_bufs))
			goto out_reqbuf;
		rep = rpcrdma_buffer_get_rep_locked(buffers);
1382
		spin_unlock(&buffers->rb_lock);
1383

1384
		rc = rpcrdma_ep_post_recv(ia, rep);
1385 1386 1387 1388 1389 1390 1391
		if (rc)
			goto out_rc;
	}

	return 0;

out_reqbuf:
1392
	spin_unlock(&buffers->rb_lock);
1393 1394 1395 1396 1397 1398 1399
	pr_warn("%s: no extra receive buffers\n", __func__);
	return -ENOMEM;

out_rc:
	rpcrdma_recv_buffer_put(rep);
	return rc;
}