提交 fc8b81a5 编写于 作者: D David S. Miller

Merge branch 'lockless-qdisc-series'

John Fastabend says:

====================
lockless qdisc series

This series adds support for building lockless qdiscs. This is
the result of noticing the qdisc lock is a common hot-spot in
perf analysis of the Linux network stack, especially when testing
with high packet per second rates. However, nothing is free and
most qdiscs rely on the qdisc lock for their data structures so
each qdisc must be converted on a case by case basis. In this
series, to kick things off, we make pfifo_fast, mq, and mqprio
lockless. Follow up series can address additional qdiscs as needed.
For example sch_tbf might be useful. To allow this the lockless
design is an opt-in flag. In some future utopia we convert all
qdiscs and we get to drop this case analysis, but in order to
make progress we live in the real-world.

There are also a handful of optimizations I have behind this
series and a few code cleanups that I couldn't figure out how
to fit neatly into this series with out increasing the patch
count. Once this is in additional patches can address this. The
most notable is in skb_dequeue we can push the consumer lock
out a bit and consume multiple skbs off the skb_array in pfifo
fast per iteration. Ideally we could push arrays of packets at
drivers as well but we would need the infrastructure for this.
The other notable improvement is to do less locking in the
overrun cases where bad tx queue list and gso_skb are being
hit. Although, nice in theory in practice this is the error
case and I haven't found a benchmark where this matters yet.

For testing...

My first test case uses multiple containers (via cilium) where
multiple client containers use 'wrk' to benchmark connections with
a server container running lighttpd. Where lighttpd is configured
to use multiple threads, one per core. Additionally this test has
a proxy agent running so all traffic takes an extra hop through a
proxy container. In these cases each TCP packet traverses the egress
qdisc layer at least four times and the ingress qdisc layer an
additional four times. This makes for a good stress test IMO, perf
details below.

The other micro-benchmark I run is injecting packets directly into
qdisc layer using pktgen. This uses the benchmark script,

 ./pktgen_bench_xmit_mode_queue_xmit.sh

Benchmarks taken in two cases, "base" running latest net-next no
changes to qdisc layer and "qdisc" tests run with qdisc lockless
updates. Numbers reported in req/sec. All virtual 'veth' devices
run with pfifo_fast in the qdisc test case.

`wrk -t16 -c $conns -d30 "http://[$SERVER_IP4]:80"`

conns    16      32     64   1024
-----------------------------------------------
base:   18831  20201  21393  29151
qdisc:  19309  21063  23899  29265

notice in all cases we see performance improvement when running
with qdisc case.

Microbenchmarks using pktgen are as follows,

`pktgen_bench_xmit_mode_queue_xmit.sh -t 1 -i eth2 -c 20000000

base(mq):          2.1Mpps
base(pfifo_fast):  2.1Mpps
qdisc(mq):         2.6Mpps
qdisc(pfifo_fast): 2.6Mpps

notice numbers are the same for mq and pfifo_fast because only
testing a single thread here. In both tests we see a nice bump
in performance gain. The key with 'mq' is it is already per
txq ring so contention is minimal in the above cases. Qdiscs
such as tbf or htb which have more contention will likely show
larger gains when/if lockless versions are implemented.

Thanks to everyone who helped with this work especially Daniel
Borkmann, Eric Dumazet and Willem de Bruijn for discussing the
design and reviewing versions of the code.

Changes from the RFC: dropped a couple patches off the end,
fixed a bug with skb_queue_walk_safe not unlinking skb in all
cases, fixed a lockdep splat with pfifo_fast_destroy not calling
*_bh lock variant, addressed _most_ of Willem's comments, there
was a bug in the bulk locking (final patch) of the RFC series.

@Willem, I left out lockdep annotation for a follow on series
to add lockdep more completely, rather than just in code I
touched.

Comments and feedback welcome.
====================
Signed-off-by: NDavid S. Miller <davem@davemloft.net>
......@@ -72,6 +72,11 @@ static inline bool __skb_array_empty(struct skb_array *a)
return !__ptr_ring_peek(&a->ring);
}
static inline struct sk_buff *__skb_array_peek(struct skb_array *a)
{
return __ptr_ring_peek(&a->ring);
}
static inline bool skb_array_empty(struct skb_array *a)
{
return ptr_ring_empty(&a->ring);
......
......@@ -49,6 +49,9 @@ int gnet_stats_copy_rate_est(struct gnet_dump *d,
int gnet_stats_copy_queue(struct gnet_dump *d,
struct gnet_stats_queue __percpu *cpu_q,
struct gnet_stats_queue *q, __u32 qlen);
void __gnet_stats_copy_queue(struct gnet_stats_queue *qstats,
const struct gnet_stats_queue __percpu *cpu_q,
const struct gnet_stats_queue *q, __u32 qlen);
int gnet_stats_copy_app(struct gnet_dump *d, void *st, int len);
int gnet_stats_finish_copy(struct gnet_dump *d);
......
......@@ -105,16 +105,18 @@ struct qdisc_rate_table *qdisc_get_rtab(struct tc_ratespec *r,
void qdisc_put_rtab(struct qdisc_rate_table *tab);
void qdisc_put_stab(struct qdisc_size_table *tab);
void qdisc_warn_nonwc(const char *txt, struct Qdisc *qdisc);
int sch_direct_xmit(struct sk_buff *skb, struct Qdisc *q,
struct net_device *dev, struct netdev_queue *txq,
spinlock_t *root_lock, bool validate);
bool sch_direct_xmit(struct sk_buff *skb, struct Qdisc *q,
struct net_device *dev, struct netdev_queue *txq,
spinlock_t *root_lock, bool validate);
void __qdisc_run(struct Qdisc *q);
static inline void qdisc_run(struct Qdisc *q)
{
if (qdisc_run_begin(q))
if (qdisc_run_begin(q)) {
__qdisc_run(q);
qdisc_run_end(q);
}
}
static inline __be16 tc_skb_protocol(const struct sk_buff *skb)
......
......@@ -71,6 +71,7 @@ struct Qdisc {
* qdisc_tree_decrease_qlen() should stop.
*/
#define TCQ_F_INVISIBLE 0x80 /* invisible by default in dump */
#define TCQ_F_NOLOCK 0x100 /* qdisc does not require locking */
u32 limit;
const struct Qdisc_ops *ops;
struct qdisc_size_table __rcu *stab;
......@@ -87,14 +88,14 @@ struct Qdisc {
/*
* For performance sake on SMP, we put highly modified fields at the end
*/
struct sk_buff *gso_skb ____cacheline_aligned_in_smp;
struct sk_buff_head gso_skb ____cacheline_aligned_in_smp;
struct qdisc_skb_head q;
struct gnet_stats_basic_packed bstats;
seqcount_t running;
struct gnet_stats_queue qstats;
unsigned long state;
struct Qdisc *next_sched;
struct sk_buff *skb_bad_txq;
struct sk_buff_head skb_bad_txq;
int padded;
refcount_t refcnt;
......@@ -179,6 +180,7 @@ struct Qdisc_ops {
const struct Qdisc_class_ops *cl_ops;
char id[IFNAMSIZ];
int priv_size;
unsigned int static_flags;
int (*enqueue)(struct sk_buff *skb,
struct Qdisc *sch,
......@@ -290,11 +292,31 @@ static inline void qdisc_cb_private_validate(const struct sk_buff *skb, int sz)
BUILD_BUG_ON(sizeof(qcb->data) < sz);
}
static inline int qdisc_qlen_cpu(const struct Qdisc *q)
{
return this_cpu_ptr(q->cpu_qstats)->qlen;
}
static inline int qdisc_qlen(const struct Qdisc *q)
{
return q->q.qlen;
}
static inline int qdisc_qlen_sum(const struct Qdisc *q)
{
__u32 qlen = 0;
int i;
if (q->flags & TCQ_F_NOLOCK) {
for_each_possible_cpu(i)
qlen += per_cpu_ptr(q->cpu_qstats, i)->qlen;
} else {
qlen = q->q.qlen;
}
return qlen;
}
static inline struct qdisc_skb_cb *qdisc_skb_cb(const struct sk_buff *skb)
{
return (struct qdisc_skb_cb *)skb->cb;
......@@ -631,12 +653,39 @@ static inline void qdisc_qstats_backlog_dec(struct Qdisc *sch,
sch->qstats.backlog -= qdisc_pkt_len(skb);
}
static inline void qdisc_qstats_cpu_backlog_dec(struct Qdisc *sch,
const struct sk_buff *skb)
{
this_cpu_sub(sch->cpu_qstats->backlog, qdisc_pkt_len(skb));
}
static inline void qdisc_qstats_backlog_inc(struct Qdisc *sch,
const struct sk_buff *skb)
{
sch->qstats.backlog += qdisc_pkt_len(skb);
}
static inline void qdisc_qstats_cpu_backlog_inc(struct Qdisc *sch,
const struct sk_buff *skb)
{
this_cpu_add(sch->cpu_qstats->backlog, qdisc_pkt_len(skb));
}
static inline void qdisc_qstats_cpu_qlen_inc(struct Qdisc *sch)
{
this_cpu_inc(sch->cpu_qstats->qlen);
}
static inline void qdisc_qstats_cpu_qlen_dec(struct Qdisc *sch)
{
this_cpu_dec(sch->cpu_qstats->qlen);
}
static inline void qdisc_qstats_cpu_requeues_inc(struct Qdisc *sch)
{
this_cpu_inc(sch->cpu_qstats->requeues);
}
static inline void __qdisc_qstats_drop(struct Qdisc *sch, int count)
{
sch->qstats.drops += count;
......@@ -767,26 +816,30 @@ static inline struct sk_buff *qdisc_peek_head(struct Qdisc *sch)
/* generic pseudo peek method for non-work-conserving qdisc */
static inline struct sk_buff *qdisc_peek_dequeued(struct Qdisc *sch)
{
struct sk_buff *skb = skb_peek(&sch->gso_skb);
/* we can reuse ->gso_skb because peek isn't called for root qdiscs */
if (!sch->gso_skb) {
sch->gso_skb = sch->dequeue(sch);
if (sch->gso_skb) {
if (!skb) {
skb = sch->dequeue(sch);
if (skb) {
__skb_queue_head(&sch->gso_skb, skb);
/* it's still part of the queue */
qdisc_qstats_backlog_inc(sch, sch->gso_skb);
qdisc_qstats_backlog_inc(sch, skb);
sch->q.qlen++;
}
}
return sch->gso_skb;
return skb;
}
/* use instead of qdisc->dequeue() for all qdiscs queried with ->peek() */
static inline struct sk_buff *qdisc_dequeue_peeked(struct Qdisc *sch)
{
struct sk_buff *skb = sch->gso_skb;
struct sk_buff *skb = skb_peek(&sch->gso_skb);
if (skb) {
sch->gso_skb = NULL;
skb = __skb_dequeue(&sch->gso_skb);
qdisc_qstats_backlog_dec(sch, skb);
sch->q.qlen--;
} else {
......@@ -844,6 +897,14 @@ static inline void rtnl_qdisc_drop(struct sk_buff *skb, struct Qdisc *sch)
qdisc_qstats_drop(sch);
}
static inline int qdisc_drop_cpu(struct sk_buff *skb, struct Qdisc *sch,
struct sk_buff **to_free)
{
__qdisc_drop(skb, to_free);
qdisc_qstats_cpu_drop(sch);
return NET_XMIT_DROP;
}
static inline int qdisc_drop(struct sk_buff *skb, struct Qdisc *sch,
struct sk_buff **to_free)
......
......@@ -3162,6 +3162,21 @@ static inline int __dev_xmit_skb(struct sk_buff *skb, struct Qdisc *q,
int rc;
qdisc_calculate_pkt_len(skb, q);
if (q->flags & TCQ_F_NOLOCK) {
if (unlikely(test_bit(__QDISC_STATE_DEACTIVATED, &q->state))) {
__qdisc_drop(skb, &to_free);
rc = NET_XMIT_DROP;
} else {
rc = q->enqueue(skb, q, &to_free) & NET_XMIT_MASK;
__qdisc_run(q);
}
if (unlikely(to_free))
kfree_skb_list(to_free);
return rc;
}
/*
* Heuristic to force contended enqueues to serialize on a
* separate lock before trying to get qdisc main lock.
......@@ -3192,9 +3207,9 @@ static inline int __dev_xmit_skb(struct sk_buff *skb, struct Qdisc *q,
contended = false;
}
__qdisc_run(q);
} else
qdisc_run_end(q);
}
qdisc_run_end(q);
rc = NET_XMIT_SUCCESS;
} else {
rc = q->enqueue(skb, q, &to_free) & NET_XMIT_MASK;
......@@ -3204,6 +3219,7 @@ static inline int __dev_xmit_skb(struct sk_buff *skb, struct Qdisc *q,
contended = false;
}
__qdisc_run(q);
qdisc_run_end(q);
}
}
spin_unlock(root_lock);
......@@ -4143,19 +4159,22 @@ static __latent_entropy void net_tx_action(struct softirq_action *h)
while (head) {
struct Qdisc *q = head;
spinlock_t *root_lock;
spinlock_t *root_lock = NULL;
head = head->next_sched;
root_lock = qdisc_lock(q);
spin_lock(root_lock);
if (!(q->flags & TCQ_F_NOLOCK)) {
root_lock = qdisc_lock(q);
spin_lock(root_lock);
}
/* We need to make sure head->next_sched is read
* before clearing __QDISC_STATE_SCHED
*/
smp_mb__before_atomic();
clear_bit(__QDISC_STATE_SCHED, &q->state);
qdisc_run(q);
spin_unlock(root_lock);
if (root_lock)
spin_unlock(root_lock);
}
}
}
......
......@@ -252,10 +252,10 @@ __gnet_stats_copy_queue_cpu(struct gnet_stats_queue *qstats,
}
}
static void __gnet_stats_copy_queue(struct gnet_stats_queue *qstats,
const struct gnet_stats_queue __percpu *cpu,
const struct gnet_stats_queue *q,
__u32 qlen)
void __gnet_stats_copy_queue(struct gnet_stats_queue *qstats,
const struct gnet_stats_queue __percpu *cpu,
const struct gnet_stats_queue *q,
__u32 qlen)
{
if (cpu) {
__gnet_stats_copy_queue_cpu(qstats, cpu);
......@@ -269,6 +269,7 @@ static void __gnet_stats_copy_queue(struct gnet_stats_queue *qstats,
qstats->qlen = qlen;
}
EXPORT_SYMBOL(__gnet_stats_copy_queue);
/**
* gnet_stats_copy_queue - copy queue statistics into statistics TLV
......
......@@ -797,7 +797,8 @@ static int tc_fill_qdisc(struct sk_buff *skb, struct Qdisc *q, u32 clid,
goto nla_put_failure;
if (q->ops->dump && q->ops->dump(q, skb) < 0)
goto nla_put_failure;
qlen = q->q.qlen;
qlen = qdisc_qlen_sum(q);
stab = rtnl_dereference(q->stab);
if (stab && qdisc_dump_stab(skb, stab) < 0)
......@@ -954,6 +955,11 @@ static int qdisc_graft(struct net_device *dev, struct Qdisc *parent,
} else {
const struct Qdisc_class_ops *cops = parent->ops->cl_ops;
/* Only support running class lockless if parent is lockless */
if (new && (new->flags & TCQ_F_NOLOCK) &&
parent && !(parent->flags & TCQ_F_NOLOCK))
new->flags &= ~TCQ_F_NOLOCK;
err = -EOPNOTSUPP;
if (cops && cops->graft) {
unsigned long cl = cops->find(parent, classid);
......
......@@ -26,6 +26,7 @@
#include <linux/list.h>
#include <linux/slab.h>
#include <linux/if_vlan.h>
#include <linux/skb_array.h>
#include <net/sch_generic.h>
#include <net/pkt_sched.h>
#include <net/dst.h>
......@@ -46,9 +47,70 @@ EXPORT_SYMBOL(default_qdisc_ops);
* - updates to tree and tree walking are only done under the rtnl mutex.
*/
static inline int dev_requeue_skb(struct sk_buff *skb, struct Qdisc *q)
static inline struct sk_buff *__skb_dequeue_bad_txq(struct Qdisc *q)
{
const struct netdev_queue *txq = q->dev_queue;
spinlock_t *lock = NULL;
struct sk_buff *skb;
if (q->flags & TCQ_F_NOLOCK) {
lock = qdisc_lock(q);
spin_lock(lock);
}
skb = skb_peek(&q->skb_bad_txq);
if (skb) {
/* check the reason of requeuing without tx lock first */
txq = skb_get_tx_queue(txq->dev, skb);
if (!netif_xmit_frozen_or_stopped(txq)) {
skb = __skb_dequeue(&q->skb_bad_txq);
if (qdisc_is_percpu_stats(q)) {
qdisc_qstats_cpu_backlog_dec(q, skb);
qdisc_qstats_cpu_qlen_dec(q);
} else {
qdisc_qstats_backlog_dec(q, skb);
q->q.qlen--;
}
} else {
skb = NULL;
}
}
if (lock)
spin_unlock(lock);
return skb;
}
static inline struct sk_buff *qdisc_dequeue_skb_bad_txq(struct Qdisc *q)
{
struct sk_buff *skb = skb_peek(&q->skb_bad_txq);
if (unlikely(skb))
skb = __skb_dequeue_bad_txq(q);
return skb;
}
static inline void qdisc_enqueue_skb_bad_txq(struct Qdisc *q,
struct sk_buff *skb)
{
q->gso_skb = skb;
spinlock_t *lock = NULL;
if (q->flags & TCQ_F_NOLOCK) {
lock = qdisc_lock(q);
spin_lock(lock);
}
__skb_queue_tail(&q->skb_bad_txq, skb);
if (lock)
spin_unlock(lock);
}
static inline int __dev_requeue_skb(struct sk_buff *skb, struct Qdisc *q)
{
__skb_queue_head(&q->gso_skb, skb);
q->qstats.requeues++;
qdisc_qstats_backlog_inc(q, skb);
q->q.qlen++; /* it's still part of the queue */
......@@ -57,6 +119,30 @@ static inline int dev_requeue_skb(struct sk_buff *skb, struct Qdisc *q)
return 0;
}
static inline int dev_requeue_skb_locked(struct sk_buff *skb, struct Qdisc *q)
{
spinlock_t *lock = qdisc_lock(q);
spin_lock(lock);
__skb_queue_tail(&q->gso_skb, skb);
spin_unlock(lock);
qdisc_qstats_cpu_requeues_inc(q);
qdisc_qstats_cpu_backlog_inc(q, skb);
qdisc_qstats_cpu_qlen_inc(q);
__netif_schedule(q);
return 0;
}
static inline int dev_requeue_skb(struct sk_buff *skb, struct Qdisc *q)
{
if (q->flags & TCQ_F_NOLOCK)
return dev_requeue_skb_locked(skb, q);
else
return __dev_requeue_skb(skb, q);
}
static void try_bulk_dequeue_skb(struct Qdisc *q,
struct sk_buff *skb,
const struct netdev_queue *txq,
......@@ -94,9 +180,15 @@ static void try_bulk_dequeue_skb_slow(struct Qdisc *q,
if (!nskb)
break;
if (unlikely(skb_get_queue_mapping(nskb) != mapping)) {
q->skb_bad_txq = nskb;
qdisc_qstats_backlog_inc(q, nskb);
q->q.qlen++;
qdisc_enqueue_skb_bad_txq(q, nskb);
if (qdisc_is_percpu_stats(q)) {
qdisc_qstats_cpu_backlog_inc(q, nskb);
qdisc_qstats_cpu_qlen_inc(q);
} else {
qdisc_qstats_backlog_inc(q, nskb);
q->q.qlen++;
}
break;
}
skb->next = nskb;
......@@ -112,40 +204,60 @@ static void try_bulk_dequeue_skb_slow(struct Qdisc *q,
static struct sk_buff *dequeue_skb(struct Qdisc *q, bool *validate,
int *packets)
{
struct sk_buff *skb = q->gso_skb;
const struct netdev_queue *txq = q->dev_queue;
struct sk_buff *skb = NULL;
*packets = 1;
if (unlikely(skb)) {
if (unlikely(!skb_queue_empty(&q->gso_skb))) {
spinlock_t *lock = NULL;
if (q->flags & TCQ_F_NOLOCK) {
lock = qdisc_lock(q);
spin_lock(lock);
}
skb = skb_peek(&q->gso_skb);
/* skb may be null if another cpu pulls gso_skb off in between
* empty check and lock.
*/
if (!skb) {
if (lock)
spin_unlock(lock);
goto validate;
}
/* skb in gso_skb were already validated */
*validate = false;
/* check the reason of requeuing without tx lock first */
txq = skb_get_tx_queue(txq->dev, skb);
if (!netif_xmit_frozen_or_stopped(txq)) {
q->gso_skb = NULL;
qdisc_qstats_backlog_dec(q, skb);
q->q.qlen--;
} else
skb = __skb_dequeue(&q->gso_skb);
if (qdisc_is_percpu_stats(q)) {
qdisc_qstats_cpu_backlog_dec(q, skb);
qdisc_qstats_cpu_qlen_dec(q);
} else {
qdisc_qstats_backlog_dec(q, skb);
q->q.qlen--;
}
} else {
skb = NULL;
goto trace;
}
*validate = true;
skb = q->skb_bad_txq;
if (unlikely(skb)) {
/* check the reason of requeuing without tx lock first */
txq = skb_get_tx_queue(txq->dev, skb);
if (!netif_xmit_frozen_or_stopped(txq)) {
q->skb_bad_txq = NULL;
qdisc_qstats_backlog_dec(q, skb);
q->q.qlen--;
goto bulk;
}
skb = NULL;
if (lock)
spin_unlock(lock);
goto trace;
}
if (!(q->flags & TCQ_F_ONETXQUEUE) ||
!netif_xmit_frozen_or_stopped(txq))
skb = q->dequeue(q);
validate:
*validate = true;
if ((q->flags & TCQ_F_ONETXQUEUE) &&
netif_xmit_frozen_or_stopped(txq))
return skb;
skb = qdisc_dequeue_skb_bad_txq(q);
if (unlikely(skb))
goto bulk;
skb = q->dequeue(q);
if (skb) {
bulk:
if (qdisc_may_bulk(q))
......@@ -164,17 +276,18 @@ static struct sk_buff *dequeue_skb(struct Qdisc *q, bool *validate,
* only one CPU can execute this function.
*
* Returns to the caller:
* 0 - queue is empty or throttled.
* >0 - queue is not empty.
* false - hardware queue frozen backoff
* true - feel free to send more pkts
*/
int sch_direct_xmit(struct sk_buff *skb, struct Qdisc *q,
struct net_device *dev, struct netdev_queue *txq,
spinlock_t *root_lock, bool validate)
bool sch_direct_xmit(struct sk_buff *skb, struct Qdisc *q,
struct net_device *dev, struct netdev_queue *txq,
spinlock_t *root_lock, bool validate)
{
int ret = NETDEV_TX_BUSY;
/* And release qdisc */
spin_unlock(root_lock);
if (root_lock)
spin_unlock(root_lock);
/* Note that we validate skb (GSO, checksum, ...) outside of locks */
if (validate)
......@@ -187,27 +300,28 @@ int sch_direct_xmit(struct sk_buff *skb, struct Qdisc *q,
HARD_TX_UNLOCK(dev, txq);
} else {
spin_lock(root_lock);
return qdisc_qlen(q);
if (root_lock)
spin_lock(root_lock);
return true;
}
spin_lock(root_lock);
if (dev_xmit_complete(ret)) {
/* Driver sent out skb successfully or skb was consumed */
ret = qdisc_qlen(q);
} else {
if (root_lock)
spin_lock(root_lock);
if (!dev_xmit_complete(ret)) {
/* Driver returned NETDEV_TX_BUSY - requeue skb */
if (unlikely(ret != NETDEV_TX_BUSY))
net_warn_ratelimited("BUG %s code %d qlen %d\n",
dev->name, ret, q->q.qlen);
ret = dev_requeue_skb(skb, q);
dev_requeue_skb(skb, q);
return false;
}
if (ret && netif_xmit_frozen_or_stopped(txq))
ret = 0;
return false;
return ret;
return true;
}
/*
......@@ -229,20 +343,22 @@ int sch_direct_xmit(struct sk_buff *skb, struct Qdisc *q,
* >0 - queue is not empty.
*
*/
static inline int qdisc_restart(struct Qdisc *q, int *packets)
static inline bool qdisc_restart(struct Qdisc *q, int *packets)
{
spinlock_t *root_lock = NULL;
struct netdev_queue *txq;
struct net_device *dev;
spinlock_t *root_lock;
struct sk_buff *skb;
bool validate;
/* Dequeue packet */
skb = dequeue_skb(q, &validate, packets);
if (unlikely(!skb))
return 0;
return false;
if (!(q->flags & TCQ_F_NOLOCK))
root_lock = qdisc_lock(q);
root_lock = qdisc_lock(q);
dev = qdisc_dev(q);
txq = skb_get_tx_queue(dev, skb);
......@@ -266,8 +382,6 @@ void __qdisc_run(struct Qdisc *q)
break;
}
}
qdisc_run_end(q);
}
unsigned long dev_trans_start(struct net_device *dev)
......@@ -465,93 +579,93 @@ static const u8 prio2band[TC_PRIO_MAX + 1] = {
/*
* Private data for a pfifo_fast scheduler containing:
* - queues for the three band
* - bitmap indicating which of the bands contain skbs
* - rings for priority bands
*/
struct pfifo_fast_priv {
u32 bitmap;
struct qdisc_skb_head q[PFIFO_FAST_BANDS];
struct skb_array q[PFIFO_FAST_BANDS];
};
/*
* Convert a bitmap to the first band number where an skb is queued, where:
* bitmap=0 means there are no skbs on any band.
* bitmap=1 means there is an skb on band 0.
* bitmap=7 means there are skbs on all 3 bands, etc.
*/
static const int bitmap2band[] = {-1, 0, 1, 0, 2, 0, 1, 0};
static inline struct qdisc_skb_head *band2list(struct pfifo_fast_priv *priv,
int band)
static inline struct skb_array *band2list(struct pfifo_fast_priv *priv,
int band)
{
return priv->q + band;
return &priv->q[band];
}
static int pfifo_fast_enqueue(struct sk_buff *skb, struct Qdisc *qdisc,
struct sk_buff **to_free)
{
if (qdisc->q.qlen < qdisc_dev(qdisc)->tx_queue_len) {
int band = prio2band[skb->priority & TC_PRIO_MAX];
struct pfifo_fast_priv *priv = qdisc_priv(qdisc);
struct qdisc_skb_head *list = band2list(priv, band);
priv->bitmap |= (1 << band);
qdisc->q.qlen++;
return __qdisc_enqueue_tail(skb, qdisc, list);
}
int band = prio2band[skb->priority & TC_PRIO_MAX];
struct pfifo_fast_priv *priv = qdisc_priv(qdisc);
struct skb_array *q = band2list(priv, band);
int err;
err = skb_array_produce(q, skb);
return qdisc_drop(skb, qdisc, to_free);
if (unlikely(err))
return qdisc_drop_cpu(skb, qdisc, to_free);
qdisc_qstats_cpu_qlen_inc(qdisc);
qdisc_qstats_cpu_backlog_inc(qdisc, skb);
return NET_XMIT_SUCCESS;
}
static struct sk_buff *pfifo_fast_dequeue(struct Qdisc *qdisc)
{
struct pfifo_fast_priv *priv = qdisc_priv(qdisc);
int band = bitmap2band[priv->bitmap];
if (likely(band >= 0)) {
struct qdisc_skb_head *qh = band2list(priv, band);
struct sk_buff *skb = __qdisc_dequeue_head(qh);
struct sk_buff *skb = NULL;
int band;
if (likely(skb != NULL)) {
qdisc_qstats_backlog_dec(qdisc, skb);
qdisc_bstats_update(qdisc, skb);
}
for (band = 0; band < PFIFO_FAST_BANDS && !skb; band++) {
struct skb_array *q = band2list(priv, band);
qdisc->q.qlen--;
if (qh->qlen == 0)
priv->bitmap &= ~(1 << band);
if (__skb_array_empty(q))
continue;
return skb;
skb = skb_array_consume_bh(q);
}
if (likely(skb)) {
qdisc_qstats_cpu_backlog_dec(qdisc, skb);
qdisc_bstats_cpu_update(qdisc, skb);
qdisc_qstats_cpu_qlen_dec(qdisc);
}
return NULL;
return skb;
}
static struct sk_buff *pfifo_fast_peek(struct Qdisc *qdisc)
{
struct pfifo_fast_priv *priv = qdisc_priv(qdisc);
int band = bitmap2band[priv->bitmap];
struct sk_buff *skb = NULL;
int band;
if (band >= 0) {
struct qdisc_skb_head *qh = band2list(priv, band);
for (band = 0; band < PFIFO_FAST_BANDS && !skb; band++) {
struct skb_array *q = band2list(priv, band);
return qh->head;
skb = __skb_array_peek(q);
}
return NULL;
return skb;
}
static void pfifo_fast_reset(struct Qdisc *qdisc)
{
int prio;
int i, band;
struct pfifo_fast_priv *priv = qdisc_priv(qdisc);
for (prio = 0; prio < PFIFO_FAST_BANDS; prio++)
__qdisc_reset_queue(band2list(priv, prio));
for (band = 0; band < PFIFO_FAST_BANDS; band++) {
struct skb_array *q = band2list(priv, band);
struct sk_buff *skb;
priv->bitmap = 0;
qdisc->qstats.backlog = 0;
qdisc->q.qlen = 0;
while ((skb = skb_array_consume_bh(q)) != NULL)
kfree_skb(skb);
}
for_each_possible_cpu(i) {
struct gnet_stats_queue *q = per_cpu_ptr(qdisc->cpu_qstats, i);
q->backlog = 0;
q->qlen = 0;
}
}
static int pfifo_fast_dump(struct Qdisc *qdisc, struct sk_buff *skb)
......@@ -569,17 +683,48 @@ static int pfifo_fast_dump(struct Qdisc *qdisc, struct sk_buff *skb)
static int pfifo_fast_init(struct Qdisc *qdisc, struct nlattr *opt)
{
int prio;
unsigned int qlen = qdisc_dev(qdisc)->tx_queue_len;
struct pfifo_fast_priv *priv = qdisc_priv(qdisc);
int prio;
for (prio = 0; prio < PFIFO_FAST_BANDS; prio++)
qdisc_skb_head_init(band2list(priv, prio));
/* guard against zero length rings */
if (!qlen)
return -EINVAL;
for (prio = 0; prio < PFIFO_FAST_BANDS; prio++) {
struct skb_array *q = band2list(priv, prio);
int err;
err = skb_array_init(q, qlen, GFP_KERNEL);
if (err)
return -ENOMEM;
}
/* Can by-pass the queue discipline */
qdisc->flags |= TCQ_F_CAN_BYPASS;
return 0;
}
static void pfifo_fast_destroy(struct Qdisc *sch)
{
struct pfifo_fast_priv *priv = qdisc_priv(sch);
int prio;
for (prio = 0; prio < PFIFO_FAST_BANDS; prio++) {
struct skb_array *q = band2list(priv, prio);
/* NULL ring is possible if destroy path is due to a failed
* skb_array_init() in pfifo_fast_init() case.
*/
if (!&q->ring.queue)
continue;
/* Destroy ring but no need to kfree_skb because a call to
* pfifo_fast_reset() has already done that work.
*/
ptr_ring_cleanup(&q->ring, NULL);
}
}
struct Qdisc_ops pfifo_fast_ops __read_mostly = {
.id = "pfifo_fast",
.priv_size = sizeof(struct pfifo_fast_priv),
......@@ -587,9 +732,11 @@ struct Qdisc_ops pfifo_fast_ops __read_mostly = {
.dequeue = pfifo_fast_dequeue,
.peek = pfifo_fast_peek,
.init = pfifo_fast_init,
.destroy = pfifo_fast_destroy,
.reset = pfifo_fast_reset,
.dump = pfifo_fast_dump,
.owner = THIS_MODULE,
.static_flags = TCQ_F_NOLOCK | TCQ_F_CPUSTATS,
};
EXPORT_SYMBOL(pfifo_fast_ops);
......@@ -627,9 +774,24 @@ struct Qdisc *qdisc_alloc(struct netdev_queue *dev_queue,
sch = (struct Qdisc *) QDISC_ALIGN((unsigned long) p);
sch->padded = (char *) sch - (char *) p;
}
__skb_queue_head_init(&sch->gso_skb);
__skb_queue_head_init(&sch->skb_bad_txq);
qdisc_skb_head_init(&sch->q);
spin_lock_init(&sch->q.lock);
if (ops->static_flags & TCQ_F_CPUSTATS) {
sch->cpu_bstats =
netdev_alloc_pcpu_stats(struct gnet_stats_basic_cpu);
if (!sch->cpu_bstats)
goto errout1;
sch->cpu_qstats = alloc_percpu(struct gnet_stats_queue);
if (!sch->cpu_qstats) {
free_percpu(sch->cpu_bstats);
goto errout1;
}
}
spin_lock_init(&sch->busylock);
lockdep_set_class(&sch->busylock,
dev->qdisc_tx_busylock ?: &qdisc_tx_busylock);
......@@ -639,6 +801,7 @@ struct Qdisc *qdisc_alloc(struct netdev_queue *dev_queue,
dev->qdisc_running_key ?: &qdisc_running_key);
sch->ops = ops;
sch->flags = ops->static_flags;
sch->enqueue = ops->enqueue;
sch->dequeue = ops->dequeue;
sch->dev_queue = dev_queue;
......@@ -646,6 +809,8 @@ struct Qdisc *qdisc_alloc(struct netdev_queue *dev_queue,
refcount_set(&sch->refcnt, 1);
return sch;
errout1:
kfree(p);
errout:
return ERR_PTR(err);
}
......@@ -679,17 +844,21 @@ EXPORT_SYMBOL(qdisc_create_dflt);
void qdisc_reset(struct Qdisc *qdisc)
{
const struct Qdisc_ops *ops = qdisc->ops;
struct sk_buff *skb, *tmp;
if (ops->reset)
ops->reset(qdisc);
kfree_skb(qdisc->skb_bad_txq);
qdisc->skb_bad_txq = NULL;
skb_queue_walk_safe(&qdisc->gso_skb, skb, tmp) {
__skb_unlink(skb, &qdisc->gso_skb);
kfree_skb_list(skb);
}
if (qdisc->gso_skb) {
kfree_skb_list(qdisc->gso_skb);
qdisc->gso_skb = NULL;
skb_queue_walk_safe(&qdisc->skb_bad_txq, skb, tmp) {
__skb_unlink(skb, &qdisc->skb_bad_txq);
kfree_skb_list(skb);
}
qdisc->q.qlen = 0;
qdisc->qstats.backlog = 0;
}
......@@ -708,6 +877,7 @@ static void qdisc_free(struct Qdisc *qdisc)
void qdisc_destroy(struct Qdisc *qdisc)
{
const struct Qdisc_ops *ops = qdisc->ops;
struct sk_buff *skb, *tmp;
if (qdisc->flags & TCQ_F_BUILTIN ||
!refcount_dec_and_test(&qdisc->refcnt))
......@@ -727,8 +897,16 @@ void qdisc_destroy(struct Qdisc *qdisc)
module_put(ops->owner);
dev_put(qdisc_dev(qdisc));
kfree_skb_list(qdisc->gso_skb);
kfree_skb(qdisc->skb_bad_txq);
skb_queue_walk_safe(&qdisc->gso_skb, skb, tmp) {
__skb_unlink(skb, &qdisc->gso_skb);
kfree_skb_list(skb);
}
skb_queue_walk_safe(&qdisc->skb_bad_txq, skb, tmp) {
__skb_unlink(skb, &qdisc->skb_bad_txq);
kfree_skb_list(skb);
}
qdisc_free(qdisc);
}
EXPORT_SYMBOL(qdisc_destroy);
......@@ -743,10 +921,6 @@ struct Qdisc *dev_graft_qdisc(struct netdev_queue *dev_queue,
root_lock = qdisc_lock(oqdisc);
spin_lock_bh(root_lock);
/* Prune old scheduler */
if (oqdisc && refcount_read(&oqdisc->refcnt) <= 1)
qdisc_reset(oqdisc);
/* ... and graft new one */
if (qdisc == NULL)
qdisc = &noop_qdisc;
......@@ -882,14 +1056,18 @@ static bool some_qdisc_is_busy(struct net_device *dev)
dev_queue = netdev_get_tx_queue(dev, i);
q = dev_queue->qdisc_sleeping;
root_lock = qdisc_lock(q);
spin_lock_bh(root_lock);
if (q->flags & TCQ_F_NOLOCK) {
val = test_bit(__QDISC_STATE_SCHED, &q->state);
} else {
root_lock = qdisc_lock(q);
spin_lock_bh(root_lock);
val = (qdisc_is_running(q) ||
test_bit(__QDISC_STATE_SCHED, &q->state));
val = (qdisc_is_running(q) ||
test_bit(__QDISC_STATE_SCHED, &q->state));
spin_unlock_bh(root_lock);
spin_unlock_bh(root_lock);
}
if (val)
return true;
......@@ -897,6 +1075,16 @@ static bool some_qdisc_is_busy(struct net_device *dev)
return false;
}
static void dev_qdisc_reset(struct net_device *dev,
struct netdev_queue *dev_queue,
void *none)
{
struct Qdisc *qdisc = dev_queue->qdisc_sleeping;
if (qdisc)
qdisc_reset(qdisc);
}
/**
* dev_deactivate_many - deactivate transmissions on several devices
* @head: list of devices to deactivate
......@@ -907,7 +1095,6 @@ static bool some_qdisc_is_busy(struct net_device *dev)
void dev_deactivate_many(struct list_head *head)
{
struct net_device *dev;
bool sync_needed = false;
list_for_each_entry(dev, head, close_list) {
netdev_for_each_tx_queue(dev, dev_deactivate_queue,
......@@ -917,20 +1104,25 @@ void dev_deactivate_many(struct list_head *head)
&noop_qdisc);
dev_watchdog_down(dev);
sync_needed |= !dev->dismantle;
}
/* Wait for outstanding qdisc-less dev_queue_xmit calls.
* This is avoided if all devices are in dismantle phase :
* Caller will call synchronize_net() for us
*/
if (sync_needed)
synchronize_net();
synchronize_net();
/* Wait for outstanding qdisc_run calls. */
list_for_each_entry(dev, head, close_list)
list_for_each_entry(dev, head, close_list) {
while (some_qdisc_is_busy(dev))
yield();
/* The new qdisc is assigned at this point so we can safely
* unwind stale skb lists and qdisc statistics
*/
netdev_for_each_tx_queue(dev, dev_qdisc_reset, NULL);
if (dev_ingress_queue(dev))
dev_qdisc_reset(dev, dev_ingress_queue(dev), NULL);
}
}
void dev_deactivate(struct net_device *dev)
......@@ -951,6 +1143,8 @@ static void dev_init_scheduler_queue(struct net_device *dev,
rcu_assign_pointer(dev_queue->qdisc, qdisc);
dev_queue->qdisc_sleeping = qdisc;
__skb_queue_head_init(&qdisc->gso_skb);
__skb_queue_head_init(&qdisc->skb_bad_txq);
}
void dev_init_scheduler(struct net_device *dev)
......
......@@ -17,6 +17,7 @@
#include <linux/skbuff.h>
#include <net/netlink.h>
#include <net/pkt_sched.h>
#include <net/sch_generic.h>
struct mq_sched {
struct Qdisc **qdiscs;
......@@ -97,23 +98,42 @@ static int mq_dump(struct Qdisc *sch, struct sk_buff *skb)
struct net_device *dev = qdisc_dev(sch);
struct Qdisc *qdisc;
unsigned int ntx;
__u32 qlen = 0;
sch->q.qlen = 0;
memset(&sch->bstats, 0, sizeof(sch->bstats));
memset(&sch->qstats, 0, sizeof(sch->qstats));
/* MQ supports lockless qdiscs. However, statistics accounting needs
* to account for all, none, or a mix of locked and unlocked child
* qdiscs. Percpu stats are added to counters in-band and locking
* qdisc totals are added at end.
*/
for (ntx = 0; ntx < dev->num_tx_queues; ntx++) {
qdisc = netdev_get_tx_queue(dev, ntx)->qdisc_sleeping;
spin_lock_bh(qdisc_lock(qdisc));
sch->q.qlen += qdisc->q.qlen;
sch->bstats.bytes += qdisc->bstats.bytes;
sch->bstats.packets += qdisc->bstats.packets;
sch->qstats.backlog += qdisc->qstats.backlog;
sch->qstats.drops += qdisc->qstats.drops;
sch->qstats.requeues += qdisc->qstats.requeues;
sch->qstats.overlimits += qdisc->qstats.overlimits;
if (qdisc_is_percpu_stats(qdisc)) {
qlen = qdisc_qlen_sum(qdisc);
__gnet_stats_copy_basic(NULL, &sch->bstats,
qdisc->cpu_bstats,
&qdisc->bstats);
__gnet_stats_copy_queue(&sch->qstats,
qdisc->cpu_qstats,
&qdisc->qstats, qlen);
} else {
sch->q.qlen += qdisc->q.qlen;
sch->bstats.bytes += qdisc->bstats.bytes;
sch->bstats.packets += qdisc->bstats.packets;
sch->qstats.backlog += qdisc->qstats.backlog;
sch->qstats.drops += qdisc->qstats.drops;
sch->qstats.requeues += qdisc->qstats.requeues;
sch->qstats.overlimits += qdisc->qstats.overlimits;
}
spin_unlock_bh(qdisc_lock(qdisc));
}
return 0;
}
......
......@@ -388,22 +388,40 @@ static int mqprio_dump(struct Qdisc *sch, struct sk_buff *skb)
struct nlattr *nla = (struct nlattr *)skb_tail_pointer(skb);
struct tc_mqprio_qopt opt = { 0 };
struct Qdisc *qdisc;
unsigned int i;
unsigned int ntx, tc;
sch->q.qlen = 0;
memset(&sch->bstats, 0, sizeof(sch->bstats));
memset(&sch->qstats, 0, sizeof(sch->qstats));
for (i = 0; i < dev->num_tx_queues; i++) {
qdisc = rtnl_dereference(netdev_get_tx_queue(dev, i)->qdisc);
/* MQ supports lockless qdiscs. However, statistics accounting needs
* to account for all, none, or a mix of locked and unlocked child
* qdiscs. Percpu stats are added to counters in-band and locking
* qdisc totals are added at end.
*/
for (ntx = 0; ntx < dev->num_tx_queues; ntx++) {
qdisc = netdev_get_tx_queue(dev, ntx)->qdisc_sleeping;
spin_lock_bh(qdisc_lock(qdisc));
sch->q.qlen += qdisc->q.qlen;
sch->bstats.bytes += qdisc->bstats.bytes;
sch->bstats.packets += qdisc->bstats.packets;
sch->qstats.backlog += qdisc->qstats.backlog;
sch->qstats.drops += qdisc->qstats.drops;
sch->qstats.requeues += qdisc->qstats.requeues;
sch->qstats.overlimits += qdisc->qstats.overlimits;
if (qdisc_is_percpu_stats(qdisc)) {
__u32 qlen = qdisc_qlen_sum(qdisc);
__gnet_stats_copy_basic(NULL, &sch->bstats,
qdisc->cpu_bstats,
&qdisc->bstats);
__gnet_stats_copy_queue(&sch->qstats,
qdisc->cpu_qstats,
&qdisc->qstats, qlen);
} else {
sch->q.qlen += qdisc->q.qlen;
sch->bstats.bytes += qdisc->bstats.bytes;
sch->bstats.packets += qdisc->bstats.packets;
sch->qstats.backlog += qdisc->qstats.backlog;
sch->qstats.drops += qdisc->qstats.drops;
sch->qstats.requeues += qdisc->qstats.requeues;
sch->qstats.overlimits += qdisc->qstats.overlimits;
}
spin_unlock_bh(qdisc_lock(qdisc));
}
......@@ -411,9 +429,9 @@ static int mqprio_dump(struct Qdisc *sch, struct sk_buff *skb)
memcpy(opt.prio_tc_map, dev->prio_tc_map, sizeof(opt.prio_tc_map));
opt.hw = priv->hw_offload;
for (i = 0; i < netdev_get_num_tc(dev); i++) {
opt.count[i] = dev->tc_to_txq[i].count;
opt.offset[i] = dev->tc_to_txq[i].offset;
for (tc = 0; tc < netdev_get_num_tc(dev); tc++) {
opt.count[tc] = dev->tc_to_txq[tc].count;
opt.offset[tc] = dev->tc_to_txq[tc].offset;
}
if (nla_put(skb, TCA_OPTIONS, NLA_ALIGN(sizeof(opt)), &opt))
......@@ -495,7 +513,6 @@ static int mqprio_dump_class_stats(struct Qdisc *sch, unsigned long cl,
if (cl >= TC_H_MIN_PRIORITY) {
int i;
__u32 qlen = 0;
struct Qdisc *qdisc;
struct gnet_stats_queue qstats = {0};
struct gnet_stats_basic_packed bstats = {0};
struct net_device *dev = qdisc_dev(sch);
......@@ -511,18 +528,26 @@ static int mqprio_dump_class_stats(struct Qdisc *sch, unsigned long cl,
for (i = tc.offset; i < tc.offset + tc.count; i++) {
struct netdev_queue *q = netdev_get_tx_queue(dev, i);
struct Qdisc *qdisc = rtnl_dereference(q->qdisc);
struct gnet_stats_basic_cpu __percpu *cpu_bstats = NULL;
struct gnet_stats_queue __percpu *cpu_qstats = NULL;
qdisc = rtnl_dereference(q->qdisc);
spin_lock_bh(qdisc_lock(qdisc));
qlen += qdisc->q.qlen;
bstats.bytes += qdisc->bstats.bytes;
bstats.packets += qdisc->bstats.packets;
qstats.backlog += qdisc->qstats.backlog;
qstats.drops += qdisc->qstats.drops;
qstats.requeues += qdisc->qstats.requeues;
qstats.overlimits += qdisc->qstats.overlimits;
if (qdisc_is_percpu_stats(qdisc)) {
cpu_bstats = qdisc->cpu_bstats;
cpu_qstats = qdisc->cpu_qstats;
}
qlen = qdisc_qlen_sum(qdisc);
__gnet_stats_copy_basic(NULL, &sch->bstats,
cpu_bstats, &qdisc->bstats);
__gnet_stats_copy_queue(&sch->qstats,
cpu_qstats,
&qdisc->qstats,
qlen);
spin_unlock_bh(qdisc_lock(qdisc));
}
/* Reclaim root sleeping lock before completing stats */
if (d->lock)
spin_lock_bh(d->lock);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册