提交 07a9a56c 编写于 作者: F Fabian Hueske

[FLINK-1683] [jobmanager] Fix scheduling preference choice for non-unary execution tasks.

This closes #476
上级 81ebe980
......@@ -47,6 +47,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import static com.google.common.base.Preconditions.checkElementIndex;
......@@ -348,22 +349,39 @@ public class ExecutionVertex implements Serializable {
return Collections.emptySet();
}
else {
HashSet<Instance> locations = new HashSet<Instance>();
Set<Instance> locations = new HashSet<Instance>();
Set<Instance> inputLocations = new HashSet<Instance>();
// go over all inputs
for (int i = 0; i < inputEdges.length; i++) {
inputLocations.clear();
ExecutionEdge[] sources = inputEdges[i];
if (sources != null) {
// go over all input sources
for (int k = 0; k < sources.length; k++) {
// look-up assigned slot of input source
SimpleSlot sourceSlot = sources[k].getSource().getProducer().getCurrentAssignedResource();
if (sourceSlot != null) {
locations.add(sourceSlot.getInstance());
if (locations.size() > MAX_DISTINCT_LOCATIONS_TO_CONSIDER) {
return null;
// add input location
inputLocations.add(sourceSlot.getInstance());
// inputs which have too many distinct sources are not considered
if (inputLocations.size() > MAX_DISTINCT_LOCATIONS_TO_CONSIDER) {
inputLocations.clear();
break;
}
}
}
}
// keep the locations of the input with the least preferred locations
if(locations.isEmpty() || // nothing assigned yet
(!inputLocations.isEmpty() && inputLocations.size() < locations.size())) {
// current input has fewer preferred locations
locations.clear();
locations.addAll(inputLocations);
}
}
return locations;
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册