提交 cc38c656 编写于 作者: K kohsuke

Work in progress for job with multiple executor consumption support.

git-svn-id: https://hudson.dev.java.net/svn/hudson/trunk/hudson/main@34607 71c3de6d-444a-0410-be80-ed276b4c234a
上级 99294e58
......@@ -41,6 +41,7 @@ import hudson.model.Cause.UserCause;
import hudson.model.Descriptor.FormException;
import hudson.model.Fingerprint.RangeSet;
import hudson.model.Queue.Executable;
import hudson.model.Queue.ExecutionUnit;
import hudson.model.Queue.WaitingItem;
import hudson.model.RunMap.Constructor;
import hudson.model.labels.LabelAtom;
......@@ -997,6 +998,11 @@ public abstract class AbstractProject<P extends AbstractProject<P,R>,R extends A
return null;
}
public Collection<? extends ExecutionUnit> getMemberExecutionUnits() {
// TODO
return Collections.emptyList();
}
public R createExecutable() throws IOException {
if(isDisabled()) return null;
return newBuild();
......
......@@ -26,6 +26,7 @@ package hudson.model;
import hudson.Util;
import hudson.model.Queue.*;
import hudson.FilePath;
import hudson.model.queue.WorkUnit;
import hudson.util.TimeUnit2;
import hudson.util.InterceptingProxy;
import hudson.security.ACL;
......@@ -67,6 +68,8 @@ public class Executor extends Thread implements ModelObject {
*/
private volatile Queue.Executable executable;
private volatile WorkUnit workUnit;
private Throwable causeOfDeath;
public Executor(Computer owner, int n) {
......@@ -85,6 +88,7 @@ public class Executor extends Thread implements ModelObject {
finishTime = System.currentTimeMillis();
while(shouldRun()) {
executable = null;
workUnit = null;
synchronized(owner) {
if(owner.getNumExecutors()<owner.getExecutors().size()) {
......@@ -99,12 +103,11 @@ public class Executor extends Thread implements ModelObject {
// see issue #1583
if (Thread.interrupted()) continue;
Queue.Item queueItem;
Queue.Task task;
ExecutionUnit task;
try {
synchronized (queue) {// perform this state change as an atomic operation wrt other queue operations
queueItem = grabJob();
task = queueItem.task;
workUnit = grabJob();
task = workUnit.work;
startTime = System.currentTimeMillis();
executable = task.createExecutable();
}
......@@ -118,10 +121,10 @@ public class Executor extends Thread implements ModelObject {
Throwable problems = null;
final String threadName = getName();
try {
owner.taskAccepted(this, task);
workUnit.context.synchronizeStart();
if (executable instanceof Actionable) {
for (Action action: queueItem.getActions()) {
for (Action action: workUnit.actions) {
((Actionable) executable).addAction(action);
}
}
......@@ -136,13 +139,7 @@ public class Executor extends Thread implements ModelObject {
} finally {
setName(threadName);
finishTime = System.currentTimeMillis();
if (problems == null) {
queueItem.future.set(executable);
owner.taskCompleted(this, task, finishTime - startTime);
} else {
queueItem.future.set(problems);
owner.taskCompletedWithProblems(this, task, finishTime - startTime, problems);
}
workUnit.context.synchronizeEnd(executable,problems,finishTime - startTime);
}
}
} catch(RuntimeException e) {
......@@ -161,7 +158,7 @@ public class Executor extends Thread implements ModelObject {
return Hudson.getInstance() != null && !Hudson.getInstance().isTerminating();
}
protected Queue.Item grabJob() throws InterruptedException {
protected WorkUnit grabJob() throws InterruptedException {
return queue.pop();
}
......@@ -176,6 +173,18 @@ public class Executor extends Thread implements ModelObject {
return executable;
}
/**
* Returns the current {@link WorkUnit} (of {@link #getCurrentExecutable() the current executable})
* that this executor is running.
*
* @return
* null if the executor is idle.
*/
@Exported
public WorkUnit getCurrentWorkUnit() {
return workUnit;
}
/**
* If {@linkplain #getCurrentExecutable() current executable} is {@link AbstractBuild},
* return the workspace that this executor is using, or null if the build hasn't gotten
......
......@@ -24,6 +24,7 @@
package hudson.model;
import hudson.model.Queue.FlyweightTask;
import hudson.model.queue.WorkUnit;
/**
* {@link Executor} that's temporarily added to carry out tasks that doesn't consume
......@@ -33,11 +34,11 @@ import hudson.model.Queue.FlyweightTask;
* @see FlyweightTask
*/
public class OneOffExecutor extends Executor {
private Queue.Item item;
private WorkUnit item;
public OneOffExecutor(Computer owner, Queue.Item item) {
super(owner,-1);
this.item = item;
this.item = new WorkUnit(item,item.task);
}
@Override
......@@ -48,8 +49,8 @@ public class OneOffExecutor extends Executor {
}
@Override
protected Queue.Item grabJob() throws InterruptedException {
Queue.Item r = item;
protected WorkUnit grabJob() throws InterruptedException {
WorkUnit r = item;
item = null;
return r;
}
......
......@@ -37,6 +37,7 @@ import hudson.cli.declarative.CLIResolver;
import hudson.model.queue.AbstractQueueTask;
import hudson.model.queue.QueueSorter;
import hudson.model.queue.QueueTaskDispatcher;
import hudson.model.queue.WorkUnit;
import hudson.remoting.AsyncFutureImpl;
import hudson.model.Node.Mode;
import hudson.model.listeners.SaveableListener;
......@@ -63,6 +64,7 @@ import java.lang.ref.WeakReference;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Calendar;
import java.util.Collection;
import java.util.Collections;
import java.util.GregorianCalendar;
import java.util.HashMap;
......@@ -221,7 +223,7 @@ public class Queue extends ResourceController implements Saveable {
private volatile transient LoadBalancer loadBalancer;
private volatile transient QueueSorter sorter;
public Queue(LoadBalancer loadBalancer) {
this.loadBalancer = loadBalancer.sanitize();
// if all the executors are busy doing something, then the queue won't be maintained in
......@@ -708,7 +710,7 @@ public class Queue extends ResourceController implements Saveable {
* <p>
* This method blocks until a next project becomes buildable.
*/
public synchronized Queue.Item pop() throws InterruptedException {
public synchronized WorkUnit pop() throws InterruptedException {
final Executor exec = Executor.currentExecutor();
try {
......@@ -778,7 +780,7 @@ public class Queue extends ResourceController implements Saveable {
LOGGER.fine("Pop returning " + offer.item + " for " + exec.getName());
offer.item.future.startExecuting(exec);
pendings.remove(offer.item);
return offer.item;
return new WorkUnit(offer.item, offer.item.task);
}
// otherwise run a queue maintenance
}
......@@ -1017,7 +1019,7 @@ public class Queue extends ResourceController implements Saveable {
* instead of implementing this interface directly, to maintain
* compatibility with future changes to this interface.
*/
public interface Task extends ModelObject, ResourceActivity {
public interface Task extends ModelObject, ExecutionUnit {
/**
* If this task needs to be run on a node with a particular label,
* return that {@link Label}. Otherwise null, indicating
......@@ -1118,6 +1120,23 @@ public class Queue extends ResourceController implements Saveable {
* @since 1.338
*/
boolean isConcurrentBuild();
Collection<? extends ExecutionUnit> getMemberExecutionUnits();
}
public interface ExecutionUnit extends ResourceActivity {
/**
* Estimate of how long will it take to execute this task.
* Measured in milliseconds.
*
* @return -1 if it's impossible to estimate.
*/
long getEstimatedDuration();
/**
* Creates {@link Executable}, which performs the actual execution of the task.
*/
Executable createExecutable() throws IOException;
}
public interface Executable extends Runnable {
......
......@@ -25,6 +25,10 @@
package hudson.model.queue;
import hudson.model.Queue;
import hudson.model.Queue.ExecutionUnit;
import java.util.Collection;
import java.util.Collections;
/**
* Abstract base class for {@link Queue.Task} to protect plugins
......@@ -34,4 +38,7 @@ import hudson.model.Queue;
* @since 1.360
*/
public abstract class AbstractQueueTask implements Queue.Task {
public Collection<? extends ExecutionUnit> getMemberExecutionUnits() {
return Collections.emptyList();
}
}
package hudson.model.queue;
/**
* A concurrency primitive that waits for N number of threads to synchronize.
* If any of the threads are interrupted while waiting for the completion of the condition,
* then all the involved threads get interrupted.
*
* @author Kohsuke Kawaguchi
*/
class Latch {
private final int n;
private int i=0;
private boolean interrupted;
public Latch(int n) {
this.n = n;
}
public synchronized void abort() {
interrupted = true;
notifyAll();
}
public synchronized void synchronize() throws InterruptedException {
check(n);
boolean success=false;
try {
onCriteriaMet();
success=true;
} finally {
if (!success)
abort();
}
check(n*2);
}
private void check(int threshold) throws InterruptedException {
i++;
if (i==threshold) {
notifyAll();
} else {
while (i<threshold && !interrupted) {
try {
wait();
} catch (InterruptedException e) {
interrupted = true;
notifyAll();
throw e;
}
}
}
// all of us either leave normally or get interrupted
if (interrupted)
throw new InterruptedException();
}
protected void onCriteriaMet() throws InterruptedException {}
}
......@@ -28,10 +28,12 @@ import hudson.model.Label;
import hudson.model.Node;
import hudson.model.Queue;
import hudson.model.Queue.Executable;
import hudson.model.Queue.ExecutionUnit;
import hudson.model.Queue.Task;
import hudson.model.ResourceList;
import java.io.IOException;
import java.util.Collection;
/**
* Base class for defining filter {@link Queue.Task}.
......@@ -105,4 +107,8 @@ public abstract class QueueTaskFilter implements Queue.Task {
public ResourceList getResourceList() {
return base.getResourceList();
}
public Collection<? extends ExecutionUnit> getMemberExecutionUnits() {
return base.getMemberExecutionUnits();
}
}
package hudson.model.queue;
import hudson.model.Action;
import hudson.model.Executor;
import hudson.model.Queue;
import hudson.model.Queue.ExecutionUnit;
import hudson.model.Queue.FutureImpl;
import hudson.model.Queue.Item;
import hudson.model.Queue.Task;
import java.util.List;
/**
* Represents a unit of hand-over to {@link Executor} from {@link Queue}.
*
* @author Kohsuke Kawaguchi
*/
public final class WorkUnit {
/**
* Task to be executed.
*/
public final ExecutionUnit work;
/**
* Which task does this work belong to?
*/
public final Task task;
/**
* Associated parameters to the build.
*/
public final List<Action> actions;
/**
* Once the execution is complete, update this future object with the outcome.
*/
public final FutureImpl future;
public final WorkUnitContext context;
public WorkUnit(Item item, ExecutionUnit work) {
this.task = item.task;
this.future = item.future; // TODO: this is incorrect
this.actions = item.getActions();
this.work = work;
}
/**
* Is this work unit the "main work", which is the primary {@link ExecutionUnit}
* represented by {@link Task} itself.
*/
public boolean isMainWork() {
return task==work;
}
}
package hudson.model.queue;
import hudson.model.Executor;
import hudson.model.Queue;
import hudson.model.Queue.Task;
/**
* Holds the information shared between {@link WorkUnit}s created from the same {@link Task}.
*
* @author Kohsuke Kawaguchi
*/
public final class WorkUnitContext {
private final int workUnitSize;
public final Task task;
private final Latch startLatch, endLatch;
public WorkUnitContext(Task _task) {
this.task = _task;
// +1 for the main task
workUnitSize = task.getMemberExecutionUnits().size() + 1;
startLatch = new Latch(workUnitSize) {
@Override
protected void onCriteriaMet() {
// on behalf of the member Executors,
// the one that executes the main thing will send notifications
Executor e = Executor.currentExecutor();
if (e.getCurrentWorkUnit().isMainWork()) {
e.getOwner().taskAccepted(e,task);
}
}
};
endLatch = new Latch(workUnitSize);
}
/**
* {@link Executor}s call this method to synchronize on the start of the task
*/
public synchronized void synchronizeStart() throws InterruptedException {
startLatch.synchronize();
}
public synchronized void synchronizeEnd(Queue.Executable executable, Throwable problems, long duration) throws InterruptedException {
endLatch.synchronize();
// the main thread will send a notification
Executor e = Executor.currentExecutor();
if (e.getCurrentWorkUnit().isMainWork()) {
if (problems == null) {
queueItem.future.set(executable);
e.getOwner().taskCompleted(e, task, duration);
} else {
queueItem.future.set(problems);
e.getOwner().taskCompletedWithProblems(e, task, duration, problems);
}
}
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册