/*
 * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
 *
 * This software is available to you under a choice of one of two
 * licenses.  You may choose to be licensed under the terms of the GNU
 * General Public License (GPL) Version 2, available from the file
 * COPYING in the main directory of this source tree, or the BSD-type
 * license below:
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 *      Redistributions of source code must retain the above copyright
 *      notice, this list of conditions and the following disclaimer.
 *
 *      Redistributions in binary form must reproduce the above
 *      copyright notice, this list of conditions and the following
 *      disclaimer in the documentation and/or other materials provided
 *      with the distribution.
 *
 *      Neither the name of the Network Appliance, Inc. nor the names of
 *      its contributors may be used to endorse or promote products
 *      derived from this software without specific prior written
 *      permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

/*
 * verbs.c
 *
 * Encapsulates the major functions managing:
 *  o adapters
 *  o endpoints
 *  o connections
 *  o buffer memory
 */

#include <linux/interrupt.h>
#include <linux/slab.h>
#include <linux/prefetch.h>
#include <linux/sunrpc/addr.h>
#include <asm/bitops.h>
#include <linux/module.h> /* try_module_get()/module_put() */

#include "xprt_rdma.h"

/*
 * Globals/Macros
 */

#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
# define RPCDBG_FACILITY	RPCDBG_TRANS
#endif

/*
 * internal functions
 */

static struct workqueue_struct *rpcrdma_receive_wq;

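/* Allocate the workqueue used to process received RPC replies in
 * process context, outside the Receive completion handler.
 */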
int
rpcrdma_alloc_wq(void)
{
	struct workqueue_struct *recv_wq;

	recv_wq = alloc_workqueue("xprtrdma_receive",
				  WQ_MEM_RECLAIM | WQ_UNBOUND | WQ_HIGHPRI,
				  0);
	if (!recv_wq)
		return -ENOMEM;

	rpcrdma_receive_wq = recv_wq;
	return 0;
}

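/* Tear down the receive workqueue, if it was created. */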
void
rpcrdma_destroy_wq(void)
{
	struct workqueue_struct *wq;

	if (rpcrdma_receive_wq) {
		wq = rpcrdma_receive_wq;
		rpcrdma_receive_wq = NULL;
		destroy_workqueue(wq);
	}
}

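/* Handle a QP async error: report it, and if the endpoint is
 * currently connected, mark it failed and wake up any waiters.
 */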
static void
rpcrdma_qp_async_error_upcall(struct ib_event *event, void *context)
{
	struct rpcrdma_ep *ep = context;

	pr_err("RPC:       %s: %s on device %s ep %p\n",
	       __func__, ib_event_msg(event->event),
		event->device->name, context);
	if (ep->rep_connected == 1) {
		ep->rep_connected = -EIO;
		rpcrdma_conn_func(ep);
		wake_up_all(&ep->rep_connect_wait);
	}
}

/**
 * rpcrdma_wc_send - Invoked by RDMA provider for each polled Send WC
 * @cq:	completion queue (ignored)
 * @wc:	completed WR
 *
 */
static void
rpcrdma_wc_send(struct ib_cq *cq, struct ib_wc *wc)
{
	/* WARNING: Only wr_cqe and status are reliable at this point */
	if (wc->status != IB_WC_SUCCESS && wc->status != IB_WC_WR_FLUSH_ERR)
		pr_err("rpcrdma: Send: %s (%u/0x%x)\n",
		       ib_wc_status_msg(wc->status),
		       wc->status, wc->vendor_err);
}

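/* Work item that passes a completed Receive to the reply handler. */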
static void
rpcrdma_receive_worker(struct work_struct *work)
{
	struct rpcrdma_rep *rep =
			container_of(work, struct rpcrdma_rep, rr_work);

	rpcrdma_reply_handler(rep);
}

/* Perform basic sanity checking to avoid using garbage
 * to update the credit grant value.
 */
static void
rpcrdma_update_granted_credits(struct rpcrdma_rep *rep)
{
	struct rpcrdma_msg *rmsgp = rdmab_to_msg(rep->rr_rdmabuf);
	struct rpcrdma_buffer *buffer = &rep->rr_rxprt->rx_buf;
	u32 credits;

	if (rep->rr_len < RPCRDMA_HDRLEN_ERR)
		return;

	credits = be32_to_cpu(rmsgp->rm_credit);
	if (credits == 0)
		credits = 1;	/* don't deadlock */
	else if (credits > buffer->rb_max_requests)
		credits = buffer->rb_max_requests;

	atomic_set(&buffer->rb_credits, credits);
}

/**
 * rpcrdma_receive_wc - Invoked by RDMA provider for each polled Receive WC
 * @cq:	completion queue (ignored)
 * @wc:	completed WR
 *
 */
static void
rpcrdma_receive_wc(struct ib_cq *cq, struct ib_wc *wc)
{
	struct ib_cqe *cqe = wc->wr_cqe;
	struct rpcrdma_rep *rep = container_of(cqe, struct rpcrdma_rep,
					       rr_cqe);

	/* WARNING: Only wr_cqe and status are reliable at this point */
	if (wc->status != IB_WC_SUCCESS)
		goto out_fail;

	/* status == SUCCESS means all fields in wc are trustworthy */
	if (wc->opcode != IB_WC_RECV)
		return;

	dprintk("RPC:       %s: rep %p opcode 'recv', length %u: success\n",
		__func__, rep, wc->byte_len);

	rep->rr_len = wc->byte_len;
	ib_dma_sync_single_for_cpu(rep->rr_device,
				   rdmab_addr(rep->rr_rdmabuf),
				   rep->rr_len, DMA_FROM_DEVICE);

	rpcrdma_update_granted_credits(rep);

out_schedule:
	queue_work(rpcrdma_receive_wq, &rep->rr_work);
	return;

out_fail:
	if (wc->status != IB_WC_WR_FLUSH_ERR)
		pr_err("rpcrdma: Recv: %s (%u/0x%x)\n",
		       ib_wc_status_msg(wc->status),
		       wc->status, wc->vendor_err);
	rep->rr_len = RPCRDMA_BAD_LEN;
	goto out_schedule;
}

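/* Connection Manager event handler: records the result of address
 * and route resolution, and tracks connect and disconnect transitions
 * on behalf of the transport.
 */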
static int
rpcrdma_conn_upcall(struct rdma_cm_id *id, struct rdma_cm_event *event)
{
	struct rpcrdma_xprt *xprt = id->context;
	struct rpcrdma_ia *ia = &xprt->rx_ia;
	struct rpcrdma_ep *ep = &xprt->rx_ep;
#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
	struct sockaddr *sap = (struct sockaddr *)&ep->rep_remote_addr;
#endif
	struct ib_qp_attr *attr = &ia->ri_qp_attr;
	struct ib_qp_init_attr *iattr = &ia->ri_qp_init_attr;
	int connstate = 0;

	switch (event->event) {
	case RDMA_CM_EVENT_ADDR_RESOLVED:
	case RDMA_CM_EVENT_ROUTE_RESOLVED:
		ia->ri_async_rc = 0;
		complete(&ia->ri_done);
		break;
	case RDMA_CM_EVENT_ADDR_ERROR:
		ia->ri_async_rc = -EHOSTUNREACH;
		dprintk("RPC:       %s: CM address resolution error, ep 0x%p\n",
			__func__, ep);
		complete(&ia->ri_done);
		break;
	case RDMA_CM_EVENT_ROUTE_ERROR:
		ia->ri_async_rc = -ENETUNREACH;
		dprintk("RPC:       %s: CM route resolution error, ep 0x%p\n",
			__func__, ep);
		complete(&ia->ri_done);
		break;
	case RDMA_CM_EVENT_ESTABLISHED:
		connstate = 1;
		ib_query_qp(ia->ri_id->qp, attr,
			    IB_QP_MAX_QP_RD_ATOMIC | IB_QP_MAX_DEST_RD_ATOMIC,
			    iattr);
		dprintk("RPC:       %s: %d responder resources"
			" (%d initiator)\n",
			__func__, attr->max_dest_rd_atomic,
			attr->max_rd_atomic);
		goto connected;
	case RDMA_CM_EVENT_CONNECT_ERROR:
		connstate = -ENOTCONN;
		goto connected;
	case RDMA_CM_EVENT_UNREACHABLE:
		connstate = -ENETDOWN;
		goto connected;
	case RDMA_CM_EVENT_REJECTED:
		connstate = -ECONNREFUSED;
		goto connected;
	case RDMA_CM_EVENT_DISCONNECTED:
		connstate = -ECONNABORTED;
		goto connected;
	case RDMA_CM_EVENT_DEVICE_REMOVAL:
		connstate = -ENODEV;
connected:
		dprintk("RPC:       %s: %sconnected\n",
					__func__, connstate > 0 ? "" : "dis");
		atomic_set(&xprt->rx_buf.rb_credits, 1);
		ep->rep_connected = connstate;
		rpcrdma_conn_func(ep);
		wake_up_all(&ep->rep_connect_wait);
		/*FALLTHROUGH*/
	default:
		dprintk("RPC:       %s: %pIS:%u (ep 0x%p): %s\n",
			__func__, sap, rpc_get_port(sap), ep,
			rdma_event_msg(event->event));
		break;
	}

#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
	if (connstate == 1) {
		int ird = attr->max_dest_rd_atomic;
		int tird = ep->rep_remote_cma.responder_resources;

		pr_info("rpcrdma: connection to %pIS:%u on %s, memreg '%s', %d credits, %d responders%s\n",
			sap, rpc_get_port(sap),
			ia->ri_device->name,
			ia->ri_ops->ro_displayname,
			xprt->rx_buf.rb_max_requests,
			ird, ird < 4 && ird < tird / 2 ? " (low!)" : "");
	} else if (connstate < 0) {
		pr_info("rpcrdma: connection to %pIS:%u closed (%d)\n",
			sap, rpc_get_port(sap), connstate);
	}
#endif

	return 0;
}

static void rpcrdma_destroy_id(struct rdma_cm_id *id)
{
	if (id) {
		module_put(id->device->owner);
		rdma_destroy_id(id);
	}
}

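/* Create an rdma_cm_id for this transport, then synchronously
 * resolve the server's address and route before returning.
 */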
static struct rdma_cm_id *
rpcrdma_create_id(struct rpcrdma_xprt *xprt,
			struct rpcrdma_ia *ia, struct sockaddr *addr)
{
	struct rdma_cm_id *id;
	int rc;

	init_completion(&ia->ri_done);

	id = rdma_create_id(&init_net, rpcrdma_conn_upcall, xprt, RDMA_PS_TCP,
			    IB_QPT_RC);
	if (IS_ERR(id)) {
		rc = PTR_ERR(id);
		dprintk("RPC:       %s: rdma_create_id() failed %i\n",
			__func__, rc);
		return id;
	}

	ia->ri_async_rc = -ETIMEDOUT;
	rc = rdma_resolve_addr(id, NULL, addr, RDMA_RESOLVE_TIMEOUT);
	if (rc) {
		dprintk("RPC:       %s: rdma_resolve_addr() failed %i\n",
			__func__, rc);
		goto out;
	}
	wait_for_completion_interruptible_timeout(&ia->ri_done,
				msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT) + 1);

	/* FIXME:
	 * Until xprtrdma supports DEVICE_REMOVAL, the provider must
	 * be pinned while there are active NFS/RDMA mounts to prevent
	 * hangs and crashes at umount time.
	 */
	if (!ia->ri_async_rc && !try_module_get(id->device->owner)) {
		dprintk("RPC:       %s: Failed to get device module\n",
			__func__);
		ia->ri_async_rc = -ENODEV;
	}
	rc = ia->ri_async_rc;
	if (rc)
		goto out;

	ia->ri_async_rc = -ETIMEDOUT;
	rc = rdma_resolve_route(id, RDMA_RESOLVE_TIMEOUT);
	if (rc) {
		dprintk("RPC:       %s: rdma_resolve_route() failed %i\n",
			__func__, rc);
		goto put;
	}
	wait_for_completion_interruptible_timeout(&ia->ri_done,
				msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT) + 1);
	rc = ia->ri_async_rc;
	if (rc)
		goto put;

	return id;
put:
	module_put(id->device->owner);
out:
	rdma_destroy_id(id);
	return ERR_PTR(rc);
}

/*
 * Exported functions.
 */

/*
 * Open and initialize an Interface Adapter.
 *  o initializes fields of struct rpcrdma_ia, including
 *    interface and provider attributes and protection zone.
 */
int
rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr, int memreg)
{
	struct rpcrdma_ia *ia = &xprt->rx_ia;
	int rc;

	ia->ri_id = rpcrdma_create_id(xprt, ia, addr);
	if (IS_ERR(ia->ri_id)) {
		rc = PTR_ERR(ia->ri_id);
		goto out1;
	}
	ia->ri_device = ia->ri_id->device;

	ia->ri_pd = ib_alloc_pd(ia->ri_device);
	if (IS_ERR(ia->ri_pd)) {
		rc = PTR_ERR(ia->ri_pd);
		pr_err("rpcrdma: ib_alloc_pd() returned %d\n", rc);
		goto out2;
	}

	switch (memreg) {
	case RPCRDMA_FRMR:
		if (frwr_is_supported(ia)) {
			ia->ri_ops = &rpcrdma_frwr_memreg_ops;
			break;
		}
		/*FALLTHROUGH*/
	case RPCRDMA_MTHCAFMR:
		if (fmr_is_supported(ia)) {
			ia->ri_ops = &rpcrdma_fmr_memreg_ops;
			break;
		}
		/*FALLTHROUGH*/
	default:
		pr_err("rpcrdma: Unsupported memory registration mode: %d\n",
		       memreg);
		rc = -EINVAL;
		goto out3;
	}

	return 0;

out3:
	ib_dealloc_pd(ia->ri_pd);
	ia->ri_pd = NULL;
out2:
	rpcrdma_destroy_id(ia->ri_id);
	ia->ri_id = NULL;
out1:
	return rc;
}

/*
 * Clean up/close an IA.
 *   o if event handles and PD have been initialized, free them.
 *   o close the IA
 */
void
rpcrdma_ia_close(struct rpcrdma_ia *ia)
{
	dprintk("RPC:       %s: entering\n", __func__);
	if (ia->ri_id != NULL && !IS_ERR(ia->ri_id)) {
		if (ia->ri_id->qp)
			rdma_destroy_qp(ia->ri_id);
		rpcrdma_destroy_id(ia->ri_id);
		ia->ri_id = NULL;
	}

	/* If the pd is still busy, xprtrdma missed freeing a resource */
	if (ia->ri_pd && !IS_ERR(ia->ri_pd))
		ib_dealloc_pd(ia->ri_pd);
}

/*
 * Create unconnected endpoint.
 */
int
rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
				struct rpcrdma_create_data_internal *cdata)
{
	struct ib_cq *sendcq, *recvcq;
	unsigned int max_qp_wr;
	int rc;

	if (ia->ri_device->attrs.max_sge < RPCRDMA_MAX_IOVS) {
		dprintk("RPC:       %s: insufficient sge's available\n",
			__func__);
		return -ENOMEM;
	}

	if (ia->ri_device->attrs.max_qp_wr <= RPCRDMA_BACKWARD_WRS) {
		dprintk("RPC:       %s: insufficient wqe's available\n",
			__func__);
		return -ENOMEM;
	}
	max_qp_wr = ia->ri_device->attrs.max_qp_wr - RPCRDMA_BACKWARD_WRS - 1;

	/* check provider's send/recv wr limits */
	if (cdata->max_requests > max_qp_wr)
		cdata->max_requests = max_qp_wr;

	ep->rep_attr.event_handler = rpcrdma_qp_async_error_upcall;
	ep->rep_attr.qp_context = ep;
	ep->rep_attr.srq = NULL;
	ep->rep_attr.cap.max_send_wr = cdata->max_requests;
	ep->rep_attr.cap.max_send_wr += RPCRDMA_BACKWARD_WRS;
	ep->rep_attr.cap.max_send_wr += 1;	/* drain cqe */
	rc = ia->ri_ops->ro_open(ia, ep, cdata);
	if (rc)
		return rc;
	ep->rep_attr.cap.max_recv_wr = cdata->max_requests;
	ep->rep_attr.cap.max_recv_wr += RPCRDMA_BACKWARD_WRS;
	ep->rep_attr.cap.max_recv_wr += 1;	/* drain cqe */
	ep->rep_attr.cap.max_send_sge = RPCRDMA_MAX_IOVS;
	ep->rep_attr.cap.max_recv_sge = 1;
	ep->rep_attr.cap.max_inline_data = 0;
	ep->rep_attr.sq_sig_type = IB_SIGNAL_REQ_WR;
	ep->rep_attr.qp_type = IB_QPT_RC;
	ep->rep_attr.port_num = ~0;

	dprintk("RPC:       %s: requested max: dtos: send %d recv %d; "
		"iovs: send %d recv %d\n",
		__func__,
		ep->rep_attr.cap.max_send_wr,
		ep->rep_attr.cap.max_recv_wr,
		ep->rep_attr.cap.max_send_sge,
		ep->rep_attr.cap.max_recv_sge);

	/* set trigger for requesting send completion */
	ep->rep_cqinit = ep->rep_attr.cap.max_send_wr/2 - 1;
	if (ep->rep_cqinit <= 2)
		ep->rep_cqinit = 0;	/* always signal? */
	INIT_CQCOUNT(ep);
	init_waitqueue_head(&ep->rep_connect_wait);
	INIT_DELAYED_WORK(&ep->rep_connect_worker, rpcrdma_connect_worker);

	sendcq = ib_alloc_cq(ia->ri_device, NULL,
			     ep->rep_attr.cap.max_send_wr + 1,
			     0, IB_POLL_SOFTIRQ);
	if (IS_ERR(sendcq)) {
		rc = PTR_ERR(sendcq);
		dprintk("RPC:       %s: failed to create send CQ: %i\n",
			__func__, rc);
		goto out1;
	}

	recvcq = ib_alloc_cq(ia->ri_device, NULL,
			     ep->rep_attr.cap.max_recv_wr + 1,
			     0, IB_POLL_SOFTIRQ);
	if (IS_ERR(recvcq)) {
		rc = PTR_ERR(recvcq);
		dprintk("RPC:       %s: failed to create recv CQ: %i\n",
			__func__, rc);
		goto out2;
	}

	ep->rep_attr.send_cq = sendcq;
	ep->rep_attr.recv_cq = recvcq;

	/* Initialize cma parameters */
	memset(&ep->rep_remote_cma, 0, sizeof(ep->rep_remote_cma));

	/* RPC/RDMA does not use private data */
	ep->rep_remote_cma.private_data = NULL;
	ep->rep_remote_cma.private_data_len = 0;

	/* Client offers RDMA Read but does not initiate */
	ep->rep_remote_cma.initiator_depth = 0;
	if (ia->ri_device->attrs.max_qp_rd_atom > 32)	/* arbitrary but <= 255 */
		ep->rep_remote_cma.responder_resources = 32;
	else
		ep->rep_remote_cma.responder_resources =
						ia->ri_device->attrs.max_qp_rd_atom;

	/* Limit transport retries so client can detect server
	 * GID changes quickly. RPC layer handles re-establishing
	 * transport connection and retransmission.
	 */
	ep->rep_remote_cma.retry_count = 6;

	/* RPC-over-RDMA handles its own flow control. In addition,
	 * make all RNR NAKs visible so we know that RPC-over-RDMA
	 * flow control is working correctly (no NAKs should be seen).
	 */
	ep->rep_remote_cma.flow_control = 0;
	ep->rep_remote_cma.rnr_retry_count = 0;

	return 0;

out2:
	ib_free_cq(sendcq);
out1:
	return rc;
}

/*
 * rpcrdma_ep_destroy
 *
 * Disconnect and destroy endpoint. After this, the only
 * valid operations on the ep are to free it (if dynamically
 * allocated) or re-create it.
 */
void
rpcrdma_ep_destroy(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
{
	dprintk("RPC:       %s: entering, connected is %d\n",
		__func__, ep->rep_connected);

	cancel_delayed_work_sync(&ep->rep_connect_worker);

	if (ia->ri_id->qp) {
		rpcrdma_ep_disconnect(ep, ia);
		rdma_destroy_qp(ia->ri_id);
		ia->ri_id->qp = NULL;
	}

	ib_free_cq(ep->rep_attr.recv_cq);
	ib_free_cq(ep->rep_attr.send_cq);
}

/*
 * Connect unconnected endpoint.
 */
int
rpcrdma_ep_connect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
{
	struct rdma_cm_id *id, *old;
	int rc = 0;
	int retry_count = 0;

	if (ep->rep_connected != 0) {
		struct rpcrdma_xprt *xprt;
retry:
		dprintk("RPC:       %s: reconnecting...\n", __func__);

		rpcrdma_ep_disconnect(ep, ia);

		xprt = container_of(ia, struct rpcrdma_xprt, rx_ia);
		id = rpcrdma_create_id(xprt, ia,
				(struct sockaddr *)&xprt->rx_data.addr);
		if (IS_ERR(id)) {
			rc = -EHOSTUNREACH;
			goto out;
		}
		/* TEMP TEMP TEMP - fail if new device:
		 * Deregister/remarshal *all* requests!
		 * Close and recreate adapter, pd, etc!
		 * Re-determine all attributes still sane!
		 * More stuff I haven't thought of!
		 * Rrrgh!
		 */
		if (ia->ri_device != id->device) {
			printk("RPC:       %s: can't reconnect on "
				"different device!\n", __func__);
			rpcrdma_destroy_id(id);
			rc = -ENETUNREACH;
			goto out;
		}
		/* END TEMP */
		rc = rdma_create_qp(id, ia->ri_pd, &ep->rep_attr);
		if (rc) {
			dprintk("RPC:       %s: rdma_create_qp failed %i\n",
				__func__, rc);
			rpcrdma_destroy_id(id);
			rc = -ENETUNREACH;
			goto out;
		}

		old = ia->ri_id;
		ia->ri_id = id;

		rdma_destroy_qp(old);
		rpcrdma_destroy_id(old);
	} else {
		dprintk("RPC:       %s: connecting...\n", __func__);
		rc = rdma_create_qp(ia->ri_id, ia->ri_pd, &ep->rep_attr);
		if (rc) {
			dprintk("RPC:       %s: rdma_create_qp failed %i\n",
				__func__, rc);
			/* do not update ep->rep_connected */
			return -ENETUNREACH;
		}
	}

	ep->rep_connected = 0;

	rc = rdma_connect(ia->ri_id, &ep->rep_remote_cma);
	if (rc) {
		dprintk("RPC:       %s: rdma_connect() failed with %i\n",
				__func__, rc);
		goto out;
	}

	wait_event_interruptible(ep->rep_connect_wait, ep->rep_connected != 0);

	/*
	 * Check state. A non-peer reject indicates no listener
	 * (ECONNREFUSED), which may be a transient state. All
	 * others indicate a transport condition which has already
	 * undergone a best-effort.
	 */
	if (ep->rep_connected == -ECONNREFUSED &&
	    ++retry_count <= RDMA_CONNECT_RETRY_MAX) {
		dprintk("RPC:       %s: non-peer_reject, retry\n", __func__);
		goto retry;
	}
	if (ep->rep_connected <= 0) {
		/* Sometimes, the only way to reliably connect to remote
		 * CMs is to use same nonzero values for ORD and IRD. */
		if (retry_count++ <= RDMA_CONNECT_RETRY_MAX + 1 &&
		    (ep->rep_remote_cma.responder_resources == 0 ||
		     ep->rep_remote_cma.initiator_depth !=
				ep->rep_remote_cma.responder_resources)) {
			if (ep->rep_remote_cma.responder_resources == 0)
				ep->rep_remote_cma.responder_resources = 1;
			ep->rep_remote_cma.initiator_depth =
				ep->rep_remote_cma.responder_resources;
			goto retry;
		}
		rc = ep->rep_connected;
	} else {
		struct rpcrdma_xprt *r_xprt;
		unsigned int extras;

		dprintk("RPC:       %s: connected\n", __func__);

		r_xprt = container_of(ia, struct rpcrdma_xprt, rx_ia);
		extras = r_xprt->rx_buf.rb_bc_srv_max_requests;

		if (extras) {
			rc = rpcrdma_ep_post_extra_recv(r_xprt, extras);
			if (rc) {
				pr_warn("%s: rpcrdma_ep_post_extra_recv: %i\n",
					__func__, rc);
				rc = 0;
			}
		}
	}

out:
	if (rc)
		ep->rep_connected = rc;
	return rc;
}

/*
 * rpcrdma_ep_disconnect
 *
 * This is separate from destroy to facilitate the ability
 * to reconnect without recreating the endpoint.
 *
 * This call is not reentrant, and must not be made in parallel
 * on the same endpoint.
 */
void
rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
{
	int rc;

	rc = rdma_disconnect(ia->ri_id);
	if (!rc) {
		/* returns without wait if not connected */
		wait_event_interruptible(ep->rep_connect_wait,
							ep->rep_connected != 1);
		dprintk("RPC:       %s: after wait, %sconnected\n", __func__,
			(ep->rep_connected == 1) ? "still " : "dis");
	} else {
		dprintk("RPC:       %s: rdma_disconnect %i\n", __func__, rc);
		ep->rep_connected = rc;
	}

	ib_drain_qp(ia->ri_id->qp);
}

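/* Work item that invokes the memory registration ops to recover
 * MRs that were flagged as stale after a failed invalidation.
 */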
static void
rpcrdma_mr_recovery_worker(struct work_struct *work)
{
	struct rpcrdma_buffer *buf = container_of(work, struct rpcrdma_buffer,
						  rb_recovery_worker.work);
	struct rpcrdma_mw *mw;

	spin_lock(&buf->rb_recovery_lock);
	while (!list_empty(&buf->rb_stale_mrs)) {
		mw = list_first_entry(&buf->rb_stale_mrs,
				      struct rpcrdma_mw, mw_list);
		list_del_init(&mw->mw_list);
		spin_unlock(&buf->rb_recovery_lock);

		dprintk("RPC:       %s: recovering MR %p\n", __func__, mw);
		mw->mw_xprt->rx_ia.ri_ops->ro_recover_mr(mw);

		spin_lock(&buf->rb_recovery_lock);
	}
	spin_unlock(&buf->rb_recovery_lock);
}

void
rpcrdma_defer_mr_recovery(struct rpcrdma_mw *mw)
{
	struct rpcrdma_xprt *r_xprt = mw->mw_xprt;
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;

	spin_lock(&buf->rb_recovery_lock);
	list_add(&mw->mw_list, &buf->rb_stale_mrs);
	spin_unlock(&buf->rb_recovery_lock);

	schedule_delayed_work(&buf->rb_recovery_worker, 0);
}

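/* Allocate an rpcrdma_req and add it to the buffer pool's list of
 * all allocated requests.
 */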
struct rpcrdma_req *
rpcrdma_create_req(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_buffer *buffer = &r_xprt->rx_buf;
	struct rpcrdma_req *req;

	req = kzalloc(sizeof(*req), GFP_KERNEL);
	if (req == NULL)
		return ERR_PTR(-ENOMEM);

	INIT_LIST_HEAD(&req->rl_free);
	spin_lock(&buffer->rb_reqslock);
	list_add(&req->rl_all, &buffer->rb_allreqs);
	spin_unlock(&buffer->rb_reqslock);
	req->rl_cqe.done = rpcrdma_wc_send;
	req->rl_buffer = &r_xprt->rx_buf;
	return req;
}

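/* Allocate an rpcrdma_rep and the registered buffer that will
 * receive an incoming RPC/RDMA reply.
 */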
struct rpcrdma_rep *
rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
	struct rpcrdma_rep *rep;
	int rc;

	rc = -ENOMEM;
	rep = kzalloc(sizeof(*rep), GFP_KERNEL);
	if (rep == NULL)
		goto out;

	rep->rr_rdmabuf = rpcrdma_alloc_regbuf(ia, cdata->inline_rsize,
					       GFP_KERNEL);
	if (IS_ERR(rep->rr_rdmabuf)) {
		rc = PTR_ERR(rep->rr_rdmabuf);
		goto out_free;
	}

	rep->rr_device = ia->ri_device;
	rep->rr_cqe.done = rpcrdma_receive_wc;
	rep->rr_rxprt = r_xprt;
	INIT_WORK(&rep->rr_work, rpcrdma_receive_worker);
	return rep;

out_free:
	kfree(rep);
out:
	return ERR_PTR(rc);
}

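/* Allocate the transport's pools of request and reply buffers. */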
int
rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
	int i, rc;

	buf->rb_max_requests = r_xprt->rx_data.max_requests;
	buf->rb_bc_srv_max_requests = 0;
	atomic_set(&buf->rb_credits, 1);
	spin_lock_init(&buf->rb_lock);
	spin_lock_init(&buf->rb_recovery_lock);
	INIT_LIST_HEAD(&buf->rb_stale_mrs);
	INIT_DELAYED_WORK(&buf->rb_recovery_worker,
			  rpcrdma_mr_recovery_worker);

	rc = ia->ri_ops->ro_init(r_xprt);
	if (rc)
		goto out;

	INIT_LIST_HEAD(&buf->rb_send_bufs);
	INIT_LIST_HEAD(&buf->rb_allreqs);
	spin_lock_init(&buf->rb_reqslock);
	for (i = 0; i < buf->rb_max_requests; i++) {
		struct rpcrdma_req *req;

		req = rpcrdma_create_req(r_xprt);
		if (IS_ERR(req)) {
			dprintk("RPC:       %s: request buffer %d alloc"
				" failed\n", __func__, i);
			rc = PTR_ERR(req);
			goto out;
		}
		req->rl_backchannel = false;
		list_add(&req->rl_free, &buf->rb_send_bufs);
	}

	INIT_LIST_HEAD(&buf->rb_recv_bufs);
	for (i = 0; i < buf->rb_max_requests; i++) {
		struct rpcrdma_rep *rep;

		rep = rpcrdma_create_rep(r_xprt);
		if (IS_ERR(rep)) {
			dprintk("RPC:       %s: reply buffer %d alloc failed\n",
				__func__, i);
			rc = PTR_ERR(rep);
			goto out;
		}
		list_add(&rep->rr_list, &buf->rb_recv_bufs);
	}

	return 0;
out:
	rpcrdma_buffer_destroy(buf);
	return rc;
}

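/* List helpers for the send and receive buffer pools. The caller is
 * responsible for serializing access, normally by holding rb_lock.
 */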
static struct rpcrdma_req *
rpcrdma_buffer_get_req_locked(struct rpcrdma_buffer *buf)
{
	struct rpcrdma_req *req;

	req = list_first_entry(&buf->rb_send_bufs,
			       struct rpcrdma_req, rl_free);
	list_del(&req->rl_free);
	return req;
}

static struct rpcrdma_rep *
rpcrdma_buffer_get_rep_locked(struct rpcrdma_buffer *buf)
{
	struct rpcrdma_rep *rep;

	rep = list_first_entry(&buf->rb_recv_bufs,
			       struct rpcrdma_rep, rr_list);
	list_del(&rep->rr_list);
	return rep;
}

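/* Free a reply buffer and its registered receive buffer. */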
static void
rpcrdma_destroy_rep(struct rpcrdma_ia *ia, struct rpcrdma_rep *rep)
{
	rpcrdma_free_regbuf(ia, rep->rr_rdmabuf);
	kfree(rep);
}

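/* Free a request buffer and its registered header and send buffers. */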
void
rpcrdma_destroy_req(struct rpcrdma_ia *ia, struct rpcrdma_req *req)
{
	rpcrdma_free_regbuf(ia, req->rl_sendbuf);
	rpcrdma_free_regbuf(ia, req->rl_rdmabuf);
	kfree(req);
}

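/* Release all buffer pool resources: stop MR recovery, then free
 * every reply buffer, every request buffer, and the memory
 * registration state.
 */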
void
rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
{
	struct rpcrdma_ia *ia = rdmab_to_ia(buf);

	cancel_delayed_work_sync(&buf->rb_recovery_worker);

	while (!list_empty(&buf->rb_recv_bufs)) {
		struct rpcrdma_rep *rep;

		rep = rpcrdma_buffer_get_rep_locked(buf);
		rpcrdma_destroy_rep(ia, rep);
	}

	spin_lock(&buf->rb_reqslock);
	while (!list_empty(&buf->rb_allreqs)) {
		struct rpcrdma_req *req;

		req = list_first_entry(&buf->rb_allreqs,
				       struct rpcrdma_req, rl_all);
		list_del(&req->rl_all);

		spin_unlock(&buf->rb_reqslock);
		rpcrdma_destroy_req(ia, req);
		spin_lock(&buf->rb_reqslock);
	}
	spin_unlock(&buf->rb_reqslock);

	ia->ri_ops->ro_destroy(buf);
}

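/* Take an MW off the transport's free list, if one is available. */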
struct rpcrdma_mw *
rpcrdma_get_mw(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	struct rpcrdma_mw *mw = NULL;

	spin_lock(&buf->rb_mwlock);
	if (!list_empty(&buf->rb_mws)) {
		mw = list_first_entry(&buf->rb_mws,
				      struct rpcrdma_mw, mw_list);
		list_del_init(&mw->mw_list);
	}
	spin_unlock(&buf->rb_mwlock);

	if (!mw)
		pr_err("RPC:       %s: no MWs available\n", __func__);
	return mw;
}

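/* Return an MW to the transport's free list. */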
void
rpcrdma_put_mw(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mw *mw)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;

	spin_lock(&buf->rb_mwlock);
	list_add_tail(&mw->mw_list, &buf->rb_mws);
	spin_unlock(&buf->rb_mwlock);
}

/*
 * Get a set of request/reply buffers.
 */
struct rpcrdma_req *
rpcrdma_buffer_get(struct rpcrdma_buffer *buffers)
{
	struct rpcrdma_req *req;

	spin_lock(&buffers->rb_lock);
	if (list_empty(&buffers->rb_send_bufs))
		goto out_reqbuf;
	req = rpcrdma_buffer_get_req_locked(buffers);
	if (list_empty(&buffers->rb_recv_bufs))
		goto out_repbuf;
	req->rl_reply = rpcrdma_buffer_get_rep_locked(buffers);
	spin_unlock(&buffers->rb_lock);
	return req;

out_reqbuf:
	spin_unlock(&buffers->rb_lock);
	pr_warn("rpcrdma: out of request buffers (%p)\n", buffers);
	return NULL;
out_repbuf:
	list_add(&req->rl_free, &buffers->rb_send_bufs);
	spin_unlock(&buffers->rb_lock);
	pr_warn("rpcrdma: out of reply buffers (%p)\n", buffers);
	return NULL;
}

/*
 * Put request/reply buffers back into pool.
 * Pre-decrement counter/array index.
 */
void
rpcrdma_buffer_put(struct rpcrdma_req *req)
{
	struct rpcrdma_buffer *buffers = req->rl_buffer;
	struct rpcrdma_rep *rep = req->rl_reply;

	req->rl_niovs = 0;
	req->rl_reply = NULL;

	spin_lock(&buffers->rb_lock);
	list_add_tail(&req->rl_free, &buffers->rb_send_bufs);
	if (rep)
		list_add_tail(&rep->rr_list, &buffers->rb_recv_bufs);
	spin_unlock(&buffers->rb_lock);
}

/*
 * Recover reply buffers from pool.
 * This happens when recovering from disconnect.
 */
void
rpcrdma_recv_buffer_get(struct rpcrdma_req *req)
{
	struct rpcrdma_buffer *buffers = req->rl_buffer;

	spin_lock(&buffers->rb_lock);
	if (!list_empty(&buffers->rb_recv_bufs))
		req->rl_reply = rpcrdma_buffer_get_rep_locked(buffers);
	spin_unlock(&buffers->rb_lock);
}

/*
 * Put reply buffers back into pool when not attached to
 * request. This happens in error conditions.
 */
void
rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep)
{
	struct rpcrdma_buffer *buffers = &rep->rr_rxprt->rx_buf;

	spin_lock(&buffers->rb_lock);
	list_add_tail(&rep->rr_list, &buffers->rb_recv_bufs);
	spin_unlock(&buffers->rb_lock);
}

/*
 * Wrappers for internal-use kmalloc memory registration, used by buffer code.
 */

/**
 * rpcrdma_alloc_regbuf - kmalloc and register memory for SEND/RECV buffers
 * @ia: controlling rpcrdma_ia
 * @size: size of buffer to be allocated, in bytes
 * @flags: GFP flags
 *
 * Returns pointer to private header of an area of internally
 * registered memory, or an ERR_PTR. The registered buffer follows
 * the end of the private header.
 *
 * xprtrdma uses a regbuf for posting an outgoing RDMA SEND, or for
 * receiving the payload of RDMA RECV operations. regbufs are not
 * used for RDMA READ/WRITE operations, thus are registered only for
 * LOCAL access.
 */
struct rpcrdma_regbuf *
rpcrdma_alloc_regbuf(struct rpcrdma_ia *ia, size_t size, gfp_t flags)
{
	struct rpcrdma_regbuf *rb;
	struct ib_sge *iov;

	rb = kmalloc(sizeof(*rb) + size, flags);
	if (rb == NULL)
		goto out;

	iov = &rb->rg_iov;
	iov->addr = ib_dma_map_single(ia->ri_device,
				      (void *)rb->rg_base, size,
				      DMA_BIDIRECTIONAL);
	if (ib_dma_mapping_error(ia->ri_device, iov->addr))
		goto out_free;

	iov->length = size;
	iov->lkey = ia->ri_pd->local_dma_lkey;
	rb->rg_size = size;
	rb->rg_owner = NULL;
	return rb;

out_free:
	kfree(rb);
out:
	return ERR_PTR(-ENOMEM);
}

/**
 * rpcrdma_free_regbuf - deregister and free registered buffer
 * @ia: controlling rpcrdma_ia
 * @rb: regbuf to be deregistered and freed
 */
void
rpcrdma_free_regbuf(struct rpcrdma_ia *ia, struct rpcrdma_regbuf *rb)
{
	struct ib_sge *iov;

	if (!rb)
		return;

	iov = &rb->rg_iov;
	ib_dma_unmap_single(ia->ri_device,
			    iov->addr, iov->length, DMA_BIDIRECTIONAL);
	kfree(rb);
}

/*
 * Prepost any receive buffer, then post send.
 *
 * Receive buffer is donated to hardware, reclaimed upon recv completion.
 */
int
rpcrdma_ep_post(struct rpcrdma_ia *ia,
		struct rpcrdma_ep *ep,
		struct rpcrdma_req *req)
{
	struct ib_device *device = ia->ri_device;
	struct ib_send_wr send_wr, *send_wr_fail;
	struct rpcrdma_rep *rep = req->rl_reply;
	struct ib_sge *iov = req->rl_send_iov;
	int i, rc;

	if (rep) {
		rc = rpcrdma_ep_post_recv(ia, ep, rep);
		if (rc)
			goto out;
		req->rl_reply = NULL;
	}

	send_wr.next = NULL;
	send_wr.wr_cqe = &req->rl_cqe;
	send_wr.sg_list = iov;
	send_wr.num_sge = req->rl_niovs;
	send_wr.opcode = IB_WR_SEND;

	for (i = 0; i < send_wr.num_sge; i++)
		ib_dma_sync_single_for_device(device, iov[i].addr,
					      iov[i].length, DMA_TO_DEVICE);
	dprintk("RPC:       %s: posting %d s/g entries\n",
		__func__, send_wr.num_sge);

	if (DECR_CQCOUNT(ep) > 0)
		send_wr.send_flags = 0;
	else { /* Provider must take a send completion every now and then */
		INIT_CQCOUNT(ep);
		send_wr.send_flags = IB_SEND_SIGNALED;
	}

	rc = ib_post_send(ia->ri_id->qp, &send_wr, &send_wr_fail);
	if (rc)
		dprintk("RPC:       %s: ib_post_send returned %i\n", __func__,
			rc);
out:
	return rc;
}

/*
 * (Re)post a receive buffer.
 */
int
rpcrdma_ep_post_recv(struct rpcrdma_ia *ia,
		     struct rpcrdma_ep *ep,
		     struct rpcrdma_rep *rep)
{
	struct ib_recv_wr recv_wr, *recv_wr_fail;
	int rc;

	recv_wr.next = NULL;
	recv_wr.wr_cqe = &rep->rr_cqe;
	recv_wr.sg_list = &rep->rr_rdmabuf->rg_iov;
	recv_wr.num_sge = 1;

	ib_dma_sync_single_for_cpu(ia->ri_device,
				   rdmab_addr(rep->rr_rdmabuf),
				   rdmab_length(rep->rr_rdmabuf),
				   DMA_BIDIRECTIONAL);

	rc = ib_post_recv(ia->ri_id->qp, &recv_wr, &recv_wr_fail);

	if (rc)
		dprintk("RPC:       %s: ib_post_recv returned %i\n", __func__,
			rc);
	return rc;
}

/**
 * rpcrdma_ep_post_extra_recv - Post buffers for incoming backchannel requests
 * @r_xprt: transport associated with these backchannel resources
 * @count: minimum number of incoming requests expected
 *
 * Returns zero if all requested buffers were posted, or a negative errno.
 */
int
rpcrdma_ep_post_extra_recv(struct rpcrdma_xprt *r_xprt, unsigned int count)
{
	struct rpcrdma_buffer *buffers = &r_xprt->rx_buf;
	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
	struct rpcrdma_ep *ep = &r_xprt->rx_ep;
	struct rpcrdma_rep *rep;
	int rc;

	while (count--) {
		spin_lock(&buffers->rb_lock);
		if (list_empty(&buffers->rb_recv_bufs))
			goto out_reqbuf;
		rep = rpcrdma_buffer_get_rep_locked(buffers);
		spin_unlock(&buffers->rb_lock);

		rc = rpcrdma_ep_post_recv(ia, ep, rep);
		if (rc)
			goto out_rc;
	}

	return 0;

out_reqbuf:
	spin_unlock(&buffers->rb_lock);
	pr_warn("%s: no extra receive buffers\n", __func__);
	return -ENOMEM;

out_rc:
	rpcrdma_recv_buffer_put(rep);
	return rc;
}