From 480cff632221dc4d4889bf72dd0f09cd35096bc1 Mon Sep 17 00:00:00 2001
From: Paolo Bonzini
Date: Mon, 13 Feb 2017 19:12:40 +0100
Subject: [PATCH] coroutine-lock: add limited spinning to CoMutex

Running a very small critical section on pthread_mutex_t and CoMutex
shows that pthread_mutex_t is much faster because it doesn't actually
go to sleep. What happens is that the critical section is shorter
than the latency of entering the kernel and thus FUTEX_WAIT always
fails. With CoMutex there is no such latency but you still want to
avoid wait and wakeup. So introduce it artificially.

This only works with one waiter; because CoMutex is fair, it will
always have more waits and wakeups than a pthread_mutex_t.

Signed-off-by: Paolo Bonzini
Reviewed-by: Fam Zheng
Message-id: 20170213181244.16297-3-pbonzini@redhat.com
Signed-off-by: Stefan Hajnoczi
---
 include/qemu/coroutine.h   |  5 ++++
 util/qemu-coroutine-lock.c | 51 +++++++++++++++++++++++++++++++++-----
 util/qemu-coroutine.c      |  2 +-
 3 files changed, 51 insertions(+), 7 deletions(-)

diff --git a/include/qemu/coroutine.h b/include/qemu/coroutine.h
index fce228f68a..12ce8e109e 100644
--- a/include/qemu/coroutine.h
+++ b/include/qemu/coroutine.h
@@ -167,6 +167,11 @@ typedef struct CoMutex {
      */
     unsigned locked;
 
+    /* Context that is holding the lock.  Useful to avoid spinning
+     * when two coroutines on the same AioContext try to get the lock. :)
+     */
+    AioContext *ctx;
+
     /* A queue of waiters.  Elements are added atomically in front of
      * from_push.  to_pop is only populated, and popped from, by whoever
      * is in charge of the next wakeup.  This can be an unlocker or,
diff --git a/util/qemu-coroutine-lock.c b/util/qemu-coroutine-lock.c
index 25da9fa8d0..73fe77cc80 100644
--- a/util/qemu-coroutine-lock.c
+++ b/util/qemu-coroutine-lock.c
@@ -30,6 +30,7 @@
 #include "qemu-common.h"
 #include "qemu/coroutine.h"
 #include "qemu/coroutine_int.h"
+#include "qemu/processor.h"
 #include "qemu/queue.h"
 #include "block/aio.h"
 #include "trace.h"
@@ -181,7 +182,18 @@ void qemu_co_mutex_init(CoMutex *mutex)
     memset(mutex, 0, sizeof(*mutex));
 }
 
-static void coroutine_fn qemu_co_mutex_lock_slowpath(CoMutex *mutex)
+static void coroutine_fn qemu_co_mutex_wake(CoMutex *mutex, Coroutine *co)
+{
+    /* Read co before co->ctx; pairs with smp_wmb() in
+     * qemu_coroutine_enter().
+     */
+    smp_read_barrier_depends();
+    mutex->ctx = co->ctx;
+    aio_co_wake(co);
+}
+
+static void coroutine_fn qemu_co_mutex_lock_slowpath(AioContext *ctx,
+                                                     CoMutex *mutex)
 {
     Coroutine *self = qemu_coroutine_self();
     CoWaitRecord w;
@@ -206,10 +218,11 @@ static void coroutine_fn qemu_co_mutex_lock_slowpath(CoMutex *mutex)
         if (co == self) {
             /* We got the lock ourselves! */
             assert(to_wake == &w);
+            mutex->ctx = ctx;
             return;
         }
 
-        aio_co_wake(co);
+        qemu_co_mutex_wake(mutex, co);
     }
 
     qemu_coroutine_yield();
@@ -218,13 +231,39 @@ static void coroutine_fn qemu_co_mutex_lock_slowpath(CoMutex *mutex)
 
 void coroutine_fn qemu_co_mutex_lock(CoMutex *mutex)
 {
+    AioContext *ctx = qemu_get_current_aio_context();
     Coroutine *self = qemu_coroutine_self();
+    int waiters, i;
+
+    /* Running a very small critical section on pthread_mutex_t and CoMutex
+     * shows that pthread_mutex_t is much faster because it doesn't actually
+     * go to sleep.  What happens is that the critical section is shorter
+     * than the latency of entering the kernel and thus FUTEX_WAIT always
+     * fails.  With CoMutex there is no such latency but you still want to
+     * avoid wait and wakeup.  So introduce it artificially.
+     */
+    i = 0;
+retry_fast_path:
+    waiters = atomic_cmpxchg(&mutex->locked, 0, 1);
+    if (waiters != 0) {
+        while (waiters == 1 && ++i < 1000) {
+            if (atomic_read(&mutex->ctx) == ctx) {
+                break;
+            }
+            if (atomic_read(&mutex->locked) == 0) {
+                goto retry_fast_path;
+            }
+            cpu_relax();
+        }
+        waiters = atomic_fetch_inc(&mutex->locked);
+    }
 
-    if (atomic_fetch_inc(&mutex->locked) == 0) {
+    if (waiters == 0) {
         /* Uncontended.  */
         trace_qemu_co_mutex_lock_uncontended(mutex, self);
+        mutex->ctx = ctx;
     } else {
-        qemu_co_mutex_lock_slowpath(mutex);
+        qemu_co_mutex_lock_slowpath(ctx, mutex);
     }
     mutex->holder = self;
     self->locks_held++;
@@ -240,6 +279,7 @@ void coroutine_fn qemu_co_mutex_unlock(CoMutex *mutex)
     assert(mutex->holder == self);
     assert(qemu_in_coroutine());
 
+    mutex->ctx = NULL;
     mutex->holder = NULL;
     self->locks_held--;
     if (atomic_fetch_dec(&mutex->locked) == 1) {
@@ -252,8 +292,7 @@ void coroutine_fn qemu_co_mutex_unlock(CoMutex *mutex)
         unsigned our_handoff;
 
         if (to_wake) {
-            Coroutine *co = to_wake->co;
-            aio_co_wake(co);
+            qemu_co_mutex_wake(mutex, to_wake->co);
             break;
         }
 
diff --git a/util/qemu-coroutine.c b/util/qemu-coroutine.c
index 415600dc30..72412e5649 100644
--- a/util/qemu-coroutine.c
+++ b/util/qemu-coroutine.c
@@ -118,7 +118,7 @@ void qemu_coroutine_enter(Coroutine *co)
     co->ctx = qemu_get_current_aio_context();
 
     /* Store co->ctx before anything that stores co.  Matches
-     * barrier in aio_co_wake.
+     * barrier in aio_co_wake and qemu_co_mutex_wake.
     */
     smp_wmb();
 
-- 
GitLab
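
As an illustration of the idea this patch applies (spin for a bounded number of
iterations before paying for a real wait/wakeup), here is a minimal standalone
C sketch. It is not QEMU code: the toy_mutex type, the toy_lock()/toy_unlock()
helpers and the SPIN_LIMIT value are invented for this example, and a pthread
condition variable stands in for CoMutex's coroutine wait queue; the real fast
path is the atomic_cmpxchg() loop in the diff above, which additionally stops
spinning when the lock holder runs on the same AioContext.

/* Standalone sketch of bounded spinning before sleeping; names and the
 * slow-path machinery are invented for this example, not taken from QEMU.
 * Build with: cc -O2 -pthread toy_mutex.c
 */
#include <pthread.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stdio.h>

#define SPIN_LIMIT 1000   /* same order of magnitude as the patch's bound */

typedef struct {
    atomic_int locked;        /* 0 = free, 1 = held */
    pthread_mutex_t m;        /* protects the sleeping slow path */
    pthread_cond_t cond;      /* where blocked lockers sleep */
} toy_mutex;

static void toy_mutex_init(toy_mutex *mu)
{
    atomic_init(&mu->locked, 0);
    pthread_mutex_init(&mu->m, NULL);
    pthread_cond_init(&mu->cond, NULL);
}

static bool toy_trylock(toy_mutex *mu)
{
    int expected = 0;
    return atomic_compare_exchange_strong(&mu->locked, &expected, 1);
}

static void toy_lock(toy_mutex *mu)
{
    /* Fast path: spin a bounded number of times.  When the critical section
     * is shorter than the cost of going to sleep and being woken up, the
     * lock usually becomes free before the loop runs out. */
    for (int i = 0; i < SPIN_LIMIT; i++) {
        if (atomic_load(&mu->locked) == 0 && toy_trylock(mu)) {
            return;
        }
        /* a CPU "pause" hint, like the patch's cpu_relax(), would go here */
    }

    /* Slow path: actually sleep until the holder wakes us up. */
    pthread_mutex_lock(&mu->m);
    while (!toy_trylock(mu)) {
        pthread_cond_wait(&mu->cond, &mu->m);
    }
    pthread_mutex_unlock(&mu->m);
}

static void toy_unlock(toy_mutex *mu)
{
    atomic_store(&mu->locked, 0);
    /* Taking mu->m orders this signal after any concurrent
     * trylock-then-wait in toy_lock(), so no wakeup is lost. */
    pthread_mutex_lock(&mu->m);
    pthread_cond_signal(&mu->cond);
    pthread_mutex_unlock(&mu->m);
}

/* Tiny usage example: two threads incrementing a shared counter inside a
 * very short critical section, which is the case the patch optimizes for. */
static toy_mutex shared_lock;
static long counter;

static void *worker(void *arg)
{
    (void)arg;
    for (int i = 0; i < 1000000; i++) {
        toy_lock(&shared_lock);
        counter++;
        toy_unlock(&shared_lock);
    }
    return NULL;
}

int main(void)
{
    pthread_t a, b;

    toy_mutex_init(&shared_lock);
    pthread_create(&a, NULL, worker, NULL);
    pthread_create(&b, NULL, worker, NULL);
    pthread_join(a, NULL);
    pthread_join(b, NULL);
    printf("counter = %ld (expected 2000000)\n", counter);
    return 0;
}

The point of the bound is the same as in the patch: when the critical section
is shorter than the sleep/wake round trip, the fast path usually succeeds
within the loop and the condition variable (the wait/wakeup cost) is never
touched.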