diff --git a/include/linux/sunrpc/svc_rdma.h b/include/linux/sunrpc/svc_rdma.h index 88da0c9bd7b1a08f2718164e583aa9aa1296e216..37f759d653487b8af03f9961e7f4f1e747866d3b 100644 --- a/include/linux/sunrpc/svc_rdma.h +++ b/include/linux/sunrpc/svc_rdma.h @@ -128,6 +128,9 @@ struct svcxprt_rdma { unsigned long sc_flags; struct list_head sc_read_complete_q; struct work_struct sc_work; + + spinlock_t sc_recv_lock; + struct list_head sc_recv_ctxts; }; /* sc_flags */ #define RDMAXPRT_CONN_PENDING 3 @@ -142,6 +145,19 @@ struct svcxprt_rdma { #define RPCSVC_MAXPAYLOAD_RDMA RPCSVC_MAXPAYLOAD +struct svc_rdma_recv_ctxt { + struct list_head rc_list; + struct ib_recv_wr rc_recv_wr; + struct ib_cqe rc_cqe; + struct xdr_buf rc_arg; + u32 rc_byte_len; + unsigned int rc_page_count; + unsigned int rc_hdr_count; + struct ib_sge rc_sges[1 + + RPCRDMA_MAX_INLINE_THRESH / PAGE_SIZE]; + struct page *rc_pages[RPCSVC_MAXPAGES]; +}; + /* Track DMA maps for this transport and context */ static inline void svc_rdma_count_mappings(struct svcxprt_rdma *rdma, struct svc_rdma_op_ctxt *ctxt) @@ -155,13 +171,19 @@ extern int svc_rdma_handle_bc_reply(struct rpc_xprt *xprt, struct xdr_buf *rcvbuf); /* svc_rdma_recvfrom.c */ +extern void svc_rdma_recv_ctxts_destroy(struct svcxprt_rdma *rdma); +extern bool svc_rdma_post_recvs(struct svcxprt_rdma *rdma); +extern void svc_rdma_recv_ctxt_put(struct svcxprt_rdma *rdma, + struct svc_rdma_recv_ctxt *ctxt, + int free_pages); +extern void svc_rdma_flush_recv_queues(struct svcxprt_rdma *rdma); extern int svc_rdma_recvfrom(struct svc_rqst *); /* svc_rdma_rw.c */ extern void svc_rdma_destroy_rw_ctxts(struct svcxprt_rdma *rdma); extern int svc_rdma_recv_read_chunk(struct svcxprt_rdma *rdma, struct svc_rqst *rqstp, - struct svc_rdma_op_ctxt *head, __be32 *p); + struct svc_rdma_recv_ctxt *head, __be32 *p); extern int svc_rdma_send_write_chunk(struct svcxprt_rdma *rdma, __be32 *wr_ch, struct xdr_buf *xdr); extern int svc_rdma_send_reply_chunk(struct svcxprt_rdma *rdma, diff --git a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c index 330d542fd96e5a0129baf11ee43c740d38cd998d..b7d9c55ee896b3c9c5b4a400ffd77e6c04f5ccdb 100644 --- a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c +++ b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c @@ -1,6 +1,6 @@ // SPDX-License-Identifier: GPL-2.0 OR BSD-3-Clause /* - * Copyright (c) 2016, 2017 Oracle. All rights reserved. + * Copyright (c) 2016-2018 Oracle. All rights reserved. * Copyright (c) 2014 Open Grid Computing, Inc. All rights reserved. * Copyright (c) 2005-2006 Network Appliance, Inc. All rights reserved. * @@ -61,7 +61,7 @@ * svc_rdma_recvfrom must post RDMA Reads to pull the RPC Call's * data payload from the client. svc_rdma_recvfrom sets up the * RDMA Reads using pages in svc_rqst::rq_pages, which are - * transferred to an svc_rdma_op_ctxt for the duration of the + * transferred to an svc_rdma_recv_ctxt for the duration of the * I/O. svc_rdma_recvfrom then returns zero, since the RPC message * is still not yet ready. * @@ -70,18 +70,18 @@ * svc_rdma_recvfrom again. This second call may use a different * svc_rqst than the first one, thus any information that needs * to be preserved across these two calls is kept in an - * svc_rdma_op_ctxt. + * svc_rdma_recv_ctxt. * * The second call to svc_rdma_recvfrom performs final assembly * of the RPC Call message, using the RDMA Read sink pages kept in - * the svc_rdma_op_ctxt. The xdr_buf is copied from the - * svc_rdma_op_ctxt to the second svc_rqst. The second call returns + * the svc_rdma_recv_ctxt. The xdr_buf is copied from the + * svc_rdma_recv_ctxt to the second svc_rqst. The second call returns * the length of the completed RPC Call message. * * Page Management * * Pages under I/O must be transferred from the first svc_rqst to an - * svc_rdma_op_ctxt before the first svc_rdma_recvfrom call returns. + * svc_rdma_recv_ctxt before the first svc_rdma_recvfrom call returns. * * The first svc_rqst supplies pages for RDMA Reads. These are moved * from rqstp::rq_pages into ctxt::pages. The consumed elements of @@ -89,7 +89,7 @@ * svc_rdma_recvfrom call returns. * * During the second svc_rdma_recvfrom call, RDMA Read sink pages - * are transferred from the svc_rdma_op_ctxt to the second svc_rqst + * are transferred from the svc_rdma_recv_ctxt to the second svc_rqst * (see rdma_read_complete() below). */ @@ -108,13 +108,247 @@ #define RPCDBG_FACILITY RPCDBG_SVCXPRT +static void svc_rdma_wc_receive(struct ib_cq *cq, struct ib_wc *wc); + +static inline struct svc_rdma_recv_ctxt * +svc_rdma_next_recv_ctxt(struct list_head *list) +{ + return list_first_entry_or_null(list, struct svc_rdma_recv_ctxt, + rc_list); +} + +/** + * svc_rdma_recv_ctxts_destroy - Release all recv_ctxt's for an xprt + * @rdma: svcxprt_rdma being torn down + * + */ +void svc_rdma_recv_ctxts_destroy(struct svcxprt_rdma *rdma) +{ + struct svc_rdma_recv_ctxt *ctxt; + + while ((ctxt = svc_rdma_next_recv_ctxt(&rdma->sc_recv_ctxts))) { + list_del(&ctxt->rc_list); + kfree(ctxt); + } +} + +static struct svc_rdma_recv_ctxt * +svc_rdma_recv_ctxt_get(struct svcxprt_rdma *rdma) +{ + struct svc_rdma_recv_ctxt *ctxt; + + spin_lock(&rdma->sc_recv_lock); + ctxt = svc_rdma_next_recv_ctxt(&rdma->sc_recv_ctxts); + if (!ctxt) + goto out_empty; + list_del(&ctxt->rc_list); + spin_unlock(&rdma->sc_recv_lock); + +out: + ctxt->rc_recv_wr.num_sge = 0; + ctxt->rc_page_count = 0; + return ctxt; + +out_empty: + spin_unlock(&rdma->sc_recv_lock); + + ctxt = kmalloc(sizeof(*ctxt), GFP_KERNEL); + if (!ctxt) + return NULL; + goto out; +} + +static void svc_rdma_recv_ctxt_unmap(struct svcxprt_rdma *rdma, + struct svc_rdma_recv_ctxt *ctxt) +{ + struct ib_device *device = rdma->sc_cm_id->device; + int i; + + for (i = 0; i < ctxt->rc_recv_wr.num_sge; i++) + ib_dma_unmap_page(device, + ctxt->rc_sges[i].addr, + ctxt->rc_sges[i].length, + DMA_FROM_DEVICE); +} + +/** + * svc_rdma_recv_ctxt_put - Return recv_ctxt to free list + * @rdma: controlling svcxprt_rdma + * @ctxt: object to return to the free list + * @free_pages: Non-zero if rc_pages should be freed + * + */ +void svc_rdma_recv_ctxt_put(struct svcxprt_rdma *rdma, + struct svc_rdma_recv_ctxt *ctxt, + int free_pages) +{ + unsigned int i; + + if (free_pages) + for (i = 0; i < ctxt->rc_page_count; i++) + put_page(ctxt->rc_pages[i]); + spin_lock(&rdma->sc_recv_lock); + list_add(&ctxt->rc_list, &rdma->sc_recv_ctxts); + spin_unlock(&rdma->sc_recv_lock); +} + +static int svc_rdma_post_recv(struct svcxprt_rdma *rdma) +{ + struct ib_device *device = rdma->sc_cm_id->device; + struct svc_rdma_recv_ctxt *ctxt; + struct ib_recv_wr *bad_recv_wr; + int sge_no, buflen, ret; + struct page *page; + dma_addr_t pa; + + ctxt = svc_rdma_recv_ctxt_get(rdma); + if (!ctxt) + return -ENOMEM; + + buflen = 0; + ctxt->rc_cqe.done = svc_rdma_wc_receive; + for (sge_no = 0; buflen < rdma->sc_max_req_size; sge_no++) { + if (sge_no >= rdma->sc_max_sge) { + pr_err("svcrdma: Too many sges (%d)\n", sge_no); + goto err_put_ctxt; + } + + page = alloc_page(GFP_KERNEL); + if (!page) + goto err_put_ctxt; + ctxt->rc_pages[sge_no] = page; + ctxt->rc_page_count++; + + pa = ib_dma_map_page(device, ctxt->rc_pages[sge_no], + 0, PAGE_SIZE, DMA_FROM_DEVICE); + if (ib_dma_mapping_error(device, pa)) + goto err_put_ctxt; + ctxt->rc_sges[sge_no].addr = pa; + ctxt->rc_sges[sge_no].length = PAGE_SIZE; + ctxt->rc_sges[sge_no].lkey = rdma->sc_pd->local_dma_lkey; + ctxt->rc_recv_wr.num_sge++; + + buflen += PAGE_SIZE; + } + ctxt->rc_recv_wr.next = NULL; + ctxt->rc_recv_wr.sg_list = &ctxt->rc_sges[0]; + ctxt->rc_recv_wr.wr_cqe = &ctxt->rc_cqe; + + svc_xprt_get(&rdma->sc_xprt); + ret = ib_post_recv(rdma->sc_qp, &ctxt->rc_recv_wr, &bad_recv_wr); + trace_svcrdma_post_recv(&ctxt->rc_recv_wr, ret); + if (ret) + goto err_post; + return 0; + +err_put_ctxt: + svc_rdma_recv_ctxt_unmap(rdma, ctxt); + svc_rdma_recv_ctxt_put(rdma, ctxt, 1); + return -ENOMEM; +err_post: + svc_rdma_recv_ctxt_unmap(rdma, ctxt); + svc_rdma_recv_ctxt_put(rdma, ctxt, 1); + svc_xprt_put(&rdma->sc_xprt); + return ret; +} + +/** + * svc_rdma_post_recvs - Post initial set of Recv WRs + * @rdma: fresh svcxprt_rdma + * + * Returns true if successful, otherwise false. + */ +bool svc_rdma_post_recvs(struct svcxprt_rdma *rdma) +{ + unsigned int i; + int ret; + + for (i = 0; i < rdma->sc_max_requests; i++) { + ret = svc_rdma_post_recv(rdma); + if (ret) { + pr_err("svcrdma: failure posting recv buffers: %d\n", + ret); + return false; + } + } + return true; +} + +/** + * svc_rdma_wc_receive - Invoked by RDMA provider for each polled Receive WC + * @cq: Completion Queue context + * @wc: Work Completion object + * + * NB: The svc_xprt/svcxprt_rdma is pinned whenever it's possible that + * the Receive completion handler could be running. + */ +static void svc_rdma_wc_receive(struct ib_cq *cq, struct ib_wc *wc) +{ + struct svcxprt_rdma *rdma = cq->cq_context; + struct ib_cqe *cqe = wc->wr_cqe; + struct svc_rdma_recv_ctxt *ctxt; + + trace_svcrdma_wc_receive(wc); + + /* WARNING: Only wc->wr_cqe and wc->status are reliable */ + ctxt = container_of(cqe, struct svc_rdma_recv_ctxt, rc_cqe); + svc_rdma_recv_ctxt_unmap(rdma, ctxt); + + if (wc->status != IB_WC_SUCCESS) + goto flushed; + + if (svc_rdma_post_recv(rdma)) + goto post_err; + + /* All wc fields are now known to be valid */ + ctxt->rc_byte_len = wc->byte_len; + spin_lock(&rdma->sc_rq_dto_lock); + list_add_tail(&ctxt->rc_list, &rdma->sc_rq_dto_q); + spin_unlock(&rdma->sc_rq_dto_lock); + set_bit(XPT_DATA, &rdma->sc_xprt.xpt_flags); + if (!test_bit(RDMAXPRT_CONN_PENDING, &rdma->sc_flags)) + svc_xprt_enqueue(&rdma->sc_xprt); + goto out; + +flushed: + if (wc->status != IB_WC_WR_FLUSH_ERR) + pr_err("svcrdma: Recv: %s (%u/0x%x)\n", + ib_wc_status_msg(wc->status), + wc->status, wc->vendor_err); +post_err: + svc_rdma_recv_ctxt_put(rdma, ctxt, 1); + set_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags); + svc_xprt_enqueue(&rdma->sc_xprt); +out: + svc_xprt_put(&rdma->sc_xprt); +} + +/** + * svc_rdma_flush_recv_queues - Drain pending Receive work + * @rdma: svcxprt_rdma being shut down + * + */ +void svc_rdma_flush_recv_queues(struct svcxprt_rdma *rdma) +{ + struct svc_rdma_recv_ctxt *ctxt; + + while ((ctxt = svc_rdma_next_recv_ctxt(&rdma->sc_read_complete_q))) { + list_del(&ctxt->rc_list); + svc_rdma_recv_ctxt_put(rdma, ctxt, 1); + } + while ((ctxt = svc_rdma_next_recv_ctxt(&rdma->sc_rq_dto_q))) { + list_del(&ctxt->rc_list); + svc_rdma_recv_ctxt_put(rdma, ctxt, 1); + } +} + /* * Replace the pages in the rq_argpages array with the pages from the SGE in * the RDMA_RECV completion. The SGL should contain full pages up until the * last one. */ static void svc_rdma_build_arg_xdr(struct svc_rqst *rqstp, - struct svc_rdma_op_ctxt *ctxt) + struct svc_rdma_recv_ctxt *ctxt) { struct page *page; int sge_no; @@ -123,30 +357,30 @@ static void svc_rdma_build_arg_xdr(struct svc_rqst *rqstp, /* The reply path assumes the Call's transport header resides * in rqstp->rq_pages[0]. */ - page = ctxt->pages[0]; + page = ctxt->rc_pages[0]; put_page(rqstp->rq_pages[0]); rqstp->rq_pages[0] = page; /* Set up the XDR head */ rqstp->rq_arg.head[0].iov_base = page_address(page); rqstp->rq_arg.head[0].iov_len = - min_t(size_t, ctxt->byte_len, ctxt->sge[0].length); - rqstp->rq_arg.len = ctxt->byte_len; - rqstp->rq_arg.buflen = ctxt->byte_len; + min_t(size_t, ctxt->rc_byte_len, ctxt->rc_sges[0].length); + rqstp->rq_arg.len = ctxt->rc_byte_len; + rqstp->rq_arg.buflen = ctxt->rc_byte_len; /* Compute bytes past head in the SGL */ - len = ctxt->byte_len - rqstp->rq_arg.head[0].iov_len; + len = ctxt->rc_byte_len - rqstp->rq_arg.head[0].iov_len; /* If data remains, store it in the pagelist */ rqstp->rq_arg.page_len = len; rqstp->rq_arg.page_base = 0; sge_no = 1; - while (len && sge_no < ctxt->count) { - page = ctxt->pages[sge_no]; + while (len && sge_no < ctxt->rc_recv_wr.num_sge) { + page = ctxt->rc_pages[sge_no]; put_page(rqstp->rq_pages[sge_no]); rqstp->rq_pages[sge_no] = page; - len -= min_t(u32, len, ctxt->sge[sge_no].length); + len -= min_t(u32, len, ctxt->rc_sges[sge_no].length); sge_no++; } rqstp->rq_respages = &rqstp->rq_pages[sge_no]; @@ -154,11 +388,11 @@ static void svc_rdma_build_arg_xdr(struct svc_rqst *rqstp, /* If not all pages were used from the SGL, free the remaining ones */ len = sge_no; - while (sge_no < ctxt->count) { - page = ctxt->pages[sge_no++]; + while (sge_no < ctxt->rc_recv_wr.num_sge) { + page = ctxt->rc_pages[sge_no++]; put_page(page); } - ctxt->count = len; + ctxt->rc_page_count = len; /* Set up tail */ rqstp->rq_arg.tail[0].iov_base = NULL; @@ -364,29 +598,29 @@ static int svc_rdma_xdr_decode_req(struct xdr_buf *rq_arg) } static void rdma_read_complete(struct svc_rqst *rqstp, - struct svc_rdma_op_ctxt *head) + struct svc_rdma_recv_ctxt *head) { int page_no; /* Copy RPC pages */ - for (page_no = 0; page_no < head->count; page_no++) { + for (page_no = 0; page_no < head->rc_page_count; page_no++) { put_page(rqstp->rq_pages[page_no]); - rqstp->rq_pages[page_no] = head->pages[page_no]; + rqstp->rq_pages[page_no] = head->rc_pages[page_no]; } /* Point rq_arg.pages past header */ - rqstp->rq_arg.pages = &rqstp->rq_pages[head->hdr_count]; - rqstp->rq_arg.page_len = head->arg.page_len; + rqstp->rq_arg.pages = &rqstp->rq_pages[head->rc_hdr_count]; + rqstp->rq_arg.page_len = head->rc_arg.page_len; /* rq_respages starts after the last arg page */ rqstp->rq_respages = &rqstp->rq_pages[page_no]; rqstp->rq_next_page = rqstp->rq_respages + 1; /* Rebuild rq_arg head and tail. */ - rqstp->rq_arg.head[0] = head->arg.head[0]; - rqstp->rq_arg.tail[0] = head->arg.tail[0]; - rqstp->rq_arg.len = head->arg.len; - rqstp->rq_arg.buflen = head->arg.buflen; + rqstp->rq_arg.head[0] = head->rc_arg.head[0]; + rqstp->rq_arg.tail[0] = head->rc_arg.tail[0]; + rqstp->rq_arg.len = head->rc_arg.len; + rqstp->rq_arg.buflen = head->rc_arg.buflen; } static void svc_rdma_send_error(struct svcxprt_rdma *xprt, @@ -506,28 +740,26 @@ int svc_rdma_recvfrom(struct svc_rqst *rqstp) struct svc_xprt *xprt = rqstp->rq_xprt; struct svcxprt_rdma *rdma_xprt = container_of(xprt, struct svcxprt_rdma, sc_xprt); - struct svc_rdma_op_ctxt *ctxt; + struct svc_rdma_recv_ctxt *ctxt; __be32 *p; int ret; spin_lock(&rdma_xprt->sc_rq_dto_lock); - if (!list_empty(&rdma_xprt->sc_read_complete_q)) { - ctxt = list_first_entry(&rdma_xprt->sc_read_complete_q, - struct svc_rdma_op_ctxt, list); - list_del(&ctxt->list); + ctxt = svc_rdma_next_recv_ctxt(&rdma_xprt->sc_read_complete_q); + if (ctxt) { + list_del(&ctxt->rc_list); spin_unlock(&rdma_xprt->sc_rq_dto_lock); rdma_read_complete(rqstp, ctxt); goto complete; - } else if (!list_empty(&rdma_xprt->sc_rq_dto_q)) { - ctxt = list_first_entry(&rdma_xprt->sc_rq_dto_q, - struct svc_rdma_op_ctxt, list); - list_del(&ctxt->list); - } else { + } + ctxt = svc_rdma_next_recv_ctxt(&rdma_xprt->sc_rq_dto_q); + if (!ctxt) { /* No new incoming requests, terminate the loop */ clear_bit(XPT_DATA, &xprt->xpt_flags); spin_unlock(&rdma_xprt->sc_rq_dto_lock); return 0; } + list_del(&ctxt->rc_list); spin_unlock(&rdma_xprt->sc_rq_dto_lock); atomic_inc(&rdma_stat_recv); @@ -545,7 +777,7 @@ int svc_rdma_recvfrom(struct svc_rqst *rqstp) if (svc_rdma_is_backchannel_reply(xprt, p)) { ret = svc_rdma_handle_bc_reply(xprt->xpt_bc_xprt, p, &rqstp->rq_arg); - svc_rdma_put_context(ctxt, 0); + svc_rdma_recv_ctxt_put(rdma_xprt, ctxt, 0); return ret; } @@ -554,7 +786,7 @@ int svc_rdma_recvfrom(struct svc_rqst *rqstp) goto out_readchunk; complete: - svc_rdma_put_context(ctxt, 0); + svc_rdma_recv_ctxt_put(rdma_xprt, ctxt, 0); rqstp->rq_prot = IPPROTO_MAX; svc_xprt_copy_addrs(rqstp, xprt); return rqstp->rq_arg.len; @@ -567,16 +799,16 @@ int svc_rdma_recvfrom(struct svc_rqst *rqstp) out_err: svc_rdma_send_error(rdma_xprt, p, ret); - svc_rdma_put_context(ctxt, 0); + svc_rdma_recv_ctxt_put(rdma_xprt, ctxt, 0); return 0; out_postfail: if (ret == -EINVAL) svc_rdma_send_error(rdma_xprt, p, ret); - svc_rdma_put_context(ctxt, 1); + svc_rdma_recv_ctxt_put(rdma_xprt, ctxt, 1); return ret; out_drop: - svc_rdma_put_context(ctxt, 1); + svc_rdma_recv_ctxt_put(rdma_xprt, ctxt, 1); return 0; } diff --git a/net/sunrpc/xprtrdma/svc_rdma_rw.c b/net/sunrpc/xprtrdma/svc_rdma_rw.c index 887ceef125b285f5123528a28614e9225fe6897c..c080ce20ff4044381981e44150944ef038ed6370 100644 --- a/net/sunrpc/xprtrdma/svc_rdma_rw.c +++ b/net/sunrpc/xprtrdma/svc_rdma_rw.c @@ -1,6 +1,6 @@ // SPDX-License-Identifier: GPL-2.0 /* - * Copyright (c) 2016 Oracle. All rights reserved. + * Copyright (c) 2016-2018 Oracle. All rights reserved. * * Use the core R/W API to move RPC-over-RDMA Read and Write chunks. */ @@ -227,7 +227,7 @@ static void svc_rdma_write_done(struct ib_cq *cq, struct ib_wc *wc) /* State for pulling a Read chunk. */ struct svc_rdma_read_info { - struct svc_rdma_op_ctxt *ri_readctxt; + struct svc_rdma_recv_ctxt *ri_readctxt; unsigned int ri_position; unsigned int ri_pageno; unsigned int ri_pageoff; @@ -282,10 +282,10 @@ static void svc_rdma_wc_read_done(struct ib_cq *cq, struct ib_wc *wc) pr_err("svcrdma: read ctx: %s (%u/0x%x)\n", ib_wc_status_msg(wc->status), wc->status, wc->vendor_err); - svc_rdma_put_context(info->ri_readctxt, 1); + svc_rdma_recv_ctxt_put(rdma, info->ri_readctxt, 1); } else { spin_lock(&rdma->sc_rq_dto_lock); - list_add_tail(&info->ri_readctxt->list, + list_add_tail(&info->ri_readctxt->rc_list, &rdma->sc_read_complete_q); spin_unlock(&rdma->sc_rq_dto_lock); @@ -607,7 +607,7 @@ static int svc_rdma_build_read_segment(struct svc_rdma_read_info *info, struct svc_rqst *rqstp, u32 rkey, u32 len, u64 offset) { - struct svc_rdma_op_ctxt *head = info->ri_readctxt; + struct svc_rdma_recv_ctxt *head = info->ri_readctxt; struct svc_rdma_chunk_ctxt *cc = &info->ri_cc; struct svc_rdma_rw_ctxt *ctxt; unsigned int sge_no, seg_len; @@ -625,10 +625,10 @@ static int svc_rdma_build_read_segment(struct svc_rdma_read_info *info, seg_len = min_t(unsigned int, len, PAGE_SIZE - info->ri_pageoff); - head->arg.pages[info->ri_pageno] = + head->rc_arg.pages[info->ri_pageno] = rqstp->rq_pages[info->ri_pageno]; if (!info->ri_pageoff) - head->count++; + head->rc_page_count++; sg_set_page(sg, rqstp->rq_pages[info->ri_pageno], seg_len, info->ri_pageoff); @@ -705,9 +705,9 @@ static int svc_rdma_build_read_chunk(struct svc_rqst *rqstp, } /* Construct RDMA Reads to pull over a normal Read chunk. The chunk - * data lands in the page list of head->arg.pages. + * data lands in the page list of head->rc_arg.pages. * - * Currently NFSD does not look at the head->arg.tail[0] iovec. + * Currently NFSD does not look at the head->rc_arg.tail[0] iovec. * Therefore, XDR round-up of the Read chunk and trailing * inline content must both be added at the end of the pagelist. */ @@ -715,10 +715,10 @@ static int svc_rdma_build_normal_read_chunk(struct svc_rqst *rqstp, struct svc_rdma_read_info *info, __be32 *p) { - struct svc_rdma_op_ctxt *head = info->ri_readctxt; + struct svc_rdma_recv_ctxt *head = info->ri_readctxt; int ret; - info->ri_pageno = head->hdr_count; + info->ri_pageno = head->rc_hdr_count; info->ri_pageoff = 0; ret = svc_rdma_build_read_chunk(rqstp, info, p); @@ -732,11 +732,11 @@ static int svc_rdma_build_normal_read_chunk(struct svc_rqst *rqstp, * chunk is not included in either the pagelist or in * the tail. */ - head->arg.tail[0].iov_base = - head->arg.head[0].iov_base + info->ri_position; - head->arg.tail[0].iov_len = - head->arg.head[0].iov_len - info->ri_position; - head->arg.head[0].iov_len = info->ri_position; + head->rc_arg.tail[0].iov_base = + head->rc_arg.head[0].iov_base + info->ri_position; + head->rc_arg.tail[0].iov_len = + head->rc_arg.head[0].iov_len - info->ri_position; + head->rc_arg.head[0].iov_len = info->ri_position; /* Read chunk may need XDR roundup (see RFC 8166, s. 3.4.5.2). * @@ -749,9 +749,9 @@ static int svc_rdma_build_normal_read_chunk(struct svc_rqst *rqstp, */ info->ri_chunklen = XDR_QUADLEN(info->ri_chunklen) << 2; - head->arg.page_len = info->ri_chunklen; - head->arg.len += info->ri_chunklen; - head->arg.buflen += info->ri_chunklen; + head->rc_arg.page_len = info->ri_chunklen; + head->rc_arg.len += info->ri_chunklen; + head->rc_arg.buflen += info->ri_chunklen; out: return ret; @@ -760,7 +760,7 @@ static int svc_rdma_build_normal_read_chunk(struct svc_rqst *rqstp, /* Construct RDMA Reads to pull over a Position Zero Read chunk. * The start of the data lands in the first page just after * the Transport header, and the rest lands in the page list of - * head->arg.pages. + * head->rc_arg.pages. * * Assumptions: * - A PZRC has an XDR-aligned length (no implicit round-up). @@ -772,11 +772,11 @@ static int svc_rdma_build_pz_read_chunk(struct svc_rqst *rqstp, struct svc_rdma_read_info *info, __be32 *p) { - struct svc_rdma_op_ctxt *head = info->ri_readctxt; + struct svc_rdma_recv_ctxt *head = info->ri_readctxt; int ret; - info->ri_pageno = head->hdr_count - 1; - info->ri_pageoff = offset_in_page(head->byte_len); + info->ri_pageno = head->rc_hdr_count - 1; + info->ri_pageoff = offset_in_page(head->rc_byte_len); ret = svc_rdma_build_read_chunk(rqstp, info, p); if (ret < 0) @@ -784,22 +784,22 @@ static int svc_rdma_build_pz_read_chunk(struct svc_rqst *rqstp, trace_svcrdma_encode_pzr(info->ri_chunklen); - head->arg.len += info->ri_chunklen; - head->arg.buflen += info->ri_chunklen; + head->rc_arg.len += info->ri_chunklen; + head->rc_arg.buflen += info->ri_chunklen; - if (head->arg.buflen <= head->sge[0].length) { + if (head->rc_arg.buflen <= head->rc_sges[0].length) { /* Transport header and RPC message fit entirely * in page where head iovec resides. */ - head->arg.head[0].iov_len = info->ri_chunklen; + head->rc_arg.head[0].iov_len = info->ri_chunklen; } else { /* Transport header and part of RPC message reside * in the head iovec's page. */ - head->arg.head[0].iov_len = - head->sge[0].length - head->byte_len; - head->arg.page_len = - info->ri_chunklen - head->arg.head[0].iov_len; + head->rc_arg.head[0].iov_len = + head->rc_sges[0].length - head->rc_byte_len; + head->rc_arg.page_len = + info->ri_chunklen - head->rc_arg.head[0].iov_len; } out: @@ -824,24 +824,24 @@ static int svc_rdma_build_pz_read_chunk(struct svc_rqst *rqstp, * - All Read segments in @p have the same Position value. */ int svc_rdma_recv_read_chunk(struct svcxprt_rdma *rdma, struct svc_rqst *rqstp, - struct svc_rdma_op_ctxt *head, __be32 *p) + struct svc_rdma_recv_ctxt *head, __be32 *p) { struct svc_rdma_read_info *info; struct page **page; int ret; /* The request (with page list) is constructed in - * head->arg. Pages involved with RDMA Read I/O are + * head->rc_arg. Pages involved with RDMA Read I/O are * transferred there. */ - head->hdr_count = head->count; - head->arg.head[0] = rqstp->rq_arg.head[0]; - head->arg.tail[0] = rqstp->rq_arg.tail[0]; - head->arg.pages = head->pages; - head->arg.page_base = 0; - head->arg.page_len = 0; - head->arg.len = rqstp->rq_arg.len; - head->arg.buflen = rqstp->rq_arg.buflen; + head->rc_hdr_count = head->rc_page_count; + head->rc_arg.head[0] = rqstp->rq_arg.head[0]; + head->rc_arg.tail[0] = rqstp->rq_arg.tail[0]; + head->rc_arg.pages = head->rc_pages; + head->rc_arg.page_base = 0; + head->rc_arg.page_len = 0; + head->rc_arg.len = rqstp->rq_arg.len; + head->rc_arg.buflen = rqstp->rq_arg.buflen; info = svc_rdma_read_info_alloc(rdma); if (!info) @@ -867,7 +867,7 @@ int svc_rdma_recv_read_chunk(struct svcxprt_rdma *rdma, struct svc_rqst *rqstp, out: /* Read sink pages have been moved from rqstp->rq_pages to - * head->arg.pages. Force svc_recv to refill those slots + * head->rc_arg.pages. Force svc_recv to refill those slots * in rq_pages. */ for (page = rqstp->rq_pages; page < rqstp->rq_respages; page++) diff --git a/net/sunrpc/xprtrdma/svc_rdma_sendto.c b/net/sunrpc/xprtrdma/svc_rdma_sendto.c index fed28de78d37ceb785c737b74142df8e246aac66..a397d9a3d80e9228ee30495268095beeddd56702 100644 --- a/net/sunrpc/xprtrdma/svc_rdma_sendto.c +++ b/net/sunrpc/xprtrdma/svc_rdma_sendto.c @@ -1,6 +1,6 @@ // SPDX-License-Identifier: GPL-2.0 OR BSD-3-Clause /* - * Copyright (c) 2016 Oracle. All rights reserved. + * Copyright (c) 2016-2018 Oracle. All rights reserved. * Copyright (c) 2014 Open Grid Computing, Inc. All rights reserved. * Copyright (c) 2005-2006 Network Appliance, Inc. All rights reserved. * diff --git a/net/sunrpc/xprtrdma/svc_rdma_transport.c b/net/sunrpc/xprtrdma/svc_rdma_transport.c index 05edb18f8ca32d080d386ecc25386bfa3710c94c..05544f2f50d4b815f0b810b6caf3b2e79c9c66a1 100644 --- a/net/sunrpc/xprtrdma/svc_rdma_transport.c +++ b/net/sunrpc/xprtrdma/svc_rdma_transport.c @@ -63,7 +63,6 @@ #define RPCDBG_FACILITY RPCDBG_SVCXPRT -static int svc_rdma_post_recv(struct svcxprt_rdma *xprt); static struct svcxprt_rdma *svc_rdma_create_xprt(struct svc_serv *serv, struct net *net); static struct svc_xprt *svc_rdma_create(struct svc_serv *serv, @@ -175,11 +174,7 @@ static bool svc_rdma_prealloc_ctxts(struct svcxprt_rdma *xprt) { unsigned int i; - /* Each RPC/RDMA credit can consume one Receive and - * one Send WQE at the same time. - */ - i = xprt->sc_sq_depth + xprt->sc_rq_depth; - + i = xprt->sc_sq_depth; while (i--) { struct svc_rdma_op_ctxt *ctxt; @@ -297,54 +292,6 @@ static void qp_event_handler(struct ib_event *event, void *context) } } -/** - * svc_rdma_wc_receive - Invoked by RDMA provider for each polled Receive WC - * @cq: completion queue - * @wc: completed WR - * - */ -static void svc_rdma_wc_receive(struct ib_cq *cq, struct ib_wc *wc) -{ - struct svcxprt_rdma *xprt = cq->cq_context; - struct ib_cqe *cqe = wc->wr_cqe; - struct svc_rdma_op_ctxt *ctxt; - - trace_svcrdma_wc_receive(wc); - - /* WARNING: Only wc->wr_cqe and wc->status are reliable */ - ctxt = container_of(cqe, struct svc_rdma_op_ctxt, cqe); - svc_rdma_unmap_dma(ctxt); - - if (wc->status != IB_WC_SUCCESS) - goto flushed; - - /* All wc fields are now known to be valid */ - ctxt->byte_len = wc->byte_len; - spin_lock(&xprt->sc_rq_dto_lock); - list_add_tail(&ctxt->list, &xprt->sc_rq_dto_q); - spin_unlock(&xprt->sc_rq_dto_lock); - - svc_rdma_post_recv(xprt); - - set_bit(XPT_DATA, &xprt->sc_xprt.xpt_flags); - if (test_bit(RDMAXPRT_CONN_PENDING, &xprt->sc_flags)) - goto out; - goto out_enqueue; - -flushed: - if (wc->status != IB_WC_WR_FLUSH_ERR) - pr_err("svcrdma: Recv: %s (%u/0x%x)\n", - ib_wc_status_msg(wc->status), - wc->status, wc->vendor_err); - set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags); - svc_rdma_put_context(ctxt, 1); - -out_enqueue: - svc_xprt_enqueue(&xprt->sc_xprt); -out: - svc_xprt_put(&xprt->sc_xprt); -} - /** * svc_rdma_wc_send - Invoked by RDMA provider for each polled Send WC * @cq: completion queue @@ -392,12 +339,14 @@ static struct svcxprt_rdma *svc_rdma_create_xprt(struct svc_serv *serv, INIT_LIST_HEAD(&cma_xprt->sc_rq_dto_q); INIT_LIST_HEAD(&cma_xprt->sc_read_complete_q); INIT_LIST_HEAD(&cma_xprt->sc_ctxts); + INIT_LIST_HEAD(&cma_xprt->sc_recv_ctxts); INIT_LIST_HEAD(&cma_xprt->sc_rw_ctxts); init_waitqueue_head(&cma_xprt->sc_send_wait); spin_lock_init(&cma_xprt->sc_lock); spin_lock_init(&cma_xprt->sc_rq_dto_lock); spin_lock_init(&cma_xprt->sc_ctxt_lock); + spin_lock_init(&cma_xprt->sc_recv_lock); spin_lock_init(&cma_xprt->sc_rw_ctxt_lock); /* @@ -411,63 +360,6 @@ static struct svcxprt_rdma *svc_rdma_create_xprt(struct svc_serv *serv, return cma_xprt; } -static int -svc_rdma_post_recv(struct svcxprt_rdma *xprt) -{ - struct ib_recv_wr recv_wr, *bad_recv_wr; - struct svc_rdma_op_ctxt *ctxt; - struct page *page; - dma_addr_t pa; - int sge_no; - int buflen; - int ret; - - ctxt = svc_rdma_get_context(xprt); - buflen = 0; - ctxt->direction = DMA_FROM_DEVICE; - ctxt->cqe.done = svc_rdma_wc_receive; - for (sge_no = 0; buflen < xprt->sc_max_req_size; sge_no++) { - if (sge_no >= xprt->sc_max_sge) { - pr_err("svcrdma: Too many sges (%d)\n", sge_no); - goto err_put_ctxt; - } - page = alloc_page(GFP_KERNEL); - if (!page) - goto err_put_ctxt; - ctxt->pages[sge_no] = page; - pa = ib_dma_map_page(xprt->sc_cm_id->device, - page, 0, PAGE_SIZE, - DMA_FROM_DEVICE); - if (ib_dma_mapping_error(xprt->sc_cm_id->device, pa)) - goto err_put_ctxt; - svc_rdma_count_mappings(xprt, ctxt); - ctxt->sge[sge_no].addr = pa; - ctxt->sge[sge_no].length = PAGE_SIZE; - ctxt->sge[sge_no].lkey = xprt->sc_pd->local_dma_lkey; - ctxt->count = sge_no + 1; - buflen += PAGE_SIZE; - } - recv_wr.next = NULL; - recv_wr.sg_list = &ctxt->sge[0]; - recv_wr.num_sge = ctxt->count; - recv_wr.wr_cqe = &ctxt->cqe; - - svc_xprt_get(&xprt->sc_xprt); - ret = ib_post_recv(xprt->sc_qp, &recv_wr, &bad_recv_wr); - trace_svcrdma_post_recv(&recv_wr, ret); - if (ret) { - svc_rdma_unmap_dma(ctxt); - svc_rdma_put_context(ctxt, 1); - svc_xprt_put(&xprt->sc_xprt); - } - return ret; - - err_put_ctxt: - svc_rdma_unmap_dma(ctxt); - svc_rdma_put_context(ctxt, 1); - return -ENOMEM; -} - static void svc_rdma_parse_connect_private(struct svcxprt_rdma *newxprt, struct rdma_conn_param *param) @@ -698,7 +590,7 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt) struct ib_qp_init_attr qp_attr; struct ib_device *dev; struct sockaddr *sap; - unsigned int i, ctxts; + unsigned int ctxts; int ret = 0; listen_rdma = container_of(xprt, struct svcxprt_rdma, sc_xprt); @@ -803,14 +695,8 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt) !rdma_ib_or_roce(dev, newxprt->sc_port_num)) goto errout; - /* Post receive buffers */ - for (i = 0; i < newxprt->sc_max_requests; i++) { - ret = svc_rdma_post_recv(newxprt); - if (ret) { - dprintk("svcrdma: failure posting receive buffers\n"); - goto errout; - } - } + if (!svc_rdma_post_recvs(newxprt)) + goto errout; /* Swap out the handler */ newxprt->sc_cm_id->event_handler = rdma_cma_handler; @@ -907,20 +793,7 @@ static void __svc_rdma_free(struct work_struct *work) pr_err("svcrdma: sc_xprt still in use? (%d)\n", kref_read(&xprt->xpt_ref)); - while (!list_empty(&rdma->sc_read_complete_q)) { - struct svc_rdma_op_ctxt *ctxt; - ctxt = list_first_entry(&rdma->sc_read_complete_q, - struct svc_rdma_op_ctxt, list); - list_del(&ctxt->list); - svc_rdma_put_context(ctxt, 1); - } - while (!list_empty(&rdma->sc_rq_dto_q)) { - struct svc_rdma_op_ctxt *ctxt; - ctxt = list_first_entry(&rdma->sc_rq_dto_q, - struct svc_rdma_op_ctxt, list); - list_del(&ctxt->list); - svc_rdma_put_context(ctxt, 1); - } + svc_rdma_flush_recv_queues(rdma); /* Warn if we leaked a resource or under-referenced */ if (rdma->sc_ctxt_used != 0) @@ -935,6 +808,7 @@ static void __svc_rdma_free(struct work_struct *work) svc_rdma_destroy_rw_ctxts(rdma); svc_rdma_destroy_ctxts(rdma); + svc_rdma_recv_ctxts_destroy(rdma); /* Destroy the QP if present (not a listener) */ if (rdma->sc_qp && !IS_ERR(rdma->sc_qp))