/*
 * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
 *
 * This software is available to you under a choice of one of two
 * licenses.  You may choose to be licensed under the terms of the GNU
 * General Public License (GPL) Version 2, available from the file
 * COPYING in the main directory of this source tree, or the BSD-type
 * license below:
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 *      Redistributions of source code must retain the above copyright
 *      notice, this list of conditions and the following disclaimer.
 *
 *      Redistributions in binary form must reproduce the above
 *      copyright notice, this list of conditions and the following
 *      disclaimer in the documentation and/or other materials provided
 *      with the distribution.
 *
 *      Neither the name of the Network Appliance, Inc. nor the names of
 *      its contributors may be used to endorse or promote products
 *      derived from this software without specific prior written
 *      permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

/*
 * verbs.c
 *
 * Encapsulates the major functions managing:
 *  o adapters
 *  o endpoints
 *  o connections
 *  o buffer memory
 */

#include <linux/interrupt.h>
#include <linux/slab.h>
#include <linux/prefetch.h>
#include <linux/sunrpc/addr.h>
#include <asm/bitops.h>
#include <linux/module.h> /* try_module_get()/module_put() */

#include "xprt_rdma.h"

/*
 * Globals/Macros
 */

#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
# define RPCDBG_FACILITY	RPCDBG_TRANS
#endif

/*
 * internal functions
 */

static struct workqueue_struct *rpcrdma_receive_wq;

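/* Allocate the workqueue used for processing Receive completions.
 * It is created unbound and high-priority, with WQ_MEM_RECLAIM set
 * (see the flags passed to alloc_workqueue() below).
 */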
int
rpcrdma_alloc_wq(void)
{
	struct workqueue_struct *recv_wq;

	recv_wq = alloc_workqueue("xprtrdma_receive",
				  WQ_MEM_RECLAIM | WQ_UNBOUND | WQ_HIGHPRI,
				  0);
	if (!recv_wq)
		return -ENOMEM;

	rpcrdma_receive_wq = recv_wq;
	return 0;
}

void
rpcrdma_destroy_wq(void)
{
	struct workqueue_struct *wq;

	if (rpcrdma_receive_wq) {
		wq = rpcrdma_receive_wq;
		rpcrdma_receive_wq = NULL;
		destroy_workqueue(wq);
	}
}

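/* Handle a QP asynchronous error event: log it, and if the endpoint
 * was connected, mark it failed (-EIO) and wake anyone waiting on
 * rep_connect_wait.
 */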
static void
rpcrdma_qp_async_error_upcall(struct ib_event *event, void *context)
{
	struct rpcrdma_ep *ep = context;

	pr_err("RPC:       %s: %s on device %s ep %p\n",
	       __func__, ib_event_msg(event->event),
		event->device->name, context);
	if (ep->rep_connected == 1) {
		ep->rep_connected = -EIO;
		rpcrdma_conn_func(ep);
		wake_up_all(&ep->rep_connect_wait);
	}
}

/**
 * rpcrdma_wc_send - Invoked by RDMA provider for each polled Send WC
 * @cq:	completion queue (ignored)
 * @wc:	completed WR
 *
 */
static void
rpcrdma_wc_send(struct ib_cq *cq, struct ib_wc *wc)
{
	/* WARNING: Only wr_cqe and status are reliable at this point */
	if (wc->status != IB_WC_SUCCESS && wc->status != IB_WC_WR_FLUSH_ERR)
		pr_err("rpcrdma: Send: %s (%u/0x%x)\n",
		       ib_wc_status_msg(wc->status),
		       wc->status, wc->vendor_err);
}

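/* Process one received reply in process context (runs on
 * rpcrdma_receive_wq).
 */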
static void
rpcrdma_receive_worker(struct work_struct *work)
{
	struct rpcrdma_rep *rep =
			container_of(work, struct rpcrdma_rep, rr_work);

	rpcrdma_reply_handler(rep);
}

/* Perform basic sanity checking to avoid using garbage
 * to update the credit grant value.
 */
static void
rpcrdma_update_granted_credits(struct rpcrdma_rep *rep)
{
	struct rpcrdma_msg *rmsgp = rdmab_to_msg(rep->rr_rdmabuf);
	struct rpcrdma_buffer *buffer = &rep->rr_rxprt->rx_buf;
	u32 credits;

	if (rep->rr_len < RPCRDMA_HDRLEN_ERR)
		return;

	credits = be32_to_cpu(rmsgp->rm_credit);
	if (credits == 0)
		credits = 1;	/* don't deadlock */
	else if (credits > buffer->rb_max_requests)
		credits = buffer->rb_max_requests;

	atomic_set(&buffer->rb_credits, credits);
}

/**
 * rpcrdma_receive_wc - Invoked by RDMA provider for each polled Receive WC
 * @cq:	completion queue (ignored)
 * @wc:	completed WR
 *
 */
static void
rpcrdma_receive_wc(struct ib_cq *cq, struct ib_wc *wc)
{
	struct ib_cqe *cqe = wc->wr_cqe;
	struct rpcrdma_rep *rep = container_of(cqe, struct rpcrdma_rep,
					       rr_cqe);

	/* WARNING: Only wr_cqe and status are reliable at this point */
	if (wc->status != IB_WC_SUCCESS)
		goto out_fail;

	/* status == SUCCESS means all fields in wc are trustworthy */
	if (wc->opcode != IB_WC_RECV)
		return;

	dprintk("RPC:       %s: rep %p opcode 'recv', length %u: success\n",
		__func__, rep, wc->byte_len);

	rep->rr_len = wc->byte_len;
	ib_dma_sync_single_for_cpu(rep->rr_device,
				   rdmab_addr(rep->rr_rdmabuf),
				   rep->rr_len, DMA_FROM_DEVICE);

	rpcrdma_update_granted_credits(rep);

out_schedule:
	queue_work(rpcrdma_receive_wq, &rep->rr_work);
	return;

out_fail:
	if (wc->status != IB_WC_WR_FLUSH_ERR)
		pr_err("rpcrdma: Recv: %s (%u/0x%x)\n",
		       ib_wc_status_msg(wc->status),
		       wc->status, wc->vendor_err);
	rep->rr_len = RPCRDMA_BAD_LEN;
	goto out_schedule;
}

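/* Connection manager event handler. Address and route resolution
 * results are reported through ia->ri_done; connection state changes
 * update ep->rep_connected and wake rep_connect_wait.
 */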
static int
rpcrdma_conn_upcall(struct rdma_cm_id *id, struct rdma_cm_event *event)
{
	struct rpcrdma_xprt *xprt = id->context;
	struct rpcrdma_ia *ia = &xprt->rx_ia;
	struct rpcrdma_ep *ep = &xprt->rx_ep;
#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
	struct sockaddr *sap = (struct sockaddr *)&ep->rep_remote_addr;
#endif
	struct ib_qp_attr *attr = &ia->ri_qp_attr;
	struct ib_qp_init_attr *iattr = &ia->ri_qp_init_attr;
	int connstate = 0;

	switch (event->event) {
	case RDMA_CM_EVENT_ADDR_RESOLVED:
	case RDMA_CM_EVENT_ROUTE_RESOLVED:
		ia->ri_async_rc = 0;
		complete(&ia->ri_done);
		break;
	case RDMA_CM_EVENT_ADDR_ERROR:
		ia->ri_async_rc = -EHOSTUNREACH;
		dprintk("RPC:       %s: CM address resolution error, ep 0x%p\n",
			__func__, ep);
		complete(&ia->ri_done);
		break;
	case RDMA_CM_EVENT_ROUTE_ERROR:
		ia->ri_async_rc = -ENETUNREACH;
		dprintk("RPC:       %s: CM route resolution error, ep 0x%p\n",
			__func__, ep);
		complete(&ia->ri_done);
		break;
	case RDMA_CM_EVENT_ESTABLISHED:
		connstate = 1;
		ib_query_qp(ia->ri_id->qp, attr,
			    IB_QP_MAX_QP_RD_ATOMIC | IB_QP_MAX_DEST_RD_ATOMIC,
			    iattr);
		dprintk("RPC:       %s: %d responder resources"
			" (%d initiator)\n",
			__func__, attr->max_dest_rd_atomic,
			attr->max_rd_atomic);
		goto connected;
	case RDMA_CM_EVENT_CONNECT_ERROR:
		connstate = -ENOTCONN;
		goto connected;
	case RDMA_CM_EVENT_UNREACHABLE:
		connstate = -ENETDOWN;
		goto connected;
	case RDMA_CM_EVENT_REJECTED:
		connstate = -ECONNREFUSED;
		goto connected;
	case RDMA_CM_EVENT_DISCONNECTED:
		connstate = -ECONNABORTED;
		goto connected;
	case RDMA_CM_EVENT_DEVICE_REMOVAL:
		connstate = -ENODEV;
connected:
		dprintk("RPC:       %s: %sconnected\n",
					__func__, connstate > 0 ? "" : "dis");
		atomic_set(&xprt->rx_buf.rb_credits, 1);
		ep->rep_connected = connstate;
		rpcrdma_conn_func(ep);
		wake_up_all(&ep->rep_connect_wait);
		/*FALLTHROUGH*/
	default:
		dprintk("RPC:       %s: %pIS:%u (ep 0x%p): %s\n",
			__func__, sap, rpc_get_port(sap), ep,
			rdma_event_msg(event->event));
		break;
	}

#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
	if (connstate == 1) {
		int ird = attr->max_dest_rd_atomic;
		int tird = ep->rep_remote_cma.responder_resources;

		pr_info("rpcrdma: connection to %pIS:%u on %s, memreg '%s', %d credits, %d responders%s\n",
			sap, rpc_get_port(sap),
			ia->ri_device->name,
			ia->ri_ops->ro_displayname,
			xprt->rx_buf.rb_max_requests,
			ird, ird < 4 && ird < tird / 2 ? " (low!)" : "");
	} else if (connstate < 0) {
		pr_info("rpcrdma: connection to %pIS:%u closed (%d)\n",
			sap, rpc_get_port(sap), connstate);
	}
#endif

	return 0;
}

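/* Release the device module reference taken in rpcrdma_create_id(),
 * then destroy the CM ID.
 */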
static void rpcrdma_destroy_id(struct rdma_cm_id *id)
{
	if (id) {
		module_put(id->device->owner);
		rdma_destroy_id(id);
	}
}

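/* Create a CM ID and synchronously resolve the server's address and
 * route. On success the underlying device module is pinned; the
 * matching module_put() is done in rpcrdma_destroy_id().
 */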
static struct rdma_cm_id *
rpcrdma_create_id(struct rpcrdma_xprt *xprt,
			struct rpcrdma_ia *ia, struct sockaddr *addr)
{
	struct rdma_cm_id *id;
	int rc;

	init_completion(&ia->ri_done);

	id = rdma_create_id(&init_net, rpcrdma_conn_upcall, xprt, RDMA_PS_TCP,
			    IB_QPT_RC);
	if (IS_ERR(id)) {
		rc = PTR_ERR(id);
		dprintk("RPC:       %s: rdma_create_id() failed %i\n",
			__func__, rc);
		return id;
	}

	ia->ri_async_rc = -ETIMEDOUT;
	rc = rdma_resolve_addr(id, NULL, addr, RDMA_RESOLVE_TIMEOUT);
	if (rc) {
		dprintk("RPC:       %s: rdma_resolve_addr() failed %i\n",
			__func__, rc);
		goto out;
	}
	wait_for_completion_interruptible_timeout(&ia->ri_done,
				msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT) + 1);

	/* FIXME:
	 * Until xprtrdma supports DEVICE_REMOVAL, the provider must
	 * be pinned while there are active NFS/RDMA mounts to prevent
	 * hangs and crashes at umount time.
	 */
	if (!ia->ri_async_rc && !try_module_get(id->device->owner)) {
		dprintk("RPC:       %s: Failed to get device module\n",
			__func__);
		ia->ri_async_rc = -ENODEV;
	}
	rc = ia->ri_async_rc;
	if (rc)
		goto out;

	ia->ri_async_rc = -ETIMEDOUT;
	rc = rdma_resolve_route(id, RDMA_RESOLVE_TIMEOUT);
	if (rc) {
		dprintk("RPC:       %s: rdma_resolve_route() failed %i\n",
			__func__, rc);
		goto put;
	}
	wait_for_completion_interruptible_timeout(&ia->ri_done,
				msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT) + 1);
	rc = ia->ri_async_rc;
	if (rc)
		goto put;

	return id;
put:
	module_put(id->device->owner);
out:
	rdma_destroy_id(id);
	return ERR_PTR(rc);
}

/*
 * Exported functions.
 */

/*
 * Open and initialize an Interface Adapter.
 *  o initializes fields of struct rpcrdma_ia, including
 *    interface and provider attributes and protection zone.
 */
int
rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr, int memreg)
{
	struct rpcrdma_ia *ia = &xprt->rx_ia;
	int rc;

	ia->ri_id = rpcrdma_create_id(xprt, ia, addr);
	if (IS_ERR(ia->ri_id)) {
		rc = PTR_ERR(ia->ri_id);
		goto out1;
	}
	ia->ri_device = ia->ri_id->device;

	ia->ri_pd = ib_alloc_pd(ia->ri_device);
	if (IS_ERR(ia->ri_pd)) {
		rc = PTR_ERR(ia->ri_pd);
		dprintk("RPC:       %s: ib_alloc_pd() failed %i\n",
			__func__, rc);
		goto out2;
	}

	if (memreg == RPCRDMA_FRMR) {
		if (!(ia->ri_device->attrs.device_cap_flags &
				IB_DEVICE_MEM_MGT_EXTENSIONS) ||
		    (ia->ri_device->attrs.max_fast_reg_page_list_len == 0)) {
			dprintk("RPC:       %s: FRMR registration "
				"not supported by HCA\n", __func__);
			memreg = RPCRDMA_MTHCAFMR;
		}
	}
	if (memreg == RPCRDMA_MTHCAFMR) {
		if (!ia->ri_device->alloc_fmr) {
			dprintk("RPC:       %s: MTHCAFMR registration "
				"not supported by HCA\n", __func__);
			rc = -EINVAL;
			goto out3;
		}
	}

	switch (memreg) {
	case RPCRDMA_FRMR:
		ia->ri_ops = &rpcrdma_frwr_memreg_ops;
		break;
	case RPCRDMA_MTHCAFMR:
		ia->ri_ops = &rpcrdma_fmr_memreg_ops;
		break;
	default:
		printk(KERN_ERR "RPC: Unsupported memory "
				"registration mode: %d\n", memreg);
		rc = -ENOMEM;
		goto out3;
	}
	dprintk("RPC:       %s: memory registration strategy is '%s'\n",
		__func__, ia->ri_ops->ro_displayname);

	return 0;

out3:
	ib_dealloc_pd(ia->ri_pd);
	ia->ri_pd = NULL;
out2:
	rpcrdma_destroy_id(ia->ri_id);
	ia->ri_id = NULL;
out1:
	return rc;
}

/*
 * Clean up/close an IA.
 *   o if event handles and PD have been initialized, free them.
 *   o close the IA
 */
void
rpcrdma_ia_close(struct rpcrdma_ia *ia)
{
	dprintk("RPC:       %s: entering\n", __func__);
	if (ia->ri_id != NULL && !IS_ERR(ia->ri_id)) {
		if (ia->ri_id->qp)
			rdma_destroy_qp(ia->ri_id);
		rpcrdma_destroy_id(ia->ri_id);
		ia->ri_id = NULL;
	}

	/* If the pd is still busy, xprtrdma missed freeing a resource */
	if (ia->ri_pd && !IS_ERR(ia->ri_pd))
		ib_dealloc_pd(ia->ri_pd);
}

/*
 * Create unconnected endpoint.
 */
int
rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
				struct rpcrdma_create_data_internal *cdata)
{
	struct ib_cq *sendcq, *recvcq;
	unsigned int max_qp_wr;
	int rc;

	if (ia->ri_device->attrs.max_sge < RPCRDMA_MAX_IOVS) {
		dprintk("RPC:       %s: insufficient sge's available\n",
			__func__);
		return -ENOMEM;
	}

	if (ia->ri_device->attrs.max_qp_wr <= RPCRDMA_BACKWARD_WRS) {
		dprintk("RPC:       %s: insufficient wqe's available\n",
			__func__);
		return -ENOMEM;
	}
	max_qp_wr = ia->ri_device->attrs.max_qp_wr - RPCRDMA_BACKWARD_WRS - 1;

	/* check provider's send/recv wr limits */
	if (cdata->max_requests > max_qp_wr)
		cdata->max_requests = max_qp_wr;

	ep->rep_attr.event_handler = rpcrdma_qp_async_error_upcall;
	ep->rep_attr.qp_context = ep;
	ep->rep_attr.srq = NULL;
	ep->rep_attr.cap.max_send_wr = cdata->max_requests;
	ep->rep_attr.cap.max_send_wr += RPCRDMA_BACKWARD_WRS;
	ep->rep_attr.cap.max_send_wr += 1;	/* drain cqe */
	rc = ia->ri_ops->ro_open(ia, ep, cdata);
	if (rc)
		return rc;
	ep->rep_attr.cap.max_recv_wr = cdata->max_requests;
	ep->rep_attr.cap.max_recv_wr += RPCRDMA_BACKWARD_WRS;
	ep->rep_attr.cap.max_recv_wr += 1;	/* drain cqe */
	ep->rep_attr.cap.max_send_sge = RPCRDMA_MAX_IOVS;
	ep->rep_attr.cap.max_recv_sge = 1;
	ep->rep_attr.cap.max_inline_data = 0;
	ep->rep_attr.sq_sig_type = IB_SIGNAL_REQ_WR;
	ep->rep_attr.qp_type = IB_QPT_RC;
	ep->rep_attr.port_num = ~0;

	dprintk("RPC:       %s: requested max: dtos: send %d recv %d; "
		"iovs: send %d recv %d\n",
		__func__,
		ep->rep_attr.cap.max_send_wr,
		ep->rep_attr.cap.max_recv_wr,
		ep->rep_attr.cap.max_send_sge,
		ep->rep_attr.cap.max_recv_sge);

	/* set trigger for requesting send completion */
	ep->rep_cqinit = ep->rep_attr.cap.max_send_wr/2 - 1;
	if (ep->rep_cqinit <= 2)
		ep->rep_cqinit = 0;	/* always signal? */
	INIT_CQCOUNT(ep);
	init_waitqueue_head(&ep->rep_connect_wait);
	INIT_DELAYED_WORK(&ep->rep_connect_worker, rpcrdma_connect_worker);

	sendcq = ib_alloc_cq(ia->ri_device, NULL,
			     ep->rep_attr.cap.max_send_wr + 1,
			     0, IB_POLL_SOFTIRQ);
	if (IS_ERR(sendcq)) {
		rc = PTR_ERR(sendcq);
		dprintk("RPC:       %s: failed to create send CQ: %i\n",
			__func__, rc);
		goto out1;
	}

	recvcq = ib_alloc_cq(ia->ri_device, NULL,
			     ep->rep_attr.cap.max_recv_wr + 1,
			     0, IB_POLL_SOFTIRQ);
	if (IS_ERR(recvcq)) {
		rc = PTR_ERR(recvcq);
		dprintk("RPC:       %s: failed to create recv CQ: %i\n",
			__func__, rc);
		goto out2;
	}

	ep->rep_attr.send_cq = sendcq;
	ep->rep_attr.recv_cq = recvcq;

	/* Initialize cma parameters */
	memset(&ep->rep_remote_cma, 0, sizeof(ep->rep_remote_cma));

	/* RPC/RDMA does not use private data */
	ep->rep_remote_cma.private_data = NULL;
	ep->rep_remote_cma.private_data_len = 0;

	/* Client offers RDMA Read but does not initiate */
	ep->rep_remote_cma.initiator_depth = 0;
	if (ia->ri_device->attrs.max_qp_rd_atom > 32)	/* arbitrary but <= 255 */
		ep->rep_remote_cma.responder_resources = 32;
	else
		ep->rep_remote_cma.responder_resources =
						ia->ri_device->attrs.max_qp_rd_atom;

	/* Limit transport retries so client can detect server
	 * GID changes quickly. RPC layer handles re-establishing
	 * transport connection and retransmission.
	 */
	ep->rep_remote_cma.retry_count = 6;

	/* RPC-over-RDMA handles its own flow control. In addition,
	 * make all RNR NAKs visible so we know that RPC-over-RDMA
	 * flow control is working correctly (no NAKs should be seen).
	 */
	ep->rep_remote_cma.flow_control = 0;
	ep->rep_remote_cma.rnr_retry_count = 0;

	return 0;

out2:
	ib_free_cq(sendcq);
out1:
	return rc;
}

/*
 * rpcrdma_ep_destroy
 *
 * Disconnect and destroy endpoint. After this, the only
 * valid operations on the ep are to free it (if dynamically
 * allocated) or re-create it.
 */
void
rpcrdma_ep_destroy(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
{
	dprintk("RPC:       %s: entering, connected is %d\n",
		__func__, ep->rep_connected);

	cancel_delayed_work_sync(&ep->rep_connect_worker);

	if (ia->ri_id->qp) {
		rpcrdma_ep_disconnect(ep, ia);
		rdma_destroy_qp(ia->ri_id);
		ia->ri_id->qp = NULL;
	}

	ib_free_cq(ep->rep_attr.recv_cq);
	ib_free_cq(ep->rep_attr.send_cq);
}

/*
 * Connect unconnected endpoint.
 */
int
rpcrdma_ep_connect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
{
	struct rdma_cm_id *id, *old;
	int rc = 0;
	int retry_count = 0;

	if (ep->rep_connected != 0) {
		struct rpcrdma_xprt *xprt;
retry:
		dprintk("RPC:       %s: reconnecting...\n", __func__);

		rpcrdma_ep_disconnect(ep, ia);

		xprt = container_of(ia, struct rpcrdma_xprt, rx_ia);
		id = rpcrdma_create_id(xprt, ia,
				(struct sockaddr *)&xprt->rx_data.addr);
		if (IS_ERR(id)) {
			rc = -EHOSTUNREACH;
			goto out;
		}
		/* TEMP TEMP TEMP - fail if new device:
		 * Deregister/remarshal *all* requests!
		 * Close and recreate adapter, pd, etc!
		 * Re-determine all attributes still sane!
		 * More stuff I haven't thought of!
		 * Rrrgh!
		 */
		if (ia->ri_device != id->device) {
			printk("RPC:       %s: can't reconnect on "
				"different device!\n", __func__);
			rpcrdma_destroy_id(id);
			rc = -ENETUNREACH;
			goto out;
		}
		/* END TEMP */
		rc = rdma_create_qp(id, ia->ri_pd, &ep->rep_attr);
		if (rc) {
			dprintk("RPC:       %s: rdma_create_qp failed %i\n",
				__func__, rc);
			rpcrdma_destroy_id(id);
			rc = -ENETUNREACH;
			goto out;
		}

		old = ia->ri_id;
		ia->ri_id = id;

		rdma_destroy_qp(old);
		rpcrdma_destroy_id(old);
	} else {
		dprintk("RPC:       %s: connecting...\n", __func__);
		rc = rdma_create_qp(ia->ri_id, ia->ri_pd, &ep->rep_attr);
		if (rc) {
			dprintk("RPC:       %s: rdma_create_qp failed %i\n",
				__func__, rc);
			/* do not update ep->rep_connected */
			return -ENETUNREACH;
		}
	}

	ep->rep_connected = 0;

	rc = rdma_connect(ia->ri_id, &ep->rep_remote_cma);
	if (rc) {
		dprintk("RPC:       %s: rdma_connect() failed with %i\n",
				__func__, rc);
		goto out;
	}

	wait_event_interruptible(ep->rep_connect_wait, ep->rep_connected != 0);

	/*
	 * Check state. A non-peer reject indicates no listener
	 * (ECONNREFUSED), which may be a transient state. All
	 * other errors indicate a transport condition that has
	 * already been given a best-effort attempt.
	 */
	if (ep->rep_connected == -ECONNREFUSED &&
	    ++retry_count <= RDMA_CONNECT_RETRY_MAX) {
		dprintk("RPC:       %s: non-peer_reject, retry\n", __func__);
		goto retry;
	}
	if (ep->rep_connected <= 0) {
		/* Sometimes, the only way to reliably connect to remote
		 * CMs is to use same nonzero values for ORD and IRD. */
		if (retry_count++ <= RDMA_CONNECT_RETRY_MAX + 1 &&
		    (ep->rep_remote_cma.responder_resources == 0 ||
		     ep->rep_remote_cma.initiator_depth !=
				ep->rep_remote_cma.responder_resources)) {
			if (ep->rep_remote_cma.responder_resources == 0)
				ep->rep_remote_cma.responder_resources = 1;
			ep->rep_remote_cma.initiator_depth =
				ep->rep_remote_cma.responder_resources;
			goto retry;
		}
		rc = ep->rep_connected;
	} else {
		struct rpcrdma_xprt *r_xprt;
		unsigned int extras;

		dprintk("RPC:       %s: connected\n", __func__);

		r_xprt = container_of(ia, struct rpcrdma_xprt, rx_ia);
		extras = r_xprt->rx_buf.rb_bc_srv_max_requests;

		if (extras) {
			rc = rpcrdma_ep_post_extra_recv(r_xprt, extras);
			if (rc) {
				pr_warn("%s: rpcrdma_ep_post_extra_recv: %i\n",
					__func__, rc);
				rc = 0;
			}
		}
	}

out:
	if (rc)
		ep->rep_connected = rc;
	return rc;
}

/*
 * rpcrdma_ep_disconnect
 *
 * This is separate from destroy to facilitate the ability
 * to reconnect without recreating the endpoint.
 *
 * This call is not reentrant, and must not be made in parallel
 * on the same endpoint.
 */
void
rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
{
	int rc;

	rc = rdma_disconnect(ia->ri_id);
	if (!rc) {
		/* returns without wait if not connected */
		wait_event_interruptible(ep->rep_connect_wait,
							ep->rep_connected != 1);
		dprintk("RPC:       %s: after wait, %sconnected\n", __func__,
			(ep->rep_connected == 1) ? "still " : "dis");
	} else {
		dprintk("RPC:       %s: rdma_disconnect %i\n", __func__, rc);
		ep->rep_connected = rc;
	}

	ib_drain_qp(ia->ri_id->qp);
}

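/* Recover MRs that were flagged as stale: drain rb_stale_mrs and pass
 * each MR to the registration-specific ->ro_recover_mr method.
 */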
static void
rpcrdma_mr_recovery_worker(struct work_struct *work)
{
	struct rpcrdma_buffer *buf = container_of(work, struct rpcrdma_buffer,
						  rb_recovery_worker.work);
	struct rpcrdma_mw *mw;

	spin_lock(&buf->rb_recovery_lock);
	while (!list_empty(&buf->rb_stale_mrs)) {
		mw = list_first_entry(&buf->rb_stale_mrs,
				      struct rpcrdma_mw, mw_list);
		list_del_init(&mw->mw_list);
		spin_unlock(&buf->rb_recovery_lock);

		dprintk("RPC:       %s: recovering MR %p\n", __func__, mw);
		mw->mw_xprt->rx_ia.ri_ops->ro_recover_mr(mw);

		spin_lock(&buf->rb_recovery_lock);
	}
	spin_unlock(&buf->rb_recovery_lock);
}

void
rpcrdma_defer_mr_recovery(struct rpcrdma_mw *mw)
{
	struct rpcrdma_xprt *r_xprt = mw->mw_xprt;
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;

	spin_lock(&buf->rb_recovery_lock);
	list_add(&mw->mw_list, &buf->rb_stale_mrs);
	spin_unlock(&buf->rb_recovery_lock);

	schedule_delayed_work(&buf->rb_recovery_worker, 0);
}

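/* Allocate one rpcrdma_req and add it to the buffer's rb_allreqs list.
 */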
struct rpcrdma_req *
rpcrdma_create_req(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_buffer *buffer = &r_xprt->rx_buf;
	struct rpcrdma_req *req;

	req = kzalloc(sizeof(*req), GFP_KERNEL);
	if (req == NULL)
		return ERR_PTR(-ENOMEM);

	INIT_LIST_HEAD(&req->rl_free);
	spin_lock(&buffer->rb_reqslock);
	list_add(&req->rl_all, &buffer->rb_allreqs);
	spin_unlock(&buffer->rb_reqslock);
	req->rl_cqe.done = rpcrdma_wc_send;
	req->rl_buffer = &r_xprt->rx_buf;
	return req;
}

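/* Allocate one rpcrdma_rep and the registered buffer that backs the
 * inline portion of incoming replies.
 */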
struct rpcrdma_rep *
rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
	struct rpcrdma_rep *rep;
	int rc;

	rc = -ENOMEM;
	rep = kzalloc(sizeof(*rep), GFP_KERNEL);
	if (rep == NULL)
		goto out;

	rep->rr_rdmabuf = rpcrdma_alloc_regbuf(ia, cdata->inline_rsize,
					       GFP_KERNEL);
	if (IS_ERR(rep->rr_rdmabuf)) {
		rc = PTR_ERR(rep->rr_rdmabuf);
		goto out_free;
	}

	rep->rr_device = ia->ri_device;
	rep->rr_cqe.done = rpcrdma_receive_wc;
	rep->rr_rxprt = r_xprt;
	INIT_WORK(&rep->rr_work, rpcrdma_receive_worker);
	return rep;

out_free:
	kfree(rep);
out:
	return ERR_PTR(rc);
}

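/* Create the pools of send and receive buffers for a transport, and
 * let the registration strategy set up its MRs via ->ro_init.
 */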
int
rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
	int i, rc;

	buf->rb_max_requests = r_xprt->rx_data.max_requests;
	buf->rb_bc_srv_max_requests = 0;
	atomic_set(&buf->rb_credits, 1);
	spin_lock_init(&buf->rb_lock);
	spin_lock_init(&buf->rb_recovery_lock);
	INIT_LIST_HEAD(&buf->rb_stale_mrs);
	INIT_DELAYED_WORK(&buf->rb_recovery_worker,
			  rpcrdma_mr_recovery_worker);

	rc = ia->ri_ops->ro_init(r_xprt);
	if (rc)
		goto out;

	INIT_LIST_HEAD(&buf->rb_send_bufs);
	INIT_LIST_HEAD(&buf->rb_allreqs);
	spin_lock_init(&buf->rb_reqslock);
	for (i = 0; i < buf->rb_max_requests; i++) {
		struct rpcrdma_req *req;

		req = rpcrdma_create_req(r_xprt);
		if (IS_ERR(req)) {
			dprintk("RPC:       %s: request buffer %d alloc"
				" failed\n", __func__, i);
			rc = PTR_ERR(req);
			goto out;
		}
		req->rl_backchannel = false;
		list_add(&req->rl_free, &buf->rb_send_bufs);
	}

	INIT_LIST_HEAD(&buf->rb_recv_bufs);
	for (i = 0; i < buf->rb_max_requests + 2; i++) {
		struct rpcrdma_rep *rep;

		rep = rpcrdma_create_rep(r_xprt);
		if (IS_ERR(rep)) {
			dprintk("RPC:       %s: reply buffer %d alloc failed\n",
				__func__, i);
			rc = PTR_ERR(rep);
			goto out;
		}
		list_add(&rep->rr_list, &buf->rb_recv_bufs);
	}

	return 0;
out:
	rpcrdma_buffer_destroy(buf);
	return rc;
}

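/* Helpers that pull the first entry off rb_send_bufs (reqs) or
 * rb_recv_bufs (reps). The _locked suffix indicates callers normally
 * serialize access with buf->rb_lock.
 */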
static struct rpcrdma_req *
rpcrdma_buffer_get_req_locked(struct rpcrdma_buffer *buf)
{
	struct rpcrdma_req *req;

	req = list_first_entry(&buf->rb_send_bufs,
			       struct rpcrdma_req, rl_free);
	list_del(&req->rl_free);
	return req;
}

static struct rpcrdma_rep *
rpcrdma_buffer_get_rep_locked(struct rpcrdma_buffer *buf)
{
	struct rpcrdma_rep *rep;

	rep = list_first_entry(&buf->rb_recv_bufs,
			       struct rpcrdma_rep, rr_list);
	list_del(&rep->rr_list);
	return rep;
}

static void
rpcrdma_destroy_rep(struct rpcrdma_ia *ia, struct rpcrdma_rep *rep)
{
	rpcrdma_free_regbuf(ia, rep->rr_rdmabuf);
	kfree(rep);
}

void
rpcrdma_destroy_req(struct rpcrdma_ia *ia, struct rpcrdma_req *req)
{
	rpcrdma_free_regbuf(ia, req->rl_sendbuf);
	rpcrdma_free_regbuf(ia, req->rl_rdmabuf);
	kfree(req);
}

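/* Tear down the buffer pools: cancel MR recovery work, free all reps
 * and reqs, then let the registration strategy clean up via
 * ->ro_destroy.
 */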
void
rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
{
	struct rpcrdma_ia *ia = rdmab_to_ia(buf);

	cancel_delayed_work_sync(&buf->rb_recovery_worker);

	while (!list_empty(&buf->rb_recv_bufs)) {
		struct rpcrdma_rep *rep;

		rep = rpcrdma_buffer_get_rep_locked(buf);
		rpcrdma_destroy_rep(ia, rep);
	}

	spin_lock(&buf->rb_reqslock);
	while (!list_empty(&buf->rb_allreqs)) {
		struct rpcrdma_req *req;

		req = list_first_entry(&buf->rb_allreqs,
				       struct rpcrdma_req, rl_all);
		list_del(&req->rl_all);

		spin_unlock(&buf->rb_reqslock);
		rpcrdma_destroy_req(ia, req);
		spin_lock(&buf->rb_reqslock);
	}
	spin_unlock(&buf->rb_reqslock);

	ia->ri_ops->ro_destroy(buf);
}

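/* Take an rpcrdma_mw off the transport's free list (rb_mws), or return
 * NULL if none are available. rpcrdma_put_mw() below puts one back.
 */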
struct rpcrdma_mw *
rpcrdma_get_mw(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	struct rpcrdma_mw *mw = NULL;

	spin_lock(&buf->rb_mwlock);
	if (!list_empty(&buf->rb_mws)) {
		mw = list_first_entry(&buf->rb_mws,
				      struct rpcrdma_mw, mw_list);
		list_del_init(&mw->mw_list);
	}
	spin_unlock(&buf->rb_mwlock);

	if (!mw)
		pr_err("RPC:       %s: no MWs available\n", __func__);
	return mw;
}

void
rpcrdma_put_mw(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mw *mw)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;

	spin_lock(&buf->rb_mwlock);
	list_add_tail(&mw->mw_list, &buf->rb_mws);
	spin_unlock(&buf->rb_mwlock);
}

/*
 * Get a set of request/reply buffers.
 *
 * Reply buffer (if available) is attached to send buffer upon return.
 */
struct rpcrdma_req *
rpcrdma_buffer_get(struct rpcrdma_buffer *buffers)
{
	struct rpcrdma_req *req;

	spin_lock(&buffers->rb_lock);
	if (list_empty(&buffers->rb_send_bufs))
		goto out_reqbuf;
	req = rpcrdma_buffer_get_req_locked(buffers);
	if (list_empty(&buffers->rb_recv_bufs))
		goto out_repbuf;
	req->rl_reply = rpcrdma_buffer_get_rep_locked(buffers);
	spin_unlock(&buffers->rb_lock);
	return req;

out_reqbuf:
	spin_unlock(&buffers->rb_lock);
	pr_warn("RPC:       %s: out of request buffers\n", __func__);
	return NULL;
out_repbuf:
	spin_unlock(&buffers->rb_lock);
	pr_warn("RPC:       %s: out of reply buffers\n", __func__);
	req->rl_reply = NULL;
	return req;
}

/*
 * Put request/reply buffers back into pool.
 * Pre-decrement counter/array index.
 */
void
rpcrdma_buffer_put(struct rpcrdma_req *req)
{
	struct rpcrdma_buffer *buffers = req->rl_buffer;
	struct rpcrdma_rep *rep = req->rl_reply;

	req->rl_niovs = 0;
	req->rl_reply = NULL;

	spin_lock(&buffers->rb_lock);
	list_add_tail(&req->rl_free, &buffers->rb_send_bufs);
	if (rep)
		list_add_tail(&rep->rr_list, &buffers->rb_recv_bufs);
	spin_unlock(&buffers->rb_lock);
}

/*
 * Recover reply buffers from pool.
 * This happens when recovering from disconnect.
 */
void
rpcrdma_recv_buffer_get(struct rpcrdma_req *req)
{
	struct rpcrdma_buffer *buffers = req->rl_buffer;

	spin_lock(&buffers->rb_lock);
	if (!list_empty(&buffers->rb_recv_bufs))
		req->rl_reply = rpcrdma_buffer_get_rep_locked(buffers);
	spin_unlock(&buffers->rb_lock);
}

/*
 * Put reply buffers back into pool when not attached to
 * request. This happens in error conditions.
 */
void
rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep)
{
	struct rpcrdma_buffer *buffers = &rep->rr_rxprt->rx_buf;

	spin_lock(&buffers->rb_lock);
	list_add_tail(&rep->rr_list, &buffers->rb_recv_bufs);
	spin_unlock(&buffers->rb_lock);
}

/*
 * Wrappers for internal-use kmalloc memory registration, used by buffer code.
 */

/**
 * rpcrdma_alloc_regbuf - kmalloc and register memory for SEND/RECV buffers
 * @ia: controlling rpcrdma_ia
 * @size: size of buffer to be allocated, in bytes
 * @flags: GFP flags
 *
 * Returns pointer to private header of an area of internally
 * registered memory, or an ERR_PTR. The registered buffer follows
 * the end of the private header.
 *
 * xprtrdma uses a regbuf for posting an outgoing RDMA SEND, or for
 * receiving the payload of RDMA RECV operations. regbufs are not
 * used for RDMA READ/WRITE operations, thus are registered only for
 * LOCAL access.
 */
struct rpcrdma_regbuf *
rpcrdma_alloc_regbuf(struct rpcrdma_ia *ia, size_t size, gfp_t flags)
{
	struct rpcrdma_regbuf *rb;
	struct ib_sge *iov;

	rb = kmalloc(sizeof(*rb) + size, flags);
	if (rb == NULL)
		goto out;

	iov = &rb->rg_iov;
	iov->addr = ib_dma_map_single(ia->ri_device,
				      (void *)rb->rg_base, size,
				      DMA_BIDIRECTIONAL);
	if (ib_dma_mapping_error(ia->ri_device, iov->addr))
		goto out_free;

	iov->length = size;
	iov->lkey = ia->ri_pd->local_dma_lkey;
	rb->rg_size = size;
	rb->rg_owner = NULL;
	return rb;

out_free:
	kfree(rb);
out:
	return ERR_PTR(-ENOMEM);
}

/**
 * rpcrdma_free_regbuf - deregister and free registered buffer
 * @ia: controlling rpcrdma_ia
 * @rb: regbuf to be deregistered and freed
 */
void
rpcrdma_free_regbuf(struct rpcrdma_ia *ia, struct rpcrdma_regbuf *rb)
{
	struct ib_sge *iov;

	if (!rb)
		return;

	iov = &rb->rg_iov;
	ib_dma_unmap_single(ia->ri_device,
			    iov->addr, iov->length, DMA_BIDIRECTIONAL);
	kfree(rb);
}

/*
 * Prepost any receive buffer, then post send.
 *
 * Receive buffer is donated to hardware, reclaimed upon recv completion.
 */
int
rpcrdma_ep_post(struct rpcrdma_ia *ia,
		struct rpcrdma_ep *ep,
		struct rpcrdma_req *req)
{
	struct ib_device *device = ia->ri_device;
	struct ib_send_wr send_wr, *send_wr_fail;
	struct rpcrdma_rep *rep = req->rl_reply;
	struct ib_sge *iov = req->rl_send_iov;
	int i, rc;

	if (rep) {
		rc = rpcrdma_ep_post_recv(ia, ep, rep);
		if (rc)
			goto out;
		req->rl_reply = NULL;
	}

	send_wr.next = NULL;
	send_wr.wr_cqe = &req->rl_cqe;
	send_wr.sg_list = iov;
	send_wr.num_sge = req->rl_niovs;
	send_wr.opcode = IB_WR_SEND;

	for (i = 0; i < send_wr.num_sge; i++)
		ib_dma_sync_single_for_device(device, iov[i].addr,
					      iov[i].length, DMA_TO_DEVICE);
	dprintk("RPC:       %s: posting %d s/g entries\n",
		__func__, send_wr.num_sge);

	if (DECR_CQCOUNT(ep) > 0)
		send_wr.send_flags = 0;
	else { /* Provider must take a send completion every now and then */
		INIT_CQCOUNT(ep);
		send_wr.send_flags = IB_SEND_SIGNALED;
	}

	rc = ib_post_send(ia->ri_id->qp, &send_wr, &send_wr_fail);
	if (rc)
		dprintk("RPC:       %s: ib_post_send returned %i\n", __func__,
			rc);
out:
	return rc;
}

/*
 * (Re)post a receive buffer.
 */
int
rpcrdma_ep_post_recv(struct rpcrdma_ia *ia,
		     struct rpcrdma_ep *ep,
		     struct rpcrdma_rep *rep)
{
	struct ib_recv_wr recv_wr, *recv_wr_fail;
	int rc;

	recv_wr.next = NULL;
	recv_wr.wr_cqe = &rep->rr_cqe;
	recv_wr.sg_list = &rep->rr_rdmabuf->rg_iov;
	recv_wr.num_sge = 1;

	ib_dma_sync_single_for_cpu(ia->ri_device,
				   rdmab_addr(rep->rr_rdmabuf),
				   rdmab_length(rep->rr_rdmabuf),
				   DMA_BIDIRECTIONAL);

	rc = ib_post_recv(ia->ri_id->qp, &recv_wr, &recv_wr_fail);

	if (rc)
		dprintk("RPC:       %s: ib_post_recv returned %i\n", __func__,
			rc);
	return rc;
}

/**
 * rpcrdma_ep_post_extra_recv - Post buffers for incoming backchannel requests
 * @r_xprt: transport associated with these backchannel resources
 * @count: minimum number of incoming requests expected
 *
 * Returns zero if all requested buffers were posted, or a negative errno.
 */
int
rpcrdma_ep_post_extra_recv(struct rpcrdma_xprt *r_xprt, unsigned int count)
{
	struct rpcrdma_buffer *buffers = &r_xprt->rx_buf;
	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
	struct rpcrdma_ep *ep = &r_xprt->rx_ep;
	struct rpcrdma_rep *rep;
	int rc;

	while (count--) {
		spin_lock(&buffers->rb_lock);
		if (list_empty(&buffers->rb_recv_bufs))
			goto out_reqbuf;
		rep = rpcrdma_buffer_get_rep_locked(buffers);
		spin_unlock(&buffers->rb_lock);

		rc = rpcrdma_ep_post_recv(ia, ep, rep);
		if (rc)
			goto out_rc;
	}

	return 0;

out_reqbuf:
	spin_unlock(&buffers->rb_lock);
	pr_warn("%s: no extra receive buffers\n", __func__);
	return -ENOMEM;

out_rc:
	rpcrdma_recv_buffer_put(rep);
	return rc;
}