/*
 * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
 *
 * This software is available to you under a choice of one of two
 * licenses.  You may choose to be licensed under the terms of the GNU
 * General Public License (GPL) Version 2, available from the file
 * COPYING in the main directory of this source tree, or the BSD-type
 * license below:
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 *      Redistributions of source code must retain the above copyright
 *      notice, this list of conditions and the following disclaimer.
 *
 *      Redistributions in binary form must reproduce the above
 *      copyright notice, this list of conditions and the following
 *      disclaimer in the documentation and/or other materials provided
 *      with the distribution.
 *
 *      Neither the name of the Network Appliance, Inc. nor the names of
 *      its contributors may be used to endorse or promote products
 *      derived from this software without specific prior written
 *      permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

/*
 * verbs.c
 *
 * Encapsulates the major functions managing:
 *  o adapters
 *  o endpoints
 *  o connections
 *  o buffer memory
 */

#include <linux/interrupt.h>
#include <linux/slab.h>
#include <linux/prefetch.h>
#include <linux/sunrpc/addr.h>
#include <asm/bitops.h>
#include <linux/module.h> /* try_module_get()/module_put() */

#include "xprt_rdma.h"

/*
 * Globals/Macros
 */

#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
# define RPCDBG_FACILITY	RPCDBG_TRANS
#endif

/*
 * internal functions
 */

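/* Replies are handed off to this workqueue so that RPC reply
 * processing runs in process context instead of in the receive
 * completion handler.
 */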
static struct workqueue_struct *rpcrdma_receive_wq;

int
rpcrdma_alloc_wq(void)
{
	struct workqueue_struct *recv_wq;

	recv_wq = alloc_workqueue("xprtrdma_receive",
				  WQ_MEM_RECLAIM | WQ_UNBOUND | WQ_HIGHPRI,
				  0);
	if (!recv_wq)
		return -ENOMEM;

	rpcrdma_receive_wq = recv_wq;
	return 0;
}

void
rpcrdma_destroy_wq(void)
{
	struct workqueue_struct *wq;

	if (rpcrdma_receive_wq) {
		wq = rpcrdma_receive_wq;
		rpcrdma_receive_wq = NULL;
		destroy_workqueue(wq);
	}
}

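/* Handle a QP async error event. If the QP is currently connected,
 * mark the connection failed and wake anyone waiting on it.
 */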
static void
rpcrdma_qp_async_error_upcall(struct ib_event *event, void *context)
{
	struct rpcrdma_ep *ep = context;

	pr_err("RPC:       %s: %s on device %s ep %p\n",
	       __func__, ib_event_msg(event->event),
		event->device->name, context);
	if (ep->rep_connected == 1) {
		ep->rep_connected = -EIO;
		rpcrdma_conn_func(ep);
		wake_up_all(&ep->rep_connect_wait);
	}
}

static void
rpcrdma_cq_async_error_upcall(struct ib_event *event, void *context)
{
	struct rpcrdma_ep *ep = context;

	pr_err("RPC:       %s: %s on device %s ep %p\n",
	       __func__, ib_event_msg(event->event),
		event->device->name, context);
	if (ep->rep_connected == 1) {
		ep->rep_connected = -EIO;
		rpcrdma_conn_func(ep);
		wake_up_all(&ep->rep_connect_wait);
	}
}

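/* Handle one send completion. RPC SENDs are posted with wr_id
 * RPCRDMA_IGNORE_COMPLETION and are only checked for errors; any
 * other wr_id is an rpcrdma_mw whose mw_sendcompletion method is
 * invoked.
 */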
static void
rpcrdma_sendcq_process_wc(struct ib_wc *wc)
{
	/* WARNING: Only wr_id and status are reliable at this point */
	if (wc->wr_id == RPCRDMA_IGNORE_COMPLETION) {
		if (wc->status != IB_WC_SUCCESS &&
		    wc->status != IB_WC_WR_FLUSH_ERR)
			pr_err("RPC:       %s: SEND: %s\n",
			       __func__, ib_wc_status_msg(wc->status));
	} else {
		struct rpcrdma_mw *r;

		r = (struct rpcrdma_mw *)(unsigned long)wc->wr_id;
		r->mw_sendcompletion(wc);
	}
}

/* The common case is a single send completion is waiting. By
 * passing two WC entries to ib_poll_cq, a return code of 1
 * means there is exactly one WC waiting and no more. We don't
 * have to invoke ib_poll_cq again to know that the CQ has been
 * properly drained.
 */
static void
rpcrdma_sendcq_poll(struct ib_cq *cq)
{
	struct ib_wc *pos, wcs[2];
	int count, rc;

	do {
		pos = wcs;

		rc = ib_poll_cq(cq, ARRAY_SIZE(wcs), pos);
		if (rc < 0)
			break;

		count = rc;
		while (count-- > 0)
			rpcrdma_sendcq_process_wc(pos++);
	} while (rc == ARRAY_SIZE(wcs));
	return;
}

/* Handle provider send completion upcalls.
 */
static void
rpcrdma_sendcq_upcall(struct ib_cq *cq, void *cq_context)
{
	do {
		rpcrdma_sendcq_poll(cq);
	} while (ib_req_notify_cq(cq, IB_CQ_NEXT_COMP |
				  IB_CQ_REPORT_MISSED_EVENTS) > 0);
}

static void
rpcrdma_receive_worker(struct work_struct *work)
{
	struct rpcrdma_rep *rep =
			container_of(work, struct rpcrdma_rep, rr_work);

	rpcrdma_reply_handler(rep);
}

static void
rpcrdma_recvcq_process_wc(struct ib_wc *wc)
{
	struct rpcrdma_rep *rep =
			(struct rpcrdma_rep *)(unsigned long)wc->wr_id;

	/* WARNING: Only wr_id and status are reliable at this point */
	if (wc->status != IB_WC_SUCCESS)
		goto out_fail;

	/* status == SUCCESS means all fields in wc are trustworthy */
	if (wc->opcode != IB_WC_RECV)
		return;

	dprintk("RPC:       %s: rep %p opcode 'recv', length %u: success\n",
		__func__, rep, wc->byte_len);

	rep->rr_len = wc->byte_len;
	ib_dma_sync_single_for_cpu(rep->rr_device,
				   rdmab_addr(rep->rr_rdmabuf),
				   rep->rr_len, DMA_FROM_DEVICE);
	prefetch(rdmab_to_msg(rep->rr_rdmabuf));

out_schedule:
	queue_work(rpcrdma_receive_wq, &rep->rr_work);
	return;

out_fail:
	if (wc->status != IB_WC_WR_FLUSH_ERR)
		pr_err("RPC:       %s: rep %p: %s\n",
		       __func__, rep, ib_wc_status_msg(wc->status));
	rep->rr_len = RPCRDMA_BAD_LEN;
	goto out_schedule;
}

/* The wc array is on stack: automatic memory is always CPU-local.
 *
 * struct ib_wc is 64 bytes, making the poll array potentially
 * large. But this is at the bottom of the call chain. Further
 * substantial work is done in another thread.
 */
static void
rpcrdma_recvcq_poll(struct ib_cq *cq)
{
	struct ib_wc *pos, wcs[4];
	int count, rc;

	do {
		pos = wcs;

		rc = ib_poll_cq(cq, ARRAY_SIZE(wcs), pos);
		if (rc < 0)
			break;

		count = rc;
		while (count-- > 0)
			rpcrdma_recvcq_process_wc(pos++);
	} while (rc == ARRAY_SIZE(wcs));
}

/* Handle provider receive completion upcalls.
 */
static void
rpcrdma_recvcq_upcall(struct ib_cq *cq, void *cq_context)
{
	do {
		rpcrdma_recvcq_poll(cq);
	} while (ib_req_notify_cq(cq, IB_CQ_NEXT_COMP |
				  IB_CQ_REPORT_MISSED_EVENTS) > 0);
}

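/* Poll and process any completions remaining on the receive and
 * send CQs, used while connecting, reconnecting, or disconnecting.
 */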
static void
rpcrdma_flush_cqs(struct rpcrdma_ep *ep)
{
	struct ib_wc wc;

	while (ib_poll_cq(ep->rep_attr.recv_cq, 1, &wc) > 0)
		rpcrdma_recvcq_process_wc(&wc);
	while (ib_poll_cq(ep->rep_attr.send_cq, 1, &wc) > 0)
		rpcrdma_sendcq_process_wc(&wc);
}

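/* Connection manager event handler. Address and route resolution
 * results are reported through ia->ri_async_rc and ia->ri_done;
 * connection state changes update ep->rep_connected and wake
 * threads sleeping on rep_connect_wait.
 */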
static int
rpcrdma_conn_upcall(struct rdma_cm_id *id, struct rdma_cm_event *event)
{
	struct rpcrdma_xprt *xprt = id->context;
	struct rpcrdma_ia *ia = &xprt->rx_ia;
	struct rpcrdma_ep *ep = &xprt->rx_ep;
#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
	struct sockaddr *sap = (struct sockaddr *)&ep->rep_remote_addr;
#endif
	struct ib_qp_attr *attr = &ia->ri_qp_attr;
	struct ib_qp_init_attr *iattr = &ia->ri_qp_init_attr;
	int connstate = 0;

	switch (event->event) {
	case RDMA_CM_EVENT_ADDR_RESOLVED:
	case RDMA_CM_EVENT_ROUTE_RESOLVED:
		ia->ri_async_rc = 0;
		complete(&ia->ri_done);
		break;
	case RDMA_CM_EVENT_ADDR_ERROR:
		ia->ri_async_rc = -EHOSTUNREACH;
		dprintk("RPC:       %s: CM address resolution error, ep 0x%p\n",
			__func__, ep);
		complete(&ia->ri_done);
		break;
	case RDMA_CM_EVENT_ROUTE_ERROR:
		ia->ri_async_rc = -ENETUNREACH;
		dprintk("RPC:       %s: CM route resolution error, ep 0x%p\n",
			__func__, ep);
		complete(&ia->ri_done);
		break;
	case RDMA_CM_EVENT_ESTABLISHED:
		connstate = 1;
		ib_query_qp(ia->ri_id->qp, attr,
			    IB_QP_MAX_QP_RD_ATOMIC | IB_QP_MAX_DEST_RD_ATOMIC,
			    iattr);
		dprintk("RPC:       %s: %d responder resources"
			" (%d initiator)\n",
			__func__, attr->max_dest_rd_atomic,
			attr->max_rd_atomic);
		goto connected;
	case RDMA_CM_EVENT_CONNECT_ERROR:
		connstate = -ENOTCONN;
		goto connected;
	case RDMA_CM_EVENT_UNREACHABLE:
		connstate = -ENETDOWN;
		goto connected;
	case RDMA_CM_EVENT_REJECTED:
		connstate = -ECONNREFUSED;
		goto connected;
	case RDMA_CM_EVENT_DISCONNECTED:
		connstate = -ECONNABORTED;
		goto connected;
	case RDMA_CM_EVENT_DEVICE_REMOVAL:
		connstate = -ENODEV;
connected:
		dprintk("RPC:       %s: %sconnected\n",
					__func__, connstate > 0 ? "" : "dis");
		ep->rep_connected = connstate;
		rpcrdma_conn_func(ep);
		wake_up_all(&ep->rep_connect_wait);
		/*FALLTHROUGH*/
	default:
		dprintk("RPC:       %s: %pIS:%u (ep 0x%p): %s\n",
			__func__, sap, rpc_get_port(sap), ep,
			rdma_event_msg(event->event));
		break;
	}

#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
	if (connstate == 1) {
		int ird = attr->max_dest_rd_atomic;
		int tird = ep->rep_remote_cma.responder_resources;

		pr_info("rpcrdma: connection to %pIS:%u on %s, memreg '%s', %d credits, %d responders%s\n",
			sap, rpc_get_port(sap),
			ia->ri_device->name,
			ia->ri_ops->ro_displayname,
			xprt->rx_buf.rb_max_requests,
			ird, ird < 4 && ird < tird / 2 ? " (low!)" : "");
	} else if (connstate < 0) {
		pr_info("rpcrdma: connection to %pIS:%u closed (%d)\n",
			sap, rpc_get_port(sap), connstate);
	}
#endif

	return 0;
}

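/* Drop the device module reference taken in rpcrdma_create_id()
 * before destroying the cm_id itself.
 */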
static void rpcrdma_destroy_id(struct rdma_cm_id *id)
{
	if (id) {
		module_put(id->device->owner);
		rdma_destroy_id(id);
	}
}

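/* Create a cm_id and resolve the server's address and route. Both
 * steps complete asynchronously via rpcrdma_conn_upcall(), which
 * sets ia->ri_async_rc and completes ia->ri_done.
 */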
static struct rdma_cm_id *
rpcrdma_create_id(struct rpcrdma_xprt *xprt,
			struct rpcrdma_ia *ia, struct sockaddr *addr)
{
	struct rdma_cm_id *id;
	int rc;

	init_completion(&ia->ri_done);

	id = rdma_create_id(rpcrdma_conn_upcall, xprt, RDMA_PS_TCP, IB_QPT_RC);
	if (IS_ERR(id)) {
		rc = PTR_ERR(id);
		dprintk("RPC:       %s: rdma_create_id() failed %i\n",
			__func__, rc);
		return id;
	}

	ia->ri_async_rc = -ETIMEDOUT;
	rc = rdma_resolve_addr(id, NULL, addr, RDMA_RESOLVE_TIMEOUT);
	if (rc) {
		dprintk("RPC:       %s: rdma_resolve_addr() failed %i\n",
			__func__, rc);
		goto out;
	}
	wait_for_completion_interruptible_timeout(&ia->ri_done,
				msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT) + 1);

	/* FIXME:
	 * Until xprtrdma supports DEVICE_REMOVAL, the provider must
	 * be pinned while there are active NFS/RDMA mounts to prevent
	 * hangs and crashes at umount time.
	 */
	if (!ia->ri_async_rc && !try_module_get(id->device->owner)) {
		dprintk("RPC:       %s: Failed to get device module\n",
			__func__);
		ia->ri_async_rc = -ENODEV;
	}
	rc = ia->ri_async_rc;
	if (rc)
		goto out;

	ia->ri_async_rc = -ETIMEDOUT;
	rc = rdma_resolve_route(id, RDMA_RESOLVE_TIMEOUT);
	if (rc) {
		dprintk("RPC:       %s: rdma_resolve_route() failed %i\n",
			__func__, rc);
		goto put;
	}
	wait_for_completion_interruptible_timeout(&ia->ri_done,
				msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT) + 1);
	rc = ia->ri_async_rc;
	if (rc)
		goto put;

	return id;
put:
	module_put(id->device->owner);
out:
	rdma_destroy_id(id);
	return ERR_PTR(rc);
}

/*
 * Drain any cq, prior to teardown.
 */
static void
rpcrdma_clean_cq(struct ib_cq *cq)
{
	struct ib_wc wc;
	int count = 0;

	while (1 == ib_poll_cq(cq, 1, &wc))
		++count;

	if (count)
		dprintk("RPC:       %s: flushed %d events (last 0x%x)\n",
			__func__, count, wc.opcode);
}

/*
 * Exported functions.
 */

/*
 * Open and initialize an Interface Adapter.
 *  o initializes fields of struct rpcrdma_ia, including
 *    interface and provider attributes and protection zone.
 */
int
rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr, int memreg)
{
	struct rpcrdma_ia *ia = &xprt->rx_ia;
	struct ib_device_attr *devattr = &ia->ri_devattr;
	int rc;

	ia->ri_dma_mr = NULL;

	ia->ri_id = rpcrdma_create_id(xprt, ia, addr);
	if (IS_ERR(ia->ri_id)) {
		rc = PTR_ERR(ia->ri_id);
		goto out1;
	}
	ia->ri_device = ia->ri_id->device;

	ia->ri_pd = ib_alloc_pd(ia->ri_device);
	if (IS_ERR(ia->ri_pd)) {
		rc = PTR_ERR(ia->ri_pd);
		dprintk("RPC:       %s: ib_alloc_pd() failed %i\n",
			__func__, rc);
		goto out2;
	}

	rc = ib_query_device(ia->ri_device, devattr);
	if (rc) {
		dprintk("RPC:       %s: ib_query_device failed %d\n",
			__func__, rc);
		goto out3;
	}

	if (memreg == RPCRDMA_FRMR) {
		if (!(devattr->device_cap_flags & IB_DEVICE_MEM_MGT_EXTENSIONS) ||
		    (devattr->max_fast_reg_page_list_len == 0)) {
			dprintk("RPC:       %s: FRMR registration "
				"not supported by HCA\n", __func__);
			memreg = RPCRDMA_MTHCAFMR;
		}
	}
	if (memreg == RPCRDMA_MTHCAFMR) {
		if (!ia->ri_device->alloc_fmr) {
			dprintk("RPC:       %s: MTHCAFMR registration "
				"not supported by HCA\n", __func__);
			rc = -EINVAL;
			goto out3;
		}
	}

	switch (memreg) {
	case RPCRDMA_FRMR:
		ia->ri_ops = &rpcrdma_frwr_memreg_ops;
		break;
	case RPCRDMA_ALLPHYSICAL:
		ia->ri_ops = &rpcrdma_physical_memreg_ops;
		break;
	case RPCRDMA_MTHCAFMR:
		ia->ri_ops = &rpcrdma_fmr_memreg_ops;
		break;
	default:
		printk(KERN_ERR "RPC: Unsupported memory "
				"registration mode: %d\n", memreg);
		rc = -ENOMEM;
		goto out3;
	}
	dprintk("RPC:       %s: memory registration strategy is '%s'\n",
		__func__, ia->ri_ops->ro_displayname);

	rwlock_init(&ia->ri_qplock);
	return 0;

out3:
	ib_dealloc_pd(ia->ri_pd);
	ia->ri_pd = NULL;
out2:
	rpcrdma_destroy_id(ia->ri_id);
	ia->ri_id = NULL;
out1:
	return rc;
}

/*
 * Clean up/close an IA.
 *   o if event handles and PD have been initialized, free them.
 *   o close the IA
 */
void
rpcrdma_ia_close(struct rpcrdma_ia *ia)
{
	dprintk("RPC:       %s: entering\n", __func__);
	if (ia->ri_id != NULL && !IS_ERR(ia->ri_id)) {
		if (ia->ri_id->qp)
			rdma_destroy_qp(ia->ri_id);
		rpcrdma_destroy_id(ia->ri_id);
		ia->ri_id = NULL;
	}

	/* If the pd is still busy, xprtrdma missed freeing a resource */
	if (ia->ri_pd && !IS_ERR(ia->ri_pd))
		ib_dealloc_pd(ia->ri_pd);
}

/*
 * Create unconnected endpoint.
 */
int
rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
				struct rpcrdma_create_data_internal *cdata)
{
	struct ib_device_attr *devattr = &ia->ri_devattr;
	struct ib_cq *sendcq, *recvcq;
	struct ib_cq_init_attr cq_attr = {};
	int rc, err;

	if (devattr->max_sge < RPCRDMA_MAX_IOVS) {
		dprintk("RPC:       %s: insufficient sge's available\n",
			__func__);
		return -ENOMEM;
	}

	/* check provider's send/recv wr limits */
	if (cdata->max_requests > devattr->max_qp_wr)
		cdata->max_requests = devattr->max_qp_wr;

	ep->rep_attr.event_handler = rpcrdma_qp_async_error_upcall;
	ep->rep_attr.qp_context = ep;
	ep->rep_attr.srq = NULL;
	ep->rep_attr.cap.max_send_wr = cdata->max_requests;
	rc = ia->ri_ops->ro_open(ia, ep, cdata);
	if (rc)
		return rc;
	ep->rep_attr.cap.max_recv_wr = cdata->max_requests;
	ep->rep_attr.cap.max_send_sge = RPCRDMA_MAX_IOVS;
	ep->rep_attr.cap.max_recv_sge = 1;
	ep->rep_attr.cap.max_inline_data = 0;
	ep->rep_attr.sq_sig_type = IB_SIGNAL_REQ_WR;
	ep->rep_attr.qp_type = IB_QPT_RC;
	ep->rep_attr.port_num = ~0;

	dprintk("RPC:       %s: requested max: dtos: send %d recv %d; "
		"iovs: send %d recv %d\n",
		__func__,
		ep->rep_attr.cap.max_send_wr,
		ep->rep_attr.cap.max_recv_wr,
		ep->rep_attr.cap.max_send_sge,
		ep->rep_attr.cap.max_recv_sge);

	/* set trigger for requesting send completion */
	ep->rep_cqinit = ep->rep_attr.cap.max_send_wr/2 - 1;
	if (ep->rep_cqinit > RPCRDMA_MAX_UNSIGNALED_SENDS)
		ep->rep_cqinit = RPCRDMA_MAX_UNSIGNALED_SENDS;
	else if (ep->rep_cqinit <= 2)
		ep->rep_cqinit = 0;
	INIT_CQCOUNT(ep);
	init_waitqueue_head(&ep->rep_connect_wait);
	INIT_DELAYED_WORK(&ep->rep_connect_worker, rpcrdma_connect_worker);

	cq_attr.cqe = ep->rep_attr.cap.max_send_wr + 1;
	sendcq = ib_create_cq(ia->ri_device, rpcrdma_sendcq_upcall,
			      rpcrdma_cq_async_error_upcall, NULL, &cq_attr);
	if (IS_ERR(sendcq)) {
		rc = PTR_ERR(sendcq);
		dprintk("RPC:       %s: failed to create send CQ: %i\n",
			__func__, rc);
		goto out1;
	}

	rc = ib_req_notify_cq(sendcq, IB_CQ_NEXT_COMP);
	if (rc) {
		dprintk("RPC:       %s: ib_req_notify_cq failed: %i\n",
			__func__, rc);
		goto out2;
	}

	cq_attr.cqe = ep->rep_attr.cap.max_recv_wr + 1;
	recvcq = ib_create_cq(ia->ri_device, rpcrdma_recvcq_upcall,
			      rpcrdma_cq_async_error_upcall, NULL, &cq_attr);
	if (IS_ERR(recvcq)) {
		rc = PTR_ERR(recvcq);
		dprintk("RPC:       %s: failed to create recv CQ: %i\n",
			__func__, rc);
		goto out2;
	}

	rc = ib_req_notify_cq(recvcq, IB_CQ_NEXT_COMP);
	if (rc) {
		dprintk("RPC:       %s: ib_req_notify_cq failed: %i\n",
			__func__, rc);
		ib_destroy_cq(recvcq);
		goto out2;
	}

	ep->rep_attr.send_cq = sendcq;
	ep->rep_attr.recv_cq = recvcq;

	/* Initialize cma parameters */

	/* RPC/RDMA does not use private data */
	ep->rep_remote_cma.private_data = NULL;
	ep->rep_remote_cma.private_data_len = 0;

	/* Client offers RDMA Read but does not initiate */
	ep->rep_remote_cma.initiator_depth = 0;
	if (devattr->max_qp_rd_atom > 32)	/* arbitrary but <= 255 */
		ep->rep_remote_cma.responder_resources = 32;
	else
		ep->rep_remote_cma.responder_resources =
						devattr->max_qp_rd_atom;

	ep->rep_remote_cma.retry_count = 7;
	ep->rep_remote_cma.flow_control = 0;
	ep->rep_remote_cma.rnr_retry_count = 0;

	return 0;

out2:
	err = ib_destroy_cq(sendcq);
	if (err)
		dprintk("RPC:       %s: ib_destroy_cq returned %i\n",
			__func__, err);
out1:
	if (ia->ri_dma_mr)
		ib_dereg_mr(ia->ri_dma_mr);
	return rc;
}

/*
 * rpcrdma_ep_destroy
 *
 * Disconnect and destroy endpoint. After this, the only
 * valid operations on the ep are to free it (if dynamically
 * allocated) or re-create it.
 */
void
rpcrdma_ep_destroy(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
{
	int rc;

	dprintk("RPC:       %s: entering, connected is %d\n",
		__func__, ep->rep_connected);

	cancel_delayed_work_sync(&ep->rep_connect_worker);

	if (ia->ri_id->qp)
		rpcrdma_ep_disconnect(ep, ia);

	rpcrdma_clean_cq(ep->rep_attr.recv_cq);
	rpcrdma_clean_cq(ep->rep_attr.send_cq);

	if (ia->ri_id->qp) {
		rdma_destroy_qp(ia->ri_id);
		ia->ri_id->qp = NULL;
	}

	rc = ib_destroy_cq(ep->rep_attr.recv_cq);
	if (rc)
		dprintk("RPC:       %s: ib_destroy_cq returned %i\n",
			__func__, rc);

	rc = ib_destroy_cq(ep->rep_attr.send_cq);
	if (rc)
		dprintk("RPC:       %s: ib_destroy_cq returned %i\n",
			__func__, rc);

	if (ia->ri_dma_mr) {
		rc = ib_dereg_mr(ia->ri_dma_mr);
		dprintk("RPC:       %s: ib_dereg_mr returned %i\n",
			__func__, rc);
	}
}

/*
 * Connect unconnected endpoint.
 */
int
rpcrdma_ep_connect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
{
	struct rdma_cm_id *id, *old;
	int rc = 0;
	int retry_count = 0;

	if (ep->rep_connected != 0) {
		struct rpcrdma_xprt *xprt;
retry:
		dprintk("RPC:       %s: reconnecting...\n", __func__);

		rpcrdma_ep_disconnect(ep, ia);
		rpcrdma_flush_cqs(ep);

		xprt = container_of(ia, struct rpcrdma_xprt, rx_ia);
		id = rpcrdma_create_id(xprt, ia,
				(struct sockaddr *)&xprt->rx_data.addr);
		if (IS_ERR(id)) {
			rc = -EHOSTUNREACH;
			goto out;
		}
		/* TEMP TEMP TEMP - fail if new device:
		 * Deregister/remarshal *all* requests!
		 * Close and recreate adapter, pd, etc!
		 * Re-determine all attributes still sane!
		 * More stuff I haven't thought of!
		 * Rrrgh!
		 */
		if (ia->ri_device != id->device) {
			printk("RPC:       %s: can't reconnect on "
				"different device!\n", __func__);
			rpcrdma_destroy_id(id);
			rc = -ENETUNREACH;
			goto out;
		}
		/* END TEMP */
		rc = rdma_create_qp(id, ia->ri_pd, &ep->rep_attr);
		if (rc) {
			dprintk("RPC:       %s: rdma_create_qp failed %i\n",
				__func__, rc);
			rpcrdma_destroy_id(id);
			rc = -ENETUNREACH;
			goto out;
		}

		write_lock(&ia->ri_qplock);
		old = ia->ri_id;
		ia->ri_id = id;
		write_unlock(&ia->ri_qplock);

		rdma_destroy_qp(old);
		rpcrdma_destroy_id(old);
	} else {
		dprintk("RPC:       %s: connecting...\n", __func__);
		rc = rdma_create_qp(ia->ri_id, ia->ri_pd, &ep->rep_attr);
		if (rc) {
			dprintk("RPC:       %s: rdma_create_qp failed %i\n",
				__func__, rc);
			/* do not update ep->rep_connected */
			return -ENETUNREACH;
		}
	}

	ep->rep_connected = 0;

	rc = rdma_connect(ia->ri_id, &ep->rep_remote_cma);
	if (rc) {
		dprintk("RPC:       %s: rdma_connect() failed with %i\n",
				__func__, rc);
		goto out;
	}

	wait_event_interruptible(ep->rep_connect_wait, ep->rep_connected != 0);

	/*
	 * Check state. A non-peer reject indicates no listener
	 * (ECONNREFUSED), which may be a transient state. All
	 * others indicate a transport condition which has already
	 * undergone a best-effort recovery attempt.
	 */
	if (ep->rep_connected == -ECONNREFUSED &&
	    ++retry_count <= RDMA_CONNECT_RETRY_MAX) {
		dprintk("RPC:       %s: non-peer_reject, retry\n", __func__);
		goto retry;
	}
	if (ep->rep_connected <= 0) {
		/* Sometimes, the only way to reliably connect to remote
		 * CMs is to use the same nonzero values for ORD and IRD. */
		if (retry_count++ <= RDMA_CONNECT_RETRY_MAX + 1 &&
		    (ep->rep_remote_cma.responder_resources == 0 ||
		     ep->rep_remote_cma.initiator_depth !=
				ep->rep_remote_cma.responder_resources)) {
			if (ep->rep_remote_cma.responder_resources == 0)
				ep->rep_remote_cma.responder_resources = 1;
			ep->rep_remote_cma.initiator_depth =
				ep->rep_remote_cma.responder_resources;
			goto retry;
		}
		rc = ep->rep_connected;
	} else {
		dprintk("RPC:       %s: connected\n", __func__);
	}

out:
	if (rc)
		ep->rep_connected = rc;
	return rc;
}

/*
 * rpcrdma_ep_disconnect
 *
 * This is separate from destroy to facilitate the ability
 * to reconnect without recreating the endpoint.
 *
 * This call is not reentrant, and must not be made in parallel
 * on the same endpoint.
 */
void
rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
{
	int rc;

	rpcrdma_flush_cqs(ep);
	rc = rdma_disconnect(ia->ri_id);
	if (!rc) {
		/* returns without wait if not connected */
		wait_event_interruptible(ep->rep_connect_wait,
							ep->rep_connected != 1);
		dprintk("RPC:       %s: after wait, %sconnected\n", __func__,
			(ep->rep_connected == 1) ? "still " : "dis");
	} else {
		dprintk("RPC:       %s: rdma_disconnect %i\n", __func__, rc);
		ep->rep_connected = rc;
	}
}

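/* Allocate a request tracking structure and attach it to its
 * owning buffer pool.
 */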
static struct rpcrdma_req *
rpcrdma_create_req(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_req *req;

	req = kzalloc(sizeof(*req), GFP_KERNEL);
	if (req == NULL)
		return ERR_PTR(-ENOMEM);

	req->rl_buffer = &r_xprt->rx_buf;
	return req;
}

static struct rpcrdma_rep *
rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
	struct rpcrdma_rep *rep;
	int rc;

	rc = -ENOMEM;
	rep = kzalloc(sizeof(*rep), GFP_KERNEL);
	if (rep == NULL)
		goto out;

	rep->rr_rdmabuf = rpcrdma_alloc_regbuf(ia, cdata->inline_rsize,
					       GFP_KERNEL);
	if (IS_ERR(rep->rr_rdmabuf)) {
		rc = PTR_ERR(rep->rr_rdmabuf);
		goto out_free;
	}

	rep->rr_device = ia->ri_device;
	rep->rr_rxprt = r_xprt;
	INIT_WORK(&rep->rr_work, rpcrdma_receive_worker);
	return rep;

out_free:
	kfree(rep);
out:
	return ERR_PTR(rc);
}

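/* Populate the transport's buffer pool: memory registration
 * resources (via ro_init), rb_max_requests request buffers, and
 * rb_max_requests + 2 reply buffers.
 */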
int
rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
	int i, rc;

	buf->rb_max_requests = r_xprt->rx_data.max_requests;
	spin_lock_init(&buf->rb_lock);

	rc = ia->ri_ops->ro_init(r_xprt);
	if (rc)
		goto out;

	INIT_LIST_HEAD(&buf->rb_send_bufs);
	for (i = 0; i < buf->rb_max_requests; i++) {
		struct rpcrdma_req *req;

		req = rpcrdma_create_req(r_xprt);
		if (IS_ERR(req)) {
			dprintk("RPC:       %s: request buffer %d alloc"
				" failed\n", __func__, i);
			rc = PTR_ERR(req);
			goto out;
		}
		list_add(&req->rl_free, &buf->rb_send_bufs);
	}

	INIT_LIST_HEAD(&buf->rb_recv_bufs);
	for (i = 0; i < buf->rb_max_requests + 2; i++) {
		struct rpcrdma_rep *rep;

		rep = rpcrdma_create_rep(r_xprt);
		if (IS_ERR(rep)) {
			dprintk("RPC:       %s: reply buffer %d alloc failed\n",
				__func__, i);
			rc = PTR_ERR(rep);
			goto out;
		}
		list_add(&rep->rr_list, &buf->rb_recv_bufs);
	}

	return 0;
out:
	rpcrdma_buffer_destroy(buf);
	return rc;
}

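/* The _locked helpers below expect the caller to hold rb_lock,
 * or to otherwise have exclusive access to the buffer lists.
 */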
static struct rpcrdma_req *
rpcrdma_buffer_get_req_locked(struct rpcrdma_buffer *buf)
{
	struct rpcrdma_req *req;

	req = list_first_entry(&buf->rb_send_bufs,
			       struct rpcrdma_req, rl_free);
	list_del(&req->rl_free);
	return req;
}

static struct rpcrdma_rep *
rpcrdma_buffer_get_rep_locked(struct rpcrdma_buffer *buf)
{
	struct rpcrdma_rep *rep;

	rep = list_first_entry(&buf->rb_recv_bufs,
			       struct rpcrdma_rep, rr_list);
	list_del(&rep->rr_list);
	return rep;
}

static void
rpcrdma_destroy_rep(struct rpcrdma_ia *ia, struct rpcrdma_rep *rep)
{
	if (!rep)
		return;

	rpcrdma_free_regbuf(ia, rep->rr_rdmabuf);
	kfree(rep);
}

static void
rpcrdma_destroy_req(struct rpcrdma_ia *ia, struct rpcrdma_req *req)
{
	if (!req)
		return;

	rpcrdma_free_regbuf(ia, req->rl_sendbuf);
	rpcrdma_free_regbuf(ia, req->rl_rdmabuf);
	kfree(req);
}

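/* Release everything rpcrdma_buffer_create() allocated: reply
 * buffers, request buffers, then the memory registration state.
 */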
void
rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
{
	struct rpcrdma_ia *ia = rdmab_to_ia(buf);

	while (!list_empty(&buf->rb_recv_bufs)) {
		struct rpcrdma_rep *rep;

		rep = rpcrdma_buffer_get_rep_locked(buf);
		rpcrdma_destroy_rep(ia, rep);
	}

	while (!list_empty(&buf->rb_send_bufs)) {
		struct rpcrdma_req *req;

		req = rpcrdma_buffer_get_req_locked(buf);
		rpcrdma_destroy_req(ia, req);
	}

	ia->ri_ops->ro_destroy(buf);
}

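/* Take an MW from the transport's free list, or return NULL
 * (and complain) if none are available.
 */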
struct rpcrdma_mw *
rpcrdma_get_mw(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	struct rpcrdma_mw *mw = NULL;

	spin_lock(&buf->rb_mwlock);
	if (!list_empty(&buf->rb_mws)) {
		mw = list_first_entry(&buf->rb_mws,
				      struct rpcrdma_mw, mw_list);
		list_del_init(&mw->mw_list);
	}
	spin_unlock(&buf->rb_mwlock);

	if (!mw)
		pr_err("RPC:       %s: no MWs available\n", __func__);
	return mw;
}

void
rpcrdma_put_mw(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mw *mw)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;

	spin_lock(&buf->rb_mwlock);
	list_add_tail(&mw->mw_list, &buf->rb_mws);
	spin_unlock(&buf->rb_mwlock);
}

/*
 * Get a set of request/reply buffers.
 *
 * Reply buffer (if available) is attached to send buffer upon return.
 */
struct rpcrdma_req *
rpcrdma_buffer_get(struct rpcrdma_buffer *buffers)
{
	struct rpcrdma_req *req;

	spin_lock(&buffers->rb_lock);
	if (list_empty(&buffers->rb_send_bufs))
		goto out_reqbuf;
	req = rpcrdma_buffer_get_req_locked(buffers);
	if (list_empty(&buffers->rb_recv_bufs))
		goto out_repbuf;
	req->rl_reply = rpcrdma_buffer_get_rep_locked(buffers);
	spin_unlock(&buffers->rb_lock);
	return req;

out_reqbuf:
	spin_unlock(&buffers->rb_lock);
	pr_warn("RPC:       %s: out of request buffers\n", __func__);
	return NULL;
out_repbuf:
	spin_unlock(&buffers->rb_lock);
	pr_warn("RPC:       %s: out of reply buffers\n", __func__);
	req->rl_reply = NULL;
	return req;
}

/*
 * Put request/reply buffers back into pool.
 * Pre-decrement counter/array index.
 */
void
rpcrdma_buffer_put(struct rpcrdma_req *req)
{
	struct rpcrdma_buffer *buffers = req->rl_buffer;
	struct rpcrdma_rep *rep = req->rl_reply;

	req->rl_niovs = 0;
	req->rl_reply = NULL;

	spin_lock(&buffers->rb_lock);
	list_add_tail(&req->rl_free, &buffers->rb_send_bufs);
	if (rep)
		list_add_tail(&rep->rr_list, &buffers->rb_recv_bufs);
	spin_unlock(&buffers->rb_lock);
}

/*
 * Recover reply buffers from pool.
 * This happens when recovering from disconnect.
 */
void
rpcrdma_recv_buffer_get(struct rpcrdma_req *req)
{
	struct rpcrdma_buffer *buffers = req->rl_buffer;

	spin_lock(&buffers->rb_lock);
	if (!list_empty(&buffers->rb_recv_bufs))
		req->rl_reply = rpcrdma_buffer_get_rep_locked(buffers);
	spin_unlock(&buffers->rb_lock);
}

/*
 * Put reply buffers back into pool when not attached to
 * request. This happens in error conditions.
 */
void
rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep)
{
	struct rpcrdma_buffer *buffers = &rep->rr_rxprt->rx_buf;

	spin_lock(&buffers->rb_lock);
	list_add_tail(&rep->rr_list, &buffers->rb_recv_bufs);
	spin_unlock(&buffers->rb_lock);
}

/*
 * Wrappers for internal-use kmalloc memory registration, used by buffer code.
 */

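/* Report the offset, iova, and length of a segment that could not
 * be mapped.
 */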
void
rpcrdma_mapping_error(struct rpcrdma_mr_seg *seg)
{
	dprintk("RPC:       map_one: offset %p iova %llx len %zu\n",
		seg->mr_offset,
		(unsigned long long)seg->mr_dma, seg->mr_dmalen);
}

/**
 * rpcrdma_alloc_regbuf - kmalloc and register memory for SEND/RECV buffers
 * @ia: controlling rpcrdma_ia
 * @size: size of buffer to be allocated, in bytes
 * @flags: GFP flags
 *
 * Returns pointer to private header of an area of internally
 * registered memory, or an ERR_PTR. The registered buffer follows
 * the end of the private header.
 *
 * xprtrdma uses a regbuf for posting an outgoing RDMA SEND, or for
 * receiving the payload of RDMA RECV operations. regbufs are not
 * used for RDMA READ/WRITE operations, thus are registered only for
 * LOCAL access.
 */
struct rpcrdma_regbuf *
rpcrdma_alloc_regbuf(struct rpcrdma_ia *ia, size_t size, gfp_t flags)
{
	struct rpcrdma_regbuf *rb;
	struct ib_sge *iov;

	rb = kmalloc(sizeof(*rb) + size, flags);
	if (rb == NULL)
		goto out;

	iov = &rb->rg_iov;
	iov->addr = ib_dma_map_single(ia->ri_device,
				      (void *)rb->rg_base, size,
				      DMA_BIDIRECTIONAL);
	if (ib_dma_mapping_error(ia->ri_device, iov->addr))
		goto out_free;

	iov->length = size;
	iov->lkey = ia->ri_pd->local_dma_lkey;
	rb->rg_size = size;
	rb->rg_owner = NULL;
	return rb;

out_free:
	kfree(rb);
out:
	return ERR_PTR(-ENOMEM);
}

/**
 * rpcrdma_free_regbuf - deregister and free registered buffer
 * @ia: controlling rpcrdma_ia
 * @rb: regbuf to be deregistered and freed
 */
void
rpcrdma_free_regbuf(struct rpcrdma_ia *ia, struct rpcrdma_regbuf *rb)
{
	struct ib_sge *iov;

	if (!rb)
		return;

	iov = &rb->rg_iov;
	ib_dma_unmap_single(ia->ri_device,
			    iov->addr, iov->length, DMA_BIDIRECTIONAL);
	kfree(rb);
}

/*
 * Prepost any receive buffer, then post send.
 *
 * Receive buffer is donated to hardware, reclaimed upon recv completion.
 */
int
rpcrdma_ep_post(struct rpcrdma_ia *ia,
		struct rpcrdma_ep *ep,
		struct rpcrdma_req *req)
{
	struct ib_device *device = ia->ri_device;
	struct ib_send_wr send_wr, *send_wr_fail;
	struct rpcrdma_rep *rep = req->rl_reply;
	struct ib_sge *iov = req->rl_send_iov;
	int i, rc;

	if (rep) {
		rc = rpcrdma_ep_post_recv(ia, ep, rep);
		if (rc)
			goto out;
		req->rl_reply = NULL;
	}

	send_wr.next = NULL;
	send_wr.wr_id = RPCRDMA_IGNORE_COMPLETION;
	send_wr.sg_list = iov;
	send_wr.num_sge = req->rl_niovs;
	send_wr.opcode = IB_WR_SEND;

	for (i = 0; i < send_wr.num_sge; i++)
		ib_dma_sync_single_for_device(device, iov[i].addr,
					      iov[i].length, DMA_TO_DEVICE);
	dprintk("RPC:       %s: posting %d s/g entries\n",
		__func__, send_wr.num_sge);

	if (DECR_CQCOUNT(ep) > 0)
		send_wr.send_flags = 0;
	else { /* Provider must take a send completion every now and then */
		INIT_CQCOUNT(ep);
		send_wr.send_flags = IB_SEND_SIGNALED;
	}

	rc = ib_post_send(ia->ri_id->qp, &send_wr, &send_wr_fail);
	if (rc)
		dprintk("RPC:       %s: ib_post_send returned %i\n", __func__,
			rc);
out:
	return rc;
}

/*
 * (Re)post a receive buffer.
 */
int
rpcrdma_ep_post_recv(struct rpcrdma_ia *ia,
		     struct rpcrdma_ep *ep,
		     struct rpcrdma_rep *rep)
{
	struct ib_recv_wr recv_wr, *recv_wr_fail;
	int rc;

	recv_wr.next = NULL;
	recv_wr.wr_id = (u64) (unsigned long) rep;
	recv_wr.sg_list = &rep->rr_rdmabuf->rg_iov;
	recv_wr.num_sge = 1;

	ib_dma_sync_single_for_cpu(ia->ri_device,
				   rdmab_addr(rep->rr_rdmabuf),
				   rdmab_length(rep->rr_rdmabuf),
				   DMA_BIDIRECTIONAL);

	rc = ib_post_recv(ia->ri_id->qp, &recv_wr, &recv_wr_fail);

	if (rc)
		dprintk("RPC:       %s: ib_post_recv returned %i\n", __func__,
			rc);
	return rc;
}

/* How many chunk list items fit within our inline buffers?
 */
unsigned int
rpcrdma_max_segments(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
	int bytes, segments;

	bytes = min_t(unsigned int, cdata->inline_wsize, cdata->inline_rsize);
	bytes -= RPCRDMA_HDRLEN_MIN;
	if (bytes < sizeof(struct rpcrdma_segment) * 2) {
		pr_warn("RPC:       %s: inline threshold too small\n",
			__func__);
		return 0;
1305
	}
1306 1307 1308 1309 1310

	segments = 1 << (fls(bytes / sizeof(struct rpcrdma_segment)) - 1);
	dprintk("RPC:       %s: max chunk list size = %d segments\n",
		__func__, segments);
	return segments;
1311
}