/*
 * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
 *
 * This software is available to you under a choice of one of two
 * licenses.  You may choose to be licensed under the terms of the GNU
 * General Public License (GPL) Version 2, available from the file
 * COPYING in the main directory of this source tree, or the BSD-type
 * license below:
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 *      Redistributions of source code must retain the above copyright
 *      notice, this list of conditions and the following disclaimer.
 *
 *      Redistributions in binary form must reproduce the above
 *      copyright notice, this list of conditions and the following
 *      disclaimer in the documentation and/or other materials provided
 *      with the distribution.
 *
 *      Neither the name of the Network Appliance, Inc. nor the names of
 *      its contributors may be used to endorse or promote products
 *      derived from this software without specific prior written
 *      permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

/*
 * verbs.c
 *
 * Encapsulates the major functions managing:
 *  o adapters
 *  o endpoints
 *  o connections
 *  o buffer memory
 */

#include <linux/interrupt.h>
#include <linux/slab.h>
#include <linux/prefetch.h>
#include <linux/sunrpc/addr.h>
#include <asm/bitops.h>
#include <linux/module.h> /* try_module_get()/module_put() */

#include "xprt_rdma.h"

/*
 * Globals/Macros
 */

#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
# define RPCDBG_FACILITY	RPCDBG_TRANS
#endif

/*
 * internal functions
 */

static struct workqueue_struct *rpcrdma_receive_wq;

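/* Allocate the workqueue used to process received replies. */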
int
rpcrdma_alloc_wq(void)
{
	struct workqueue_struct *recv_wq;

	recv_wq = alloc_workqueue("xprtrdma_receive",
				  WQ_MEM_RECLAIM | WQ_UNBOUND | WQ_HIGHPRI,
				  0);
	if (!recv_wq)
		return -ENOMEM;

	rpcrdma_receive_wq = recv_wq;
	return 0;
}

void
rpcrdma_destroy_wq(void)
{
	struct workqueue_struct *wq;

	if (rpcrdma_receive_wq) {
		wq = rpcrdma_receive_wq;
		rpcrdma_receive_wq = NULL;
		destroy_workqueue(wq);
	}
}

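/*
 * Handle an asynchronous QP event: log it, and if the endpoint
 * was connected, mark it failed and wake up connection waiters.
 */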
static void
rpcrdma_qp_async_error_upcall(struct ib_event *event, void *context)
{
	struct rpcrdma_ep *ep = context;

	pr_err("RPC:       %s: %s on device %s ep %p\n",
	       __func__, ib_event_msg(event->event),
		event->device->name, context);
	if (ep->rep_connected == 1) {
		ep->rep_connected = -EIO;
		rpcrdma_conn_func(ep);
		wake_up_all(&ep->rep_connect_wait);
	}
}

/**
 * rpcrdma_wc_send - Invoked by RDMA provider for each polled Send WC
 * @cq:	completion queue (ignored)
 * @wc:	completed WR
 *
 */
static void
rpcrdma_wc_send(struct ib_cq *cq, struct ib_wc *wc)
{
	/* WARNING: Only wr_cqe and status are reliable at this point */
	if (wc->status != IB_WC_SUCCESS && wc->status != IB_WC_WR_FLUSH_ERR)
		pr_err("rpcrdma: Send: %s (%u/0x%x)\n",
		       ib_wc_status_msg(wc->status),
		       wc->status, wc->vendor_err);
}

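/* Process one received reply in workqueue context. */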
static void
rpcrdma_receive_worker(struct work_struct *work)
{
	struct rpcrdma_rep *rep =
			container_of(work, struct rpcrdma_rep, rr_work);

	rpcrdma_reply_handler(rep);
}

/* Perform basic sanity checking to avoid using garbage
 * to update the credit grant value.
 */
static void
rpcrdma_update_granted_credits(struct rpcrdma_rep *rep)
{
	struct rpcrdma_msg *rmsgp = rdmab_to_msg(rep->rr_rdmabuf);
	struct rpcrdma_buffer *buffer = &rep->rr_rxprt->rx_buf;
	u32 credits;

	if (rep->rr_len < RPCRDMA_HDRLEN_ERR)
		return;

	credits = be32_to_cpu(rmsgp->rm_credit);
	if (credits == 0)
		credits = 1;	/* don't deadlock */
	else if (credits > buffer->rb_max_requests)
		credits = buffer->rb_max_requests;

	atomic_set(&buffer->rb_credits, credits);
}

/**
 * rpcrdma_receive_wc - Invoked by RDMA provider for each polled Receive WC
 * @cq:	completion queue (ignored)
 * @wc:	completed WR
 *
 */
static void
rpcrdma_receive_wc(struct ib_cq *cq, struct ib_wc *wc)
{
	struct ib_cqe *cqe = wc->wr_cqe;
	struct rpcrdma_rep *rep = container_of(cqe, struct rpcrdma_rep,
					       rr_cqe);

	/* WARNING: Only wr_id and status are reliable at this point */
	if (wc->status != IB_WC_SUCCESS)
		goto out_fail;

	/* status == SUCCESS means all fields in wc are trustworthy */
	if (wc->opcode != IB_WC_RECV)
		return;

	dprintk("RPC:       %s: rep %p opcode 'recv', length %u: success\n",
		__func__, rep, wc->byte_len);

	rep->rr_len = wc->byte_len;
	ib_dma_sync_single_for_cpu(rep->rr_device,
				   rdmab_addr(rep->rr_rdmabuf),
				   rep->rr_len, DMA_FROM_DEVICE);

	rpcrdma_update_granted_credits(rep);

out_schedule:
	queue_work(rpcrdma_receive_wq, &rep->rr_work);
	return;

out_fail:
	if (wc->status != IB_WC_WR_FLUSH_ERR)
		pr_err("rpcrdma: Recv: %s (%u/0x%x)\n",
		       ib_wc_status_msg(wc->status),
		       wc->status, wc->vendor_err);
	rep->rr_len = RPCRDMA_BAD_LEN;
	goto out_schedule;
}

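/*
 * CM event handler: completes address and route resolution,
 * records connection state changes, and wakes up waiters on
 * ia->ri_done and ep->rep_connect_wait.
 */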
static int
rpcrdma_conn_upcall(struct rdma_cm_id *id, struct rdma_cm_event *event)
{
	struct rpcrdma_xprt *xprt = id->context;
	struct rpcrdma_ia *ia = &xprt->rx_ia;
	struct rpcrdma_ep *ep = &xprt->rx_ep;
#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
	struct sockaddr *sap = (struct sockaddr *)&ep->rep_remote_addr;
#endif
	struct ib_qp_attr *attr = &ia->ri_qp_attr;
	struct ib_qp_init_attr *iattr = &ia->ri_qp_init_attr;
	int connstate = 0;

	switch (event->event) {
	case RDMA_CM_EVENT_ADDR_RESOLVED:
	case RDMA_CM_EVENT_ROUTE_RESOLVED:
		ia->ri_async_rc = 0;
		complete(&ia->ri_done);
		break;
	case RDMA_CM_EVENT_ADDR_ERROR:
		ia->ri_async_rc = -EHOSTUNREACH;
		dprintk("RPC:       %s: CM address resolution error, ep 0x%p\n",
			__func__, ep);
		complete(&ia->ri_done);
		break;
	case RDMA_CM_EVENT_ROUTE_ERROR:
		ia->ri_async_rc = -ENETUNREACH;
		dprintk("RPC:       %s: CM route resolution error, ep 0x%p\n",
			__func__, ep);
		complete(&ia->ri_done);
		break;
	case RDMA_CM_EVENT_ESTABLISHED:
		connstate = 1;
		ib_query_qp(ia->ri_id->qp, attr,
			    IB_QP_MAX_QP_RD_ATOMIC | IB_QP_MAX_DEST_RD_ATOMIC,
			    iattr);
		dprintk("RPC:       %s: %d responder resources"
			" (%d initiator)\n",
			__func__, attr->max_dest_rd_atomic,
			attr->max_rd_atomic);
		goto connected;
	case RDMA_CM_EVENT_CONNECT_ERROR:
		connstate = -ENOTCONN;
		goto connected;
	case RDMA_CM_EVENT_UNREACHABLE:
		connstate = -ENETDOWN;
		goto connected;
	case RDMA_CM_EVENT_REJECTED:
		connstate = -ECONNREFUSED;
		goto connected;
	case RDMA_CM_EVENT_DISCONNECTED:
		connstate = -ECONNABORTED;
		goto connected;
	case RDMA_CM_EVENT_DEVICE_REMOVAL:
		connstate = -ENODEV;
connected:
		dprintk("RPC:       %s: %sconnected\n",
					__func__, connstate > 0 ? "" : "dis");
		atomic_set(&xprt->rx_buf.rb_credits, 1);
		ep->rep_connected = connstate;
		rpcrdma_conn_func(ep);
		wake_up_all(&ep->rep_connect_wait);
		/*FALLTHROUGH*/
	default:
		dprintk("RPC:       %s: %pIS:%u (ep 0x%p): %s\n",
			__func__, sap, rpc_get_port(sap), ep,
			rdma_event_msg(event->event));
		break;
	}

#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
	if (connstate == 1) {
		int ird = attr->max_dest_rd_atomic;
		int tird = ep->rep_remote_cma.responder_resources;

		pr_info("rpcrdma: connection to %pIS:%u on %s, memreg '%s', %d credits, %d responders%s\n",
			sap, rpc_get_port(sap),
			ia->ri_device->name,
			ia->ri_ops->ro_displayname,
			xprt->rx_buf.rb_max_requests,
			ird, ird < 4 && ird < tird / 2 ? " (low!)" : "");
	} else if (connstate < 0) {
		pr_info("rpcrdma: connection to %pIS:%u closed (%d)\n",
			sap, rpc_get_port(sap), connstate);
	}
#endif

	return 0;
}

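/*
 * Release the cm_id and the device module reference taken in
 * rpcrdma_create_id().
 */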
static void rpcrdma_destroy_id(struct rdma_cm_id *id)
{
	if (id) {
		module_put(id->device->owner);
		rdma_destroy_id(id);
	}
}

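/*
 * Create a cm_id and resolve the server's address and route.
 * The device module is pinned until rpcrdma_destroy_id() drops
 * the reference (see the FIXME below).
 */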
static struct rdma_cm_id *
rpcrdma_create_id(struct rpcrdma_xprt *xprt,
			struct rpcrdma_ia *ia, struct sockaddr *addr)
{
	struct rdma_cm_id *id;
	int rc;

	init_completion(&ia->ri_done);

	id = rdma_create_id(&init_net, rpcrdma_conn_upcall, xprt, RDMA_PS_TCP,
			    IB_QPT_RC);
	if (IS_ERR(id)) {
		rc = PTR_ERR(id);
		dprintk("RPC:       %s: rdma_create_id() failed %i\n",
			__func__, rc);
		return id;
	}

	ia->ri_async_rc = -ETIMEDOUT;
	rc = rdma_resolve_addr(id, NULL, addr, RDMA_RESOLVE_TIMEOUT);
	if (rc) {
		dprintk("RPC:       %s: rdma_resolve_addr() failed %i\n",
			__func__, rc);
		goto out;
	}
	wait_for_completion_interruptible_timeout(&ia->ri_done,
				msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT) + 1);

	/* FIXME:
	 * Until xprtrdma supports DEVICE_REMOVAL, the provider must
	 * be pinned while there are active NFS/RDMA mounts to prevent
	 * hangs and crashes at umount time.
	 */
	if (!ia->ri_async_rc && !try_module_get(id->device->owner)) {
		dprintk("RPC:       %s: Failed to get device module\n",
			__func__);
		ia->ri_async_rc = -ENODEV;
	}
	rc = ia->ri_async_rc;
	if (rc)
		goto out;

	ia->ri_async_rc = -ETIMEDOUT;
	rc = rdma_resolve_route(id, RDMA_RESOLVE_TIMEOUT);
	if (rc) {
		dprintk("RPC:       %s: rdma_resolve_route() failed %i\n",
			__func__, rc);
		goto put;
	}
	wait_for_completion_interruptible_timeout(&ia->ri_done,
				msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT) + 1);
	rc = ia->ri_async_rc;
	if (rc)
		goto put;

	return id;
put:
	module_put(id->device->owner);
out:
	rdma_destroy_id(id);
	return ERR_PTR(rc);
}

/*
 * Exported functions.
 */

/*
 * Open and initialize an Interface Adapter.
 *  o initializes fields of struct rpcrdma_ia, including
 *    interface and provider attributes and protection zone.
 */
int
rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr, int memreg)
{
	struct rpcrdma_ia *ia = &xprt->rx_ia;
	int rc;

	ia->ri_dma_mr = NULL;

	ia->ri_id = rpcrdma_create_id(xprt, ia, addr);
	if (IS_ERR(ia->ri_id)) {
		rc = PTR_ERR(ia->ri_id);
		goto out1;
	}
	ia->ri_device = ia->ri_id->device;

	ia->ri_pd = ib_alloc_pd(ia->ri_device);
	if (IS_ERR(ia->ri_pd)) {
		rc = PTR_ERR(ia->ri_pd);
		dprintk("RPC:       %s: ib_alloc_pd() failed %i\n",
			__func__, rc);
		goto out2;
	}

	if (memreg == RPCRDMA_FRMR) {
		if (!(ia->ri_device->attrs.device_cap_flags &
				IB_DEVICE_MEM_MGT_EXTENSIONS) ||
		    (ia->ri_device->attrs.max_fast_reg_page_list_len == 0)) {
			dprintk("RPC:       %s: FRMR registration "
				"not supported by HCA\n", __func__);
			memreg = RPCRDMA_MTHCAFMR;
		}
	}
	if (memreg == RPCRDMA_MTHCAFMR) {
		if (!ia->ri_device->alloc_fmr) {
			dprintk("RPC:       %s: MTHCAFMR registration "
				"not supported by HCA\n", __func__);
			rc = -EINVAL;
			goto out3;
		}
	}

	switch (memreg) {
	case RPCRDMA_FRMR:
		ia->ri_ops = &rpcrdma_frwr_memreg_ops;
		break;
	case RPCRDMA_ALLPHYSICAL:
		ia->ri_ops = &rpcrdma_physical_memreg_ops;
		break;
	case RPCRDMA_MTHCAFMR:
		ia->ri_ops = &rpcrdma_fmr_memreg_ops;
		break;
	default:
		printk(KERN_ERR "RPC: Unsupported memory "
				"registration mode: %d\n", memreg);
		rc = -ENOMEM;
		goto out3;
	}
	dprintk("RPC:       %s: memory registration strategy is '%s'\n",
		__func__, ia->ri_ops->ro_displayname);

	rwlock_init(&ia->ri_qplock);
	return 0;

out3:
	ib_dealloc_pd(ia->ri_pd);
	ia->ri_pd = NULL;
out2:
	rpcrdma_destroy_id(ia->ri_id);
	ia->ri_id = NULL;
out1:
	return rc;
}

/*
 * Clean up/close an IA.
 *   o if event handles and PD have been initialized, free them.
 *   o close the IA
 */
void
rpcrdma_ia_close(struct rpcrdma_ia *ia)
{
	dprintk("RPC:       %s: entering\n", __func__);
	if (ia->ri_id != NULL && !IS_ERR(ia->ri_id)) {
		if (ia->ri_id->qp)
			rdma_destroy_qp(ia->ri_id);
		rpcrdma_destroy_id(ia->ri_id);
		ia->ri_id = NULL;
	}

	/* If the pd is still busy, xprtrdma missed freeing a resource */
	if (ia->ri_pd && !IS_ERR(ia->ri_pd))
		ib_dealloc_pd(ia->ri_pd);
}

/*
 * Create unconnected endpoint.
 */
int
rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
				struct rpcrdma_create_data_internal *cdata)
{
	struct ib_cq *sendcq, *recvcq;
	unsigned int max_qp_wr;
	int rc;

	if (ia->ri_device->attrs.max_sge < RPCRDMA_MAX_IOVS) {
		dprintk("RPC:       %s: insufficient sge's available\n",
			__func__);
		return -ENOMEM;
	}

	if (ia->ri_device->attrs.max_qp_wr <= RPCRDMA_BACKWARD_WRS) {
		dprintk("RPC:       %s: insufficient wqe's available\n",
			__func__);
		return -ENOMEM;
	}
	max_qp_wr = ia->ri_device->attrs.max_qp_wr - RPCRDMA_BACKWARD_WRS - 1;

	/* check provider's send/recv wr limits */
	if (cdata->max_requests > max_qp_wr)
		cdata->max_requests = max_qp_wr;

	ep->rep_attr.event_handler = rpcrdma_qp_async_error_upcall;
	ep->rep_attr.qp_context = ep;
	ep->rep_attr.srq = NULL;
	ep->rep_attr.cap.max_send_wr = cdata->max_requests;
	ep->rep_attr.cap.max_send_wr += RPCRDMA_BACKWARD_WRS;
	ep->rep_attr.cap.max_send_wr += 1;	/* drain cqe */
	rc = ia->ri_ops->ro_open(ia, ep, cdata);
	if (rc)
		return rc;
	ep->rep_attr.cap.max_recv_wr = cdata->max_requests;
	ep->rep_attr.cap.max_recv_wr += RPCRDMA_BACKWARD_WRS;
	ep->rep_attr.cap.max_recv_wr += 1;	/* drain cqe */
	ep->rep_attr.cap.max_send_sge = RPCRDMA_MAX_IOVS;
	ep->rep_attr.cap.max_recv_sge = 1;
	ep->rep_attr.cap.max_inline_data = 0;
	ep->rep_attr.sq_sig_type = IB_SIGNAL_REQ_WR;
	ep->rep_attr.qp_type = IB_QPT_RC;
	ep->rep_attr.port_num = ~0;

	dprintk("RPC:       %s: requested max: dtos: send %d recv %d; "
		"iovs: send %d recv %d\n",
		__func__,
		ep->rep_attr.cap.max_send_wr,
		ep->rep_attr.cap.max_recv_wr,
		ep->rep_attr.cap.max_send_sge,
		ep->rep_attr.cap.max_recv_sge);

	/* set trigger for requesting send completion */
	ep->rep_cqinit = ep->rep_attr.cap.max_send_wr/2 - 1;
	if (ep->rep_cqinit <= 2)
		ep->rep_cqinit = 0;	/* always signal? */
	INIT_CQCOUNT(ep);
	init_waitqueue_head(&ep->rep_connect_wait);
	INIT_DELAYED_WORK(&ep->rep_connect_worker, rpcrdma_connect_worker);

	sendcq = ib_alloc_cq(ia->ri_device, NULL,
			     ep->rep_attr.cap.max_send_wr + 1,
			     0, IB_POLL_SOFTIRQ);
	if (IS_ERR(sendcq)) {
		rc = PTR_ERR(sendcq);
		dprintk("RPC:       %s: failed to create send CQ: %i\n",
			__func__, rc);
		goto out1;
	}

	recvcq = ib_alloc_cq(ia->ri_device, NULL,
			     ep->rep_attr.cap.max_recv_wr + 1,
			     0, IB_POLL_SOFTIRQ);
	if (IS_ERR(recvcq)) {
		rc = PTR_ERR(recvcq);
		dprintk("RPC:       %s: failed to create recv CQ: %i\n",
			__func__, rc);
		goto out2;
	}

	ep->rep_attr.send_cq = sendcq;
	ep->rep_attr.recv_cq = recvcq;

	/* Initialize cma parameters */

	/* RPC/RDMA does not use private data */
	ep->rep_remote_cma.private_data = NULL;
	ep->rep_remote_cma.private_data_len = 0;

	/* Client offers RDMA Read but does not initiate */
	ep->rep_remote_cma.initiator_depth = 0;
	if (ia->ri_device->attrs.max_qp_rd_atom > 32)	/* arbitrary but <= 255 */
		ep->rep_remote_cma.responder_resources = 32;
	else
		ep->rep_remote_cma.responder_resources =
						ia->ri_device->attrs.max_qp_rd_atom;

	ep->rep_remote_cma.retry_count = 7;
	ep->rep_remote_cma.flow_control = 0;
	ep->rep_remote_cma.rnr_retry_count = 0;

	return 0;

out2:
	ib_free_cq(sendcq);
out1:
	if (ia->ri_dma_mr)
		ib_dereg_mr(ia->ri_dma_mr);
	return rc;
}

/*
 * rpcrdma_ep_destroy
 *
 * Disconnect and destroy endpoint. After this, the only
 * valid operations on the ep are to free it (if dynamically
 * allocated) or re-create it.
 */
void
rpcrdma_ep_destroy(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
{
	int rc;

	dprintk("RPC:       %s: entering, connected is %d\n",
		__func__, ep->rep_connected);

	cancel_delayed_work_sync(&ep->rep_connect_worker);

	if (ia->ri_id->qp) {
		rpcrdma_ep_disconnect(ep, ia);
		rdma_destroy_qp(ia->ri_id);
		ia->ri_id->qp = NULL;
	}

	ib_free_cq(ep->rep_attr.recv_cq);
	ib_free_cq(ep->rep_attr.send_cq);

	if (ia->ri_dma_mr) {
		rc = ib_dereg_mr(ia->ri_dma_mr);
		dprintk("RPC:       %s: ib_dereg_mr returned %i\n",
			__func__, rc);
	}
}

/*
 * Connect unconnected endpoint.
 */
int
rpcrdma_ep_connect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
{
	struct rdma_cm_id *id, *old;
	int rc = 0;
	int retry_count = 0;

	if (ep->rep_connected != 0) {
		struct rpcrdma_xprt *xprt;
retry:
		dprintk("RPC:       %s: reconnecting...\n", __func__);

		rpcrdma_ep_disconnect(ep, ia);

		xprt = container_of(ia, struct rpcrdma_xprt, rx_ia);
		id = rpcrdma_create_id(xprt, ia,
				(struct sockaddr *)&xprt->rx_data.addr);
		if (IS_ERR(id)) {
			rc = -EHOSTUNREACH;
			goto out;
		}
		/* TEMP TEMP TEMP - fail if new device:
		 * Deregister/remarshal *all* requests!
		 * Close and recreate adapter, pd, etc!
		 * Re-determine all attributes still sane!
		 * More stuff I haven't thought of!
		 * Rrrgh!
		 */
		if (ia->ri_device != id->device) {
			printk("RPC:       %s: can't reconnect on "
				"different device!\n", __func__);
			rpcrdma_destroy_id(id);
			rc = -ENETUNREACH;
			goto out;
		}
		/* END TEMP */
		rc = rdma_create_qp(id, ia->ri_pd, &ep->rep_attr);
		if (rc) {
			dprintk("RPC:       %s: rdma_create_qp failed %i\n",
				__func__, rc);
			rpcrdma_destroy_id(id);
			rc = -ENETUNREACH;
			goto out;
		}

		write_lock(&ia->ri_qplock);
		old = ia->ri_id;
		ia->ri_id = id;
		write_unlock(&ia->ri_qplock);

		rdma_destroy_qp(old);
		rpcrdma_destroy_id(old);
	} else {
		dprintk("RPC:       %s: connecting...\n", __func__);
		rc = rdma_create_qp(ia->ri_id, ia->ri_pd, &ep->rep_attr);
		if (rc) {
			dprintk("RPC:       %s: rdma_create_qp failed %i\n",
				__func__, rc);
			/* do not update ep->rep_connected */
			return -ENETUNREACH;
		}
	}

	ep->rep_connected = 0;

	rc = rdma_connect(ia->ri_id, &ep->rep_remote_cma);
	if (rc) {
		dprintk("RPC:       %s: rdma_connect() failed with %i\n",
				__func__, rc);
		goto out;
	}

	wait_event_interruptible(ep->rep_connect_wait, ep->rep_connected != 0);

	/*
	 * Check state. A non-peer reject indicates no listener
	 * (ECONNREFUSED), which may be a transient state. All
	 * others indicate a transport condition that has already
	 * undergone best-effort recovery.
	 */
	if (ep->rep_connected == -ECONNREFUSED &&
	    ++retry_count <= RDMA_CONNECT_RETRY_MAX) {
		dprintk("RPC:       %s: non-peer_reject, retry\n", __func__);
		goto retry;
	}
	if (ep->rep_connected <= 0) {
		/* Sometimes, the only way to reliably connect to remote
		 * CMs is to use the same nonzero values for ORD and IRD. */
		if (retry_count++ <= RDMA_CONNECT_RETRY_MAX + 1 &&
		    (ep->rep_remote_cma.responder_resources == 0 ||
		     ep->rep_remote_cma.initiator_depth !=
				ep->rep_remote_cma.responder_resources)) {
			if (ep->rep_remote_cma.responder_resources == 0)
				ep->rep_remote_cma.responder_resources = 1;
			ep->rep_remote_cma.initiator_depth =
				ep->rep_remote_cma.responder_resources;
			goto retry;
		}
		rc = ep->rep_connected;
	} else {
		struct rpcrdma_xprt *r_xprt;
		unsigned int extras;

		dprintk("RPC:       %s: connected\n", __func__);

		r_xprt = container_of(ia, struct rpcrdma_xprt, rx_ia);
		extras = r_xprt->rx_buf.rb_bc_srv_max_requests;

		if (extras) {
			rc = rpcrdma_ep_post_extra_recv(r_xprt, extras);
			if (rc) {
				pr_warn("%s: rpcrdma_ep_post_extra_recv: %i\n",
					__func__, rc);
				rc = 0;
			}
		}
	}

out:
	if (rc)
		ep->rep_connected = rc;
	return rc;
}

/*
 * rpcrdma_ep_disconnect
 *
 * This is separate from destroy to facilitate the ability
 * to reconnect without recreating the endpoint.
 *
 * This call is not reentrant, and must not be made in parallel
 * on the same endpoint.
 */
void
rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
{
	int rc;

	rc = rdma_disconnect(ia->ri_id);
	if (!rc) {
		/* returns without wait if not connected */
		wait_event_interruptible(ep->rep_connect_wait,
							ep->rep_connected != 1);
		dprintk("RPC:       %s: after wait, %sconnected\n", __func__,
			(ep->rep_connected == 1) ? "still " : "dis");
	} else {
		dprintk("RPC:       %s: rdma_disconnect %i\n", __func__, rc);
		ep->rep_connected = rc;
	}

	ib_drain_qp(ia->ri_id->qp);
}

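/*
 * Allocate an rpcrdma_req and add it to the buffer's list of
 * all requests (rb_allreqs).
 */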
struct rpcrdma_req *
rpcrdma_create_req(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_buffer *buffer = &r_xprt->rx_buf;
	struct rpcrdma_req *req;

	req = kzalloc(sizeof(*req), GFP_KERNEL);
	if (req == NULL)
		return ERR_PTR(-ENOMEM);

	INIT_LIST_HEAD(&req->rl_free);
	spin_lock(&buffer->rb_reqslock);
	list_add(&req->rl_all, &buffer->rb_allreqs);
	spin_unlock(&buffer->rb_reqslock);
	req->rl_cqe.done = rpcrdma_wc_send;
	req->rl_buffer = &r_xprt->rx_buf;
	return req;
}

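/*
 * Allocate an rpcrdma_rep and a receive buffer large enough for
 * an inline reply.
 */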
struct rpcrdma_rep *
rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
	struct rpcrdma_rep *rep;
	int rc;

	rc = -ENOMEM;
	rep = kzalloc(sizeof(*rep), GFP_KERNEL);
	if (rep == NULL)
		goto out;

	rep->rr_rdmabuf = rpcrdma_alloc_regbuf(ia, cdata->inline_rsize,
					       GFP_KERNEL);
	if (IS_ERR(rep->rr_rdmabuf)) {
		rc = PTR_ERR(rep->rr_rdmabuf);
		goto out_free;
	}

	rep->rr_device = ia->ri_device;
	rep->rr_cqe.done = rpcrdma_receive_wc;
	rep->rr_rxprt = r_xprt;
	INIT_WORK(&rep->rr_work, rpcrdma_receive_worker);
	return rep;

out_free:
	kfree(rep);
out:
	return ERR_PTR(rc);
}

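/*
 * Create this transport's pools of requests and reply buffers,
 * and let the memory registration mode set up its resources.
 */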
int
rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
	int i, rc;

	buf->rb_max_requests = r_xprt->rx_data.max_requests;
	buf->rb_bc_srv_max_requests = 0;
	spin_lock_init(&buf->rb_lock);
	atomic_set(&buf->rb_credits, 1);

	rc = ia->ri_ops->ro_init(r_xprt);
	if (rc)
		goto out;

	INIT_LIST_HEAD(&buf->rb_send_bufs);
	INIT_LIST_HEAD(&buf->rb_allreqs);
	spin_lock_init(&buf->rb_reqslock);
	for (i = 0; i < buf->rb_max_requests; i++) {
		struct rpcrdma_req *req;

		req = rpcrdma_create_req(r_xprt);
		if (IS_ERR(req)) {
			dprintk("RPC:       %s: request buffer %d alloc"
				" failed\n", __func__, i);
			rc = PTR_ERR(req);
			goto out;
		}
		req->rl_backchannel = false;
		list_add(&req->rl_free, &buf->rb_send_bufs);
	}

	INIT_LIST_HEAD(&buf->rb_recv_bufs);
	for (i = 0; i < buf->rb_max_requests + 2; i++) {
		struct rpcrdma_rep *rep;

		rep = rpcrdma_create_rep(r_xprt);
		if (IS_ERR(rep)) {
			dprintk("RPC:       %s: reply buffer %d alloc failed\n",
				__func__, i);
			rc = PTR_ERR(rep);
			goto out;
		}
		list_add(&rep->rr_list, &buf->rb_recv_bufs);
	}

	return 0;
out:
	rpcrdma_buffer_destroy(buf);
	return rc;
}

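/*
 * The two helpers below pop the first entry off a buffer free
 * list; callers must hold rb_lock.
 */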
static struct rpcrdma_req *
rpcrdma_buffer_get_req_locked(struct rpcrdma_buffer *buf)
{
	struct rpcrdma_req *req;

	req = list_first_entry(&buf->rb_send_bufs,
			       struct rpcrdma_req, rl_free);
	list_del(&req->rl_free);
	return req;
}

static struct rpcrdma_rep *
rpcrdma_buffer_get_rep_locked(struct rpcrdma_buffer *buf)
{
	struct rpcrdma_rep *rep;

	rep = list_first_entry(&buf->rb_recv_bufs,
			       struct rpcrdma_rep, rr_list);
	list_del(&rep->rr_list);
	return rep;
}

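/* Free a reply buffer and its registered receive buffer. */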
static void
rpcrdma_destroy_rep(struct rpcrdma_ia *ia, struct rpcrdma_rep *rep)
{
	rpcrdma_free_regbuf(ia, rep->rr_rdmabuf);
	kfree(rep);
}

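/* Free a request and its send and RDMA header buffers. */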
void
rpcrdma_destroy_req(struct rpcrdma_ia *ia, struct rpcrdma_req *req)
{
	rpcrdma_free_regbuf(ia, req->rl_sendbuf);
	rpcrdma_free_regbuf(ia, req->rl_rdmabuf);
	kfree(req);
}

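/*
 * Release all reply buffers and requests, then the memory
 * registration mode's resources.
 */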
void
rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
{
	struct rpcrdma_ia *ia = rdmab_to_ia(buf);

	while (!list_empty(&buf->rb_recv_bufs)) {
		struct rpcrdma_rep *rep;

		rep = rpcrdma_buffer_get_rep_locked(buf);
		rpcrdma_destroy_rep(ia, rep);
	}

	spin_lock(&buf->rb_reqslock);
	while (!list_empty(&buf->rb_allreqs)) {
		struct rpcrdma_req *req;

		req = list_first_entry(&buf->rb_allreqs,
				       struct rpcrdma_req, rl_all);
		list_del(&req->rl_all);

		spin_unlock(&buf->rb_reqslock);
		rpcrdma_destroy_req(ia, req);
		spin_lock(&buf->rb_reqslock);
	}
	spin_unlock(&buf->rb_reqslock);

	ia->ri_ops->ro_destroy(buf);
}

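/* Take an MW off the transport's free list, if one is available. */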
struct rpcrdma_mw *
rpcrdma_get_mw(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	struct rpcrdma_mw *mw = NULL;

	spin_lock(&buf->rb_mwlock);
	if (!list_empty(&buf->rb_mws)) {
		mw = list_first_entry(&buf->rb_mws,
				      struct rpcrdma_mw, mw_list);
		list_del_init(&mw->mw_list);
	}
	spin_unlock(&buf->rb_mwlock);

	if (!mw)
		pr_err("RPC:       %s: no MWs available\n", __func__);
	return mw;
}

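/* Return an MW to the transport's free list. */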
void
rpcrdma_put_mw(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mw *mw)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;

	spin_lock(&buf->rb_mwlock);
	list_add_tail(&mw->mw_list, &buf->rb_mws);
	spin_unlock(&buf->rb_mwlock);
}

/*
 * Get a set of request/reply buffers.
 *
 * Reply buffer (if available) is attached to send buffer upon return.
 */
struct rpcrdma_req *
rpcrdma_buffer_get(struct rpcrdma_buffer *buffers)
{
	struct rpcrdma_req *req;

	spin_lock(&buffers->rb_lock);
	if (list_empty(&buffers->rb_send_bufs))
		goto out_reqbuf;
	req = rpcrdma_buffer_get_req_locked(buffers);
	if (list_empty(&buffers->rb_recv_bufs))
		goto out_repbuf;
	req->rl_reply = rpcrdma_buffer_get_rep_locked(buffers);
	spin_unlock(&buffers->rb_lock);
	return req;

out_reqbuf:
	spin_unlock(&buffers->rb_lock);
	pr_warn("RPC:       %s: out of request buffers\n", __func__);
	return NULL;
out_repbuf:
	spin_unlock(&buffers->rb_lock);
	pr_warn("RPC:       %s: out of reply buffers\n", __func__);
	req->rl_reply = NULL;
	return req;
}

/*
 * Put request/reply buffers back into pool.
 * Pre-decrement counter/array index.
 */
void
rpcrdma_buffer_put(struct rpcrdma_req *req)
{
	struct rpcrdma_buffer *buffers = req->rl_buffer;
	struct rpcrdma_rep *rep = req->rl_reply;

	req->rl_niovs = 0;
	req->rl_reply = NULL;

	spin_lock(&buffers->rb_lock);
	list_add_tail(&req->rl_free, &buffers->rb_send_bufs);
	if (rep)
		list_add_tail(&rep->rr_list, &buffers->rb_recv_bufs);
	spin_unlock(&buffers->rb_lock);
}

/*
 * Recover reply buffers from pool.
 * This happens when recovering from disconnect.
 */
void
rpcrdma_recv_buffer_get(struct rpcrdma_req *req)
{
	struct rpcrdma_buffer *buffers = req->rl_buffer;

	spin_lock(&buffers->rb_lock);
	if (!list_empty(&buffers->rb_recv_bufs))
		req->rl_reply = rpcrdma_buffer_get_rep_locked(buffers);
	spin_unlock(&buffers->rb_lock);
}

/*
 * Put reply buffers back into pool when not attached to
 * request. This happens in error conditions.
 */
void
rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep)
{
	struct rpcrdma_buffer *buffers = &rep->rr_rxprt->rx_buf;

	spin_lock(&buffers->rb_lock);
	list_add_tail(&rep->rr_list, &buffers->rb_recv_bufs);
	spin_unlock(&buffers->rb_lock);
}

/*
 * Wrappers for internal-use kmalloc memory registration, used by buffer code.
 */

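/* Report a DMA mapping failure for a memory region segment. */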
void
rpcrdma_mapping_error(struct rpcrdma_mr_seg *seg)
{
	dprintk("RPC:       map_one: offset %p iova %llx len %zu\n",
		seg->mr_offset,
		(unsigned long long)seg->mr_dma, seg->mr_dmalen);
}

/**
 * rpcrdma_alloc_regbuf - kmalloc and register memory for SEND/RECV buffers
 * @ia: controlling rpcrdma_ia
 * @size: size of buffer to be allocated, in bytes
 * @flags: GFP flags
 *
 * Returns pointer to private header of an area of internally
 * registered memory, or an ERR_PTR. The registered buffer follows
 * the end of the private header.
 *
 * xprtrdma uses a regbuf for posting an outgoing RDMA SEND, or for
 * receiving the payload of RDMA RECV operations. regbufs are not
 * used for RDMA READ/WRITE operations, thus are registered only for
 * LOCAL access.
 */
struct rpcrdma_regbuf *
rpcrdma_alloc_regbuf(struct rpcrdma_ia *ia, size_t size, gfp_t flags)
{
	struct rpcrdma_regbuf *rb;
	struct ib_sge *iov;

	rb = kmalloc(sizeof(*rb) + size, flags);
	if (rb == NULL)
		goto out;

	iov = &rb->rg_iov;
	iov->addr = ib_dma_map_single(ia->ri_device,
				      (void *)rb->rg_base, size,
				      DMA_BIDIRECTIONAL);
	if (ib_dma_mapping_error(ia->ri_device, iov->addr))
		goto out_free;

	iov->length = size;
	iov->lkey = ia->ri_pd->local_dma_lkey;
	rb->rg_size = size;
	rb->rg_owner = NULL;
	return rb;

out_free:
	kfree(rb);
out:
	return ERR_PTR(-ENOMEM);
}

/**
 * rpcrdma_free_regbuf - deregister and free registered buffer
 * @ia: controlling rpcrdma_ia
 * @rb: regbuf to be deregistered and freed
 */
void
rpcrdma_free_regbuf(struct rpcrdma_ia *ia, struct rpcrdma_regbuf *rb)
{
	struct ib_sge *iov;

	if (!rb)
		return;

	iov = &rb->rg_iov;
	ib_dma_unmap_single(ia->ri_device,
			    iov->addr, iov->length, DMA_BIDIRECTIONAL);
	kfree(rb);
}

/*
 * Prepost any receive buffer, then post send.
 *
 * Receive buffer is donated to hardware, reclaimed upon recv completion.
 */
int
rpcrdma_ep_post(struct rpcrdma_ia *ia,
		struct rpcrdma_ep *ep,
		struct rpcrdma_req *req)
{
	struct ib_device *device = ia->ri_device;
	struct ib_send_wr send_wr, *send_wr_fail;
	struct rpcrdma_rep *rep = req->rl_reply;
	struct ib_sge *iov = req->rl_send_iov;
	int i, rc;

	if (rep) {
		rc = rpcrdma_ep_post_recv(ia, ep, rep);
		if (rc)
			goto out;
		req->rl_reply = NULL;
	}

	send_wr.next = NULL;
	send_wr.wr_cqe = &req->rl_cqe;
	send_wr.sg_list = iov;
	send_wr.num_sge = req->rl_niovs;
	send_wr.opcode = IB_WR_SEND;

	for (i = 0; i < send_wr.num_sge; i++)
		ib_dma_sync_single_for_device(device, iov[i].addr,
					      iov[i].length, DMA_TO_DEVICE);
	dprintk("RPC:       %s: posting %d s/g entries\n",
		__func__, send_wr.num_sge);

	if (DECR_CQCOUNT(ep) > 0)
		send_wr.send_flags = 0;
	else { /* Provider must take a send completion every now and then */
		INIT_CQCOUNT(ep);
		send_wr.send_flags = IB_SEND_SIGNALED;
	}

	rc = ib_post_send(ia->ri_id->qp, &send_wr, &send_wr_fail);
	if (rc)
		dprintk("RPC:       %s: ib_post_send returned %i\n", __func__,
			rc);
out:
	return rc;
}

/*
 * (Re)post a receive buffer.
 */
int
rpcrdma_ep_post_recv(struct rpcrdma_ia *ia,
		     struct rpcrdma_ep *ep,
		     struct rpcrdma_rep *rep)
{
	struct ib_recv_wr recv_wr, *recv_wr_fail;
	int rc;

	recv_wr.next = NULL;
	recv_wr.wr_cqe = &rep->rr_cqe;
	recv_wr.sg_list = &rep->rr_rdmabuf->rg_iov;
	recv_wr.num_sge = 1;

	ib_dma_sync_single_for_cpu(ia->ri_device,
				   rdmab_addr(rep->rr_rdmabuf),
				   rdmab_length(rep->rr_rdmabuf),
				   DMA_BIDIRECTIONAL);

	rc = ib_post_recv(ia->ri_id->qp, &recv_wr, &recv_wr_fail);

	if (rc)
		dprintk("RPC:       %s: ib_post_recv returned %i\n", __func__,
			rc);
	return rc;
}

/**
 * rpcrdma_ep_post_extra_recv - Post buffers for incoming backchannel requests
 * @r_xprt: transport associated with these backchannel resources
 * @min_reqs: minimum number of incoming requests expected
 *
 * Returns zero if all requested buffers were posted, or a negative errno.
 */
int
rpcrdma_ep_post_extra_recv(struct rpcrdma_xprt *r_xprt, unsigned int count)
{
	struct rpcrdma_buffer *buffers = &r_xprt->rx_buf;
	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
	struct rpcrdma_ep *ep = &r_xprt->rx_ep;
	struct rpcrdma_rep *rep;
	int rc;

	while (count--) {
		spin_lock(&buffers->rb_lock);
		if (list_empty(&buffers->rb_recv_bufs))
			goto out_reqbuf;
		rep = rpcrdma_buffer_get_rep_locked(buffers);
		spin_unlock(&buffers->rb_lock);

		rc = rpcrdma_ep_post_recv(ia, ep, rep);
		if (rc)
			goto out_rc;
	}

	return 0;

out_reqbuf:
	spin_unlock(&buffers->rb_lock);
	pr_warn("%s: no extra receive buffers\n", __func__);
	return -ENOMEM;

out_rc:
	rpcrdma_recv_buffer_put(rep);
	return rc;
}