提交 b0c29f79 编写于 作者: D Davidlohr Bueso 提交者: Ingo Molnar

futexes: Avoid taking the hb->lock if there's nothing to wake up

In futex_wake() there is clearly no point in taking the hb->lock
if we know beforehand that there are no tasks to be woken. While
the hash bucket's plist head is a cheap way of knowing this, we
cannot rely 100% on it as there is a racy window between the
futex_wait call and when the task is actually added to the
plist. To this end, we couple it with the spinlock check as
tasks trying to enter the critical region are most likely
potential waiters that will be added to the plist, thus
preventing tasks sleeping forever if wakers don't acknowledge
all possible waiters.

Furthermore, the futex ordering guarantees are preserved,
ensuring that waiters either observe the changed user space
value before blocking or is woken by a concurrent waker. For
wakers, this is done by relying on the barriers in
get_futex_key_refs() -- for archs that do not have implicit mb
in atomic_inc(), we explicitly add them through a new
futex_get_mm function. For waiters we rely on the fact that
spin_lock calls already update the head counter, so spinners
are visible even if the lock hasn't been acquired yet.

For more details please refer to the updated comments in the
code and related discussion:

  https://lkml.org/lkml/2013/11/26/556

Special thanks to tglx for careful review and feedback.
Suggested-by: NLinus Torvalds <torvalds@linux-foundation.org>
Reviewed-by: NDarren Hart <dvhart@linux.intel.com>
Reviewed-by: NThomas Gleixner <tglx@linutronix.de>
Reviewed-by: NPeter Zijlstra <peterz@infradead.org>
Signed-off-by: NDavidlohr Bueso <davidlohr@hp.com>
Cc: Paul E. McKenney <paulmck@linux.vnet.ibm.com>
Cc: Mike Galbraith <efault@gmx.de>
Cc: Jeff Mahoney <jeffm@suse.com>
Cc: Scott Norton <scott.norton@hp.com>
Cc: Tom Vaden <tom.vaden@hp.com>
Cc: Aswin Chandramouleeswaran <aswin@hp.com>
Cc: Waiman Long <Waiman.Long@hp.com>
Cc: Jason Low <jason.low2@hp.com>
Cc: Andrew Morton <akpm@linux-foundation.org>
Link: http://lkml.kernel.org/r/1389569486-25487-5-git-send-email-davidlohr@hp.comSigned-off-by: NIngo Molnar <mingo@kernel.org>
上级 99b60ce6
...@@ -75,17 +75,20 @@ ...@@ -75,17 +75,20 @@
* The waiter reads the futex value in user space and calls * The waiter reads the futex value in user space and calls
* futex_wait(). This function computes the hash bucket and acquires * futex_wait(). This function computes the hash bucket and acquires
* the hash bucket lock. After that it reads the futex user space value * the hash bucket lock. After that it reads the futex user space value
* again and verifies that the data has not changed. If it has not * again and verifies that the data has not changed. If it has not changed
* changed it enqueues itself into the hash bucket, releases the hash * it enqueues itself into the hash bucket, releases the hash bucket lock
* bucket lock and schedules. * and schedules.
* *
* The waker side modifies the user space value of the futex and calls * The waker side modifies the user space value of the futex and calls
* futex_wake(). This functions computes the hash bucket and acquires * futex_wake(). This function computes the hash bucket and acquires the
* the hash bucket lock. Then it looks for waiters on that futex in the * hash bucket lock. Then it looks for waiters on that futex in the hash
* hash bucket and wakes them. * bucket and wakes them.
* *
* Note that the spin_lock serializes waiters and wakers, so that the * In futex wake up scenarios where no tasks are blocked on a futex, taking
* following scenario is avoided: * the hb spinlock can be avoided and simply return. In order for this
* optimization to work, ordering guarantees must exist so that the waiter
* being added to the list is acknowledged when the list is concurrently being
* checked by the waker, avoiding scenarios like the following:
* *
* CPU 0 CPU 1 * CPU 0 CPU 1
* val = *futex; * val = *futex;
...@@ -106,24 +109,52 @@ ...@@ -106,24 +109,52 @@
* This would cause the waiter on CPU 0 to wait forever because it * This would cause the waiter on CPU 0 to wait forever because it
* missed the transition of the user space value from val to newval * missed the transition of the user space value from val to newval
* and the waker did not find the waiter in the hash bucket queue. * and the waker did not find the waiter in the hash bucket queue.
* The spinlock serializes that:
* *
* CPU 0 CPU 1 * The correct serialization ensures that a waiter either observes
* the changed user space value before blocking or is woken by a
* concurrent waker:
*
* CPU 0 CPU 1
* val = *futex; * val = *futex;
* sys_futex(WAIT, futex, val); * sys_futex(WAIT, futex, val);
* futex_wait(futex, val); * futex_wait(futex, val);
* lock(hash_bucket(futex)); *
* uval = *futex; * waiters++;
* *futex = newval; * mb(); (A) <-- paired with -.
* sys_futex(WAKE, futex); * |
* futex_wake(futex); * lock(hash_bucket(futex)); |
* lock(hash_bucket(futex)); * |
* uval = *futex; |
* | *futex = newval;
* | sys_futex(WAKE, futex);
* | futex_wake(futex);
* |
* `-------> mb(); (B)
* if (uval == val) * if (uval == val)
* queue(); * queue();
* unlock(hash_bucket(futex)); * unlock(hash_bucket(futex));
* schedule(); if (!queue_empty()) * schedule(); if (waiters)
* wake_waiters(futex); * lock(hash_bucket(futex));
* unlock(hash_bucket(futex)); * wake_waiters(futex);
* unlock(hash_bucket(futex));
*
* Where (A) orders the waiters increment and the futex value read -- this
* is guaranteed by the head counter in the hb spinlock; and where (B)
* orders the write to futex and the waiters read -- this is done by the
* barriers in get_futex_key_refs(), through either ihold or atomic_inc,
* depending on the futex type.
*
* This yields the following case (where X:=waiters, Y:=futex):
*
* X = Y = 0
*
* w[X]=1 w[Y]=1
* MB MB
* r[Y]=y r[X]=x
*
* Which guarantees that x==0 && y==0 is impossible; which translates back into
* the guarantee that we cannot both miss the futex variable change and the
* enqueue.
*/ */
int __read_mostly futex_cmpxchg_enabled; int __read_mostly futex_cmpxchg_enabled;
...@@ -211,6 +242,36 @@ static unsigned long __read_mostly futex_hashsize; ...@@ -211,6 +242,36 @@ static unsigned long __read_mostly futex_hashsize;
static struct futex_hash_bucket *futex_queues; static struct futex_hash_bucket *futex_queues;
static inline void futex_get_mm(union futex_key *key)
{
atomic_inc(&key->private.mm->mm_count);
/*
* Ensure futex_get_mm() implies a full barrier such that
* get_futex_key() implies a full barrier. This is relied upon
* as full barrier (B), see the ordering comment above.
*/
smp_mb__after_atomic_inc();
}
static inline bool hb_waiters_pending(struct futex_hash_bucket *hb)
{
#ifdef CONFIG_SMP
/*
* Tasks trying to enter the critical region are most likely
* potential waiters that will be added to the plist. Ensure
* that wakers won't miss to-be-slept tasks in the window between
* the wait call and the actual plist_add.
*/
if (spin_is_locked(&hb->lock))
return true;
smp_rmb(); /* Make sure we check the lock state first */
return !plist_head_empty(&hb->chain);
#else
return true;
#endif
}
/* /*
* We hash on the keys returned from get_futex_key (see below). * We hash on the keys returned from get_futex_key (see below).
*/ */
...@@ -245,10 +306,10 @@ static void get_futex_key_refs(union futex_key *key) ...@@ -245,10 +306,10 @@ static void get_futex_key_refs(union futex_key *key)
switch (key->both.offset & (FUT_OFF_INODE|FUT_OFF_MMSHARED)) { switch (key->both.offset & (FUT_OFF_INODE|FUT_OFF_MMSHARED)) {
case FUT_OFF_INODE: case FUT_OFF_INODE:
ihold(key->shared.inode); ihold(key->shared.inode); /* implies MB (B) */
break; break;
case FUT_OFF_MMSHARED: case FUT_OFF_MMSHARED:
atomic_inc(&key->private.mm->mm_count); futex_get_mm(key); /* implies MB (B) */
break; break;
} }
} }
...@@ -322,7 +383,7 @@ get_futex_key(u32 __user *uaddr, int fshared, union futex_key *key, int rw) ...@@ -322,7 +383,7 @@ get_futex_key(u32 __user *uaddr, int fshared, union futex_key *key, int rw)
if (!fshared) { if (!fshared) {
key->private.mm = mm; key->private.mm = mm;
key->private.address = address; key->private.address = address;
get_futex_key_refs(key); get_futex_key_refs(key); /* implies MB (B) */
return 0; return 0;
} }
...@@ -429,7 +490,7 @@ get_futex_key(u32 __user *uaddr, int fshared, union futex_key *key, int rw) ...@@ -429,7 +490,7 @@ get_futex_key(u32 __user *uaddr, int fshared, union futex_key *key, int rw)
key->shared.pgoff = basepage_index(page); key->shared.pgoff = basepage_index(page);
} }
get_futex_key_refs(key); get_futex_key_refs(key); /* implies MB (B) */
out: out:
unlock_page(page_head); unlock_page(page_head);
...@@ -1052,6 +1113,11 @@ futex_wake(u32 __user *uaddr, unsigned int flags, int nr_wake, u32 bitset) ...@@ -1052,6 +1113,11 @@ futex_wake(u32 __user *uaddr, unsigned int flags, int nr_wake, u32 bitset)
goto out; goto out;
hb = hash_futex(&key); hb = hash_futex(&key);
/* Make sure we really have tasks to wakeup */
if (!hb_waiters_pending(hb))
goto out_put_key;
spin_lock(&hb->lock); spin_lock(&hb->lock);
plist_for_each_entry_safe(this, next, &hb->chain, list) { plist_for_each_entry_safe(this, next, &hb->chain, list) {
...@@ -1072,6 +1138,7 @@ futex_wake(u32 __user *uaddr, unsigned int flags, int nr_wake, u32 bitset) ...@@ -1072,6 +1138,7 @@ futex_wake(u32 __user *uaddr, unsigned int flags, int nr_wake, u32 bitset)
} }
spin_unlock(&hb->lock); spin_unlock(&hb->lock);
out_put_key:
put_futex_key(&key); put_futex_key(&key);
out: out:
return ret; return ret;
...@@ -1535,7 +1602,7 @@ static inline struct futex_hash_bucket *queue_lock(struct futex_q *q) ...@@ -1535,7 +1602,7 @@ static inline struct futex_hash_bucket *queue_lock(struct futex_q *q)
hb = hash_futex(&q->key); hb = hash_futex(&q->key);
q->lock_ptr = &hb->lock; q->lock_ptr = &hb->lock;
spin_lock(&hb->lock); spin_lock(&hb->lock); /* implies MB (A) */
return hb; return hb;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册