提交 b87f2fa2 编写于 作者: S Stephan Ewen

Improve error message when scheduler cannot find a slot for immediate scheduling.

上级 1ddec930
...@@ -23,16 +23,23 @@ import org.apache.flink.runtime.JobException; ...@@ -23,16 +23,23 @@ import org.apache.flink.runtime.JobException;
public class NoResourceAvailableException extends JobException { public class NoResourceAvailableException extends JobException {
private static final long serialVersionUID = -2249953165298717803L; private static final long serialVersionUID = -2249953165298717803L;
private static final String BASE_MESSAGE = "Not enough free slots available to run the job. "
+ "You can decrease the operator parallelism or increase the number of slots per TaskManager in the configuration.";
public NoResourceAvailableException() { public NoResourceAvailableException() {
super("Not enough free slots available to run the job. " super(BASE_MESSAGE);
+ "You can decrease the operator parallelism or increase the number of slots per TaskManager in the configuration.");
} }
public NoResourceAvailableException(ScheduledUnit unit) { public NoResourceAvailableException(ScheduledUnit unit) {
super("No resource available to schedule unit " + unit super("No resource available to schedule unit " + unit
+ ". You can decrease the operator parallelism or increase the number of slots per TaskManager in the configuration."); + ". You can decrease the operator parallelism or increase the number of slots per TaskManager in the configuration.");
} }
NoResourceAvailableException(int numInstances, int numSlotsTotal) {
super(String.format("%s Resources available to scheduler: Number of instances=%d, total number of slots=%d",
BASE_MESSAGE, numInstances, numSlotsTotal));
}
public NoResourceAvailableException(String message) { public NoResourceAvailableException(String message) {
super(message); super(message);
......
...@@ -108,6 +108,20 @@ public class Scheduler implements InstanceListener, SlotAvailablilityListener { ...@@ -108,6 +108,20 @@ public class Scheduler implements InstanceListener, SlotAvailablilityListener {
return count; return count;
} }
public int getTotalNumberOfSlots() {
int count = 0;
synchronized (globalLock) {
for (Instance instance : allInstances) {
if (instance.isAlive()) {
count += instance.getTotalNumberOfSlots();
}
}
}
return count;
}
// -------------------------------------------------------------------------------------------- // --------------------------------------------------------------------------------------------
// Scheduling // Scheduling
// -------------------------------------------------------------------------------------------- // --------------------------------------------------------------------------------------------
...@@ -198,7 +212,7 @@ public class Scheduler implements InstanceListener, SlotAvailablilityListener { ...@@ -198,7 +212,7 @@ public class Scheduler implements InstanceListener, SlotAvailablilityListener {
if (slotFromGroup == null) { if (slotFromGroup == null) {
// both null // both null
if (constraint == null || constraint.isUnassigned()) { if (constraint == null || constraint.isUnassigned()) {
throw new NoResourceAvailableException(); throw new NoResourceAvailableException(getNumberOfAvailableInstances(), getTotalNumberOfSlots());
} else { } else {
throw new NoResourceAvailableException("Could not allocate a slot on instance " + throw new NoResourceAvailableException("Could not allocate a slot on instance " +
constraint.getLocation() + ", as required by the co-location constraint."); constraint.getLocation() + ", as required by the co-location constraint.");
...@@ -271,7 +285,7 @@ public class Scheduler implements InstanceListener, SlotAvailablilityListener { ...@@ -271,7 +285,7 @@ public class Scheduler implements InstanceListener, SlotAvailablilityListener {
return future; return future;
} }
else { else {
throw new NoResourceAvailableException(task); throw new NoResourceAvailableException(getNumberOfAvailableInstances(), getTotalNumberOfSlots());
} }
} }
} }
...@@ -439,7 +453,7 @@ public class Scheduler implements InstanceListener, SlotAvailablilityListener { ...@@ -439,7 +453,7 @@ public class Scheduler implements InstanceListener, SlotAvailablilityListener {
throw new RuntimeException(locality.name()); throw new RuntimeException(locality.name());
} }
} }
// -------------------------------------------------------------------------------------------- // --------------------------------------------------------------------------------------------
// Instance Availability // Instance Availability
// -------------------------------------------------------------------------------------------- // --------------------------------------------------------------------------------------------
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册