提交 6b3ba914 编写于 作者: J John Fastabend 提交者: David S. Miller

net: sched: allow qdiscs to handle locking

This patch adds a flag for queueing disciplines to indicate the stack
does not need to use the qdisc lock to protect operations. This can
be used to build lockless scheduling algorithms and improving
performance.

The flag is checked in the tx path and the qdisc lock is only taken
if it is not set. For now use a conditional if statement. Later we
could be more aggressive if it proves worthwhile and use a static key
or wrap this in a likely().

Also the lockless case drops the TCQ_F_CAN_BYPASS logic. The reason
for this is synchronizing a qlen counter across threads proves to
cost more than doing the enqueue/dequeue operations when tested with
pktgen.
Signed-off-by: NJohn Fastabend <john.fastabend@gmail.com>
Signed-off-by: NDavid S. Miller <davem@davemloft.net>
上级 6c148184
...@@ -71,6 +71,7 @@ struct Qdisc { ...@@ -71,6 +71,7 @@ struct Qdisc {
* qdisc_tree_decrease_qlen() should stop. * qdisc_tree_decrease_qlen() should stop.
*/ */
#define TCQ_F_INVISIBLE 0x80 /* invisible by default in dump */ #define TCQ_F_INVISIBLE 0x80 /* invisible by default in dump */
#define TCQ_F_NOLOCK 0x100 /* qdisc does not require locking */
u32 limit; u32 limit;
const struct Qdisc_ops *ops; const struct Qdisc_ops *ops;
struct qdisc_size_table __rcu *stab; struct qdisc_size_table __rcu *stab;
......
...@@ -3162,6 +3162,21 @@ static inline int __dev_xmit_skb(struct sk_buff *skb, struct Qdisc *q, ...@@ -3162,6 +3162,21 @@ static inline int __dev_xmit_skb(struct sk_buff *skb, struct Qdisc *q,
int rc; int rc;
qdisc_calculate_pkt_len(skb, q); qdisc_calculate_pkt_len(skb, q);
if (q->flags & TCQ_F_NOLOCK) {
if (unlikely(test_bit(__QDISC_STATE_DEACTIVATED, &q->state))) {
__qdisc_drop(skb, &to_free);
rc = NET_XMIT_DROP;
} else {
rc = q->enqueue(skb, q, &to_free) & NET_XMIT_MASK;
__qdisc_run(q);
}
if (unlikely(to_free))
kfree_skb_list(to_free);
return rc;
}
/* /*
* Heuristic to force contended enqueues to serialize on a * Heuristic to force contended enqueues to serialize on a
* separate lock before trying to get qdisc main lock. * separate lock before trying to get qdisc main lock.
...@@ -4144,19 +4159,22 @@ static __latent_entropy void net_tx_action(struct softirq_action *h) ...@@ -4144,19 +4159,22 @@ static __latent_entropy void net_tx_action(struct softirq_action *h)
while (head) { while (head) {
struct Qdisc *q = head; struct Qdisc *q = head;
spinlock_t *root_lock; spinlock_t *root_lock = NULL;
head = head->next_sched; head = head->next_sched;
root_lock = qdisc_lock(q); if (!(q->flags & TCQ_F_NOLOCK)) {
spin_lock(root_lock); root_lock = qdisc_lock(q);
spin_lock(root_lock);
}
/* We need to make sure head->next_sched is read /* We need to make sure head->next_sched is read
* before clearing __QDISC_STATE_SCHED * before clearing __QDISC_STATE_SCHED
*/ */
smp_mb__before_atomic(); smp_mb__before_atomic();
clear_bit(__QDISC_STATE_SCHED, &q->state); clear_bit(__QDISC_STATE_SCHED, &q->state);
qdisc_run(q); qdisc_run(q);
spin_unlock(root_lock); if (root_lock)
spin_unlock(root_lock);
} }
} }
} }
......
...@@ -174,7 +174,8 @@ int sch_direct_xmit(struct sk_buff *skb, struct Qdisc *q, ...@@ -174,7 +174,8 @@ int sch_direct_xmit(struct sk_buff *skb, struct Qdisc *q,
int ret = NETDEV_TX_BUSY; int ret = NETDEV_TX_BUSY;
/* And release qdisc */ /* And release qdisc */
spin_unlock(root_lock); if (root_lock)
spin_unlock(root_lock);
/* Note that we validate skb (GSO, checksum, ...) outside of locks */ /* Note that we validate skb (GSO, checksum, ...) outside of locks */
if (validate) if (validate)
...@@ -187,10 +188,13 @@ int sch_direct_xmit(struct sk_buff *skb, struct Qdisc *q, ...@@ -187,10 +188,13 @@ int sch_direct_xmit(struct sk_buff *skb, struct Qdisc *q,
HARD_TX_UNLOCK(dev, txq); HARD_TX_UNLOCK(dev, txq);
} else { } else {
spin_lock(root_lock); if (root_lock)
spin_lock(root_lock);
return qdisc_qlen(q); return qdisc_qlen(q);
} }
spin_lock(root_lock);
if (root_lock)
spin_lock(root_lock);
if (dev_xmit_complete(ret)) { if (dev_xmit_complete(ret)) {
/* Driver sent out skb successfully or skb was consumed */ /* Driver sent out skb successfully or skb was consumed */
...@@ -231,9 +235,9 @@ int sch_direct_xmit(struct sk_buff *skb, struct Qdisc *q, ...@@ -231,9 +235,9 @@ int sch_direct_xmit(struct sk_buff *skb, struct Qdisc *q,
*/ */
static inline int qdisc_restart(struct Qdisc *q, int *packets) static inline int qdisc_restart(struct Qdisc *q, int *packets)
{ {
spinlock_t *root_lock = NULL;
struct netdev_queue *txq; struct netdev_queue *txq;
struct net_device *dev; struct net_device *dev;
spinlock_t *root_lock;
struct sk_buff *skb; struct sk_buff *skb;
bool validate; bool validate;
...@@ -242,7 +246,9 @@ static inline int qdisc_restart(struct Qdisc *q, int *packets) ...@@ -242,7 +246,9 @@ static inline int qdisc_restart(struct Qdisc *q, int *packets)
if (unlikely(!skb)) if (unlikely(!skb))
return 0; return 0;
root_lock = qdisc_lock(q); if (!(q->flags & TCQ_F_NOLOCK))
root_lock = qdisc_lock(q);
dev = qdisc_dev(q); dev = qdisc_dev(q);
txq = skb_get_tx_queue(dev, skb); txq = skb_get_tx_queue(dev, skb);
...@@ -880,14 +886,18 @@ static bool some_qdisc_is_busy(struct net_device *dev) ...@@ -880,14 +886,18 @@ static bool some_qdisc_is_busy(struct net_device *dev)
dev_queue = netdev_get_tx_queue(dev, i); dev_queue = netdev_get_tx_queue(dev, i);
q = dev_queue->qdisc_sleeping; q = dev_queue->qdisc_sleeping;
root_lock = qdisc_lock(q);
spin_lock_bh(root_lock); if (q->flags & TCQ_F_NOLOCK) {
val = test_bit(__QDISC_STATE_SCHED, &q->state);
} else {
root_lock = qdisc_lock(q);
spin_lock_bh(root_lock);
val = (qdisc_is_running(q) || val = (qdisc_is_running(q) ||
test_bit(__QDISC_STATE_SCHED, &q->state)); test_bit(__QDISC_STATE_SCHED, &q->state));
spin_unlock_bh(root_lock); spin_unlock_bh(root_lock);
}
if (val) if (val)
return true; return true;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册