提交 6f343dc7 编写于 作者: S Stephen Connolly

[FIXED JENKINS-28840] Deadlock between Queue.maintain and Executor.interrupt

More fun here:

- All this originates from Executor extending Thread.
- There is funky logic in the lock handling code of the JVM that makes assumptions
  about how it might proceed with the lock when the thread holding the lock has its
  interrupt flag set.
- Really it would be better if Executor did not extend Thread as that way we wouldn't
  have to deal with some of that complexity. But OTOH we are where we are and backwards
  compatibility may make such a change not possible without a lot of breakage.
- Fixing the issue at hand, firstly requires that interrupting a Computer happens with the
  Queue lock held (to speed up tests we have Jenkins.cleanup get the lock for all Computers)
  That prevents the Queue maintain thread from getting caught
- Secondly, when removing an executor from a computer we process the removal while
  holding the Queue lock, but we move the removal itself to a separate thread if we cannot
  get the Queue lock in order to avoid deadlock.
- Also add helper methods to wrap tasks to be performed while holding the lock
  and a helper method for Runnables that exposes the tryLock functionality
上级 7e1a9215
......@@ -990,14 +990,13 @@ public /*transient*/ abstract class Computer extends Actionable implements Acces
* Called by {@link Executor} to kill excessive executors from this computer.
*/
/*package*/ void removeExecutor(final Executor e) {
Queue.withLock(new Runnable() {
final Runnable task = new Runnable() {
@Override
public void run() {
synchronized (Computer.this) {
executors.remove(e);
addNewExecutorIfNecessary();
if (!isAlive()) // TODO except from interrupt/doYank this is called while the executor still isActive(), so how could !this.isAlive()?
{
if (!isAlive()) {
AbstractCIBase ciBase = Jenkins.getInstance();
if (ciBase != null) {
ciBase.removeComputer(Computer.this);
......@@ -1005,7 +1004,11 @@ public /*transient*/ abstract class Computer extends Actionable implements Acces
}
}
}
});
};
if (!Queue.tryWithLock(task)) {
// JENKINS-28840 if we couldn't get the lock push the operation to a separate thread to avoid deadlocks
threadPoolForRemoting.submit(Queue.wrapWithLock(task));
}
}
/**
......@@ -1028,9 +1031,14 @@ public /*transient*/ abstract class Computer extends Actionable implements Acces
* Called from {@link Jenkins#cleanUp}.
*/
public void interrupt() {
for (Executor e : executors) {
e.interruptForShutdown();
}
Queue.withLock(new Runnable() {
@Override
public void run() {
for (Executor e : executors) {
e.interruptForShutdown();
}
}
});
}
public String getSearchUrl() {
......
......@@ -99,7 +99,6 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Level;
import java.util.logging.Logger;
......@@ -113,6 +112,7 @@ import jenkins.util.AtmostOneTaskExecutor;
import org.acegisecurity.AccessDeniedException;
import org.acegisecurity.Authentication;
import org.jenkinsci.bytecode.AdaptField;
import org.jenkinsci.remoting.RoleChecker;
import org.kohsuke.stapler.HttpResponse;
import org.kohsuke.stapler.HttpResponses;
import org.kohsuke.stapler.export.Exported;
......@@ -337,7 +337,7 @@ public class Queue extends ResourceController implements Saveable {
}
});
private transient final Lock lock = new ReentrantLock();
private transient final ReentrantLock lock = new ReentrantLock();
private transient final Condition condition = lock.newCondition();
......@@ -1219,6 +1219,59 @@ public class Queue extends ResourceController implements Saveable {
}
}
/**
* Some operations require to be performed with the {@link Queue} lock held. Use one of these methods rather
* than locking directly on Queue in order to allow for future refactoring.
* @param runnable the operation to perform.
* @return {@code true} if the lock available and the operation performed.
* @since 1.FIXME
*/
public static boolean tryWithLock(Runnable runnable) {
final Jenkins jenkins = Jenkins.getInstance();
final Queue queue = jenkins == null ? null : jenkins.getQueue();
if (queue == null) {
runnable.run();
return true;
} else {
return queue._tryWithLock(runnable);
}
}
/**
* Wraps a {@link Runnable} with the {@link Queue} lock held.
*
* @param runnable the operation to wrap.
* @since 1.FIXME
*/
public static Runnable wrapWithLock(Runnable runnable) {
final Jenkins jenkins = Jenkins.getInstance();
final Queue queue = jenkins == null ? null : jenkins.getQueue();
return queue == null ? runnable : new LockedRunnable(runnable);
}
/**
* Wraps a {@link hudson.remoting.Callable} with the {@link Queue} lock held.
*
* @param callable the operation to wrap.
* @since 1.FIXME
*/
public static <V, T extends Throwable> hudson.remoting.Callable<V, T> wrapWithLock(hudson.remoting.Callable<V, T> callable) {
final Jenkins jenkins = Jenkins.getInstance();
final Queue queue = jenkins == null ? null : jenkins.getQueue();
return queue == null ? callable : new LockedHRCallable<>(callable);
}
/**
* Wraps a {@link java.util.concurrent.Callable} with the {@link Queue} lock held.
*
* @param callable the operation to wrap.
* @since 1.FIXME
*/
public static <V> java.util.concurrent.Callable<V> wrapWithLock(java.util.concurrent.Callable<V> callable) {
final Jenkins jenkins = Jenkins.getInstance();
final Queue queue = jenkins == null ? null : jenkins.getQueue();
return queue == null ? callable : new LockedJUCCallable<V>(callable);
}
@Override
protected void _await() throws InterruptedException {
condition.await();
......@@ -1244,6 +1297,26 @@ public class Queue extends ResourceController implements Saveable {
}
}
/**
* Some operations require to be performed with the {@link Queue} lock held. Use one of these methods rather
* than locking directly on Queue in order to allow for future refactoring.
* @param runnable the operation to perform.
* @return {@code true} if the lock available and the operation performed.
* @since 1.FIXME
*/
protected boolean _tryWithLock(Runnable runnable) {
if (lock.tryLock()) {
try {
runnable.run();
} finally {
lock.unlock();
}
return true;
} else {
return false;
}
}
/**
* Some operations require to be performed with the {@link Queue} lock held. Use one of these methods rather
* than locking directly on Queue in order to allow for future refactoring.
......@@ -1308,6 +1381,13 @@ public class Queue extends ResourceController implements Saveable {
List<BuildableItem> lostPendings = new ArrayList<BuildableItem>(pendings);
for (Computer c : Jenkins.getInstance().getComputers()) {
for (Executor e : c.getExecutors()) {
if (e.isInterrupted()) {
// JENKINS-28840 we will deadlock if we try to touch this executor while interrupt flag set
// we need to clear lost pendings as we cannot know what work unit was on this executor
// while it is interrupted. (All this dancing is a result of Executor extending Thread)
lostPendings.clear(); // we'll get them next time around when the flag is cleared.
continue;
}
if (e.isParking()) {
parked.put(e, new JobOffer(e));
}
......@@ -2571,6 +2651,51 @@ public class Queue extends ResourceController implements Saveable {
this.pendings = new ArrayList<BuildableItem>(pendings);
}
}
private static class LockedRunnable implements Runnable {
private final Runnable delegate;
private LockedRunnable(Runnable delegate) {
this.delegate = delegate;
}
@Override
public void run() {
withLock(delegate);
}
}
private static class LockedJUCCallable<V> implements java.util.concurrent.Callable<V> {
private final java.util.concurrent.Callable<V> delegate;
private LockedJUCCallable(java.util.concurrent.Callable<V> delegate) {
this.delegate = delegate;
}
@Override
public V call() throws Exception {
return withLock(delegate);
}
}
private static class LockedHRCallable<V,T extends Throwable> implements hudson.remoting.Callable<V,T> {
private static final long serialVersionUID = 1L;
private final hudson.remoting.Callable<V,T> delegate;
private LockedHRCallable(hudson.remoting.Callable<V,T> delegate) {
this.delegate = delegate;
}
@Override
public V call() throws T {
return withLock(delegate);
}
@Override
public void checkRoles(RoleChecker checker) throws SecurityException {
delegate.checkRoles(checker);
}
}
@CLIResolver
public static Queue getInstance() {
......
......@@ -2778,13 +2778,19 @@ public class Jenkins extends AbstractCIBase implements DirectlyModifiableTopLeve
LOGGER.log(SEVERE, "Failed to execute termination",e);
}
Set<Future<?>> pending = new HashSet<Future<?>>();
final Set<Future<?>> pending = new HashSet<Future<?>>();
terminating = true;
for( Computer c : computers.values() ) {
c.interrupt();
killComputer(c);
pending.add(c.disconnect(null));
}
// JENKINS-28840 we know we will be interrupting all the Computers so get the Queue lock once for all
Queue.withLock(new Runnable() {
@Override
public void run() {
for( Computer c : computers.values() ) {
c.interrupt();
killComputer(c);
pending.add(c.disconnect(null));
}
}
});
if(udpBroadcastThread!=null)
udpBroadcastThread.shutdown();
if(dnsMultiCast!=null)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册