Commit dbcd00eb authored by Tom Tucker

svcrdma: Fix race with dto_tasklet in svc_rdma_send

The svc_rdma_send function will attempt to reap SQ WR to make room for
a new request if it finds the SQ full. This function races with the
dto_tasklet, which also reaps SQ WR. To avoid polling and arming the CQ
unnecessarily, move the test_and_clear_bit of the RDMAXPRT_SQ_PENDING
flag and the arming of the CQ into the sq_cq_reap function.

Refactor the rq_cq_reap function to match sq_cq_reap so that the
code is easier to follow.
Signed-off-by: Tom Tucker <tom@opengridcomputing.com>
Parent 0e7f011a
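
The idea of the change can be shown with a minimal user-space sketch (illustration only, not kernel code): the test-and-clear of the "pending" flag and the re-arming of the completion queue live inside the reap function itself, so the dto_tasklet and the send path can both call it without racing over who polls and re-arms the CQ. The names fake_xprt, reap_sq and arm_cq below are invented for this sketch; the real code uses test_and_clear_bit(), ib_req_notify_cq() and ib_poll_cq().

#include <stdatomic.h>
#include <stdbool.h>
#include <stdio.h>

struct fake_xprt {
	atomic_bool sq_pending;   /* stands in for RDMAXPRT_SQ_PENDING */
	atomic_int  sq_count;     /* outstanding SQ work requests */
};

/* Models ib_req_notify_cq(cq, IB_CQ_NEXT_COMP): ask for the next interrupt. */
static void arm_cq(struct fake_xprt *xprt)
{
	(void)xprt;
}

/* Models sq_cq_reap(): safe to call from either the tasklet or the sender. */
static void reap_sq(struct fake_xprt *xprt)
{
	/* Only the caller that clears the flag goes on to arm and poll. */
	if (!atomic_exchange(&xprt->sq_pending, false))
		return;
	arm_cq(xprt);
	/* ... an ib_poll_cq()-style loop would retire completions here ... */
	atomic_fetch_sub(&xprt->sq_count, 1);
}

int main(void)
{
	struct fake_xprt xprt = { .sq_pending = true, .sq_count = 1 };

	reap_sq(&xprt);   /* e.g. the send path noticing a full SQ */
	reap_sq(&xprt);   /* e.g. the dto_tasklet: returns early, no double arm */
	printf("sq_count = %d\n", atomic_load(&xprt.sq_count));
	return 0;
}
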
@@ -228,23 +228,8 @@ static void dto_tasklet_func(unsigned long data)
 		list_del_init(&xprt->sc_dto_q);
 		spin_unlock_irqrestore(&dto_lock, flags);
 
-		if (test_and_clear_bit(RDMAXPRT_RQ_PENDING, &xprt->sc_flags)) {
-			ib_req_notify_cq(xprt->sc_rq_cq, IB_CQ_NEXT_COMP);
-			rq_cq_reap(xprt);
-			set_bit(XPT_DATA, &xprt->sc_xprt.xpt_flags);
-			/*
-			 * If data arrived before established event,
-			 * don't enqueue. This defers RPC I/O until the
-			 * RDMA connection is complete.
-			 */
-			if (!test_bit(RDMAXPRT_CONN_PENDING, &xprt->sc_flags))
-				svc_xprt_enqueue(&xprt->sc_xprt);
-		}
-
-		if (test_and_clear_bit(RDMAXPRT_SQ_PENDING, &xprt->sc_flags)) {
-			ib_req_notify_cq(xprt->sc_sq_cq, IB_CQ_NEXT_COMP);
-			sq_cq_reap(xprt);
-		}
+		rq_cq_reap(xprt);
+		sq_cq_reap(xprt);
 
 		svc_xprt_put(&xprt->sc_xprt);
 		spin_lock_irqsave(&dto_lock, flags);
@@ -297,6 +282,10 @@ static void rq_cq_reap(struct svcxprt_rdma *xprt)
 	struct ib_wc wc;
 	struct svc_rdma_op_ctxt *ctxt = NULL;
 
+	if (!test_and_clear_bit(RDMAXPRT_RQ_PENDING, &xprt->sc_flags))
+		return;
+
+	ib_req_notify_cq(xprt->sc_rq_cq, IB_CQ_NEXT_COMP);
 	atomic_inc(&rdma_stat_rq_poll);
 
 	spin_lock_bh(&xprt->sc_rq_dto_lock);
@@ -316,6 +305,15 @@ static void rq_cq_reap(struct svcxprt_rdma *xprt)
 	if (ctxt)
 		atomic_inc(&rdma_stat_rq_prod);
+
+	set_bit(XPT_DATA, &xprt->sc_xprt.xpt_flags);
+	/*
+	 * If data arrived before established event,
+	 * don't enqueue. This defers RPC I/O until the
+	 * RDMA connection is complete.
+	 */
+	if (!test_bit(RDMAXPRT_CONN_PENDING, &xprt->sc_flags))
+		svc_xprt_enqueue(&xprt->sc_xprt);
 }
 
 /*
@@ -328,6 +326,11 @@ static void sq_cq_reap(struct svcxprt_rdma *xprt)
 	struct ib_cq *cq = xprt->sc_sq_cq;
 	int ret;
 
+	if (!test_and_clear_bit(RDMAXPRT_SQ_PENDING, &xprt->sc_flags))
+		return;
+
+	ib_req_notify_cq(xprt->sc_sq_cq, IB_CQ_NEXT_COMP);
+
 	atomic_inc(&rdma_stat_sq_poll);
 	while ((ret = ib_poll_cq(cq, 1, &wc)) > 0) {
 		ctxt = (struct svc_rdma_op_ctxt *)(unsigned long)wc.wr_id;
@@ -1010,7 +1013,8 @@ int svc_rdma_send(struct svcxprt_rdma *xprt, struct ib_send_wr *wr)
 		if (xprt->sc_sq_depth == atomic_read(&xprt->sc_sq_count)) {
 			spin_unlock_bh(&xprt->sc_lock);
 			atomic_inc(&rdma_stat_sq_starve);
-			/* See if we can reap some SQ WR */
+
+			/* See if we can opportunistically reap SQ WR to make room */
 			sq_cq_reap(xprt);
 
 			/* Wait until SQ WR available if SQ still full */
...
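
The last hunk shows the send-side flow: on a full SQ, drop the lock, bump the starvation counter, opportunistically reap completions, and only wait if the queue is still full. Below is a small user-space sketch of that flow under invented names (try_send, reap_sq, SQ_DEPTH, sq_starve); the real path lives in svc_rdma_send() and uses sc_sq_count, sc_sq_depth, rdma_stat_sq_starve and sq_cq_reap().

#include <stdatomic.h>
#include <stdbool.h>
#include <stdio.h>

#define SQ_DEPTH 2

static atomic_int sq_count = 1;     /* outstanding send work requests */
static atomic_int sq_starve;        /* models rdma_stat_sq_starve */

/* Models sq_cq_reap(): pretend one completion is ready and retire it. */
static void reap_sq(void)
{
	if (atomic_load(&sq_count) > 0)
		atomic_fetch_sub(&sq_count, 1);
}

/* Models the svc_rdma_send() full-SQ path from the hunk above. */
static bool try_send(void)
{
	if (atomic_load(&sq_count) == SQ_DEPTH) {
		atomic_fetch_add(&sq_starve, 1);
		/* See if we can opportunistically reap SQ WR to make room */
		reap_sq();
		if (atomic_load(&sq_count) == SQ_DEPTH)
			return false;       /* still full: caller would wait */
	}
	atomic_fetch_add(&sq_count, 1);     /* post the work request */
	return true;
}

int main(void)
{
	printf("send #1: %s\n", try_send() ? "posted" : "would wait");
	printf("send #2: %s\n", try_send() ? "posted" : "would wait");
	printf("starve events: %d\n", atomic_load(&sq_starve));
	return 0;
}
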