diff --git a/include/linux/sunrpc/svc_rdma.h b/include/linux/sunrpc/svc_rdma.h
index 8827b4e36c3c45f318a62dded47783de75a9a9b6..d3e2bb3312647ab6aaec1c56043f3a3e7af341ae 100644
--- a/include/linux/sunrpc/svc_rdma.h
+++ b/include/linux/sunrpc/svc_rdma.h
@@ -109,8 +109,8 @@ struct svcxprt_rdma {
 
 	struct ib_pd         *sc_pd;
 
-	spinlock_t	     sc_ctxt_lock;
-	struct list_head     sc_ctxts;
+	spinlock_t	     sc_send_lock;
+	struct list_head     sc_send_ctxts;
 	int		     sc_ctxt_used;
 	spinlock_t	     sc_rw_ctxt_lock;
 	struct list_head     sc_rw_ctxts;
@@ -158,6 +158,19 @@ struct svc_rdma_recv_ctxt {
 	struct page		*rc_pages[RPCSVC_MAXPAGES];
 };
 
+enum {
+	RPCRDMA_MAX_SGES	= 1 + (RPCRDMA_MAX_INLINE_THRESH / PAGE_SIZE),
+};
+
+struct svc_rdma_send_ctxt {
+	struct list_head	sc_list;
+	struct ib_send_wr	sc_send_wr;
+	struct ib_cqe		sc_cqe;
+	int			sc_page_count;
+	struct page		*sc_pages[RPCSVC_MAXPAGES];
+	struct ib_sge		sc_sges[RPCRDMA_MAX_SGES];
+};
+
 /* svc_rdma_backchannel.c */
 extern int svc_rdma_handle_bc_reply(struct rpc_xprt *xprt,
 				    __be32 *rdma_resp,
@@ -183,24 +196,22 @@ extern int svc_rdma_send_reply_chunk(struct svcxprt_rdma *rdma,
 				     struct xdr_buf *xdr);
 
 /* svc_rdma_sendto.c */
+extern void svc_rdma_send_ctxts_destroy(struct svcxprt_rdma *rdma);
+extern struct svc_rdma_send_ctxt *
+		svc_rdma_send_ctxt_get(struct svcxprt_rdma *rdma);
+extern void svc_rdma_send_ctxt_put(struct svcxprt_rdma *rdma,
+				   struct svc_rdma_send_ctxt *ctxt);
+extern int svc_rdma_send(struct svcxprt_rdma *rdma, struct ib_send_wr *wr);
 extern int svc_rdma_map_reply_hdr(struct svcxprt_rdma *rdma,
-				  struct svc_rdma_op_ctxt *ctxt,
+				  struct svc_rdma_send_ctxt *ctxt,
 				  __be32 *rdma_resp, unsigned int len);
 extern int svc_rdma_post_send_wr(struct svcxprt_rdma *rdma,
-				 struct svc_rdma_op_ctxt *ctxt,
+				 struct svc_rdma_send_ctxt *ctxt,
 				 u32 inv_rkey);
 extern int svc_rdma_sendto(struct svc_rqst *);
 
 /* svc_rdma_transport.c */
-extern void svc_rdma_wc_send(struct ib_cq *, struct ib_wc *);
-extern void svc_rdma_wc_reg(struct ib_cq *, struct ib_wc *);
-extern void svc_rdma_wc_read(struct ib_cq *, struct ib_wc *);
-extern void svc_rdma_wc_inv(struct ib_cq *, struct ib_wc *);
-extern int svc_rdma_send(struct svcxprt_rdma *, struct ib_send_wr *);
 extern int svc_rdma_create_listen(struct svc_serv *, int, struct sockaddr *);
-extern struct svc_rdma_op_ctxt *svc_rdma_get_context(struct svcxprt_rdma *);
-extern void svc_rdma_put_context(struct svc_rdma_op_ctxt *, int);
-extern void svc_rdma_unmap_dma(struct svc_rdma_op_ctxt *ctxt);
 extern void svc_sq_reap(struct svcxprt_rdma *);
 extern void svc_rq_reap(struct svcxprt_rdma *);
 extern void svc_rdma_prep_reply_hdr(struct svc_rqst *);
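The new svc_rdma_send_ctxt above bundles everything one Send WR needs: the WR
itself, its CQE, a fixed SGE array, and the pages whose ownership the Send takes
over. RPCRDMA_MAX_SGES budgets one SGE for the transport header plus one SGE per
page of inline payload. A stand-alone sketch of that arithmetic (the 4096-byte
values are assumptions chosen for illustration; the real constants come from the
RPC/RDMA headers and the platform):

#include <stdio.h>

/* Hypothetical stand-ins; RPCRDMA_MAX_INLINE_THRESH and PAGE_SIZE are
 * defined elsewhere in the kernel. 4096 is only an assumption here.
 */
#define EXAMPLE_PAGE_SIZE	4096
#define EXAMPLE_INLINE_THRESH	4096

/* One SGE covers the transport header; each page of the largest
 * permitted inline Send payload needs one more.
 */
#define EXAMPLE_MAX_SGES (1 + (EXAMPLE_INLINE_THRESH / EXAMPLE_PAGE_SIZE))

int main(void)
{
	printf("SGEs per Send WR: %d\n", EXAMPLE_MAX_SGES);	/* prints 2 */
	return 0;
}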
diff --git a/net/sunrpc/xprtrdma/svc_rdma_backchannel.c b/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
index 0b9ba9f50a765d86beaa63bafbabdcea649c9ed0..95e33511cc6f3f2758c79bc08f8ef8ad3a93411a 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
@@ -1,6 +1,6 @@
 // SPDX-License-Identifier: GPL-2.0
 /*
- * Copyright (c) 2015 Oracle. All rights reserved.
+ * Copyright (c) 2015-2018 Oracle. All rights reserved.
  *
  * Support for backward direction RPCs on RPC/RDMA (server-side).
  */
@@ -117,10 +117,14 @@ int svc_rdma_handle_bc_reply(struct rpc_xprt *xprt, __be32 *rdma_resp,
 static int svc_rdma_bc_sendto(struct svcxprt_rdma *rdma,
 			      struct rpc_rqst *rqst)
 {
-	struct svc_rdma_op_ctxt *ctxt;
+	struct svc_rdma_send_ctxt *ctxt;
 	int ret;
 
-	ctxt = svc_rdma_get_context(rdma);
+	ctxt = svc_rdma_send_ctxt_get(rdma);
+	if (!ctxt) {
+		ret = -ENOMEM;
+		goto out_err;
+	}
 
 	/* rpcrdma_bc_send_request builds the transport header and
 	 * the backchannel RPC message in the same buffer. Thus only
@@ -144,8 +148,7 @@ static int svc_rdma_bc_sendto(struct svcxprt_rdma *rdma,
 	return ret;
 
 out_unmap:
-	svc_rdma_unmap_dma(ctxt);
-	svc_rdma_put_context(ctxt, 1);
+	svc_rdma_send_ctxt_put(rdma, ctxt);
 	ret = -EIO;
 	goto out_err;
 }
diff --git a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
index af6d2f3b32420fd10f451d0be73c3e449cc89c76..2d1e0db4c8697327fdc9408c598f384db2c111d6 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
@@ -601,7 +601,7 @@ static void rdma_read_complete(struct svc_rqst *rqstp,
 static void svc_rdma_send_error(struct svcxprt_rdma *xprt,
 				__be32 *rdma_argp, int status)
 {
-	struct svc_rdma_op_ctxt *ctxt;
+	struct svc_rdma_send_ctxt *ctxt;
 	__be32 *p, *err_msgp;
 	unsigned int length;
 	struct page *page;
@@ -631,7 +631,10 @@ static void svc_rdma_send_error(struct svcxprt_rdma *xprt,
 	length = (unsigned long)p - (unsigned long)err_msgp;
 
 	/* Map transport header; no RPC message payload */
-	ctxt = svc_rdma_get_context(xprt);
+	ctxt = svc_rdma_send_ctxt_get(xprt);
+	if (!ctxt)
+		return;
+
 	ret = svc_rdma_map_reply_hdr(xprt, ctxt, err_msgp, length);
 	if (ret) {
 		dprintk("svcrdma: Error %d mapping send for protocol error\n",
@@ -640,10 +643,8 @@ static void svc_rdma_send_error(struct svcxprt_rdma *xprt,
 	}
 
 	ret = svc_rdma_post_send_wr(xprt, ctxt, 0);
-	if (ret) {
-		svc_rdma_unmap_dma(ctxt);
-		svc_rdma_put_context(ctxt, 1);
-	}
+	if (ret)
+		svc_rdma_send_ctxt_put(xprt, ctxt);
 }
 
 /* By convention, backchannel calls arrive via rdma_msg type
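Both call sites above follow the discipline this patch establishes: the get can
now fail, and a single put unwinds whatever progress was made, because
svc_rdma_send_ctxt_put itself DMA-unmaps any mapped SGEs and releases any saved
pages. A user-space sketch of that one-cleanup-label shape (all names here are
illustrative, not kernel API; note that on a *successful* post the real code
instead leaves the ctxt for the Send completion handler to put):

#include <stdio.h>
#include <stdlib.h>
#include <errno.h>

/* Illustrative stand-in for the patch's send context. */
struct send_ctxt { int mapped; };

static struct send_ctxt *ctxt_get(void)
{
	return calloc(1, sizeof(struct send_ctxt));
}

/* One put undoes everything: unmap if mapped, then recycle. */
static void ctxt_put(struct send_ctxt *ctxt)
{
	if (ctxt->mapped)
		printf("unmapping %d sge(s)\n", ctxt->mapped);
	free(ctxt);
}

static int map_hdr(struct send_ctxt *ctxt)
{
	ctxt->mapped = 1;
	return 0;		/* or -EIO on DMA mapping failure */
}

static int post_send(struct send_ctxt *ctxt)
{
	(void)ctxt;
	return 0;		/* or -ENOTCONN when the QP is gone */
}

static int send_reply(void)
{
	struct send_ctxt *ctxt;
	int ret;

	ctxt = ctxt_get();
	if (!ctxt)
		return -ENOMEM;	/* the new failure mode callers must check */
	ret = map_hdr(ctxt);
	if (ret < 0)
		goto out_err;
	ret = post_send(ctxt);
	if (ret)
		goto out_err;
	return 0;

out_err:
	ctxt_put(ctxt);		/* single cleanup call on every failure path */
	return ret;
}

int main(void)
{
	return send_reply() ? 1 : 0;
}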
diff --git a/net/sunrpc/xprtrdma/svc_rdma_sendto.c b/net/sunrpc/xprtrdma/svc_rdma_sendto.c
index 4591017adc1e0795e561deb93cf2a75248877bb3..b286d6a6e4294513dc2024d3dd4390c1efa77b42 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_sendto.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_sendto.c
@@ -75,11 +75,11 @@
  * DMA-unmap the pages under I/O for that Write segment. The Write
  * completion handler does not release any pages.
  *
- * When the Send WR is constructed, it also gets its own svc_rdma_op_ctxt.
+ * When the Send WR is constructed, it also gets its own svc_rdma_send_ctxt.
  * The ownership of all of the Reply's pages are transferred into that
  * ctxt, the Send WR is posted, and sendto returns.
  *
- * The svc_rdma_op_ctxt is presented when the Send WR completes. The
+ * The svc_rdma_send_ctxt is presented when the Send WR completes. The
  * Send completion handler finally releases the Reply's pages.
  *
  * This mechanism also assumes that completions on the transport's Send
@@ -114,6 +114,184 @@
 
 #define RPCDBG_FACILITY	RPCDBG_SVCXPRT
 
+static void svc_rdma_wc_send(struct ib_cq *cq, struct ib_wc *wc);
+
+static inline struct svc_rdma_send_ctxt *
+svc_rdma_next_send_ctxt(struct list_head *list)
+{
+	return list_first_entry_or_null(list, struct svc_rdma_send_ctxt,
+					sc_list);
+}
+
+static struct svc_rdma_send_ctxt *
+svc_rdma_send_ctxt_alloc(struct svcxprt_rdma *rdma)
+{
+	struct svc_rdma_send_ctxt *ctxt;
+	int i;
+
+	ctxt = kmalloc(sizeof(*ctxt), GFP_KERNEL);
+	if (!ctxt)
+		return NULL;
+
+	ctxt->sc_cqe.done = svc_rdma_wc_send;
+	ctxt->sc_send_wr.next = NULL;
+	ctxt->sc_send_wr.wr_cqe = &ctxt->sc_cqe;
+	ctxt->sc_send_wr.sg_list = ctxt->sc_sges;
+	ctxt->sc_send_wr.send_flags = IB_SEND_SIGNALED;
+	for (i = 0; i < ARRAY_SIZE(ctxt->sc_sges); i++)
+		ctxt->sc_sges[i].lkey = rdma->sc_pd->local_dma_lkey;
+	return ctxt;
+}
+
+/**
+ * svc_rdma_send_ctxts_destroy - Release all send_ctxt's for an xprt
+ * @rdma: svcxprt_rdma being torn down
+ *
+ */
+void svc_rdma_send_ctxts_destroy(struct svcxprt_rdma *rdma)
+{
+	struct svc_rdma_send_ctxt *ctxt;
+
+	while ((ctxt = svc_rdma_next_send_ctxt(&rdma->sc_send_ctxts))) {
+		list_del(&ctxt->sc_list);
+		kfree(ctxt);
+	}
+}
+
+/**
+ * svc_rdma_send_ctxt_get - Get a free send_ctxt
+ * @rdma: controlling svcxprt_rdma
+ *
+ * Returns a ready-to-use send_ctxt, or NULL if none are
+ * available and a fresh one cannot be allocated.
+ */
+struct svc_rdma_send_ctxt *svc_rdma_send_ctxt_get(struct svcxprt_rdma *rdma)
+{
+	struct svc_rdma_send_ctxt *ctxt;
+
+	spin_lock(&rdma->sc_send_lock);
+	ctxt = svc_rdma_next_send_ctxt(&rdma->sc_send_ctxts);
+	if (!ctxt)
+		goto out_empty;
+	list_del(&ctxt->sc_list);
+	spin_unlock(&rdma->sc_send_lock);
+
+out:
+	ctxt->sc_send_wr.num_sge = 0;
+	ctxt->sc_page_count = 0;
+	return ctxt;
+
+out_empty:
+	spin_unlock(&rdma->sc_send_lock);
+	ctxt = svc_rdma_send_ctxt_alloc(rdma);
+	if (!ctxt)
+		return NULL;
+	goto out;
+}
+
+/**
+ * svc_rdma_send_ctxt_put - Return send_ctxt to free list
+ * @rdma: controlling svcxprt_rdma
+ * @ctxt: object to return to the free list
+ *
+ * Pages left in sc_pages are DMA unmapped and released.
+ */
+void svc_rdma_send_ctxt_put(struct svcxprt_rdma *rdma,
+			    struct svc_rdma_send_ctxt *ctxt)
+{
+	struct ib_device *device = rdma->sc_cm_id->device;
+	unsigned int i;
+
+	for (i = 0; i < ctxt->sc_send_wr.num_sge; i++)
+		ib_dma_unmap_page(device,
+				  ctxt->sc_sges[i].addr,
+				  ctxt->sc_sges[i].length,
+				  DMA_TO_DEVICE);
+
+	for (i = 0; i < ctxt->sc_page_count; ++i)
+		put_page(ctxt->sc_pages[i]);
+
+	spin_lock(&rdma->sc_send_lock);
+	list_add(&ctxt->sc_list, &rdma->sc_send_ctxts);
+	spin_unlock(&rdma->sc_send_lock);
+}
+
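svc_rdma_send_ctxt_get and _put above implement a simple recycle-or-allocate
pool: pop from a spinlock-guarded list, fall back to a fresh allocation when the
list is empty (replacing the old pre-allocated sc_ctxts scheme), and push back
on put; nothing is freed until transport teardown. The same pattern in
stand-alone C, with a pthread mutex standing in for the spinlock (illustrative
names, not kernel API):

#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>

struct ctxt {
	struct ctxt *next;
	int num_sge;		/* reset on every get, like sc_send_wr.num_sge */
	int page_count;		/* reset on every get, like sc_page_count */
};

static struct ctxt *free_list;
static pthread_mutex_t list_lock = PTHREAD_MUTEX_INITIALIZER;

/* Pop a recycled context; allocate fresh when the list is empty. */
static struct ctxt *ctxt_get(void)
{
	struct ctxt *c;

	pthread_mutex_lock(&list_lock);
	c = free_list;
	if (c)
		free_list = c->next;
	pthread_mutex_unlock(&list_lock);

	if (!c) {
		c = malloc(sizeof(*c));
		if (!c)
			return NULL;	/* callers must now check for this */
	}
	c->num_sge = 0;
	c->page_count = 0;
	return c;
}

/* Push the context back for reuse; freed only at teardown. */
static void ctxt_put(struct ctxt *c)
{
	pthread_mutex_lock(&list_lock);
	c->next = free_list;
	free_list = c;
	pthread_mutex_unlock(&list_lock);
}

int main(void)
{
	struct ctxt *a = ctxt_get();
	struct ctxt *b;

	if (!a)
		return 1;
	ctxt_put(a);
	b = ctxt_get();		/* pops the same object back off the list */
	printf("%s\n", a == b ? "recycled" : "fresh");
	ctxt_put(b);
	return 0;
}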
+/**
+ * svc_rdma_wc_send - Invoked by RDMA provider for each polled Send WC
+ * @cq: Completion Queue context
+ * @wc: Work Completion object
+ *
+ * NB: The svc_xprt/svcxprt_rdma is pinned whenever it's possible that
+ * the Send completion handler could be running.
+ */
+static void svc_rdma_wc_send(struct ib_cq *cq, struct ib_wc *wc)
+{
+	struct svcxprt_rdma *rdma = cq->cq_context;
+	struct ib_cqe *cqe = wc->wr_cqe;
+	struct svc_rdma_send_ctxt *ctxt;
+
+	trace_svcrdma_wc_send(wc);
+
+	atomic_inc(&rdma->sc_sq_avail);
+	wake_up(&rdma->sc_send_wait);
+
+	ctxt = container_of(cqe, struct svc_rdma_send_ctxt, sc_cqe);
+	svc_rdma_send_ctxt_put(rdma, ctxt);
+
+	if (unlikely(wc->status != IB_WC_SUCCESS)) {
+		set_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags);
+		svc_xprt_enqueue(&rdma->sc_xprt);
+		if (wc->status != IB_WC_WR_FLUSH_ERR)
+			pr_err("svcrdma: Send: %s (%u/0x%x)\n",
+			       ib_wc_status_msg(wc->status),
+			       wc->status, wc->vendor_err);
+	}
+
+	svc_xprt_put(&rdma->sc_xprt);
+}
+
+int svc_rdma_send(struct svcxprt_rdma *rdma, struct ib_send_wr *wr)
+{
+	struct ib_send_wr *bad_wr, *n_wr;
+	int wr_count;
+	int i;
+	int ret;
+
+	wr_count = 1;
+	for (n_wr = wr->next; n_wr; n_wr = n_wr->next)
+		wr_count++;
+
+	/* If the SQ is full, wait until an SQ entry is available */
+	while (1) {
+		if ((atomic_sub_return(wr_count, &rdma->sc_sq_avail) < 0)) {
+			atomic_inc(&rdma_stat_sq_starve);
+			trace_svcrdma_sq_full(rdma);
+			atomic_add(wr_count, &rdma->sc_sq_avail);
+			wait_event(rdma->sc_send_wait,
+				   atomic_read(&rdma->sc_sq_avail) > wr_count);
+			if (test_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags))
+				return -ENOTCONN;
+			trace_svcrdma_sq_retry(rdma);
+			continue;
+		}
+		/* Take a transport ref for each WR posted */
+		for (i = 0; i < wr_count; i++)
+			svc_xprt_get(&rdma->sc_xprt);
+
+		/* Bump used SQ WR count and post */
+		ret = ib_post_send(rdma->sc_qp, wr, &bad_wr);
+		trace_svcrdma_post_send(wr, ret);
+		if (ret) {
+			set_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags);
+			for (i = 0; i < wr_count; i++)
+				svc_xprt_put(&rdma->sc_xprt);
+			wake_up(&rdma->sc_send_wait);
+		}
+		break;
+	}
+	return ret;
+}
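svc_rdma_send reserves Send Queue capacity up front: it subtracts the WR count
from sc_sq_avail, and if the result would go negative it adds the count back,
sleeps on sc_send_wait, and retries once the completion handler
(svc_rdma_wc_send above) returns credits. The reserve/release arithmetic in
miniature, using C11 atomics in place of the kernel's atomics and wait queue
(illustrative only):

#include <stdatomic.h>
#include <stdbool.h>
#include <stdio.h>

static atomic_int sq_avail;

/* Try to reserve n SQ slots. atomic_fetch_sub() returns the old
 * value, so old - n is the new count, mirroring atomic_sub_return().
 * On failure, give the claim back, as svc_rdma_send does before
 * sleeping on sc_send_wait.
 */
static bool sq_reserve(int n)
{
	if (atomic_fetch_sub(&sq_avail, n) - n < 0) {
		atomic_fetch_add(&sq_avail, n);
		return false;	/* SQ full: caller waits and retries */
	}
	return true;
}

/* Send completion returns the credit (and would wake waiters). */
static void sq_release(int n)
{
	atomic_fetch_add(&sq_avail, n);
}

int main(void)
{
	atomic_store(&sq_avail, 2);	/* tiny SQ for the demo */
	printf("%d\n", sq_reserve(2));	/* 1: fits exactly */
	printf("%d\n", sq_reserve(1));	/* 0: would overcommit */
	sq_release(2);			/* completions arrive */
	printf("%d\n", sq_reserve(1));	/* 1: succeeds now */
	return 0;
}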
+
 static u32 xdr_padsize(u32 len)
 {
 	return (len & 3) ? (4 - (len & 3)) : 0;
@@ -303,7 +481,7 @@ static u32 svc_rdma_get_inv_rkey(__be32 *rdma_argp,
 }
 
 static int svc_rdma_dma_map_page(struct svcxprt_rdma *rdma,
-				 struct svc_rdma_op_ctxt *ctxt,
+				 struct svc_rdma_send_ctxt *ctxt,
 				 unsigned int sge_no,
 				 struct page *page,
 				 unsigned long offset,
@@ -316,10 +494,9 @@ static int svc_rdma_dma_map_page(struct svcxprt_rdma *rdma,
 	if (ib_dma_mapping_error(dev, dma_addr))
 		goto out_maperr;
 
-	ctxt->sge[sge_no].addr = dma_addr;
-	ctxt->sge[sge_no].length = len;
-	ctxt->sge[sge_no].lkey = rdma->sc_pd->local_dma_lkey;
-	ctxt->mapped_sges++;
+	ctxt->sc_sges[sge_no].addr = dma_addr;
+	ctxt->sc_sges[sge_no].length = len;
+	ctxt->sc_send_wr.num_sge++;
 	return 0;
 
 out_maperr:
@@ -331,7 +508,7 @@ static int svc_rdma_dma_map_page(struct svcxprt_rdma *rdma,
  * handles DMA-unmap and it uses ib_dma_unmap_page() exclusively.
  */
 static int svc_rdma_dma_map_buf(struct svcxprt_rdma *rdma,
-				struct svc_rdma_op_ctxt *ctxt,
+				struct svc_rdma_send_ctxt *ctxt,
 				unsigned int sge_no,
 				unsigned char *base,
 				unsigned int len)
@@ -352,14 +529,13 @@ static int svc_rdma_dma_map_buf(struct svcxprt_rdma *rdma,
  * %-EIO if DMA mapping failed.
  */
 int svc_rdma_map_reply_hdr(struct svcxprt_rdma *rdma,
-			   struct svc_rdma_op_ctxt *ctxt,
+			   struct svc_rdma_send_ctxt *ctxt,
 			   __be32 *rdma_resp,
 			   unsigned int len)
 {
-	ctxt->direction = DMA_TO_DEVICE;
-	ctxt->pages[0] = virt_to_page(rdma_resp);
-	ctxt->count = 1;
-	return svc_rdma_dma_map_page(rdma, ctxt, 0, ctxt->pages[0], 0, len);
+	ctxt->sc_pages[0] = virt_to_page(rdma_resp);
+	ctxt->sc_page_count++;
+	return svc_rdma_dma_map_page(rdma, ctxt, 0, ctxt->sc_pages[0], 0, len);
 }
 
 /* Load the xdr_buf into the ctxt's sge array, and DMA map each
@@ -368,7 +544,7 @@ int svc_rdma_map_reply_hdr(struct svcxprt_rdma *rdma,
  *
  * Returns zero on success, or a negative errno on failure.
  */
 static int svc_rdma_map_reply_msg(struct svcxprt_rdma *rdma,
-				  struct svc_rdma_op_ctxt *ctxt,
+				  struct svc_rdma_send_ctxt *ctxt,
 				  struct xdr_buf *xdr, __be32 *wr_lst)
 {
 	unsigned int len, sge_no, remaining;
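svc_rdma_map_reply_msg walks the xdr_buf's three regions -- head kvec, page
list, tail kvec -- and emits one DMA-mapped SGE per contiguous piece, which is
what bounds RPCRDMA_MAX_SGES above. A stand-alone sketch of the per-page
splitting arithmetic (4 KB pages are an assumption; this is not the kernel
routine itself):

#include <stdio.h>

#define EXAMPLE_PAGE_SIZE 4096	/* assumption for the example */

/* Count the SGEs needed for an xdr_buf-like layout: one for the
 * head, one per page the page list touches, one for the tail.
 */
static unsigned int count_sges(unsigned int head_len,
			       unsigned int page_base,
			       unsigned int page_len,
			       unsigned int tail_len)
{
	unsigned int count = head_len ? 1 : 0;
	unsigned int remaining = page_len;
	unsigned int off = page_base & (EXAMPLE_PAGE_SIZE - 1);

	while (remaining) {
		unsigned int chunk = EXAMPLE_PAGE_SIZE - off;

		if (chunk > remaining)
			chunk = remaining;
		remaining -= chunk;
		off = 0;
		count++;	/* one SGE per page-bounded piece */
	}
	return count + (tail_len ? 1 : 0);
}

int main(void)
{
	/* 128-byte head, 6000 bytes of pages starting at offset 100,
	 * 24-byte tail: head + two page pieces + tail = 4 SGEs.
	 */
	printf("%u\n", count_sges(128, 100, 6000, 24));
	return 0;
}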
@@ -436,13 +612,13 @@ static int svc_rdma_map_reply_msg(struct svcxprt_rdma *rdma,
  * so they are released by the Send completion handler.
  */
 static void svc_rdma_save_io_pages(struct svc_rqst *rqstp,
-				   struct svc_rdma_op_ctxt *ctxt)
+				   struct svc_rdma_send_ctxt *ctxt)
 {
 	int i, pages = rqstp->rq_next_page - rqstp->rq_respages;
 
-	ctxt->count += pages;
+	ctxt->sc_page_count += pages;
 	for (i = 0; i < pages; i++) {
-		ctxt->pages[i + 1] = rqstp->rq_respages[i];
+		ctxt->sc_pages[i + 1] = rqstp->rq_respages[i];
 		rqstp->rq_respages[i] = NULL;
 	}
 	rqstp->rq_next_page = rqstp->rq_respages + 1;
@@ -461,37 +637,29 @@ static void svc_rdma_save_io_pages(struct svc_rqst *rqstp,
  * %-ENOMEM if ib_post_send failed.
  */
 int svc_rdma_post_send_wr(struct svcxprt_rdma *rdma,
-			  struct svc_rdma_op_ctxt *ctxt,
+			  struct svc_rdma_send_ctxt *ctxt,
 			  u32 inv_rkey)
 {
-	struct ib_send_wr *send_wr = &ctxt->send_wr;
-
 	dprintk("svcrdma: posting Send WR with %u sge(s)\n",
-		ctxt->mapped_sges);
-
-	send_wr->next = NULL;
-	ctxt->cqe.done = svc_rdma_wc_send;
-	send_wr->wr_cqe = &ctxt->cqe;
-	send_wr->sg_list = ctxt->sge;
-	send_wr->num_sge = ctxt->mapped_sges;
-	send_wr->send_flags = IB_SEND_SIGNALED;
+		ctxt->sc_send_wr.num_sge);
+
 	if (inv_rkey) {
-		send_wr->opcode = IB_WR_SEND_WITH_INV;
-		send_wr->ex.invalidate_rkey = inv_rkey;
+		ctxt->sc_send_wr.opcode = IB_WR_SEND_WITH_INV;
+		ctxt->sc_send_wr.ex.invalidate_rkey = inv_rkey;
 	} else {
-		send_wr->opcode = IB_WR_SEND;
+		ctxt->sc_send_wr.opcode = IB_WR_SEND;
 	}
-	return svc_rdma_send(rdma, send_wr);
+	return svc_rdma_send(rdma, &ctxt->sc_send_wr);
 }
 
 /* Prepare the portion of the RPC Reply that will be transmitted
  * via RDMA Send. The RPC-over-RDMA transport header is prepared
- * in sge[0], and the RPC xdr_buf is prepared in following sges.
+ * in sc_sges[0], and the RPC xdr_buf is prepared in following sges.
  *
  * Depending on whether a Write list or Reply chunk is present,
  * the server may send all, a portion of, or none of the xdr_buf.
- * In the latter case, only the transport header (sge[0]) is
+ * In the latter case, only the transport header (sc_sges[0]) is
  * transmitted.
  *
  * RDMA Send is the last step of transmitting an RPC reply. Pages
@@ -508,11 +676,13 @@ static int svc_rdma_send_reply_msg(struct svcxprt_rdma *rdma,
 			       struct svc_rqst *rqstp,
 			       __be32 *wr_lst, __be32 *rp_ch)
 {
-	struct svc_rdma_op_ctxt *ctxt;
+	struct svc_rdma_send_ctxt *ctxt;
 	u32 inv_rkey;
 	int ret;
 
-	ctxt = svc_rdma_get_context(rdma);
+	ctxt = svc_rdma_send_ctxt_get(rdma);
+	if (!ctxt)
+		return -ENOMEM;
 
 	ret = svc_rdma_map_reply_hdr(rdma, ctxt, rdma_resp,
 				     svc_rdma_reply_hdr_len(rdma_resp));
@@ -538,8 +708,7 @@ static int svc_rdma_send_reply_msg(struct svcxprt_rdma *rdma,
 	return 0;
 
 err:
-	svc_rdma_unmap_dma(ctxt);
-	svc_rdma_put_context(ctxt, 1);
+	svc_rdma_send_ctxt_put(rdma, ctxt);
 	return ret;
 }
 
@@ -553,11 +722,13 @@ static int svc_rdma_send_reply_msg(struct svcxprt_rdma *rdma,
 static int svc_rdma_send_error_msg(struct svcxprt_rdma *rdma,
 				   __be32 *rdma_resp, struct svc_rqst *rqstp)
 {
-	struct svc_rdma_op_ctxt *ctxt;
+	struct svc_rdma_send_ctxt *ctxt;
 	__be32 *p;
 	int ret;
 
-	ctxt = svc_rdma_get_context(rdma);
+	ctxt = svc_rdma_send_ctxt_get(rdma);
+	if (!ctxt)
+		return -ENOMEM;
 
 	/* Replace the original transport header with an
 	 * RDMA_ERROR response. XID etc are preserved.
@@ -580,8 +751,7 @@ static int svc_rdma_send_error_msg(struct svcxprt_rdma *rdma,
 	return 0;
 
 err:
-	svc_rdma_unmap_dma(ctxt);
-	svc_rdma_put_context(ctxt, 1);
+	svc_rdma_send_ctxt_put(rdma, ctxt);
 	return ret;
 }
 
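svc_rdma_save_io_pages above is the page-ownership handoff the file comment
describes: the Reply's pages move out of the rqstp and into the send_ctxt, and
the source pointers are cleared so that only the Send completion path releases
them. The transfer idiom in miniature (illustrative types, not the kernel
structures):

#include <stdio.h>

/* Stand-in for struct page; only identity matters here. */
struct page { int id; };

/* Move ownership of count pages from src[] to dst[], clearing each
 * source slot so exactly one owner remains to release the page.
 */
static void save_io_pages(struct page **dst, struct page **src,
			  unsigned int count)
{
	unsigned int i;

	for (i = 0; i < count; i++) {
		dst[i] = src[i];
		src[i] = NULL;
	}
}

int main(void)
{
	struct page p0 = { 0 }, p1 = { 1 };
	struct page *resp[2] = { &p0, &p1 };	/* rqstp-side array */
	struct page *ctxt[3] = { NULL };	/* slot 0 holds the hdr page */

	save_io_pages(&ctxt[1], resp, 2);	/* payload starts at index 1 */
	printf("%d %d %p\n", ctxt[1]->id, ctxt[2]->id, (void *)resp[0]);
	return 0;
}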
diff --git a/net/sunrpc/xprtrdma/svc_rdma_transport.c b/net/sunrpc/xprtrdma/svc_rdma_transport.c
index baeecbb2f763a32c42b7cb04ef3a49d576599b9a..3de81735a6ccfa812928650343e34ffd6dafd662 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_transport.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_transport.c
@@ -1,5 +1,6 @@
 // SPDX-License-Identifier: GPL-2.0 OR BSD-3-Clause
 /*
+ * Copyright (c) 2015-2018 Oracle. All rights reserved.
  * Copyright (c) 2014 Open Grid Computing, Inc. All rights reserved.
  * Copyright (c) 2005-2007 Network Appliance, Inc. All rights reserved.
  *
@@ -157,114 +158,6 @@ static void svc_rdma_bc_free(struct svc_xprt *xprt)
 }
 #endif	/* CONFIG_SUNRPC_BACKCHANNEL */
 
-static struct svc_rdma_op_ctxt *alloc_ctxt(struct svcxprt_rdma *xprt,
-					   gfp_t flags)
-{
-	struct svc_rdma_op_ctxt *ctxt;
-
-	ctxt = kmalloc(sizeof(*ctxt), flags);
-	if (ctxt) {
-		ctxt->xprt = xprt;
-		INIT_LIST_HEAD(&ctxt->list);
-	}
-	return ctxt;
-}
-
-static bool svc_rdma_prealloc_ctxts(struct svcxprt_rdma *xprt)
-{
-	unsigned int i;
-
-	i = xprt->sc_sq_depth;
-	while (i--) {
-		struct svc_rdma_op_ctxt *ctxt;
-
-		ctxt = alloc_ctxt(xprt, GFP_KERNEL);
-		if (!ctxt) {
-			dprintk("svcrdma: No memory for RDMA ctxt\n");
-			return false;
-		}
-		list_add(&ctxt->list, &xprt->sc_ctxts);
-	}
-	return true;
-}
-
-struct svc_rdma_op_ctxt *svc_rdma_get_context(struct svcxprt_rdma *xprt)
-{
-	struct svc_rdma_op_ctxt *ctxt = NULL;
-
-	spin_lock(&xprt->sc_ctxt_lock);
-	xprt->sc_ctxt_used++;
-	if (list_empty(&xprt->sc_ctxts))
-		goto out_empty;
-
-	ctxt = list_first_entry(&xprt->sc_ctxts,
-				struct svc_rdma_op_ctxt, list);
-	list_del(&ctxt->list);
-	spin_unlock(&xprt->sc_ctxt_lock);
-
-out:
-	ctxt->count = 0;
-	ctxt->mapped_sges = 0;
-	return ctxt;
-
-out_empty:
-	/* Either pre-allocation missed the mark, or send
-	 * queue accounting is broken.
-	 */
-	spin_unlock(&xprt->sc_ctxt_lock);
-
-	ctxt = alloc_ctxt(xprt, GFP_NOIO);
-	if (ctxt)
-		goto out;
-
-	spin_lock(&xprt->sc_ctxt_lock);
-	xprt->sc_ctxt_used--;
-	spin_unlock(&xprt->sc_ctxt_lock);
-	WARN_ONCE(1, "svcrdma: empty RDMA ctxt list?\n");
-	return NULL;
-}
-
-void svc_rdma_unmap_dma(struct svc_rdma_op_ctxt *ctxt)
-{
-	struct svcxprt_rdma *xprt = ctxt->xprt;
-	struct ib_device *device = xprt->sc_cm_id->device;
-	unsigned int i;
-
-	for (i = 0; i < ctxt->mapped_sges; i++)
-		ib_dma_unmap_page(device,
-				  ctxt->sge[i].addr,
-				  ctxt->sge[i].length,
-				  ctxt->direction);
-	ctxt->mapped_sges = 0;
-}
-
-void svc_rdma_put_context(struct svc_rdma_op_ctxt *ctxt, int free_pages)
-{
-	struct svcxprt_rdma *xprt = ctxt->xprt;
-	int i;
-
-	if (free_pages)
-		for (i = 0; i < ctxt->count; i++)
-			put_page(ctxt->pages[i]);
-
-	spin_lock(&xprt->sc_ctxt_lock);
-	xprt->sc_ctxt_used--;
-	list_add(&ctxt->list, &xprt->sc_ctxts);
-	spin_unlock(&xprt->sc_ctxt_lock);
-}
-
-static void svc_rdma_destroy_ctxts(struct svcxprt_rdma *xprt)
-{
-	while (!list_empty(&xprt->sc_ctxts)) {
-		struct svc_rdma_op_ctxt *ctxt;
-
-		ctxt = list_first_entry(&xprt->sc_ctxts,
-					struct svc_rdma_op_ctxt, list);
-		list_del(&ctxt->list);
-		kfree(ctxt);
-	}
-}
-
 /* QP event handler */
 static void qp_event_handler(struct ib_event *event, void *context)
 {
@@ -292,39 +185,6 @@ static void qp_event_handler(struct ib_event *event, void *context)
 	}
 }
 
-/**
- * svc_rdma_wc_send - Invoked by RDMA provider for each polled Send WC
- * @cq: completion queue
- * @wc: completed WR
- *
- */
-void svc_rdma_wc_send(struct ib_cq *cq, struct ib_wc *wc)
-{
-	struct svcxprt_rdma *xprt = cq->cq_context;
-	struct ib_cqe *cqe = wc->wr_cqe;
-	struct svc_rdma_op_ctxt *ctxt;
-
-	trace_svcrdma_wc_send(wc);
-
-	atomic_inc(&xprt->sc_sq_avail);
-	wake_up(&xprt->sc_send_wait);
-
-	ctxt = container_of(cqe, struct svc_rdma_op_ctxt, cqe);
-	svc_rdma_unmap_dma(ctxt);
-	svc_rdma_put_context(ctxt, 1);
-
-	if (unlikely(wc->status != IB_WC_SUCCESS)) {
-		set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
-		svc_xprt_enqueue(&xprt->sc_xprt);
-		if (wc->status != IB_WC_WR_FLUSH_ERR)
-			pr_err("svcrdma: Send: %s (%u/0x%x)\n",
-			       ib_wc_status_msg(wc->status),
-			       wc->status, wc->vendor_err);
-	}
-
-	svc_xprt_put(&xprt->sc_xprt);
-}
-
 static struct svcxprt_rdma *svc_rdma_create_xprt(struct svc_serv *serv,
 						 struct net *net)
 {
@@ -338,14 +198,14 @@ static struct svcxprt_rdma *svc_rdma_create_xprt(struct svc_serv *serv,
 	INIT_LIST_HEAD(&cma_xprt->sc_accept_q);
 	INIT_LIST_HEAD(&cma_xprt->sc_rq_dto_q);
 	INIT_LIST_HEAD(&cma_xprt->sc_read_complete_q);
-	INIT_LIST_HEAD(&cma_xprt->sc_ctxts);
+	INIT_LIST_HEAD(&cma_xprt->sc_send_ctxts);
 	INIT_LIST_HEAD(&cma_xprt->sc_recv_ctxts);
 	INIT_LIST_HEAD(&cma_xprt->sc_rw_ctxts);
 	init_waitqueue_head(&cma_xprt->sc_send_wait);
 
 	spin_lock_init(&cma_xprt->sc_lock);
 	spin_lock_init(&cma_xprt->sc_rq_dto_lock);
-	spin_lock_init(&cma_xprt->sc_ctxt_lock);
+	spin_lock_init(&cma_xprt->sc_send_lock);
 	spin_lock_init(&cma_xprt->sc_recv_lock);
 	spin_lock_init(&cma_xprt->sc_rw_ctxt_lock);
 
@@ -640,9 +500,6 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
 	}
 	atomic_set(&newxprt->sc_sq_avail, newxprt->sc_sq_depth);
 
-	if (!svc_rdma_prealloc_ctxts(newxprt))
-		goto errout;
-
 	newxprt->sc_pd = ib_alloc_pd(dev, 0);
 	if (IS_ERR(newxprt->sc_pd)) {
 		dprintk("svcrdma: error creating PD for connect request\n");
@@ -794,11 +651,6 @@ static void __svc_rdma_free(struct work_struct *work)
 
 	svc_rdma_flush_recv_queues(rdma);
 
-	/* Warn if we leaked a resource or under-referenced */
-	if (rdma->sc_ctxt_used != 0)
-		pr_err("svcrdma: ctxt still in use? (%d)\n",
-		       rdma->sc_ctxt_used);
-
 	/* Final put of backchannel client transport */
 	if (xprt->xpt_bc_xprt) {
 		xprt_put(xprt->xpt_bc_xprt);
@@ -806,7 +658,7 @@ static void __svc_rdma_free(struct work_struct *work)
 	}
 
 	svc_rdma_destroy_rw_ctxts(rdma);
-	svc_rdma_destroy_ctxts(rdma);
+	svc_rdma_send_ctxts_destroy(rdma);
 	svc_rdma_recv_ctxts_destroy(rdma);
 
 	/* Destroy the QP if present (not a listener) */
@@ -860,52 +712,3 @@ static void svc_rdma_secure_port(struct svc_rqst *rqstp)
 static void svc_rdma_kill_temp_xprt(struct svc_xprt *xprt)
 {
 }
-
-int svc_rdma_send(struct svcxprt_rdma *xprt, struct ib_send_wr *wr)
-{
-	struct ib_send_wr *bad_wr, *n_wr;
-	int wr_count;
-	int i;
-	int ret;
-
-	if (test_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags))
-		return -ENOTCONN;
-
-	wr_count = 1;
-	for (n_wr = wr->next; n_wr; n_wr = n_wr->next)
-		wr_count++;
-
-	/* If the SQ is full, wait until an SQ entry is available */
-	while (1) {
-		if ((atomic_sub_return(wr_count, &xprt->sc_sq_avail) < 0)) {
-			atomic_inc(&rdma_stat_sq_starve);
-			trace_svcrdma_sq_full(xprt);
-			atomic_add(wr_count, &xprt->sc_sq_avail);
-			wait_event(xprt->sc_send_wait,
-				   atomic_read(&xprt->sc_sq_avail) > wr_count);
-			if (test_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags))
-				return -ENOTCONN;
-			trace_svcrdma_sq_retry(xprt);
-			continue;
-		}
-		/* Take a transport ref for each WR posted */
-		for (i = 0; i < wr_count; i++)
-			svc_xprt_get(&xprt->sc_xprt);
-
-		/* Bump used SQ WR count and post */
-		ret = ib_post_send(xprt->sc_qp, wr, &bad_wr);
-		trace_svcrdma_post_send(wr, ret);
-		if (ret) {
-			set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
-			for (i = 0; i < wr_count; i ++)
-				svc_xprt_put(&xprt->sc_xprt);
-			dprintk("svcrdma: failed to post SQ WR rc=%d\n", ret);
-			dprintk("    sc_sq_avail=%d, sc_sq_depth=%d\n",
-				atomic_read(&xprt->sc_sq_avail),
-				xprt->sc_sq_depth);
-			wake_up(&xprt->sc_send_wait);
-		}
-		break;
-	}
-	return ret;
-}