提交 4f8126bb 编写于 作者: G Gabriel Krisman Bertazi 提交者: Jens Axboe

sbitmap: Use single per-bitmap counting to wake up queued tags

sbitmap suffers from code complexity, as demonstrated by recent fixes,
and eventual lost wake ups on nested I/O completion.  The later happens,
from what I understand, due to the non-atomic nature of the updates to
wait_cnt, which needs to be subtracted and eventually reset when equal
to zero.  This two step process can eventually miss an update when a
nested completion happens to interrupt the CPU in between the wait_cnt
updates.  This is very hard to fix, as shown by the recent changes to
this code.

The code complexity arises mostly from the corner cases to avoid missed
wakes in this scenario.  In addition, the handling of wake_batch
recalculation plus the synchronization with sbq_queue_wake_up is
non-trivial.

This patchset implements the idea originally proposed by Jan [1], which
removes the need for the two-step updates of wait_cnt.  This is done by
tracking the number of completions and wakeups in always increasing,
per-bitmap counters.  Instead of having to reset the wait_cnt when it
reaches zero, we simply keep counting, and attempt to wake up N threads
in a single wait queue whenever there is enough space for a batch.
Waking up less than batch_wake shouldn't be a problem, because we
haven't changed the conditions for wake up, and the existing batch
calculation guarantees at least enough remaining completions to wake up
a batch for each queue at any time.

Performance-wise, one should expect very similar performance to the
original algorithm for the case where there is no queueing.  In both the
old algorithm and this implementation, the first thing is to check
ws_active, which bails out if there is no queueing to be managed. In the
new code, we took care to avoid accounting completions and wakeups when
there is no queueing, to not pay the cost of atomic operations
unnecessarily, since it doesn't skew the numbers.

For more interesting cases, where there is queueing, we need to take
into account the cross-communication of the atomic operations.  I've
been benchmarking by running parallel fio jobs against a single hctx
nullb in different hardware queue depth scenarios, and verifying both
IOPS and queueing.

Each experiment was repeated 5 times on a 20-CPU box, with 20 parallel
jobs. fio was issuing fixed-size randwrites with qd=64 against nullb,
varying only the hardware queue length per test.

queue size 2                 4                 8                 16                 32                 64
6.1-rc2    1681.1K (1.6K)    2633.0K (12.7K)   6940.8K (16.3K)   8172.3K (617.5K)   8391.7K (367.1K)   8606.1K (351.2K)
patched    1721.8K (15.1K)   3016.7K (3.8K)    7543.0K (89.4K)   8132.5K (303.4K)   8324.2K (230.6K)   8401.8K (284.7K)

The following is a similar experiment, ran against a nullb with a single
bitmap shared by 20 hctx spread across 2 NUMA nodes. This has 40
parallel fio jobs operating on the same device

queue size 2 	             4                 8              	16             	    32		       64
6.1-rc2	   1081.0K (2.3K)    957.2K (1.5K)     1699.1K (5.7K) 	6178.2K (124.6K)    12227.9K (37.7K)   13286.6K (92.9K)
patched	   1081.8K (2.8K)    1316.5K (5.4K)    2364.4K (1.8K) 	6151.4K  (20.0K)    11893.6K (17.5K)   12385.6K (18.4K)

It has also survived blktests and a 12h-stress run against nullb. I also
ran the code against nvme and a scsi SSD, and I didn't observe
performance regression in those. If there are other tests you think I
should run, please let me know and I will follow up with results.

[1] https://lore.kernel.org/all/aef9de29-e9f5-259a-f8be-12d1b734e72@google.com/

Cc: Hugh Dickins <hughd@google.com>
Cc: Keith Busch <kbusch@kernel.org>
Cc: Liu Song <liusong@linux.alibaba.com>
Suggested-by: NJan Kara <jack@suse.cz>
Signed-off-by: NGabriel Krisman Bertazi <krisman@suse.de>
Link: https://lore.kernel.org/r/20221105231055.25953-1-krisman@suse.deSigned-off-by: NJens Axboe <axboe@kernel.dk>
上级 ee9d5521
......@@ -86,11 +86,6 @@ struct sbitmap {
* struct sbq_wait_state - Wait queue in a &struct sbitmap_queue.
*/
struct sbq_wait_state {
/**
* @wait_cnt: Number of frees remaining before we wake up.
*/
atomic_t wait_cnt;
/**
* @wait: Wait queue.
*/
......@@ -138,6 +133,17 @@ struct sbitmap_queue {
* sbitmap_queue_get_shallow()
*/
unsigned int min_shallow_depth;
/**
* @completion_cnt: Number of bits cleared passed to the
* wakeup function.
*/
atomic_t completion_cnt;
/**
* @wakeup_cnt: Number of thread wake ups issued.
*/
atomic_t wakeup_cnt;
};
/**
......
......@@ -434,6 +434,8 @@ int sbitmap_queue_init_node(struct sbitmap_queue *sbq, unsigned int depth,
sbq->wake_batch = sbq_calc_wake_batch(sbq, depth);
atomic_set(&sbq->wake_index, 0);
atomic_set(&sbq->ws_active, 0);
atomic_set(&sbq->completion_cnt, 0);
atomic_set(&sbq->wakeup_cnt, 0);
sbq->ws = kzalloc_node(SBQ_WAIT_QUEUES * sizeof(*sbq->ws), flags, node);
if (!sbq->ws) {
......@@ -441,40 +443,21 @@ int sbitmap_queue_init_node(struct sbitmap_queue *sbq, unsigned int depth,
return -ENOMEM;
}
for (i = 0; i < SBQ_WAIT_QUEUES; i++) {
for (i = 0; i < SBQ_WAIT_QUEUES; i++)
init_waitqueue_head(&sbq->ws[i].wait);
atomic_set(&sbq->ws[i].wait_cnt, sbq->wake_batch);
}
return 0;
}
EXPORT_SYMBOL_GPL(sbitmap_queue_init_node);
static inline void __sbitmap_queue_update_wake_batch(struct sbitmap_queue *sbq,
unsigned int wake_batch)
{
int i;
if (sbq->wake_batch != wake_batch) {
WRITE_ONCE(sbq->wake_batch, wake_batch);
/*
* Pairs with the memory barrier in sbitmap_queue_wake_up()
* to ensure that the batch size is updated before the wait
* counts.
*/
smp_mb();
for (i = 0; i < SBQ_WAIT_QUEUES; i++)
atomic_set(&sbq->ws[i].wait_cnt, 1);
}
}
static void sbitmap_queue_update_wake_batch(struct sbitmap_queue *sbq,
unsigned int depth)
{
unsigned int wake_batch;
wake_batch = sbq_calc_wake_batch(sbq, depth);
__sbitmap_queue_update_wake_batch(sbq, wake_batch);
if (sbq->wake_batch != wake_batch)
WRITE_ONCE(sbq->wake_batch, wake_batch);
}
void sbitmap_queue_recalculate_wake_batch(struct sbitmap_queue *sbq,
......@@ -488,7 +471,8 @@ void sbitmap_queue_recalculate_wake_batch(struct sbitmap_queue *sbq,
wake_batch = clamp_val(depth / SBQ_WAIT_QUEUES,
min_batch, SBQ_WAKE_BATCH);
__sbitmap_queue_update_wake_batch(sbq, wake_batch);
WRITE_ONCE(sbq->wake_batch, wake_batch);
}
EXPORT_SYMBOL_GPL(sbitmap_queue_recalculate_wake_batch);
......@@ -587,7 +571,7 @@ static struct sbq_wait_state *sbq_wake_ptr(struct sbitmap_queue *sbq)
for (i = 0; i < SBQ_WAIT_QUEUES; i++) {
struct sbq_wait_state *ws = &sbq->ws[wake_index];
if (waitqueue_active(&ws->wait) && atomic_read(&ws->wait_cnt)) {
if (waitqueue_active(&ws->wait)) {
if (wake_index != atomic_read(&sbq->wake_index))
atomic_set(&sbq->wake_index, wake_index);
return ws;
......@@ -599,83 +583,31 @@ static struct sbq_wait_state *sbq_wake_ptr(struct sbitmap_queue *sbq)
return NULL;
}
static bool __sbq_wake_up(struct sbitmap_queue *sbq, int *nr)
void sbitmap_queue_wake_up(struct sbitmap_queue *sbq, int nr)
{
struct sbq_wait_state *ws;
unsigned int wake_batch;
int wait_cnt, cur, sub;
bool ret;
unsigned int wake_batch = READ_ONCE(sbq->wake_batch);
struct sbq_wait_state *ws = NULL;
unsigned int wakeups;
if (*nr <= 0)
return false;
if (!atomic_read(&sbq->ws_active))
return;
ws = sbq_wake_ptr(sbq);
if (!ws)
return false;
atomic_add(nr, &sbq->completion_cnt);
wakeups = atomic_read(&sbq->wakeup_cnt);
cur = atomic_read(&ws->wait_cnt);
do {
/*
* For concurrent callers of this, callers should call this
* function again to wakeup a new batch on a different 'ws'.
*/
if (cur == 0)
return true;
sub = min(*nr, cur);
wait_cnt = cur - sub;
} while (!atomic_try_cmpxchg(&ws->wait_cnt, &cur, wait_cnt));
/*
* If we decremented queue without waiters, retry to avoid lost
* wakeups.
*/
if (wait_cnt > 0)
return !waitqueue_active(&ws->wait);
if (atomic_read(&sbq->completion_cnt) - wakeups < wake_batch)
return;
*nr -= sub;
/*
* When wait_cnt == 0, we have to be particularly careful as we are
* responsible to reset wait_cnt regardless whether we've actually
* woken up anybody. But in case we didn't wakeup anybody, we still
* need to retry.
*/
ret = !waitqueue_active(&ws->wait);
wake_batch = READ_ONCE(sbq->wake_batch);
if (!ws) {
ws = sbq_wake_ptr(sbq);
if (!ws)
return;
}
} while (!atomic_try_cmpxchg(&sbq->wakeup_cnt,
&wakeups, wakeups + wake_batch));
/*
* Wake up first in case that concurrent callers decrease wait_cnt
* while waitqueue is empty.
*/
wake_up_nr(&ws->wait, wake_batch);
/*
* Pairs with the memory barrier in sbitmap_queue_resize() to
* ensure that we see the batch size update before the wait
* count is reset.
*
* Also pairs with the implicit barrier between decrementing wait_cnt
* and checking for waitqueue_active() to make sure waitqueue_active()
* sees result of the wakeup if atomic_dec_return() has seen the result
* of atomic_set().
*/
smp_mb__before_atomic();
/*
* Increase wake_index before updating wait_cnt, otherwise concurrent
* callers can see valid wait_cnt in old waitqueue, which can cause
* invalid wakeup on the old waitqueue.
*/
sbq_index_atomic_inc(&sbq->wake_index);
atomic_set(&ws->wait_cnt, wake_batch);
return ret || *nr;
}
void sbitmap_queue_wake_up(struct sbitmap_queue *sbq, int nr)
{
while (__sbq_wake_up(sbq, &nr))
;
}
EXPORT_SYMBOL_GPL(sbitmap_queue_wake_up);
......@@ -792,9 +724,7 @@ void sbitmap_queue_show(struct sbitmap_queue *sbq, struct seq_file *m)
seq_puts(m, "ws={\n");
for (i = 0; i < SBQ_WAIT_QUEUES; i++) {
struct sbq_wait_state *ws = &sbq->ws[i];
seq_printf(m, "\t{.wait_cnt=%d, .wait=%s},\n",
atomic_read(&ws->wait_cnt),
seq_printf(m, "\t{.wait=%s},\n",
waitqueue_active(&ws->wait) ? "active" : "inactive");
}
seq_puts(m, "}\n");
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册