diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/NoResourceAvailableException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/NoResourceAvailableException.java index c1c3f94aa8db4ea21508d6af6629d33cb3d86033..11fec7201fae0c6ff8598de26ccf5e34293e1375 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/NoResourceAvailableException.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/NoResourceAvailableException.java @@ -23,16 +23,23 @@ import org.apache.flink.runtime.JobException; public class NoResourceAvailableException extends JobException { private static final long serialVersionUID = -2249953165298717803L; + + private static final String BASE_MESSAGE = "Not enough free slots available to run the job. " + + "You can decrease the operator parallelism or increase the number of slots per TaskManager in the configuration."; public NoResourceAvailableException() { - super("Not enough free slots available to run the job. " - + "You can decrease the operator parallelism or increase the number of slots per TaskManager in the configuration."); + super(BASE_MESSAGE); } public NoResourceAvailableException(ScheduledUnit unit) { super("No resource available to schedule unit " + unit + ". You can decrease the operator parallelism or increase the number of slots per TaskManager in the configuration."); } + + NoResourceAvailableException(int numInstances, int numSlotsTotal) { + super(String.format("%s Resources available to scheduler: Number of instances=%d, total number of slots=%d", + BASE_MESSAGE, numInstances, numSlotsTotal)); + } public NoResourceAvailableException(String message) { super(message); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java index a3b84716d161d06e82f19fdc7f4c07498c52d3a4..9ef30b8d1889115f452117e1e59535ec69f2f4a4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java @@ -108,6 +108,20 @@ public class Scheduler implements InstanceListener, SlotAvailablilityListener { return count; } + public int getTotalNumberOfSlots() { + int count = 0; + + synchronized (globalLock) { + for (Instance instance : allInstances) { + if (instance.isAlive()) { + count += instance.getTotalNumberOfSlots(); + } + } + } + + return count; + } + // -------------------------------------------------------------------------------------------- // Scheduling // -------------------------------------------------------------------------------------------- @@ -198,7 +212,7 @@ public class Scheduler implements InstanceListener, SlotAvailablilityListener { if (slotFromGroup == null) { // both null if (constraint == null || constraint.isUnassigned()) { - throw new NoResourceAvailableException(); + throw new NoResourceAvailableException(getNumberOfAvailableInstances(), getTotalNumberOfSlots()); } else { throw new NoResourceAvailableException("Could not allocate a slot on instance " + constraint.getLocation() + ", as required by the co-location constraint."); @@ -271,7 +285,7 @@ public class Scheduler implements InstanceListener, SlotAvailablilityListener { return future; } else { - throw new NoResourceAvailableException(task); + throw new NoResourceAvailableException(getNumberOfAvailableInstances(), getTotalNumberOfSlots()); } } } @@ -439,7 +453,7 @@ public class Scheduler implements InstanceListener, SlotAvailablilityListener { throw new RuntimeException(locality.name()); } } - + // -------------------------------------------------------------------------------------------- // Instance Availability // --------------------------------------------------------------------------------------------