提交 fb9de970 编写于 作者: M Michael S. Tsirkin

ptr_ring: batch ring zeroing

A known weakness in ptr_ring design is that it does not handle well the
situation when ring is almost full: as entries are consumed they are
immediately used again by the producer, so consumer and producer are
writing to a shared cache line.

To fix this, add batching to consume calls: as entries are
consumed do not write NULL into the ring until we get
a multiple (in current implementation 2x) of cache lines
away from the producer. At that point, write them all out.

We do the write out in the reverse order to keep
producer from sharing cache with consumer for as long
as possible.

Writeout also triggers when ring wraps around - there's
no special reason to do this but it helps keep the code
a bit simpler.

What should we do if getting away from producer by 2 cache lines
would mean we are keeping the ring moe than half empty?
Maybe we should reduce the batching in this case,
current patch simply reduces the batching.

Notes:
- it is no longer true that a call to consume guarantees
  that the following call to produce will succeed.
  No users seem to assume that.
- batching can also in theory reduce the signalling rate:
  users that would previously send interrups to the producer
  to wake it up after consuming each entry would now only
  need to do this once in a batch.
  Doing this would be easy by returning a flag to the caller.
  No users seem to do signalling on consume yet so this was not
  implemented yet.
Signed-off-by: NMichael S. Tsirkin <mst@redhat.com>
Reviewed-by: NJesper Dangaard Brouer <brouer@redhat.com>
Acked-by: NJason Wang <jasowang@redhat.com>
上级 9ea762a5
...@@ -34,11 +34,13 @@ ...@@ -34,11 +34,13 @@
struct ptr_ring { struct ptr_ring {
int producer ____cacheline_aligned_in_smp; int producer ____cacheline_aligned_in_smp;
spinlock_t producer_lock; spinlock_t producer_lock;
int consumer ____cacheline_aligned_in_smp; int consumer_head ____cacheline_aligned_in_smp; /* next valid entry */
int consumer_tail; /* next entry to invalidate */
spinlock_t consumer_lock; spinlock_t consumer_lock;
/* Shared consumer/producer data */ /* Shared consumer/producer data */
/* Read-only by both the producer and the consumer */ /* Read-only by both the producer and the consumer */
int size ____cacheline_aligned_in_smp; /* max entries in queue */ int size ____cacheline_aligned_in_smp; /* max entries in queue */
int batch; /* number of entries to consume in a batch */
void **queue; void **queue;
}; };
...@@ -170,7 +172,7 @@ static inline int ptr_ring_produce_bh(struct ptr_ring *r, void *ptr) ...@@ -170,7 +172,7 @@ static inline int ptr_ring_produce_bh(struct ptr_ring *r, void *ptr)
static inline void *__ptr_ring_peek(struct ptr_ring *r) static inline void *__ptr_ring_peek(struct ptr_ring *r)
{ {
if (likely(r->size)) if (likely(r->size))
return r->queue[r->consumer]; return r->queue[r->consumer_head];
return NULL; return NULL;
} }
...@@ -231,9 +233,38 @@ static inline bool ptr_ring_empty_bh(struct ptr_ring *r) ...@@ -231,9 +233,38 @@ static inline bool ptr_ring_empty_bh(struct ptr_ring *r)
/* Must only be called after __ptr_ring_peek returned !NULL */ /* Must only be called after __ptr_ring_peek returned !NULL */
static inline void __ptr_ring_discard_one(struct ptr_ring *r) static inline void __ptr_ring_discard_one(struct ptr_ring *r)
{ {
r->queue[r->consumer++] = NULL; /* Fundamentally, what we want to do is update consumer
if (unlikely(r->consumer >= r->size)) * index and zero out the entry so producer can reuse it.
r->consumer = 0; * Doing it naively at each consume would be as simple as:
* r->queue[r->consumer++] = NULL;
* if (unlikely(r->consumer >= r->size))
* r->consumer = 0;
* but that is suboptimal when the ring is full as producer is writing
* out new entries in the same cache line. Defer these updates until a
* batch of entries has been consumed.
*/
int head = r->consumer_head++;
/* Once we have processed enough entries invalidate them in
* the ring all at once so producer can reuse their space in the ring.
* We also do this when we reach end of the ring - not mandatory
* but helps keep the implementation simple.
*/
if (unlikely(r->consumer_head - r->consumer_tail >= r->batch ||
r->consumer_head >= r->size)) {
/* Zero out entries in the reverse order: this way we touch the
* cache line that producer might currently be reading the last;
* producer won't make progress and touch other cache lines
* besides the first one until we write out all entries.
*/
while (likely(head >= r->consumer_tail))
r->queue[head--] = NULL;
r->consumer_tail = r->consumer_head;
}
if (unlikely(r->consumer_head >= r->size)) {
r->consumer_head = 0;
r->consumer_tail = 0;
}
} }
static inline void *__ptr_ring_consume(struct ptr_ring *r) static inline void *__ptr_ring_consume(struct ptr_ring *r)
...@@ -345,14 +376,27 @@ static inline void **__ptr_ring_init_queue_alloc(int size, gfp_t gfp) ...@@ -345,14 +376,27 @@ static inline void **__ptr_ring_init_queue_alloc(int size, gfp_t gfp)
return kzalloc(ALIGN(size * sizeof(void *), SMP_CACHE_BYTES), gfp); return kzalloc(ALIGN(size * sizeof(void *), SMP_CACHE_BYTES), gfp);
} }
static inline void __ptr_ring_set_size(struct ptr_ring *r, int size)
{
r->size = size;
r->batch = SMP_CACHE_BYTES * 2 / sizeof(*(r->queue));
/* We need to set batch at least to 1 to make logic
* in __ptr_ring_discard_one work correctly.
* Batching too much (because ring is small) would cause a lot of
* burstiness. Needs tuning, for now disable batching.
*/
if (r->batch > r->size / 2 || !r->batch)
r->batch = 1;
}
static inline int ptr_ring_init(struct ptr_ring *r, int size, gfp_t gfp) static inline int ptr_ring_init(struct ptr_ring *r, int size, gfp_t gfp)
{ {
r->queue = __ptr_ring_init_queue_alloc(size, gfp); r->queue = __ptr_ring_init_queue_alloc(size, gfp);
if (!r->queue) if (!r->queue)
return -ENOMEM; return -ENOMEM;
r->size = size; __ptr_ring_set_size(r, size);
r->producer = r->consumer = 0; r->producer = r->consumer_head = r->consumer_tail = 0;
spin_lock_init(&r->producer_lock); spin_lock_init(&r->producer_lock);
spin_lock_init(&r->consumer_lock); spin_lock_init(&r->consumer_lock);
...@@ -373,9 +417,10 @@ static inline void **__ptr_ring_swap_queue(struct ptr_ring *r, void **queue, ...@@ -373,9 +417,10 @@ static inline void **__ptr_ring_swap_queue(struct ptr_ring *r, void **queue,
else if (destroy) else if (destroy)
destroy(ptr); destroy(ptr);
r->size = size; __ptr_ring_set_size(r, size);
r->producer = producer; r->producer = producer;
r->consumer = 0; r->consumer_head = 0;
r->consumer_tail = 0;
old = r->queue; old = r->queue;
r->queue = queue; r->queue = queue;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册