提交 7911747b 编写于 作者: P Paolo Bonzini

rcu: add rcu library

This includes a (mangled) copy of the liburcu code.  The main changes
are: 1) removing dependencies on many other header files in liburcu; 2)
removing for simplicity the tentative busy waiting in synchronize_rcu,
which has limited performance effects; 3) replacing futexes in
synchronize_rcu with QemuEvents for Win32 portability.  The API is
the same as liburcu, so it should be possible in the future to require
liburcu on POSIX systems for example and use our copy only on Windows.

Among the various versions available I chose urcu-mb, which is the
least invasive implementation even though it does not have the
fastest rcu_read_{lock,unlock} implementation.  The urcu flavor can
be changed later, after benchmarking.
Reviewed-by: NFam Zheng <famz@redhat.com>
Signed-off-by: NPaolo Bonzini <pbonzini@redhat.com>
上级 158ef8cb
Using RCU (Read-Copy-Update) for synchronization
================================================
Read-copy update (RCU) is a synchronization mechanism that is used to
protect read-mostly data structures. RCU is very efficient and scalable
on the read side (it is wait-free), and thus can make the read paths
extremely fast.
RCU supports concurrency between a single writer and multiple readers,
thus it is not used alone. Typically, the write-side will use a lock to
serialize multiple updates, but other approaches are possible (e.g.,
restricting updates to a single task). In QEMU, when a lock is used,
this will often be the "iothread mutex", also known as the "big QEMU
lock" (BQL). Also, restricting updates to a single task is done in
QEMU using the "bottom half" API.
RCU is fundamentally a "wait-to-finish" mechanism. The read side marks
sections of code with "critical sections", and the update side will wait
for the execution of all *currently running* critical sections before
proceeding, or before asynchronously executing a callback.
The key point here is that only the currently running critical sections
are waited for; critical sections that are started _after_ the beginning
of the wait do not extend the wait, despite running concurrently with
the updater. This is the reason why RCU is more scalable than,
for example, reader-writer locks. It is so much more scalable that
the system will have a single instance of the RCU mechanism; a single
mechanism can be used for an arbitrary number of "things", without
having to worry about things such as contention or deadlocks.
How is this possible? The basic idea is to split updates in two phases,
"removal" and "reclamation". During removal, we ensure that subsequent
readers will not be able to get a reference to the old data. After
removal has completed, a critical section will not be able to access
the old data. Therefore, critical sections that begin after removal
do not matter; as soon as all previous critical sections have finished,
there cannot be any readers who hold references to the data structure,
and these can now be safely reclaimed (e.g., freed or unref'ed).
Here is a picutre:
thread 1 thread 2 thread 3
------------------- ------------------------ -------------------
enter RCU crit.sec.
| finish removal phase
| begin wait
| | enter RCU crit.sec.
exit RCU crit.sec | |
complete wait |
begin reclamation phase |
exit RCU crit.sec.
Note how thread 3 is still executing its critical section when thread 2
starts reclaiming data. This is possible, because the old version of the
data structure was not accessible at the time thread 3 began executing
that critical section.
RCU API
=======
The core RCU API is small:
void rcu_read_lock(void);
Used by a reader to inform the reclaimer that the reader is
entering an RCU read-side critical section.
void rcu_read_unlock(void);
Used by a reader to inform the reclaimer that the reader is
exiting an RCU read-side critical section. Note that RCU
read-side critical sections may be nested and/or overlapping.
void synchronize_rcu(void);
Blocks until all pre-existing RCU read-side critical sections
on all threads have completed. This marks the end of the removal
phase and the beginning of reclamation phase.
Note that it would be valid for another update to come while
synchronize_rcu is running. Because of this, it is better that
the updater releases any locks it may hold before calling
synchronize_rcu.
typeof(*p) atomic_rcu_read(p);
atomic_rcu_read() is similar to atomic_mb_read(), but it makes
some assumptions on the code that calls it. This allows a more
optimized implementation.
atomic_rcu_read assumes that whenever a single RCU critical
section reads multiple shared data, these reads are either
data-dependent or need no ordering. This is almost always the
case when using RCU, because read-side critical sections typically
navigate one or more pointers (the pointers that are changed on
every update) until reaching a data structure of interest,
and then read from there.
RCU read-side critical sections must use atomic_rcu_read() to
read data, unless concurrent writes are presented by another
synchronization mechanism.
Furthermore, RCU read-side critical sections should traverse the
data structure in a single direction, opposite to the direction
in which the updater initializes it.
void atomic_rcu_set(p, typeof(*p) v);
atomic_rcu_set() is also similar to atomic_mb_set(), and it also
makes assumptions on the code that calls it in order to allow a more
optimized implementation.
In particular, atomic_rcu_set() suffices for synchronization
with readers, if the updater never mutates a field within a
data item that is already accessible to readers. This is the
case when initializing a new copy of the RCU-protected data
structure; just ensure that initialization of *p is carried out
before atomic_rcu_set() makes the data item visible to readers.
If this rule is observed, writes will happen in the opposite
order as reads in the RCU read-side critical sections (or if
there is just one update), and there will be no need for other
synchronization mechanism to coordinate the accesses.
The following APIs must be used before RCU is used in a thread:
void rcu_register_thread(void);
Mark a thread as taking part in the RCU mechanism. Such a thread
will have to report quiescent points regularly, either manually
or through the QemuCond/QemuSemaphore/QemuEvent APIs.
void rcu_unregister_thread(void);
Mark a thread as not taking part anymore in the RCU mechanism.
It is not a problem if such a thread reports quiescent points,
either manually or by using the QemuCond/QemuSemaphore/QemuEvent
APIs.
Note that these APIs are relatively heavyweight, and should _not_ be
nested.
DIFFERENCES WITH LINUX
======================
- Waiting on a mutex is possible, though discouraged, within an RCU critical
section. This is because spinlocks are rarely (if ever) used in userspace
programming; not allowing this would prevent upgrading an RCU read-side
critical section to become an updater.
- atomic_rcu_read and atomic_rcu_set replace rcu_dereference and
rcu_assign_pointer. They take a _pointer_ to the variable being accessed.
RCU PATTERNS
============
Many patterns using read-writer locks translate directly to RCU, with
the advantages of higher scalability and deadlock immunity.
In general, RCU can be used whenever it is possible to create a new
"version" of a data structure every time the updater runs. This may
sound like a very strict restriction, however:
- the updater does not mean "everything that writes to a data structure",
but rather "everything that involves a reclamation step". See the
array example below
- in some cases, creating a new version of a data structure may actually
be very cheap. For example, modifying the "next" pointer of a singly
linked list is effectively creating a new version of the list.
Here are some frequently-used RCU idioms that are worth noting.
RCU list processing
-------------------
TBD (not yet used in QEMU)
RCU reference counting
----------------------
Because grace periods are not allowed to complete while there is an RCU
read-side critical section in progress, the RCU read-side primitives
may be used as a restricted reference-counting mechanism. For example,
consider the following code fragment:
rcu_read_lock();
p = atomic_rcu_read(&foo);
/* do something with p. */
rcu_read_unlock();
The RCU read-side critical section ensures that the value of "p" remains
valid until after the rcu_read_unlock(). In some sense, it is acquiring
a reference to p that is later released when the critical section ends.
The write side looks simply like this (with appropriate locking):
qemu_mutex_lock(&foo_mutex);
old = foo;
atomic_rcu_set(&foo, new);
qemu_mutex_unlock(&foo_mutex);
synchronize_rcu();
free(old);
Note that the same idiom would be possible with reader/writer
locks:
read_lock(&foo_rwlock); write_mutex_lock(&foo_rwlock);
p = foo; p = foo;
/* do something with p. */ foo = new;
read_unlock(&foo_rwlock); free(p);
write_mutex_unlock(&foo_rwlock);
free(p);
RCU resizable arrays
--------------------
Resizable arrays can be used with RCU. The expensive RCU synchronization
only needs to take place when the array is resized. The two items to
take care of are:
- ensuring that the old version of the array is available between removal
and reclamation;
- avoiding mismatches in the read side between the array data and the
array size.
The first problem is avoided simply by not using realloc. Instead,
each resize will allocate a new array and copy the old data into it.
The second problem would arise if the size and the data pointers were
two members of a larger struct:
struct mystuff {
...
int data_size;
int data_alloc;
T *data;
...
};
Instead, we store the size of the array with the array itself:
struct arr {
int size;
int alloc;
T data[];
};
struct arr *global_array;
read side:
rcu_read_lock();
struct arr *array = atomic_rcu_read(&global_array);
x = i < array->size ? array->data[i] : -1;
rcu_read_unlock();
return x;
write side (running under a lock):
if (global_array->size == global_array->alloc) {
/* Creating a new version. */
new_array = g_malloc(sizeof(struct arr) +
global_array->alloc * 2 * sizeof(T));
new_array->size = global_array->size;
new_array->alloc = global_array->alloc * 2;
memcpy(new_array->data, global_array->data,
global_array->alloc * sizeof(T));
/* Removal phase. */
old_array = global_array;
atomic_rcu_set(&new_array->data, new_array);
synchronize_rcu();
/* Reclamation phase. */
free(old_array);
}
SOURCES
=======
* Documentation/RCU/ from the Linux kernel
......@@ -17,6 +17,7 @@
#include "virtio-9p-xattr.h"
#include "fsdev/qemu-fsdev.h"
#include "virtio-9p-synth.h"
#include "qemu/rcu.h"
#include <sys/stat.h>
......
......@@ -129,6 +129,67 @@
#define atomic_set(ptr, i) ((*(__typeof__(*ptr) volatile*) (ptr)) = (i))
#endif
/**
* atomic_rcu_read - reads a RCU-protected pointer to a local variable
* into a RCU read-side critical section. The pointer can later be safely
* dereferenced within the critical section.
*
* This ensures that the pointer copy is invariant thorough the whole critical
* section.
*
* Inserts memory barriers on architectures that require them (currently only
* Alpha) and documents which pointers are protected by RCU.
*
* Unless the __ATOMIC_CONSUME memory order is available, atomic_rcu_read also
* includes a compiler barrier to ensure that value-speculative optimizations
* (e.g. VSS: Value Speculation Scheduling) does not perform the data read
* before the pointer read by speculating the value of the pointer. On new
* enough compilers, atomic_load takes care of such concern about
* dependency-breaking optimizations.
*
* Should match atomic_rcu_set(), atomic_xchg(), atomic_cmpxchg().
*/
#ifndef atomic_rcu_read
#ifdef __ATOMIC_CONSUME
#define atomic_rcu_read(ptr) ({ \
typeof(*ptr) _val; \
__atomic_load(ptr, &_val, __ATOMIC_CONSUME); \
_val; \
})
#else
#define atomic_rcu_read(ptr) ({ \
typeof(*ptr) _val = atomic_read(ptr); \
smp_read_barrier_depends(); \
_val; \
})
#endif
#endif
/**
* atomic_rcu_set - assigns (publicizes) a pointer to a new data structure
* meant to be read by RCU read-side critical sections.
*
* Documents which pointers will be dereferenced by RCU read-side critical
* sections and adds the required memory barriers on architectures requiring
* them. It also makes sure the compiler does not reorder code initializing the
* data structure before its publication.
*
* Should match atomic_rcu_read().
*/
#ifndef atomic_rcu_set
#ifdef __ATOMIC_RELEASE
#define atomic_rcu_set(ptr, i) do { \
typeof(*ptr) _val = (i); \
__atomic_store(ptr, &_val, __ATOMIC_RELEASE); \
} while(0)
#else
#define atomic_rcu_set(ptr, i) do { \
smp_wmb(); \
atomic_set(ptr, i); \
} while (0)
#endif
#endif
/* These have the same semantics as Java volatile variables.
* See http://gee.cs.oswego.edu/dl/jmm/cookbook.html:
* "1. Issue a StoreStore barrier (wmb) before each volatile store."
......
......@@ -104,6 +104,19 @@ struct { \
(head)->lh_first = NULL; \
} while (/*CONSTCOND*/0)
#define QLIST_SWAP(dstlist, srclist, field) do { \
void *tmplist; \
tmplist = (srclist)->lh_first; \
(srclist)->lh_first = (dstlist)->lh_first; \
if ((srclist)->lh_first != NULL) { \
(srclist)->lh_first->field.le_prev = &(srclist)->lh_first; \
} \
(dstlist)->lh_first = tmplist; \
if ((dstlist)->lh_first != NULL) { \
(dstlist)->lh_first->field.le_prev = &(dstlist)->lh_first; \
} \
} while (/*CONSTCOND*/0)
#define QLIST_INSERT_AFTER(listelm, elm, field) do { \
if (((elm)->field.le_next = (listelm)->field.le_next) != NULL) \
(listelm)->field.le_next->field.le_prev = \
......
#ifndef QEMU_RCU_H
#define QEMU_RCU_H
/*
* urcu-mb.h
*
* Userspace RCU header with explicit memory barrier.
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
*
* IBM's contributions to this file may be relicensed under LGPLv2 or later.
*/
#include <stdlib.h>
#include <assert.h>
#include <limits.h>
#include <unistd.h>
#include <stdint.h>
#include <stdbool.h>
#include <glib.h>
#include "qemu/compiler.h"
#include "qemu/thread.h"
#include "qemu/queue.h"
#include "qemu/atomic.h"
#ifdef __cplusplus
extern "C" {
#endif
/*
* Important !
*
* Each thread containing read-side critical sections must be registered
* with rcu_register_thread() before calling rcu_read_lock().
* rcu_unregister_thread() should be called before the thread exits.
*/
#ifdef DEBUG_RCU
#define rcu_assert(args...) assert(args)
#else
#define rcu_assert(args...)
#endif
/*
* Global quiescent period counter with low-order bits unused.
* Using a int rather than a char to eliminate false register dependencies
* causing stalls on some architectures.
*/
extern unsigned long rcu_gp_ctr;
extern QemuEvent rcu_gp_event;
struct rcu_reader_data {
/* Data used by both reader and synchronize_rcu() */
unsigned long ctr;
bool waiting;
/* Data used for registry, protected by rcu_gp_lock */
QLIST_ENTRY(rcu_reader_data) node;
};
extern __thread struct rcu_reader_data rcu_reader;
static inline void rcu_read_lock(void)
{
struct rcu_reader_data *p_rcu_reader = &rcu_reader;
unsigned ctr = atomic_read(&rcu_gp_ctr);
atomic_xchg(&p_rcu_reader->ctr, ctr);
if (atomic_read(&p_rcu_reader->waiting)) {
atomic_set(&p_rcu_reader->waiting, false);
qemu_event_set(&rcu_gp_event);
}
}
static inline void rcu_read_unlock(void)
{
struct rcu_reader_data *p_rcu_reader = &rcu_reader;
atomic_xchg(&p_rcu_reader->ctr, 0);
if (atomic_read(&p_rcu_reader->waiting)) {
atomic_set(&p_rcu_reader->waiting, false);
qemu_event_set(&rcu_gp_event);
}
}
extern void synchronize_rcu(void);
/*
* Reader thread registration.
*/
extern void rcu_register_thread(void);
extern void rcu_unregister_thread(void);
#ifdef __cplusplus
}
#endif
#endif /* QEMU_RCU_H */
......@@ -25,9 +25,6 @@ void qemu_mutex_lock(QemuMutex *mutex);
int qemu_mutex_trylock(QemuMutex *mutex);
void qemu_mutex_unlock(QemuMutex *mutex);
#define rcu_read_lock() do { } while (0)
#define rcu_read_unlock() do { } while (0)
void qemu_cond_init(QemuCond *cond);
void qemu_cond_destroy(QemuCond *cond);
......
......@@ -17,3 +17,4 @@ util-obj-y += throttle.o
util-obj-y += getauxval.o
util-obj-y += readline.o
util-obj-y += rfifolock.o
util-obj-y += rcu.o
/*
* urcu-mb.c
*
* Userspace RCU library with explicit memory barriers
*
* Copyright (c) 2009 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
* Copyright (c) 2009 Paul E. McKenney, IBM Corporation.
* Copyright 2015 Red Hat, Inc.
*
* Ported to QEMU by Paolo Bonzini <pbonzini@redhat.com>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
*
* IBM's contributions to this file may be relicensed under LGPLv2 or later.
*/
#include <stdio.h>
#include <assert.h>
#include <stdlib.h>
#include <stdint.h>
#include <errno.h>
#include "qemu/rcu.h"
#include "qemu/atomic.h"
/*
* Global grace period counter. Bit 0 is always one in rcu_gp_ctr.
* Bits 1 and above are defined in synchronize_rcu.
*/
#define RCU_GP_LOCKED (1UL << 0)
#define RCU_GP_CTR (1UL << 1)
unsigned long rcu_gp_ctr = RCU_GP_LOCKED;
QemuEvent rcu_gp_event;
static QemuMutex rcu_gp_lock;
/*
* Check whether a quiescent state was crossed between the beginning of
* update_counter_and_wait and now.
*/
static inline int rcu_gp_ongoing(unsigned long *ctr)
{
unsigned long v;
v = atomic_read(ctr);
return v && (v != rcu_gp_ctr);
}
/* Written to only by each individual reader. Read by both the reader and the
* writers.
*/
__thread struct rcu_reader_data rcu_reader;
/* Protected by rcu_gp_lock. */
typedef QLIST_HEAD(, rcu_reader_data) ThreadList;
static ThreadList registry = QLIST_HEAD_INITIALIZER(registry);
/* Wait for previous parity/grace period to be empty of readers. */
static void wait_for_readers(void)
{
ThreadList qsreaders = QLIST_HEAD_INITIALIZER(qsreaders);
struct rcu_reader_data *index, *tmp;
for (;;) {
/* We want to be notified of changes made to rcu_gp_ongoing
* while we walk the list.
*/
qemu_event_reset(&rcu_gp_event);
/* Instead of using atomic_mb_set for index->waiting, and
* atomic_mb_read for index->ctr, memory barriers are placed
* manually since writes to different threads are independent.
* atomic_mb_set has a smp_wmb before...
*/
smp_wmb();
QLIST_FOREACH(index, &registry, node) {
atomic_set(&index->waiting, true);
}
/* ... and a smp_mb after. */
smp_mb();
QLIST_FOREACH_SAFE(index, &registry, node, tmp) {
if (!rcu_gp_ongoing(&index->ctr)) {
QLIST_REMOVE(index, node);
QLIST_INSERT_HEAD(&qsreaders, index, node);
/* No need for mb_set here, worst of all we
* get some extra futex wakeups.
*/
atomic_set(&index->waiting, false);
}
}
/* atomic_mb_read has smp_rmb after. */
smp_rmb();
if (QLIST_EMPTY(&registry)) {
break;
}
/* Wait for one thread to report a quiescent state and
* try again.
*/
qemu_event_wait(&rcu_gp_event);
}
/* put back the reader list in the registry */
QLIST_SWAP(&registry, &qsreaders, node);
}
void synchronize_rcu(void)
{
qemu_mutex_lock(&rcu_gp_lock);
if (!QLIST_EMPTY(&registry)) {
/* In either case, the atomic_mb_set below blocks stores that free
* old RCU-protected pointers.
*/
if (sizeof(rcu_gp_ctr) < 8) {
/* For architectures with 32-bit longs, a two-subphases algorithm
* ensures we do not encounter overflow bugs.
*
* Switch parity: 0 -> 1, 1 -> 0.
*/
atomic_mb_set(&rcu_gp_ctr, rcu_gp_ctr ^ RCU_GP_CTR);
wait_for_readers();
atomic_mb_set(&rcu_gp_ctr, rcu_gp_ctr ^ RCU_GP_CTR);
} else {
/* Increment current grace period. */
atomic_mb_set(&rcu_gp_ctr, rcu_gp_ctr + RCU_GP_CTR);
}
wait_for_readers();
}
qemu_mutex_unlock(&rcu_gp_lock);
}
void rcu_register_thread(void)
{
assert(rcu_reader.ctr == 0);
qemu_mutex_lock(&rcu_gp_lock);
QLIST_INSERT_HEAD(&registry, &rcu_reader, node);
qemu_mutex_unlock(&rcu_gp_lock);
}
void rcu_unregister_thread(void)
{
qemu_mutex_lock(&rcu_gp_lock);
QLIST_REMOVE(&rcu_reader, node);
qemu_mutex_unlock(&rcu_gp_lock);
}
static void __attribute__((__constructor__)) rcu_init(void)
{
qemu_mutex_init(&rcu_gp_lock);
qemu_event_init(&rcu_gp_event, true);
rcu_register_thread();
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册