From b87f2fa2a06841973c5aaf424e8984bda24a9276 Mon Sep 17 00:00:00 2001 From: Stephan Ewen Date: Tue, 23 Sep 2014 19:54:01 +0200 Subject: [PATCH] Improve error message when scheduler cannot find a slot for immediate scheduling. --- .../NoResourceAvailableException.java | 11 ++++++++-- .../jobmanager/scheduler/Scheduler.java | 20 ++++++++++++++++--- 2 files changed, 26 insertions(+), 5 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/NoResourceAvailableException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/NoResourceAvailableException.java index c1c3f94aa8d..11fec7201fa 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/NoResourceAvailableException.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/NoResourceAvailableException.java @@ -23,16 +23,23 @@ import org.apache.flink.runtime.JobException; public class NoResourceAvailableException extends JobException { private static final long serialVersionUID = -2249953165298717803L; + + private static final String BASE_MESSAGE = "Not enough free slots available to run the job. " + + "You can decrease the operator parallelism or increase the number of slots per TaskManager in the configuration."; public NoResourceAvailableException() { - super("Not enough free slots available to run the job. " - + "You can decrease the operator parallelism or increase the number of slots per TaskManager in the configuration."); + super(BASE_MESSAGE); } public NoResourceAvailableException(ScheduledUnit unit) { super("No resource available to schedule unit " + unit + ". You can decrease the operator parallelism or increase the number of slots per TaskManager in the configuration."); } + + NoResourceAvailableException(int numInstances, int numSlotsTotal) { + super(String.format("%s Resources available to scheduler: Number of instances=%d, total number of slots=%d", + BASE_MESSAGE, numInstances, numSlotsTotal)); + } public NoResourceAvailableException(String message) { super(message); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java index a3b84716d16..9ef30b8d188 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java @@ -108,6 +108,20 @@ public class Scheduler implements InstanceListener, SlotAvailablilityListener { return count; } + public int getTotalNumberOfSlots() { + int count = 0; + + synchronized (globalLock) { + for (Instance instance : allInstances) { + if (instance.isAlive()) { + count += instance.getTotalNumberOfSlots(); + } + } + } + + return count; + } + // -------------------------------------------------------------------------------------------- // Scheduling // -------------------------------------------------------------------------------------------- @@ -198,7 +212,7 @@ public class Scheduler implements InstanceListener, SlotAvailablilityListener { if (slotFromGroup == null) { // both null if (constraint == null || constraint.isUnassigned()) { - throw new NoResourceAvailableException(); + throw new NoResourceAvailableException(getNumberOfAvailableInstances(), getTotalNumberOfSlots()); } else { throw new NoResourceAvailableException("Could not allocate a slot on instance " + constraint.getLocation() + ", as required by the co-location constraint."); @@ -271,7 +285,7 @@ public class Scheduler implements InstanceListener, SlotAvailablilityListener { return future; } else { - throw new NoResourceAvailableException(task); + throw new NoResourceAvailableException(getNumberOfAvailableInstances(), getTotalNumberOfSlots()); } } } @@ -439,7 +453,7 @@ public class Scheduler implements InstanceListener, SlotAvailablilityListener { throw new RuntimeException(locality.name()); } } - + // -------------------------------------------------------------------------------------------- // Instance Availability // -------------------------------------------------------------------------------------------- -- GitLab