提交 73073387 编写于 作者: K Kohsuke Kawaguchi

Still a work in progress

上级 08a1fc39
......@@ -718,7 +718,6 @@ public /*transient*/ abstract class Computer extends Actionable implements Acces
for (Integer number : availableNumbers) {
Executor e = new Executor(this, number);
e.start();
executors.add(e);
}
}
......@@ -847,7 +846,7 @@ public /*transient*/ abstract class Computer extends Actionable implements Acces
*/
protected boolean isAlive() {
// Scan this computer's executors and return true at the first one that passes the checks below.
for (Executor e : executors)
// NOTE(review): the next two `if` lines look like the removed and the added version of a
// single line from a diff whose +/- markers were lost; as written they nest, so an executor
// must satisfy both isAlive() and isActive(). Confirm which check is intended.
if (e.isAlive())
if (e.isActive())
return true;
// No executor passed the checks.
return false;
}
......@@ -1015,8 +1014,8 @@ public /*transient*/ abstract class Computer extends Actionable implements Acces
* Starts executing a fly-weight task.
*/
/*package*/ final void startFlyWeightTask(WorkUnit p) {
// NOTE(review): `e` is declared twice below. The first pair (two-argument constructor, no-arg
// start()) and the second pair (one-argument constructor, start(p)) look like the before/after
// sides of a diff whose +/- markers were lost; only one pair can be kept for this to compile.
OneOffExecutor e = new OneOffExecutor(this, p);
e.start();
OneOffExecutor e = new OneOffExecutor(this);
e.start(p);
// Record the new executor in oneOffExecutors.
oneOffExecutors.add(e);
}
......
......@@ -87,12 +87,18 @@ public class Executor extends Thread implements ModelObject {
*/
private volatile Queue.Executable executable;
/**
* When {@link Queue} allocates work for this executor, this field is set
* and the executor is {@linkplain Thread#start() started}.
*/
private volatile WorkUnit workUnit;
private Throwable causeOfDeath;
private boolean induceDeath;
private volatile boolean started;
/**
* When the executor is interrupted, we allow the code that interrupted the thread to override the
* result code it prefers.
......@@ -183,7 +189,9 @@ public class Executor extends Thread implements ModelObject {
try {
finishTime = System.currentTimeMillis();
while(shouldRun()) {
MAIN:
do {
executable = null;
workUnit = null;
interruptStatus = null;
......@@ -200,7 +208,7 @@ public class Executor extends Thread implements ModelObject {
// clear the interrupt flag as a precaution.
// sometime an interrupt aborts a build but without clearing the flag.
// see issue #1583
if (Thread.interrupted()) continue;
if (Thread.interrupted()) break MAIN;
if (induceDeath) throw new ThreadDeath();
SubTask task;
......@@ -208,8 +216,8 @@ public class Executor extends Thread implements ModelObject {
// transition from idle to building.
// perform this state change as an atomic operation wrt other queue operations
synchronized (queue) {
workUnit = grabJob();
workUnit.setExecutor(this);
queue.onStartExecuting(this);
if (LOGGER.isLoggable(FINE))
LOGGER.log(FINE, getName()+" grabbed "+workUnit+" from queue");
task = workUnit.work;
......@@ -221,10 +229,10 @@ public class Executor extends Thread implements ModelObject {
LOGGER.log(FINE, getName()+" is going to execute "+executable);
} catch (IOException e) {
LOGGER.log(Level.SEVERE, "Executor threw an exception", e);
continue;
break MAIN;
} catch (InterruptedException e) {
LOGGER.log(FINE, getName()+" interrupted",e);
continue;
break MAIN;
}
Throwable problems = null;
......@@ -263,12 +271,16 @@ public class Executor extends Thread implements ModelObject {
workUnit.context.synchronizeEnd(executable,problems,finishTime - startTime);
} catch (InterruptedException e) {
workUnit.context.abort(e);
continue;
break MAIN;
} finally {
workUnit.setExecutor(null);
}
}
}
} while (false); // this pointless do-while allows "break" to come here
// let this thread die and be replaced by a fresh unstarted instance
owner.removeExecutor(this);
} catch(RuntimeException e) {
causeOfDeath = e;
throw e;
......@@ -286,17 +298,6 @@ public class Executor extends Thread implements ModelObject {
interrupt();
}
/**
 * Returns true if we should keep going.
 *
 * @return {@code true} while a Jenkins instance exists and it is not terminating
 */
protected boolean shouldRun() {
    // Read the singleton once. Calling getInstance() twice lets the second call return
    // null after the first one passed the null check, which would throw a
    // NullPointerException instead of returning false.
    final Jenkins jenkins = Jenkins.getInstance();
    return jenkins != null && !jenkins.isTerminating();
}
/**
 * Obtains the next work unit for this executor by delegating to {@code queue.pop()}.
 *
 * @return whatever {@code queue.pop()} returns
 * @throws InterruptedException propagated from {@code queue.pop()}
 */
protected WorkUnit grabJob() throws InterruptedException {
    final WorkUnit next = queue.pop();
    return next;
}
/**
* Returns the current {@link hudson.model.Queue.Task} this executor is running.
*
......@@ -369,6 +370,17 @@ public class Executor extends Thread implements ModelObject {
return executable!=null;
}
/**
 * Reports whether this executor is either not yet started or currently a live thread.
 *
 * @return {@code true} if the {@code started} flag is unset, otherwise the result of
 *         {@link #isAlive()}
 */
public boolean isActive() {
    if (!started) {
        return true;
    }
    return isAlive();
}
/**
 * Tells whether this executor is still waiting to be handed a task to execute.
 *
 * @return {@code true} as long as the {@code started} flag has not been set
 */
public boolean isParking() {
    final boolean alreadyStarted = started;
    return !alreadyStarted;
}
/**
* If this thread dies unexpectedly, obtain the cause of the failure.
*
......@@ -477,6 +489,23 @@ public class Executor extends Thread implements ModelObject {
return eta;
}
/**
 * Can't start executor like you normally start a thread.
 *
 * @throws UnsupportedOperationException always; an executor is started only through
 *         {@link #start(WorkUnit)}
 * @see #start(WorkUnit)
 */
@Override
public synchronized void start() {
    // Carry a message so a stack trace tells the caller which entry point to use instead.
    throw new UnsupportedOperationException(
            "Executor cannot be started without a WorkUnit; use start(WorkUnit) instead");
}
/**
 * Assigns the given work unit to this executor and starts its thread.
 *
 * <p>
 * An executor can be started only once. A repeated call is rejected before any state is
 * touched, so the work unit of an executor that is already running is never overwritten.
 *
 * @param task the work unit stored in {@code workUnit} before the thread is started
 * @throws IllegalThreadStateException if this executor has already been started
 */
/*protected*/ synchronized void start(WorkUnit task) {
    // Without this check a second call would overwrite workUnit first and only then fail
    // inside Thread.start(). Throw the same exception type up front instead.
    if (started) {
        throw new IllegalThreadStateException(getName() + " has already been started");
    }
    this.workUnit = task;
    super.start();
    // Set only after the thread was started successfully.
    started = true;
}
/**
* @deprecated as of 1.489
* Use {@link #doStop()}.
......@@ -588,5 +617,4 @@ public class Executor extends Thread implements ModelObject {
private static final ThreadLocal<Executor> IMPERSONATION = new ThreadLocal<Executor>();
private static final Logger LOGGER = Logger.getLogger(Executor.class.getName());
}
......@@ -24,7 +24,6 @@
package hudson.model;
import hudson.model.Queue.FlyweightTask;
import hudson.model.queue.WorkUnit;
/**
* {@link Executor} that's temporarily added to carry out tasks that don't consume
......@@ -34,30 +33,8 @@ import hudson.model.queue.WorkUnit;
* @see FlyweightTask
*/
public class OneOffExecutor extends Executor {
private WorkUnit work;
public OneOffExecutor(Computer owner, WorkUnit work) {
public OneOffExecutor(Computer owner) {
super(owner,-1);
this.work = work;
}
@Override
protected boolean shouldRun() {
// TODO: consulting super.shouldRun() here means we'll lose the work if it gets scheduled
// when super.shouldRun() returns false.
return super.shouldRun() && work !=null;
}
public WorkUnit getAssignedWorkUnit() {
return work;
}
@Override
protected WorkUnit grabJob() throws InterruptedException {
WorkUnit r = super.grabJob();
assert r==work;
work = null;
return r;
}
@Override
......
......@@ -227,22 +227,15 @@ public class Queue extends ResourceController implements Saveable {
* This is a job offer from the queue to an executor.
*
* <p>
* An idle executor (that calls {@link Queue#pop()} creates
* a new {@link JobOffer} and gets itself {@linkplain Queue#parked parked},
* and we'll eventually hand out an {@link #workUnit} to build.
* For each idle executor, this gets created to allow the scheduling logic
* to assign a work. Once a work is assigned, the executor actually gets
* started to carry out the task in question.
*/
public class JobOffer extends MappingWorksheet.ExecutorSlot {
public final Executor executor;
/**
* Used to wake up an executor, when it has an offered
* {@link Project} to build.
*/
private final OneShotEvent event = new OneShotEvent(Queue.this);
/**
* The work unit that this {@link Executor} is going to handle.
* (Or null, in which case event is used to trigger a queue maintenance.)
*/
private WorkUnit workUnit;
......@@ -254,7 +247,8 @@ public class Queue extends ResourceController implements Saveable {
protected void set(WorkUnit p) {
// An offer may be assigned a work unit only once.
assert this.workUnit == null;
this.workUnit = p;
// NOTE(review): event.signal() looks like a leftover removed line from a diff whose +/- markers
// were lost (the surrounding change starts the executor directly instead) — confirm before keeping.
event.signal();
// The executor is expected to be parking at this point; it is then started with the assigned work unit.
assert executor.isParking();
executor.start(workUnit);
}
@Override
......@@ -313,7 +307,7 @@ public class Queue extends ResourceController implements Saveable {
this.loadBalancer = loadBalancer.sanitize();
// if all the executors are busy doing something, then the queue won't be maintained in
// timely fashion, so use another thread to make sure it happens.
new MaintainTask(this);
new MaintainTask(this).periodic();
}
public LoadBalancer getLoadBalancer() {
......@@ -903,94 +897,16 @@ public class Queue extends ResourceController implements Saveable {
}
/**
* Called by the executor to fetch something to build next.
* <p>
* This method blocks until a next project becomes buildable.
* Called when the executor actually starts executing the assigned work unit.
*
* This moves the task from the pending state to the "left the queue" state.
*/
public synchronized WorkUnit pop() throws InterruptedException {
final Executor exec = Executor.currentExecutor();
if (exec instanceof OneOffExecutor) {
OneOffExecutor ooe = (OneOffExecutor) exec;
final WorkUnit wu = ooe.getAssignedWorkUnit();
pendings.remove(wu.context.item);
LeftItem li = new LeftItem(wu.context);
li.enter(this);
return wu;
}
try {
while (true) {
final JobOffer offer = new JobOffer(exec);
long sleep = -1;
// consider myself parked
assert !parked.containsKey(exec);
parked.put(exec, offer);
// reuse executor thread to do a queue maintenance.
// at the end of this we get all the buildable jobs
// in the buildables field.
maintain();
// we went over all the buildable projects and awaken
// all the executors that got work to do. now, go to sleep
// until this thread is awakened. If this executor assigned a job to
// itself above, the block method will return immediately.
if (!waitingList.isEmpty()) {
// wait until the first item in the queue is due
sleep = peek().timestamp.getTimeInMillis() - new GregorianCalendar().getTimeInMillis();
if (sleep < 100) sleep = 100; // avoid wait(0)
}
if (sleep == -1)
offer.event.block();
else
offer.event.block(sleep);
// retract the offer object
assert parked.get(exec) == offer;
parked.remove(exec);
// am I woken up because I have a project to build?
if (offer.workUnit != null) {
// if so, just build it
LOGGER.log(Level.FINE, "Pop returning {0} for {1}", new Object[] {offer.workUnit, exec.getName()});
// TODO: I think this has to be done by the last executor that leaves the pop(), not by main executor
if (offer.workUnit.isMainWork()) {
pendings.remove(offer.workUnit.context.item);
LeftItem li = new LeftItem(offer.workUnit.context);
li.enter(this);
}
return offer.workUnit;
}
// otherwise run a queue maintenance
}
} finally {
// remove myself from the parked list
JobOffer offer = parked.remove(exec);
if (offer != null && offer.workUnit != null) {
// we are already assigned a project, but now we can't handle it.
offer.workUnit.context.abort(new AbortException());
if(offer.workUnit.context.item!=null && pendings.contains(offer.workUnit.context.item)){
//we are already assigned a project and moved it into pendings, but something wrong had happened before an executor could take it.
pendings.remove(offer.workUnit.context.item);
//return it into queue, it does not have to cause this problem, it can be caused by another item.
buildables.add(offer.workUnit.context.item);
}
}
/*package*/ synchronized void onStartExecuting(Executor exec) throws InterruptedException {
final WorkUnit wu = exec.getCurrentWorkUnit();
pendings.remove(wu.context.item);
// since this executor might have been chosen for
// maintenance, schedule another one. Worst case
// we'll just run a pointless maintenance, and that's
// fine.
scheduleMaintenance();
}
LeftItem li = new LeftItem(wu.context);
li.enter(this);
}
/**
......@@ -1002,15 +918,7 @@ public class Queue extends ResourceController implements Saveable {
* This wakes up one {@link Executor} so that it will maintain a queue.
*/
public synchronized void scheduleMaintenance() {
// this code assumes that after this method is called
// no more executors will be offered job except by
// the pop() code.
// Signal the first parked offer that has no work unit assigned, then return.
for (Entry<Executor, JobOffer> av : parked.entrySet()) {
if (av.getValue().workUnit == null) {
av.getValue().event.signal();
return;
}
}
// NOTE(review): the loop above and the line below look like the removed and the added side of a
// diff whose +/- markers were lost; as written, once() is reached only when no offer was signalled.
// Confirm which variant is intended.
new MaintainTask(this).once();
}
/**
......@@ -1058,6 +966,21 @@ public class Queue extends ResourceController implements Saveable {
public synchronized void maintain() {
LOGGER.log(Level.FINE, "Queue maintenance started {0}", this);
{// update parked
for (Computer c : Jenkins.getInstance().getComputers()) {
for (Executor e : c.getExecutors()) {
if (e.isParking()) {
if (!parked.containsKey(e)) {
parked.put(e,new JobOffer(e));
}
} else {
parked.remove(e);
}
}
}
}
{// blocked -> buildable
for (BlockedItem p : new ArrayList<BlockedItem>(blockedProjects.values())) {// copy as we'll mutate the list
if (!isBuildBlocked(p) && allowNewBuildableTask(p.task)) {
......@@ -2070,7 +1993,9 @@ public class Queue extends ResourceController implements Saveable {
MaintainTask(Queue queue) {
this.queue = new WeakReference<Queue>(queue);
}
private void periodic() {
long interval = 5000;
Timer timer = Trigger.timer;
if (timer != null) {
......@@ -2078,6 +2003,13 @@ public class Queue extends ResourceController implements Saveable {
}
}
/**
 * Schedules this task on {@code Trigger.timer} with a delay of 0.
 * Does nothing when {@code Trigger.timer} is {@code null}.
 */
private void once() {
    final Timer scheduler = Trigger.timer;
    if (scheduler == null) {
        return;
    }
    scheduler.schedule(this, 0);
}
protected void doRun() {
Queue q = queue.get();
if (q != null)
......
......@@ -182,7 +182,7 @@ public class NodeProvisioner {
cl.onComplete(f,node);
hudson.addNode(node);
LOGGER.info(f.displayName+" provisioning successfully completed. We have now "+hudson.getComputers().length+" computer(s)");
LOGGER.info(f.displayName+" provisioningE successfully completed. We have now "+hudson.getComputers().length+" computer(s)");
} catch (InterruptedException e) {
throw new AssertionError(e); // since we confirmed that the future is already done
} catch (ExecutionException e) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册