Commit cdee8750 authored by Stephan Ewen

Make co-location constraints resilient against out-of-order scheduling and...

Make co-location constraints resilient against out-of-order scheduling and deeply integrate them with slot sharing
Fix miscellaneous checkstyle errors/warnings
Parent 8e7216a0
...@@ -183,7 +183,12 @@ public class CliFrontendListCancelTest { ...@@ -183,7 +183,12 @@ public class CliFrontendListCancelTest {
} }
@Override @Override
public int getAvailableSlots() { public int getTotalNumberOfRegisteredSlots() {
return 1;
}
@Override
public int getNumberOfSlotsAvailableToScheduler() throws IOException {
return 1; return 1;
} }
} }
......
...@@ -493,11 +493,10 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode> { ...@@ -493,11 +493,10 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode> {
if (this.currentIteration != null) { if (this.currentIteration != null) {
AbstractJobVertex head = this.iterations.get(this.currentIteration).getHeadTask(); AbstractJobVertex head = this.iterations.get(this.currentIteration).getHeadTask();
if (head == null) { // the head may still be null if we descend into the static parts first
throw new CompilerException("Found no iteration head task in the postVisit of translating a task inside an iteration"); if (head != null) {
targetVertex.setStrictlyCoLocatedWith(head);
} }
targetVertex.setStrictlyCoLocatedWith(head);
} }
......
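Note on the hunk above: with the new out-of-order-tolerant translation, the iteration head may not exist yet when a task inside the static parts of an iteration is visited, so the strict co-location hint is now only attached once the head is known. Re-assembled from the right-hand side of the hunk (identifiers exactly as shown there, surrounding method elided), the changed block reads:

    if (this.currentIteration != null) {
        AbstractJobVertex head = this.iterations.get(this.currentIteration).getHeadTask();
        // the head may still be null if we descend into the static parts first
        if (head != null) {
            targetVertex.setStrictlyCoLocatedWith(head);
        }
    }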
...@@ -574,10 +574,15 @@ public class JobManager implements ExtendedManagementProtocol, InputSplitProvide ...@@ -574,10 +574,15 @@ public class JobManager implements ExtendedManagementProtocol, InputSplitProvide
} }
@Override @Override
public int getAvailableSlots() { public int getTotalNumberOfRegisteredSlots() {
return getInstanceManager().getTotalNumberOfSlots(); return getInstanceManager().getTotalNumberOfSlots();
} }
@Override
public int getNumberOfSlotsAvailableToScheduler() {
return scheduler.getNumberOfAvailableSlots();
}
/** /**
* Starts the Jetty Infoserver for the Jobmanager * Starts the Jetty Infoserver for the Jobmanager
* *
......
...@@ -18,30 +18,52 @@ ...@@ -18,30 +18,52 @@
package org.apache.flink.runtime.jobmanager.scheduler; package org.apache.flink.runtime.jobmanager.scheduler;
import org.apache.flink.runtime.AbstractID;
import org.apache.flink.runtime.instance.Instance; import org.apache.flink.runtime.instance.Instance;
import com.google.common.base.Preconditions;
public class CoLocationConstraint { public class CoLocationConstraint {
private volatile Instance location; private final CoLocationGroup group;
private volatile SharedSlot sharedSlot;
public void setLocation(Instance location) {
if (location == null) { CoLocationConstraint(CoLocationGroup group) {
throw new IllegalArgumentException(); Preconditions.checkNotNull(group);
} this.group = group;
}
if (this.location == null) {
this.location = location;
public SharedSlot getSharedSlot() {
return sharedSlot;
}
public Instance getLocation() {
if (sharedSlot != null) {
return sharedSlot.getAllocatedSlot().getInstance();
} else { } else {
throw new IllegalStateException("The constraint has already been assigned a location."); throw new IllegalStateException("Not assigned");
} }
} }
public Instance getLocation() { public void setSharedSlot(SharedSlot sharedSlot) {
return location; if (this.sharedSlot == sharedSlot) {
return;
}
else if (this.sharedSlot == null || this.sharedSlot.isDisposed()) {
this.sharedSlot = sharedSlot;
} else {
throw new IllegalStateException("Overriding shared slot that is still alive.");
}
} }
public boolean isUnassigned() { public boolean isUnassigned() {
return this.location == null; return this.sharedSlot == null;
}
public AbstractID getGroupId() {
return this.group.getId();
} }
} }
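Because the side-by-side rendering above interleaves the old and new versions, here is the new CoLocationConstraint re-assembled from the right-hand side of the hunk; member order and the omitted Javadoc are assumptions, the identifiers and bodies are as shown above. The constraint no longer stores an Instance directly but is pinned to a SharedSlot, so a dead slot can be detected (isDisposed) and replaced:

    package org.apache.flink.runtime.jobmanager.scheduler;

    import org.apache.flink.runtime.AbstractID;
    import org.apache.flink.runtime.instance.Instance;

    import com.google.common.base.Preconditions;

    public class CoLocationConstraint {

        private final CoLocationGroup group;

        private volatile SharedSlot sharedSlot;

        CoLocationConstraint(CoLocationGroup group) {
            Preconditions.checkNotNull(group);
            this.group = group;
        }

        public SharedSlot getSharedSlot() {
            return sharedSlot;
        }

        public Instance getLocation() {
            if (sharedSlot != null) {
                return sharedSlot.getAllocatedSlot().getInstance();
            } else {
                throw new IllegalStateException("Not assigned");
            }
        }

        public void setSharedSlot(SharedSlot sharedSlot) {
            if (this.sharedSlot == sharedSlot) {
                return;
            }
            else if (this.sharedSlot == null || this.sharedSlot.isDisposed()) {
                this.sharedSlot = sharedSlot;
            }
            else {
                throw new IllegalStateException("Overriding shared slot that is still alive.");
            }
        }

        public boolean isUnassigned() {
            return this.sharedSlot == null;
        }

        public AbstractID getGroupId() {
            return this.group.getId();
        }
    }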
...@@ -21,6 +21,7 @@ package org.apache.flink.runtime.jobmanager.scheduler; ...@@ -21,6 +21,7 @@ package org.apache.flink.runtime.jobmanager.scheduler;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import org.apache.flink.runtime.AbstractID;
import org.apache.flink.runtime.jobgraph.AbstractJobVertex; import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
...@@ -29,6 +30,9 @@ public class CoLocationGroup implements java.io.Serializable { ...@@ -29,6 +30,9 @@ public class CoLocationGroup implements java.io.Serializable {
private static final long serialVersionUID = -2605819490401895297L; private static final long serialVersionUID = -2605819490401895297L;
// we use a job vertex ID, because the co location group acts as a unit inside which exclusive sharing of
// slots is used
private final AbstractID id = new AbstractID();
private final List<AbstractJobVertex> vertices = new ArrayList<AbstractJobVertex>(); private final List<AbstractJobVertex> vertices = new ArrayList<AbstractJobVertex>();
...@@ -80,8 +84,12 @@ public class CoLocationGroup implements java.io.Serializable { ...@@ -80,8 +84,12 @@ public class CoLocationGroup implements java.io.Serializable {
if (num > constraints.size()) { if (num > constraints.size()) {
constraints.ensureCapacity(num); constraints.ensureCapacity(num);
for (int i = constraints.size(); i < num; i++) { for (int i = constraints.size(); i < num; i++) {
constraints.add(new CoLocationConstraint()); constraints.add(new CoLocationConstraint(this));
} }
} }
} }
public AbstractID getId() {
return id;
}
} }
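A CoLocationGroup now carries its own AbstractID, and every constraint it creates reports that id as its group id; this id (rather than a JobVertexID) is what exclusive slot sharing inside the group is keyed on. A minimal sketch of that relationship, in the style of the tests further down; the class name CoLocationGroupSketch and the plain asserts are illustrative only, and the code has to live in the scheduler package because the constraint constructor is package-private:

    package org.apache.flink.runtime.jobmanager.scheduler;

    import org.apache.flink.runtime.AbstractID;

    class CoLocationGroupSketch {

        static void sketch() {
            // one group per set of co-located job vertices; one constraint per parallel subtask
            CoLocationGroup group = new CoLocationGroup();
            CoLocationConstraint subtask0 = new CoLocationConstraint(group);
            CoLocationConstraint subtask1 = new CoLocationConstraint(group);

            // all constraints of the group share the group's id
            AbstractID id = group.getId();
            assert subtask0.getGroupId().equals(id);
            assert subtask1.getGroupId().equals(id);

            // constraints start unassigned; the scheduler pins them to a SharedSlot
            // the first time one of their subtasks gets a (local) slot
            assert subtask0.isUnassigned() && subtask1.isUnassigned();
        }
    }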
...@@ -41,7 +41,7 @@ import org.apache.flink.util.ExceptionUtils; ...@@ -41,7 +41,7 @@ import org.apache.flink.util.ExceptionUtils;
*/ */
public class Scheduler implements InstanceListener, SlotAvailablilityListener { public class Scheduler implements InstanceListener, SlotAvailablilityListener {
private static final Logger LOG = LoggerFactory.getLogger(Scheduler.class); static final Logger LOG = LoggerFactory.getLogger(Scheduler.class);
private final Object globalLock = new Object(); private final Object globalLock = new Object();
...@@ -150,56 +150,6 @@ public class Scheduler implements InstanceListener, SlotAvailablilityListener { ...@@ -150,56 +150,6 @@ public class Scheduler implements InstanceListener, SlotAvailablilityListener {
final ExecutionVertex vertex = task.getTaskToExecute().getVertex(); final ExecutionVertex vertex = task.getTaskToExecute().getVertex();
synchronized (globalLock) { synchronized (globalLock) {
// // 1) === If the task has a strict co-schedule hint, obey it ===
//
// CoLocationConstraint locationConstraint = task.getLocationConstraint();
// if (locationConstraint != null) {
// // location constraints can never be scheduled in a queued fashion
// if (queueIfNoResource) {
// throw new IllegalArgumentException("A task with a location constraint was scheduled in a queued fashion.");
// }
//
// // since we are inside the global lock scope, we can check, allocate, and assign
// // in one atomic action. however, slots may die and be deallocated
//
// // (a) is the constraint has not yet has a slot, get one
// if (locationConstraint.isUnassigned()) {
// // try and get a slot
// AllocatedSlot newSlot = getFreeSlotForTask(vertex);
// if (newSlot == null) {
// throw new NoResourceAvailableException();
// }
// SharedSlot sl = locationConstraint.swapInNewSlot(newSlot);
// SubSlot slot = sl.allocateSubSlot(vertex.getJobvertexId());
//
// updateLocalityCounters(newSlot.getLocality());
// return slot;
// }
// else {
// // try to get a subslot. returns null, if the location's slot has been released
// // in the meantime
// SubSlot slot = locationConstraint.allocateSubSlot(vertex.getJobvertexId());
// if (slot == null) {
// // get a new slot. at the same instance!!!
// Instance location = locationConstraint.getSlot().getAllocatedSlot().getInstance();
// AllocatedSlot newSlot;
// try {
// newSlot = location.allocateSlot(vertex.getJobId());
// } catch (InstanceDiedException e) {
// throw new NoResourceAvailableException("The instance of the required location died.");
// }
// if (newSlot == null) {
// throw new NoResourceAvailableException();
// }
// SharedSlot sharedSlot = locationConstraint.swapInNewSlot(newSlot);
// slot = sharedSlot.allocateSubSlot(vertex.getJobvertexId());
// }
//
// updateLocalityCounters(Locality.LOCAL);
// return slot;
// }
// }
// 1) === If the task has a slot sharing group, schedule with shared slots === // 1) === If the task has a slot sharing group, schedule with shared slots ===
...@@ -213,18 +163,17 @@ public class Scheduler implements InstanceListener, SlotAvailablilityListener { ...@@ -213,18 +163,17 @@ public class Scheduler implements InstanceListener, SlotAvailablilityListener {
final SlotSharingGroupAssignment assignment = sharingUnit.getTaskAssignment(); final SlotSharingGroupAssignment assignment = sharingUnit.getTaskAssignment();
final CoLocationConstraint constraint = task.getLocationConstraint(); final CoLocationConstraint constraint = task.getLocationConstraint();
AllocatedSlot newSlot = null; // get a slot from the group, if the group has one for us (and can fulfill the constraint)
SubSlot slotFromGroup;
// get a slot from the group. obey location constraints, if existing and assigned if (constraint == null) {
AllocatedSlot slotFromGroup; slotFromGroup = assignment.getSlotForTask(vertex);
if (constraint == null || constraint.isUnassigned()) {
slotFromGroup = assignment.getSlotForTask(vertex.getJobvertexId(), vertex);
} }
else { else {
// this returns null, if the constraint cannot be fulfilled slotFromGroup = assignment.getSlotForTask(vertex, constraint);
slotFromGroup = assignment.getSlotForTask(vertex.getJobvertexId(), constraint);
} }
AllocatedSlot newSlot = null;
// the following needs to make sure any allocated slot is released in case of an error // the following needs to make sure any allocated slot is released in case of an error
try { try {
...@@ -232,12 +181,6 @@ public class Scheduler implements InstanceListener, SlotAvailablilityListener { ...@@ -232,12 +181,6 @@ public class Scheduler implements InstanceListener, SlotAvailablilityListener {
if (slotFromGroup != null) { if (slotFromGroup != null) {
// local (or unconstrained in the current group) // local (or unconstrained in the current group)
if (slotFromGroup.getLocality() != Locality.NON_LOCAL) { if (slotFromGroup.getLocality() != Locality.NON_LOCAL) {
// attach to the locality constraint
if (constraint != null && constraint.isUnassigned()) {
constraint.setLocation(slotFromGroup.getInstance());
}
updateLocalityCounters(slotFromGroup.getLocality()); updateLocalityCounters(slotFromGroup.getLocality());
return slotFromGroup; return slotFromGroup;
} }
...@@ -249,13 +192,19 @@ public class Scheduler implements InstanceListener, SlotAvailablilityListener { ...@@ -249,13 +192,19 @@ public class Scheduler implements InstanceListener, SlotAvailablilityListener {
// get a new slot, since we could not place it into the group, or we could not place it locally // get a new slot, since we could not place it into the group, or we could not place it locally
newSlot = getFreeSlotForTask(vertex, locations); newSlot = getFreeSlotForTask(vertex, locations);
AllocatedSlot toUse; SubSlot toUse;
if (newSlot == null) { if (newSlot == null) {
if (slotFromGroup == null) { if (slotFromGroup == null) {
// both null // both null
throw new NoResourceAvailableException(); if (constraint == null || constraint.isUnassigned()) {
throw new NoResourceAvailableException();
} else {
throw new NoResourceAvailableException("Could not allocate a slot on instance " +
constraint.getLocation() + ", as required by the co-location constraint.");
}
} else { } else {
// got a non-local from the group, and no new one
toUse = slotFromGroup; toUse = slotFromGroup;
} }
} }
...@@ -265,17 +214,28 @@ public class Scheduler implements InstanceListener, SlotAvailablilityListener { ...@@ -265,17 +214,28 @@ public class Scheduler implements InstanceListener, SlotAvailablilityListener {
slotFromGroup.releaseSlot(); slotFromGroup.releaseSlot();
} }
toUse = sharingUnit.getTaskAssignment().addSlotWithTask(newSlot, task.getJobVertexId()); if (constraint == null) {
toUse = assignment.addNewSlotWithTask(newSlot, vertex);
} else {
toUse = assignment.addNewSlotWithTask(newSlot, vertex, constraint);
}
} }
else { else {
// both are available and potentially usable // both are available and usable. neither is local
newSlot.releaseSlot(); newSlot.releaseSlot();
toUse = slotFromGroup; toUse = slotFromGroup;
} }
// assign to the co-location hint, if we have one and it is unassigned // assign to the co-location hint, if we have one and it is unassigned
if (constraint != null && constraint.isUnassigned()) { // if it was assigned before and the new one is not local, it is a fail
constraint.setLocation(toUse.getInstance()); if (constraint != null) {
if (constraint.isUnassigned() || toUse.getLocality() == Locality.LOCAL) {
constraint.setSharedSlot(toUse.getSharedSlot());
} else {
// the fail
throw new NoResourceAvailableException("Could not allocate a slot on instance " +
constraint.getLocation() + ", as required by the co-location constraint.");
}
} }
updateLocalityCounters(toUse.getLocality()); updateLocalityCounters(toUse.getLocality());
......
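Putting the rewritten branch above together: a co-location constraint is resolved against the slot sharing group first, a fresh slot is requested only if the group cannot serve it, and the constraint is pinned to the resulting shared slot only on a local (or first) assignment; a non-local assignment against an already pinned constraint fails with NoResourceAvailableException. A minimal end-to-end sketch, mirroring the new tests further down; the class name SchedulerCoLocationSketch and the main method are illustrative only, while everything it calls appears in this commit:

    package org.apache.flink.runtime.jobmanager.scheduler;

    import org.apache.flink.runtime.instance.AllocatedSlot;
    import org.apache.flink.runtime.jobgraph.JobVertexID;

    public class SchedulerCoLocationSketch {

        public static void main(String[] args) throws Exception {
            Scheduler scheduler = new Scheduler();
            scheduler.newInstanceAvailable(SchedulerTestUtils.getRandomInstance(1));
            scheduler.newInstanceAvailable(SchedulerTestUtils.getRandomInstance(1));

            JobVertexID mapId = new JobVertexID();
            JobVertexID reduceId = new JobVertexID();

            SlotSharingGroup sharingGroup = new SlotSharingGroup();
            CoLocationConstraint constraint = new CoLocationConstraint(new CoLocationGroup());

            // both tasks carry the same constraint, so the second one is forced into the
            // shared slot that the first assignment pinned the constraint to
            AllocatedSlot s1 = scheduler.scheduleImmediately(
                    new ScheduledUnit(SchedulerTestUtils.getTestVertex(mapId, 0, 1), sharingGroup, constraint));
            AllocatedSlot s2 = scheduler.scheduleImmediately(
                    new ScheduledUnit(SchedulerTestUtils.getTestVertex(reduceId, 0, 1), sharingGroup, constraint));

            // same instance, same shared slot
            assert s1.getInstance().equals(s2.getInstance());

            s1.releaseSlot();
            s2.releaseSlot();
        }
    }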
...@@ -24,7 +24,13 @@ import java.util.Set; ...@@ -24,7 +24,13 @@ import java.util.Set;
import org.apache.flink.runtime.instance.AllocatedSlot; import org.apache.flink.runtime.instance.AllocatedSlot;
import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.JobVertexID;
public class SharedSlot { /**
*
* NOTE: This class does no synchronization by itself and its mutating
* methods may only be called from within the synchronization scope of
* it associated SlotSharingGroupAssignment.
*/
class SharedSlot {
private final AllocatedSlot allocatedSlot; private final AllocatedSlot allocatedSlot;
...@@ -48,77 +54,58 @@ public class SharedSlot { ...@@ -48,77 +54,58 @@ public class SharedSlot {
this.subSlots = new HashSet<SubSlot>(); this.subSlots = new HashSet<SubSlot>();
} }
public SharedSlot(AllocatedSlot allocatedSlot) {
if (allocatedSlot == null) {
throw new NullPointerException();
}
this.allocatedSlot = allocatedSlot;
this.assignmentGroup = null;;
this.subSlots = new HashSet<SubSlot>();
}
// -------------------------------------------------------------------------------------------- // --------------------------------------------------------------------------------------------
public AllocatedSlot getAllocatedSlot() { AllocatedSlot getAllocatedSlot() {
return this.allocatedSlot; return this.allocatedSlot;
} }
public boolean isDisposed() { boolean isDisposed() {
return disposed; return disposed;
} }
public int getNumberOfAllocatedSubSlots() { int getNumberOfAllocatedSubSlots() {
synchronized (this.subSlots) { return this.subSlots.size();
return this.subSlots.size();
}
} }
public SubSlot allocateSubSlot(JobVertexID jid) { SubSlot allocateSubSlot(JobVertexID jid) {
synchronized (this.subSlots) { if (disposed) {
if (isDisposed()) { return null;
return null; } else {
} else { SubSlot ss = new SubSlot(this, subSlotNumber++, jid);
SubSlot ss = new SubSlot(this, subSlotNumber++, jid); this.subSlots.add(ss);
this.subSlots.add(ss); return ss;
return ss;
}
} }
} }
public void rease() { void returnAllocatedSlot(SubSlot slot) {
synchronized (this.subSlots) { if (!slot.isReleased()) {
disposed = true; throw new IllegalArgumentException("SubSlot is not released.");
for (SubSlot ss : subSlots) {
ss.releaseSlot();
}
} }
allocatedSlot.releaseSlot(); this.assignmentGroup.releaseSubSlot(slot, this);
} }
void returnAllocatedSlot(SubSlot slot) { int releaseSlot(SubSlot slot) {
boolean release; if (!this.subSlots.remove(slot)) {
throw new IllegalArgumentException("Wrong shared slot for subslot.");
synchronized (this.subSlots) {
if (!this.subSlots.remove(slot)) {
throw new IllegalArgumentException("Wrong shared slot for subslot.");
}
if (assignmentGroup != null) {
release = assignmentGroup.sharedSlotAvailableForJid(this, slot.getJobVertexId(), this.subSlots.isEmpty());
} else {
release = subSlots.isEmpty();
}
if (release) {
disposed = true;
}
} }
return subSlots.size();
// do this call outside the lock, because releasing the allocated slot may go into further scheduler calls }
if (release) {
void dispose() {
if (subSlots.isEmpty()) {
disposed = true;
this.allocatedSlot.releaseSlot(); this.allocatedSlot.releaseSlot();
} else {
throw new IllegalStateException("Cannot dispose while subslots are still alive.");
} }
} }
// --------------------------------------------------------------------------------------------
@Override
public String toString() {
return "Shared " + allocatedSlot.toString();
}
} }
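SharedSlot loses its own synchronization and most of its public surface; its mutating methods are now package-private and must be called from within the lock of the owning SlotSharingGroupAssignment. Re-assembled from the right-hand side of the hunks above (the fields and the remaining constructor sit in the elided regions of the diff and are not repeated here), the new method bodies read roughly as follows:

    // inside class SharedSlot, package org.apache.flink.runtime.jobmanager.scheduler

    AllocatedSlot getAllocatedSlot() {
        return this.allocatedSlot;
    }

    boolean isDisposed() {
        return disposed;
    }

    int getNumberOfAllocatedSubSlots() {
        return this.subSlots.size();
    }

    SubSlot allocateSubSlot(JobVertexID jid) {
        if (disposed) {
            return null;
        } else {
            SubSlot ss = new SubSlot(this, subSlotNumber++, jid);
            this.subSlots.add(ss);
            return ss;
        }
    }

    // routes a released SubSlot back through the assignment group
    void returnAllocatedSlot(SubSlot slot) {
        if (!slot.isReleased()) {
            throw new IllegalArgumentException("SubSlot is not released.");
        }
        this.assignmentGroup.releaseSubSlot(slot, this);
    }

    // called back by the assignment group; returns how many sub slots are still alive
    int releaseSlot(SubSlot slot) {
        if (!this.subSlots.remove(slot)) {
            throw new IllegalArgumentException("Wrong shared slot for subslot.");
        }
        return subSlots.size();
    }

    // called by the assignment group once the last sub slot is gone
    void dispose() {
        if (subSlots.isEmpty()) {
            disposed = true;
            this.allocatedSlot.releaseSlot();
        } else {
            throw new IllegalStateException("Cannot dispose while subslots are still alive.");
        }
    }

    @Override
    public String toString() {
        return "Shared " + allocatedSlot.toString();
    }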
...@@ -28,34 +28,53 @@ import java.util.List; ...@@ -28,34 +28,53 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.flink.runtime.AbstractID;
import org.apache.flink.runtime.executiongraph.ExecutionVertex; import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.instance.AllocatedSlot; import org.apache.flink.runtime.instance.AllocatedSlot;
import org.apache.flink.runtime.instance.Instance; import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.slf4j.Logger;
public class SlotSharingGroupAssignment { public class SlotSharingGroupAssignment {
private static final Logger LOG = Scheduler.LOG;
private final Object lock = new Object();
/** All slots currently allocated to this sharing group */ /** All slots currently allocated to this sharing group */
private final Set<SharedSlot> allSlots = new LinkedHashSet<SharedSlot>(); private final Set<SharedSlot> allSlots = new LinkedHashSet<SharedSlot>();
/** The slots available per vertex type (jid), keyed by instance, to make them locatable */ /** The slots available per vertex type (jid), keyed by instance, to make them locatable */
private final Map<JobVertexID, Map<Instance, List<SharedSlot>>> availableSlotsPerJid = new LinkedHashMap<JobVertexID, Map<Instance, List<SharedSlot>>>(); private final Map<AbstractID, Map<Instance, List<SharedSlot>>> availableSlotsPerJid = new LinkedHashMap<AbstractID, Map<Instance, List<SharedSlot>>>();
// -------------------------------------------------------------------------------------------- // --------------------------------------------------------------------------------------------
public SubSlot addSlotWithTask(AllocatedSlot slot, JobVertexID jid) { public SubSlot addNewSlotWithTask(AllocatedSlot slot, ExecutionVertex vertex) {
JobVertexID id = vertex.getJobvertexId();
return addNewSlotWithTask(slot, id, id);
}
public SubSlot addNewSlotWithTask(AllocatedSlot slot, ExecutionVertex vertex, CoLocationConstraint constraint) {
AbstractID groupId = constraint.getGroupId();
return addNewSlotWithTask(slot, groupId, null);
}
private SubSlot addNewSlotWithTask(AllocatedSlot slot, AbstractID groupId, JobVertexID vertexId) {
final SharedSlot sharedSlot = new SharedSlot(slot, this); final SharedSlot sharedSlot = new SharedSlot(slot, this);
final Instance location = slot.getInstance(); final Instance location = slot.getInstance();
synchronized (allSlots) { synchronized (lock) {
// add to the total bookkeeping // add to the total bookkeeping
allSlots.add(sharedSlot); allSlots.add(sharedSlot);
// allocate us a sub slot to return // allocate us a sub slot to return
SubSlot subslot = sharedSlot.allocateSubSlot(jid); SubSlot subslot = sharedSlot.allocateSubSlot(vertexId);
// preserve the locality information // preserve the locality information
subslot.setLocality(slot.getLocality()); subslot.setLocality(slot.getLocality());
...@@ -63,10 +82,9 @@ public class SlotSharingGroupAssignment { ...@@ -63,10 +82,9 @@ public class SlotSharingGroupAssignment {
boolean entryForNewJidExists = false; boolean entryForNewJidExists = false;
// let the other vertex types know about this one as well // let the other vertex types know about this one as well
for (Map.Entry<AbstractID, Map<Instance, List<SharedSlot>>> entry : availableSlotsPerJid.entrySet()) {
for (Map.Entry<JobVertexID, Map<Instance, List<SharedSlot>>> entry : availableSlotsPerJid.entrySet()) {
if (entry.getKey().equals(jid)) { if (entry.getKey().equals(groupId)) {
entryForNewJidExists = true; entryForNewJidExists = true;
continue; continue;
} }
...@@ -75,9 +93,9 @@ public class SlotSharingGroupAssignment { ...@@ -75,9 +93,9 @@ public class SlotSharingGroupAssignment {
putIntoMultiMap(available, location, sharedSlot); putIntoMultiMap(available, location, sharedSlot);
} }
// make sure an empty entry exists for this jid, if no other entry exists // make sure an empty entry exists for this group, if no other entry exists
if (!entryForNewJidExists) { if (!entryForNewJidExists) {
availableSlotsPerJid.put(jid, new LinkedHashMap<Instance, List<SharedSlot>>()); availableSlotsPerJid.put(groupId, new LinkedHashMap<Instance, List<SharedSlot>>());
} }
return subslot; return subslot;
...@@ -90,105 +108,97 @@ public class SlotSharingGroupAssignment { ...@@ -90,105 +108,97 @@ public class SlotSharingGroupAssignment {
* slots if no local slot is available. The method returns null, when no slot is available for the * slots if no local slot is available. The method returns null, when no slot is available for the
* given JobVertexID at all. * given JobVertexID at all.
* *
* @param jid
* @param vertex * @param vertex
* *
* @return A task vertex for a task with the given JobVertexID, or null, if none is available. * @return A task vertex for a task with the given JobVertexID, or null, if none is available.
*/ */
public AllocatedSlot getSlotForTask(JobVertexID jid, ExecutionVertex vertex) { public SubSlot getSlotForTask(ExecutionVertex vertex) {
synchronized (allSlots) { synchronized (lock) {
return getSlotForTaskInternal(jid, vertex.getPreferredLocations(), false); Pair<SharedSlot, Locality> p = getSlotForTaskInternal(vertex.getJobvertexId(), vertex, vertex.getPreferredLocations(), false);
}
} if (p != null) {
SharedSlot ss = p.getLeft();
SubSlot slot = ss.allocateSubSlot(vertex.getJobvertexId());
public AllocatedSlot getSlotForTask(JobVertexID jid, CoLocationConstraint constraint) { slot.setLocality(p.getRight());
if (constraint.isUnassigned()) { return slot;
throw new IllegalArgumentException("CoLocationConstraint is unassigned"); }
else {
return null;
}
} }
synchronized (allSlots) {
return getSlotForTaskInternal(jid, Collections.singleton(constraint.getLocation()), true);
}
} }
public SubSlot getSlotForTask(ExecutionVertex vertex, CoLocationConstraint constraint) {
public boolean sharedSlotAvailableForJid(SharedSlot slot, JobVertexID jid, boolean lastSubSlot) {
if (slot == null || jid == null) {
throw new NullPointerException();
}
synchronized (allSlots) { synchronized (lock) {
if (!allSlots.contains(slot)) { SharedSlot shared = constraint.getSharedSlot();
throw new IllegalArgumentException("Slot was not associated with this SlotSharingGroup before.");
}
if (lastSubSlot) { if (shared != null && !shared.isDisposed()) {
// this was the last sub slot. unless there is something pending for this jid // initialized and set
// remove this from the availability list of all jids and SubSlot subslot = shared.allocateSubSlot(null);
// return that this one is good to release subslot.setLocality(Locality.LOCAL);
allSlots.remove(slot); return subslot;
}
Instance location = slot.getAllocatedSlot().getInstance(); else if (shared == null) {
// not initialized, grab a new slot. preferred locations are defined by the vertex
for (Map.Entry<JobVertexID, Map<Instance, List<SharedSlot>>> mapEntry : availableSlotsPerJid.entrySet()) { // we only associate the slot with the constraint, if it was a local match
if (mapEntry.getKey().equals(jid)) { Pair<SharedSlot, Locality> p = getSlotForTaskInternal(constraint.getGroupId(), vertex, vertex.getPreferredLocations(), false);
continue; if (p == null) {
} return null;
} else {
shared = p.getLeft();
Locality l = p.getRight();
Map<Instance, List<SharedSlot>> map = mapEntry.getValue(); SubSlot sub = shared.allocateSubSlot(null);
List<SharedSlot> list = map.get(location); sub.setLocality(l);
if (list == null || !list.remove(slot)) {
throw new IllegalStateException("SharedSlot was not available to another vertex type that it was not allocated for before."); if (l != Locality.NON_LOCAL) {
} constraint.setSharedSlot(shared);
if (list.isEmpty()) {
map.remove(location);
} }
return sub;
} }
return true;
} }
else {
Map<Instance, List<SharedSlot>> slotsForJid = availableSlotsPerJid.get(jid); // disposed. get a new slot on the same instance
Instance location = shared.getAllocatedSlot().getInstance();
// sanity check Pair<SharedSlot, Locality> p = getSlotForTaskInternal(constraint.getGroupId(), vertex, Collections.singleton(location), true);
if (slotsForJid == null) { if (p == null) {
throw new IllegalStateException("Trying to return a slot for jid " + jid + return null;
" when available slots indicated that all slots were available."); } else {
shared = p.getLeft();
constraint.setSharedSlot(shared);
SubSlot subslot = shared.allocateSubSlot(null);
subslot.setLocality(Locality.LOCAL);
return subslot;
}
} }
putIntoMultiMap(slotsForJid, slot.getAllocatedSlot().getInstance(), slot);
// do not release, we are still depending on this shared slot
return false;
} }
} }
/** /**
* NOTE: This method is not synchronized by itself, needs to be synchronized externally. * NOTE: This method is not synchronized by itself, needs to be synchronized externally.
* *
* @param jid
* @return An allocated sub slot, or {@code null}, if no slot is available. * @return An allocated sub slot, or {@code null}, if no slot is available.
*/ */
private AllocatedSlot getSlotForTaskInternal(JobVertexID jid, Iterable<Instance> preferredLocations, boolean localOnly) { private Pair<SharedSlot, Locality> getSlotForTaskInternal(AbstractID groupId, ExecutionVertex vertex, Iterable<Instance> preferredLocations, boolean localOnly) {
if (allSlots.isEmpty()) { if (allSlots.isEmpty()) {
return null; return null;
} }
Map<Instance, List<SharedSlot>> slotsForJid = availableSlotsPerJid.get(jid); Map<Instance, List<SharedSlot>> slotsForGroup = availableSlotsPerJid.get(groupId);
// get the available slots for the vertex type (jid) // get the available slots for the group
if (slotsForJid == null) { if (slotsForGroup == null) {
// no task is yet scheduled for that jid, so all slots are available // no task is yet scheduled for that group, so all slots are available
slotsForJid = new LinkedHashMap<Instance, List<SharedSlot>>(); slotsForGroup = new LinkedHashMap<Instance, List<SharedSlot>>();
availableSlotsPerJid.put(jid, slotsForJid); availableSlotsPerJid.put(groupId, slotsForGroup);
for (SharedSlot availableSlot : allSlots) { for (SharedSlot availableSlot : allSlots) {
putIntoMultiMap(slotsForJid, availableSlot.getAllocatedSlot().getInstance(), availableSlot); putIntoMultiMap(slotsForGroup, availableSlot.getAllocatedSlot().getInstance(), availableSlot);
} }
} }
else if (slotsForJid.isEmpty()) { else if (slotsForGroup.isEmpty()) {
return null; return null;
} }
...@@ -202,32 +212,102 @@ public class SlotSharingGroupAssignment { ...@@ -202,32 +212,102 @@ public class SlotSharingGroupAssignment {
// we return early anyways and skip the flag evaluation // we return early anyways and skip the flag evaluation
didNotGetPreferred = true; didNotGetPreferred = true;
SharedSlot slot = removeFromMultiMap(slotsForJid, location); SharedSlot slot = removeFromMultiMap(slotsForGroup, location);
if (slot != null) { if (slot != null && !slot.isDisposed()) {
SubSlot subslot = slot.allocateSubSlot(jid); if (LOG.isDebugEnabled()) {
subslot.setLocality(Locality.LOCAL); LOG.debug("Local assignment in shared group : " + vertex + " --> " + slot);
return subslot; }
return new ImmutablePair<SharedSlot, Locality>(slot, Locality.LOCAL);
} }
} }
} }
// if we want only local assignments, exit now with a "not found" result // if we want only local assignments, exit now with a "not found" result
if (didNotGetPreferred && localOnly) { if (didNotGetPreferred && localOnly) {
if (LOG.isDebugEnabled()) {
LOG.debug("No local assignment in shared possible for " + vertex);
}
return null; return null;
} }
// schedule the task to any available location // schedule the task to any available location
SharedSlot slot = pollFromMultiMap(slotsForJid); SharedSlot slot = pollFromMultiMap(slotsForGroup);
if (slot != null) { if (slot != null && !slot.isDisposed()) {
SubSlot subslot = slot.allocateSubSlot(jid); if (LOG.isDebugEnabled()) {
subslot.setLocality(didNotGetPreferred ? Locality.NON_LOCAL : Locality.UNCONSTRAINED); LOG.debug((didNotGetPreferred ? "Non-local" : "Unconstrained") + " assignment in shared group : " + vertex + " --> " + slot);
return subslot; }
return new ImmutablePair<SharedSlot, Locality>(slot, didNotGetPreferred ? Locality.NON_LOCAL : Locality.UNCONSTRAINED);
} }
else { else {
return null; return null;
} }
} }
void releaseSubSlot(SubSlot subslot, SharedSlot sharedSlot) {
AbstractID groupId = subslot.getGroupId();
synchronized (lock) {
if (!allSlots.contains(sharedSlot)) {
throw new IllegalArgumentException("Slot was not associated with this SlotSharingGroup before.");
}
int slotsRemaining = sharedSlot.releaseSlot(subslot);
if (slotsRemaining == 0) {
// this was the last sub slot. remove this from the availability list
// and trigger disposal
try {
allSlots.remove(sharedSlot);
Instance location = sharedSlot.getAllocatedSlot().getInstance();
if (groupId != null) {
for (Map.Entry<AbstractID, Map<Instance, List<SharedSlot>>> mapEntry : availableSlotsPerJid.entrySet()) {
AbstractID id = mapEntry.getKey();
// hack: we identify co location hint entries by the fact that they are keyed
// by an abstract id, rather than a job vertex id
if (id.getClass() == AbstractID.class || id.equals(groupId)) {
continue;
}
Map<Instance, List<SharedSlot>> map = mapEntry.getValue();
List<SharedSlot> list = map.get(location);
if (list == null || !list.remove(sharedSlot)) {
throw new IllegalStateException("Bug: SharedSlot was not available to another vertex type that it was not allocated for before.");
}
if (list.isEmpty()) {
map.remove(location);
}
}
}
} finally {
sharedSlot.dispose();
}
}
else if (groupId != null) {
// make the shared slot available to tasks within the group it available to
Map<Instance, List<SharedSlot>> slotsForJid = availableSlotsPerJid.get(groupId);
// sanity check
if (slotsForJid == null) {
throw new IllegalStateException("Trying to return a slot for group " + groupId +
" when available slots indicated that all slots were available.");
}
putIntoMultiMap(slotsForJid, sharedSlot.getAllocatedSlot().getInstance(), sharedSlot);
}
}
}
// -------------------------------------------------------------------------------------------- // --------------------------------------------------------------------------------------------
// State // State
// -------------------------------------------------------------------------------------------- // --------------------------------------------------------------------------------------------
...@@ -237,7 +317,7 @@ public class SlotSharingGroupAssignment { ...@@ -237,7 +317,7 @@ public class SlotSharingGroupAssignment {
} }
public int getNumberOfAvailableSlotsForJid(JobVertexID jid) { public int getNumberOfAvailableSlotsForJid(JobVertexID jid) {
synchronized (allSlots) { synchronized (lock) {
Map<Instance, List<SharedSlot>> available = availableSlotsPerJid.get(jid); Map<Instance, List<SharedSlot>> available = availableSlotsPerJid.get(jid);
if (available != null) { if (available != null) {
...@@ -255,6 +335,10 @@ public class SlotSharingGroupAssignment { ...@@ -255,6 +335,10 @@ public class SlotSharingGroupAssignment {
} }
} }
} }
// --------------------------------------------------------------------------------------------
// -------------------------------------------------------------------------------------------- // --------------------------------------------------------------------------------------------
// Utilities // Utilities
......
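With the release path above, a SubSlot is handed back to its SlotSharingGroupAssignment, which removes it from the SharedSlot and disposes the shared slot once its last sub slot is gone. A minimal sketch of that round trip using the test utilities touched in this commit; the class name SharedSlotRoundTripSketch is illustrative, it must live in the scheduler package because most of the involved methods are package-private, and it assumes Instance.allocateSlot(JobID) as referenced elsewhere in the scheduler code:

    package org.apache.flink.runtime.jobmanager.scheduler;

    import org.apache.flink.runtime.executiongraph.ExecutionVertex;
    import org.apache.flink.runtime.instance.AllocatedSlot;
    import org.apache.flink.runtime.instance.Instance;
    import org.apache.flink.runtime.jobgraph.JobID;
    import org.apache.flink.runtime.jobgraph.JobVertexID;

    class SharedSlotRoundTripSketch {

        static void sketch() throws Exception {
            Instance instance = SchedulerTestUtils.getRandomInstance(1);
            AllocatedSlot slot = instance.allocateSlot(new JobID());

            SlotSharingGroupAssignment assignment = new SlotSharingGroupAssignment();
            ExecutionVertex vertex = SchedulerTestUtils.getTestVertex(new JobVertexID(), 0, 1);

            // wraps the allocated slot into a SharedSlot, registers it with the assignment,
            // and hands out the first sub slot
            SubSlot first = assignment.addNewSlotWithTask(slot, vertex);
            SharedSlot shared = first.getSharedSlot();

            // release chain: SubSlot.releaseSlot()
            //   -> SharedSlot.returnAllocatedSlot(sub)
            //     -> SlotSharingGroupAssignment.releaseSubSlot(sub, shared)
            //       -> SharedSlot.releaseSlot(sub), then dispose() once no sub slot is left
            first.releaseSlot();

            // the shared slot is gone and the underlying slot went back to the instance
            assert shared.isDisposed();
        }
    }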
...@@ -18,25 +18,25 @@ ...@@ -18,25 +18,25 @@
package org.apache.flink.runtime.jobmanager.scheduler; package org.apache.flink.runtime.jobmanager.scheduler;
import org.apache.flink.runtime.AbstractID;
import org.apache.flink.runtime.instance.AllocatedSlot; import org.apache.flink.runtime.instance.AllocatedSlot;
import org.apache.flink.runtime.jobgraph.JobVertexID;
public class SubSlot extends AllocatedSlot { public class SubSlot extends AllocatedSlot {
private final SharedSlot sharedSlot; private final SharedSlot sharedSlot;
private final JobVertexID jid; private final AbstractID groupId;
private final int subSlotNumber; private final int subSlotNumber;
public SubSlot(SharedSlot sharedSlot, int subSlotNumber, JobVertexID jid) { public SubSlot(SharedSlot sharedSlot, int subSlotNumber, AbstractID groupId) {
super(sharedSlot.getAllocatedSlot().getJobID(), super(sharedSlot.getAllocatedSlot().getJobID(),
sharedSlot.getAllocatedSlot().getInstance(), sharedSlot.getAllocatedSlot().getInstance(),
sharedSlot.getAllocatedSlot().getSlotNumber()); sharedSlot.getAllocatedSlot().getSlotNumber());
this.sharedSlot = sharedSlot; this.sharedSlot = sharedSlot;
this.jid = jid; this.groupId = groupId;
this.subSlotNumber = subSlotNumber; this.subSlotNumber = subSlotNumber;
} }
...@@ -59,8 +59,8 @@ public class SubSlot extends AllocatedSlot { ...@@ -59,8 +59,8 @@ public class SubSlot extends AllocatedSlot {
return this.sharedSlot; return this.sharedSlot;
} }
public JobVertexID getJobVertexId() { public AbstractID getGroupId() {
return jid; return groupId;
} }
// -------------------------------------------------------------------------------------------- // --------------------------------------------------------------------------------------------
......
...@@ -59,5 +59,7 @@ public interface ExtendedManagementProtocol extends JobManagementProtocol { ...@@ -59,5 +59,7 @@ public interface ExtendedManagementProtocol extends JobManagementProtocol {
* @return number of available slots * @return number of available slots
* @throws IOException * @throws IOException
*/ */
int getAvailableSlots() throws IOException; int getTotalNumberOfRegisteredSlots() throws IOException;
int getNumberOfSlotsAvailableToScheduler() throws IOException;
} }
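The single getAvailableSlots() query is split in two: getTotalNumberOfRegisteredSlots() reports what the InstanceManager has registered, while getNumberOfSlotsAvailableToScheduler() reports what the Scheduler can actually hand out right now. A minimal polling sketch in the style of the updated JobManagerTestUtils below; the helper class and the import path of ExtendedManagementProtocol are assumptions, the two method names are exactly those added here:

    import java.io.IOException;

    // package of the interface assumed; the methods are the ones added in this commit
    import org.apache.flink.runtime.protocols.ExtendedManagementProtocol;

    final class SlotWaitSketch {

        /** Waits until the scheduler can actually hand out numSlots slots, or the timeout expires. */
        static void waitForSchedulerSlots(ExtendedManagementProtocol jm, int numSlots, long timeoutMillis)
                throws IOException, InterruptedException {

            final long deadline = System.currentTimeMillis() + timeoutMillis;

            // the total number of registered slots can run ahead of what the scheduler sees,
            // so poll the scheduler-side count
            while (jm.getNumberOfSlotsAvailableToScheduler() < numSlots
                    && System.currentTimeMillis() < deadline) {
                Thread.sleep(10);
            }
        }
    }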
...@@ -46,11 +46,11 @@ public class JobManagerTestUtils { ...@@ -46,11 +46,11 @@ public class JobManagerTestUtils {
// max time is 5 seconds // max time is 5 seconds
long deadline = System.currentTimeMillis() + 5000; long deadline = System.currentTimeMillis() + 5000;
while (jm.getAvailableSlots() < numSlots && System.currentTimeMillis() < deadline) { while (jm.getNumberOfSlotsAvailableToScheduler() < numSlots && System.currentTimeMillis() < deadline) {
Thread.sleep(10); Thread.sleep(10);
} }
assertEquals(numSlots, jm.getAvailableSlots()); assertEquals(numSlots, jm.getNumberOfSlotsAvailableToScheduler());
return jm; return jm;
} }
......
...@@ -68,7 +68,7 @@ public class JobManagerITCase { ...@@ -68,7 +68,7 @@ public class JobManagerITCase {
try { try {
assertEquals(1, jm.getAvailableSlots()); assertEquals(1, jm.getTotalNumberOfRegisteredSlots());
// we need to register the job at the library cache manager (with no libraries) // we need to register the job at the library cache manager (with no libraries)
LibraryCacheManager.register(jobGraph.getJobID(), new String[0]); LibraryCacheManager.register(jobGraph.getJobID(), new String[0]);
...@@ -141,7 +141,7 @@ public class JobManagerITCase { ...@@ -141,7 +141,7 @@ public class JobManagerITCase {
try { try {
assertEquals(NUM_TASKS, jm.getAvailableSlots()); assertEquals(NUM_TASKS, jm.getTotalNumberOfRegisteredSlots());
// we need to register the job at the library cache manager (with no libraries) // we need to register the job at the library cache manager (with no libraries)
LibraryCacheManager.register(jobGraph.getJobID(), new String[0]); LibraryCacheManager.register(jobGraph.getJobID(), new String[0]);
...@@ -535,7 +535,7 @@ public class JobManagerITCase { ...@@ -535,7 +535,7 @@ public class JobManagerITCase {
.getTaskManagers()[0].getChannelManager().getGlobalBufferPool(); .getTaskManagers()[0].getChannelManager().getGlobalBufferPool();
try { try {
assertEquals(NUM_TASKS, jm.getAvailableSlots()); assertEquals(NUM_TASKS, jm.getTotalNumberOfRegisteredSlots());
// we need to register the job at the library cache manager (with no libraries) // we need to register the job at the library cache manager (with no libraries)
LibraryCacheManager.register(jobGraph.getJobID(), new String[0]); LibraryCacheManager.register(jobGraph.getJobID(), new String[0]);
...@@ -599,7 +599,7 @@ public class JobManagerITCase { ...@@ -599,7 +599,7 @@ public class JobManagerITCase {
.getTaskManagers()[0].getChannelManager().getGlobalBufferPool(); .getTaskManagers()[0].getChannelManager().getGlobalBufferPool();
try { try {
assertEquals(NUM_TASKS, jm.getAvailableSlots()); assertEquals(NUM_TASKS, jm.getTotalNumberOfRegisteredSlots());
// we need to register the job at the library cache manager (with no libraries) // we need to register the job at the library cache manager (with no libraries)
LibraryCacheManager.register(jobGraph.getJobID(), new String[0]); LibraryCacheManager.register(jobGraph.getJobID(), new String[0]);
...@@ -727,7 +727,7 @@ public class JobManagerITCase { ...@@ -727,7 +727,7 @@ public class JobManagerITCase {
.getTaskManagers()[0].getChannelManager().getGlobalBufferPool(); .getTaskManagers()[0].getChannelManager().getGlobalBufferPool();
try { try {
assertEquals(NUM_TASKS, jm.getAvailableSlots()); assertEquals(NUM_TASKS, jm.getTotalNumberOfRegisteredSlots());
// we need to register the job at the library cache manager (with no libraries) // we need to register the job at the library cache manager (with no libraries)
LibraryCacheManager.register(jobGraph.getJobID(), new String[0]); LibraryCacheManager.register(jobGraph.getJobID(), new String[0]);
...@@ -788,13 +788,13 @@ public class JobManagerITCase { ...@@ -788,13 +788,13 @@ public class JobManagerITCase {
final JobGraph jobGraph = new JobGraph("Pointwise Job", sender, receiver); final JobGraph jobGraph = new JobGraph("Pointwise Job", sender, receiver);
final JobManager jm = startJobManager(NUM_TASKS); final JobManager jm = startJobManager(2*NUM_TASKS);
final GlobalBufferPool bp = ((LocalInstanceManager) jm.getInstanceManager()) final GlobalBufferPool bp = ((LocalInstanceManager) jm.getInstanceManager())
.getTaskManagers()[0].getChannelManager().getGlobalBufferPool(); .getTaskManagers()[0].getChannelManager().getGlobalBufferPool();
try { try {
assertEquals(NUM_TASKS, jm.getAvailableSlots()); assertEquals(2*NUM_TASKS, jm.getNumberOfSlotsAvailableToScheduler());
// we need to register the job at the library cache manager (with no libraries) // we need to register the job at the library cache manager (with no libraries)
LibraryCacheManager.register(jobGraph.getJobID(), new String[0]); LibraryCacheManager.register(jobGraph.getJobID(), new String[0]);
......
...@@ -20,6 +20,7 @@ package org.apache.flink.runtime.jobmanager.scheduler; ...@@ -20,6 +20,7 @@ package org.apache.flink.runtime.jobmanager.scheduler;
import static org.apache.flink.runtime.jobmanager.scheduler.SchedulerTestUtils.getRandomInstance; import static org.apache.flink.runtime.jobmanager.scheduler.SchedulerTestUtils.getRandomInstance;
import static org.apache.flink.runtime.jobmanager.scheduler.SchedulerTestUtils.getTestVertex; import static org.apache.flink.runtime.jobmanager.scheduler.SchedulerTestUtils.getTestVertex;
import static org.apache.flink.runtime.jobmanager.scheduler.SchedulerTestUtils.getTestVertexWithLocation;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
...@@ -48,12 +49,13 @@ public class ScheduleWithCoLocationHintTest { ...@@ -48,12 +49,13 @@ public class ScheduleWithCoLocationHintTest {
SlotSharingGroup sharingGroup = new SlotSharingGroup(); SlotSharingGroup sharingGroup = new SlotSharingGroup();
CoLocationConstraint c1 = new CoLocationConstraint(); CoLocationGroup ccg = new CoLocationGroup();
CoLocationConstraint c2 = new CoLocationConstraint(); CoLocationConstraint c1 = new CoLocationConstraint(ccg);
CoLocationConstraint c3 = new CoLocationConstraint(); CoLocationConstraint c2 = new CoLocationConstraint(ccg);
CoLocationConstraint c4 = new CoLocationConstraint(); CoLocationConstraint c3 = new CoLocationConstraint(ccg);
CoLocationConstraint c5 = new CoLocationConstraint(); CoLocationConstraint c4 = new CoLocationConstraint(ccg);
CoLocationConstraint c6 = new CoLocationConstraint(); CoLocationConstraint c5 = new CoLocationConstraint(ccg);
CoLocationConstraint c6 = new CoLocationConstraint(ccg);
// schedule 4 tasks from the first vertex group // schedule 4 tasks from the first vertex group
AllocatedSlot s1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 0, 6), sharingGroup, c1)); AllocatedSlot s1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 0, 6), sharingGroup, c1));
...@@ -174,7 +176,7 @@ public class ScheduleWithCoLocationHintTest { ...@@ -174,7 +176,7 @@ public class ScheduleWithCoLocationHintTest {
assertEquals(2, scheduler.getNumberOfAvailableSlots()); assertEquals(2, scheduler.getNumberOfAvailableSlots());
SlotSharingGroup sharingGroup = new SlotSharingGroup(); SlotSharingGroup sharingGroup = new SlotSharingGroup();
CoLocationConstraint c1 = new CoLocationConstraint(); CoLocationConstraint c1 = new CoLocationConstraint(new CoLocationGroup());
AllocatedSlot s1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 0, 1), sharingGroup, c1)); AllocatedSlot s1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 0, 1), sharingGroup, c1));
AllocatedSlot s2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 0, 1), sharingGroup, c1)); AllocatedSlot s2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 0, 1), sharingGroup, c1));
...@@ -218,7 +220,7 @@ public class ScheduleWithCoLocationHintTest { ...@@ -218,7 +220,7 @@ public class ScheduleWithCoLocationHintTest {
assertEquals(2, scheduler.getNumberOfAvailableSlots()); assertEquals(2, scheduler.getNumberOfAvailableSlots());
SlotSharingGroup sharingGroup = new SlotSharingGroup(); SlotSharingGroup sharingGroup = new SlotSharingGroup();
CoLocationConstraint c1 = new CoLocationConstraint(); CoLocationConstraint c1 = new CoLocationConstraint(new CoLocationGroup());
AllocatedSlot s1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 0, 1), sharingGroup, c1)); AllocatedSlot s1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 0, 1), sharingGroup, c1));
s1.releaseSlot(); s1.releaseSlot();
...@@ -260,10 +262,12 @@ public class ScheduleWithCoLocationHintTest { ...@@ -260,10 +262,12 @@ public class ScheduleWithCoLocationHintTest {
assertEquals(4, scheduler.getNumberOfAvailableSlots()); assertEquals(4, scheduler.getNumberOfAvailableSlots());
CoLocationConstraint clc1 = new CoLocationConstraint(); CoLocationGroup grp = new CoLocationGroup();
CoLocationConstraint clc2 = new CoLocationConstraint(); CoLocationConstraint clc1 = new CoLocationConstraint(grp);
CoLocationConstraint clc3 = new CoLocationConstraint(); CoLocationConstraint clc2 = new CoLocationConstraint(grp);
CoLocationConstraint clc4 = new CoLocationConstraint(); CoLocationConstraint clc3 = new CoLocationConstraint(grp);
CoLocationConstraint clc4 = new CoLocationConstraint(grp);
SlotSharingGroup shareGroup = new SlotSharingGroup(); SlotSharingGroup shareGroup = new SlotSharingGroup();
// first wave // first wave
...@@ -303,4 +307,298 @@ public class ScheduleWithCoLocationHintTest { ...@@ -303,4 +307,298 @@ public class ScheduleWithCoLocationHintTest {
fail(e.getMessage()); fail(e.getMessage());
} }
} }
@Test
public void testGetsNonLocalFromSharingGroupFirst() {
try {
JobVertexID jid1 = new JobVertexID();
JobVertexID jid2 = new JobVertexID();
JobVertexID jid3 = new JobVertexID();
Scheduler scheduler = new Scheduler();
Instance i1 = getRandomInstance(1);
Instance i2 = getRandomInstance(1);
scheduler.newInstanceAvailable(i2);
scheduler.newInstanceAvailable(i1);
assertEquals(2, scheduler.getNumberOfAvailableSlots());
SlotSharingGroup sharingGroup = new SlotSharingGroup();
CoLocationGroup ccg = new CoLocationGroup();
CoLocationConstraint cc1 = new CoLocationConstraint(ccg);
CoLocationConstraint cc2 = new CoLocationConstraint(ccg);
// schedule something into the shared group so that both instances are in the sharing group
AllocatedSlot s1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, i1), sharingGroup));
AllocatedSlot s2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, i2), sharingGroup));
// schedule one locally to instance 1
AllocatedSlot s3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, i1), sharingGroup, cc1));
// schedule with co location constraint (yet unassigned) and a preference for
// instance 1, but it can only get instance 2
AllocatedSlot s4 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, i1), sharingGroup, cc2));
// schedule something into the assigned co-location constraints and check that they override the
// other preferences
AllocatedSlot s5 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid3, 0, 2, i2), sharingGroup, cc1));
AllocatedSlot s6 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid3, 1, 2, i1), sharingGroup, cc2));
// check that each slot got three
assertEquals(3, ((SubSlot) s1).getSharedSlot().getNumberOfAllocatedSubSlots());
assertEquals(3, ((SubSlot) s2).getSharedSlot().getNumberOfAllocatedSubSlots());
assertEquals(s1.getInstance(), s3.getInstance());
assertEquals(s2.getInstance(), s4.getInstance());
assertEquals(s1.getInstance(), s5.getInstance());
assertEquals(s2.getInstance(), s6.getInstance());
// check the scheduler's bookkeeping
assertEquals(0, scheduler.getNumberOfAvailableSlots());
assertEquals(5, scheduler.getNumberOfLocalizedAssignments());
assertEquals(1, scheduler.getNumberOfNonLocalizedAssignments());
assertEquals(0, scheduler.getNumberOfUnconstrainedAssignments());
// release some slots, be sure that new available ones come up
s1.releaseSlot();
s2.releaseSlot();
s3.releaseSlot();
s4.releaseSlot();
s5.releaseSlot();
s6.releaseSlot();
assertEquals(2, scheduler.getNumberOfAvailableSlots());
}
catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
}
@Test
public void testSlotReleasedInBetween() {
try {
JobVertexID jid1 = new JobVertexID();
JobVertexID jid2 = new JobVertexID();
Scheduler scheduler = new Scheduler();
Instance i1 = getRandomInstance(1);
Instance i2 = getRandomInstance(1);
scheduler.newInstanceAvailable(i2);
scheduler.newInstanceAvailable(i1);
assertEquals(2, scheduler.getNumberOfAvailableSlots());
SlotSharingGroup sharingGroup = new SlotSharingGroup();
CoLocationGroup ccg = new CoLocationGroup();
CoLocationConstraint cc1 = new CoLocationConstraint(ccg);
CoLocationConstraint cc2 = new CoLocationConstraint(ccg);
AllocatedSlot s1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, i1), sharingGroup, cc1));
AllocatedSlot s2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, i2), sharingGroup, cc2));
s1.releaseSlot();
s2.releaseSlot();
assertEquals(2, scheduler.getNumberOfAvailableSlots());
assertEquals(0, sharingGroup.getTaskAssignment().getNumberOfSlots());
AllocatedSlot s3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, i2), sharingGroup, cc1));
AllocatedSlot s4 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, i1), sharingGroup, cc2));
// still preserves the previous instance mapping)
assertEquals(i1, s3.getInstance());
assertEquals(i2, s4.getInstance());
s3.releaseSlot();
s4.releaseSlot();
assertEquals(2, scheduler.getNumberOfAvailableSlots());
assertEquals(4, scheduler.getNumberOfLocalizedAssignments());
assertEquals(0, scheduler.getNumberOfNonLocalizedAssignments());
assertEquals(0, scheduler.getNumberOfUnconstrainedAssignments());
}
catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
}
@Test
public void testSlotReleasedInBetweenAndNoNewLocal() {
try {
JobVertexID jid1 = new JobVertexID();
JobVertexID jid2 = new JobVertexID();
JobVertexID jidx = new JobVertexID();
Scheduler scheduler = new Scheduler();
Instance i1 = getRandomInstance(1);
Instance i2 = getRandomInstance(1);
scheduler.newInstanceAvailable(i2);
scheduler.newInstanceAvailable(i1);
assertEquals(2, scheduler.getNumberOfAvailableSlots());
SlotSharingGroup sharingGroup = new SlotSharingGroup();
CoLocationGroup ccg = new CoLocationGroup();
CoLocationConstraint cc1 = new CoLocationConstraint(ccg);
CoLocationConstraint cc2 = new CoLocationConstraint(ccg);
AllocatedSlot s1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, i1), sharingGroup, cc1));
AllocatedSlot s2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, i2), sharingGroup, cc2));
s1.releaseSlot();
s2.releaseSlot();
assertEquals(2, scheduler.getNumberOfAvailableSlots());
assertEquals(0, sharingGroup.getTaskAssignment().getNumberOfSlots());
AllocatedSlot sa = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jidx, 0, 2)));
AllocatedSlot sb = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jidx, 1, 2)));
try {
scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, i2), sharingGroup, cc1));
fail("should not be able to find a resource");
} catch (NoResourceAvailableException e) {
// good
} catch (Exception e) {
fail("wrong exception");
}
sa.releaseSlot();
sb.releaseSlot();
assertEquals(2, scheduler.getNumberOfAvailableSlots());
assertEquals(2, scheduler.getNumberOfLocalizedAssignments());
assertEquals(0, scheduler.getNumberOfNonLocalizedAssignments());
assertEquals(2, scheduler.getNumberOfUnconstrainedAssignments());
}
catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
}
@Test
public void testScheduleOutOfOrder() {
try {
JobVertexID jid1 = new JobVertexID();
JobVertexID jid2 = new JobVertexID();
Scheduler scheduler = new Scheduler();
Instance i1 = getRandomInstance(1);
Instance i2 = getRandomInstance(1);
scheduler.newInstanceAvailable(i2);
scheduler.newInstanceAvailable(i1);
assertEquals(2, scheduler.getNumberOfAvailableSlots());
SlotSharingGroup sharingGroup = new SlotSharingGroup();
CoLocationGroup ccg = new CoLocationGroup();
CoLocationConstraint cc1 = new CoLocationConstraint(ccg);
CoLocationConstraint cc2 = new CoLocationConstraint(ccg);
// schedule something from the second job vertex id before the first is filled,
// and give locality preferences that hint at using the same shared slot for both
// co location constraints (which we seek to prevent)
AllocatedSlot s1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, i1), sharingGroup, cc1));
AllocatedSlot s2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, i1), sharingGroup, cc2));
AllocatedSlot s3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, i1), sharingGroup, cc1));
AllocatedSlot s4 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, i1), sharingGroup, cc2));
// check that each slot got three
assertEquals(2, ((SubSlot) s1).getSharedSlot().getNumberOfAllocatedSubSlots());
assertEquals(2, ((SubSlot) s2).getSharedSlot().getNumberOfAllocatedSubSlots());
assertEquals(s1.getInstance(), s3.getInstance());
assertEquals(s2.getInstance(), s4.getInstance());
// check the scheduler's bookkeeping
assertEquals(0, scheduler.getNumberOfAvailableSlots());
assertEquals(3, scheduler.getNumberOfLocalizedAssignments());
assertEquals(1, scheduler.getNumberOfNonLocalizedAssignments());
assertEquals(0, scheduler.getNumberOfUnconstrainedAssignments());
// release some slots, be sure that new available ones come up
s1.releaseSlot();
s2.releaseSlot();
s3.releaseSlot();
s4.releaseSlot();
assertEquals(2, scheduler.getNumberOfAvailableSlots());
assertEquals(0, sharingGroup.getTaskAssignment().getNumberOfSlots());
assertEquals(0, sharingGroup.getTaskAssignment().getNumberOfAvailableSlotsForJid(jid1));
assertEquals(0, sharingGroup.getTaskAssignment().getNumberOfAvailableSlotsForJid(jid2));
}
catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
}
@Test
public void nonColocationFollowsCoLocation() {
try {
JobVertexID jid1 = new JobVertexID();
JobVertexID jid2 = new JobVertexID();
Scheduler scheduler = new Scheduler();
Instance i1 = getRandomInstance(1);
Instance i2 = getRandomInstance(1);
scheduler.newInstanceAvailable(i2);
scheduler.newInstanceAvailable(i1);
assertEquals(2, scheduler.getNumberOfAvailableSlots());
SlotSharingGroup sharingGroup = new SlotSharingGroup();
CoLocationGroup ccg = new CoLocationGroup();
CoLocationConstraint cc1 = new CoLocationConstraint(ccg);
CoLocationConstraint cc2 = new CoLocationConstraint(ccg);
AllocatedSlot s1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, i1), sharingGroup, cc1));
AllocatedSlot s2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, i2), sharingGroup, cc2));
AllocatedSlot s3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, i1), sharingGroup));
AllocatedSlot s4 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, i1), sharingGroup));
// check that each slot got three
assertEquals(2, ((SubSlot) s1).getSharedSlot().getNumberOfAllocatedSubSlots());
assertEquals(2, ((SubSlot) s2).getSharedSlot().getNumberOfAllocatedSubSlots());
s1.releaseSlot();
s2.releaseSlot();
s3.releaseSlot();
s4.releaseSlot();
assertEquals(2, scheduler.getNumberOfAvailableSlots());
assertEquals(0, sharingGroup.getTaskAssignment().getNumberOfSlots());
assertEquals(0, sharingGroup.getTaskAssignment().getNumberOfAvailableSlotsForJid(jid1));
assertEquals(0, sharingGroup.getTaskAssignment().getNumberOfAvailableSlotsForJid(jid2));
}
catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
}
} }
...@@ -101,6 +101,7 @@ public class SchedulerTestUtils { ...@@ -101,6 +101,7 @@ public class SchedulerTestUtils {
when(vertex.getParallelSubtaskIndex()).thenReturn(taskIndex); when(vertex.getParallelSubtaskIndex()).thenReturn(taskIndex);
when(vertex.getTotalNumberOfParallelSubtasks()).thenReturn(numTasks); when(vertex.getTotalNumberOfParallelSubtasks()).thenReturn(numTasks);
when(vertex.toString()).thenReturn("TEST-VERTEX"); when(vertex.toString()).thenReturn("TEST-VERTEX");
when(vertex.getSimpleName()).thenReturn("TEST-VERTEX");
Execution execution = mock(Execution.class); Execution execution = mock(Execution.class);
when(execution.getVertex()).thenReturn(vertex); when(execution.getVertex()).thenReturn(vertex);
......
...@@ -19,14 +19,13 @@ ...@@ -19,14 +19,13 @@
package org.apache.flink.runtime.jobmanager.scheduler; package org.apache.flink.runtime.jobmanager.scheduler;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.doAnswer;
import static org.mockito.Matchers.any; import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.junit.Assert.*; import static org.junit.Assert.*;
import org.junit.Test; import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.apache.flink.runtime.instance.Instance; import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.runtime.jobgraph.JobID; import org.apache.flink.runtime.jobgraph.JobID;
import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.JobVertexID;
...@@ -37,7 +36,16 @@ public class SharedSlotsTest { ...@@ -37,7 +36,16 @@ public class SharedSlotsTest {
public void createAndDoNotRelease() { public void createAndDoNotRelease() {
try { try {
SlotSharingGroupAssignment assignment = mock(SlotSharingGroupAssignment.class); SlotSharingGroupAssignment assignment = mock(SlotSharingGroupAssignment.class);
when(assignment.sharedSlotAvailableForJid(any(SharedSlot.class), any(JobVertexID.class), any(boolean.class))).thenReturn(false); doAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
final SubSlot sub = (SubSlot) invocation.getArguments()[0];
final SharedSlot shared = (SharedSlot) invocation.getArguments()[1];
shared.releaseSlot(sub);
return null;
}
}).when(assignment).releaseSubSlot(any(SubSlot.class), any(SharedSlot.class));
Instance instance = SchedulerTestUtils.getRandomInstance(1); Instance instance = SchedulerTestUtils.getRandomInstance(1);
...@@ -77,8 +85,18 @@ public class SharedSlotsTest { ...@@ -77,8 +85,18 @@ public class SharedSlotsTest {
public void createAndRelease() { public void createAndRelease() {
try { try {
SlotSharingGroupAssignment assignment = mock(SlotSharingGroupAssignment.class); SlotSharingGroupAssignment assignment = mock(SlotSharingGroupAssignment.class);
when(assignment.sharedSlotAvailableForJid(any(SharedSlot.class), any(JobVertexID.class), eq(false))).thenReturn(false); doAnswer(new Answer<Void>() {
when(assignment.sharedSlotAvailableForJid(any(SharedSlot.class), any(JobVertexID.class), eq(true))).thenReturn(true); @Override
public Void answer(InvocationOnMock invocation) throws Throwable {
final SubSlot sub = (SubSlot) invocation.getArguments()[0];
final SharedSlot shared = (SharedSlot) invocation.getArguments()[1];
if (shared.releaseSlot(sub) == 0) {
shared.dispose();
}
return null;
}
}).when(assignment).releaseSubSlot(any(SubSlot.class), any(SharedSlot.class));
Instance instance = SchedulerTestUtils.getRandomInstance(1); Instance instance = SchedulerTestUtils.getRandomInstance(1);
......
...@@ -184,7 +184,7 @@ public final class ServerTestUtils { ...@@ -184,7 +184,7 @@ public final class ServerTestUtils {
public static void waitForJobManagerToBecomeReady(final ExtendedManagementProtocol jobManager) throws IOException, public static void waitForJobManagerToBecomeReady(final ExtendedManagementProtocol jobManager) throws IOException,
InterruptedException { InterruptedException {
while (jobManager.getAvailableSlots() == 0) { while (jobManager.getTotalNumberOfRegisteredSlots() == 0) {
Thread.sleep(100); Thread.sleep(100);
} }
} }
......