提交 1254d91a 编写于 作者: K kohsuke

Redoing rev.31362 in a less hack-ish way.

See http://hudson.361315.n4.nabble.com/Patch-to-fix-concurrent-build-problem-td2229136.html for the discussion.



git-svn-id: https://hudson.dev.java.net/svn/hudson/trunk/hudson/main@31906 71c3de6d-444a-0410-be80-ed276b4c234a
上级 cc4aefcf
......@@ -84,6 +84,8 @@ public class Executor extends Thread implements ModelObject {
try {
finishTime = System.currentTimeMillis();
while(shouldRun()) {
executable = null;
synchronized(owner) {
if(owner.getNumExecutors()<owner.getExecutors().size()) {
// we've got too many executors.
......@@ -98,37 +100,37 @@ public class Executor extends Thread implements ModelObject {
Thread.interrupted();
Queue.Item queueItem;
Queue.Task task;
try {
queueItem = grabJob();
task = queueItem.task;
startTime = System.currentTimeMillis();
executable = task.createExecutable();
} catch (IOException e) {
LOGGER.log(Level.SEVERE, "Executor throw an exception unexpectedly", e);
continue;
} catch (InterruptedException e) {
continue;
}
Queue.Task task = queueItem.task;
Throwable problems = null;
owner.taskAccepted(this, task);
final String threadName = getName();
try {
try {
startTime = System.currentTimeMillis();
executable = task.createExecutable();
queue.completePop(queueItem);
if (executable instanceof Actionable) {
for (Action action: queueItem.getActions()) {
((Actionable) executable).addAction(action);
}
owner.taskAccepted(this, task);
if (executable instanceof Actionable) {
for (Action action: queueItem.getActions()) {
((Actionable) executable).addAction(action);
}
setName(threadName+" : executing "+executable.toString());
queue.execute(executable, task);
} catch (Throwable e) {
// for some reason the executor died. this is really
// a bug in the code, but we don't want the executor to die,
// so just leave some info and go on to build other things
LOGGER.log(Level.SEVERE, "Executor throw an exception unexpectedly", e);
problems = e;
}
setName(threadName+" : executing "+executable.toString());
queue.execute(executable, task);
} catch (Throwable e) {
// for some reason the executor died. this is really
// a bug in the code, but we don't want the executor to die,
// so just leave some info and go on to build other things
LOGGER.log(Level.SEVERE, "Executor throw an exception unexpectedly", e);
problems = e;
} finally {
setName(threadName);
finishTime = System.currentTimeMillis();
......@@ -140,7 +142,6 @@ public class Executor extends Thread implements ModelObject {
owner.taskCompletedWithProblems(this, task, finishTime - startTime, problems);
}
}
executable = null;
}
} catch(RuntimeException e) {
causeOfDeath = e;
......
......@@ -133,20 +133,6 @@ public class Queue extends ResourceController implements Saveable {
*/
private final ItemList<BlockedItem> blockedProjects = new ItemList<BlockedItem>();
/**
* Popped items are the keys, the value is when the item was placed in the Map.
* When an item is popped it is placed here. The Executor that popped it should
* remove it via {@link #completePop()} once it creates the {@link Executable}.
*/
private final Map<Item,Long> popped = new HashMap<Item,Long> ();
/**
* How long we wait for an {@link Executor} to call {@link #completePop()} before
* we assume that something has gone wrong and remove an {@link Item} from
* {@link #popped}.
*/
private final long poppedWaitTime = 1000*60*1; // One minute
/**
* {@link Task}s that can be built immediately
* that are waiting for available {@link Executor}.
......@@ -163,14 +149,14 @@ public class Queue extends ResourceController implements Saveable {
* a new {@link JobOffer} and gets itself {@linkplain Queue#parked parked},
* and we'll eventually hand out an {@link #item} to build.
*/
public static class JobOffer {
public class JobOffer {
public final Executor executor;
/**
* Used to wake up an executor, when it has an offered
* {@link Project} to build.
*/
private final OneShotEvent event = new OneShotEvent();
private final OneShotEvent event = new OneShotEvent(Queue.this);
/**
* The project that this {@link Executor} is going to build.
......@@ -685,7 +671,7 @@ public class Queue extends ResourceController implements Saveable {
* <p>
* This method blocks until a next project becomes buildable.
*/
public Queue.Item pop() throws InterruptedException {
public synchronized Queue.Item pop() throws InterruptedException {
final Executor exec = Executor.currentExecutor();
try {
......@@ -693,97 +679,91 @@ public class Queue extends ResourceController implements Saveable {
final JobOffer offer = new JobOffer(exec);
long sleep = -1;
synchronized (this) {
// consider myself parked
assert !parked.containsKey(exec);
parked.put(exec, offer);
// reuse executor thread to do a queue maintenance.
// at the end of this we get all the buildable jobs
// in the buildables field.
maintain();
// allocate buildable jobs to executors
Iterator<BuildableItem> itr = buildables.iterator();
while (itr.hasNext()) {
BuildableItem p = itr.next();
// one last check to make sure this build is not blocked.
if (isBuildBlocked(p.task)) {
itr.remove();
blockedProjects.put(p.task,new BlockedItem(p));
continue;
}
JobOffer runner = loadBalancer.choose(p.task, new ApplicableJobOfferList(p.task));
if (runner == null)
// if we couldn't find the executor that fits,
// just leave it in the buildables list and
// check if we can execute other projects
continue;
assert runner.canTake(p.task);
// found a matching executor. use it.
runner.set(p);
// consider myself parked
assert !parked.containsKey(exec);
parked.put(exec, offer);
// reuse executor thread to do a queue maintenance.
// at the end of this we get all the buildable jobs
// in the buildables field.
maintain();
// allocate buildable jobs to executors
Iterator<BuildableItem> itr = buildables.iterator();
while (itr.hasNext()) {
BuildableItem p = itr.next();
// one last check to make sure this build is not blocked.
if (isBuildBlocked(p.task)) {
itr.remove();
blockedProjects.put(p.task,new BlockedItem(p));
continue;
}
// we went over all the buildable projects and awaken
// all the executors that got work to do. now, go to sleep
// until this thread is awakened. If this executor assigned a job to
// itself above, the block method will return immediately.
JobOffer runner = loadBalancer.choose(p.task, new ApplicableJobOfferList(p.task));
if (runner == null)
// if we couldn't find the executor that fits,
// just leave it in the buildables list and
// check if we can execute other projects
continue;
if (!waitingList.isEmpty()) {
// wait until the first item in the queue is due
sleep = peek().timestamp.getTimeInMillis() - new GregorianCalendar().getTimeInMillis();
if (sleep < 100) sleep = 100; // avoid wait(0)
}
assert runner.canTake(p.task);
// found a matching executor. use it.
// the item is retracted from the buildables list by the the executor that'll be actually
// running the job, so that the state change from "buildable item" to "building" happens
// atomically.
runner.set(p);
}
// we went over all the buildable projects and awaken
// all the executors that got work to do. now, go to sleep
// until this thread is awakened. If this executor assigned a job to
// itself above, the block method will return immediately.
if (!waitingList.isEmpty()) {
// wait until the first item in the queue is due
sleep = peek().timestamp.getTimeInMillis() - new GregorianCalendar().getTimeInMillis();
if (sleep < 100) sleep = 100; // avoid wait(0)
}
// this needs to be done outside synchronized block,
// so that executors can maintain a queue while others are sleeping
if (sleep == -1)
offer.event.block();
else
offer.event.block(sleep);
synchronized (this) {
// retract the offer object
assert parked.get(exec) == offer;
parked.remove(exec);
// am I woken up because I have a project to build?
if (offer.item != null) {
// if so, just build it
LOGGER.fine("Pop returning " + offer.item + " for " + exec.getName());
offer.item.future.startExecuting(exec);
popped.put(offer.item, System.currentTimeMillis());
return offer.item;
}
// otherwise run a queue maintenance
// retract the offer object
assert parked.get(exec) == offer;
parked.remove(exec);
// am I woken up because I have a project to build?
if (offer.item != null) {
// if so, just build it
LOGGER.fine("Pop returning " + offer.item + " for " + exec.getName());
offer.item.future.startExecuting(exec);
buildables.remove(offer.item);
return offer.item;
}
// otherwise run a queue maintenance
}
} finally {
synchronized (this) {
// remove myself from the parked list
JobOffer offer = parked.remove(exec);
if (offer != null && offer.item != null) {
// we are already assigned a project,
// ask for someone else to build it.
// note that while this thread is waiting for CPU
// someone else can schedule this build again,
// so check the contains method first.
if (!contains(offer.item.task))
buildables.put(offer.item.task,offer.item);
}
// since this executor might have been chosen for
// maintenance, schedule another one. Worst case
// we'll just run a pointless maintenance, and that's
// fine.
scheduleMaintenance();
// remove myself from the parked list
JobOffer offer = parked.remove(exec);
if (offer != null && offer.item != null) {
// we are already assigned a project,
// ask for someone else to build it.
// note that while this thread is waiting for CPU
// someone else can schedule this build again,
// so check the contains method first.
if (!contains(offer.item.task))
buildables.put(offer.item.task,offer.item);
}
// since this executor might have been chosen for
// maintenance, schedule another one. Worst case
// we'll just run a pointless maintenance, and that's
// fine.
scheduleMaintenance();
}
}
......@@ -882,35 +862,9 @@ public class Queue extends ResourceController implements Saveable {
} catch (AbstractMethodError e) {
// earlier versions don't have the "isConcurrentBuild" method, so fall back gracefully
}
return !(buildables.containsKey(t) || taskIsPopped(t));
return !buildables.containsKey(t);
}
/**
* Check if an {@link Item} of a given {@link Task} has been recently popped and
* the {@Executor} has not yet created an {@link Executable} for it.
*/
private boolean taskIsPopped (Task t) {
long cutoff = System.currentTimeMillis() - poppedWaitTime;
for (Map.Entry<Item,Long> e : popped.entrySet()) {
if (e.getValue() < cutoff) {
popped.remove(e.getKey());
} else if (e.getKey().task == t) {
return true;
}
}
return false;
}
/**
* Queue maintenance.
* <p>
* Tell the Queue that a popped item has been started, so we don't have to track
* it in the Queue anymore.
*/
public synchronized void completePop (Item i) {
popped.remove(i);
}
/**
* Queue maintenance.
* <p>
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册