提交 da915ad5 编写于 作者: P Paul E. McKenney

srcu: Parallelize callback handling

Peter Zijlstra proposed using SRCU to reduce mmap_sem contention [1,2],
however, there are workloads that could result in a high volume of
concurrent invocations of call_srcu(), which with current SRCU would
result in excessive lock contention on the srcu_struct structure's
->queue_lock, which protects SRCU's callback lists.  This commit therefore
moves SRCU to per-CPU callback lists, thus greatly reducing contention.

Because a given SRCU instance no longer has a single centralized callback
list, starting grace periods and invoking callbacks are both more complex
than in the single-list Classic SRCU implementation.  Starting grace
periods and handling callbacks are now handled using an srcu_node tree
that is in some ways similar to the rcu_node trees used by RCU-bh,
RCU-preempt, and RCU-sched (for example, the srcu_node tree shape is
controlled by exactly the same Kconfig options and boot parameters that
control the shape of the rcu_node tree).

In addition, the old per-CPU srcu_array structure is now named srcu_data
and contains an rcu_segcblist structure named ->srcu_cblist for its
callbacks (and a spinlock to protect this).  The srcu_struct gets
an srcu_gp_seq that is used to associate callback segments with the
corresponding completion-time grace-period number.  These completion-time
grace-period numbers are propagated up the srcu_node tree so that the
grace-period workqueue handler can determine whether additional grace
periods are needed on the one hand and where to look for callbacks that
are ready to be invoked.

The srcu_barrier() function must now wait on all instances of the per-CPU
->srcu_cblist.  Because each ->srcu_cblist is protected by ->lock,
srcu_barrier() can remotely add the needed callbacks.  In theory,
it could also remotely start grace periods, but in practice doing so
is complex and racy.  And interestingly enough, it is never necessary
for srcu_barrier() to start a grace period because srcu_barrier() only
enqueues a callback when a callback is already present--and it turns out
that a grace period has to have already been started for this pre-existing
callback.  Furthermore, it is only the callback that srcu_barrier()
needs to wait on, not any particular grace period.  Therefore, a new
rcu_segcblist_entrain() function enqueues the srcu_barrier() function's
callback into the same segment occupied by the last pre-existing callback
in the list.  The special case where all the pre-existing callbacks are
on a different list (because they are in the process of being invoked)
is handled by enqueuing srcu_barrier()'s callback into the RCU_DONE_TAIL
segment, relying on the done-callbacks check that takes place after all
callbacks are inovked.

Note that the readers use the same algorithm as before.  Note that there
is a separate srcu_idx that tells the readers what counter to increment.
This unfortunately cannot be combined with srcu_gp_seq because they
need to be incremented at different times.

This commit introduces some ugly #ifdefs in rcutorture.  These will go
away when I feel good enough about Tree SRCU to ditch Classic SRCU.

Some crude performance comparisons, courtesy of a quickly hacked rcuperf
asynchronous-grace-period capability:

			Callback Queuing Overhead
			-------------------------
	# CPUS		Classic SRCU	Tree SRCU
	------          ------------    ---------
	     2              0.349 us     0.342 us
	    16             31.66  us     0.4   us
	    41             ---------     0.417 us

The times are the 90th percentiles, a statistic that was chosen to reject
the overheads of the occasional srcu_barrier() call needed to avoid OOMing
the test machine.  The rcuperf test hangs when running Classic SRCU at 41
CPUs, hence the line of dashes.  Despite the hacks to both the rcuperf code
and that statistics, this is a convincing demonstration of Tree SRCU's
performance and scalability advantages.

[1] https://lwn.net/Articles/309030/
[2] https://patchwork.kernel.org/patch/5108281/Signed-off-by: NPaul E. McKenney <paulmck@linux.vnet.ibm.com>
[ paulmck: Fix initialization if synchronize_srcu_expedited() called first. ]
上级 6ade8694
......@@ -401,6 +401,37 @@ static inline void rcu_segcblist_enqueue(struct rcu_segcblist *rsclp,
rsclp->tails[RCU_NEXT_TAIL] = &rhp->next;
}
/*
* Entrain the specified callback onto the specified rcu_segcblist at
* the end of the last non-empty segment. If the entire rcu_segcblist
* is empty, make no change, but return false.
*
* This is intended for use by rcu_barrier()-like primitives, -not-
* for normal grace-period use. IMPORTANT: The callback you enqueue
* will wait for all prior callbacks, NOT necessarily for a grace
* period. You have been warned.
*/
static inline bool rcu_segcblist_entrain(struct rcu_segcblist *rsclp,
struct rcu_head *rhp, bool lazy)
{
int i;
if (rcu_segcblist_n_cbs(rsclp) == 0)
return false;
WRITE_ONCE(rsclp->len, rsclp->len + 1);
if (lazy)
rsclp->len_lazy++;
smp_mb(); /* Ensure counts are updated before callback is entrained. */
rhp->next = NULL;
for (i = RCU_NEXT_TAIL; i > RCU_DONE_TAIL; i--)
if (rsclp->tails[i] != rsclp->tails[i - 1])
break;
*rsclp->tails[i] = rhp;
for (; i <= RCU_NEXT_TAIL; i++)
rsclp->tails[i] = &rhp->next;
return true;
}
/*
* Extract only the counts from the specified rcu_segcblist structure,
* and place them in the specified rcu_cblist structure. This function
......@@ -537,7 +568,8 @@ static inline void rcu_segcblist_advance(struct rcu_segcblist *rsclp,
int i, j;
WARN_ON_ONCE(!rcu_segcblist_is_enabled(rsclp));
WARN_ON_ONCE(rcu_segcblist_restempty(rsclp, RCU_DONE_TAIL));
if (rcu_segcblist_restempty(rsclp, RCU_DONE_TAIL))
return;
/*
* Find all callbacks whose ->gp_seq numbers indicate that they
......@@ -582,8 +614,9 @@ static inline void rcu_segcblist_advance(struct rcu_segcblist *rsclp,
* them to complete at the end of the earlier grace period.
*
* This function operates on an rcu_segcblist structure, and also the
* grace-period sequence number at which new callbacks would become
* ready to invoke.
* grace-period sequence number seq at which new callbacks would become
* ready to invoke. Returns true if there are callbacks that won't be
* ready to invoke until seq, false otherwise.
*/
static inline bool rcu_segcblist_accelerate(struct rcu_segcblist *rsclp,
unsigned long seq)
......@@ -591,7 +624,8 @@ static inline bool rcu_segcblist_accelerate(struct rcu_segcblist *rsclp,
int i;
WARN_ON_ONCE(!rcu_segcblist_is_enabled(rsclp));
WARN_ON_ONCE(rcu_segcblist_restempty(rsclp, RCU_DONE_TAIL));
if (rcu_segcblist_restempty(rsclp, RCU_DONE_TAIL))
return false;
/*
* Find the segment preceding the oldest segment of callbacks
......
......@@ -24,25 +24,75 @@
#ifndef _LINUX_SRCU_TREE_H
#define _LINUX_SRCU_TREE_H
struct srcu_array {
unsigned long lock_count[2];
unsigned long unlock_count[2];
#include <linux/rcu_node_tree.h>
#include <linux/completion.h>
struct srcu_node;
struct srcu_struct;
/*
* Per-CPU structure feeding into leaf srcu_node, similar in function
* to rcu_node.
*/
struct srcu_data {
/* Read-side state. */
unsigned long srcu_lock_count[2]; /* Locks per CPU. */
unsigned long srcu_unlock_count[2]; /* Unlocks per CPU. */
/* Update-side state. */
spinlock_t lock ____cacheline_internodealigned_in_smp;
struct rcu_segcblist srcu_cblist; /* List of callbacks.*/
unsigned long srcu_gp_seq_needed; /* Furthest future GP needed. */
bool srcu_cblist_invoking; /* Invoking these CBs? */
struct delayed_work work; /* Context for CB invoking. */
struct rcu_head srcu_barrier_head; /* For srcu_barrier() use. */
struct srcu_node *mynode; /* Leaf srcu_node. */
int cpu;
struct srcu_struct *sp;
};
/*
* Node in SRCU combining tree, similar in function to rcu_data.
*/
struct srcu_node {
spinlock_t lock;
unsigned long srcu_have_cbs[4]; /* GP seq for children */
/* having CBs, but only */
/* is > ->srcu_gq_seq. */
struct srcu_node *srcu_parent; /* Next up in tree. */
int grplo; /* Least CPU for node. */
int grphi; /* Biggest CPU for node. */
};
/*
* Per-SRCU-domain structure, similar in function to rcu_state.
*/
struct srcu_struct {
unsigned long completed;
unsigned long srcu_gp_seq;
atomic_t srcu_exp_cnt;
struct srcu_array __percpu *per_cpu_ref;
spinlock_t queue_lock; /* protect ->srcu_cblist */
struct rcu_segcblist srcu_cblist;
struct srcu_node node[NUM_RCU_NODES]; /* Combining tree. */
struct srcu_node *level[RCU_NUM_LVLS + 1];
/* First node at each level. */
struct mutex srcu_cb_mutex; /* Serialize CB preparation. */
spinlock_t gp_lock; /* protect ->srcu_cblist */
struct mutex srcu_gp_mutex; /* Serialize GP work. */
unsigned int srcu_idx; /* Current rdr array element. */
unsigned long srcu_gp_seq; /* Grace-period seq #. */
unsigned long srcu_gp_seq_needed; /* Latest gp_seq needed. */
atomic_t srcu_exp_cnt; /* # ongoing expedited GPs. */
struct srcu_data __percpu *sda; /* Per-CPU srcu_data array. */
unsigned long srcu_barrier_seq; /* srcu_barrier seq #. */
struct mutex srcu_barrier_mutex; /* Serialize barrier ops. */
struct completion srcu_barrier_completion;
/* Awaken barrier rq at end. */
atomic_t srcu_barrier_cpu_cnt; /* # CPUs not yet posting a */
/* callback for the barrier */
/* operation. */
struct delayed_work work;
#ifdef CONFIG_DEBUG_LOCK_ALLOC
struct lockdep_map dep_map;
#endif /* #ifdef CONFIG_DEBUG_LOCK_ALLOC */
};
/* Values for -> state variable. */
/* Values for state variable (bottom bits of ->srcu_gp_seq). */
#define SRCU_STATE_IDLE 0
#define SRCU_STATE_SCAN1 1
#define SRCU_STATE_SCAN2 2
......@@ -51,11 +101,9 @@ void process_srcu(struct work_struct *work);
#define __SRCU_STRUCT_INIT(name) \
{ \
.completed = -300, \
.per_cpu_ref = &name##_srcu_array, \
.queue_lock = __SPIN_LOCK_UNLOCKED(name.queue_lock), \
.srcu_cblist = RCU_SEGCBLIST_INITIALIZER(name.srcu_cblist),\
.work = __DELAYED_WORK_INITIALIZER(name.work, process_srcu, 0),\
.sda = &name##_srcu_data, \
.gp_lock = __SPIN_LOCK_UNLOCKED(name.gp_lock), \
.srcu_gp_seq_needed = 0 - 1, \
__SRCU_DEP_MAP_INIT(name) \
}
......@@ -79,7 +127,7 @@ void process_srcu(struct work_struct *work);
* See include/linux/percpu-defs.h for the rules on per-CPU variables.
*/
#define __DEFINE_SRCU(name, is_static) \
static DEFINE_PER_CPU(struct srcu_array, name##_srcu_array);\
static DEFINE_PER_CPU(struct srcu_data, name##_srcu_data);\
is_static struct srcu_struct name = __SRCU_STRUCT_INIT(name)
#define DEFINE_SRCU(name) __DEFINE_SRCU(name, /* not static */)
#define DEFINE_STATIC_SRCU(name) __DEFINE_SRCU(name, static)
......
......@@ -563,17 +563,30 @@ static void srcu_torture_stats(void)
int idx;
#if defined(CONFIG_TREE_SRCU) || defined(CONFIG_CLASSIC_SRCU)
#ifdef CONFIG_TREE_SRCU
idx = srcu_ctlp->srcu_idx & 0x1;
#else /* #ifdef CONFIG_TREE_SRCU */
idx = srcu_ctlp->completed & 0x1;
#endif /* #else #ifdef CONFIG_TREE_SRCU */
pr_alert("%s%s Tree SRCU per-CPU(idx=%d):",
torture_type, TORTURE_FLAG, idx);
for_each_possible_cpu(cpu) {
unsigned long l0, l1;
unsigned long u0, u1;
long c0, c1;
struct srcu_array *counts = per_cpu_ptr(srcu_ctlp->per_cpu_ref, cpu);
#ifdef CONFIG_TREE_SRCU
struct srcu_data *counts;
counts = per_cpu_ptr(srcu_ctlp->sda, cpu);
u0 = counts->srcu_unlock_count[!idx];
u1 = counts->srcu_unlock_count[idx];
#else /* #ifdef CONFIG_TREE_SRCU */
struct srcu_array *counts;
counts = per_cpu_ptr(srcu_ctlp->per_cpu_ref, cpu);
u0 = counts->unlock_count[!idx];
u1 = counts->unlock_count[idx];
#endif /* #else #ifdef CONFIG_TREE_SRCU */
/*
* Make sure that a lock is always counted if the corresponding
......@@ -581,8 +594,13 @@ static void srcu_torture_stats(void)
*/
smp_rmb();
#ifdef CONFIG_TREE_SRCU
l0 = counts->srcu_lock_count[!idx];
l1 = counts->srcu_lock_count[idx];
#else /* #ifdef CONFIG_TREE_SRCU */
l0 = counts->lock_count[!idx];
l1 = counts->lock_count[idx];
#endif /* #else #ifdef CONFIG_TREE_SRCU */
c0 = l0 - u0;
c1 = l1 - u1;
......
此差异已折叠。
......@@ -3776,12 +3776,16 @@ int rcutree_online_cpu(unsigned int cpu)
{
sync_sched_exp_online_cleanup(cpu);
rcutree_affinity_setting(cpu, -1);
if (IS_ENABLED(CONFIG_TREE_SRCU))
srcu_online_cpu(cpu);
return 0;
}
int rcutree_offline_cpu(unsigned int cpu)
{
rcutree_affinity_setting(cpu, cpu);
if (IS_ENABLED(CONFIG_TREE_SRCU))
srcu_offline_cpu(cpu);
return 0;
}
......@@ -4157,6 +4161,8 @@ void __init rcu_init(void)
for_each_online_cpu(cpu) {
rcutree_prepare_cpu(cpu);
rcu_cpu_starting(cpu);
if (IS_ENABLED(CONFIG_TREE_SRCU))
srcu_online_cpu(cpu);
}
}
......
......@@ -541,6 +541,14 @@ static bool rcu_nohz_full_cpu(struct rcu_state *rsp);
static void rcu_dynticks_task_enter(void);
static void rcu_dynticks_task_exit(void);
#ifdef CONFIG_SRCU
void srcu_online_cpu(unsigned int cpu);
void srcu_offline_cpu(unsigned int cpu);
#else /* #ifdef CONFIG_SRCU */
void srcu_online_cpu(unsigned int cpu) { }
void srcu_offline_cpu(unsigned int cpu) { }
#endif /* #else #ifdef CONFIG_SRCU */
#endif /* #ifndef RCU_TREE_NONCORE */
#ifdef CONFIG_RCU_TRACE
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册