/*
 * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
 *
 * This software is available to you under a choice of one of two
 * licenses.  You may choose to be licensed under the terms of the GNU
 * General Public License (GPL) Version 2, available from the file
 * COPYING in the main directory of this source tree, or the BSD-type
 * license below:
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 *      Redistributions of source code must retain the above copyright
 *      notice, this list of conditions and the following disclaimer.
 *
 *      Redistributions in binary form must reproduce the above
 *      copyright notice, this list of conditions and the following
 *      disclaimer in the documentation and/or other materials provided
 *      with the distribution.
 *
 *      Neither the name of the Network Appliance, Inc. nor the names of
 *      its contributors may be used to endorse or promote products
 *      derived from this software without specific prior written
 *      permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

/*
 * verbs.c
 *
 * Encapsulates the major functions managing:
 *  o adapters
 *  o endpoints
 *  o connections
 *  o buffer memory
 */

#include <linux/interrupt.h>
#include <linux/slab.h>
#include <linux/prefetch.h>
#include <linux/sunrpc/addr.h>
#include <asm/bitops.h>
#include <linux/module.h> /* try_module_get()/module_put() */

#include "xprt_rdma.h"

/*
 * Globals/Macros
 */

#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
# define RPCDBG_FACILITY	RPCDBG_TRANS
#endif

/*
 * internal functions
 */

static struct workqueue_struct *rpcrdma_receive_wq;

int
rpcrdma_alloc_wq(void)
{
	struct workqueue_struct *recv_wq;

	recv_wq = alloc_workqueue("xprtrdma_receive",
				  WQ_MEM_RECLAIM | WQ_UNBOUND | WQ_HIGHPRI,
				  0);
	if (!recv_wq)
		return -ENOMEM;

	rpcrdma_receive_wq = recv_wq;
	return 0;
}

void
rpcrdma_destroy_wq(void)
{
	struct workqueue_struct *wq;

	if (rpcrdma_receive_wq) {
		wq = rpcrdma_receive_wq;
		rpcrdma_receive_wq = NULL;
		destroy_workqueue(wq);
	}
}

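/* QP event handler: report the asynchronous error and, if the
 * endpoint is currently connected, mark it failed and wake up
 * any waiters.
 */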
static void
rpcrdma_qp_async_error_upcall(struct ib_event *event, void *context)
{
	struct rpcrdma_ep *ep = context;

	pr_err("RPC:       %s: %s on device %s ep %p\n",
	       __func__, ib_event_msg(event->event),
	       event->device->name, context);
	if (ep->rep_connected == 1) {
		ep->rep_connected = -EIO;
		rpcrdma_conn_func(ep);
		wake_up_all(&ep->rep_connect_wait);
	}
}

/**
 * rpcrdma_wc_send - Invoked by RDMA provider for each polled Send WC
 * @cq:	completion queue (ignored)
 * @wc:	completed WR
 *
 */
static void
rpcrdma_wc_send(struct ib_cq *cq, struct ib_wc *wc)
{
	/* WARNING: Only wr_cqe and status are reliable at this point */
	if (wc->status != IB_WC_SUCCESS && wc->status != IB_WC_WR_FLUSH_ERR)
		pr_err("rpcrdma: Send: %s (%u/0x%x)\n",
		       ib_wc_status_msg(wc->status),
		       wc->status, wc->vendor_err);
}

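/* Work item that delivers a received reply to the RPC/RDMA
 * reply handler in process context.
 */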
static void
rpcrdma_receive_worker(struct work_struct *work)
{
	struct rpcrdma_rep *rep =
			container_of(work, struct rpcrdma_rep, rr_work);

	rpcrdma_reply_handler(rep);
}

/* Perform basic sanity checking to avoid using garbage
 * to update the credit grant value.
 */
static void
rpcrdma_update_granted_credits(struct rpcrdma_rep *rep)
{
	struct rpcrdma_msg *rmsgp = rdmab_to_msg(rep->rr_rdmabuf);
	struct rpcrdma_buffer *buffer = &rep->rr_rxprt->rx_buf;
	u32 credits;

	if (rep->rr_len < RPCRDMA_HDRLEN_ERR)
		return;

	credits = be32_to_cpu(rmsgp->rm_credit);
	if (credits == 0)
		credits = 1;	/* don't deadlock */
	else if (credits > buffer->rb_max_requests)
		credits = buffer->rb_max_requests;

	atomic_set(&buffer->rb_credits, credits);
}

/**
 * rpcrdma_receive_wc - Invoked by RDMA provider for each polled Receive WC
 * @cq:	completion queue (ignored)
 * @wc:	completed WR
 *
 */
static void
rpcrdma_receive_wc(struct ib_cq *cq, struct ib_wc *wc)
{
	struct ib_cqe *cqe = wc->wr_cqe;
	struct rpcrdma_rep *rep = container_of(cqe, struct rpcrdma_rep,
					       rr_cqe);

	/* WARNING: Only wr_id and status are reliable at this point */
	if (wc->status != IB_WC_SUCCESS)
		goto out_fail;

	/* status == SUCCESS means all fields in wc are trustworthy */
	if (wc->opcode != IB_WC_RECV)
		return;

	dprintk("RPC:       %s: rep %p opcode 'recv', length %u: success\n",
		__func__, rep, wc->byte_len);

	rep->rr_len = wc->byte_len;
	ib_dma_sync_single_for_cpu(rep->rr_device,
				   rdmab_addr(rep->rr_rdmabuf),
				   rep->rr_len, DMA_FROM_DEVICE);

	rpcrdma_update_granted_credits(rep);

out_schedule:
	queue_work(rpcrdma_receive_wq, &rep->rr_work);
	return;

out_fail:
	if (wc->status != IB_WC_WR_FLUSH_ERR)
		pr_err("rpcrdma: Recv: %s (%u/0x%x)\n",
		       ib_wc_status_msg(wc->status),
		       wc->status, wc->vendor_err);
	rep->rr_len = RPCRDMA_BAD_LEN;
	goto out_schedule;
}

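/* Drain any completions still queued on the endpoint's
 * receive CQ.
 */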
static void
rpcrdma_flush_cqs(struct rpcrdma_ep *ep)
{
	struct ib_wc wc;

	while (ib_poll_cq(ep->rep_attr.recv_cq, 1, &wc) > 0)
		rpcrdma_receive_wc(NULL, &wc);
}

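/* Connection manager event handler: completes address and route
 * resolution, and tracks connection state changes on behalf of
 * the transport.
 */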
static int
rpcrdma_conn_upcall(struct rdma_cm_id *id, struct rdma_cm_event *event)
{
	struct rpcrdma_xprt *xprt = id->context;
	struct rpcrdma_ia *ia = &xprt->rx_ia;
	struct rpcrdma_ep *ep = &xprt->rx_ep;
#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
	struct sockaddr *sap = (struct sockaddr *)&ep->rep_remote_addr;
#endif
	struct ib_qp_attr *attr = &ia->ri_qp_attr;
	struct ib_qp_init_attr *iattr = &ia->ri_qp_init_attr;
	int connstate = 0;

	switch (event->event) {
	case RDMA_CM_EVENT_ADDR_RESOLVED:
	case RDMA_CM_EVENT_ROUTE_RESOLVED:
		ia->ri_async_rc = 0;
		complete(&ia->ri_done);
		break;
	case RDMA_CM_EVENT_ADDR_ERROR:
		ia->ri_async_rc = -EHOSTUNREACH;
		dprintk("RPC:       %s: CM address resolution error, ep 0x%p\n",
			__func__, ep);
		complete(&ia->ri_done);
		break;
	case RDMA_CM_EVENT_ROUTE_ERROR:
		ia->ri_async_rc = -ENETUNREACH;
		dprintk("RPC:       %s: CM route resolution error, ep 0x%p\n",
			__func__, ep);
		complete(&ia->ri_done);
		break;
	case RDMA_CM_EVENT_ESTABLISHED:
		connstate = 1;
		ib_query_qp(ia->ri_id->qp, attr,
			    IB_QP_MAX_QP_RD_ATOMIC | IB_QP_MAX_DEST_RD_ATOMIC,
			    iattr);
		dprintk("RPC:       %s: %d responder resources"
			" (%d initiator)\n",
			__func__, attr->max_dest_rd_atomic,
			attr->max_rd_atomic);
		goto connected;
	case RDMA_CM_EVENT_CONNECT_ERROR:
		connstate = -ENOTCONN;
		goto connected;
	case RDMA_CM_EVENT_UNREACHABLE:
		connstate = -ENETDOWN;
		goto connected;
	case RDMA_CM_EVENT_REJECTED:
		connstate = -ECONNREFUSED;
		goto connected;
	case RDMA_CM_EVENT_DISCONNECTED:
		connstate = -ECONNABORTED;
		goto connected;
	case RDMA_CM_EVENT_DEVICE_REMOVAL:
		connstate = -ENODEV;
connected:
		dprintk("RPC:       %s: %sconnected\n",
					__func__, connstate > 0 ? "" : "dis");
		atomic_set(&xprt->rx_buf.rb_credits, 1);
		ep->rep_connected = connstate;
		rpcrdma_conn_func(ep);
		wake_up_all(&ep->rep_connect_wait);
		/*FALLTHROUGH*/
	default:
		dprintk("RPC:       %s: %pIS:%u (ep 0x%p): %s\n",
			__func__, sap, rpc_get_port(sap), ep,
			rdma_event_msg(event->event));
		break;
	}

#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
	if (connstate == 1) {
		int ird = attr->max_dest_rd_atomic;
		int tird = ep->rep_remote_cma.responder_resources;

		pr_info("rpcrdma: connection to %pIS:%u on %s, memreg '%s', %d credits, %d responders%s\n",
			sap, rpc_get_port(sap),
			ia->ri_device->name,
			ia->ri_ops->ro_displayname,
			xprt->rx_buf.rb_max_requests,
			ird, ird < 4 && ird < tird / 2 ? " (low!)" : "");
	} else if (connstate < 0) {
		pr_info("rpcrdma: connection to %pIS:%u closed (%d)\n",
			sap, rpc_get_port(sap), connstate);
	}
#endif

	return 0;
}

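/* Drop the device module reference taken by rpcrdma_create_id()
 * before destroying the CM ID.
 */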
static void rpcrdma_destroy_id(struct rdma_cm_id *id)
{
	if (id) {
		module_put(id->device->owner);
		rdma_destroy_id(id);
	}
}

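/* Create a CM ID and resolve the server's address and route,
 * pinning the device module while the ID exists.
 */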
static struct rdma_cm_id *
rpcrdma_create_id(struct rpcrdma_xprt *xprt,
			struct rpcrdma_ia *ia, struct sockaddr *addr)
{
	struct rdma_cm_id *id;
	int rc;

	init_completion(&ia->ri_done);

	id = rdma_create_id(&init_net, rpcrdma_conn_upcall, xprt, RDMA_PS_TCP,
			    IB_QPT_RC);
	if (IS_ERR(id)) {
		rc = PTR_ERR(id);
		dprintk("RPC:       %s: rdma_create_id() failed %i\n",
			__func__, rc);
		return id;
	}

	ia->ri_async_rc = -ETIMEDOUT;
	rc = rdma_resolve_addr(id, NULL, addr, RDMA_RESOLVE_TIMEOUT);
	if (rc) {
		dprintk("RPC:       %s: rdma_resolve_addr() failed %i\n",
			__func__, rc);
		goto out;
	}
	wait_for_completion_interruptible_timeout(&ia->ri_done,
				msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT) + 1);

	/* FIXME:
	 * Until xprtrdma supports DEVICE_REMOVAL, the provider must
	 * be pinned while there are active NFS/RDMA mounts to prevent
	 * hangs and crashes at umount time.
	 */
	if (!ia->ri_async_rc && !try_module_get(id->device->owner)) {
		dprintk("RPC:       %s: Failed to get device module\n",
			__func__);
		ia->ri_async_rc = -ENODEV;
	}
	rc = ia->ri_async_rc;
	if (rc)
		goto out;

	ia->ri_async_rc = -ETIMEDOUT;
	rc = rdma_resolve_route(id, RDMA_RESOLVE_TIMEOUT);
	if (rc) {
		dprintk("RPC:       %s: rdma_resolve_route() failed %i\n",
			__func__, rc);
		goto put;
	}
	wait_for_completion_interruptible_timeout(&ia->ri_done,
				msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT) + 1);
	rc = ia->ri_async_rc;
	if (rc)
		goto put;

	return id;
put:
	module_put(id->device->owner);
out:
	rdma_destroy_id(id);
	return ERR_PTR(rc);
}

/*
 * Drain any cq, prior to teardown.
 */
static void
rpcrdma_clean_cq(struct ib_cq *cq)
{
	struct ib_wc wc;
	int count = 0;

	while (1 == ib_poll_cq(cq, 1, &wc))
		++count;

	if (count)
		dprintk("RPC:       %s: flushed %d events (last 0x%x)\n",
			__func__, count, wc.opcode);
}

/*
 * Exported functions.
 */

/*
 * Open and initialize an Interface Adapter.
 *  o initializes fields of struct rpcrdma_ia, including
 *    interface and provider attributes and protection zone.
 */
int
rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr, int memreg)
{
	struct rpcrdma_ia *ia = &xprt->rx_ia;
	int rc;

	ia->ri_dma_mr = NULL;

	ia->ri_id = rpcrdma_create_id(xprt, ia, addr);
	if (IS_ERR(ia->ri_id)) {
		rc = PTR_ERR(ia->ri_id);
		goto out1;
	}
	ia->ri_device = ia->ri_id->device;

	ia->ri_pd = ib_alloc_pd(ia->ri_device);
	if (IS_ERR(ia->ri_pd)) {
		rc = PTR_ERR(ia->ri_pd);
		dprintk("RPC:       %s: ib_alloc_pd() failed %i\n",
			__func__, rc);
		goto out2;
	}

	if (memreg == RPCRDMA_FRMR) {
		if (!(ia->ri_device->attrs.device_cap_flags &
				IB_DEVICE_MEM_MGT_EXTENSIONS) ||
		    (ia->ri_device->attrs.max_fast_reg_page_list_len == 0)) {
			dprintk("RPC:       %s: FRMR registration "
				"not supported by HCA\n", __func__);
			memreg = RPCRDMA_MTHCAFMR;
		}
	}
	if (memreg == RPCRDMA_MTHCAFMR) {
		if (!ia->ri_device->alloc_fmr) {
			dprintk("RPC:       %s: MTHCAFMR registration "
				"not supported by HCA\n", __func__);
			rc = -EINVAL;
			goto out3;
		}
	}

	switch (memreg) {
	case RPCRDMA_FRMR:
		ia->ri_ops = &rpcrdma_frwr_memreg_ops;
		break;
	case RPCRDMA_ALLPHYSICAL:
		ia->ri_ops = &rpcrdma_physical_memreg_ops;
		break;
	case RPCRDMA_MTHCAFMR:
		ia->ri_ops = &rpcrdma_fmr_memreg_ops;
		break;
	default:
		printk(KERN_ERR "RPC: Unsupported memory "
				"registration mode: %d\n", memreg);
		rc = -ENOMEM;
		goto out3;
	}
	dprintk("RPC:       %s: memory registration strategy is '%s'\n",
		__func__, ia->ri_ops->ro_displayname);

	rwlock_init(&ia->ri_qplock);
	return 0;

out3:
	ib_dealloc_pd(ia->ri_pd);
	ia->ri_pd = NULL;
out2:
	rpcrdma_destroy_id(ia->ri_id);
	ia->ri_id = NULL;
out1:
	return rc;
}

/*
 * Clean up/close an IA.
 *   o if event handles and PD have been initialized, free them.
 *   o close the IA
 */
void
rpcrdma_ia_close(struct rpcrdma_ia *ia)
{
	dprintk("RPC:       %s: entering\n", __func__);
	if (ia->ri_id != NULL && !IS_ERR(ia->ri_id)) {
		if (ia->ri_id->qp)
			rdma_destroy_qp(ia->ri_id);
		rpcrdma_destroy_id(ia->ri_id);
		ia->ri_id = NULL;
	}

	/* If the pd is still busy, xprtrdma missed freeing a resource */
	if (ia->ri_pd && !IS_ERR(ia->ri_pd))
		ib_dealloc_pd(ia->ri_pd);
}

/*
 * Create unconnected endpoint.
 */
int
rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
				struct rpcrdma_create_data_internal *cdata)
{
	struct ib_cq *sendcq, *recvcq;
	unsigned int max_qp_wr;
	int rc;

	if (ia->ri_device->attrs.max_sge < RPCRDMA_MAX_IOVS) {
		dprintk("RPC:       %s: insufficient sge's available\n",
			__func__);
		return -ENOMEM;
	}

	if (ia->ri_device->attrs.max_qp_wr <= RPCRDMA_BACKWARD_WRS) {
		dprintk("RPC:       %s: insufficient wqe's available\n",
			__func__);
		return -ENOMEM;
	}
	max_qp_wr = ia->ri_device->attrs.max_qp_wr - RPCRDMA_BACKWARD_WRS;

	/* check provider's send/recv wr limits */
	if (cdata->max_requests > max_qp_wr)
		cdata->max_requests = max_qp_wr;

	ep->rep_attr.event_handler = rpcrdma_qp_async_error_upcall;
	ep->rep_attr.qp_context = ep;
	ep->rep_attr.srq = NULL;
	ep->rep_attr.cap.max_send_wr = cdata->max_requests;
	ep->rep_attr.cap.max_send_wr += RPCRDMA_BACKWARD_WRS;
	rc = ia->ri_ops->ro_open(ia, ep, cdata);
	if (rc)
		return rc;
	ep->rep_attr.cap.max_recv_wr = cdata->max_requests;
	ep->rep_attr.cap.max_recv_wr += RPCRDMA_BACKWARD_WRS;
	ep->rep_attr.cap.max_send_sge = RPCRDMA_MAX_IOVS;
	ep->rep_attr.cap.max_recv_sge = 1;
	ep->rep_attr.cap.max_inline_data = 0;
	ep->rep_attr.sq_sig_type = IB_SIGNAL_REQ_WR;
	ep->rep_attr.qp_type = IB_QPT_RC;
	ep->rep_attr.port_num = ~0;

	dprintk("RPC:       %s: requested max: dtos: send %d recv %d; "
		"iovs: send %d recv %d\n",
		__func__,
		ep->rep_attr.cap.max_send_wr,
		ep->rep_attr.cap.max_recv_wr,
		ep->rep_attr.cap.max_send_sge,
		ep->rep_attr.cap.max_recv_sge);

	/* set trigger for requesting send completion */
	ep->rep_cqinit = ep->rep_attr.cap.max_send_wr/2 - 1;
	if (ep->rep_cqinit <= 2)
		ep->rep_cqinit = 0;	/* always signal? */
	INIT_CQCOUNT(ep);
	init_waitqueue_head(&ep->rep_connect_wait);
	INIT_DELAYED_WORK(&ep->rep_connect_worker, rpcrdma_connect_worker);

	sendcq = ib_alloc_cq(ia->ri_device, NULL,
			     ep->rep_attr.cap.max_send_wr + 1,
			     0, IB_POLL_SOFTIRQ);
	if (IS_ERR(sendcq)) {
		rc = PTR_ERR(sendcq);
		dprintk("RPC:       %s: failed to create send CQ: %i\n",
			__func__, rc);
		goto out1;
	}

	recvcq = ib_alloc_cq(ia->ri_device, NULL,
			     ep->rep_attr.cap.max_recv_wr + 1,
			     0, IB_POLL_SOFTIRQ);
	if (IS_ERR(recvcq)) {
		rc = PTR_ERR(recvcq);
		dprintk("RPC:       %s: failed to create recv CQ: %i\n",
			__func__, rc);
		goto out2;
	}

	ep->rep_attr.send_cq = sendcq;
	ep->rep_attr.recv_cq = recvcq;

	/* Initialize cma parameters */

	/* RPC/RDMA does not use private data */
	ep->rep_remote_cma.private_data = NULL;
	ep->rep_remote_cma.private_data_len = 0;

	/* Client offers RDMA Read but does not initiate */
	ep->rep_remote_cma.initiator_depth = 0;
	if (ia->ri_device->attrs.max_qp_rd_atom > 32)	/* arbitrary but <= 255 */
		ep->rep_remote_cma.responder_resources = 32;
	else
		ep->rep_remote_cma.responder_resources =
						ia->ri_device->attrs.max_qp_rd_atom;

	ep->rep_remote_cma.retry_count = 7;
	ep->rep_remote_cma.flow_control = 0;
	ep->rep_remote_cma.rnr_retry_count = 0;

	return 0;

out2:
	ib_free_cq(sendcq);
out1:
	if (ia->ri_dma_mr)
		ib_dereg_mr(ia->ri_dma_mr);
	return rc;
}

/*
 * rpcrdma_ep_destroy
 *
 * Disconnect and destroy endpoint. After this, the only
 * valid operations on the ep are to free it (if dynamically
 * allocated) or re-create it.
 */
void
rpcrdma_ep_destroy(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
{
	int rc;

	dprintk("RPC:       %s: entering, connected is %d\n",
		__func__, ep->rep_connected);

623 624
	cancel_delayed_work_sync(&ep->rep_connect_worker);

625
	if (ia->ri_id->qp)
626
		rpcrdma_ep_disconnect(ep, ia);
627 628 629 630 631

	rpcrdma_clean_cq(ep->rep_attr.recv_cq);
	rpcrdma_clean_cq(ep->rep_attr.send_cq);

	if (ia->ri_id->qp) {
		rdma_destroy_qp(ia->ri_id);
		ia->ri_id->qp = NULL;
	}

	ib_free_cq(ep->rep_attr.recv_cq);
	ib_free_cq(ep->rep_attr.send_cq);

	if (ia->ri_dma_mr) {
		rc = ib_dereg_mr(ia->ri_dma_mr);
		dprintk("RPC:       %s: ib_dereg_mr returned %i\n",
			__func__, rc);
	}
}

/*
 * Connect unconnected endpoint.
 */
int
rpcrdma_ep_connect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
{
	struct rdma_cm_id *id, *old;
	int rc = 0;
	int retry_count = 0;

	if (ep->rep_connected != 0) {
		struct rpcrdma_xprt *xprt;
retry:
		dprintk("RPC:       %s: reconnecting...\n", __func__);

		rpcrdma_ep_disconnect(ep, ia);
		rpcrdma_flush_cqs(ep);

		xprt = container_of(ia, struct rpcrdma_xprt, rx_ia);
		id = rpcrdma_create_id(xprt, ia,
				(struct sockaddr *)&xprt->rx_data.addr);
		if (IS_ERR(id)) {
			rc = -EHOSTUNREACH;
			goto out;
		}
		/* TEMP TEMP TEMP - fail if new device:
		 * Deregister/remarshal *all* requests!
		 * Close and recreate adapter, pd, etc!
		 * Re-determine all attributes still sane!
		 * More stuff I haven't thought of!
		 * Rrrgh!
		 */
		if (ia->ri_device != id->device) {
			printk("RPC:       %s: can't reconnect on "
				"different device!\n", __func__);
			rpcrdma_destroy_id(id);
			rc = -ENETUNREACH;
			goto out;
		}
		/* END TEMP */
		rc = rdma_create_qp(id, ia->ri_pd, &ep->rep_attr);
		if (rc) {
			dprintk("RPC:       %s: rdma_create_qp failed %i\n",
				__func__, rc);
			rpcrdma_destroy_id(id);
			rc = -ENETUNREACH;
			goto out;
		}

		write_lock(&ia->ri_qplock);
		old = ia->ri_id;
		ia->ri_id = id;
		write_unlock(&ia->ri_qplock);

		rdma_destroy_qp(old);
		rpcrdma_destroy_id(old);
	} else {
		dprintk("RPC:       %s: connecting...\n", __func__);
		rc = rdma_create_qp(ia->ri_id, ia->ri_pd, &ep->rep_attr);
		if (rc) {
			dprintk("RPC:       %s: rdma_create_qp failed %i\n",
				__func__, rc);
			/* do not update ep->rep_connected */
			return -ENETUNREACH;
		}
	}

	ep->rep_connected = 0;

	rc = rdma_connect(ia->ri_id, &ep->rep_remote_cma);
	if (rc) {
		dprintk("RPC:       %s: rdma_connect() failed with %i\n",
				__func__, rc);
		goto out;
	}

	wait_event_interruptible(ep->rep_connect_wait, ep->rep_connected != 0);

	/*
	 * Check state. A non-peer reject indicates no listener
	 * (ECONNREFUSED), which may be a transient state. All
	 * others indicate a transport condition that has already
	 * been handled on a best-effort basis.
	 */
	if (ep->rep_connected == -ECONNREFUSED &&
	    ++retry_count <= RDMA_CONNECT_RETRY_MAX) {
		dprintk("RPC:       %s: non-peer_reject, retry\n", __func__);
		goto retry;
	}
	if (ep->rep_connected <= 0) {
		/* Sometimes, the only way to reliably connect to remote
		 * CMs is to use same nonzero values for ORD and IRD. */
		if (retry_count++ <= RDMA_CONNECT_RETRY_MAX + 1 &&
		    (ep->rep_remote_cma.responder_resources == 0 ||
		     ep->rep_remote_cma.initiator_depth !=
				ep->rep_remote_cma.responder_resources)) {
			if (ep->rep_remote_cma.responder_resources == 0)
				ep->rep_remote_cma.responder_resources = 1;
			ep->rep_remote_cma.initiator_depth =
				ep->rep_remote_cma.responder_resources;
			goto retry;
		}
		rc = ep->rep_connected;
	} else {
		struct rpcrdma_xprt *r_xprt;
		unsigned int extras;

		dprintk("RPC:       %s: connected\n", __func__);

		r_xprt = container_of(ia, struct rpcrdma_xprt, rx_ia);
		extras = r_xprt->rx_buf.rb_bc_srv_max_requests;

		if (extras) {
			rc = rpcrdma_ep_post_extra_recv(r_xprt, extras);
			if (rc) {
				pr_warn("%s: rpcrdma_ep_post_extra_recv: %i\n",
					__func__, rc);
				rc = 0;
			}
		}
	}

out:
	if (rc)
		ep->rep_connected = rc;
	return rc;
}

/*
 * rpcrdma_ep_disconnect
 *
 * This is separate from destroy to facilitate the ability
 * to reconnect without recreating the endpoint.
 *
 * This call is not reentrant, and must not be made in parallel
 * on the same endpoint.
 */
void
rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
{
	int rc;

	rpcrdma_flush_cqs(ep);
	rc = rdma_disconnect(ia->ri_id);
	if (!rc) {
		/* returns without wait if not connected */
		wait_event_interruptible(ep->rep_connect_wait,
							ep->rep_connected != 1);
		dprintk("RPC:       %s: after wait, %sconnected\n", __func__,
			(ep->rep_connected == 1) ? "still " : "dis");
	} else {
		dprintk("RPC:       %s: rdma_disconnect %i\n", __func__, rc);
		ep->rep_connected = rc;
	}
}

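/* Allocate an rpcrdma_req and add it to the list of all
 * requests owned by this transport's buffer pool.
 */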
struct rpcrdma_req *
rpcrdma_create_req(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_buffer *buffer = &r_xprt->rx_buf;
	struct rpcrdma_req *req;

	req = kzalloc(sizeof(*req), GFP_KERNEL);
	if (req == NULL)
		return ERR_PTR(-ENOMEM);

	INIT_LIST_HEAD(&req->rl_free);
	spin_lock(&buffer->rb_reqslock);
	list_add(&req->rl_all, &buffer->rb_allreqs);
	spin_unlock(&buffer->rb_reqslock);
	req->rl_cqe.done = rpcrdma_wc_send;
	req->rl_buffer = &r_xprt->rx_buf;
	return req;
}

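/* Allocate an rpcrdma_rep along with a DMA-mapped buffer
 * large enough to receive an inline reply.
 */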
struct rpcrdma_rep *
rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
	struct rpcrdma_rep *rep;
	int rc;

	rc = -ENOMEM;
	rep = kzalloc(sizeof(*rep), GFP_KERNEL);
	if (rep == NULL)
		goto out;

	rep->rr_rdmabuf = rpcrdma_alloc_regbuf(ia, cdata->inline_rsize,
					       GFP_KERNEL);
	if (IS_ERR(rep->rr_rdmabuf)) {
		rc = PTR_ERR(rep->rr_rdmabuf);
		goto out_free;
	}

	rep->rr_device = ia->ri_device;
	rep->rr_cqe.done = rpcrdma_receive_wc;
	rep->rr_rxprt = r_xprt;
	INIT_WORK(&rep->rr_work, rpcrdma_receive_worker);
	return rep;

out_free:
	kfree(rep);
out:
	return ERR_PTR(rc);
}

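/* Create this transport's pools of request and reply buffers,
 * and initialize its memory registration resources.
 */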
int
rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
	int i, rc;

	buf->rb_max_requests = r_xprt->rx_data.max_requests;
	buf->rb_bc_srv_max_requests = 0;
	spin_lock_init(&buf->rb_lock);
	atomic_set(&buf->rb_credits, 1);

	rc = ia->ri_ops->ro_init(r_xprt);
	if (rc)
		goto out;

	INIT_LIST_HEAD(&buf->rb_send_bufs);
	INIT_LIST_HEAD(&buf->rb_allreqs);
	spin_lock_init(&buf->rb_reqslock);
	for (i = 0; i < buf->rb_max_requests; i++) {
		struct rpcrdma_req *req;

		req = rpcrdma_create_req(r_xprt);
		if (IS_ERR(req)) {
			dprintk("RPC:       %s: request buffer %d alloc"
				" failed\n", __func__, i);
			rc = PTR_ERR(req);
			goto out;
		}
		req->rl_backchannel = false;
		list_add(&req->rl_free, &buf->rb_send_bufs);
	}

	INIT_LIST_HEAD(&buf->rb_recv_bufs);
	for (i = 0; i < buf->rb_max_requests + 2; i++) {
		struct rpcrdma_rep *rep;

		rep = rpcrdma_create_rep(r_xprt);
		if (IS_ERR(rep)) {
			dprintk("RPC:       %s: reply buffer %d alloc failed\n",
				__func__, i);
			rc = PTR_ERR(rep);
			goto out;
		}
		list_add(&rep->rr_list, &buf->rb_recv_bufs);
	}

	return 0;
out:
	rpcrdma_buffer_destroy(buf);
	return rc;
}

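/* Callers of these helpers must hold buf->rb_lock. */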
static struct rpcrdma_req *
rpcrdma_buffer_get_req_locked(struct rpcrdma_buffer *buf)
{
	struct rpcrdma_req *req;

	req = list_first_entry(&buf->rb_send_bufs,
			       struct rpcrdma_req, rl_free);
	list_del(&req->rl_free);
	return req;
}

static struct rpcrdma_rep *
rpcrdma_buffer_get_rep_locked(struct rpcrdma_buffer *buf)
{
	struct rpcrdma_rep *rep;

	rep = list_first_entry(&buf->rb_recv_bufs,
			       struct rpcrdma_rep, rr_list);
	list_del(&rep->rr_list);
	return rep;
}

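/* Release a reply buffer and its registered receive buffer. */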
static void
rpcrdma_destroy_rep(struct rpcrdma_ia *ia, struct rpcrdma_rep *rep)
{
	rpcrdma_free_regbuf(ia, rep->rr_rdmabuf);
	kfree(rep);
}

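/* Release a request buffer and its registered send and RDMA
 * header buffers.
 */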
void
rpcrdma_destroy_req(struct rpcrdma_ia *ia, struct rpcrdma_req *req)
{
	rpcrdma_free_regbuf(ia, req->rl_sendbuf);
	rpcrdma_free_regbuf(ia, req->rl_rdmabuf);
	kfree(req);
}

void
rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
{
	struct rpcrdma_ia *ia = rdmab_to_ia(buf);

	while (!list_empty(&buf->rb_recv_bufs)) {
		struct rpcrdma_rep *rep;

		rep = rpcrdma_buffer_get_rep_locked(buf);
		rpcrdma_destroy_rep(ia, rep);
	}

	spin_lock(&buf->rb_reqslock);
	while (!list_empty(&buf->rb_allreqs)) {
		struct rpcrdma_req *req;

		req = list_first_entry(&buf->rb_allreqs,
				       struct rpcrdma_req, rl_all);
		list_del(&req->rl_all);

		spin_unlock(&buf->rb_reqslock);
		rpcrdma_destroy_req(ia, req);
		spin_lock(&buf->rb_reqslock);
	}
	spin_unlock(&buf->rb_reqslock);

	ia->ri_ops->ro_destroy(buf);
}

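/* Take an MW off the transport's free list, if one is available. */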
struct rpcrdma_mw *
rpcrdma_get_mw(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	struct rpcrdma_mw *mw = NULL;

	spin_lock(&buf->rb_mwlock);
	if (!list_empty(&buf->rb_mws)) {
		mw = list_first_entry(&buf->rb_mws,
				      struct rpcrdma_mw, mw_list);
		list_del_init(&mw->mw_list);
	}
	spin_unlock(&buf->rb_mwlock);

	if (!mw)
		pr_err("RPC:       %s: no MWs available\n", __func__);
	return mw;
}

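/* Return an MW to the transport's free list. */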
void
rpcrdma_put_mw(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mw *mw)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;

	spin_lock(&buf->rb_mwlock);
	list_add_tail(&mw->mw_list, &buf->rb_mws);
	spin_unlock(&buf->rb_mwlock);
}

/*
 * Get a set of request/reply buffers.
 *
 * Reply buffer (if available) is attached to send buffer upon return.
 */
struct rpcrdma_req *
rpcrdma_buffer_get(struct rpcrdma_buffer *buffers)
{
	struct rpcrdma_req *req;

	spin_lock(&buffers->rb_lock);
	if (list_empty(&buffers->rb_send_bufs))
		goto out_reqbuf;
	req = rpcrdma_buffer_get_req_locked(buffers);
	if (list_empty(&buffers->rb_recv_bufs))
		goto out_repbuf;
	req->rl_reply = rpcrdma_buffer_get_rep_locked(buffers);
	spin_unlock(&buffers->rb_lock);
	return req;

out_reqbuf:
	spin_unlock(&buffers->rb_lock);
	pr_warn("RPC:       %s: out of request buffers\n", __func__);
	return NULL;
out_repbuf:
	spin_unlock(&buffers->rb_lock);
	pr_warn("RPC:       %s: out of reply buffers\n", __func__);
	req->rl_reply = NULL;
	return req;
}

/*
 * Put request/reply buffers back into pool.
 * The request returns to the send buffer list; any attached reply
 * goes back onto the receive buffer list.
 */
void
rpcrdma_buffer_put(struct rpcrdma_req *req)
{
	struct rpcrdma_buffer *buffers = req->rl_buffer;
	struct rpcrdma_rep *rep = req->rl_reply;

	req->rl_niovs = 0;
	req->rl_reply = NULL;

	spin_lock(&buffers->rb_lock);
	list_add_tail(&req->rl_free, &buffers->rb_send_bufs);
	if (rep)
		list_add_tail(&rep->rr_list, &buffers->rb_recv_bufs);
	spin_unlock(&buffers->rb_lock);
}

/*
 * Recover reply buffers from pool.
 * This happens when recovering from disconnect.
 */
void
rpcrdma_recv_buffer_get(struct rpcrdma_req *req)
{
	struct rpcrdma_buffer *buffers = req->rl_buffer;

	spin_lock(&buffers->rb_lock);
	if (!list_empty(&buffers->rb_recv_bufs))
		req->rl_reply = rpcrdma_buffer_get_rep_locked(buffers);
	spin_unlock(&buffers->rb_lock);
}

/*
 * Put reply buffers back into pool when not attached to
 * request. This happens in error conditions.
 */
void
rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep)
{
	struct rpcrdma_buffer *buffers = &rep->rr_rxprt->rx_buf;

	spin_lock(&buffers->rb_lock);
	list_add_tail(&rep->rr_list, &buffers->rb_recv_bufs);
	spin_unlock(&buffers->rb_lock);
}

/*
 * Wrappers for internal-use kmalloc memory registration, used by buffer code.
 */

void
rpcrdma_mapping_error(struct rpcrdma_mr_seg *seg)
{
	dprintk("RPC:       map_one: offset %p iova %llx len %zu\n",
		seg->mr_offset,
		(unsigned long long)seg->mr_dma, seg->mr_dmalen);
}

/**
 * rpcrdma_alloc_regbuf - kmalloc and register memory for SEND/RECV buffers
 * @ia: controlling rpcrdma_ia
 * @size: size of buffer to be allocated, in bytes
 * @flags: GFP flags
 *
 * Returns pointer to private header of an area of internally
 * registered memory, or an ERR_PTR. The registered buffer follows
 * the end of the private header.
 *
 * xprtrdma uses a regbuf for posting an outgoing RDMA SEND, or for
 * receiving the payload of RDMA RECV operations. regbufs are not
 * used for RDMA READ/WRITE operations, thus are registered only for
 * LOCAL access.
 */
struct rpcrdma_regbuf *
rpcrdma_alloc_regbuf(struct rpcrdma_ia *ia, size_t size, gfp_t flags)
{
	struct rpcrdma_regbuf *rb;
	struct ib_sge *iov;

	rb = kmalloc(sizeof(*rb) + size, flags);
	if (rb == NULL)
		goto out;

	iov = &rb->rg_iov;
	iov->addr = ib_dma_map_single(ia->ri_device,
				      (void *)rb->rg_base, size,
				      DMA_BIDIRECTIONAL);
	if (ib_dma_mapping_error(ia->ri_device, iov->addr))
		goto out_free;

	iov->length = size;
	iov->lkey = ia->ri_pd->local_dma_lkey;
	rb->rg_size = size;
	rb->rg_owner = NULL;
	return rb;

out_free:
	kfree(rb);
out:
	return ERR_PTR(-ENOMEM);
}

/**
 * rpcrdma_free_regbuf - deregister and free registered buffer
 * @ia: controlling rpcrdma_ia
 * @rb: regbuf to be deregistered and freed
 */
void
rpcrdma_free_regbuf(struct rpcrdma_ia *ia, struct rpcrdma_regbuf *rb)
{
	struct ib_sge *iov;

	if (!rb)
		return;

	iov = &rb->rg_iov;
	ib_dma_unmap_single(ia->ri_device,
			    iov->addr, iov->length, DMA_BIDIRECTIONAL);
	kfree(rb);
}

/*
 * Prepost any receive buffer, then post send.
 *
 * Receive buffer is donated to hardware, reclaimed upon recv completion.
 */
int
rpcrdma_ep_post(struct rpcrdma_ia *ia,
		struct rpcrdma_ep *ep,
		struct rpcrdma_req *req)
{
	struct ib_device *device = ia->ri_device;
	struct ib_send_wr send_wr, *send_wr_fail;
	struct rpcrdma_rep *rep = req->rl_reply;
	struct ib_sge *iov = req->rl_send_iov;
	int i, rc;

	if (rep) {
		rc = rpcrdma_ep_post_recv(ia, ep, rep);
		if (rc)
			goto out;
		req->rl_reply = NULL;
	}

	send_wr.next = NULL;
	send_wr.wr_cqe = &req->rl_cqe;
	send_wr.sg_list = iov;
	send_wr.num_sge = req->rl_niovs;
	send_wr.opcode = IB_WR_SEND;

	for (i = 0; i < send_wr.num_sge; i++)
		ib_dma_sync_single_for_device(device, iov[i].addr,
					      iov[i].length, DMA_TO_DEVICE);
	dprintk("RPC:       %s: posting %d s/g entries\n",
		__func__, send_wr.num_sge);

	if (DECR_CQCOUNT(ep) > 0)
		send_wr.send_flags = 0;
	else { /* Provider must take a send completion every now and then */
		INIT_CQCOUNT(ep);
		send_wr.send_flags = IB_SEND_SIGNALED;
	}

	rc = ib_post_send(ia->ri_id->qp, &send_wr, &send_wr_fail);
	if (rc)
		dprintk("RPC:       %s: ib_post_send returned %i\n", __func__,
			rc);
out:
	return rc;
}

/*
 * (Re)post a receive buffer.
 */
int
rpcrdma_ep_post_recv(struct rpcrdma_ia *ia,
		     struct rpcrdma_ep *ep,
		     struct rpcrdma_rep *rep)
{
	struct ib_recv_wr recv_wr, *recv_wr_fail;
	int rc;

	recv_wr.next = NULL;
	recv_wr.wr_cqe = &rep->rr_cqe;
	recv_wr.sg_list = &rep->rr_rdmabuf->rg_iov;
	recv_wr.num_sge = 1;

	ib_dma_sync_single_for_cpu(ia->ri_device,
				   rdmab_addr(rep->rr_rdmabuf),
				   rdmab_length(rep->rr_rdmabuf),
				   DMA_BIDIRECTIONAL);

	rc = ib_post_recv(ia->ri_id->qp, &recv_wr, &recv_wr_fail);

	if (rc)
		dprintk("RPC:       %s: ib_post_recv returned %i\n", __func__,
			rc);
	return rc;
}

/**
 * rpcrdma_ep_post_extra_recv - Post buffers for incoming backchannel requests
 * @r_xprt: transport associated with these backchannel resources
 * @count: minimum number of incoming requests expected
 *
 * Returns zero if all requested buffers were posted, or a negative errno.
 */
int
rpcrdma_ep_post_extra_recv(struct rpcrdma_xprt *r_xprt, unsigned int count)
{
	struct rpcrdma_buffer *buffers = &r_xprt->rx_buf;
	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
	struct rpcrdma_ep *ep = &r_xprt->rx_ep;
	struct rpcrdma_rep *rep;
	int rc;

	while (count--) {
		spin_lock(&buffers->rb_lock);
		if (list_empty(&buffers->rb_recv_bufs))
			goto out_reqbuf;
		rep = rpcrdma_buffer_get_rep_locked(buffers);
		spin_unlock(&buffers->rb_lock);

		rc = rpcrdma_ep_post_recv(ia, ep, rep);
		if (rc)
			goto out_rc;
	}

	return 0;

out_reqbuf:
	spin_unlock(&buffers->rb_lock);
	pr_warn("%s: no extra receive buffers\n", __func__);
	return -ENOMEM;

out_rc:
	rpcrdma_recv_buffer_put(rep);
	return rc;
}