提交 8718ca6e 编写于 作者: M martin

6609775: Reduce context switches in DelayQueue due to signalAll

Reviewed-by: alanb
Contributed-by: NDoug Lea <dl@cs.oswego.edu>
上级 d9898853
......@@ -69,9 +69,33 @@ public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
implements BlockingQueue<E> {
private transient final ReentrantLock lock = new ReentrantLock();
private transient final Condition available = lock.newCondition();
private final PriorityQueue<E> q = new PriorityQueue<E>();
/**
* Thread designated to wait for the element at the head of
* the queue. This variant of the Leader-Follower pattern
* (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to
* minimize unnecessary timed waiting. When a thread becomes
* the leader, it waits only for the next delay to elapse, but
* other threads await indefinitely. The leader thread must
* signal some other thread before returning from take() or
* poll(...), unless some other thread becomes leader in the
* interim. Whenever the head of the queue is replaced with
* an element with an earlier expiration time, the leader
* field is invalidated by being reset to null, and some
* waiting thread, but not necessarily the current leader, is
* signalled. So waiting threads must be prepared to acquire
* and lose leadership while waiting.
*/
private Thread leader = null;
/**
* Condition signalled when a newer element becomes available
* at the head of the queue or a new thread may need to
* become leader.
*/
private final Condition available = lock.newCondition();
/**
* Creates a new <tt>DelayQueue</tt> that is initially empty.
*/
......@@ -111,10 +135,11 @@ public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
final ReentrantLock lock = this.lock;
lock.lock();
try {
E first = q.peek();
q.offer(e);
if (first == null || e.compareTo(first) < 0)
available.signalAll();
if (q.peek() == e) {
leader = null;
available.signal();
}
return true;
} finally {
lock.unlock();
......@@ -160,13 +185,8 @@ public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
E first = q.peek();
if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
return null;
else {
E x = q.poll();
assert x != null;
if (q.size() != 0)
available.signalAll();
return x;
}
else
return q.poll();
} finally {
lock.unlock();
}
......@@ -185,23 +205,29 @@ public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
try {
for (;;) {
E first = q.peek();
if (first == null) {
if (first == null)
available.await();
} else {
long delay = first.getDelay(TimeUnit.NANOSECONDS);
if (delay > 0) {
long tl = available.awaitNanos(delay);
} else {
E x = q.poll();
assert x != null;
if (q.size() != 0)
available.signalAll(); // wake up other takers
return x;
else {
long delay = first.getDelay(TimeUnit.NANOSECONDS);
if (delay <= 0)
return q.poll();
else if (leader != null)
available.await();
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
available.awaitNanos(delay);
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && q.peek() != null)
available.signal();
lock.unlock();
}
}
......@@ -230,23 +256,28 @@ public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
nanos = available.awaitNanos(nanos);
} else {
long delay = first.getDelay(TimeUnit.NANOSECONDS);
if (delay > 0) {
if (nanos <= 0)
return null;
if (delay > nanos)
delay = nanos;
long timeLeft = available.awaitNanos(delay);
nanos -= delay - timeLeft;
} else {
E x = q.poll();
assert x != null;
if (q.size() != 0)
available.signalAll();
return x;
if (delay <= 0)
return q.poll();
if (nanos <= 0)
return null;
if (nanos < delay || leader != null)
nanos = available.awaitNanos(nanos);
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
long timeLeft = available.awaitNanos(delay);
nanos -= delay - timeLeft;
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && q.peek() != null)
available.signal();
lock.unlock();
}
}
......@@ -303,8 +334,6 @@ public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
c.add(q.poll());
++n;
}
if (n > 0)
available.signalAll();
return n;
} finally {
lock.unlock();
......@@ -335,8 +364,6 @@ public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
c.add(q.poll());
++n;
}
if (n > 0)
available.signalAll();
return n;
} finally {
lock.unlock();
......@@ -485,6 +512,7 @@ public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
return cursor < array.length;
}
@SuppressWarnings("unchecked")
public E next() {
if (cursor >= array.length)
throw new NoSuchElementException();
......
/*
* Copyright 2008 Sun Microsystems, Inc. All Rights Reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
* CA 95054 USA or visit www.sun.com if you need additional information or
* have any questions.
*/
import java.util.concurrent.*;
import static java.util.concurrent.TimeUnit.*;
/**
* This is not a regression test, but a stress benchmark test for
* 6609775: Reduce context switches in DelayQueue due to signalAll
*
* This runs in the same wall clock time, but much reduced cpu time,
* with the changes for 6609775.
*/
public class Stress {
public static void main(String[] args) throws Throwable {
final DelayQueue<Delayed> q = new DelayQueue<Delayed>();
final long t0 = System.nanoTime();
for (long i = 0; i < 1000; i++) {
final long expiry = t0 + i*10L*1000L*1000L;
q.add(new Delayed() {
public long getDelay(TimeUnit unit) {
return unit.convert(expiry - System.nanoTime(),
NANOSECONDS);
}
public int compareTo(Delayed x) {
long d = getDelay(NANOSECONDS)
- x.getDelay(NANOSECONDS);
return d < 0 ? -1 : d > 0 ? 1 : 0; }});
}
for (int i = 0; i < 300; i++)
new Thread() { public void run() {
try {
while (!q.isEmpty())
q.poll(10L, TimeUnit.SECONDS);
} catch (Throwable t) { t.printStackTrace(); }
}}.start();
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册