提交 8d3882c0 编写于 作者: D dl

6801020: Concurrent Semaphore release may cause some require thread not signaled

Summary: Introduce PROPAGATE waitStatus
Reviewed-by: martin
上级 3d92ee3d
...@@ -166,6 +166,11 @@ public abstract class AbstractQueuedLongSynchronizer ...@@ -166,6 +166,11 @@ public abstract class AbstractQueuedLongSynchronizer
static final int SIGNAL = -1; static final int SIGNAL = -1;
/** waitStatus value to indicate thread is waiting on condition */ /** waitStatus value to indicate thread is waiting on condition */
static final int CONDITION = -2; static final int CONDITION = -2;
/**
* waitStatus value to indicate the next acquireShared should
* unconditionally propagate
*/
static final int PROPAGATE = -3;
/** /**
* Status field, taking on only the values: * Status field, taking on only the values:
...@@ -180,10 +185,16 @@ public abstract class AbstractQueuedLongSynchronizer ...@@ -180,10 +185,16 @@ public abstract class AbstractQueuedLongSynchronizer
* Nodes never leave this state. In particular, * Nodes never leave this state. In particular,
* a thread with cancelled node never again blocks. * a thread with cancelled node never again blocks.
* CONDITION: This node is currently on a condition queue. * CONDITION: This node is currently on a condition queue.
* It will not be used as a sync queue node until * It will not be used as a sync queue node
* transferred. (Use of this value here * until transferred, at which time the status
* has nothing to do with the other uses * will be set to 0. (Use of this value here has
* of the field, but simplifies mechanics.) * nothing to do with the other uses of the
* field, but simplifies mechanics.)
* PROPAGATE: A releaseShared should be propagated to other
* nodes. This is set (for head node only) in
* doReleaseShared to ensure propagation
* continues, even if other operations have
* since intervened.
* 0: None of the above * 0: None of the above
* *
* The values are arranged numerically to simplify use. * The values are arranged numerically to simplify use.
...@@ -403,10 +414,13 @@ public abstract class AbstractQueuedLongSynchronizer ...@@ -403,10 +414,13 @@ public abstract class AbstractQueuedLongSynchronizer
*/ */
private void unparkSuccessor(Node node) { private void unparkSuccessor(Node node) {
/* /*
* Try to clear status in anticipation of signalling. It is * If status is negative (i.e., possibly needing signal) try
* OK if this fails or if status is changed by waiting thread. * to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/ */
compareAndSetWaitStatus(node, Node.SIGNAL, 0); int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/* /*
* Thread to unpark is held in successor, which is normally * Thread to unpark is held in successor, which is normally
...@@ -425,24 +439,71 @@ public abstract class AbstractQueuedLongSynchronizer ...@@ -425,24 +439,71 @@ public abstract class AbstractQueuedLongSynchronizer
LockSupport.unpark(s.thread); LockSupport.unpark(s.thread);
} }
/**
* Release action for shared mode -- signal successor and ensure
* propagation. (Note: For exclusive mode, release just amounts
* to calling unparkSuccessor of head if it needs signal.)
*/
private void doReleaseShared() {
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
/** /**
* Sets head of queue, and checks if successor may be waiting * Sets head of queue, and checks if successor may be waiting
* in shared mode, if so propagating if propagate > 0. * in shared mode, if so propagating if either propagate > 0 or
* PROPAGATE status was set.
* *
* @param pred the node holding waitStatus for node
* @param node the node * @param node the node
* @param propagate the return value from a tryAcquireShared * @param propagate the return value from a tryAcquireShared
*/ */
private void setHeadAndPropagate(Node node, long propagate) { private void setHeadAndPropagate(Node node, long propagate) {
Node h = head; // Record old head for check below
setHead(node); setHead(node);
if (propagate > 0 && node.waitStatus != 0) {
/* /*
* Don't bother fully figuring out successor. If it * Try to signal next queued node if:
* looks null, call unparkSuccessor anyway to be safe. * Propagation was indicated by caller,
*/ * or was recorded (as h.waitStatus) by a previous operation
* (note: this uses sign-check of waitStatus because
* PROPAGATE status may transition to SIGNAL.)
* and
* The next node is waiting in shared mode,
* or we don't know, because it appears null
*
* The conservatism in both of these checks may cause
* unnecessary wake-ups, but only when there are multiple
* racing acquires/releases, so most need signals now or soon
* anyway.
*/
if (propagate > 0 || h == null || h.waitStatus < 0) {
Node s = node.next; Node s = node.next;
if (s == null || s.isShared()) if (s == null || s.isShared())
unparkSuccessor(node); doReleaseShared();
} }
} }
...@@ -465,23 +526,27 @@ public abstract class AbstractQueuedLongSynchronizer ...@@ -465,23 +526,27 @@ public abstract class AbstractQueuedLongSynchronizer
while (pred.waitStatus > 0) while (pred.waitStatus > 0)
node.prev = pred = pred.prev; node.prev = pred = pred.prev;
// Getting this before setting waitStatus ensures staleness // predNext is the apparent node to unsplice. CASes below will
// fail if not, in which case, we lost race vs another cancel
// or signal, so no further action is necessary.
Node predNext = pred.next; Node predNext = pred.next;
// Can use unconditional write instead of CAS here // Can use unconditional write instead of CAS here.
// After this atomic step, other Nodes can skip past us.
// Before, we are free of interference from other threads.
node.waitStatus = Node.CANCELLED; node.waitStatus = Node.CANCELLED;
// If we are the tail, remove ourselves // If we are the tail, remove ourselves.
if (node == tail && compareAndSetTail(node, pred)) { if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null); compareAndSetNext(pred, predNext, null);
} else { } else {
// If "active" predecessor found... // If successor needs signal, try to set pred's next-link
if (pred != head // so it will get one. Otherwise wake it up to propagate.
&& (pred.waitStatus == Node.SIGNAL int ws;
|| compareAndSetWaitStatus(pred, 0, Node.SIGNAL)) if (pred != head &&
&& pred.thread != null) { ((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
// If successor is active, set predecessor's next link pred.thread != null) {
Node next = node.next; Node next = node.next;
if (next != null && next.waitStatus <= 0) if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next); compareAndSetNext(pred, predNext, next);
...@@ -503,14 +568,14 @@ public abstract class AbstractQueuedLongSynchronizer ...@@ -503,14 +568,14 @@ public abstract class AbstractQueuedLongSynchronizer
* @return {@code true} if thread should block * @return {@code true} if thread should block
*/ */
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int s = pred.waitStatus; int ws = pred.waitStatus;
if (s < 0) if (ws == Node.SIGNAL)
/* /*
* This node has already set status asking a release * This node has already set status asking a release
* to signal it, so it can safely park. * to signal it, so it can safely park.
*/ */
return true; return true;
if (s > 0) { if (ws > 0) {
/* /*
* Predecessor was cancelled. Skip over predecessors and * Predecessor was cancelled. Skip over predecessors and
* indicate retry. * indicate retry.
...@@ -519,14 +584,14 @@ public abstract class AbstractQueuedLongSynchronizer ...@@ -519,14 +584,14 @@ public abstract class AbstractQueuedLongSynchronizer
node.prev = pred = pred.prev; node.prev = pred = pred.prev;
} while (pred.waitStatus > 0); } while (pred.waitStatus > 0);
pred.next = node; pred.next = node;
} } else {
else
/* /*
* Indicate that we need a signal, but don't park yet. Caller * waitStatus must be 0 or PROPAGATE. Indicate that we
* will need to retry to make sure it cannot acquire before * need a signal, but don't park yet. Caller will need to
* parking. * retry to make sure it cannot acquire before parking.
*/ */
compareAndSetWaitStatus(pred, 0, Node.SIGNAL); compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false; return false;
} }
...@@ -1046,9 +1111,7 @@ public abstract class AbstractQueuedLongSynchronizer ...@@ -1046,9 +1111,7 @@ public abstract class AbstractQueuedLongSynchronizer
*/ */
public final boolean releaseShared(long arg) { public final boolean releaseShared(long arg) {
if (tryReleaseShared(arg)) { if (tryReleaseShared(arg)) {
Node h = head; doReleaseShared();
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true; return true;
} }
return false; return false;
...@@ -1390,8 +1453,8 @@ public abstract class AbstractQueuedLongSynchronizer ...@@ -1390,8 +1453,8 @@ public abstract class AbstractQueuedLongSynchronizer
* case the waitStatus can be transiently and harmlessly wrong). * case the waitStatus can be transiently and harmlessly wrong).
*/ */
Node p = enq(node); Node p = enq(node);
int c = p.waitStatus; int ws = p.waitStatus;
if (c > 0 || !compareAndSetWaitStatus(p, c, Node.SIGNAL)) if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread); LockSupport.unpark(node.thread);
return true; return true;
} }
......
...@@ -389,6 +389,11 @@ public abstract class AbstractQueuedSynchronizer ...@@ -389,6 +389,11 @@ public abstract class AbstractQueuedSynchronizer
static final int SIGNAL = -1; static final int SIGNAL = -1;
/** waitStatus value to indicate thread is waiting on condition */ /** waitStatus value to indicate thread is waiting on condition */
static final int CONDITION = -2; static final int CONDITION = -2;
/**
* waitStatus value to indicate the next acquireShared should
* unconditionally propagate
*/
static final int PROPAGATE = -3;
/** /**
* Status field, taking on only the values: * Status field, taking on only the values:
...@@ -403,10 +408,16 @@ public abstract class AbstractQueuedSynchronizer ...@@ -403,10 +408,16 @@ public abstract class AbstractQueuedSynchronizer
* Nodes never leave this state. In particular, * Nodes never leave this state. In particular,
* a thread with cancelled node never again blocks. * a thread with cancelled node never again blocks.
* CONDITION: This node is currently on a condition queue. * CONDITION: This node is currently on a condition queue.
* It will not be used as a sync queue node until * It will not be used as a sync queue node
* transferred. (Use of this value here * until transferred, at which time the status
* has nothing to do with the other uses * will be set to 0. (Use of this value here has
* of the field, but simplifies mechanics.) * nothing to do with the other uses of the
* field, but simplifies mechanics.)
* PROPAGATE: A releaseShared should be propagated to other
* nodes. This is set (for head node only) in
* doReleaseShared to ensure propagation
* continues, even if other operations have
* since intervened.
* 0: None of the above * 0: None of the above
* *
* The values are arranged numerically to simplify use. * The values are arranged numerically to simplify use.
...@@ -626,10 +637,13 @@ public abstract class AbstractQueuedSynchronizer ...@@ -626,10 +637,13 @@ public abstract class AbstractQueuedSynchronizer
*/ */
private void unparkSuccessor(Node node) { private void unparkSuccessor(Node node) {
/* /*
* Try to clear status in anticipation of signalling. It is * If status is negative (i.e., possibly needing signal) try
* OK if this fails or if status is changed by waiting thread. * to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/ */
compareAndSetWaitStatus(node, Node.SIGNAL, 0); int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/* /*
* Thread to unpark is held in successor, which is normally * Thread to unpark is held in successor, which is normally
...@@ -648,24 +662,71 @@ public abstract class AbstractQueuedSynchronizer ...@@ -648,24 +662,71 @@ public abstract class AbstractQueuedSynchronizer
LockSupport.unpark(s.thread); LockSupport.unpark(s.thread);
} }
/**
* Release action for shared mode -- signal successor and ensure
* propagation. (Note: For exclusive mode, release just amounts
* to calling unparkSuccessor of head if it needs signal.)
*/
private void doReleaseShared() {
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
/** /**
* Sets head of queue, and checks if successor may be waiting * Sets head of queue, and checks if successor may be waiting
* in shared mode, if so propagating if propagate > 0. * in shared mode, if so propagating if either propagate > 0 or
* PROPAGATE status was set.
* *
* @param pred the node holding waitStatus for node
* @param node the node * @param node the node
* @param propagate the return value from a tryAcquireShared * @param propagate the return value from a tryAcquireShared
*/ */
private void setHeadAndPropagate(Node node, int propagate) { private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node); setHead(node);
if (propagate > 0 && node.waitStatus != 0) {
/* /*
* Don't bother fully figuring out successor. If it * Try to signal next queued node if:
* looks null, call unparkSuccessor anyway to be safe. * Propagation was indicated by caller,
*/ * or was recorded (as h.waitStatus) by a previous operation
* (note: this uses sign-check of waitStatus because
* PROPAGATE status may transition to SIGNAL.)
* and
* The next node is waiting in shared mode,
* or we don't know, because it appears null
*
* The conservatism in both of these checks may cause
* unnecessary wake-ups, but only when there are multiple
* racing acquires/releases, so most need signals now or soon
* anyway.
*/
if (propagate > 0 || h == null || h.waitStatus < 0) {
Node s = node.next; Node s = node.next;
if (s == null || s.isShared()) if (s == null || s.isShared())
unparkSuccessor(node); doReleaseShared();
} }
} }
...@@ -688,23 +749,27 @@ public abstract class AbstractQueuedSynchronizer ...@@ -688,23 +749,27 @@ public abstract class AbstractQueuedSynchronizer
while (pred.waitStatus > 0) while (pred.waitStatus > 0)
node.prev = pred = pred.prev; node.prev = pred = pred.prev;
// Getting this before setting waitStatus ensures staleness // predNext is the apparent node to unsplice. CASes below will
// fail if not, in which case, we lost race vs another cancel
// or signal, so no further action is necessary.
Node predNext = pred.next; Node predNext = pred.next;
// Can use unconditional write instead of CAS here // Can use unconditional write instead of CAS here.
// After this atomic step, other Nodes can skip past us.
// Before, we are free of interference from other threads.
node.waitStatus = Node.CANCELLED; node.waitStatus = Node.CANCELLED;
// If we are the tail, remove ourselves // If we are the tail, remove ourselves.
if (node == tail && compareAndSetTail(node, pred)) { if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null); compareAndSetNext(pred, predNext, null);
} else { } else {
// If "active" predecessor found... // If successor needs signal, try to set pred's next-link
if (pred != head // so it will get one. Otherwise wake it up to propagate.
&& (pred.waitStatus == Node.SIGNAL int ws;
|| compareAndSetWaitStatus(pred, 0, Node.SIGNAL)) if (pred != head &&
&& pred.thread != null) { ((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
// If successor is active, set predecessor's next link pred.thread != null) {
Node next = node.next; Node next = node.next;
if (next != null && next.waitStatus <= 0) if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next); compareAndSetNext(pred, predNext, next);
...@@ -726,14 +791,14 @@ public abstract class AbstractQueuedSynchronizer ...@@ -726,14 +791,14 @@ public abstract class AbstractQueuedSynchronizer
* @return {@code true} if thread should block * @return {@code true} if thread should block
*/ */
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int s = pred.waitStatus; int ws = pred.waitStatus;
if (s < 0) if (ws == Node.SIGNAL)
/* /*
* This node has already set status asking a release * This node has already set status asking a release
* to signal it, so it can safely park. * to signal it, so it can safely park.
*/ */
return true; return true;
if (s > 0) { if (ws > 0) {
/* /*
* Predecessor was cancelled. Skip over predecessors and * Predecessor was cancelled. Skip over predecessors and
* indicate retry. * indicate retry.
...@@ -742,14 +807,14 @@ public abstract class AbstractQueuedSynchronizer ...@@ -742,14 +807,14 @@ public abstract class AbstractQueuedSynchronizer
node.prev = pred = pred.prev; node.prev = pred = pred.prev;
} while (pred.waitStatus > 0); } while (pred.waitStatus > 0);
pred.next = node; pred.next = node;
} } else {
else
/* /*
* Indicate that we need a signal, but don't park yet. Caller * waitStatus must be 0 or PROPAGATE. Indicate that we
* will need to retry to make sure it cannot acquire before * need a signal, but don't park yet. Caller will need to
* parking. * retry to make sure it cannot acquire before parking.
*/ */
compareAndSetWaitStatus(pred, 0, Node.SIGNAL); compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false; return false;
} }
...@@ -1269,9 +1334,7 @@ public abstract class AbstractQueuedSynchronizer ...@@ -1269,9 +1334,7 @@ public abstract class AbstractQueuedSynchronizer
*/ */
public final boolean releaseShared(int arg) { public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) { if (tryReleaseShared(arg)) {
Node h = head; doReleaseShared();
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true; return true;
} }
return false; return false;
...@@ -1613,8 +1676,8 @@ public abstract class AbstractQueuedSynchronizer ...@@ -1613,8 +1676,8 @@ public abstract class AbstractQueuedSynchronizer
* case the waitStatus can be transiently and harmlessly wrong). * case the waitStatus can be transiently and harmlessly wrong).
*/ */
Node p = enq(node); Node p = enq(node);
int c = p.waitStatus; int ws = p.waitStatus;
if (c > 0 || !compareAndSetWaitStatus(p, c, Node.SIGNAL)) if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread); LockSupport.unpark(node.thread);
return true; return true;
} }
......
/*
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
* CA 95054 USA or visit www.sun.com if you need additional information or
* have any questions.
*/
/*
* This file is available under and governed by the GNU General Public
* License version 2 only, as published by the Free Software Foundation.
* However, the following notice accompanied the original version of this
* file:
*
* Written by Doug Lea with assistance from members of JCP JSR-166
* Expert Group and released to the public domain, as explained at
* http://creativecommons.org/licenses/publicdomain
*/
/*
* @test
* @bug 6801020 6803402
* @summary Try to tickle race conditions in
* AbstractQueuedSynchronizer "shared" code
*/
import java.util.concurrent.Semaphore;
public class RacingReleases {
/** Increase this for better chance of tickling races */
static final int iterations = 1000;
public static void test(final boolean fair,
final boolean interruptibly)
throws Throwable {
for (int i = 0; i < iterations; i++) {
final Semaphore sem = new Semaphore(0, fair);
final Throwable[] badness = new Throwable[1];
Runnable blocker = interruptibly ?
new Runnable() {
public void run() {
try {
sem.acquire();
} catch (Throwable t) {
badness[0] = t;
throw new Error(t);
}}}
:
new Runnable() {
public void run() {
try {
sem.acquireUninterruptibly();
} catch (Throwable t) {
badness[0] = t;
throw new Error(t);
}}};
Thread b1 = new Thread(blocker);
Thread b2 = new Thread(blocker);
Runnable signaller = new Runnable() {
public void run() {
try {
sem.release();
} catch (Throwable t) {
badness[0] = t;
throw new Error(t);
}}};
Thread s1 = new Thread(signaller);
Thread s2 = new Thread(signaller);
Thread[] threads = { b1, b2, s1, s2 };
java.util.Collections.shuffle(java.util.Arrays.asList(threads));
for (Thread thread : threads)
thread.start();
for (Thread thread : threads) {
thread.join(60 * 1000);
if (thread.isAlive())
throw new Error
(String.format
("Semaphore stuck: permits %d, thread waiting %s%n",
sem.availablePermits(),
sem.hasQueuedThreads() ? "true" : "false"));
}
if (badness[0] != null)
throw new Error(badness[0]);
if (sem.availablePermits() != 0)
throw new Error(String.valueOf(sem.availablePermits()));
if (sem.hasQueuedThreads())
throw new Error(String.valueOf(sem.hasQueuedThreads()));
if (sem.getQueueLength() != 0)
throw new Error(String.valueOf(sem.getQueueLength()));
if (sem.isFair() != fair)
throw new Error(String.valueOf(sem.isFair()));
}
}
public static void main(String[] args) throws Throwable {
for (boolean fair : new boolean[] { true, false })
for (boolean interruptibly : new boolean[] { true, false })
test(fair, interruptibly);
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册