提交 f2cb5d73 编写于 作者: K kohsuke

Enhanced Queue so that the completion of a task can be tracked more reliably via Future.

git-svn-id: https://hudson.dev.java.net/svn/hudson/trunk/hudson/main@18850 71c3de6d-444a-0410-be80-ed276b4c234a
上级 d487dcf3
......@@ -303,7 +303,7 @@ public class MatrixConfiguration extends Project<MatrixConfiguration,MatrixRun>
}
public boolean scheduleBuild(ParametersAction parameters, Cause c) {
return Hudson.getInstance().getQueue().add(this, getQuietPeriod(), parameters, new CauseAction(c));
return Hudson.getInstance().getQueue().schedule(this, getQuietPeriod(), parameters, new CauseAction(c))!=null;
}
}
......@@ -35,6 +35,7 @@ import hudson.model.Descriptor.FormException;
import hudson.model.Fingerprint.RangeSet;
import hudson.model.RunMap.Constructor;
import hudson.model.listeners.RunListener;
import hudson.model.Queue.WaitingItem;
import hudson.remoting.AsyncFutureImpl;
import hudson.scm.ChangeLogSet;
import hudson.scm.ChangeLogSet.Entry;
......@@ -493,8 +494,16 @@ public abstract class AbstractProject<P extends AbstractProject<P,R>,R extends A
* @return whether the build was actually scheduled
*/
public boolean scheduleBuild(int quietPeriod, Cause c, Action... actions) {
return scheduleBuild2(quietPeriod,c,actions)!=null;
}
/**
* Schedules a build of this project, and returns a {@link Future} object
* to wait for the completion of the build.
*/
public Future<R> scheduleBuild2(int quietPeriod, Cause c, Action... actions) {
if (isDisabled())
return false;
return null;
List<Action> queueActions = new ArrayList(Arrays.asList(actions));
if (isParameterized() && Util.filter(queueActions, ParametersAction.class).isEmpty()) {
......@@ -505,10 +514,10 @@ public abstract class AbstractProject<P extends AbstractProject<P,R>,R extends A
queueActions.add(new CauseAction(c));
}
return Hudson.getInstance().getQueue().add(
this,
quietPeriod,
queueActions.toArray(new Action[queueActions.size()]));
WaitingItem i = Hudson.getInstance().getQueue().schedule(this, quietPeriod, queueActions);
if(i!=null)
return (Future)i.getFuture();
return null;
}
private List<ParameterValue> getDefaultParametersValues() {
......@@ -553,34 +562,6 @@ public abstract class AbstractProject<P extends AbstractProject<P,R>,R extends A
return scheduleBuild2(quietPeriod, c, new Action[0]);
}
/**
* Schedules a build of this project, and returns a {@link Future} object
* to wait for the completion of the build.
*/
public Future<R> scheduleBuild2(int quietPeriod, Cause c, Action... actions) {
R lastBuild = getLastBuild();
final int n;
if(lastBuild!=null) n = lastBuild.getNumber();
else n = -1;
Future<R> f = new AsyncFutureImpl<R>() {
final RunListener r = new RunListener<AbstractBuild>(AbstractBuild.class) {
public void onFinalized(AbstractBuild r) {
if(r.getProject()==AbstractProject.this && r.getNumber()>n) {
set((R)r);
unregister();
}
}
};
{ r.register(); }
};
scheduleBuild(quietPeriod, c, actions);
return f;
}
/**
* Schedules a polling of this project.
*/
......@@ -1108,7 +1089,7 @@ public abstract class AbstractProject<P extends AbstractProject<P,R>,R extends A
// TODO: more unit handling
if(delay.endsWith("sec")) delay=delay.substring(0,delay.length()-3);
if(delay.endsWith("secs")) delay=delay.substring(0,delay.length()-4);
Hudson.getInstance().getQueue().add(this, Integer.parseInt(delay),
Hudson.getInstance().getQueue().schedule(this, Integer.parseInt(delay),
new CauseAction(cause));
} catch (NumberFormatException e) {
throw new ServletException("Invalid delay parameter value: "+delay);
......
......@@ -127,8 +127,10 @@ public class Executor extends Thread implements ModelObject {
} finally {
finishTime = System.currentTimeMillis();
if (problems == null) {
queueItem.future.set(executable);
owner.taskCompleted(this, task, finishTime - startTime);
} else {
queueItem.future.set(problems);
owner.taskCompletedWithProblems(this, task, finishTime - startTime, problems);
}
}
......
......@@ -103,7 +103,7 @@ public class ParametersDefinitionProperty extends JobProperty<AbstractProject<?,
values.add(parameterValue);
}
Hudson.getInstance().getQueue().add(
Hudson.getInstance().getQueue().schedule(
owner, 0, new ParametersAction(values), new CauseAction(new Cause.UserCause()));
// send the user back to the job top page.
......@@ -121,7 +121,7 @@ public class ParametersDefinitionProperty extends JobProperty<AbstractProject<?,
}
}
Hudson.getInstance().getQueue().add(
Hudson.getInstance().getQueue().schedule(
owner, 0, new ParametersAction(values), new CauseAction(new Cause.UserCause()));
// send the user back to the job top page.
......
......@@ -26,6 +26,7 @@ package hudson.model;
import hudson.BulkChange;
import hudson.Util;
import hudson.XmlFile;
import hudson.remoting.AsyncFutureImpl;
import hudson.model.Node.Mode;
import hudson.triggers.SafeTimerTask;
import hudson.triggers.Trigger;
......@@ -52,6 +53,7 @@ import java.util.Set;
import java.util.TreeSet;
import java.util.Map.Entry;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.Future;
import java.util.logging.Level;
import java.util.logging.Logger;
......@@ -229,7 +231,7 @@ public class Queue extends ResourceController implements Saveable {
for (Object o : list) {
if (o instanceof Task) {
// backward compatiblity
add((Task)o, 0);
schedule((Task)o, 0);
} else if (o instanceof Item) {
Item item = (Item)o;
if(item.task==null)
......@@ -289,9 +291,11 @@ public class Queue extends ResourceController implements Saveable {
* Wipes out all the items currently in the queue, as if all of them are cancelled at once.
*/
public synchronized void clear() {
for (WaitingItem i : waitingList)
i.onCancelled();
waitingList.clear();
blockedProjects.clear();
buildables.clear();
blockedProjects.cancelAll();
buildables.cancelAll();
scheduleMaintenance();
}
......@@ -303,6 +307,14 @@ public class Queue extends ResourceController implements Saveable {
return new File(Hudson.getInstance().getRootDir(), "queue.xml");
}
/**
* @deprecated as of 1.311
* Use {@link #schedule(AbstractProject)}
*/
public boolean add(AbstractProject p) {
return schedule(p)!=null;
}
/**
* Schedule a new build for this project.
*
......@@ -310,8 +322,8 @@ public class Queue extends ResourceController implements Saveable {
* false if the queue contained it and therefore the add()
* was noop
*/
public boolean add(AbstractProject p) {
return add(p, p.getQuietPeriod());
public WaitingItem schedule(AbstractProject p) {
return schedule(p, p.getQuietPeriod());
}
/**
......@@ -321,11 +333,13 @@ public class Queue extends ResourceController implements Saveable {
* Left for backward compatibility with &lt;1.114.
*
* @since 1.105
* @deprecated as of 1.311
* Use {@link #schedule(Task, int)}
*/
public synchronized boolean add(AbstractProject p, int quietPeriod) {
return add((Task) p, quietPeriod);
public boolean add(AbstractProject p, int quietPeriod) {
return schedule(p, quietPeriod)!=null;
}
/**
* Schedules an execution of a task.
*
......@@ -336,9 +350,27 @@ public class Queue extends ResourceController implements Saveable {
* false if the queue contained it and therefore the add()
* was noop, or just changed the due date of the task.
* @since 1.114
* @deprecated as of 1.311
* Use {@link #schedule(Task, int, List)}
*/
private synchronized boolean add(Task p, int quietPeriod, List<Action> actions) {
boolean taskConsumed=false;
private boolean add(Task p, int quietPeriod, List<Action> actions) {
return schedule(p,quietPeriod,actions)!=null;
}
/**
* Schedules an execution of a task.
*
* @since 1.311
* @return
* null if this task is already in the queue and therefore the add operation was no-op.
* Otherwise indicates the {@link WaitingItem} object added, although the nature of the queue
* is that such {@link Item} only captures the state of the item at a particular moment,
* and by the time you inspect the object, some of its information can be already stale.
*
* That said, one can still look at {@link WaitingItem#future}, {@link WaitingItem#id}, etc.
*/
public synchronized WaitingItem schedule(Task p, int quietPeriod, List<Action> actions) {
WaitingItem added=null;
List<Item> items = getItems(p);
Calendar due = new GregorianCalendar();
due.add(Calendar.SECOND, quietPeriod);
......@@ -363,8 +395,7 @@ public class Queue extends ResourceController implements Saveable {
LOGGER.fine(p.getFullDisplayName() + " added to queue");
// put the item in the queue
waitingList.add(new WaitingItem(due,p,actions));
taskConsumed=true;
waitingList.add(added=new WaitingItem(due,p,actions));
} else {
// the requested build is already queued, so will not be added
List<WaitingItem> waitingDuplicates = new ArrayList<WaitingItem>();
......@@ -380,7 +411,7 @@ public class Queue extends ResourceController implements Saveable {
if(duplicatesInQueue.size() == 0) {
// all duplicates in the queue are already in the blocked or
// buildable stage no need to requeue
return false;
return null;
}
// TODO: avoid calling scheduleMaintenance() if none of the waiting items
// actually change
......@@ -405,15 +436,31 @@ public class Queue extends ResourceController implements Saveable {
}
scheduleMaintenance(); // let an executor know that a new item is in the queue.
return taskConsumed;
return added;
}
/**
* @deprecated as of 1.311
* Use {@link #schedule(Task, int)}
*/
public synchronized boolean add(Task p, int quietPeriod) {
return add(p, quietPeriod, new Action[0]);
return schedule(p, quietPeriod)!=null;
}
public synchronized WaitingItem schedule(Task p, int quietPeriod) {
return schedule(p, quietPeriod, new Action[0]);
}
/**
* @deprecated as of 1.311
* Use {@link #schedule(Task, int, Action[])}
*/
public synchronized boolean add(Task p, int quietPeriod, Action... actions) {
return add(p, quietPeriod, Arrays.asList(actions));
return schedule(p, quietPeriod, actions)!=null;
}
public synchronized WaitingItem schedule(Task p, int quietPeriod, Action... actions) {
return schedule(p, quietPeriod, Arrays.asList(actions));
}
/**
......@@ -428,17 +475,21 @@ public class Queue extends ResourceController implements Saveable {
Item item = itr.next();
if (item.task.equals(p)) {
itr.remove();
item.onCancelled();
return true;
}
}
// use bitwise-OR to make sure that both branches get evaluated all the time
return blockedProjects.remove(p)!=null | buildables.remove(p)!=null;
return blockedProjects.cancel(p)!=null | buildables.cancel(p)!=null;
}
public synchronized boolean cancel(Item item) {
LOGGER.fine("Cancelling " + item.task.getFullDisplayName() + " item#" + item.id);
// use bitwise-OR to make sure that both branches get evaluated all the time
return (item instanceof WaitingItem && waitingList.remove(item)) | blockedProjects.remove(item) | buildables.remove(item);
// use bitwise-OR to make sure that all the branches get evaluated all the time
boolean r = (item instanceof WaitingItem && waitingList.remove(item)) | blockedProjects.remove(item) | buildables.remove(item);
if(r)
item.onCancelled();
return r;
}
public synchronized boolean isEmpty() {
......@@ -903,7 +954,6 @@ public class Queue extends ResourceController implements Saveable {
* URL that ends with '/'.
*/
String getUrl();
}
public interface Executable extends Runnable {
......@@ -924,7 +974,10 @@ public class Queue extends ResourceController implements Saveable {
*/
@ExportedBean(defaultVisibility = 999)
public static abstract class Item extends Actionable {
/**
* VM-wide unique ID that tracks the {@link Task} as it moves through different stages
* in the queue (each represented by different subtypes of {@link Item}.
*/
public final int id;
/**
......@@ -932,7 +985,9 @@ public class Queue extends ResourceController implements Saveable {
*/
@Exported
public final Task task;
/*package almost final*/ transient AsyncFutureImpl<Executable> future;
/**
* Build is blocked because another build is in progress,
* required {@link Resource}s are not available, or otherwise blocked
......@@ -955,14 +1010,22 @@ public class Queue extends ResourceController implements Saveable {
@Exported
public boolean isStuck() { return false; }
protected Item(Task task, List<Action> actions, int id) {
/**
* Can be used to wait for the completion (either normal, abnormal, or cancellation) of the {@link Task}.
* <p>
* Just like {@link #id}, the same object tracks various stages of the queue.
*/
public Future<Executable> getFuture() { return future; }
protected Item(Task task, List<Action> actions, int id, AsyncFutureImpl<Executable> future) {
this.task = task;
this.id = id;
this.future = future;
for (Action action: actions) addAction(action);
}
protected Item(Item item) {
this(item.task, item.getActions(), item.id);
this(item.task, item.getActions(), item.id, item.future);
}
/**
......@@ -992,6 +1055,18 @@ public class Queue extends ResourceController implements Saveable {
Hudson.getInstance().getQueue().cancel(this);
rsp.forwardToPreviousPage(req);
}
/**
* Participates in the cancellation logic to set the {@link #future} accordingly.
*/
/*package*/ void onCancelled() {
future.setAsCancelled();
}
private Object readResolve() {
this.future = new AsyncFutureImpl<Executable>();
return this;
}
}
/**
......@@ -1019,7 +1094,7 @@ public class Queue extends ResourceController implements Saveable {
public Calendar timestamp;
WaitingItem(Calendar timestamp, Task project, List<Action> actions) {
super(project, actions, COUNTER.incrementAndGet());
super(project, actions, COUNTER.incrementAndGet(), new AsyncFutureImpl<Executable>());
this.timestamp = timestamp;
}
......@@ -1275,5 +1350,29 @@ public class Queue extends ResourceController implements Saveable {
public ItemList<T> values() {
return this;
}
/**
* Works like {@link #remove(Task)} but also marks the {@link Item} as cancelled.
*/
public T cancel(Task p) {
T x = remove(p);
if(x!=null) x.onCancelled();
return x;
}
/**
* Works like {@link #remove(Object)} but also marks the {@link Item} as cancelled.
*/
public boolean cancel(Item t) {
boolean r = remove(t);
if(r) t.onCancelled();
return r;
}
public void cancelAll() {
for (T t : this)
t.onCancelled();
clear();
}
}
}
......@@ -26,6 +26,7 @@ package hudson.remoting;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.CancellationException;
/**
* {@link Future} implementation whose computation is carried out elsewhere.
......@@ -35,19 +36,29 @@ import java.util.concurrent.TimeoutException;
* @author Kohsuke Kawaguchi
*/
public class AsyncFutureImpl<V> implements Future<V> {
/**
* Setting this field to true will indicate that the computation is completed.
*
* <p>
* One of the following three fields also needs to be set at the same time.
*/
private boolean completed;
private V value;
private Throwable problem;
private boolean completed;
private boolean cancelled;
/**
* Not cancellable.
* @deprecated
* Not externally cancellable, since this class doesn't know where the computation is actually happening.
* So you shouldn't be calling this method.
*/
public boolean cancel(boolean mayInterruptIfRunning) {
return false;
}
public boolean isCancelled() {
return false;
return cancelled;
}
public synchronized boolean isDone() {
......@@ -59,6 +70,8 @@ public class AsyncFutureImpl<V> implements Future<V> {
wait();
if(problem!=null)
throw new ExecutionException(problem);
if(cancelled)
throw new CancellationException();
return value;
}
......@@ -67,6 +80,8 @@ public class AsyncFutureImpl<V> implements Future<V> {
wait(unit.toMillis(timeout));
if(!completed)
throw new TimeoutException();
if(cancelled)
throw new CancellationException();
return get();
}
......@@ -81,4 +96,13 @@ public class AsyncFutureImpl<V> implements Future<V> {
this.problem = problem;
notifyAll();
}
/**
* Marks this task as cancelled.
*/
public synchronized void setAsCancelled() {
completed = true;
cancelled = true;
notifyAll();
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册