From 513a45b3091b88d1ad9020099bbd1aec04a8c686 Mon Sep 17 00:00:00 2001 From: Kohsuke Kawaguchi Date: Thu, 13 Jun 2013 11:23:53 +0300 Subject: [PATCH] Added QueueListener extension point This can track items as they go through the queue. --- changelog.html | 2 + core/src/main/java/hudson/model/Queue.java | 234 +++++++++++++----- .../hudson/model/queue/QueueListener.java | 85 +++++++ 3 files changed, 257 insertions(+), 64 deletions(-) create mode 100644 core/src/main/java/hudson/model/queue/QueueListener.java diff --git a/changelog.html b/changelog.html index 41295cce38..cc64b13190 100644 --- a/changelog.html +++ b/changelog.html @@ -61,6 +61,8 @@ Upcoming changes
  • “Projects tied to slave” shows unrelated Maven module jobs. (issue 17451) +
  • + Added a new extension point to monitor the flow of stuff in the queue.
  • Executors running the builds can be now a subject of access control. (issue 18285) diff --git a/core/src/main/java/hudson/model/Queue.java b/core/src/main/java/hudson/model/Queue.java index 00d357dfad..524b670634 100644 --- a/core/src/main/java/hudson/model/Queue.java +++ b/core/src/main/java/hudson/model/Queue.java @@ -44,6 +44,7 @@ import hudson.cli.declarative.CLIResolver; import hudson.model.labels.LabelAssignmentAction; import hudson.model.queue.AbstractQueueTask; import hudson.model.queue.Executables; +import hudson.model.queue.QueueListener; import hudson.model.queue.QueueTaskFuture; import hudson.model.queue.SubTask; import hudson.model.queue.FutureImpl; @@ -141,6 +142,8 @@ import org.kohsuke.stapler.interceptor.RequirePOST; * cancels a job in the queue.) See the corresponding field for their exact meanings. * * @author Kohsuke Kawaguchi + * @see QueueListener + * @see QueueTaskDispatcher */ @ExportedBean public class Queue extends ResourceController implements Saveable { @@ -365,11 +368,11 @@ public class Queue extends ResourceController implements Saveable { maxId = Math.max(maxId, item.id); if (item instanceof WaitingItem) { - waitingList.add((WaitingItem) item); + item.enter(this); } else if (item instanceof BlockedItem) { - blockedProjects.put(item.task, (BlockedItem) item); + item.enter(this); } else if (item instanceof BuildableItem) { - buildables.add((BuildableItem) item); + item.enter(this); } else { throw new IllegalStateException("Unknown item type! " + item); } @@ -421,10 +424,10 @@ public class Queue extends ResourceController implements Saveable { @CLIMethod(name="clear-queue") public synchronized void clear() { Jenkins.getInstance().checkPermission(Jenkins.ADMINISTER); - for (WaitingItem i : waitingList) - i.onCancelled(); - waitingList.clear(); + for (WaitingItem i : new ArrayList(waitingList)) // copy the list as we'll modify it in the loop + i.cancel(this); blockedProjects.cancelAll(); + pendings.cancelAll(); buildables.cancelAll(); scheduleMaintenance(); } @@ -537,7 +540,7 @@ public class Queue extends ResourceController implements Saveable { // put the item in the queue WaitingItem added = new WaitingItem(due,p,actions); - waitingList.add(added); + added.enter(this); scheduleMaintenance(); // let an executor know that a new item is in the queue. return added; } @@ -566,16 +569,17 @@ public class Queue extends ResourceController implements Saveable { } // waitingList is sorted, so when we change a timestamp we need to maintain order - waitingList.remove(wi); + wi.leave(this); wi.timestamp = due; - waitingList.add(wi); + wi.enter(this); queueUpdated=true; } if (queueUpdated) scheduleMaintenance(); return null; } - + + /** * @deprecated as of 1.311 * Use {@link #schedule(Task, int)} @@ -611,12 +615,9 @@ public class Queue extends ResourceController implements Saveable { */ public synchronized boolean cancel(Task p) { LOGGER.log(Level.FINE, "Cancelling {0}", p); - for (Iterator itr = waitingList.iterator(); itr.hasNext();) { - Item item = itr.next(); + for (WaitingItem item : waitingList) { if (item.task.equals(p)) { - itr.remove(); - item.onCancelled(); - return true; + return item.cancel(this); } } // use bitwise-OR to make sure that both branches get evaluated all the time @@ -625,14 +626,11 @@ public class Queue extends ResourceController implements Saveable { public synchronized boolean cancel(Item item) { LOGGER.log(Level.FINE, "Cancelling {0} item#{1}", new Object[] {item.task, item.id}); - // use bitwise-OR to make sure that all the branches get evaluated all the time - boolean r = (item instanceof WaitingItem && waitingList.remove(item)) | blockedProjects.remove(item) | buildables.remove(item); + boolean r = item.leave(this); LeftItem li = new LeftItem(item); - leftItems.put(li.id,li); + li.enter(this); - if(r) - item.onCancelled(); return r; } @@ -892,7 +890,7 @@ public class Queue extends ResourceController implements Saveable { pendings.remove(wu.context.item); LeftItem li = new LeftItem(wu.context); - leftItems.put(li.id,li); + li.enter(this); return wu; } @@ -939,7 +937,7 @@ public class Queue extends ResourceController implements Saveable { if (offer.workUnit.isMainWork()) { pendings.remove(offer.workUnit.context.item); LeftItem li = new LeftItem(offer.workUnit.context); - leftItems.put(li.id,li); + li.enter(this); } return offer.workUnit; @@ -1028,13 +1026,10 @@ public class Queue extends ResourceController implements Saveable { LOGGER.log(Level.FINE, "Queue maintenance started {0}", this); {// blocked -> buildable - Iterator itr = blockedProjects.values().iterator(); - while (itr.hasNext()) { - BlockedItem p = itr.next(); + for (BlockedItem p : new ArrayList(blockedProjects.values())) {// copy as we'll mutate the list if (!isBuildBlocked(p) && allowNewBuildableTask(p.task)) { // ready to be executed - LOGGER.log(Level.FINE, "{0} no longer blocked", p.task); - itr.remove(); + p.leave(this); makeBuildable(new BuildableItem(p)); } } @@ -1047,17 +1042,15 @@ public class Queue extends ResourceController implements Saveable { if (!top.timestamp.before(new GregorianCalendar())) break; // finished moving all ready items from queue - waitingList.remove(top); + top.leave(this); Task p = top.task; if (!isBuildBlocked(top) && allowNewBuildableTask(p)) { // ready to be executed immediately - LOGGER.log(Level.FINE, "{0} ready to build", p); makeBuildable(new BuildableItem(top)); } else { // this can't be built now because another build is in progress // set this project aside. - LOGGER.log(Level.FINE, "{0} is blocked", p); - blockedProjects.put(p,new BlockedItem(top)); + new BlockedItem(top).enter(this); } } @@ -1066,21 +1059,18 @@ public class Queue extends ResourceController implements Saveable { s.sortBuildableItems(buildables); // allocate buildable jobs to executors - Iterator itr = buildables.iterator(); - while (itr.hasNext()) { - BuildableItem p = itr.next(); - + for (BuildableItem p : new ArrayList(buildables)) {// copy as we'll mutate the list in the loop // one last check to make sure this build is not blocked. if (isBuildBlocked(p)) { - itr.remove(); - blockedProjects.put(p.task,new BlockedItem(p)); + p.leave(this); + new BlockedItem(p).enter(this); LOGGER.log(Level.FINE, "Catching that {0} is blocked in the last minute", p); continue; } List candidates = new ArrayList(parked.size()); for (JobOffer j : parked.values()) - if(j.canTake(p)) + if (j.canTake(p)) candidates.add(j); MappingWorksheet ws = new MappingWorksheet(p, candidates); @@ -1089,7 +1079,7 @@ public class Queue extends ResourceController implements Saveable { // if we couldn't find the executor that fits, // just leave it in the buildables list and // check if we can execute other projects - LOGGER.log(Level.FINER, "Failed to map {0} to executors. candidates={1} parked={2}", new Object[] {p, candidates, parked.values()}); + LOGGER.log(Level.FINER, "Failed to map {0} to executors. candidates={1} parked={2}", new Object[]{p, candidates, parked.values()}); continue; } @@ -1097,7 +1087,7 @@ public class Queue extends ResourceController implements Saveable { WorkUnitContext wuc = new WorkUnitContext(p); m.execute(wuc); - itr.remove(); + p.leave(this); if (!wuc.getWorkUnits().isEmpty()) makePending(p); else @@ -1131,8 +1121,8 @@ public class Queue extends ResourceController implements Saveable { // if the execution get here, it means we couldn't schedule it anywhere. // so do the scheduling like other normal jobs. } - - buildables.put(p.task,p); + + p.enter(this); } private boolean makePending(BuildableItem p) { @@ -1557,13 +1547,6 @@ public class Queue extends ResourceController implements Saveable { } - /** - * Participates in the cancellation logic to set the {@link #future} accordingly. - */ - /*package*/ void onCancelled() { - future.setAsCancelled(); - } - public Api getApi() { return new Api(this); } @@ -1577,6 +1560,27 @@ public class Queue extends ResourceController implements Saveable { public String toString() { return getClass().getName() + ':' + task + ':' + getWhy(); } + + /** + * Enters the appropriate queue for this type of item. + */ + /*package*/ abstract void enter(Queue q); + + /** + * Leaves the appropriate queue for this type of item. + */ + /*package*/ abstract boolean leave(Queue q); + + /** + * Cancels this item, which updates {@link #future} to notify the listener, and + * also leaves the queue. + */ + /*package*/ boolean cancel(Queue q) { + boolean r = leave(q); + if (r) future.setAsCancelled(); + return r; + } + } /** @@ -1657,6 +1661,38 @@ public class Queue extends ResourceController implements Saveable { else return CauseOfBlockage.fromMessage(Messages._Queue_Unknown()); } + + @Override + /*package*/ void enter(Queue q) { + if (q.waitingList.add(this)) { + for (QueueListener ql : QueueListener.all()) { + try { + ql.onEnterWaiting(this); + } catch (Throwable e) { + // don't let this kill the queue + LOGGER.log(Level.WARNING, "QueueListener failed while processing "+this,e); + } + } + } + } + + @Override + /*package*/ boolean leave(Queue q) { + boolean r = q.waitingList.remove(this); + if (r) { + for (QueueListener ql : QueueListener.all()) { + try { + ql.onLeaveWaiting(this); + } catch (Throwable e) { + // don't let this kill the queue + LOGGER.log(Level.WARNING, "QueueListener failed while processing "+this,e); + } + } + } + return r; + } + + } /** @@ -1708,6 +1744,35 @@ public class Queue extends ResourceController implements Saveable { return task.getCauseOfBlockage(); } + + /*package*/ void enter(Queue q) { + LOGGER.log(Level.FINE, "{0} is blocked", this); + blockedProjects.add(this); + for (QueueListener ql : QueueListener.all()) { + try { + ql.onEnterBlocked(this); + } catch (Throwable e) { + // don't let this kill the queue + LOGGER.log(Level.WARNING, "QueueListener failed while processing "+this,e); + } + } + } + + /*package*/ boolean leave(Queue q) { + boolean r = blockedProjects.remove(this); + if (r) { + LOGGER.log(Level.FINE, "{0} no longer blocked", this); + for (QueueListener ql : QueueListener.all()) { + try { + ql.onLeaveBlocked(this); + } catch (Throwable e) { + // don't let this kill the queue + LOGGER.log(Level.WARNING, "QueueListener failed while processing "+this,e); + } + } + } + return r; + } } /** @@ -1789,6 +1854,36 @@ public class Queue extends ResourceController implements Saveable { public boolean isPending() { return isPending; } + + @Override + /*package*/ void enter(Queue q) { + q.buildables.add(this); + for (QueueListener ql : QueueListener.all()) { + try { + ql.onEnterBuildable(this); + } catch (Throwable e) { + // don't let this kill the queue + LOGGER.log(Level.WARNING, "QueueListener failed while processing "+this,e); + } + } + } + + @Override + /*package*/ boolean leave(Queue q) { + boolean r = q.buildables.remove(this); + if (r) { + LOGGER.log(Level.FINE, "{0} no longer blocked", this); + for (QueueListener ql : QueueListener.all()) { + try { + ql.onLeaveBuildable(this); + } catch (Throwable e) { + // don't let this kill the queue + LOGGER.log(Level.WARNING, "QueueListener failed while processing "+this,e); + } + } + } + return r; + } } /** @@ -1837,6 +1932,25 @@ public class Queue extends ResourceController implements Saveable { public boolean isCancelled() { return outcome==null; } + + @Override + void enter(Queue q) { + q.leftItems.put(id,this); + for (QueueListener ql : QueueListener.all()) { + try { + ql.onLeft(this); + } catch (Throwable e) { + // don't let this kill the queue + LOGGER.log(Level.WARNING, "QueueListener failed while processing "+this,e); + } + } + } + + @Override + boolean leave(Queue q) { + // there's no leave operation + return false; + } } private static final Logger LOGGER = Logger.getLogger(Queue.class.getName()); @@ -1944,7 +2058,7 @@ public class Queue extends ResourceController implements Saveable { /** * {@link ArrayList} of {@link Item} with more convenience methods. */ - private static class ItemList extends ArrayList { + private class ItemList extends ArrayList { public T get(Task task) { for (T item: this) { if (item.task == task) { @@ -1993,24 +2107,16 @@ public class Queue extends ResourceController implements Saveable { * Works like {@link #remove(Task)} but also marks the {@link Item} as cancelled. */ public T cancel(Task p) { - T x = remove(p); - if(x!=null) x.onCancelled(); + T x = get(p); + if(x!=null) x.cancel(Queue.this); return x; } - /** - * Works like {@link #remove(Object)} but also marks the {@link Item} as cancelled. - */ - public boolean cancel(Item t) { - boolean r = remove(t); - if(r) t.onCancelled(); - return r; - } - public void cancelAll() { - for (T t : this) - t.onCancelled(); - clear(); + for (T t : new ArrayList(this)) + t.cancel(Queue.this); + + clear(); // just to be sure } } diff --git a/core/src/main/java/hudson/model/queue/QueueListener.java b/core/src/main/java/hudson/model/queue/QueueListener.java new file mode 100644 index 0000000000..93eafcbb73 --- /dev/null +++ b/core/src/main/java/hudson/model/queue/QueueListener.java @@ -0,0 +1,85 @@ +package hudson.model.queue; + +import hudson.ExtensionList; +import hudson.ExtensionPoint; +import hudson.model.Computer; +import hudson.model.Queue; +import hudson.model.Queue.BlockedItem; +import hudson.model.Queue.BuildableItem; +import hudson.model.Queue.Item; +import hudson.model.Queue.LeftItem; +import hudson.model.Queue.WaitingItem; +import jenkins.model.Jenkins; + +import java.util.concurrent.Executor; + +/** + * Listener for events in {@link Queue}. + * + *

    + * {@link Queue} is highly synchronized objects, and these callbacks are invoked synchronously. + * To avoid the risk of deadlocks and general slow down, please minimize the amount of work callbacks + * will perform, and push any sizable work to asynchronous execution via {@link Executor}, such as + * {@link Computer#threadPoolForRemoting}. + * + *

    + * For the state transition of {@link Queue.Item} in {@link Queue}, please refer to the Queue javadoc. + * + * @author Kohsuke Kawaguchi + * @since 1.520 + */ +public abstract class QueueListener implements ExtensionPoint { + /** + * When a task is submitted to the queue, it first gets to the waiting phase, + * where it spends until the quiet period runs out and the item becomes executable. + * + * @see WaitingItem#timestamp + */ + public void onEnterWaiting(WaitingItem wi) {} + + /** + * An item leaves the waiting phase when the current time of the system is past its + * {@linkplain WaitingItem#timestamp due date}. The item will then enter either the blocked phase + * or the buildable phase. + */ + public void onLeaveWaiting(WaitingItem wi) {} + + /** + * An item enters the blocked phase when there's someone saying "NO" to it proceeding to the buildable phase, + * such as {@link QueueTaskDispatcher}. Note that waiting for an executor to become available is not a part of this. + */ + public void onEnterBlocked(BlockedItem bi) {} + + /** + * An item leaves the blocked phase and becomes buildable when there's no one vetoing. + */ + public void onLeaveBlocked(BlockedItem bi) {} + + /** + * An item enters the buildable phase when all signals are green (or blue!) and it's just waiting + * for the scheduler to allocate it to the available executor. An item can spend considerable time + * in this phase for example if all the executors are busy. + */ + public void onEnterBuildable(BuildableItem bi) {} + + /** + * An item leaves the buildable phase. + * + * It will move to the "left" state if the executors are allocated to it, or it will move to the + * blocked phase if someone starts vetoing once again. + */ + public void onLeaveBuildable(BuildableItem bi) {} + + /** + * The item has left the queue, either by getting {@link Queue#cancel(Item) cancelled} or by getting + * picked up by an executor and starts running. + */ + public void onLeft(LeftItem li) {} + + /** + * Returns all the registered {@link QueueListener}s. + */ + public static ExtensionList all() { + return Jenkins.getInstance().getExtensionList(QueueListener.class); + } +} -- GitLab