提交 443be0e5 编写于 作者: S Sowmini Varadhan 提交者: David S. Miller

RDS: make sure not to loop forever inside rds_send_xmit

If a determined set of concurrent senders keep the send queue full,
we can loop forever inside rds_send_xmit.  This fix has two parts.

First we are dropping out of the while(1) loop after we've processed a
large batch of messages.

Second we add a generation number that gets bumped each time the
xmit bit lock is acquired.  If someone else has jumped in and
made progress in the queue, we skip our goto restart.

Original patch by Chris Mason.
Signed-off-by: NSowmini Varadhan <sowmini.varadhan@oracle.com>
Signed-off-by: NDavid S. Miller <davem@davemloft.net>
上级 1789b2c0
...@@ -193,6 +193,7 @@ static struct rds_connection *__rds_conn_create(__be32 laddr, __be32 faddr, ...@@ -193,6 +193,7 @@ static struct rds_connection *__rds_conn_create(__be32 laddr, __be32 faddr,
} }
atomic_set(&conn->c_state, RDS_CONN_DOWN); atomic_set(&conn->c_state, RDS_CONN_DOWN);
conn->c_send_gen = 0;
conn->c_reconnect_jiffies = 0; conn->c_reconnect_jiffies = 0;
INIT_DELAYED_WORK(&conn->c_send_w, rds_send_worker); INIT_DELAYED_WORK(&conn->c_send_w, rds_send_worker);
INIT_DELAYED_WORK(&conn->c_recv_w, rds_recv_worker); INIT_DELAYED_WORK(&conn->c_recv_w, rds_recv_worker);
......
...@@ -110,6 +110,7 @@ struct rds_connection { ...@@ -110,6 +110,7 @@ struct rds_connection {
void *c_transport_data; void *c_transport_data;
atomic_t c_state; atomic_t c_state;
unsigned long c_send_gen;
unsigned long c_flags; unsigned long c_flags;
unsigned long c_reconnect_jiffies; unsigned long c_reconnect_jiffies;
struct delayed_work c_send_w; struct delayed_work c_send_w;
......
...@@ -140,8 +140,11 @@ int rds_send_xmit(struct rds_connection *conn) ...@@ -140,8 +140,11 @@ int rds_send_xmit(struct rds_connection *conn)
struct scatterlist *sg; struct scatterlist *sg;
int ret = 0; int ret = 0;
LIST_HEAD(to_be_dropped); LIST_HEAD(to_be_dropped);
int batch_count;
unsigned long send_gen = 0;
restart: restart:
batch_count = 0;
/* /*
* sendmsg calls here after having queued its message on the send * sendmsg calls here after having queued its message on the send
...@@ -156,6 +159,17 @@ int rds_send_xmit(struct rds_connection *conn) ...@@ -156,6 +159,17 @@ int rds_send_xmit(struct rds_connection *conn)
goto out; goto out;
} }
/*
* we record the send generation after doing the xmit acquire.
* if someone else manages to jump in and do some work, we'll use
* this to avoid a goto restart farther down.
*
* The acquire_in_xmit() check above ensures that only one
* caller can increment c_send_gen at any time.
*/
conn->c_send_gen++;
send_gen = conn->c_send_gen;
/* /*
* rds_conn_shutdown() sets the conn state and then tests RDS_IN_XMIT, * rds_conn_shutdown() sets the conn state and then tests RDS_IN_XMIT,
* we do the opposite to avoid races. * we do the opposite to avoid races.
...@@ -202,6 +216,16 @@ int rds_send_xmit(struct rds_connection *conn) ...@@ -202,6 +216,16 @@ int rds_send_xmit(struct rds_connection *conn)
if (!rm) { if (!rm) {
unsigned int len; unsigned int len;
batch_count++;
/* we want to process as big a batch as we can, but
* we also want to avoid softlockups. If we've been
* through a lot of messages, lets back off and see
* if anyone else jumps in
*/
if (batch_count >= 1024)
goto over_batch;
spin_lock_irqsave(&conn->c_lock, flags); spin_lock_irqsave(&conn->c_lock, flags);
if (!list_empty(&conn->c_send_queue)) { if (!list_empty(&conn->c_send_queue)) {
...@@ -357,9 +381,9 @@ int rds_send_xmit(struct rds_connection *conn) ...@@ -357,9 +381,9 @@ int rds_send_xmit(struct rds_connection *conn)
} }
} }
over_batch:
if (conn->c_trans->xmit_complete) if (conn->c_trans->xmit_complete)
conn->c_trans->xmit_complete(conn); conn->c_trans->xmit_complete(conn);
release_in_xmit(conn); release_in_xmit(conn);
/* Nuke any messages we decided not to retransmit. */ /* Nuke any messages we decided not to retransmit. */
...@@ -380,10 +404,15 @@ int rds_send_xmit(struct rds_connection *conn) ...@@ -380,10 +404,15 @@ int rds_send_xmit(struct rds_connection *conn)
* If the transport cannot continue (i.e ret != 0), then it must * If the transport cannot continue (i.e ret != 0), then it must
* call us when more room is available, such as from the tx * call us when more room is available, such as from the tx
* completion handler. * completion handler.
*
* We have an extra generation check here so that if someone manages
* to jump in after our release_in_xmit, we'll see that they have done
* some work and we will skip our goto
*/ */
if (ret == 0) { if (ret == 0) {
smp_mb(); smp_mb();
if (!list_empty(&conn->c_send_queue)) { if (!list_empty(&conn->c_send_queue) &&
send_gen == conn->c_send_gen) {
rds_stats_inc(s_send_lock_queue_raced); rds_stats_inc(s_send_lock_queue_raced);
goto restart; goto restart;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册