提交 46a900f4 编写于 作者: J Jesse Glick

[JENKINS-25938] Introduced AsynchronousExecutable.

上级 a88fb9d4
......@@ -30,8 +30,6 @@ import hudson.EnvVars;
import hudson.FilePath;
import hudson.Functions;
import hudson.Launcher;
import hudson.console.AnnotatedLargeText;
import hudson.console.ExpandableDetailsNote;
import hudson.console.ModelHyperlinkNote;
import hudson.model.Fingerprint.BuildPtr;
import hudson.model.Fingerprint.RangeSet;
......@@ -70,7 +68,6 @@ import javax.servlet.ServletException;
import java.io.File;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.StringWriter;
import java.lang.ref.WeakReference;
import java.util.AbstractSet;
import java.util.ArrayList;
......@@ -93,6 +90,7 @@ import static java.util.logging.Level.WARNING;
import jenkins.model.lazy.BuildReference;
import jenkins.model.lazy.LazyBuildMixIn;
import jenkins.model.queue.Executable2;
import org.kohsuke.accmod.Restricted;
import org.kohsuke.accmod.restrictions.DoNotUse;
......@@ -104,7 +102,7 @@ import org.kohsuke.accmod.restrictions.DoNotUse;
* @author Kohsuke Kawaguchi
* @see AbstractProject
*/
public abstract class AbstractBuild<P extends AbstractProject<P,R>,R extends AbstractBuild<P,R>> extends Run<P,R> implements Queue.Executable, LazyBuildMixIn.LazyLoadingRun<P,R> {
public abstract class AbstractBuild<P extends AbstractProject<P,R>,R extends AbstractBuild<P,R>> extends Run<P,R> implements Executable2, LazyBuildMixIn.LazyLoadingRun<P,R> {
/**
* Set if we want the blame information to flow from upstream to downstream build.
......
......@@ -870,7 +870,7 @@ public /*transient*/ abstract class Computer extends Actionable implements Acces
/*package*/ synchronized void removeExecutor(Executor e) {
executors.remove(e);
addNewExecutorIfNecessary();
if(!isAlive())
if(!isAlive()) // TODO except from interrupt/doYank this is called while the executor still isActive(), so how could !this.isAlive()?
{
AbstractCIBase ciBase = Jenkins.getInstance();
ciBase.removeComputer(this);
......@@ -878,7 +878,7 @@ public /*transient*/ abstract class Computer extends Actionable implements Acces
}
/**
* Returns true if any of the executors are functioning.
* Returns true if any of the executors are {@linkplain Executor#isActive active}.
*
* Note that if an executor dies, we'll leave it in {@link #executors} until
* the administrator yanks it out, so that we can see why it died.
......
......@@ -60,13 +60,15 @@ import static hudson.model.queue.Executables.*;
import static java.util.logging.Level.*;
import javax.annotation.CheckForNull;
import javax.annotation.Nonnull;
import jenkins.model.queue.Executable2.AsynchronousExecution;
import org.kohsuke.accmod.Restricted;
import org.kohsuke.accmod.restrictions.NoExternalUse;
/**
* Thread that executes builds.
* Since 1.536, {@link Executor}s start threads on-demand.
* The entire logic should use {@link #isActive()} instead of {@link #isAlive()}
* in order to check if the {@link Executor} it ready to take tasks.
* <p>Callers should use {@link #isActive()} instead of {@link #isAlive()}.
* @author Kohsuke Kawaguchi
*/
@ExportedBean
......@@ -88,6 +90,7 @@ public class Executor extends Thread implements ModelObject {
* {@link hudson.model.Queue.Executable} being executed right now, or null if the executor is idle.
*/
private volatile Queue.Executable executable;
private AsynchronousExecution asynchronousExecution;
/**
* When {@link Queue} allocates a work for this executor, this field is set
......@@ -163,7 +166,11 @@ public class Executor extends Thread implements ModelObject {
this.causes.add(c);
}
}
super.interrupt();
if (asynchronousExecution != null) {
asynchronousExecution.interrupt();
} else {
super.interrupt();
}
}
public Result abortResult() {
......@@ -238,23 +245,14 @@ public class Executor extends Thread implements ModelObject {
if (LOGGER.isLoggable(FINE))
LOGGER.log(FINE, getName()+" is now executing "+executable);
queue.execute(executable, task);
} catch (AsynchronousExecution x) {
x.setExecutor(this);
this.asynchronousExecution = x;
} catch (Throwable e) {
// for some reason the executor died. this is really
// a bug in the code, but we don't want the executor to die,
// so just leave some info and go on to build other things
LOGGER.log(Level.SEVERE, "Executor threw an exception", e);
workUnit.context.abort(e);
problems = e;
} finally {
long time = System.currentTimeMillis()-startTime;
if (LOGGER.isLoggable(FINE))
LOGGER.log(FINE, getName()+" completed "+executable+" in "+time+"ms");
try {
workUnit.context.synchronizeEnd(executable,problems,time);
} catch (InterruptedException e) {
workUnit.context.abort(e);
} finally {
workUnit.setExecutor(null);
if (asynchronousExecution == null) {
finish1(problems);
}
}
} catch (InterruptedException e) {
......@@ -267,12 +265,49 @@ public class Executor extends Thread implements ModelObject {
causeOfDeath = e;
LOGGER.log(SEVERE, "Unexpected executor death", e);
} finally {
if (causeOfDeath==null)
// let this thread die and be replaced by a fresh unstarted instance
owner.removeExecutor(this);
if (asynchronousExecution == null) {
finish2();
}
}
}
private void finish1(@CheckForNull Throwable problems) {
if (problems != null) {
// for some reason the executor died. this is really
// a bug in the code, but we don't want the executor to die,
// so just leave some info and go on to build other things
LOGGER.log(Level.SEVERE, "Executor threw an exception", problems);
workUnit.context.abort(problems);
}
long time = System.currentTimeMillis() - startTime;
LOGGER.log(FINE, "{0} completed {1} in {2}ms", new Object[] {getName(), executable, time});
try {
workUnit.context.synchronizeEnd(this, executable, problems, time);
} catch (InterruptedException e) {
workUnit.context.abort(e);
} finally {
workUnit.setExecutor(null);
}
}
private void finish2() {
if (causeOfDeath == null) {// let this thread die and be replaced by a fresh unstarted instance
owner.removeExecutor(this);
}
if (this instanceof OneOffExecutor) {
owner.remove((OneOffExecutor) this);
}
queue.scheduleMaintenance();
}
queue.scheduleMaintenance();
@Restricted(NoExternalUse.class)
public void completedAsynchronous(@CheckForNull Throwable error) {
try {
finish1(error);
} finally {
finish2();
}
asynchronousExecution = null;
}
/**
......@@ -357,14 +392,14 @@ public class Executor extends Thread implements ModelObject {
/**
* Check if executor is ready to accept tasks.
* This method becomes the critical one since 1.536, which introduces the
* on-demand creation of executor threads. The entire logic should use
* this method instead of {@link #isAlive()}, because it provides wrong
* information for non-started threads.
* on-demand creation of executor threads. Callers should use
* this method instead of {@link #isAlive()}, which would be incorrect for
* non-started threads or running {@link AsynchronousExecution}.
* @return True if the executor is available for tasks
* @since 1.536
*/
public boolean isActive() {
return !started || isAlive();
return !started || asynchronousExecution != null || isAlive();
}
/**
......@@ -377,7 +412,7 @@ public class Executor extends Thread implements ModelObject {
/**
* If this thread dies unexpectedly, obtain the cause of the failure.
*
* @return null if the death is expected death or the thread is {@link #isAlive() still alive}.
* @return null if the death is expected death or the thread {@link #isActive}.
* @since 1.142
*/
public Throwable getCauseOfDeath() {
......@@ -529,7 +564,7 @@ public class Executor extends Thread implements ModelObject {
@RequirePOST
public HttpResponse doYank() {
Jenkins.getInstance().checkPermission(Jenkins.ADMINISTER);
if (isAlive())
if (isActive())
throw new Failure("Can't yank a live executor");
owner.removeExecutor(this);
return HttpResponses.redirectViaContextPath("/");
......
......@@ -36,13 +36,4 @@ public class OneOffExecutor extends Executor {
public OneOffExecutor(Computer owner) {
super(owner,-1);
}
@Override
public void run() {
try {
super.run();
} finally {
owner.remove(this);
}
}
}
......@@ -95,7 +95,6 @@ import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
......@@ -121,6 +120,7 @@ import org.kohsuke.accmod.restrictions.DoNotUse;
import com.thoughtworks.xstream.XStream;
import com.thoughtworks.xstream.converters.basic.AbstractSingleValueConverter;
import javax.annotation.CheckForNull;
import jenkins.model.queue.Executable2;
import org.kohsuke.stapler.QueryParameter;
import org.kohsuke.stapler.interceptor.RequirePOST;
......@@ -1465,6 +1465,7 @@ public class Queue extends ResourceController implements Saveable {
* <p>
* Implementation must have <tt>executorCell.jelly</tt>, which is
* used to render the HTML that indicates this executable is executing.
* @see Executable2
*/
public interface Executable extends Runnable {
/**
......@@ -1480,7 +1481,7 @@ public class Queue extends ResourceController implements Saveable {
/**
* Called by {@link Executor} to perform the task
*/
void run();
@Override void run();
/**
* Estimate of how long will it take to execute this executable.
......
......@@ -88,6 +88,7 @@ public class ResourceController {
try {
task.run();
} finally {
// TODO if AsynchronousExecution, do that later
synchronized(this) {
inProgress.remove(activity);
inUse = ResourceList.union(resourceView);
......
......@@ -76,6 +76,7 @@ public final class WorkUnitContext {
protected void onCriteriaMet() {
// on behalf of the member Executors,
// the one that executes the main thing will send notifications
// Unclear if this will work with AsynchronousExecution; it *seems* this is only called from synchronize which is only called from synchronizeStart which is only called from an executor thread.
Executor e = Executor.currentExecutor();
if (e.getCurrentWorkUnit().isMainWork()) {
e.getOwner().taskAccepted(e,task);
......@@ -121,6 +122,11 @@ public final class WorkUnitContext {
}
}
@Deprecated
public void synchronizeEnd(Queue.Executable executable, Throwable problems, long duration) throws InterruptedException {
synchronizeEnd(Executor.currentExecutor(), executable, problems, duration);
}
/**
* All the {@link Executor}s that jointly execute a {@link Task} call this method to synchronize on the end of the task.
*
......@@ -128,11 +134,10 @@ public final class WorkUnitContext {
* If any of the member thread is interrupted while waiting for other threads to join, all
* the member threads will report {@link InterruptedException}.
*/
public void synchronizeEnd(Queue.Executable executable, Throwable problems, long duration) throws InterruptedException {
public void synchronizeEnd(Executor e, Queue.Executable executable, Throwable problems, long duration) throws InterruptedException {
endLatch.synchronize();
// the main thread will send a notification
Executor e = Executor.currentExecutor();
WorkUnit wu = e.getCurrentWorkUnit();
if (wu.isMainWork()) {
if (problems == null) {
......
/*
* The MIT License
*
* Copyright 2015 Jesse Glick.
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/
package jenkins.model.queue;
import hudson.model.Executor;
import hudson.model.ExecutorListener;
import hudson.model.OneOffExecutor;
import hudson.model.Queue;
import hudson.model.Resource;
import hudson.model.ResourceActivity;
import hudson.model.ResourceController;
import hudson.model.ResourceList;
import javax.annotation.CheckForNull;
import org.kohsuke.accmod.Restricted;
import org.kohsuke.accmod.restrictions.NoExternalUse;
/**
* Extended interface for running tasks with some additional logic.
* @since TODO
*/
public interface Executable2 extends Queue.Executable {
/**
* {@inheritDoc}
* @throws AsynchronousExecution if you would like to continue without consuming a thread
*/
@Override void run() throws AsynchronousExecution;
/**
* Special means of indicating that an executable will proceed in the background without consuming a native thread ({@link Executor}).
* May be thrown from {@link Executable2#run} after doing any preparatory work synchronously.
* <p>{@link Executor#isActive} will remain true (even though {@link Executor#isAlive} is not) until {@link #completed} is called.
* The thrower will need to hold on to a reference to this instance as a handle to call {@link #completed}.
* <p>The execution may not extend into another Jenkins session; if you wish to model a long-running execution, you must schedule a new task after restart.
* This class is not serializable anyway.
* <p>Mainly intended for use with {@link OneOffExecutor} (from a {@link hudson.model.Queue.FlyweightTask}), of which there could be many,
* but could also be used with a heavyweight executor even though the number of executors is bounded by node configuration.
* <p>{@link ResourceController}/{@link ResourceActivity}/{@link ResourceList}/{@link Resource} are not currently supported.
* Nor are {@link hudson.model.Queue.Task#getSubTasks} other than the primary task.
*/
abstract class AsynchronousExecution extends RuntimeException {
private Executor executor;
/** Constructor for subclasses. */
protected AsynchronousExecution() {}
/**
* Called in lieu of {@link Thread#interrupt} by {@link Executor#interrupt()} and its overloads.
* As with the standard Java method, you are requested to cease work as soon as possible, but there is no enforcement of this.
* You might also want to call {@link Executor#recordCauseOfInterruption} on {@link #getExecutor}.
*/
public abstract void interrupt();
/**
* Obtains the associated executor.
*/
public final Executor getExecutor() {
return executor;
}
@Restricted(NoExternalUse.class)
public final void setExecutor(Executor executor) {
this.executor = executor;
}
/**
* To be called when the task is actually complete.
* @param error normally null (preferable to handle errors yourself), but may be specified to simulate an exception from {@link Executable2#run}, as per {@link ExecutorListener#taskCompletedWithProblems}
*/
public final void completed(@CheckForNull Throwable error) {
executor.completedAsynchronous(error);
}
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册