提交 55be90e0 编写于 作者: D dholmes

6776941: Improve thread pool shutdown

Reviewed-by: dl, skoivu
上级 f20adc24
...@@ -34,8 +34,10 @@ ...@@ -34,8 +34,10 @@
*/ */
package java.util.concurrent; package java.util.concurrent;
import java.util.concurrent.locks.*; import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.atomic.*; import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.*; import java.util.*;
/** /**
...@@ -491,10 +493,15 @@ public class ThreadPoolExecutor extends AbstractExecutorService { ...@@ -491,10 +493,15 @@ public class ThreadPoolExecutor extends AbstractExecutorService {
* policy limiting the number of threads. Even though it is not * policy limiting the number of threads. Even though it is not
* treated as an error, failure to create threads may result in * treated as an error, failure to create threads may result in
* new tasks being rejected or existing ones remaining stuck in * new tasks being rejected or existing ones remaining stuck in
* the queue. On the other hand, no special precautions exist to * the queue.
* handle OutOfMemoryErrors that might be thrown while trying to *
* create threads, since there is generally no recourse from * We go further and preserve pool invariants even in the face of
* within this class. * errors such as OutOfMemoryError, that might be thrown while
* trying to create threads. Such errors are rather common due to
* the need to allocate a native stack in Thread#start, and users
* will want to perform clean pool shutdown to clean up. There
* will likely be enough memory available for the cleanup code to
* complete without encountering yet another OutOfMemoryError.
*/ */
private volatile ThreadFactory threadFactory; private volatile ThreadFactory threadFactory;
...@@ -568,9 +575,13 @@ public class ThreadPoolExecutor extends AbstractExecutorService { ...@@ -568,9 +575,13 @@ public class ThreadPoolExecutor extends AbstractExecutorService {
* task execution. This protects against interrupts that are * task execution. This protects against interrupts that are
* intended to wake up a worker thread waiting for a task from * intended to wake up a worker thread waiting for a task from
* instead interrupting a task being run. We implement a simple * instead interrupting a task being run. We implement a simple
* non-reentrant mutual exclusion lock rather than use ReentrantLock * non-reentrant mutual exclusion lock rather than use
* because we do not want worker tasks to be able to reacquire the * ReentrantLock because we do not want worker tasks to be able to
* lock when they invoke pool control methods like setCorePoolSize. * reacquire the lock when they invoke pool control methods like
* setCorePoolSize. Additionally, to suppress interrupts until
* the thread actually starts running tasks, we initialize lock
* state to a negative value, and clear it upon start (in
* runWorker).
*/ */
private final class Worker private final class Worker
extends AbstractQueuedSynchronizer extends AbstractQueuedSynchronizer
...@@ -594,6 +605,7 @@ public class ThreadPoolExecutor extends AbstractExecutorService { ...@@ -594,6 +605,7 @@ public class ThreadPoolExecutor extends AbstractExecutorService {
* @param firstTask the first task (null if none) * @param firstTask the first task (null if none)
*/ */
Worker(Runnable firstTask) { Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask; this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this); this.thread = getThreadFactory().newThread(this);
} }
...@@ -609,7 +621,7 @@ public class ThreadPoolExecutor extends AbstractExecutorService { ...@@ -609,7 +621,7 @@ public class ThreadPoolExecutor extends AbstractExecutorService {
// The value 1 represents the locked state. // The value 1 represents the locked state.
protected boolean isHeldExclusively() { protected boolean isHeldExclusively() {
return getState() == 1; return getState() != 0;
} }
protected boolean tryAcquire(int unused) { protected boolean tryAcquire(int unused) {
...@@ -630,6 +642,16 @@ public class ThreadPoolExecutor extends AbstractExecutorService { ...@@ -630,6 +642,16 @@ public class ThreadPoolExecutor extends AbstractExecutorService {
public boolean tryLock() { return tryAcquire(1); } public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); } public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); } public boolean isLocked() { return isHeldExclusively(); }
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
} }
/* /*
...@@ -728,12 +750,8 @@ public class ThreadPoolExecutor extends AbstractExecutorService { ...@@ -728,12 +750,8 @@ public class ThreadPoolExecutor extends AbstractExecutorService {
final ReentrantLock mainLock = this.mainLock; final ReentrantLock mainLock = this.mainLock;
mainLock.lock(); mainLock.lock();
try { try {
for (Worker w : workers) { for (Worker w : workers)
try { w.interruptIfStarted();
w.thread.interrupt();
} catch (SecurityException ignore) {
}
}
} finally { } finally {
mainLock.unlock(); mainLock.unlock();
} }
...@@ -790,19 +808,6 @@ public class ThreadPoolExecutor extends AbstractExecutorService { ...@@ -790,19 +808,6 @@ public class ThreadPoolExecutor extends AbstractExecutorService {
private static final boolean ONLY_ONE = true; private static final boolean ONLY_ONE = true;
/**
* Ensures that unless the pool is stopping, the current thread
* does not have its interrupt set. This requires a double-check
* of state in case the interrupt was cleared concurrently with a
* shutdownNow -- if so, the interrupt is re-enabled.
*/
private void clearInterruptsForTaskRun() {
if (runStateLessThan(ctl.get(), STOP) &&
Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))
Thread.currentThread().interrupt();
}
/* /*
* Misc utilities, most of which are also exported to * Misc utilities, most of which are also exported to
* ScheduledThreadPoolExecutor * ScheduledThreadPoolExecutor
...@@ -862,12 +867,13 @@ public class ThreadPoolExecutor extends AbstractExecutorService { ...@@ -862,12 +867,13 @@ public class ThreadPoolExecutor extends AbstractExecutorService {
* Checks if a new worker can be added with respect to current * Checks if a new worker can be added with respect to current
* pool state and the given bound (either core or maximum). If so, * pool state and the given bound (either core or maximum). If so,
* the worker count is adjusted accordingly, and, if possible, a * the worker count is adjusted accordingly, and, if possible, a
* new worker is created and started running firstTask as its * new worker is created and started, running firstTask as its
* first task. This method returns false if the pool is stopped or * first task. This method returns false if the pool is stopped or
* eligible to shut down. It also returns false if the thread * eligible to shut down. It also returns false if the thread
* factory fails to create a thread when asked, which requires a * factory fails to create a thread when asked. If the thread
* backout of workerCount, and a recheck for termination, in case * creation fails, either due to the thread factory returning
* the existence of this worker was holding up termination. * null, or due to an exception (typically OutOfMemoryError in
* Thread#start), we roll back cleanly.
* *
* @param firstTask the task the new thread should run first (or * @param firstTask the task the new thread should run first (or
* null if none). Workers are created with an initial first task * null if none). Workers are created with an initial first task
...@@ -910,10 +916,14 @@ public class ThreadPoolExecutor extends AbstractExecutorService { ...@@ -910,10 +916,14 @@ public class ThreadPoolExecutor extends AbstractExecutorService {
} }
} }
Worker w = new Worker(firstTask); boolean workerStarted = false;
Thread t = w.thread; boolean workerAdded = false;
Worker w = null;
try {
final ReentrantLock mainLock = this.mainLock; final ReentrantLock mainLock = this.mainLock;
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
mainLock.lock(); mainLock.lock();
try { try {
// Recheck while holding lock. // Recheck while holding lock.
...@@ -922,34 +932,49 @@ public class ThreadPoolExecutor extends AbstractExecutorService { ...@@ -922,34 +932,49 @@ public class ThreadPoolExecutor extends AbstractExecutorService {
int c = ctl.get(); int c = ctl.get();
int rs = runStateOf(c); int rs = runStateOf(c);
if (t == null || if (rs < SHUTDOWN ||
(rs >= SHUTDOWN && (rs == SHUTDOWN && firstTask == null)) {
! (rs == SHUTDOWN && if (t.isAlive()) // precheck that t is startable
firstTask == null))) { throw new IllegalThreadStateException();
decrementWorkerCount();
tryTerminate();
return false;
}
workers.add(w); workers.add(w);
int s = workers.size(); int s = workers.size();
if (s > largestPoolSize) if (s > largestPoolSize)
largestPoolSize = s; largestPoolSize = s;
workerAdded = true;
}
} finally { } finally {
mainLock.unlock(); mainLock.unlock();
} }
if (workerAdded) {
t.start(); t.start();
// It is possible (but unlikely) for a thread to have been workerStarted = true;
// added to workers, but not yet started, during transition to }
// STOP, which could result in a rare missed interrupt, }
// because Thread.interrupt is not guaranteed to have any effect } finally {
// on a non-yet-started Thread (see Thread#interrupt). if (! workerStarted)
if (runStateOf(ctl.get()) == STOP && ! t.isInterrupted()) addWorkerFailed(w);
t.interrupt(); }
return workerStarted;
}
return true; /**
* Rolls back the worker thread creation.
* - removes worker from workers, if present
* - decrements worker count
* - rechecks for termination, in case the existence of this
* worker was holding up termination
*/
private void addWorkerFailed(Worker w) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (w != null)
workers.remove(w);
decrementWorkerCount();
tryTerminate();
} finally {
mainLock.unlock();
}
} }
/** /**
...@@ -1096,15 +1121,25 @@ public class ThreadPoolExecutor extends AbstractExecutorService { ...@@ -1096,15 +1121,25 @@ public class ThreadPoolExecutor extends AbstractExecutorService {
* @param w the worker * @param w the worker
*/ */
final void runWorker(Worker w) { final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask; Runnable task = w.firstTask;
w.firstTask = null; w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true; boolean completedAbruptly = true;
try { try {
while (task != null || (task = getTask()) != null) { while (task != null || (task = getTask()) != null) {
w.lock(); w.lock();
clearInterruptsForTaskRun(); // If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try { try {
beforeExecute(w.thread, task); beforeExecute(wt, task);
Throwable thrown = null; Throwable thrown = null;
try { try {
task.run(); task.run();
...@@ -2064,3 +2099,4 @@ public class ThreadPoolExecutor extends AbstractExecutorService { ...@@ -2064,3 +2099,4 @@ public class ThreadPoolExecutor extends AbstractExecutorService {
} }
} }
} }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册