提交 d7db7ec8 编写于 作者: S shade

8023234: StampedLock serializes readers on writer unlock

Summary: Sync-up the fix from jsr166 CVS, signal more readers on writer unlock
Reviewed-by: martin, shade
Contributed-by: NDoug Lea <dl@cs.oswego.edu>
上级 e717494b
...@@ -226,7 +226,11 @@ public class StampedLock implements java.io.Serializable { ...@@ -226,7 +226,11 @@ public class StampedLock implements java.io.Serializable {
* incoming reader arrives while read lock is held but there is a * incoming reader arrives while read lock is held but there is a
* queued writer, this incoming reader is queued. (This rule is * queued writer, this incoming reader is queued. (This rule is
* responsible for some of the complexity of method acquireRead, * responsible for some of the complexity of method acquireRead,
* but without it, the lock becomes highly unfair.) * but without it, the lock becomes highly unfair.) Method release
* does not (and sometimes cannot) itself wake up cowaiters. This
* is done by the primary thread, but helped by any other threads
* with nothing better to do in methods acquireRead and
* acquireWrite.
* *
* These rules apply to threads actually queued. All tryLock forms * These rules apply to threads actually queued. All tryLock forms
* opportunistically try to acquire locks regardless of preference * opportunistically try to acquire locks regardless of preference
...@@ -267,11 +271,14 @@ public class StampedLock implements java.io.Serializable { ...@@ -267,11 +271,14 @@ public class StampedLock implements java.io.Serializable {
/** Number of processors, for spin control */ /** Number of processors, for spin control */
private static final int NCPU = Runtime.getRuntime().availableProcessors(); private static final int NCPU = Runtime.getRuntime().availableProcessors();
/** Maximum number of retries before blocking on acquisition */ /** Maximum number of retries before enqueuing on acquisition */
private static final int SPINS = (NCPU > 1) ? 1 << 6 : 0; private static final int SPINS = (NCPU > 1) ? 1 << 6 : 0;
/** Maximum number of retries before blocking at head on acquisition */
private static final int HEAD_SPINS = (NCPU > 1) ? 1 << 10 : 0;
/** Maximum number of retries before re-blocking */ /** Maximum number of retries before re-blocking */
private static final int MAX_HEAD_SPINS = (NCPU > 1) ? 1 << 12 : 0; private static final int MAX_HEAD_SPINS = (NCPU > 1) ? 1 << 16 : 0;
/** The period for yielding when waiting for overflow spinlock */ /** The period for yielding when waiting for overflow spinlock */
private static final int OVERFLOW_YIELD_RATE = 7; // must be power 2 - 1 private static final int OVERFLOW_YIELD_RATE = 7; // must be power 2 - 1
...@@ -415,8 +422,8 @@ public class StampedLock implements java.io.Serializable { ...@@ -415,8 +422,8 @@ public class StampedLock implements java.io.Serializable {
* @return a stamp that can be used to unlock or convert mode * @return a stamp that can be used to unlock or convert mode
*/ */
public long readLock() { public long readLock() {
long s, next; // bypass acquireRead on fully unlocked case only long s = state, next; // bypass acquireRead on common uncontended case
return ((((s = state) & ABITS) == 0L && return ((whead == wtail && (s & ABITS) < RFULL &&
U.compareAndSwapLong(this, STATE, s, next = s + RUNIT)) ? U.compareAndSwapLong(this, STATE, s, next = s + RUNIT)) ?
next : acquireRead(false, 0L)); next : acquireRead(false, 0L));
} }
...@@ -1012,18 +1019,9 @@ public class StampedLock implements java.io.Serializable { ...@@ -1012,18 +1019,9 @@ public class StampedLock implements java.io.Serializable {
if (t.status <= 0) if (t.status <= 0)
q = t; q = t;
} }
if (q != null) { if (q != null && (w = q.thread) != null)
for (WNode r = q;;) { // release co-waiters too
if ((w = r.thread) != null) {
r.thread = null;
U.unpark(w); U.unpark(w);
} }
if ((r = q.cowait) == null)
break;
U.compareAndSwapObject(q, WCOWAIT, r, r.cowait);
}
}
}
} }
/** /**
...@@ -1038,22 +1036,22 @@ public class StampedLock implements java.io.Serializable { ...@@ -1038,22 +1036,22 @@ public class StampedLock implements java.io.Serializable {
private long acquireWrite(boolean interruptible, long deadline) { private long acquireWrite(boolean interruptible, long deadline) {
WNode node = null, p; WNode node = null, p;
for (int spins = -1;;) { // spin while enqueuing for (int spins = -1;;) { // spin while enqueuing
long s, ns; long m, s, ns;
if (((s = state) & ABITS) == 0L) { if ((m = (s = state) & ABITS) == 0L) {
if (U.compareAndSwapLong(this, STATE, s, ns = s + WBIT)) if (U.compareAndSwapLong(this, STATE, s, ns = s + WBIT))
return ns; return ns;
} }
else if (spins < 0)
spins = (m == WBIT && wtail == whead) ? SPINS : 0;
else if (spins > 0) { else if (spins > 0) {
if (LockSupport.nextSecondarySeed() >= 0) if (LockSupport.nextSecondarySeed() >= 0)
--spins; --spins;
} }
else if ((p = wtail) == null) { // initialize queue else if ((p = wtail) == null) { // initialize queue
WNode h = new WNode(WMODE, null); WNode hd = new WNode(WMODE, null);
if (U.compareAndSwapObject(this, WHEAD, null, h)) if (U.compareAndSwapObject(this, WHEAD, null, hd))
wtail = h; wtail = hd;
} }
else if (spins < 0)
spins = (p == whead) ? SPINS : 0;
else if (node == null) else if (node == null)
node = new WNode(WMODE, p); node = new WNode(WMODE, p);
else if (node.prev != p) else if (node.prev != p)
...@@ -1064,14 +1062,18 @@ public class StampedLock implements java.io.Serializable { ...@@ -1064,14 +1062,18 @@ public class StampedLock implements java.io.Serializable {
} }
} }
for (int spins = SPINS;;) { for (int spins = -1;;) {
WNode np, pp; int ps; long s, ns; Thread w; WNode h, np, pp; int ps;
while ((np = node.prev) != p && np != null) if ((h = whead) == p) {
(p = np).next = node; // stale if (spins < 0)
if (whead == p) { spins = HEAD_SPINS;
else if (spins < MAX_HEAD_SPINS)
spins <<= 1;
for (int k = spins;;) { // spin at head for (int k = spins;;) { // spin at head
long s, ns;
if (((s = state) & ABITS) == 0L) { if (((s = state) & ABITS) == 0L) {
if (U.compareAndSwapLong(this, STATE, s, ns = s+WBIT)) { if (U.compareAndSwapLong(this, STATE, s,
ns = s + WBIT)) {
whead = node; whead = node;
node.prev = null; node.prev = null;
return ns; return ns;
...@@ -1081,10 +1083,21 @@ public class StampedLock implements java.io.Serializable { ...@@ -1081,10 +1083,21 @@ public class StampedLock implements java.io.Serializable {
--k <= 0) --k <= 0)
break; break;
} }
if (spins < MAX_HEAD_SPINS)
spins <<= 1;
} }
if ((ps = p.status) == 0) else if (h != null) { // help release stale waiters
WNode c; Thread w;
while ((c = h.cowait) != null) {
if (U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) &&
(w = c.thread) != null)
U.unpark(w);
}
}
if (whead == h) {
if ((np = node.prev) != p) {
if (np != null)
(p = np).next = node; // stale
}
else if ((ps = p.status) == 0)
U.compareAndSwapInt(p, WSTATUS, 0, WAITING); U.compareAndSwapInt(p, WSTATUS, 0, WAITING);
else if (ps == CANCELLED) { else if (ps == CANCELLED) {
if ((pp = p.prev) != null) { if ((pp = p.prev) != null) {
...@@ -1099,11 +1112,11 @@ public class StampedLock implements java.io.Serializable { ...@@ -1099,11 +1112,11 @@ public class StampedLock implements java.io.Serializable {
else if ((time = deadline - System.nanoTime()) <= 0L) else if ((time = deadline - System.nanoTime()) <= 0L)
return cancelWaiter(node, node, false); return cancelWaiter(node, node, false);
Thread wt = Thread.currentThread(); Thread wt = Thread.currentThread();
U.putObject(wt, PARKBLOCKER, this); // emulate LockSupport.park U.putObject(wt, PARKBLOCKER, this);
node.thread = wt; node.thread = wt;
if (node.prev == p && p.status == WAITING && // recheck if (p.status < 0 && (p != h || (state & ABITS) != 0L) &&
(p != whead || (state & ABITS) != 0L)) whead == h && node.prev == p)
U.park(false, time); U.park(false, time); // emulate LockSupport.park
node.thread = null; node.thread = null;
U.putObject(wt, PARKBLOCKER, null); U.putObject(wt, PARKBLOCKER, null);
if (interruptible && Thread.interrupted()) if (interruptible && Thread.interrupted())
...@@ -1111,6 +1124,7 @@ public class StampedLock implements java.io.Serializable { ...@@ -1111,6 +1124,7 @@ public class StampedLock implements java.io.Serializable {
} }
} }
} }
}
/** /**
* See above for explanation. * See above for explanation.
...@@ -1122,115 +1136,134 @@ public class StampedLock implements java.io.Serializable { ...@@ -1122,115 +1136,134 @@ public class StampedLock implements java.io.Serializable {
* @return next state, or INTERRUPTED * @return next state, or INTERRUPTED
*/ */
private long acquireRead(boolean interruptible, long deadline) { private long acquireRead(boolean interruptible, long deadline) {
WNode node = null, group = null, p; WNode node = null, p;
for (int spins = -1;;) { for (int spins = -1;;) {
for (;;) { WNode h;
long s, m, ns; WNode h, q; Thread w; // anti-barging guard if ((h = whead) == (p = wtail)) {
if (group == null && (h = whead) != null && for (long m, s, ns;;) {
(q = h.next) != null && q.mode != RMODE)
break;
if ((m = (s = state) & ABITS) < RFULL ? if ((m = (s = state) & ABITS) < RFULL ?
U.compareAndSwapLong(this, STATE, s, ns = s + RUNIT) : U.compareAndSwapLong(this, STATE, s, ns = s + RUNIT) :
(m < WBIT && (ns = tryIncReaderOverflow(s)) != 0L)) { (m < WBIT && (ns = tryIncReaderOverflow(s)) != 0L))
if (group != null) { // help release others return ns;
for (WNode r = group;;) { else if (m >= WBIT) {
if ((w = r.thread) != null) { if (spins > 0) {
r.thread = null; if (LockSupport.nextSecondarySeed() >= 0)
U.unpark(w); --spins;
} }
if ((r = group.cowait) == null) else {
if (spins == 0) {
WNode nh = whead, np = wtail;
if ((nh == h && np == p) || (h = nh) != (p = np))
break; break;
U.compareAndSwapObject(group, WCOWAIT, r, r.cowait);
} }
spins = SPINS;
} }
return ns;
} }
if (m >= WBIT)
break;
} }
if (spins > 0) {
if (LockSupport.nextSecondarySeed() >= 0)
--spins;
} }
else if ((p = wtail) == null) { if (p == null) { // initialize queue
WNode h = new WNode(WMODE, null); WNode hd = new WNode(WMODE, null);
if (U.compareAndSwapObject(this, WHEAD, null, h)) if (U.compareAndSwapObject(this, WHEAD, null, hd))
wtail = h; wtail = hd;
} }
else if (spins < 0)
spins = (p == whead) ? SPINS : 0;
else if (node == null) else if (node == null)
node = new WNode(WMODE, p); node = new WNode(RMODE, p);
else if (node.prev != p) else if (h == p || p.mode != RMODE) {
if (node.prev != p)
node.prev = p; node.prev = p;
else if (p.mode == RMODE && p != whead) { else if (U.compareAndSwapObject(this, WTAIL, p, node)) {
WNode pp = p.prev; // become co-waiter with group p p.next = node;
if (pp != null && p == wtail && break;
U.compareAndSwapObject(p, WCOWAIT, }
node.cowait = p.cowait, node)) { }
node.thread = Thread.currentThread(); else if (!U.compareAndSwapObject(p, WCOWAIT,
for (long time;;) { node.cowait = p.cowait, node))
node.cowait = null;
else {
for (;;) {
WNode pp, c; Thread w;
if ((h = whead) != null && (c = h.cowait) != null &&
U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) &&
(w = c.thread) != null) // help release
U.unpark(w);
if (h == (pp = p.prev) || h == p || pp == null) {
long m, s, ns;
do {
if ((m = (s = state) & ABITS) < RFULL ?
U.compareAndSwapLong(this, STATE, s,
ns = s + RUNIT) :
(m < WBIT &&
(ns = tryIncReaderOverflow(s)) != 0L))
return ns;
} while (m < WBIT);
}
if (whead == h && p.prev == pp) {
long time;
if (pp == null || h == p || p.status > 0) {
node = null; // throw away
break;
}
if (deadline == 0L) if (deadline == 0L)
time = 0L; time = 0L;
else if ((time = deadline - System.nanoTime()) <= 0L) else if ((time = deadline - System.nanoTime()) <= 0L)
return cancelWaiter(node, p, false); return cancelWaiter(node, p, false);
if (node.thread == null)
break;
if (p.prev != pp || p.status == CANCELLED ||
p == whead || p.prev != pp) {
node.thread = null;
break;
}
Thread wt = Thread.currentThread(); Thread wt = Thread.currentThread();
U.putObject(wt, PARKBLOCKER, this); U.putObject(wt, PARKBLOCKER, this);
if (node.thread == null) // must recheck node.thread = wt;
break; if ((h != pp || (state & ABITS) == WBIT) &&
whead == h && p.prev == pp)
U.park(false, time); U.park(false, time);
node.thread = null;
U.putObject(wt, PARKBLOCKER, null); U.putObject(wt, PARKBLOCKER, null);
if (interruptible && Thread.interrupted()) if (interruptible && Thread.interrupted())
return cancelWaiter(node, p, true); return cancelWaiter(node, p, true);
} }
group = p;
} }
node = null; // throw away
}
else if (U.compareAndSwapObject(this, WTAIL, p, node)) {
p.next = node;
break;
} }
} }
for (int spins = SPINS;;) { for (int spins = -1;;) {
WNode np, pp, r; int ps; long m, s, ns; Thread w; WNode h, np, pp; int ps;
while ((np = node.prev) != p && np != null) if ((h = whead) == p) {
(p = np).next = node; if (spins < 0)
if (whead == p) { spins = HEAD_SPINS;
for (int k = spins;;) { else if (spins < MAX_HEAD_SPINS)
if ((m = (s = state) & ABITS) != WBIT) { spins <<= 1;
if (m < RFULL ? for (int k = spins;;) { // spin at head
U.compareAndSwapLong(this, STATE, s, ns = s + RUNIT): long m, s, ns;
(ns = tryIncReaderOverflow(s)) != 0L) { if ((m = (s = state) & ABITS) < RFULL ?
U.compareAndSwapLong(this, STATE, s, ns = s + RUNIT) :
(m < WBIT && (ns = tryIncReaderOverflow(s)) != 0L)) {
WNode c; Thread w;
whead = node; whead = node;
node.prev = null; node.prev = null;
while ((r = node.cowait) != null) { while ((c = node.cowait) != null) {
if (U.compareAndSwapObject(node, WCOWAIT, if (U.compareAndSwapObject(node, WCOWAIT,
r, r.cowait) && c, c.cowait) &&
(w = r.thread) != null) { (w = c.thread) != null)
r.thread = null; U.unpark(w);
U.unpark(w); // release co-waiter
}
} }
return ns; return ns;
} }
} else if (m >= WBIT &&
else if (LockSupport.nextSecondarySeed() >= 0 && LockSupport.nextSecondarySeed() >= 0 && --k <= 0)
--k <= 0)
break; break;
} }
if (spins < MAX_HEAD_SPINS)
spins <<= 1;
} }
if ((ps = p.status) == 0) else if (h != null) {
WNode c; Thread w;
while ((c = h.cowait) != null) {
if (U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) &&
(w = c.thread) != null)
U.unpark(w);
}
}
if (whead == h) {
if ((np = node.prev) != p) {
if (np != null)
(p = np).next = node; // stale
}
else if ((ps = p.status) == 0)
U.compareAndSwapInt(p, WSTATUS, 0, WAITING); U.compareAndSwapInt(p, WSTATUS, 0, WAITING);
else if (ps == CANCELLED) { else if (ps == CANCELLED) {
if ((pp = p.prev) != null) { if ((pp = p.prev) != null) {
...@@ -1247,8 +1280,9 @@ public class StampedLock implements java.io.Serializable { ...@@ -1247,8 +1280,9 @@ public class StampedLock implements java.io.Serializable {
Thread wt = Thread.currentThread(); Thread wt = Thread.currentThread();
U.putObject(wt, PARKBLOCKER, this); U.putObject(wt, PARKBLOCKER, this);
node.thread = wt; node.thread = wt;
if (node.prev == p && p.status == WAITING && if (p.status < 0 &&
(p != whead || (state & ABITS) != WBIT)) (p != h || (state & ABITS) == WBIT) &&
whead == h && node.prev == p)
U.park(false, time); U.park(false, time);
node.thread = null; node.thread = null;
U.putObject(wt, PARKBLOCKER, null); U.putObject(wt, PARKBLOCKER, null);
...@@ -1257,6 +1291,7 @@ public class StampedLock implements java.io.Serializable { ...@@ -1257,6 +1291,7 @@ public class StampedLock implements java.io.Serializable {
} }
} }
} }
}
/** /**
* If node non-null, forces cancel status and unsplices it from * If node non-null, forces cancel status and unsplices it from
...@@ -1278,22 +1313,19 @@ public class StampedLock implements java.io.Serializable { ...@@ -1278,22 +1313,19 @@ public class StampedLock implements java.io.Serializable {
if (node != null && group != null) { if (node != null && group != null) {
Thread w; Thread w;
node.status = CANCELLED; node.status = CANCELLED;
node.thread = null;
// unsplice cancelled nodes from group // unsplice cancelled nodes from group
for (WNode p = group, q; (q = p.cowait) != null;) { for (WNode p = group, q; (q = p.cowait) != null;) {
if (q.status == CANCELLED) if (q.status == CANCELLED) {
U.compareAndSwapObject(p, WNEXT, q, q.next); U.compareAndSwapObject(p, WCOWAIT, q, q.cowait);
p = group; // restart
}
else else
p = q; p = q;
} }
if (group == node) { if (group == node) {
WNode r; // detach and wake up uncancelled co-waiters for (WNode r = group.cowait; r != null; r = r.cowait) {
while ((r = node.cowait) != null) { if ((w = r.thread) != null)
if (U.compareAndSwapObject(node, WCOWAIT, r, r.cowait) && U.unpark(w); // wake up uncancelled co-waiters
(w = r.thread) != null) {
r.thread = null;
U.unpark(w);
}
} }
for (WNode pred = node.prev; pred != null; ) { // unsplice for (WNode pred = node.prev; pred != null; ) { // unsplice
WNode succ, pp; // find valid successor WNode succ, pp; // find valid successor
......
/*
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*/
/*
* @test
* @run main/othervm/timeout=60 ReadersUnlockAfterWriteUnlock
* @bug 8023234
* @summary StampedLock serializes readers on writer unlock
* @author Dmitry Chyuko
* @author Aleksey Shipilev
*/
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.locks.StampedLock;
public class ReadersUnlockAfterWriteUnlock {
static final int RNUM = 2;
static final StampedLock sl = new StampedLock();
static volatile boolean isDone;
static CyclicBarrier iterationStart = new CyclicBarrier(RNUM + 1);
static CyclicBarrier readersHaveLocks = new CyclicBarrier(RNUM);
static CyclicBarrier writerHasLock = new CyclicBarrier(RNUM + 1);
static class Reader extends Thread {
final String name;
Reader(String name) {
super();
this.name = name;
}
public void run() {
while (!isDone && !isInterrupted()) {
try {
iterationStart.await();
writerHasLock.await();
long rs = sl.readLock();
// single reader blocks here indefinitely if readers
// are serialized
readersHaveLocks.await();
sl.unlockRead(rs);
} catch (Exception e) {
throw new IllegalStateException(e);
}
}
}
}
public static void main(String[] args) throws InterruptedException {
for (int r = 0 ; r < RNUM; ++r) {
new Reader("r" + r).start();
}
int i;
for (i = 0; i < 1024; ++i) {
try {
iterationStart.await();
long ws = sl.writeLock();
writerHasLock.await();
Thread.sleep(10);
sl.unlockWrite(ws);
} catch (Exception e) {
throw new IllegalStateException(e);
}
}
isDone = true;
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册