提交 513a45b3 编写于 作者: K Kohsuke Kawaguchi

Added QueueListener extension point

This can track items as they go through the queue.
上级 7b2541d4
......@@ -61,6 +61,8 @@ Upcoming changes</a>
<li class=bug>
“Projects tied to slave” shows unrelated Maven module jobs.
(<a href="https://issues.jenkins-ci.org/browse/JENKINS-17451">issue 17451</a>)
<li class=rfe>
Added a new extension point to monitor the flow of stuff in the queue.
<li class=rfe>
Executors running the builds can be now a subject of access control.
(<a href="https://issues.jenkins-ci.org/browse/JENKINS-18285">issue 18285</a>)
......
......@@ -44,6 +44,7 @@ import hudson.cli.declarative.CLIResolver;
import hudson.model.labels.LabelAssignmentAction;
import hudson.model.queue.AbstractQueueTask;
import hudson.model.queue.Executables;
import hudson.model.queue.QueueListener;
import hudson.model.queue.QueueTaskFuture;
import hudson.model.queue.SubTask;
import hudson.model.queue.FutureImpl;
......@@ -141,6 +142,8 @@ import org.kohsuke.stapler.interceptor.RequirePOST;
* cancels a job in the queue.) See the corresponding field for their exact meanings.
*
* @author Kohsuke Kawaguchi
* @see QueueListener
* @see QueueTaskDispatcher
*/
@ExportedBean
public class Queue extends ResourceController implements Saveable {
......@@ -365,11 +368,11 @@ public class Queue extends ResourceController implements Saveable {
maxId = Math.max(maxId, item.id);
if (item instanceof WaitingItem) {
waitingList.add((WaitingItem) item);
item.enter(this);
} else if (item instanceof BlockedItem) {
blockedProjects.put(item.task, (BlockedItem) item);
item.enter(this);
} else if (item instanceof BuildableItem) {
buildables.add((BuildableItem) item);
item.enter(this);
} else {
throw new IllegalStateException("Unknown item type! " + item);
}
......@@ -421,10 +424,10 @@ public class Queue extends ResourceController implements Saveable {
@CLIMethod(name="clear-queue")
public synchronized void clear() {
Jenkins.getInstance().checkPermission(Jenkins.ADMINISTER);
for (WaitingItem i : waitingList)
i.onCancelled();
waitingList.clear();
for (WaitingItem i : new ArrayList<WaitingItem>(waitingList)) // copy the list as we'll modify it in the loop
i.cancel(this);
blockedProjects.cancelAll();
pendings.cancelAll();
buildables.cancelAll();
scheduleMaintenance();
}
......@@ -537,7 +540,7 @@ public class Queue extends ResourceController implements Saveable {
// put the item in the queue
WaitingItem added = new WaitingItem(due,p,actions);
waitingList.add(added);
added.enter(this);
scheduleMaintenance(); // let an executor know that a new item is in the queue.
return added;
}
......@@ -566,16 +569,17 @@ public class Queue extends ResourceController implements Saveable {
}
// waitingList is sorted, so when we change a timestamp we need to maintain order
waitingList.remove(wi);
wi.leave(this);
wi.timestamp = due;
waitingList.add(wi);
wi.enter(this);
queueUpdated=true;
}
if (queueUpdated) scheduleMaintenance();
return null;
}
/**
* @deprecated as of 1.311
* Use {@link #schedule(Task, int)}
......@@ -611,12 +615,9 @@ public class Queue extends ResourceController implements Saveable {
*/
public synchronized boolean cancel(Task p) {
LOGGER.log(Level.FINE, "Cancelling {0}", p);
for (Iterator<WaitingItem> itr = waitingList.iterator(); itr.hasNext();) {
Item item = itr.next();
for (WaitingItem item : waitingList) {
if (item.task.equals(p)) {
itr.remove();
item.onCancelled();
return true;
return item.cancel(this);
}
}
// use bitwise-OR to make sure that both branches get evaluated all the time
......@@ -625,14 +626,11 @@ public class Queue extends ResourceController implements Saveable {
public synchronized boolean cancel(Item item) {
LOGGER.log(Level.FINE, "Cancelling {0} item#{1}", new Object[] {item.task, item.id});
// use bitwise-OR to make sure that all the branches get evaluated all the time
boolean r = (item instanceof WaitingItem && waitingList.remove(item)) | blockedProjects.remove(item) | buildables.remove(item);
boolean r = item.leave(this);
LeftItem li = new LeftItem(item);
leftItems.put(li.id,li);
li.enter(this);
if(r)
item.onCancelled();
return r;
}
......@@ -892,7 +890,7 @@ public class Queue extends ResourceController implements Saveable {
pendings.remove(wu.context.item);
LeftItem li = new LeftItem(wu.context);
leftItems.put(li.id,li);
li.enter(this);
return wu;
}
......@@ -939,7 +937,7 @@ public class Queue extends ResourceController implements Saveable {
if (offer.workUnit.isMainWork()) {
pendings.remove(offer.workUnit.context.item);
LeftItem li = new LeftItem(offer.workUnit.context);
leftItems.put(li.id,li);
li.enter(this);
}
return offer.workUnit;
......@@ -1028,13 +1026,10 @@ public class Queue extends ResourceController implements Saveable {
LOGGER.log(Level.FINE, "Queue maintenance started {0}", this);
{// blocked -> buildable
Iterator<BlockedItem> itr = blockedProjects.values().iterator();
while (itr.hasNext()) {
BlockedItem p = itr.next();
for (BlockedItem p : new ArrayList<BlockedItem>(blockedProjects.values())) {// copy as we'll mutate the list
if (!isBuildBlocked(p) && allowNewBuildableTask(p.task)) {
// ready to be executed
LOGGER.log(Level.FINE, "{0} no longer blocked", p.task);
itr.remove();
p.leave(this);
makeBuildable(new BuildableItem(p));
}
}
......@@ -1047,17 +1042,15 @@ public class Queue extends ResourceController implements Saveable {
if (!top.timestamp.before(new GregorianCalendar()))
break; // finished moving all ready items from queue
waitingList.remove(top);
top.leave(this);
Task p = top.task;
if (!isBuildBlocked(top) && allowNewBuildableTask(p)) {
// ready to be executed immediately
LOGGER.log(Level.FINE, "{0} ready to build", p);
makeBuildable(new BuildableItem(top));
} else {
// this can't be built now because another build is in progress
// set this project aside.
LOGGER.log(Level.FINE, "{0} is blocked", p);
blockedProjects.put(p,new BlockedItem(top));
new BlockedItem(top).enter(this);
}
}
......@@ -1066,21 +1059,18 @@ public class Queue extends ResourceController implements Saveable {
s.sortBuildableItems(buildables);
// allocate buildable jobs to executors
Iterator<BuildableItem> itr = buildables.iterator();
while (itr.hasNext()) {
BuildableItem p = itr.next();
for (BuildableItem p : new ArrayList<BuildableItem>(buildables)) {// copy as we'll mutate the list in the loop
// one last check to make sure this build is not blocked.
if (isBuildBlocked(p)) {
itr.remove();
blockedProjects.put(p.task,new BlockedItem(p));
p.leave(this);
new BlockedItem(p).enter(this);
LOGGER.log(Level.FINE, "Catching that {0} is blocked in the last minute", p);
continue;
}
List<JobOffer> candidates = new ArrayList<JobOffer>(parked.size());
for (JobOffer j : parked.values())
if(j.canTake(p))
if (j.canTake(p))
candidates.add(j);
MappingWorksheet ws = new MappingWorksheet(p, candidates);
......@@ -1089,7 +1079,7 @@ public class Queue extends ResourceController implements Saveable {
// if we couldn't find the executor that fits,
// just leave it in the buildables list and
// check if we can execute other projects
LOGGER.log(Level.FINER, "Failed to map {0} to executors. candidates={1} parked={2}", new Object[] {p, candidates, parked.values()});
LOGGER.log(Level.FINER, "Failed to map {0} to executors. candidates={1} parked={2}", new Object[]{p, candidates, parked.values()});
continue;
}
......@@ -1097,7 +1087,7 @@ public class Queue extends ResourceController implements Saveable {
WorkUnitContext wuc = new WorkUnitContext(p);
m.execute(wuc);
itr.remove();
p.leave(this);
if (!wuc.getWorkUnits().isEmpty())
makePending(p);
else
......@@ -1131,8 +1121,8 @@ public class Queue extends ResourceController implements Saveable {
// if the execution get here, it means we couldn't schedule it anywhere.
// so do the scheduling like other normal jobs.
}
buildables.put(p.task,p);
p.enter(this);
}
private boolean makePending(BuildableItem p) {
......@@ -1557,13 +1547,6 @@ public class Queue extends ResourceController implements Saveable {
}
/**
* Participates in the cancellation logic to set the {@link #future} accordingly.
*/
/*package*/ void onCancelled() {
future.setAsCancelled();
}
public Api getApi() {
return new Api(this);
}
......@@ -1577,6 +1560,27 @@ public class Queue extends ResourceController implements Saveable {
public String toString() {
return getClass().getName() + ':' + task + ':' + getWhy();
}
/**
* Enters the appropriate queue for this type of item.
*/
/*package*/ abstract void enter(Queue q);
/**
* Leaves the appropriate queue for this type of item.
*/
/*package*/ abstract boolean leave(Queue q);
/**
* Cancels this item, which updates {@link #future} to notify the listener, and
* also leaves the queue.
*/
/*package*/ boolean cancel(Queue q) {
boolean r = leave(q);
if (r) future.setAsCancelled();
return r;
}
}
/**
......@@ -1657,6 +1661,38 @@ public class Queue extends ResourceController implements Saveable {
else
return CauseOfBlockage.fromMessage(Messages._Queue_Unknown());
}
@Override
/*package*/ void enter(Queue q) {
if (q.waitingList.add(this)) {
for (QueueListener ql : QueueListener.all()) {
try {
ql.onEnterWaiting(this);
} catch (Throwable e) {
// don't let this kill the queue
LOGGER.log(Level.WARNING, "QueueListener failed while processing "+this,e);
}
}
}
}
@Override
/*package*/ boolean leave(Queue q) {
boolean r = q.waitingList.remove(this);
if (r) {
for (QueueListener ql : QueueListener.all()) {
try {
ql.onLeaveWaiting(this);
} catch (Throwable e) {
// don't let this kill the queue
LOGGER.log(Level.WARNING, "QueueListener failed while processing "+this,e);
}
}
}
return r;
}
}
/**
......@@ -1708,6 +1744,35 @@ public class Queue extends ResourceController implements Saveable {
return task.getCauseOfBlockage();
}
/*package*/ void enter(Queue q) {
LOGGER.log(Level.FINE, "{0} is blocked", this);
blockedProjects.add(this);
for (QueueListener ql : QueueListener.all()) {
try {
ql.onEnterBlocked(this);
} catch (Throwable e) {
// don't let this kill the queue
LOGGER.log(Level.WARNING, "QueueListener failed while processing "+this,e);
}
}
}
/*package*/ boolean leave(Queue q) {
boolean r = blockedProjects.remove(this);
if (r) {
LOGGER.log(Level.FINE, "{0} no longer blocked", this);
for (QueueListener ql : QueueListener.all()) {
try {
ql.onLeaveBlocked(this);
} catch (Throwable e) {
// don't let this kill the queue
LOGGER.log(Level.WARNING, "QueueListener failed while processing "+this,e);
}
}
}
return r;
}
}
/**
......@@ -1789,6 +1854,36 @@ public class Queue extends ResourceController implements Saveable {
public boolean isPending() {
return isPending;
}
@Override
/*package*/ void enter(Queue q) {
q.buildables.add(this);
for (QueueListener ql : QueueListener.all()) {
try {
ql.onEnterBuildable(this);
} catch (Throwable e) {
// don't let this kill the queue
LOGGER.log(Level.WARNING, "QueueListener failed while processing "+this,e);
}
}
}
@Override
/*package*/ boolean leave(Queue q) {
boolean r = q.buildables.remove(this);
if (r) {
LOGGER.log(Level.FINE, "{0} no longer blocked", this);
for (QueueListener ql : QueueListener.all()) {
try {
ql.onLeaveBuildable(this);
} catch (Throwable e) {
// don't let this kill the queue
LOGGER.log(Level.WARNING, "QueueListener failed while processing "+this,e);
}
}
}
return r;
}
}
/**
......@@ -1837,6 +1932,25 @@ public class Queue extends ResourceController implements Saveable {
public boolean isCancelled() {
return outcome==null;
}
@Override
void enter(Queue q) {
q.leftItems.put(id,this);
for (QueueListener ql : QueueListener.all()) {
try {
ql.onLeft(this);
} catch (Throwable e) {
// don't let this kill the queue
LOGGER.log(Level.WARNING, "QueueListener failed while processing "+this,e);
}
}
}
@Override
boolean leave(Queue q) {
// there's no leave operation
return false;
}
}
private static final Logger LOGGER = Logger.getLogger(Queue.class.getName());
......@@ -1944,7 +2058,7 @@ public class Queue extends ResourceController implements Saveable {
/**
* {@link ArrayList} of {@link Item} with more convenience methods.
*/
private static class ItemList<T extends Item> extends ArrayList<T> {
private class ItemList<T extends Item> extends ArrayList<T> {
public T get(Task task) {
for (T item: this) {
if (item.task == task) {
......@@ -1993,24 +2107,16 @@ public class Queue extends ResourceController implements Saveable {
* Works like {@link #remove(Task)} but also marks the {@link Item} as cancelled.
*/
public T cancel(Task p) {
T x = remove(p);
if(x!=null) x.onCancelled();
T x = get(p);
if(x!=null) x.cancel(Queue.this);
return x;
}
/**
* Works like {@link #remove(Object)} but also marks the {@link Item} as cancelled.
*/
public boolean cancel(Item t) {
boolean r = remove(t);
if(r) t.onCancelled();
return r;
}
public void cancelAll() {
for (T t : this)
t.onCancelled();
clear();
for (T t : new ArrayList<T>(this))
t.cancel(Queue.this);
clear(); // just to be sure
}
}
......
package hudson.model.queue;
import hudson.ExtensionList;
import hudson.ExtensionPoint;
import hudson.model.Computer;
import hudson.model.Queue;
import hudson.model.Queue.BlockedItem;
import hudson.model.Queue.BuildableItem;
import hudson.model.Queue.Item;
import hudson.model.Queue.LeftItem;
import hudson.model.Queue.WaitingItem;
import jenkins.model.Jenkins;
import java.util.concurrent.Executor;
/**
* Listener for events in {@link Queue}.
*
* <p>
* {@link Queue} is highly synchronized objects, and these callbacks are invoked synchronously.
* To avoid the risk of deadlocks and general slow down, please minimize the amount of work callbacks
* will perform, and push any sizable work to asynchronous execution via {@link Executor}, such as
* {@link Computer#threadPoolForRemoting}.
*
* <p>
* For the state transition of {@link Queue.Item} in {@link Queue}, please refer to the Queue javadoc.
*
* @author Kohsuke Kawaguchi
* @since 1.520
*/
public abstract class QueueListener implements ExtensionPoint {
/**
* When a task is submitted to the queue, it first gets to the waiting phase,
* where it spends until the quiet period runs out and the item becomes executable.
*
* @see WaitingItem#timestamp
*/
public void onEnterWaiting(WaitingItem wi) {}
/**
* An item leaves the waiting phase when the current time of the system is past its
* {@linkplain WaitingItem#timestamp due date}. The item will then enter either the blocked phase
* or the buildable phase.
*/
public void onLeaveWaiting(WaitingItem wi) {}
/**
* An item enters the blocked phase when there's someone saying "NO" to it proceeding to the buildable phase,
* such as {@link QueueTaskDispatcher}. Note that waiting for an executor to become available is not a part of this.
*/
public void onEnterBlocked(BlockedItem bi) {}
/**
* An item leaves the blocked phase and becomes buildable when there's no one vetoing.
*/
public void onLeaveBlocked(BlockedItem bi) {}
/**
* An item enters the buildable phase when all signals are green (or blue!) and it's just waiting
* for the scheduler to allocate it to the available executor. An item can spend considerable time
* in this phase for example if all the executors are busy.
*/
public void onEnterBuildable(BuildableItem bi) {}
/**
* An item leaves the buildable phase.
*
* It will move to the "left" state if the executors are allocated to it, or it will move to the
* blocked phase if someone starts vetoing once again.
*/
public void onLeaveBuildable(BuildableItem bi) {}
/**
* The item has left the queue, either by getting {@link Queue#cancel(Item) cancelled} or by getting
* picked up by an executor and starts running.
*/
public void onLeft(LeftItem li) {}
/**
* Returns all the registered {@link QueueListener}s.
*/
public static ExtensionList<QueueListener> all() {
return Jenkins.getInstance().getExtensionList(QueueListener.class);
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册