From 4201c7464753827803366b40e82eb050c04ebdef Mon Sep 17 00:00:00 2001 From: Chuck Lever Date: Mon, 7 May 2018 15:28:04 -0400 Subject: [PATCH] svcrdma: Introduce svc_rdma_send_ctxt svc_rdma_op_ctxt's are pre-allocated and maintained on a per-xprt free list. This eliminates the overhead of calling kmalloc / kfree, both of which grab a globally shared lock that disables interrupts. Introduce a replacement to svc_rdma_op_ctxt's that is built especially for the svcrdma Send path. Subsequent patches will take advantage of this new structure by allocating real resources which are then cached in these objects. The allocations are freed when the transport is torn down. I've renamed the structure so that static type checking can be used to ensure that uses of op_ctxt and send_ctxt are not confused. As an additional clean up, structure fields are renamed to conform with kernel coding conventions. Additional clean ups: - Handle svc_rdma_send_ctxt_get allocation failure at each call site, rather than pre-allocating and hoping we guessed correctly - All send_ctxt_put call-sites request page freeing, so remove the @free_pages argument - All send_ctxt_put call-sites unmap SGEs, so fold that into svc_rdma_send_ctxt_put Signed-off-by: Chuck Lever Signed-off-by: J. Bruce Fields --- include/linux/sunrpc/svc_rdma.h | 35 ++- net/sunrpc/xprtrdma/svc_rdma_backchannel.c | 13 +- net/sunrpc/xprtrdma/svc_rdma_recvfrom.c | 13 +- net/sunrpc/xprtrdma/svc_rdma_sendto.c | 254 +++++++++++++++++---- net/sunrpc/xprtrdma/svc_rdma_transport.c | 205 +---------------- 5 files changed, 254 insertions(+), 266 deletions(-) diff --git a/include/linux/sunrpc/svc_rdma.h b/include/linux/sunrpc/svc_rdma.h index 8827b4e36c3c..d3e2bb331264 100644 --- a/include/linux/sunrpc/svc_rdma.h +++ b/include/linux/sunrpc/svc_rdma.h @@ -109,8 +109,8 @@ struct svcxprt_rdma { struct ib_pd *sc_pd; - spinlock_t sc_ctxt_lock; - struct list_head sc_ctxts; + spinlock_t sc_send_lock; + struct list_head sc_send_ctxts; int sc_ctxt_used; spinlock_t sc_rw_ctxt_lock; struct list_head sc_rw_ctxts; @@ -158,6 +158,19 @@ struct svc_rdma_recv_ctxt { struct page *rc_pages[RPCSVC_MAXPAGES]; }; +enum { + RPCRDMA_MAX_SGES = 1 + (RPCRDMA_MAX_INLINE_THRESH / PAGE_SIZE), +}; + +struct svc_rdma_send_ctxt { + struct list_head sc_list; + struct ib_send_wr sc_send_wr; + struct ib_cqe sc_cqe; + int sc_page_count; + struct page *sc_pages[RPCSVC_MAXPAGES]; + struct ib_sge sc_sges[RPCRDMA_MAX_SGES]; +}; + /* svc_rdma_backchannel.c */ extern int svc_rdma_handle_bc_reply(struct rpc_xprt *xprt, __be32 *rdma_resp, @@ -183,24 +196,22 @@ extern int svc_rdma_send_reply_chunk(struct svcxprt_rdma *rdma, struct xdr_buf *xdr); /* svc_rdma_sendto.c */ +extern void svc_rdma_send_ctxts_destroy(struct svcxprt_rdma *rdma); +extern struct svc_rdma_send_ctxt * + svc_rdma_send_ctxt_get(struct svcxprt_rdma *rdma); +extern void svc_rdma_send_ctxt_put(struct svcxprt_rdma *rdma, + struct svc_rdma_send_ctxt *ctxt); +extern int svc_rdma_send(struct svcxprt_rdma *rdma, struct ib_send_wr *wr); extern int svc_rdma_map_reply_hdr(struct svcxprt_rdma *rdma, - struct svc_rdma_op_ctxt *ctxt, + struct svc_rdma_send_ctxt *ctxt, __be32 *rdma_resp, unsigned int len); extern int svc_rdma_post_send_wr(struct svcxprt_rdma *rdma, - struct svc_rdma_op_ctxt *ctxt, + struct svc_rdma_send_ctxt *ctxt, u32 inv_rkey); extern int svc_rdma_sendto(struct svc_rqst *); /* svc_rdma_transport.c */ -extern void svc_rdma_wc_send(struct ib_cq *, struct ib_wc *); -extern void svc_rdma_wc_reg(struct ib_cq *, struct ib_wc *); -extern void svc_rdma_wc_read(struct ib_cq *, struct ib_wc *); -extern void svc_rdma_wc_inv(struct ib_cq *, struct ib_wc *); -extern int svc_rdma_send(struct svcxprt_rdma *, struct ib_send_wr *); extern int svc_rdma_create_listen(struct svc_serv *, int, struct sockaddr *); -extern struct svc_rdma_op_ctxt *svc_rdma_get_context(struct svcxprt_rdma *); -extern void svc_rdma_put_context(struct svc_rdma_op_ctxt *, int); -extern void svc_rdma_unmap_dma(struct svc_rdma_op_ctxt *ctxt); extern void svc_sq_reap(struct svcxprt_rdma *); extern void svc_rq_reap(struct svcxprt_rdma *); extern void svc_rdma_prep_reply_hdr(struct svc_rqst *); diff --git a/net/sunrpc/xprtrdma/svc_rdma_backchannel.c b/net/sunrpc/xprtrdma/svc_rdma_backchannel.c index 0b9ba9f50a76..95e33511cc6f 100644 --- a/net/sunrpc/xprtrdma/svc_rdma_backchannel.c +++ b/net/sunrpc/xprtrdma/svc_rdma_backchannel.c @@ -1,6 +1,6 @@ // SPDX-License-Identifier: GPL-2.0 /* - * Copyright (c) 2015 Oracle. All rights reserved. + * Copyright (c) 2015-2018 Oracle. All rights reserved. * * Support for backward direction RPCs on RPC/RDMA (server-side). */ @@ -117,10 +117,14 @@ int svc_rdma_handle_bc_reply(struct rpc_xprt *xprt, __be32 *rdma_resp, static int svc_rdma_bc_sendto(struct svcxprt_rdma *rdma, struct rpc_rqst *rqst) { - struct svc_rdma_op_ctxt *ctxt; + struct svc_rdma_send_ctxt *ctxt; int ret; - ctxt = svc_rdma_get_context(rdma); + ctxt = svc_rdma_send_ctxt_get(rdma); + if (!ctxt) { + ret = -ENOMEM; + goto out_err; + } /* rpcrdma_bc_send_request builds the transport header and * the backchannel RPC message in the same buffer. Thus only @@ -144,8 +148,7 @@ static int svc_rdma_bc_sendto(struct svcxprt_rdma *rdma, return ret; out_unmap: - svc_rdma_unmap_dma(ctxt); - svc_rdma_put_context(ctxt, 1); + svc_rdma_send_ctxt_put(rdma, ctxt); ret = -EIO; goto out_err; } diff --git a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c index af6d2f3b3242..2d1e0db4c869 100644 --- a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c +++ b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c @@ -601,7 +601,7 @@ static void rdma_read_complete(struct svc_rqst *rqstp, static void svc_rdma_send_error(struct svcxprt_rdma *xprt, __be32 *rdma_argp, int status) { - struct svc_rdma_op_ctxt *ctxt; + struct svc_rdma_send_ctxt *ctxt; __be32 *p, *err_msgp; unsigned int length; struct page *page; @@ -631,7 +631,10 @@ static void svc_rdma_send_error(struct svcxprt_rdma *xprt, length = (unsigned long)p - (unsigned long)err_msgp; /* Map transport header; no RPC message payload */ - ctxt = svc_rdma_get_context(xprt); + ctxt = svc_rdma_send_ctxt_get(xprt); + if (!ctxt) + return; + ret = svc_rdma_map_reply_hdr(xprt, ctxt, err_msgp, length); if (ret) { dprintk("svcrdma: Error %d mapping send for protocol error\n", @@ -640,10 +643,8 @@ static void svc_rdma_send_error(struct svcxprt_rdma *xprt, } ret = svc_rdma_post_send_wr(xprt, ctxt, 0); - if (ret) { - svc_rdma_unmap_dma(ctxt); - svc_rdma_put_context(ctxt, 1); - } + if (ret) + svc_rdma_send_ctxt_put(xprt, ctxt); } /* By convention, backchannel calls arrive via rdma_msg type diff --git a/net/sunrpc/xprtrdma/svc_rdma_sendto.c b/net/sunrpc/xprtrdma/svc_rdma_sendto.c index 4591017adc1e..b286d6a6e429 100644 --- a/net/sunrpc/xprtrdma/svc_rdma_sendto.c +++ b/net/sunrpc/xprtrdma/svc_rdma_sendto.c @@ -75,11 +75,11 @@ * DMA-unmap the pages under I/O for that Write segment. The Write * completion handler does not release any pages. * - * When the Send WR is constructed, it also gets its own svc_rdma_op_ctxt. + * When the Send WR is constructed, it also gets its own svc_rdma_send_ctxt. * The ownership of all of the Reply's pages are transferred into that * ctxt, the Send WR is posted, and sendto returns. * - * The svc_rdma_op_ctxt is presented when the Send WR completes. The + * The svc_rdma_send_ctxt is presented when the Send WR completes. The * Send completion handler finally releases the Reply's pages. * * This mechanism also assumes that completions on the transport's Send @@ -114,6 +114,184 @@ #define RPCDBG_FACILITY RPCDBG_SVCXPRT +static void svc_rdma_wc_send(struct ib_cq *cq, struct ib_wc *wc); + +static inline struct svc_rdma_send_ctxt * +svc_rdma_next_send_ctxt(struct list_head *list) +{ + return list_first_entry_or_null(list, struct svc_rdma_send_ctxt, + sc_list); +} + +static struct svc_rdma_send_ctxt * +svc_rdma_send_ctxt_alloc(struct svcxprt_rdma *rdma) +{ + struct svc_rdma_send_ctxt *ctxt; + int i; + + ctxt = kmalloc(sizeof(*ctxt), GFP_KERNEL); + if (!ctxt) + return NULL; + + ctxt->sc_cqe.done = svc_rdma_wc_send; + ctxt->sc_send_wr.next = NULL; + ctxt->sc_send_wr.wr_cqe = &ctxt->sc_cqe; + ctxt->sc_send_wr.sg_list = ctxt->sc_sges; + ctxt->sc_send_wr.send_flags = IB_SEND_SIGNALED; + for (i = 0; i < ARRAY_SIZE(ctxt->sc_sges); i++) + ctxt->sc_sges[i].lkey = rdma->sc_pd->local_dma_lkey; + return ctxt; +} + +/** + * svc_rdma_send_ctxts_destroy - Release all send_ctxt's for an xprt + * @rdma: svcxprt_rdma being torn down + * + */ +void svc_rdma_send_ctxts_destroy(struct svcxprt_rdma *rdma) +{ + struct svc_rdma_send_ctxt *ctxt; + + while ((ctxt = svc_rdma_next_send_ctxt(&rdma->sc_send_ctxts))) { + list_del(&ctxt->sc_list); + kfree(ctxt); + } +} + +/** + * svc_rdma_send_ctxt_get - Get a free send_ctxt + * @rdma: controlling svcxprt_rdma + * + * Returns a ready-to-use send_ctxt, or NULL if none are + * available and a fresh one cannot be allocated. + */ +struct svc_rdma_send_ctxt *svc_rdma_send_ctxt_get(struct svcxprt_rdma *rdma) +{ + struct svc_rdma_send_ctxt *ctxt; + + spin_lock(&rdma->sc_send_lock); + ctxt = svc_rdma_next_send_ctxt(&rdma->sc_send_ctxts); + if (!ctxt) + goto out_empty; + list_del(&ctxt->sc_list); + spin_unlock(&rdma->sc_send_lock); + +out: + ctxt->sc_send_wr.num_sge = 0; + ctxt->sc_page_count = 0; + return ctxt; + +out_empty: + spin_unlock(&rdma->sc_send_lock); + ctxt = svc_rdma_send_ctxt_alloc(rdma); + if (!ctxt) + return NULL; + goto out; +} + +/** + * svc_rdma_send_ctxt_put - Return send_ctxt to free list + * @rdma: controlling svcxprt_rdma + * @ctxt: object to return to the free list + * + * Pages left in sc_pages are DMA unmapped and released. + */ +void svc_rdma_send_ctxt_put(struct svcxprt_rdma *rdma, + struct svc_rdma_send_ctxt *ctxt) +{ + struct ib_device *device = rdma->sc_cm_id->device; + unsigned int i; + + for (i = 0; i < ctxt->sc_send_wr.num_sge; i++) + ib_dma_unmap_page(device, + ctxt->sc_sges[i].addr, + ctxt->sc_sges[i].length, + DMA_TO_DEVICE); + + for (i = 0; i < ctxt->sc_page_count; ++i) + put_page(ctxt->sc_pages[i]); + + spin_lock(&rdma->sc_send_lock); + list_add(&ctxt->sc_list, &rdma->sc_send_ctxts); + spin_unlock(&rdma->sc_send_lock); +} + +/** + * svc_rdma_wc_send - Invoked by RDMA provider for each polled Send WC + * @cq: Completion Queue context + * @wc: Work Completion object + * + * NB: The svc_xprt/svcxprt_rdma is pinned whenever it's possible that + * the Send completion handler could be running. + */ +static void svc_rdma_wc_send(struct ib_cq *cq, struct ib_wc *wc) +{ + struct svcxprt_rdma *rdma = cq->cq_context; + struct ib_cqe *cqe = wc->wr_cqe; + struct svc_rdma_send_ctxt *ctxt; + + trace_svcrdma_wc_send(wc); + + atomic_inc(&rdma->sc_sq_avail); + wake_up(&rdma->sc_send_wait); + + ctxt = container_of(cqe, struct svc_rdma_send_ctxt, sc_cqe); + svc_rdma_send_ctxt_put(rdma, ctxt); + + if (unlikely(wc->status != IB_WC_SUCCESS)) { + set_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags); + svc_xprt_enqueue(&rdma->sc_xprt); + if (wc->status != IB_WC_WR_FLUSH_ERR) + pr_err("svcrdma: Send: %s (%u/0x%x)\n", + ib_wc_status_msg(wc->status), + wc->status, wc->vendor_err); + } + + svc_xprt_put(&rdma->sc_xprt); +} + +int svc_rdma_send(struct svcxprt_rdma *rdma, struct ib_send_wr *wr) +{ + struct ib_send_wr *bad_wr, *n_wr; + int wr_count; + int i; + int ret; + + wr_count = 1; + for (n_wr = wr->next; n_wr; n_wr = n_wr->next) + wr_count++; + + /* If the SQ is full, wait until an SQ entry is available */ + while (1) { + if ((atomic_sub_return(wr_count, &rdma->sc_sq_avail) < 0)) { + atomic_inc(&rdma_stat_sq_starve); + trace_svcrdma_sq_full(rdma); + atomic_add(wr_count, &rdma->sc_sq_avail); + wait_event(rdma->sc_send_wait, + atomic_read(&rdma->sc_sq_avail) > wr_count); + if (test_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags)) + return -ENOTCONN; + trace_svcrdma_sq_retry(rdma); + continue; + } + /* Take a transport ref for each WR posted */ + for (i = 0; i < wr_count; i++) + svc_xprt_get(&rdma->sc_xprt); + + /* Bump used SQ WR count and post */ + ret = ib_post_send(rdma->sc_qp, wr, &bad_wr); + trace_svcrdma_post_send(wr, ret); + if (ret) { + set_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags); + for (i = 0; i < wr_count; i++) + svc_xprt_put(&rdma->sc_xprt); + wake_up(&rdma->sc_send_wait); + } + break; + } + return ret; +} + static u32 xdr_padsize(u32 len) { return (len & 3) ? (4 - (len & 3)) : 0; @@ -303,7 +481,7 @@ static u32 svc_rdma_get_inv_rkey(__be32 *rdma_argp, } static int svc_rdma_dma_map_page(struct svcxprt_rdma *rdma, - struct svc_rdma_op_ctxt *ctxt, + struct svc_rdma_send_ctxt *ctxt, unsigned int sge_no, struct page *page, unsigned long offset, @@ -316,10 +494,9 @@ static int svc_rdma_dma_map_page(struct svcxprt_rdma *rdma, if (ib_dma_mapping_error(dev, dma_addr)) goto out_maperr; - ctxt->sge[sge_no].addr = dma_addr; - ctxt->sge[sge_no].length = len; - ctxt->sge[sge_no].lkey = rdma->sc_pd->local_dma_lkey; - ctxt->mapped_sges++; + ctxt->sc_sges[sge_no].addr = dma_addr; + ctxt->sc_sges[sge_no].length = len; + ctxt->sc_send_wr.num_sge++; return 0; out_maperr: @@ -331,7 +508,7 @@ static int svc_rdma_dma_map_page(struct svcxprt_rdma *rdma, * handles DMA-unmap and it uses ib_dma_unmap_page() exclusively. */ static int svc_rdma_dma_map_buf(struct svcxprt_rdma *rdma, - struct svc_rdma_op_ctxt *ctxt, + struct svc_rdma_send_ctxt *ctxt, unsigned int sge_no, unsigned char *base, unsigned int len) @@ -352,14 +529,13 @@ static int svc_rdma_dma_map_buf(struct svcxprt_rdma *rdma, * %-EIO if DMA mapping failed. */ int svc_rdma_map_reply_hdr(struct svcxprt_rdma *rdma, - struct svc_rdma_op_ctxt *ctxt, + struct svc_rdma_send_ctxt *ctxt, __be32 *rdma_resp, unsigned int len) { - ctxt->direction = DMA_TO_DEVICE; - ctxt->pages[0] = virt_to_page(rdma_resp); - ctxt->count = 1; - return svc_rdma_dma_map_page(rdma, ctxt, 0, ctxt->pages[0], 0, len); + ctxt->sc_pages[0] = virt_to_page(rdma_resp); + ctxt->sc_page_count++; + return svc_rdma_dma_map_page(rdma, ctxt, 0, ctxt->sc_pages[0], 0, len); } /* Load the xdr_buf into the ctxt's sge array, and DMA map each @@ -368,7 +544,7 @@ int svc_rdma_map_reply_hdr(struct svcxprt_rdma *rdma, * Returns zero on success, or a negative errno on failure. */ static int svc_rdma_map_reply_msg(struct svcxprt_rdma *rdma, - struct svc_rdma_op_ctxt *ctxt, + struct svc_rdma_send_ctxt *ctxt, struct xdr_buf *xdr, __be32 *wr_lst) { unsigned int len, sge_no, remaining; @@ -436,13 +612,13 @@ static int svc_rdma_map_reply_msg(struct svcxprt_rdma *rdma, * so they are released by the Send completion handler. */ static void svc_rdma_save_io_pages(struct svc_rqst *rqstp, - struct svc_rdma_op_ctxt *ctxt) + struct svc_rdma_send_ctxt *ctxt) { int i, pages = rqstp->rq_next_page - rqstp->rq_respages; - ctxt->count += pages; + ctxt->sc_page_count += pages; for (i = 0; i < pages; i++) { - ctxt->pages[i + 1] = rqstp->rq_respages[i]; + ctxt->sc_pages[i + 1] = rqstp->rq_respages[i]; rqstp->rq_respages[i] = NULL; } rqstp->rq_next_page = rqstp->rq_respages + 1; @@ -461,37 +637,29 @@ static void svc_rdma_save_io_pages(struct svc_rqst *rqstp, * %-ENOMEM if ib_post_send failed. */ int svc_rdma_post_send_wr(struct svcxprt_rdma *rdma, - struct svc_rdma_op_ctxt *ctxt, + struct svc_rdma_send_ctxt *ctxt, u32 inv_rkey) { - struct ib_send_wr *send_wr = &ctxt->send_wr; - dprintk("svcrdma: posting Send WR with %u sge(s)\n", - ctxt->mapped_sges); - - send_wr->next = NULL; - ctxt->cqe.done = svc_rdma_wc_send; - send_wr->wr_cqe = &ctxt->cqe; - send_wr->sg_list = ctxt->sge; - send_wr->num_sge = ctxt->mapped_sges; - send_wr->send_flags = IB_SEND_SIGNALED; + ctxt->sc_send_wr.num_sge); + if (inv_rkey) { - send_wr->opcode = IB_WR_SEND_WITH_INV; - send_wr->ex.invalidate_rkey = inv_rkey; + ctxt->sc_send_wr.opcode = IB_WR_SEND_WITH_INV; + ctxt->sc_send_wr.ex.invalidate_rkey = inv_rkey; } else { - send_wr->opcode = IB_WR_SEND; + ctxt->sc_send_wr.opcode = IB_WR_SEND; } - return svc_rdma_send(rdma, send_wr); + return svc_rdma_send(rdma, &ctxt->sc_send_wr); } /* Prepare the portion of the RPC Reply that will be transmitted * via RDMA Send. The RPC-over-RDMA transport header is prepared - * in sge[0], and the RPC xdr_buf is prepared in following sges. + * in sc_sges[0], and the RPC xdr_buf is prepared in following sges. * * Depending on whether a Write list or Reply chunk is present, * the server may send all, a portion of, or none of the xdr_buf. - * In the latter case, only the transport header (sge[0]) is + * In the latter case, only the transport header (sc_sges[0]) is * transmitted. * * RDMA Send is the last step of transmitting an RPC reply. Pages @@ -508,11 +676,13 @@ static int svc_rdma_send_reply_msg(struct svcxprt_rdma *rdma, struct svc_rqst *rqstp, __be32 *wr_lst, __be32 *rp_ch) { - struct svc_rdma_op_ctxt *ctxt; + struct svc_rdma_send_ctxt *ctxt; u32 inv_rkey; int ret; - ctxt = svc_rdma_get_context(rdma); + ctxt = svc_rdma_send_ctxt_get(rdma); + if (!ctxt) + return -ENOMEM; ret = svc_rdma_map_reply_hdr(rdma, ctxt, rdma_resp, svc_rdma_reply_hdr_len(rdma_resp)); @@ -538,8 +708,7 @@ static int svc_rdma_send_reply_msg(struct svcxprt_rdma *rdma, return 0; err: - svc_rdma_unmap_dma(ctxt); - svc_rdma_put_context(ctxt, 1); + svc_rdma_send_ctxt_put(rdma, ctxt); return ret; } @@ -553,11 +722,13 @@ static int svc_rdma_send_reply_msg(struct svcxprt_rdma *rdma, static int svc_rdma_send_error_msg(struct svcxprt_rdma *rdma, __be32 *rdma_resp, struct svc_rqst *rqstp) { - struct svc_rdma_op_ctxt *ctxt; + struct svc_rdma_send_ctxt *ctxt; __be32 *p; int ret; - ctxt = svc_rdma_get_context(rdma); + ctxt = svc_rdma_send_ctxt_get(rdma); + if (!ctxt) + return -ENOMEM; /* Replace the original transport header with an * RDMA_ERROR response. XID etc are preserved. @@ -580,8 +751,7 @@ static int svc_rdma_send_error_msg(struct svcxprt_rdma *rdma, return 0; err: - svc_rdma_unmap_dma(ctxt); - svc_rdma_put_context(ctxt, 1); + svc_rdma_send_ctxt_put(rdma, ctxt); return ret; } diff --git a/net/sunrpc/xprtrdma/svc_rdma_transport.c b/net/sunrpc/xprtrdma/svc_rdma_transport.c index baeecbb2f763..3de81735a6cc 100644 --- a/net/sunrpc/xprtrdma/svc_rdma_transport.c +++ b/net/sunrpc/xprtrdma/svc_rdma_transport.c @@ -1,5 +1,6 @@ // SPDX-License-Identifier: GPL-2.0 OR BSD-3-Clause /* + * Copyright (c) 2015-2018 Oracle. All rights reserved. * Copyright (c) 2014 Open Grid Computing, Inc. All rights reserved. * Copyright (c) 2005-2007 Network Appliance, Inc. All rights reserved. * @@ -157,114 +158,6 @@ static void svc_rdma_bc_free(struct svc_xprt *xprt) } #endif /* CONFIG_SUNRPC_BACKCHANNEL */ -static struct svc_rdma_op_ctxt *alloc_ctxt(struct svcxprt_rdma *xprt, - gfp_t flags) -{ - struct svc_rdma_op_ctxt *ctxt; - - ctxt = kmalloc(sizeof(*ctxt), flags); - if (ctxt) { - ctxt->xprt = xprt; - INIT_LIST_HEAD(&ctxt->list); - } - return ctxt; -} - -static bool svc_rdma_prealloc_ctxts(struct svcxprt_rdma *xprt) -{ - unsigned int i; - - i = xprt->sc_sq_depth; - while (i--) { - struct svc_rdma_op_ctxt *ctxt; - - ctxt = alloc_ctxt(xprt, GFP_KERNEL); - if (!ctxt) { - dprintk("svcrdma: No memory for RDMA ctxt\n"); - return false; - } - list_add(&ctxt->list, &xprt->sc_ctxts); - } - return true; -} - -struct svc_rdma_op_ctxt *svc_rdma_get_context(struct svcxprt_rdma *xprt) -{ - struct svc_rdma_op_ctxt *ctxt = NULL; - - spin_lock(&xprt->sc_ctxt_lock); - xprt->sc_ctxt_used++; - if (list_empty(&xprt->sc_ctxts)) - goto out_empty; - - ctxt = list_first_entry(&xprt->sc_ctxts, - struct svc_rdma_op_ctxt, list); - list_del(&ctxt->list); - spin_unlock(&xprt->sc_ctxt_lock); - -out: - ctxt->count = 0; - ctxt->mapped_sges = 0; - return ctxt; - -out_empty: - /* Either pre-allocation missed the mark, or send - * queue accounting is broken. - */ - spin_unlock(&xprt->sc_ctxt_lock); - - ctxt = alloc_ctxt(xprt, GFP_NOIO); - if (ctxt) - goto out; - - spin_lock(&xprt->sc_ctxt_lock); - xprt->sc_ctxt_used--; - spin_unlock(&xprt->sc_ctxt_lock); - WARN_ONCE(1, "svcrdma: empty RDMA ctxt list?\n"); - return NULL; -} - -void svc_rdma_unmap_dma(struct svc_rdma_op_ctxt *ctxt) -{ - struct svcxprt_rdma *xprt = ctxt->xprt; - struct ib_device *device = xprt->sc_cm_id->device; - unsigned int i; - - for (i = 0; i < ctxt->mapped_sges; i++) - ib_dma_unmap_page(device, - ctxt->sge[i].addr, - ctxt->sge[i].length, - ctxt->direction); - ctxt->mapped_sges = 0; -} - -void svc_rdma_put_context(struct svc_rdma_op_ctxt *ctxt, int free_pages) -{ - struct svcxprt_rdma *xprt = ctxt->xprt; - int i; - - if (free_pages) - for (i = 0; i < ctxt->count; i++) - put_page(ctxt->pages[i]); - - spin_lock(&xprt->sc_ctxt_lock); - xprt->sc_ctxt_used--; - list_add(&ctxt->list, &xprt->sc_ctxts); - spin_unlock(&xprt->sc_ctxt_lock); -} - -static void svc_rdma_destroy_ctxts(struct svcxprt_rdma *xprt) -{ - while (!list_empty(&xprt->sc_ctxts)) { - struct svc_rdma_op_ctxt *ctxt; - - ctxt = list_first_entry(&xprt->sc_ctxts, - struct svc_rdma_op_ctxt, list); - list_del(&ctxt->list); - kfree(ctxt); - } -} - /* QP event handler */ static void qp_event_handler(struct ib_event *event, void *context) { @@ -292,39 +185,6 @@ static void qp_event_handler(struct ib_event *event, void *context) } } -/** - * svc_rdma_wc_send - Invoked by RDMA provider for each polled Send WC - * @cq: completion queue - * @wc: completed WR - * - */ -void svc_rdma_wc_send(struct ib_cq *cq, struct ib_wc *wc) -{ - struct svcxprt_rdma *xprt = cq->cq_context; - struct ib_cqe *cqe = wc->wr_cqe; - struct svc_rdma_op_ctxt *ctxt; - - trace_svcrdma_wc_send(wc); - - atomic_inc(&xprt->sc_sq_avail); - wake_up(&xprt->sc_send_wait); - - ctxt = container_of(cqe, struct svc_rdma_op_ctxt, cqe); - svc_rdma_unmap_dma(ctxt); - svc_rdma_put_context(ctxt, 1); - - if (unlikely(wc->status != IB_WC_SUCCESS)) { - set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags); - svc_xprt_enqueue(&xprt->sc_xprt); - if (wc->status != IB_WC_WR_FLUSH_ERR) - pr_err("svcrdma: Send: %s (%u/0x%x)\n", - ib_wc_status_msg(wc->status), - wc->status, wc->vendor_err); - } - - svc_xprt_put(&xprt->sc_xprt); -} - static struct svcxprt_rdma *svc_rdma_create_xprt(struct svc_serv *serv, struct net *net) { @@ -338,14 +198,14 @@ static struct svcxprt_rdma *svc_rdma_create_xprt(struct svc_serv *serv, INIT_LIST_HEAD(&cma_xprt->sc_accept_q); INIT_LIST_HEAD(&cma_xprt->sc_rq_dto_q); INIT_LIST_HEAD(&cma_xprt->sc_read_complete_q); - INIT_LIST_HEAD(&cma_xprt->sc_ctxts); + INIT_LIST_HEAD(&cma_xprt->sc_send_ctxts); INIT_LIST_HEAD(&cma_xprt->sc_recv_ctxts); INIT_LIST_HEAD(&cma_xprt->sc_rw_ctxts); init_waitqueue_head(&cma_xprt->sc_send_wait); spin_lock_init(&cma_xprt->sc_lock); spin_lock_init(&cma_xprt->sc_rq_dto_lock); - spin_lock_init(&cma_xprt->sc_ctxt_lock); + spin_lock_init(&cma_xprt->sc_send_lock); spin_lock_init(&cma_xprt->sc_recv_lock); spin_lock_init(&cma_xprt->sc_rw_ctxt_lock); @@ -640,9 +500,6 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt) } atomic_set(&newxprt->sc_sq_avail, newxprt->sc_sq_depth); - if (!svc_rdma_prealloc_ctxts(newxprt)) - goto errout; - newxprt->sc_pd = ib_alloc_pd(dev, 0); if (IS_ERR(newxprt->sc_pd)) { dprintk("svcrdma: error creating PD for connect request\n"); @@ -794,11 +651,6 @@ static void __svc_rdma_free(struct work_struct *work) svc_rdma_flush_recv_queues(rdma); - /* Warn if we leaked a resource or under-referenced */ - if (rdma->sc_ctxt_used != 0) - pr_err("svcrdma: ctxt still in use? (%d)\n", - rdma->sc_ctxt_used); - /* Final put of backchannel client transport */ if (xprt->xpt_bc_xprt) { xprt_put(xprt->xpt_bc_xprt); @@ -806,7 +658,7 @@ static void __svc_rdma_free(struct work_struct *work) } svc_rdma_destroy_rw_ctxts(rdma); - svc_rdma_destroy_ctxts(rdma); + svc_rdma_send_ctxts_destroy(rdma); svc_rdma_recv_ctxts_destroy(rdma); /* Destroy the QP if present (not a listener) */ @@ -860,52 +712,3 @@ static void svc_rdma_secure_port(struct svc_rqst *rqstp) static void svc_rdma_kill_temp_xprt(struct svc_xprt *xprt) { } - -int svc_rdma_send(struct svcxprt_rdma *xprt, struct ib_send_wr *wr) -{ - struct ib_send_wr *bad_wr, *n_wr; - int wr_count; - int i; - int ret; - - if (test_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags)) - return -ENOTCONN; - - wr_count = 1; - for (n_wr = wr->next; n_wr; n_wr = n_wr->next) - wr_count++; - - /* If the SQ is full, wait until an SQ entry is available */ - while (1) { - if ((atomic_sub_return(wr_count, &xprt->sc_sq_avail) < 0)) { - atomic_inc(&rdma_stat_sq_starve); - trace_svcrdma_sq_full(xprt); - atomic_add(wr_count, &xprt->sc_sq_avail); - wait_event(xprt->sc_send_wait, - atomic_read(&xprt->sc_sq_avail) > wr_count); - if (test_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags)) - return -ENOTCONN; - trace_svcrdma_sq_retry(xprt); - continue; - } - /* Take a transport ref for each WR posted */ - for (i = 0; i < wr_count; i++) - svc_xprt_get(&xprt->sc_xprt); - - /* Bump used SQ WR count and post */ - ret = ib_post_send(xprt->sc_qp, wr, &bad_wr); - trace_svcrdma_post_send(wr, ret); - if (ret) { - set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags); - for (i = 0; i < wr_count; i ++) - svc_xprt_put(&xprt->sc_xprt); - dprintk("svcrdma: failed to post SQ WR rc=%d\n", ret); - dprintk(" sc_sq_avail=%d, sc_sq_depth=%d\n", - atomic_read(&xprt->sc_sq_avail), - xprt->sc_sq_depth); - wake_up(&xprt->sc_send_wait); - } - break; - } - return ret; -} -- GitLab