diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/AllocatedSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AllocatedSlot.java similarity index 60% rename from flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/AllocatedSlot.java rename to flink-runtime/src/main/java/org/apache/flink/runtime/instance/AllocatedSlot.java index 49108625aaea5726971ddaa36c25ec319f03820a..70360443fd6dcafeeb95efe81735451b61e39a1b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/AllocatedSlot.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AllocatedSlot.java @@ -16,14 +16,20 @@ * limitations under the License. */ -package org.apache.flink.runtime.jobmanager.slots; +package org.apache.flink.runtime.instance; -import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.jobmanager.scheduler.Locality; +import org.apache.flink.runtime.jobmanager.slots.SlotContext; +import org.apache.flink.runtime.jobmanager.slots.SlotException; +import org.apache.flink.runtime.jobmanager.slots.SlotOwner; +import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; +import java.util.concurrent.atomic.AtomicReference; + import static org.apache.flink.util.Preconditions.checkNotNull; /** @@ -38,14 +44,11 @@ import static org.apache.flink.util.Preconditions.checkNotNull; * an AllocatedSlot was allocated to the JobManager as soon as the TaskManager registered at the * JobManager. All slots had a default unknown resource profile. */ -public class AllocatedSlot { +public class AllocatedSlot implements SlotContext { /** The ID under which the slot is allocated. Uniquely identifies the slot. */ private final AllocationID slotAllocationId; - /** The ID of the job this slot is allocated for */ - private final JobID jobID; - /** The location information of the TaskManager to which this slot belongs */ private final TaskManagerLocation taskManagerLocation; @@ -56,23 +59,29 @@ public class AllocatedSlot { private final TaskManagerGateway taskManagerGateway; /** The number of the slot on the TaskManager to which slot belongs. Purely informational. */ - private final int slotNumber; + private final int physicalSlotNumber; + + private final SlotOwner slotOwner; + + private final AtomicReference logicalSlotReference; // ------------------------------------------------------------------------ public AllocatedSlot( AllocationID slotAllocationId, - JobID jobID, TaskManagerLocation location, - int slotNumber, + int physicalSlotNumber, ResourceProfile resourceProfile, - TaskManagerGateway taskManagerGateway) { + TaskManagerGateway taskManagerGateway, + SlotOwner slotOwner) { this.slotAllocationId = checkNotNull(slotAllocationId); - this.jobID = checkNotNull(jobID); this.taskManagerLocation = checkNotNull(location); - this.slotNumber = slotNumber; + this.physicalSlotNumber = physicalSlotNumber; this.resourceProfile = checkNotNull(resourceProfile); this.taskManagerGateway = checkNotNull(taskManagerGateway); + this.slotOwner = checkNotNull(slotOwner); + + logicalSlotReference = new AtomicReference<>(null); } // ------------------------------------------------------------------------ @@ -82,7 +91,7 @@ public class AllocatedSlot { * * @return The ID under which the slot is allocated */ - public AllocationID getSlotAllocationId() { + public AllocationID getAllocationId() { return slotAllocationId; } @@ -97,22 +106,13 @@ public class AllocatedSlot { return getTaskManagerLocation().getResourceID(); } - /** - * Returns the ID of the job this allocated slot belongs to. - * - * @return the ID of the job this allocated slot belongs to - */ - public JobID getJobID() { - return jobID; - } - /** * Gets the number of the slot. * * @return The number of the slot on the TaskManager. */ - public int getSlotNumber() { - return slotNumber; + public int getPhysicalSlotNumber() { + return physicalSlotNumber; } /** @@ -144,6 +144,78 @@ public class AllocatedSlot { return taskManagerGateway; } + /** + * Triggers the release of the logical slot. + */ + public void triggerLogicalSlotRelease() { + final LogicalSlot logicalSlot = logicalSlotReference.get(); + + if (logicalSlot != null) { + logicalSlot.releaseSlot(); + } + } + + /** + * Releases the logical slot. + * + * @return true if the logical slot could be released, false otherwise. + */ + public boolean releaseLogicalSlot() { + final LogicalSlot logicalSlot = logicalSlotReference.get(); + + if (logicalSlot != null) { + if (logicalSlot instanceof Slot) { + final Slot slot = (Slot) logicalSlot; + if (slot.markReleased()) { + logicalSlotReference.set(null); + return true; + } + } else { + throw new RuntimeException("Unsupported logical slot type encountered " + logicalSlot.getClass()); + } + + } + + return false; + } + + /** + * Allocates a logical {@link SimpleSlot}. + * + * @return an allocated logical simple slot + * @throws SlotException if we could not allocate a simple slot + */ + public SimpleSlot allocateSimpleSlot(Locality locality) throws SlotException { + + final SimpleSlot simpleSlot = new SimpleSlot(this, slotOwner, physicalSlotNumber); + + if (logicalSlotReference.compareAndSet(null, simpleSlot)) { + simpleSlot.setLocality(locality); + return simpleSlot; + } else { + throw new SlotException("Could not allocate logical simple slot because the allocated slot is already used."); + } + } + + /** + * Allocates a logical {@link SharedSlot}. + * + * @param slotSharingGroupAssignment the slot sharing group to which the shared slot shall belong + * @return an allocated logical shared slot + * @throws SlotException if we could not allocate a shared slot + */ + public SharedSlot allocateSharedSlot(SlotSharingGroupAssignment slotSharingGroupAssignment) throws SlotException { + final SharedSlot sharedSlot = new SharedSlot(this, slotOwner, slotSharingGroupAssignment); + + if (logicalSlotReference.compareAndSet(null, sharedSlot)) { + + + return sharedSlot; + } else { + throw new SlotException("Could not allocate logical shared slot because the allocated slot is already used."); + } + } + // ------------------------------------------------------------------------ /** @@ -164,6 +236,6 @@ public class AllocatedSlot { @Override public String toString() { - return "AllocatedSlot " + slotAllocationId + " @ " + taskManagerLocation + " - " + slotNumber; + return "AllocatedSlot " + slotAllocationId + " @ " + taskManagerLocation + " - " + physicalSlotNumber; } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java index d099f6a857e852e3d94b7cdcf584f4a24c9cc14f..54c8971926bbf52bf12d9944f4adc71691c22520 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java @@ -18,7 +18,6 @@ package org.apache.flink.runtime.instance; -import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.jobmanager.scheduler.SlotAvailabilityListener; import org.apache.flink.runtime.jobmanager.slots.SlotOwner; @@ -210,19 +209,13 @@ public class Instance implements SlotOwner { * Allocates a simple slot on this TaskManager instance. This method returns {@code null}, if no slot * is available at the moment. * - * @param jobID The ID of the job that the slot is allocated for. - * * @return A simple slot that represents a task slot on this TaskManager instance, or null, if the * TaskManager instance has no more slots available. * * @throws InstanceDiedException Thrown if the instance is no longer alive by the time the * slot is allocated. */ - public SimpleSlot allocateSimpleSlot(JobID jobID) throws InstanceDiedException { - if (jobID == null) { - throw new IllegalArgumentException(); - } - + public SimpleSlot allocateSimpleSlot() throws InstanceDiedException { synchronized (instanceLock) { if (isDead) { throw new InstanceDiedException(this); @@ -233,7 +226,7 @@ public class Instance implements SlotOwner { return null; } else { - SimpleSlot slot = new SimpleSlot(jobID, this, location, nextSlot, taskManagerGateway); + SimpleSlot slot = new SimpleSlot(this, location, nextSlot, taskManagerGateway); allocatedSlots.add(slot); return slot; } @@ -244,7 +237,6 @@ public class Instance implements SlotOwner { * Allocates a shared slot on this TaskManager instance. This method returns {@code null}, if no slot * is available at the moment. The shared slot will be managed by the given SlotSharingGroupAssignment. * - * @param jobID The ID of the job that the slot is allocated for. * @param sharingGroupAssignment The assignment group that manages this shared slot. * * @return A shared slot that represents a task slot on this TaskManager instance and can hold other @@ -252,13 +244,8 @@ public class Instance implements SlotOwner { * * @throws InstanceDiedException Thrown if the instance is no longer alive by the time the slot is allocated. */ - public SharedSlot allocateSharedSlot(JobID jobID, SlotSharingGroupAssignment sharingGroupAssignment) - throws InstanceDiedException - { - // the slot needs to be in the returned to taskManager state - if (jobID == null) { - throw new IllegalArgumentException(); - } + public SharedSlot allocateSharedSlot(SlotSharingGroupAssignment sharingGroupAssignment) + throws InstanceDiedException { synchronized (instanceLock) { if (isDead) { @@ -271,7 +258,11 @@ public class Instance implements SlotOwner { } else { SharedSlot slot = new SharedSlot( - jobID, this, location, nextSlot, taskManagerGateway, sharingGroupAssignment); + this, + location, + nextSlot, + taskManagerGateway, + sharingGroupAssignment); allocatedSlots.add(slot); return slot; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SharedSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SharedSlot.java index 2ce4fc340b90842d09a433ab36f9550f3f30fea6..8637159c5b2204bc4f2eda9fca4594974979e639 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SharedSlot.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SharedSlot.java @@ -18,17 +18,20 @@ package org.apache.flink.runtime.instance; -import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot; +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.jobmanager.slots.SlotContext; import org.apache.flink.runtime.jobmanager.slots.SlotOwner; import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.util.AbstractID; -import org.apache.flink.api.common.JobID; +import org.apache.flink.util.FlinkException; import javax.annotation.Nullable; + import java.util.ConcurrentModificationException; import java.util.HashSet; import java.util.Set; +import java.util.concurrent.CompletableFuture; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -44,7 +47,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull; * passed through a {@link SlotSharingGroupAssignment} object which is responsible for * synchronization. */ -public class SharedSlot extends Slot { +public class SharedSlot extends Slot implements LogicalSlot { /** The assignment group os shared slots that manages the availability and release of the slots */ private final SlotSharingGroupAssignment assignmentGroup; @@ -52,6 +55,8 @@ public class SharedSlot extends Slot { /** The set os sub-slots allocated from this shared slot */ private final Set subSlots; + private final CompletableFuture cancellationFuture = new CompletableFuture<>(); + // ------------------------------------------------------------------------ // Old Constructors (prior FLIP-6) // ------------------------------------------------------------------------ @@ -60,7 +65,6 @@ public class SharedSlot extends Slot { * Creates a new shared slot that has no parent (is a root slot) and does not belong to any task group. * This constructor is used to create a slot directly from an instance. * - * @param jobID The ID of the job that the slot is created for. * @param owner The component from which this slot is allocated. * @param location The location info of the TaskManager where the slot was allocated from * @param slotNumber The number of the slot. @@ -68,18 +72,17 @@ public class SharedSlot extends Slot { * @param assignmentGroup The assignment group that this shared slot belongs to. */ public SharedSlot( - JobID jobID, SlotOwner owner, TaskManagerLocation location, int slotNumber, + SlotOwner owner, TaskManagerLocation location, int slotNumber, TaskManagerGateway taskManagerGateway, SlotSharingGroupAssignment assignmentGroup) { - this(jobID, owner, location, slotNumber, taskManagerGateway, assignmentGroup, null, null); + this(owner, location, slotNumber, taskManagerGateway, assignmentGroup, null, null); } /** * Creates a new shared slot that has is a sub-slot of the given parent shared slot, and that belongs * to the given task group. * - * @param jobID The ID of the job that the slot is created for. * @param owner The component from which this slot is allocated. * @param location The location info of the TaskManager where the slot was allocated from * @param slotNumber The number of the slot. @@ -89,7 +92,6 @@ public class SharedSlot extends Slot { * @param groupId The assignment group of this slot. */ public SharedSlot( - JobID jobID, SlotOwner owner, TaskManagerLocation location, int slotNumber, @@ -98,7 +100,7 @@ public class SharedSlot extends Slot { @Nullable SharedSlot parent, @Nullable AbstractID groupId) { - super(jobID, owner, location, slotNumber, taskManagerGateway, parent, groupId); + super(owner, location, slotNumber, taskManagerGateway, parent, groupId); this.assignmentGroup = checkNotNull(assignmentGroup); this.subSlots = new HashSet(); @@ -112,38 +114,23 @@ public class SharedSlot extends Slot { * Creates a new shared slot that has no parent (is a root slot) and does not belong to any task group. * This constructor is used to create a slot directly from an instance. * - * @param allocatedSlot The allocated slot that this slot represents. + * @param slotContext The slot context of this shared slot * @param owner The component from which this slot is allocated. * @param assignmentGroup The assignment group that this shared slot belongs to. */ - public SharedSlot(AllocatedSlot allocatedSlot, SlotOwner owner, SlotSharingGroupAssignment assignmentGroup) { - this(allocatedSlot, owner, allocatedSlot.getSlotNumber(), assignmentGroup, null, null); - } - - /** - * Creates a new shared slot that is a sub-slot of the given parent shared slot, and that belongs - * to the given task group. - * - * @param parent The parent slot of this slot. - * @param owner The component from which this slot is allocated. - * @param slotNumber The number of the slot. - * @param assignmentGroup The assignment group that this shared slot belongs to. - * @param groupId The assignment group of this slot. - */ - public SharedSlot( - SharedSlot parent, SlotOwner owner, int slotNumber, - SlotSharingGroupAssignment assignmentGroup, - AbstractID groupId) { - - this(parent.getAllocatedSlot(), owner, slotNumber, assignmentGroup, parent, groupId); + public SharedSlot(SlotContext slotContext, SlotOwner owner, SlotSharingGroupAssignment assignmentGroup) { + this(slotContext, owner, slotContext.getPhysicalSlotNumber(), assignmentGroup, null, null); } private SharedSlot( - AllocatedSlot allocatedSlot, SlotOwner owner, int slotNumber, + SlotContext slotInformation, + SlotOwner owner, + int slotNumber, SlotSharingGroupAssignment assignmentGroup, - @Nullable SharedSlot parent, @Nullable AbstractID groupId) { + @Nullable SharedSlot parent, + @Nullable AbstractID groupId) { - super(allocatedSlot, owner, slotNumber, parent, groupId); + super(slotInformation, owner, slotNumber, parent, groupId); this.assignmentGroup = checkNotNull(assignmentGroup); this.subSlots = new HashSet(); @@ -186,14 +173,44 @@ public class SharedSlot extends Slot { public boolean hasChildren() { return subSlots.size() > 0; } - + @Override - public void releaseInstanceSlot() { + public boolean tryAssignPayload(Payload payload) { + throw new UnsupportedOperationException("Cannot assign an execution attempt id to a shared slot."); + } + + @Nullable + @Override + public Payload getPayload() { + return null; + } + + @Override + public CompletableFuture releaseSlot() { + cancellationFuture.completeExceptionally(new FlinkException("Shared slot " + this + " is being released.")); + assignmentGroup.releaseSharedSlot(this); - + if (!(isReleased() && subSlots.isEmpty())) { throw new IllegalStateException("Bug: SharedSlot is not empty and released after call to releaseSlot()"); } + + return CompletableFuture.completedFuture(null); + } + + @Override + public void releaseInstanceSlot() { + releaseSlot(); + } + + @Override + public int getPhysicalSlotNumber() { + return getRootSlotNumber(); + } + + @Override + public AllocationID getAllocationId() { + return getSlotContext().getAllocationId(); } /** @@ -222,8 +239,12 @@ public class SharedSlot extends Slot { SimpleSlot allocateSubSlot(AbstractID groupId) { if (isAlive()) { SimpleSlot slot = new SimpleSlot( - getJobID(), getOwner(), getTaskManagerLocation(), subSlots.size(), - getTaskManagerGateway(), this, groupId); + getOwner(), + getTaskManagerLocation(), + subSlots.size(), + getTaskManagerGateway(), + this, + groupId); subSlots.add(slot); return slot; } @@ -244,8 +265,13 @@ public class SharedSlot extends Slot { SharedSlot allocateSharedSlot(AbstractID groupId){ if (isAlive()) { SharedSlot slot = new SharedSlot( - getJobID(), getOwner(), getTaskManagerLocation(), subSlots.size(), - getTaskManagerGateway(), assignmentGroup, this, groupId); + getOwner(), + getTaskManagerLocation(), + subSlots.size(), + getTaskManagerGateway(), + assignmentGroup, + this, + groupId); subSlots.add(slot); return slot; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlot.java index 0c9e11c9f994a19e1a9d9c538d6817f3952a39ee..d397c08adc1222fb219e2a293422f89c915594ff 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlot.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlot.java @@ -18,11 +18,10 @@ package org.apache.flink.runtime.instance; -import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.clusterframework.types.AllocationID; -import org.apache.flink.runtime.clusterframework.types.ResourceProfile; import org.apache.flink.runtime.jobmanager.scheduler.Locality; -import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot; +import org.apache.flink.runtime.jobmanager.slots.SimpleSlotContext; +import org.apache.flink.runtime.jobmanager.slots.SlotContext; import org.apache.flink.runtime.jobmanager.slots.SlotOwner; import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; @@ -64,23 +63,21 @@ public class SimpleSlot extends Slot implements LogicalSlot { /** * Creates a new simple slot that stands alone and does not belong to shared slot. * - * @param jobID The ID of the job that the slot is allocated for. * @param owner The component from which this slot is allocated. * @param location The location info of the TaskManager where the slot was allocated from * @param slotNumber The number of the task slot on the instance. * @param taskManagerGateway The gateway to communicate with the TaskManager of this slot */ public SimpleSlot( - JobID jobID, SlotOwner owner, TaskManagerLocation location, int slotNumber, + SlotOwner owner, TaskManagerLocation location, int slotNumber, TaskManagerGateway taskManagerGateway) { - this(jobID, owner, location, slotNumber, taskManagerGateway, null, null); + this(owner, location, slotNumber, taskManagerGateway, null, null); } /** * Creates a new simple slot that belongs to the given shared slot and * is identified by the given ID. * - * @param jobID The ID of the job that the slot is allocated for. * @param owner The component from which this slot is allocated. * @param location The location info of the TaskManager where the slot was allocated from * @param slotNumber The number of the simple slot in its parent shared slot. @@ -89,15 +86,25 @@ public class SimpleSlot extends Slot implements LogicalSlot { * @param groupID The ID that identifies the group that the slot belongs to. */ public SimpleSlot( - JobID jobID, SlotOwner owner, TaskManagerLocation location, int slotNumber, + SlotOwner owner, + TaskManagerLocation location, + int slotNumber, TaskManagerGateway taskManagerGateway, - @Nullable SharedSlot parent, @Nullable AbstractID groupID) { - - super(parent != null ? - parent.getAllocatedSlot() : - new AllocatedSlot(NO_ALLOCATION_ID, jobID, location, slotNumber, - ResourceProfile.UNKNOWN, taskManagerGateway), - owner, slotNumber, parent, groupID); + @Nullable SharedSlot parent, + @Nullable AbstractID groupID) { + + super( + parent != null ? + parent.getSlotContext() : + new SimpleSlotContext( + NO_ALLOCATION_ID, + location, + slotNumber, + taskManagerGateway), + owner, + slotNumber, + parent, + groupID); } // ------------------------------------------------------------------------ @@ -107,12 +114,11 @@ public class SimpleSlot extends Slot implements LogicalSlot { /** * Creates a new simple slot that stands alone and does not belong to shared slot. * - * @param allocatedSlot The allocated slot that this slot represents. + * @param slotContext The slot context of this simple slot * @param owner The component from which this slot is allocated. - * @param slotNumber The number of the task slot on the instance. */ - public SimpleSlot(AllocatedSlot allocatedSlot, SlotOwner owner, int slotNumber) { - this(allocatedSlot, owner, slotNumber, null, null); + public SimpleSlot(SlotContext slotContext, SlotOwner owner, int slotNumber) { + this(slotContext, owner, slotNumber, null, null); } /** @@ -121,27 +127,29 @@ public class SimpleSlot extends Slot implements LogicalSlot { * * @param parent The parent shared slot. * @param owner The component from which this slot is allocated. - * @param slotNumber The number of the simple slot in its parent shared slot. * @param groupID The ID that identifies the group that the slot belongs to. */ public SimpleSlot(SharedSlot parent, SlotOwner owner, int slotNumber, AbstractID groupID) { - this(parent.getAllocatedSlot(), owner, slotNumber, parent, groupID); + this(parent.getSlotContext(), owner, slotNumber, parent, groupID); } /** * Creates a new simple slot that belongs to the given shared slot and * is identified by the given ID.. * - * @param allocatedSlot The allocated slot that this slot represents. + * @param slotContext The slot context of this simple slot * @param owner The component from which this slot is allocated. * @param slotNumber The number of the simple slot in its parent shared slot. * @param parent The parent shared slot. * @param groupID The ID that identifies the group that the slot belongs to. */ private SimpleSlot( - AllocatedSlot allocatedSlot, SlotOwner owner, int slotNumber, - @Nullable SharedSlot parent, @Nullable AbstractID groupID) { - super(allocatedSlot, owner, slotNumber, parent, groupID); + SlotContext slotContext, + SlotOwner owner, + int slotNumber, + @Nullable SharedSlot parent, + @Nullable AbstractID groupID) { + super(slotContext, owner, slotNumber, parent, groupID); } // ------------------------------------------------------------------------ @@ -263,7 +271,7 @@ public class SimpleSlot extends Slot implements LogicalSlot { @Override public AllocationID getAllocationId() { - return getAllocatedSlot().getSlotAllocationId(); + return getSlotContext().getAllocationId(); } // ------------------------------------------------------------------------ diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Slot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Slot.java index 804682bb9e396c4f752c17605919044767a7cfcf..6262c9a9125d91ef98eec3c64b118fb3011343d2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Slot.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Slot.java @@ -18,11 +18,10 @@ package org.apache.flink.runtime.instance; -import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.clusterframework.types.ResourceID; -import org.apache.flink.runtime.clusterframework.types.ResourceProfile; -import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot; +import org.apache.flink.runtime.jobmanager.slots.SimpleSlotContext; +import org.apache.flink.runtime.jobmanager.slots.SlotContext; import org.apache.flink.runtime.jobmanager.slots.SlotOwner; import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; @@ -66,8 +65,8 @@ public abstract class Slot { // ------------------------------------------------------------------------ - /** The allocated slot that this slot represents. */ - private final AllocatedSlot allocatedSlot; + /** Context of this logical slot. */ + private final SlotContext slotContext; /** The owner of this slot - the slot was taken from that owner and must be disposed to it */ private final SlotOwner owner; @@ -80,7 +79,6 @@ public abstract class Slot { @Nullable private final AbstractID groupID; - /** The number of the slot on which the task is deployed */ private final int slotNumber; /** The state of the vertex, only atomically updated */ @@ -93,7 +91,6 @@ public abstract class Slot { * *

This is the old way of constructing slots, prior to the FLIP-6 resource management refactoring. * - * @param jobID The ID of the job that this slot is allocated for. * @param owner The component from which this slot is allocated. * @param location The location info of the TaskManager where the slot was allocated from * @param slotNumber The number of this slot. @@ -103,7 +100,6 @@ public abstract class Slot { * if the slot does not belong to any task group. */ protected Slot( - JobID jobID, SlotOwner owner, TaskManagerLocation location, int slotNumber, @@ -113,12 +109,11 @@ public abstract class Slot { checkArgument(slotNumber >= 0); - this.allocatedSlot = new AllocatedSlot( + // create a simple slot context + this.slotContext = new SimpleSlotContext( NO_ALLOCATION_ID, - jobID, location, slotNumber, - ResourceProfile.UNKNOWN, taskManagerGateway); this.owner = checkNotNull(owner); @@ -130,7 +125,7 @@ public abstract class Slot { /** * Base constructor for slots. * - * @param allocatedSlot The allocated slot that this slot represents. + * @param slotContext The slot context of this slot. * @param owner The component from which this slot is allocated. * @param slotNumber The number of this slot. * @param parent The parent slot that contains this slot. May be null, if this slot is the root. @@ -138,12 +133,13 @@ public abstract class Slot { * if the slot does not belong to any task group. */ protected Slot( - AllocatedSlot allocatedSlot, SlotOwner owner, int slotNumber, - @Nullable SharedSlot parent, @Nullable AbstractID groupID) { - - checkArgument(slotNumber >= 0); + SlotContext slotContext, + SlotOwner owner, + int slotNumber, + @Nullable SharedSlot parent, + @Nullable AbstractID groupID) { - this.allocatedSlot = checkNotNull(allocatedSlot); + this.slotContext = checkNotNull(slotContext); this.owner = checkNotNull(owner); this.parent = parent; // may be null this.groupID = groupID; // may be null @@ -157,17 +153,8 @@ public abstract class Slot { * * @return This slot's allocated slot. */ - public AllocatedSlot getAllocatedSlot() { - return allocatedSlot; - } - - /** - * Returns the ID of the job this allocated slot belongs to. - * - * @return the ID of the job this allocated slot belongs to - */ - public JobID getJobID() { - return allocatedSlot.getJobID(); + public SlotContext getSlotContext() { + return slotContext; } /** @@ -176,7 +163,7 @@ public abstract class Slot { * @return The ID of the TaskManager that offers this slot */ public ResourceID getTaskManagerID() { - return allocatedSlot.getTaskManagerLocation().getResourceID(); + return slotContext.getTaskManagerLocation().getResourceID(); } /** @@ -185,7 +172,7 @@ public abstract class Slot { * @return The location info of the TaskManager that offers this slot */ public TaskManagerLocation getTaskManagerLocation() { - return allocatedSlot.getTaskManagerLocation(); + return slotContext.getTaskManagerLocation(); } /** @@ -196,7 +183,7 @@ public abstract class Slot { * @return The actor gateway that can be used to send messages to the TaskManager. */ public TaskManagerGateway getTaskManagerGateway() { - return allocatedSlot.getTaskManagerGateway(); + return slotContext.getTaskManagerGateway(); } /** @@ -373,7 +360,7 @@ public abstract class Slot { } protected String hierarchy() { - return (getParent() != null ? getParent().hierarchy() : "") + '(' + slotNumber + ')'; + return (getParent() != null ? getParent().hierarchy() : "") + '(' + getSlotNumber() + ')'; } private static String getStateName(int state) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java index 771d690275232fbedc35ffec7a20e5fefba78c1e..2ccea75b6e46c428dede1b6f1f63fc2457aa2e71 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java @@ -21,7 +21,6 @@ package org.apache.flink.runtime.instance; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.time.Time; -import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.clusterframework.types.ResourceProfile; @@ -29,9 +28,11 @@ import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.jobmanager.scheduler.Locality; import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException; import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit; -import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot; +import org.apache.flink.runtime.jobmanager.slots.SlotContext; import org.apache.flink.runtime.jobmanager.slots.SlotAndLocality; +import org.apache.flink.runtime.jobmanager.slots.SlotException; import org.apache.flink.runtime.jobmanager.slots.SlotOwner; +import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway; import org.apache.flink.runtime.jobmaster.JobMasterId; import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; @@ -59,11 +60,11 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; import java.util.stream.Stream; -import static org.apache.flink.util.Preconditions.checkArgument; import static org.apache.flink.util.Preconditions.checkNotNull; /** @@ -249,7 +250,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway { // work on all slots waiting for this connection for (PendingRequest pendingRequest : waitingForResourceManager.values()) { - requestSlotFromResourceManager(pendingRequest); + requestSlotFromResourceManager(resourceManagerGateway, pendingRequest); } // all sent off @@ -277,24 +278,23 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway { } @Override - public void returnAllocatedSlot(Slot slot) { - internalReturnAllocatedSlot(slot); + public void returnAllocatedSlot(SlotContext allocatedSlot) { + internalReturnAllocatedSlot(allocatedSlot.getAllocationId()); } @Override - public CompletableFuture cancelSlotAllocation(SlotRequestID requestId) { + public CompletableFuture cancelSlotRequest(SlotRequestID requestId) { final PendingRequest pendingRequest = removePendingRequest(requestId); if (pendingRequest != null) { failPendingRequest(pendingRequest, new CancellationException("Allocation with request id" + requestId + " cancelled.")); } else { - final Slot slot = allocatedSlots.get(requestId); + final AllocatedSlot allocatedSlot = allocatedSlots.get(requestId); - if (slot != null) { - LOG.info("Returning allocated slot {} because the corresponding allocation request {} was cancelled.", slot, requestId); - if (slot.markCancelled()) { - internalReturnAllocatedSlot(slot); - } + if (allocatedSlot != null) { + LOG.info("Returning allocated slot {} because the corresponding allocation request {} was cancelled.", allocatedSlot, requestId); + // TODO: Avoid having to send another message to do the slot releasing (e.g. introduce Slot#cancelExecution) and directly return slot + allocatedSlot.triggerLogicalSlotRelease(); } else { LOG.debug("There was no slot allocation with {} to be cancelled.", requestId); } @@ -312,24 +312,36 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway { // (1) do we have a slot available already? SlotAndLocality slotFromPool = availableSlots.poll(resources, locationPreferences); if (slotFromPool != null) { - SimpleSlot slot = createSimpleSlot(slotFromPool.slot(), slotFromPool.locality()); - allocatedSlots.add(requestId, slot); - return CompletableFuture.completedFuture(slot); - } + final AllocatedSlot allocatedSlot = slotFromPool.slot(); - // the request will be completed by a future - final CompletableFuture future = new CompletableFuture<>(); + final SimpleSlot simpleSlot; + try { + simpleSlot = allocatedSlot.allocateSimpleSlot(slotFromPool.locality()); + } catch (SlotException e) { + availableSlots.add(allocatedSlot, clock.relativeTimeMillis()); - // (2) need to request a slot - if (resourceManagerGateway == null) { - // no slot available, and no resource manager connection - stashRequestWaitingForResourceManager(requestId, resources, future); - } else { - // we have a resource manager connection, so let's ask it for more resources - requestSlotFromResourceManager(new PendingRequest(requestId, future, resources)); + return FutureUtils.completedExceptionally(e); + } + + allocatedSlots.add(requestId, allocatedSlot); + return CompletableFuture.completedFuture(simpleSlot); } - return future; + // we have to request a new allocated slot + CompletableFuture allocatedSlotFuture = requestSlot( + requestId, + resources); + + return allocatedSlotFuture.thenApply( + (AllocatedSlot allocatedSlot) -> { + try { + return allocatedSlot.allocateSimpleSlot(Locality.UNKNOWN); + } catch (SlotException e) { + returnAllocatedSlot(allocatedSlot); + + throw new CompletionException("Could not allocate a logical simple slot.", e); + } + }); } /** @@ -354,16 +366,37 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway { } } + private CompletableFuture requestSlot( + SlotRequestID slotRequestId, + ResourceProfile resourceProfile) { + + final PendingRequest pendingRequest = new PendingRequest( + slotRequestId, + resourceProfile); + + if (resourceManagerGateway == null) { + stashRequestWaitingForResourceManager(pendingRequest); + } else { + requestSlotFromResourceManager(resourceManagerGateway, pendingRequest); + } + + return pendingRequest.getAllocatedSlotFuture(); + } + private void requestSlotFromResourceManager( + final ResourceManagerGateway resourceManagerGateway, final PendingRequest pendingRequest) { + Preconditions.checkNotNull(resourceManagerGateway); + Preconditions.checkNotNull(pendingRequest); + LOG.info("Requesting slot with profile {} from resource manager (request = {}).", pendingRequest.getResourceProfile(), pendingRequest.getSlotRequestId()); final AllocationID allocationId = new AllocationID(); pendingRequests.put(pendingRequest.getSlotRequestId(), allocationId, pendingRequest); - pendingRequest.getFuture().whenComplete( + pendingRequest.getAllocatedSlotFuture().whenComplete( (value, throwable) -> { if (throwable != null) { resourceManagerGateway.cancelSlotRequest(allocationId); @@ -405,7 +438,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway { private void slotRequestToResourceManagerFailed(SlotRequestID slotRequestID, Throwable failure) { PendingRequest request = pendingRequests.removeKeyA(slotRequestID); if (request != null) { - request.getFuture().completeExceptionally(new NoResourceAvailableException( + request.getAllocatedSlotFuture().completeExceptionally(new NoResourceAvailableException( "No pooled slot available and request to ResourceManager for new slot failed", failure)); } else { if (LOG.isDebugEnabled()) { @@ -425,25 +458,22 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway { Preconditions.checkNotNull(pendingRequest); Preconditions.checkNotNull(e); - if (!pendingRequest.getFuture().isDone()) { - pendingRequest.getFuture().completeExceptionally(e); + if (!pendingRequest.getAllocatedSlotFuture().isDone()) { + pendingRequest.getAllocatedSlotFuture().completeExceptionally(e); } } - private void stashRequestWaitingForResourceManager( - final SlotRequestID requestId, - final ResourceProfile resources, - final CompletableFuture future) { + private void stashRequestWaitingForResourceManager(final PendingRequest pendingRequest) { LOG.info("Cannot serve slot request, no ResourceManager connected. " + - "Adding as pending request {}", requestId); + "Adding as pending request {}", pendingRequest.getSlotRequestId()); - waitingForResourceManager.put(requestId, new PendingRequest(requestId, future, resources)); + waitingForResourceManager.put(pendingRequest.getSlotRequestId(), pendingRequest); scheduleRunAsync(new Runnable() { @Override public void run() { - checkTimeoutRequestWaitingForResourceManager(requestId); + checkTimeoutRequestWaitingForResourceManager(pendingRequest.getSlotRequestId()); } }, resourceManagerRequestsTimeout); } @@ -465,38 +495,31 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway { * Return the slot back to this pool without releasing it. It's mainly called by failed / cancelled tasks, and the * slot can be reused by other pending requests if the resource profile matches.n * - * @param slot The slot needs to be returned + * @param allocationId identifying the slot which is returned */ - private void internalReturnAllocatedSlot(Slot slot) { - checkNotNull(slot); - checkArgument(!slot.isAlive(), "slot is still alive"); - checkArgument(slot.getOwner() == providerAndOwner, "slot belongs to the wrong pool."); - - // markReleased() is an atomic check-and-set operation, so that the slot is guaranteed - // to be returned only once - if (slot.markReleased()) { - if (allocatedSlots.remove(slot)) { - // this slot allocation is still valid, use the slot to fulfill another request - // or make it available again - final AllocatedSlot taskManagerSlot = slot.getAllocatedSlot(); - final PendingRequest pendingRequest = pollMatchingPendingRequest(taskManagerSlot); - + private void internalReturnAllocatedSlot(AllocationID allocationId) { + final AllocatedSlot allocatedSlot = allocatedSlots.remove(allocationId); + + if (allocatedSlot != null) { + if (allocatedSlot.releaseLogicalSlot()) { + + final PendingRequest pendingRequest = pollMatchingPendingRequest(allocatedSlot); + if (pendingRequest != null) { LOG.debug("Fulfilling pending request [{}] early with returned slot [{}]", - pendingRequest.getSlotRequestId(), taskManagerSlot.getSlotAllocationId()); + pendingRequest.getSlotRequestId(), allocatedSlot.getAllocationId()); - SimpleSlot newSlot = createSimpleSlot(taskManagerSlot, Locality.UNKNOWN); - allocatedSlots.add(pendingRequest.getSlotRequestId(), newSlot); - pendingRequest.getFuture().complete(newSlot); + allocatedSlots.add(pendingRequest.getSlotRequestId(), allocatedSlot); + pendingRequest.getAllocatedSlotFuture().complete(allocatedSlot); + } else { + LOG.debug("Adding returned slot [{}] to available slots", allocatedSlot.getAllocationId()); + availableSlots.add(allocatedSlot, clock.relativeTimeMillis()); } - else { - LOG.debug("Adding returned slot [{}] to available slots", taskManagerSlot.getSlotAllocationId()); - availableSlots.add(taskManagerSlot, clock.relativeTimeMillis()); - } - } - else { - LOG.debug("Returned slot's allocation has been failed. Dropping slot."); + } else { + LOG.debug("Failed to mark the logical slot of {} as released.", allocatedSlot); } + } else { + LOG.debug("Could not find allocated slot {}. Ignoring returning slot.", allocationId); } } @@ -524,19 +547,26 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway { } @Override - public CompletableFuture> offerSlots(Collection> offers) { + public CompletableFuture> offerSlots( + TaskManagerLocation taskManagerLocation, + TaskManagerGateway taskManagerGateway, + Collection offers) { validateRunsInMainThread(); List>> acceptedSlotOffers = offers.stream().map( offer -> { - CompletableFuture> acceptedSlotOffer = offerSlot(offer.f0).thenApply( - (acceptedSlot) -> { - if (acceptedSlot) { - return Optional.of(offer.f1); - } else { - return Optional.empty(); - } - }); + CompletableFuture> acceptedSlotOffer = offerSlot( + taskManagerLocation, + taskManagerGateway, + offer) + .thenApply( + (acceptedSlot) -> { + if (acceptedSlot) { + return Optional.of(offer); + } else { + return Optional.empty(); + } + }); return acceptedSlotOffer; } @@ -564,20 +594,25 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway { * we issued. Slot offering may be rejected if we find something mismatching or there is actually no pending * request waiting for this slot (maybe fulfilled by some other returned slot). * - * @param slot The offered slot + * @param taskManagerLocation location from where the offer comes from + * @param taskManagerGateway TaskManager gateway + * @param slotOffer the offered slot * @return True if we accept the offering */ @Override - public CompletableFuture offerSlot(final AllocatedSlot slot) { + public CompletableFuture offerSlot( + final TaskManagerLocation taskManagerLocation, + final TaskManagerGateway taskManagerGateway, + final SlotOffer slotOffer) { validateRunsInMainThread(); // check if this TaskManager is valid - final ResourceID resourceID = slot.getTaskManagerId(); - final AllocationID allocationID = slot.getSlotAllocationId(); + final ResourceID resourceID = taskManagerLocation.getResourceID(); + final AllocationID allocationID = slotOffer.getAllocationId(); if (!registeredTaskManagers.contains(resourceID)) { LOG.debug("Received outdated slot offering [{}] from unregistered TaskManager: {}", - slot.getSlotAllocationId(), slot); + slotOffer.getAllocationId(), taskManagerLocation); return CompletableFuture.completedFuture(false); } @@ -590,19 +625,26 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway { return CompletableFuture.completedFuture(true); } + final AllocatedSlot allocatedSlot = new AllocatedSlot( + slotOffer.getAllocationId(), + taskManagerLocation, + slotOffer.getSlotIndex(), + slotOffer.getResourceProfile(), + taskManagerGateway, + providerAndOwner); + // check whether we have request waiting for this slot PendingRequest pendingRequest = pendingRequests.removeKeyB(allocationID); if (pendingRequest != null) { // we were waiting for this! - SimpleSlot resultSlot = createSimpleSlot(slot, Locality.UNKNOWN); - pendingRequest.getFuture().complete(resultSlot); - allocatedSlots.add(pendingRequest.getSlotRequestId(), resultSlot); + allocatedSlots.add(pendingRequest.getSlotRequestId(), allocatedSlot); + pendingRequest.getAllocatedSlotFuture().complete(allocatedSlot); } else { // we were actually not waiting for this: // - could be that this request had been fulfilled // - we are receiving the slots from TaskManagers after becoming leaders - availableSlots.add(slot, clock.relativeTimeMillis()); + availableSlots.add(allocatedSlot, clock.relativeTimeMillis()); } // we accepted the request in any case. slot will be released after it idled for @@ -639,11 +681,11 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway { LOG.debug("Failed available slot [{}] with ", allocationID, cause); } else { - Slot slot = allocatedSlots.remove(allocationID); - if (slot != null) { + AllocatedSlot allocatedSlot = allocatedSlots.remove(allocationID); + if (allocatedSlot != null) { // release the slot. // since it is not in 'allocatedSlots' any more, it will be dropped o return' - slot.releaseInstanceSlot(); + allocatedSlot.triggerLogicalSlotRelease(); } else { LOG.debug("Outdated request to fail slot [{}] with ", allocationID, cause); @@ -681,27 +723,17 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway { if (registeredTaskManagers.remove(resourceID)) { availableSlots.removeAllForTaskManager(resourceID); - final Set allocatedSlotsForResource = allocatedSlots.removeSlotsForTaskManager(resourceID); - for (Slot slot : allocatedSlotsForResource) { - slot.releaseInstanceSlot(); + final Set allocatedSlotsForResource = allocatedSlots.removeSlotsForTaskManager(resourceID); + for (AllocatedSlot allocatedSlot : allocatedSlotsForResource) { + allocatedSlot.triggerLogicalSlotRelease(); + // TODO: This is a work-around to mark the logical slot as released. We should split up the internalReturnSlot method to not poll pending requests + allocatedSlot.releaseLogicalSlot(); } } return CompletableFuture.completedFuture(Acknowledge.get()); } - // ------------------------------------------------------------------------ - // Utilities - // ------------------------------------------------------------------------ - - private SimpleSlot createSimpleSlot(AllocatedSlot slot, Locality locality) { - SimpleSlot result = new SimpleSlot(slot, providerAndOwner, slot.getSlotNumber()); - if (locality != null) { - result.setLocality(locality); - } - return result; - } - // ------------------------------------------------------------------------ // Methods for tests // ------------------------------------------------------------------------ @@ -736,10 +768,10 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway { static class AllocatedSlots { /** All allocated slots organized by TaskManager's id */ - private final Map> allocatedSlotsByTaskManager; + private final Map> allocatedSlotsByTaskManager; /** All allocated slots organized by AllocationID */ - private final DualKeyMap allocatedSlotsById; + private final DualKeyMap allocatedSlotsById; AllocatedSlots() { this.allocatedSlotsByTaskManager = new HashMap<>(16); @@ -749,18 +781,18 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway { /** * Adds a new slot to this collection. * - * @param slot The allocated slot + * @param allocatedSlot The allocated slot */ - void add(SlotRequestID slotRequestId, Slot slot) { - allocatedSlotsById.put(slot.getAllocatedSlot().getSlotAllocationId(), slotRequestId, slot); - - final ResourceID resourceID = slot.getTaskManagerID(); - Set slotsForTaskManager = allocatedSlotsByTaskManager.get(resourceID); - if (slotsForTaskManager == null) { - slotsForTaskManager = new HashSet<>(); - allocatedSlotsByTaskManager.put(resourceID, slotsForTaskManager); - } - slotsForTaskManager.add(slot); + void add(SlotRequestID slotRequestId, AllocatedSlot allocatedSlot) { + allocatedSlotsById.put(allocatedSlot.getAllocationId(), slotRequestId, allocatedSlot); + + final ResourceID resourceID = allocatedSlot.getTaskManagerLocation().getResourceID(); + + Set slotsForTaskManager = allocatedSlotsByTaskManager.computeIfAbsent( + resourceID, + resourceId -> new HashSet<>(4)); + + slotsForTaskManager.add(allocatedSlot); } /** @@ -769,11 +801,11 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway { * @param allocationID The allocation id * @return The allocated slot, null if we can't find a match */ - Slot get(final AllocationID allocationID) { + AllocatedSlot get(final AllocationID allocationID) { return allocatedSlotsById.getKeyA(allocationID); } - Slot get(final SlotRequestID slotRequestId) { + AllocatedSlot get(final SlotRequestID slotRequestId) { return allocatedSlotsById.getKeyB(slotRequestId); } @@ -787,30 +819,23 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway { return allocatedSlotsById.containsKeyA(slotAllocationId); } - /** - * Remove an allocation with slot. - * - * @param slot The slot needs to be removed - */ - boolean remove(final Slot slot) { - return remove(slot.getAllocatedSlot().getSlotAllocationId()) != null; - } - /** * Remove an allocation with slot. * * @param slotId The ID of the slot to be removed */ - Slot remove(final AllocationID slotId) { - Slot slot = allocatedSlotsById.removeKeyA(slotId); - if (slot != null) { - final ResourceID taskManagerId = slot.getTaskManagerID(); - Set slotsForTM = allocatedSlotsByTaskManager.get(taskManagerId); - slotsForTM.remove(slot); + AllocatedSlot remove(final AllocationID slotId) { + AllocatedSlot allocatedSlot = allocatedSlotsById.removeKeyA(slotId); + if (allocatedSlot != null) { + final ResourceID taskManagerId = allocatedSlot.getTaskManagerLocation().getResourceID(); + Set slotsForTM = allocatedSlotsByTaskManager.get(taskManagerId); + + slotsForTM.remove(allocatedSlot); + if (slotsForTM.isEmpty()) { allocatedSlotsByTaskManager.remove(taskManagerId); } - return slot; + return allocatedSlot; } else { return null; @@ -823,11 +848,11 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway { * @param resourceID The id of the TaskManager * @return Set of slots which are allocated from the same TaskManager */ - Set removeSlotsForTaskManager(final ResourceID resourceID) { - Set slotsForTaskManager = allocatedSlotsByTaskManager.remove(resourceID); + Set removeSlotsForTaskManager(final ResourceID resourceID) { + Set slotsForTaskManager = allocatedSlotsByTaskManager.remove(resourceID); if (slotsForTaskManager != null) { - for (Slot slot : slotsForTaskManager) { - allocatedSlotsById.removeKeyA(slot.getAllocatedSlot().getSlotAllocationId()); + for (AllocatedSlot allocatedSlot : slotsForTaskManager) { + allocatedSlotsById.removeKeyA(allocatedSlot.getAllocationId()); } return slotsForTaskManager; } @@ -852,7 +877,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway { } @VisibleForTesting - Set getSlotsForTaskManager(ResourceID resourceId) { + Set getSlotsForTaskManager(ResourceID resourceId) { if (allocatedSlotsByTaskManager.containsKey(resourceId)) { return allocatedSlotsByTaskManager.get(resourceId); } else { @@ -892,7 +917,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway { checkNotNull(slot); SlotAndTimestamp previous = availableSlots.put( - slot.getSlotAllocationId(), new SlotAndTimestamp(slot, timestamp)); + slot.getAllocationId(), new SlotAndTimestamp(slot, timestamp)); if (previous == null) { final ResourceID resourceID = slot.getTaskManagerLocation().getResourceID(); @@ -951,7 +976,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway { if (onTaskManager != null) { for (AllocatedSlot candidate : onTaskManager) { if (candidate.getResourceProfile().isMatching(resourceProfile)) { - remove(candidate.getSlotAllocationId()); + remove(candidate.getAllocationId()); return new SlotAndLocality(candidate, Locality.LOCAL); } } @@ -964,7 +989,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway { if (onHost != null) { for (AllocatedSlot candidate : onHost) { if (candidate.getResourceProfile().isMatching(resourceProfile)) { - remove(candidate.getSlotAllocationId()); + remove(candidate.getAllocationId()); return new SlotAndLocality(candidate, Locality.HOST_LOCAL); } } @@ -977,7 +1002,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway { final AllocatedSlot slot = candidate.slot(); if (slot.getResourceProfile().isMatching(resourceProfile)) { - remove(slot.getSlotAllocationId()); + remove(slot.getAllocationId()); return new SlotAndLocality( slot, hadLocationPreference ? Locality.NON_LOCAL : Locality.UNCONSTRAINED); } @@ -1002,7 +1027,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway { // remove from the base set and the by-host view for (AllocatedSlot slot : slotsForTm) { - availableSlots.remove(slot.getSlotAllocationId()); + availableSlots.remove(slot.getAllocationId()); slotsForHost.remove(slot); } @@ -1082,7 +1107,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway { @Override public CompletableFuture returnAllocatedSlot(Slot slot) { - gateway.returnAllocatedSlot(slot); + gateway.returnAllocatedSlot(slot.getSlotContext()); return CompletableFuture.completedFuture(true); } @@ -1097,7 +1122,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway { slotFuture.whenComplete( (LogicalSlot slot, Throwable failure) -> { if (failure != null) { - gateway.cancelSlotAllocation(requestId); + gateway.cancelSlotRequest(requestId); } }); return slotFuture; @@ -1113,25 +1138,25 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway { private final SlotRequestID slotRequestId; - private final CompletableFuture future; - private final ResourceProfile resourceProfile; + private final CompletableFuture allocatedSlotFuture; + PendingRequest( SlotRequestID slotRequestId, - CompletableFuture future, ResourceProfile resourceProfile) { this.slotRequestId = Preconditions.checkNotNull(slotRequestId); - this.future = Preconditions.checkNotNull(future); this.resourceProfile = Preconditions.checkNotNull(resourceProfile); + + allocatedSlotFuture = new CompletableFuture<>(); } public SlotRequestID getSlotRequestId() { return slotRequestId; } - public CompletableFuture getFuture() { - return future; + public CompletableFuture getAllocatedSlotFuture() { + return allocatedSlotFuture; } public ResourceProfile getResourceProfile() { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPoolGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPoolGateway.java index ad2a6a6369d714e069983d774fa0fbef7bbe5f24..71de054b31d954493176790faacbe412587bbc9a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPoolGateway.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPoolGateway.java @@ -19,12 +19,12 @@ package org.apache.flink.runtime.instance; import org.apache.flink.api.common.time.Time; -import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.clusterframework.types.ResourceProfile; import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit; -import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot; +import org.apache.flink.runtime.jobmanager.slots.SlotContext; +import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway; import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; import org.apache.flink.runtime.rpc.RpcGateway; @@ -76,9 +76,15 @@ public interface SlotPoolGateway extends RpcGateway { CompletableFuture releaseTaskManager(ResourceID resourceID); - CompletableFuture offerSlot(AllocatedSlot slot); + CompletableFuture offerSlot( + TaskManagerLocation taskManagerLocation, + TaskManagerGateway taskManagerGateway, + SlotOffer slotOffer); - CompletableFuture> offerSlots(Collection> offers); + CompletableFuture> offerSlots( + TaskManagerLocation taskManagerLocation, + TaskManagerGateway taskManagerGateway, + Collection offers); void failAllocation(AllocationID allocationID, Exception cause); @@ -93,7 +99,7 @@ public interface SlotPoolGateway extends RpcGateway { Iterable locationPreferences, @RpcTimeout Time timeout); - void returnAllocatedSlot(Slot slot); + void returnAllocatedSlot(SlotContext slotInformation); /** * Cancel a slot allocation request. @@ -101,7 +107,7 @@ public interface SlotPoolGateway extends RpcGateway { * @param requestId identifying the slot allocation request * @return Future acknowledge if the slot allocation has been cancelled */ - CompletableFuture cancelSlotAllocation(SlotRequestID requestId); + CompletableFuture cancelSlotRequest(SlotRequestID requestId); /** * Request ID identifying different slot requests. diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java index 8857be77a41adce1b57712276a17cfb6b702e72f..a3c38e05cc3b60aa4509ed90d0b2dc2b155e5553 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java @@ -364,7 +364,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener, Sl Locality locality = instanceLocalityPair.getRight(); try { - SimpleSlot slot = instanceToUse.allocateSimpleSlot(vertex.getJobId()); + SimpleSlot slot = instanceToUse.allocateSimpleSlot(); // if the instance has further available slots, re-add it to the set of available resources. if (instanceToUse.hasResourcesAvailable()) { @@ -426,7 +426,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener, Sl JobVertexID groupID = vertex.getJobvertexId(); // allocate a shared slot from the instance - SharedSlot sharedSlot = instanceToUse.allocateSharedSlot(vertex.getJobId(), groupAssignment); + SharedSlot sharedSlot = instanceToUse.allocateSharedSlot(groupAssignment); // if the instance has further available slots, re-add it to the set of available resources. if (instanceToUse.hasResourcesAvailable()) { @@ -562,7 +562,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener, Sl ExecutionVertex vertex = task.getTaskToExecute().getVertex(); try { - SimpleSlot newSlot = instance.allocateSimpleSlot(vertex.getJobId()); + SimpleSlot newSlot = instance.allocateSimpleSlot(); if (newSlot != null) { // success, remove from the task queue and notify the future diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SimpleSlotContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SimpleSlotContext.java new file mode 100644 index 0000000000000000000000000000000000000000..5dccc1fb87ccd8106ca511cc30dce9adb8fa72c4 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SimpleSlotContext.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobmanager.slots; + +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.taskmanager.TaskManagerLocation; +import org.apache.flink.util.Preconditions; + +/** + * Simple implementation of the {@link SlotContext} interface for the legacy code. + */ +public class SimpleSlotContext implements SlotContext { + + private final AllocationID allocationId; + + private final TaskManagerLocation taskManagerLocation; + + private final int physicalSlotNumber; + + private final TaskManagerGateway taskManagerGateway; + + public SimpleSlotContext( + AllocationID allocationId, + TaskManagerLocation taskManagerLocation, + int physicalSlotNumber, + TaskManagerGateway taskManagerGateway) { + this.allocationId = Preconditions.checkNotNull(allocationId); + this.taskManagerLocation = Preconditions.checkNotNull(taskManagerLocation); + this.physicalSlotNumber = physicalSlotNumber; + this.taskManagerGateway = Preconditions.checkNotNull(taskManagerGateway); + } + + @Override + public AllocationID getAllocationId() { + return allocationId; + } + + @Override + public TaskManagerLocation getTaskManagerLocation() { + return taskManagerLocation; + } + + @Override + public int getPhysicalSlotNumber() { + return physicalSlotNumber; + } + + @Override + public TaskManagerGateway getTaskManagerGateway() { + return taskManagerGateway; + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SlotAndLocality.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SlotAndLocality.java index 3fe534610a29b03324d8909e7b6d693a3e0e66b4..5ae057da2187f8e29feeea3a871cd3a9aea266ca 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SlotAndLocality.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SlotAndLocality.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.jobmanager.slots; +import org.apache.flink.runtime.instance.AllocatedSlot; import org.apache.flink.runtime.jobmanager.scheduler.Locality; import static org.apache.flink.util.Preconditions.checkNotNull; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SlotContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SlotContext.java new file mode 100644 index 0000000000000000000000000000000000000000..d8a1aa41f2cdef7fca2ba71e3b49d4a4f8b29b19 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SlotContext.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobmanager.slots; + +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.instance.Slot; +import org.apache.flink.runtime.taskmanager.TaskManagerLocation; + +/** + * Interface for the context of a logical {@link Slot}. This context contains information + * about the underlying allocated slot and how to communicate with the TaskManager on which + * it was allocated. + */ +public interface SlotContext { + + /** + * Gets the ID under which the slot is allocated, which uniquely identifies the slot. + * + * @return The ID under which the slot is allocated + */ + AllocationID getAllocationId(); + + /** + * Gets the location info of the TaskManager that offers this slot. + * + * @return The location info of the TaskManager that offers this slot + */ + TaskManagerLocation getTaskManagerLocation(); + + /** + * Gets the number of the slot. + * + * @return The number of the slot on the TaskManager. + */ + int getPhysicalSlotNumber(); + + /** + * Gets the actor gateway that can be used to send messages to the TaskManager. + *

+ * This method should be removed once the new interface-based RPC abstraction is in place + * + * @return The gateway that can be used to send messages to the TaskManager. + */ + TaskManagerGateway getTaskManagerGateway(); +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SlotException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SlotException.java new file mode 100644 index 0000000000000000000000000000000000000000..48e7e25af87afce55a9f0f4842dd5dff18b2650a --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SlotException.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobmanager.slots; + +import org.apache.flink.util.FlinkException; + +/** + * Base class for slot related exceptions. + */ +public class SlotException extends FlinkException { + private static final long serialVersionUID = -8009227041400667546L; + + public SlotException(String message) { + super(message); + } + + public SlotException(Throwable cause) { + super(cause); + } + + public SlotException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java index 687b6d12a64c3f37e90ec71487455c242efee62c..324557fbbe37ad19946dcb16676f3f718c91dedb 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java @@ -65,7 +65,6 @@ import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobmanager.OnCompletionActions; import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException; -import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot; import org.apache.flink.runtime.jobmaster.message.ClassloadingProps; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; @@ -109,7 +108,6 @@ import javax.annotation.Nullable; import java.io.IOException; import java.net.InetSocketAddress; -import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -649,7 +647,7 @@ public class JobMaster extends FencedRpcEndpoint implements JobMast @Override public CompletableFuture> offerSlots( final ResourceID taskManagerId, - final Iterable slots, + final Collection slots, final Time timeout) { Tuple2 taskManager = registeredTaskManagers.get(taskManagerId); @@ -658,27 +656,15 @@ public class JobMaster extends FencedRpcEndpoint implements JobMast return FutureUtils.completedExceptionally(new Exception("Unknown TaskManager " + taskManagerId)); } - final JobID jid = jobGraph.getJobID(); final TaskManagerLocation taskManagerLocation = taskManager.f0; final TaskExecutorGateway taskExecutorGateway = taskManager.f1; - final ArrayList> slotsAndOffers = new ArrayList<>(); - final RpcTaskManagerGateway rpcTaskManagerGateway = new RpcTaskManagerGateway(taskExecutorGateway, getFencingToken()); - for (SlotOffer slotOffer : slots) { - final AllocatedSlot slot = new AllocatedSlot( - slotOffer.getAllocationId(), - jid, - taskManagerLocation, - slotOffer.getSlotIndex(), - slotOffer.getResourceProfile(), - rpcTaskManagerGateway); - - slotsAndOffers.add(new Tuple2<>(slot, slotOffer)); - } - - return slotPoolGateway.offerSlots(slotsAndOffers); + return slotPoolGateway.offerSlots( + taskManagerLocation, + rpcTaskManagerGateway, + slots); } @Override diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java index ad906c2960e1cdfcae465957eee7cd957493f0c3..09d995e6773187ed6055713dd0108de7e4714413 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java @@ -195,7 +195,7 @@ public interface JobMasterGateway extends CheckpointCoordinatorGateway, FencedRp */ CompletableFuture> offerSlots( final ResourceID taskManagerId, - final Iterable slots, + final Collection slots, @RpcTimeout final Time timeout); /** diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java index b4894788816c225ba68057b6bb2a24f002588a66..16da8e623eb1ffca89f137c31a5c50a2e34d2d92 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java @@ -184,7 +184,7 @@ public class ExecutionGraphDeploymentTest extends TestLogger { final Instance instance = getInstance(new ActorTaskManagerGateway(instanceGateway)); - final SimpleSlot slot = instance.allocateSimpleSlot(jobId); + final SimpleSlot slot = instance.allocateSimpleSlot(); assertEquals(ExecutionState.CREATED, vertex.getExecutionState()); @@ -631,13 +631,13 @@ public class ExecutionGraphDeploymentTest extends TestLogger { final TaskManagerLocation localTaskManagerLocation = new LocalTaskManagerLocation(); - final SimpleSlot sourceSlot1 = createSlot(executionGraph.getJobID(), localTaskManagerLocation, 0); + final SimpleSlot sourceSlot1 = createSlot(localTaskManagerLocation, 0); - final SimpleSlot sourceSlot2 = createSlot(executionGraph.getJobID(), localTaskManagerLocation, 1); + final SimpleSlot sourceSlot2 = createSlot(localTaskManagerLocation, 1); - final SimpleSlot sinkSlot1 = createSlot(executionGraph.getJobID(), localTaskManagerLocation, 0); + final SimpleSlot sinkSlot1 = createSlot(localTaskManagerLocation, 0); - final SimpleSlot sinkSlot2 = createSlot(executionGraph.getJobID(), localTaskManagerLocation, 1); + final SimpleSlot sinkSlot2 = createSlot(localTaskManagerLocation, 1); slotFutures.get(sourceVertexId)[0].complete(sourceSlot1); slotFutures.get(sourceVertexId)[1].complete(sourceSlot2); @@ -654,9 +654,8 @@ public class ExecutionGraphDeploymentTest extends TestLogger { } } - private SimpleSlot createSlot(JobID jobId, TaskManagerLocation taskManagerLocation, int index) { + private SimpleSlot createSlot(TaskManagerLocation taskManagerLocation, int index) { return new SimpleSlot( - jobId, mock(SlotOwner.class), taskManagerLocation, index, diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java index 2e6558a57e456a114f4d9e69c17cca6734f4956b..586f51b65b461dca827a71c618c9ffd0eef47668 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java @@ -26,7 +26,6 @@ import org.apache.flink.runtime.blob.VoidBlobWriter; import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory; import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.clusterframework.types.ResourceID; -import org.apache.flink.runtime.clusterframework.types.ResourceProfile; import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor; import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy; import org.apache.flink.runtime.instance.LogicalSlot; @@ -39,7 +38,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.ScheduleMode; -import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot; +import org.apache.flink.runtime.jobmanager.slots.SimpleSlotContext; import org.apache.flink.runtime.jobmanager.slots.SlotOwner; import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway; import org.apache.flink.runtime.jobmanager.slots.TestingSlotOwner; @@ -285,7 +284,7 @@ public class ExecutionGraphSchedulingTest extends TestLogger { final BlockingQueue returnedSlots = new ArrayBlockingQueue<>(parallelism); final TestingSlotOwner slotOwner = new TestingSlotOwner(); slotOwner.setReturnAllocatedSlotConsumer( - (Slot slot) -> returnedSlots.offer(slot.getAllocatedSlot().getSlotAllocationId())); + (Slot slot) -> returnedSlots.offer(slot.getSlotContext().getAllocationId())); final SimpleSlot[] sourceSlots = new SimpleSlot[parallelism]; final SimpleSlot[] targetSlots = new SimpleSlot[parallelism]; @@ -366,7 +365,7 @@ public class ExecutionGraphSchedulingTest extends TestLogger { final BlockingQueue returnedSlots = new ArrayBlockingQueue<>(2); final TestingSlotOwner slotOwner = new TestingSlotOwner(); slotOwner.setReturnAllocatedSlotConsumer( - (Slot slot) -> returnedSlots.offer(slot.getAllocatedSlot().getSlotAllocationId())); + (Slot slot) -> returnedSlots.offer(slot.getSlotContext().getAllocationId())); final TaskManagerGateway taskManager = mock(TaskManagerGateway.class); final SimpleSlot[] slots = new SimpleSlot[parallelism]; @@ -448,8 +447,11 @@ public class ExecutionGraphSchedulingTest extends TestLogger { TaskManagerLocation location = new TaskManagerLocation( ResourceID.generate(), InetAddress.getLoopbackAddress(), 12345); - AllocatedSlot slot = new AllocatedSlot( - new AllocationID(), jobId, location, 0, ResourceProfile.UNKNOWN, taskManager); + SimpleSlotContext slot = new SimpleSlotContext( + new AllocationID(), + location, + 0, + taskManager); return new SimpleSlot(slot, slotOwner, 0); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphStopTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphStopTest.java index 4ce3f9dbcfc8e9881f67952eb505eaf80bcdd42a..3c8d994ddcb315a50da5f7228f1e74c413528ea4 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphStopTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphStopTest.java @@ -111,28 +111,28 @@ public class ExecutionGraphStopTest extends TestLogger { // deploy source 1 for (ExecutionVertex ev : eg.getJobVertex(source1.getID()).getTaskVertices()) { - SimpleSlot slot = ExecutionGraphTestUtils.createMockSimpleSlot(jid, sourceGateway); + SimpleSlot slot = ExecutionGraphTestUtils.createMockSimpleSlot(sourceGateway); ev.getCurrentExecutionAttempt().tryAssignResource(slot); ev.getCurrentExecutionAttempt().deploy(); } // deploy source 2 for (ExecutionVertex ev : eg.getJobVertex(source2.getID()).getTaskVertices()) { - SimpleSlot slot = ExecutionGraphTestUtils.createMockSimpleSlot(jid, sourceGateway); + SimpleSlot slot = ExecutionGraphTestUtils.createMockSimpleSlot(sourceGateway); ev.getCurrentExecutionAttempt().tryAssignResource(slot); ev.getCurrentExecutionAttempt().deploy(); } // deploy non-source 1 for (ExecutionVertex ev : eg.getJobVertex(nonSource1.getID()).getTaskVertices()) { - SimpleSlot slot = ExecutionGraphTestUtils.createMockSimpleSlot(jid, nonSourceGateway); + SimpleSlot slot = ExecutionGraphTestUtils.createMockSimpleSlot(nonSourceGateway); ev.getCurrentExecutionAttempt().tryAssignResource(slot); ev.getCurrentExecutionAttempt().deploy(); } // deploy non-source 2 for (ExecutionVertex ev : eg.getJobVertex(nonSource2.getID()).getTaskVertices()) { - SimpleSlot slot = ExecutionGraphTestUtils.createMockSimpleSlot(jid, nonSourceGateway); + SimpleSlot slot = ExecutionGraphTestUtils.createMockSimpleSlot(nonSourceGateway); ev.getCurrentExecutionAttempt().tryAssignResource(slot); ev.getCurrentExecutionAttempt().deploy(); } @@ -164,7 +164,7 @@ public class ExecutionGraphStopTest extends TestLogger { when(gateway.stopTask(any(ExecutionAttemptID.class), any(Time.class))) .thenReturn(CompletableFuture.completedFuture(Acknowledge.get())); - final SimpleSlot slot = ExecutionGraphTestUtils.createMockSimpleSlot(jid, gateway); + final SimpleSlot slot = ExecutionGraphTestUtils.createMockSimpleSlot(gateway); exec.tryAssignResource(slot); exec.deploy(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java index 42a63ec481797455d20d9479bed675a3f6565e80..06ffaa0c45816b634397f77d06facf56313b077a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java @@ -29,7 +29,6 @@ import org.apache.flink.runtime.blob.VoidBlobWriter; import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory; import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.clusterframework.types.ResourceID; -import org.apache.flink.runtime.clusterframework.types.ResourceProfile; import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.executiongraph.failover.FailoverRegion; @@ -49,7 +48,7 @@ import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.runtime.jobmanager.scheduler.Scheduler; -import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot; +import org.apache.flink.runtime.jobmanager.slots.SimpleSlotContext; import org.apache.flink.runtime.jobmanager.slots.SlotOwner; import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway; import org.apache.flink.runtime.messages.Acknowledge; @@ -240,19 +239,20 @@ public class ExecutionGraphTestUtils { // Mocking Slots // ------------------------------------------------------------------------ - public static SimpleSlot createMockSimpleSlot(JobID jid, TaskManagerGateway gateway) { + public static SimpleSlot createMockSimpleSlot(TaskManagerGateway gateway) { final TaskManagerLocation location = new TaskManagerLocation( ResourceID.generate(), InetAddress.getLoopbackAddress(), 6572); - final AllocatedSlot allocatedSlot = new AllocatedSlot( + final SimpleSlotContext allocatedSlot = new SimpleSlotContext( new AllocationID(), - jid, location, 0, - ResourceProfile.UNKNOWN, gateway); - return new SimpleSlot(allocatedSlot, mock(SlotOwner.class), 0); + return new SimpleSlot( + allocatedSlot, + mock(SlotOwner.class), + 0); } // ------------------------------------------------------------------------ diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java index c6fb83605b3146ee0693067cc7ba0f3fb70eea54..71d6f517a5f3871d7b43f525e25f83275cd27471 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java @@ -81,7 +81,6 @@ public class ExecutionTest extends TestLogger { final SingleSlotTestingSlotOwner slotOwner = new SingleSlotTestingSlotOwner(); final SimpleSlot slot = new SimpleSlot( - new JobID(), slotOwner, new LocalTaskManagerLocation(), 0, @@ -121,7 +120,6 @@ public class ExecutionTest extends TestLogger { final SingleSlotTestingSlotOwner slotOwner = new SingleSlotTestingSlotOwner(); final SimpleSlot slot = new SimpleSlot( - new JobID(), slotOwner, new LocalTaskManagerLocation(), 0, @@ -171,7 +169,6 @@ public class ExecutionTest extends TestLogger { final SingleSlotTestingSlotOwner slotOwner = new SingleSlotTestingSlotOwner(); final SimpleSlot slot = new SimpleSlot( - new JobID(), slotOwner, new LocalTaskManagerLocation(), 0, @@ -285,11 +282,12 @@ public class ExecutionTest extends TestLogger { final SingleSlotTestingSlotOwner slotOwner = new SingleSlotTestingSlotOwner(); final SimpleSlot slot = new SimpleSlot( - new JobID(), slotOwner, new LocalTaskManagerLocation(), 0, - new SimpleAckingTaskManagerGateway()); + new SimpleAckingTaskManagerGateway(), + null, + null); final ProgrammedSlotProvider slotProvider = new ProgrammedSlotProvider(1); slotProvider.addSlot(jobVertexId, 0, CompletableFuture.completedFuture(slot)); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java index 1b8dacad69fab0faaf573e492e3674100cbea527..cd613f0f50a2e96ef27383978e09bdd4fa4a9130 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java @@ -18,36 +18,42 @@ package org.apache.flink.runtime.executiongraph; -import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.*; -import static org.junit.Assert.*; -import static org.mockito.Mockito.mock; - -import java.io.IOException; - import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.instance.ActorGateway; import org.apache.flink.runtime.instance.BaseTestingActorGateway; import org.apache.flink.runtime.instance.DummyActorGateway; -import org.apache.flink.runtime.instance.ActorGateway; -import org.apache.flink.runtime.instance.SimpleSlot; import org.apache.flink.runtime.instance.Instance; -import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.instance.SimpleSlot; import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobmanager.scheduler.LocationPreferenceConstraint; import org.apache.flink.runtime.jobmanager.scheduler.Scheduler; import org.apache.flink.runtime.jobmanager.slots.ActorTaskManagerGateway; import org.apache.flink.runtime.messages.Acknowledge; -import org.apache.flink.runtime.messages.TaskMessages.SubmitTask; import org.apache.flink.runtime.messages.TaskMessages.CancelTask; +import org.apache.flink.runtime.messages.TaskMessages.SubmitTask; import org.apache.flink.runtime.testingUtils.TestingUtils; import org.apache.flink.runtime.testutils.DirectScheduledExecutorService; import org.apache.flink.util.TestLogger; import org.junit.Test; +import java.io.IOException; + import scala.concurrent.ExecutionContext; +import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.getExecutionVertex; +import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.getInstance; +import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.setVertexResource; +import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.setVertexState; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.mock; + @SuppressWarnings("serial") public class ExecutionVertexCancelTest extends TestLogger { @@ -134,7 +140,7 @@ public class ExecutionVertexCancelTest extends TestLogger { executionContext, 2); Instance instance = getInstance(new ActorTaskManagerGateway(actorGateway)); - SimpleSlot slot = instance.allocateSimpleSlot(new JobID()); + SimpleSlot slot = instance.allocateSimpleSlot(); vertex.deployToSlot(slot); @@ -202,7 +208,7 @@ public class ExecutionVertexCancelTest extends TestLogger { 2); Instance instance = getInstance(new ActorTaskManagerGateway(actorGateway)); - SimpleSlot slot = instance.allocateSimpleSlot(new JobID()); + SimpleSlot slot = instance.allocateSimpleSlot(); vertex.deployToSlot(slot); @@ -262,7 +268,7 @@ public class ExecutionVertexCancelTest extends TestLogger { 1); Instance instance = getInstance(new ActorTaskManagerGateway(actorGateway)); - SimpleSlot slot = instance.allocateSimpleSlot(new JobID()); + SimpleSlot slot = instance.allocateSimpleSlot(); setVertexResource(vertex, slot); setVertexState(vertex, ExecutionState.RUNNING); @@ -302,7 +308,7 @@ public class ExecutionVertexCancelTest extends TestLogger { 1); Instance instance = getInstance(new ActorTaskManagerGateway(actorGateway)); - SimpleSlot slot = instance.allocateSimpleSlot(new JobID()); + SimpleSlot slot = instance.allocateSimpleSlot(); setVertexResource(vertex, slot); setVertexState(vertex, ExecutionState.RUNNING); @@ -350,7 +356,7 @@ public class ExecutionVertexCancelTest extends TestLogger { 1); Instance instance = getInstance(new ActorTaskManagerGateway(actorGateway)); - SimpleSlot slot = instance.allocateSimpleSlot(new JobID()); + SimpleSlot slot = instance.allocateSimpleSlot(); setVertexResource(vertex, slot); setVertexState(vertex, ExecutionState.RUNNING); @@ -383,7 +389,7 @@ public class ExecutionVertexCancelTest extends TestLogger { final ActorGateway gateway = new CancelSequenceActorGateway(TestingUtils.directExecutionContext(), 0); Instance instance = getInstance(new ActorTaskManagerGateway(gateway)); - SimpleSlot slot = instance.allocateSimpleSlot(new JobID()); + SimpleSlot slot = instance.allocateSimpleSlot(); setVertexResource(vertex, slot); setVertexState(vertex, ExecutionState.RUNNING); @@ -458,7 +464,7 @@ public class ExecutionVertexCancelTest extends TestLogger { // the scheduler (or any caller) needs to know that the slot should be released try { Instance instance = getInstance(new ActorTaskManagerGateway(DummyActorGateway.INSTANCE)); - SimpleSlot slot = instance.allocateSimpleSlot(new JobID()); + SimpleSlot slot = instance.allocateSimpleSlot(); vertex.deployToSlot(slot); fail("Method should throw an exception"); @@ -501,7 +507,7 @@ public class ExecutionVertexCancelTest extends TestLogger { setVertexState(vertex, ExecutionState.CANCELING); Instance instance = getInstance(new ActorTaskManagerGateway(DummyActorGateway.INSTANCE)); - SimpleSlot slot = instance.allocateSimpleSlot(new JobID()); + SimpleSlot slot = instance.allocateSimpleSlot(); vertex.deployToSlot(slot); fail("Method should throw an exception"); @@ -517,7 +523,7 @@ public class ExecutionVertexCancelTest extends TestLogger { AkkaUtils.getDefaultTimeout()); Instance instance = getInstance(new ActorTaskManagerGateway(DummyActorGateway.INSTANCE)); - SimpleSlot slot = instance.allocateSimpleSlot(new JobID()); + SimpleSlot slot = instance.allocateSimpleSlot(); setVertexResource(vertex, slot); setVertexState(vertex, ExecutionState.CANCELING); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java index 973c7d44f96782fd967659d82540c4ee083d0236..7f97d126054d9df95fca0d75ff1c19e9f00008a0 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java @@ -32,7 +32,7 @@ import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.ScheduleMode; import org.apache.flink.runtime.jobmanager.slots.ActorTaskManagerGateway; -import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot; +import org.apache.flink.runtime.instance.AllocatedSlot; import org.apache.flink.runtime.testingUtils.TestingUtils; import org.apache.flink.runtime.testutils.DirectScheduledExecutorService; import org.apache.flink.util.TestLogger; @@ -67,7 +67,7 @@ public class ExecutionVertexDeploymentTest extends TestLogger { Instance instance = getInstance( new ActorTaskManagerGateway( new SimpleActorGateway(TestingUtils.directExecutionContext()))); - final SimpleSlot slot = instance.allocateSimpleSlot(ejv.getJobId()); + final SimpleSlot slot = instance.allocateSimpleSlot(); final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0], AkkaUtils.getDefaultTimeout()); @@ -104,7 +104,7 @@ public class ExecutionVertexDeploymentTest extends TestLogger { final Instance instance = getInstance( new ActorTaskManagerGateway( new SimpleActorGateway(TestingUtils.directExecutionContext()))); - final SimpleSlot slot = instance.allocateSimpleSlot(ejv.getJobId()); + final SimpleSlot slot = instance.allocateSimpleSlot(); final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0], AkkaUtils.getDefaultTimeout()); @@ -146,7 +146,7 @@ public class ExecutionVertexDeploymentTest extends TestLogger { final Instance instance = getInstance( new ActorTaskManagerGateway( new SimpleActorGateway(TestingUtils.defaultExecutionContext()))); - final SimpleSlot slot = instance.allocateSimpleSlot(ejv.getJobId()); + final SimpleSlot slot = instance.allocateSimpleSlot(); assertEquals(ExecutionState.CREATED, vertex.getExecutionState()); @@ -191,7 +191,7 @@ public class ExecutionVertexDeploymentTest extends TestLogger { final Instance instance = getInstance( new ActorTaskManagerGateway( new SimpleFailingActorGateway(TestingUtils.directExecutionContext()))); - final SimpleSlot slot = instance.allocateSimpleSlot(ejv.getJobId()); + final SimpleSlot slot = instance.allocateSimpleSlot(); assertEquals(ExecutionState.CREATED, vertex.getExecutionState()); @@ -221,7 +221,7 @@ public class ExecutionVertexDeploymentTest extends TestLogger { final Instance instance = getInstance( new ActorTaskManagerGateway( new SimpleFailingActorGateway(TestingUtils.directExecutionContext()))); - final SimpleSlot slot = instance.allocateSimpleSlot(ejv.getJobId()); + final SimpleSlot slot = instance.allocateSimpleSlot(); assertEquals(ExecutionState.CREATED, vertex.getExecutionState()); @@ -265,7 +265,7 @@ public class ExecutionVertexDeploymentTest extends TestLogger { final Instance instance = getInstance( new ActorTaskManagerGateway( new SimpleActorGateway(TestingUtils.directExecutionContext()))); - final SimpleSlot slot = instance.allocateSimpleSlot(ejv.getJobId()); + final SimpleSlot slot = instance.allocateSimpleSlot(); assertEquals(ExecutionState.CREATED, vertex.getExecutionState()); vertex.deployToSlot(slot); @@ -310,7 +310,7 @@ public class ExecutionVertexDeploymentTest extends TestLogger { context, 2))); - final SimpleSlot slot = instance.allocateSimpleSlot(ejv.getJobId()); + final SimpleSlot slot = instance.allocateSimpleSlot(); assertEquals(ExecutionState.CREATED, vertex.getExecutionState()); @@ -372,7 +372,7 @@ public class ExecutionVertexDeploymentTest extends TestLogger { result.getPartitions()[0].addConsumer(mockEdge, 0); AllocatedSlot allocatedSlot = mock(AllocatedSlot.class); - when(allocatedSlot.getSlotAllocationId()).thenReturn(new AllocationID()); + when(allocatedSlot.getAllocationId()).thenReturn(new AllocationID()); LogicalSlot slot = mock(LogicalSlot.class); when(slot.getAllocationId()).thenReturn(new AllocationID()); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java index 15d021aad0901f81e76fe3b0af97a6b4edf7558c..98f72596a83e49f4ba0fc2e2ed964c32c2a40dd3 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java @@ -27,7 +27,6 @@ import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory; import org.apache.flink.runtime.checkpoint.TaskStateSnapshot; import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.clusterframework.types.ResourceID; -import org.apache.flink.runtime.clusterframework.types.ResourceProfile; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy; import org.apache.flink.runtime.instance.SimpleSlot; @@ -37,7 +36,8 @@ import org.apache.flink.runtime.jobgraph.DistributionPattern; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.JobVertexID; -import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot; +import org.apache.flink.runtime.jobmanager.slots.SlotContext; +import org.apache.flink.runtime.jobmanager.slots.SimpleSlotContext; import org.apache.flink.runtime.jobmanager.slots.SlotOwner; import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; @@ -233,8 +233,8 @@ public class ExecutionVertexLocalityTest extends TestLogger { // - mocking the scheduler created fragile tests that break whenever the scheduler is adjusted // - exposing test methods in the ExecutionVertex leads to undesirable setters - AllocatedSlot slot = new AllocatedSlot( - new AllocationID(), jobId, location, 0, ResourceProfile.UNKNOWN, mock(TaskManagerGateway.class)); + SlotContext slot = new SimpleSlotContext( + new AllocationID(), location, 0, mock(TaskManagerGateway.class)); SimpleSlot simpleSlot = new SimpleSlot(slot, mock(SlotOwner.class), 0); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java index 2941739a2d18ee2cca713ae4a59273ffb6c79cba..9310912caab0e99b1e2a961592dedd5bb54a7135 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java @@ -57,7 +57,7 @@ public class ExecutionVertexSchedulingTest { // a slot than cannot be deployed to final Instance instance = getInstance(new ActorTaskManagerGateway(DummyActorGateway.INSTANCE)); - final SimpleSlot slot = instance.allocateSimpleSlot(ejv.getJobId()); + final SimpleSlot slot = instance.allocateSimpleSlot(); slot.releaseInstanceSlot(); assertTrue(slot.isReleased()); @@ -89,7 +89,7 @@ public class ExecutionVertexSchedulingTest { // a slot than cannot be deployed to final Instance instance = getInstance(new ActorTaskManagerGateway(DummyActorGateway.INSTANCE)); - final SimpleSlot slot = instance.allocateSimpleSlot(ejv.getJobId()); + final SimpleSlot slot = instance.allocateSimpleSlot(); slot.releaseInstanceSlot(); assertTrue(slot.isReleased()); @@ -126,7 +126,7 @@ public class ExecutionVertexSchedulingTest { final Instance instance = getInstance(new ActorTaskManagerGateway( new ExecutionGraphTestUtils.SimpleActorGateway(TestingUtils.defaultExecutionContext()))); - final SimpleSlot slot = instance.allocateSimpleSlot(ejv.getJobId()); + final SimpleSlot slot = instance.allocateSimpleSlot(); Scheduler scheduler = mock(Scheduler.class); CompletableFuture future = new CompletableFuture<>(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleSlotProvider.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleSlotProvider.java index 14e0e660fba4eacd77d2cb4d5b294f4727a486c2..9a19d243a22e287bd4f26da399fdbeb209f01cc5 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleSlotProvider.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleSlotProvider.java @@ -21,7 +21,6 @@ package org.apache.flink.runtime.executiongraph.utils; import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.clusterframework.types.ResourceID; -import org.apache.flink.runtime.clusterframework.types.ResourceProfile; import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.instance.LogicalSlot; import org.apache.flink.runtime.instance.SimpleSlot; @@ -29,7 +28,8 @@ import org.apache.flink.runtime.instance.Slot; import org.apache.flink.runtime.instance.SlotProvider; import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException; import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit; -import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot; +import org.apache.flink.runtime.jobmanager.slots.SlotContext; +import org.apache.flink.runtime.jobmanager.slots.SimpleSlotContext; import org.apache.flink.runtime.jobmanager.slots.SlotOwner; import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; @@ -47,7 +47,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull; */ public class SimpleSlotProvider implements SlotProvider, SlotOwner { - private final ArrayDeque slots; + private final ArrayDeque slots; public SimpleSlotProvider(JobID jobId, int numSlots) { this(jobId, numSlots, new SimpleAckingTaskManagerGateway()); @@ -60,12 +60,10 @@ public class SimpleSlotProvider implements SlotProvider, SlotOwner { this.slots = new ArrayDeque<>(numSlots); for (int i = 0; i < numSlots; i++) { - AllocatedSlot as = new AllocatedSlot( + SimpleSlotContext as = new SimpleSlotContext( new AllocationID(), - jobId, new TaskManagerLocation(ResourceID.generate(), InetAddress.getLoopbackAddress(), 10000 + i), 0, - ResourceProfile.UNKNOWN, taskManagerGateway); slots.add(as); } @@ -76,7 +74,7 @@ public class SimpleSlotProvider implements SlotProvider, SlotOwner { ScheduledUnit task, boolean allowQueued, Collection preferredLocations) { - final AllocatedSlot slot; + final SlotContext slot; synchronized (slots) { if (slots.isEmpty()) { @@ -98,7 +96,7 @@ public class SimpleSlotProvider implements SlotProvider, SlotOwner { @Override public CompletableFuture returnAllocatedSlot(Slot slot) { synchronized (slots) { - slots.add(slot.getAllocatedSlot()); + slots.add(slot.getSlotContext()); } return CompletableFuture.completedFuture(true); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AllocatedSlotsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AllocatedSlotsTest.java index 0e4bfc06830fa00407443a742d0ddd8bb7e536ad..bc396c1fe92e5b19e9eab50411bf39d48ddfc052 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AllocatedSlotsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AllocatedSlotsTest.java @@ -20,7 +20,12 @@ package org.apache.flink.runtime.instance; import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.clusterframework.types.ResourceID; -import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway; +import org.apache.flink.runtime.jobmanager.slots.DummySlotOwner; +import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation; +import org.apache.flink.runtime.taskmanager.TaskManagerLocation; +import org.apache.flink.util.TestLogger; import org.junit.Test; @@ -28,10 +33,8 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; -public class AllocatedSlotsTest { +public class AllocatedSlotsTest extends TestLogger { @Test public void testOperations() throws Exception { @@ -39,12 +42,13 @@ public class AllocatedSlotsTest { final AllocationID allocation1 = new AllocationID(); final SlotPoolGateway.SlotRequestID slotRequestID = new SlotPoolGateway.SlotRequestID(); - final ResourceID resource1 = new ResourceID("resource1"); - final Slot slot1 = createSlot(resource1, allocation1); + final TaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation(); + final ResourceID resource1 = taskManagerLocation.getResourceID(); + final AllocatedSlot slot1 = createSlot(allocation1, taskManagerLocation); allocatedSlots.add(slotRequestID, slot1); - assertTrue(allocatedSlots.contains(slot1.getAllocatedSlot().getSlotAllocationId())); + assertTrue(allocatedSlots.contains(slot1.getAllocationId())); assertTrue(allocatedSlots.containResource(resource1)); assertEquals(slot1, allocatedSlots.get(allocation1)); @@ -53,12 +57,12 @@ public class AllocatedSlotsTest { final AllocationID allocation2 = new AllocationID(); final SlotPoolGateway.SlotRequestID slotRequestID2 = new SlotPoolGateway.SlotRequestID(); - final Slot slot2 = createSlot(resource1, allocation2); + final AllocatedSlot slot2 = createSlot(allocation2, taskManagerLocation); allocatedSlots.add(slotRequestID2, slot2); - assertTrue(allocatedSlots.contains(slot1.getAllocatedSlot().getSlotAllocationId())); - assertTrue(allocatedSlots.contains(slot2.getAllocatedSlot().getSlotAllocationId())); + assertTrue(allocatedSlots.contains(slot1.getAllocationId())); + assertTrue(allocatedSlots.contains(slot2.getAllocationId())); assertTrue(allocatedSlots.containResource(resource1)); assertEquals(slot1, allocatedSlots.get(allocation1)); @@ -68,14 +72,15 @@ public class AllocatedSlotsTest { final AllocationID allocation3 = new AllocationID(); final SlotPoolGateway.SlotRequestID slotRequestID3 = new SlotPoolGateway.SlotRequestID(); - final ResourceID resource2 = new ResourceID("resource2"); - final Slot slot3 = createSlot(resource2, allocation3); + final TaskManagerLocation taskManagerLocation2 = new LocalTaskManagerLocation(); + final ResourceID resource2 = taskManagerLocation2.getResourceID(); + final AllocatedSlot slot3 = createSlot(allocation3, taskManagerLocation2); allocatedSlots.add(slotRequestID3, slot3); - assertTrue(allocatedSlots.contains(slot1.getAllocatedSlot().getSlotAllocationId())); - assertTrue(allocatedSlots.contains(slot2.getAllocatedSlot().getSlotAllocationId())); - assertTrue(allocatedSlots.contains(slot3.getAllocatedSlot().getSlotAllocationId())); + assertTrue(allocatedSlots.contains(slot1.getAllocationId())); + assertTrue(allocatedSlots.contains(slot2.getAllocationId())); + assertTrue(allocatedSlots.contains(slot3.getAllocationId())); assertTrue(allocatedSlots.containResource(resource1)); assertTrue(allocatedSlots.containResource(resource2)); @@ -86,11 +91,11 @@ public class AllocatedSlotsTest { assertEquals(1, allocatedSlots.getSlotsForTaskManager(resource2).size()); assertEquals(3, allocatedSlots.size()); - allocatedSlots.remove(slot2); + allocatedSlots.remove(slot2.getAllocationId()); - assertTrue(allocatedSlots.contains(slot1.getAllocatedSlot().getSlotAllocationId())); - assertFalse(allocatedSlots.contains(slot2.getAllocatedSlot().getSlotAllocationId())); - assertTrue(allocatedSlots.contains(slot3.getAllocatedSlot().getSlotAllocationId())); + assertTrue(allocatedSlots.contains(slot1.getAllocationId())); + assertFalse(allocatedSlots.contains(slot2.getAllocationId())); + assertTrue(allocatedSlots.contains(slot3.getAllocationId())); assertTrue(allocatedSlots.containResource(resource1)); assertTrue(allocatedSlots.containResource(resource2)); @@ -101,11 +106,11 @@ public class AllocatedSlotsTest { assertEquals(1, allocatedSlots.getSlotsForTaskManager(resource2).size()); assertEquals(2, allocatedSlots.size()); - allocatedSlots.remove(slot1); + allocatedSlots.remove(slot1.getAllocationId()); - assertFalse(allocatedSlots.contains(slot1.getAllocatedSlot().getSlotAllocationId())); - assertFalse(allocatedSlots.contains(slot2.getAllocatedSlot().getSlotAllocationId())); - assertTrue(allocatedSlots.contains(slot3.getAllocatedSlot().getSlotAllocationId())); + assertFalse(allocatedSlots.contains(slot1.getAllocationId())); + assertFalse(allocatedSlots.contains(slot2.getAllocationId())); + assertTrue(allocatedSlots.contains(slot3.getAllocationId())); assertFalse(allocatedSlots.containResource(resource1)); assertTrue(allocatedSlots.containResource(resource2)); @@ -116,11 +121,11 @@ public class AllocatedSlotsTest { assertEquals(1, allocatedSlots.getSlotsForTaskManager(resource2).size()); assertEquals(1, allocatedSlots.size()); - allocatedSlots.remove(slot3); + allocatedSlots.remove(slot3.getAllocationId()); - assertFalse(allocatedSlots.contains(slot1.getAllocatedSlot().getSlotAllocationId())); - assertFalse(allocatedSlots.contains(slot2.getAllocatedSlot().getSlotAllocationId())); - assertFalse(allocatedSlots.contains(slot3.getAllocatedSlot().getSlotAllocationId())); + assertFalse(allocatedSlots.contains(slot1.getAllocationId())); + assertFalse(allocatedSlots.contains(slot2.getAllocationId())); + assertFalse(allocatedSlots.contains(slot3.getAllocationId())); assertFalse(allocatedSlots.containResource(resource1)); assertFalse(allocatedSlots.containResource(resource2)); @@ -132,13 +137,13 @@ public class AllocatedSlotsTest { assertEquals(0, allocatedSlots.size()); } - private Slot createSlot(final ResourceID resourceId, final AllocationID allocationId) { - AllocatedSlot mockAllocatedSlot = mock(AllocatedSlot.class); - Slot slot = mock(Slot.class); - when(slot.getTaskManagerID()).thenReturn(resourceId); - when(slot.getAllocatedSlot()).thenReturn(mockAllocatedSlot); - - when(mockAllocatedSlot.getSlotAllocationId()).thenReturn(allocationId); - return slot; + private AllocatedSlot createSlot(final AllocationID allocationId, final TaskManagerLocation taskManagerLocation) { + return new AllocatedSlot( + allocationId, + taskManagerLocation, + 0, + ResourceProfile.UNKNOWN, + new SimpleAckingTaskManagerGateway(), + new DummySlotOwner()); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AvailableSlotsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AvailableSlotsTest.java index 4ed88c4294bb7f765f3f4e1b5ea5ba85dc14cb4b..9ede8997173a89ffb0efe1a79fb0e0cdae9ca8d4 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AvailableSlotsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AvailableSlotsTest.java @@ -18,14 +18,15 @@ package org.apache.flink.runtime.instance; -import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.clusterframework.types.ResourceProfile; -import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot; +import org.apache.flink.runtime.jobmanager.slots.DummySlotOwner; import org.apache.flink.runtime.jobmanager.slots.SlotAndLocality; import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; +import org.apache.flink.util.TestLogger; + import org.junit.Test; import static org.junit.Assert.assertEquals; @@ -35,7 +36,7 @@ import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -public class AvailableSlotsTest { +public class AvailableSlotsTest extends TestLogger { static final ResourceProfile DEFAULT_TESTING_PROFILE = new ResourceProfile(1.0, 512); @@ -57,27 +58,27 @@ public class AvailableSlotsTest { availableSlots.add(slot3, 3L); assertEquals(3, availableSlots.size()); - assertTrue(availableSlots.contains(slot1.getSlotAllocationId())); - assertTrue(availableSlots.contains(slot2.getSlotAllocationId())); - assertTrue(availableSlots.contains(slot3.getSlotAllocationId())); + assertTrue(availableSlots.contains(slot1.getAllocationId())); + assertTrue(availableSlots.contains(slot2.getAllocationId())); + assertTrue(availableSlots.contains(slot3.getAllocationId())); assertTrue(availableSlots.containsTaskManager(resource1)); assertTrue(availableSlots.containsTaskManager(resource2)); availableSlots.removeAllForTaskManager(resource1); assertEquals(1, availableSlots.size()); - assertFalse(availableSlots.contains(slot1.getSlotAllocationId())); - assertFalse(availableSlots.contains(slot2.getSlotAllocationId())); - assertTrue(availableSlots.contains(slot3.getSlotAllocationId())); + assertFalse(availableSlots.contains(slot1.getAllocationId())); + assertFalse(availableSlots.contains(slot2.getAllocationId())); + assertTrue(availableSlots.contains(slot3.getAllocationId())); assertFalse(availableSlots.containsTaskManager(resource1)); assertTrue(availableSlots.containsTaskManager(resource2)); availableSlots.removeAllForTaskManager(resource2); assertEquals(0, availableSlots.size()); - assertFalse(availableSlots.contains(slot1.getSlotAllocationId())); - assertFalse(availableSlots.contains(slot2.getSlotAllocationId())); - assertFalse(availableSlots.contains(slot3.getSlotAllocationId())); + assertFalse(availableSlots.contains(slot1.getAllocationId())); + assertFalse(availableSlots.contains(slot2.getAllocationId())); + assertFalse(availableSlots.contains(slot3.getAllocationId())); assertFalse(availableSlots.containsTaskManager(resource1)); assertFalse(availableSlots.containsTaskManager(resource2)); } @@ -92,7 +93,7 @@ public class AvailableSlotsTest { availableSlots.add(slot1, 1L); assertEquals(1, availableSlots.size()); - assertTrue(availableSlots.contains(slot1.getSlotAllocationId())); + assertTrue(availableSlots.contains(slot1.getAllocationId())); assertTrue(availableSlots.containsTaskManager(resource1)); assertNull(availableSlots.poll(DEFAULT_TESTING_BIG_PROFILE, null)); @@ -100,7 +101,7 @@ public class AvailableSlotsTest { SlotAndLocality slotAndLocality = availableSlots.poll(DEFAULT_TESTING_PROFILE, null); assertEquals(slot1, slotAndLocality.slot()); assertEquals(0, availableSlots.size()); - assertFalse(availableSlots.contains(slot1.getSlotAllocationId())); + assertFalse(availableSlots.contains(slot1.getAllocationId())); assertFalse(availableSlots.containsTaskManager(resource1)); } @@ -112,10 +113,10 @@ public class AvailableSlotsTest { return new AllocatedSlot( new AllocationID(), - new JobID(), mockTaskManagerLocation, 0, DEFAULT_TESTING_PROFILE, - mockTaskManagerGateway); + mockTaskManagerGateway, + new DummySlotOwner()); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceTest.java index 5b85f722bb5f65fd72a9c9b57c8588f786b0c0c1..229237da410740d02562f6004b848746d22af1a9 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceTest.java @@ -18,17 +18,22 @@ package org.apache.flink.runtime.instance; -import static org.junit.Assert.*; - -import java.lang.reflect.Method; -import java.net.InetAddress; - -import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.jobmanager.slots.ActorTaskManagerGateway; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; + import org.junit.Test; +import java.lang.reflect.Method; +import java.net.InetAddress; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + /** * Tests for the {@link Instance} class. */ @@ -53,10 +58,10 @@ public class InstanceTest { assertEquals(4, instance.getNumberOfAvailableSlots()); assertEquals(0, instance.getNumberOfAllocatedSlots()); - SimpleSlot slot1 = instance.allocateSimpleSlot(new JobID()); - SimpleSlot slot2 = instance.allocateSimpleSlot(new JobID()); - SimpleSlot slot3 = instance.allocateSimpleSlot(new JobID()); - SimpleSlot slot4 = instance.allocateSimpleSlot(new JobID()); + SimpleSlot slot1 = instance.allocateSimpleSlot(); + SimpleSlot slot2 = instance.allocateSimpleSlot(); + SimpleSlot slot3 = instance.allocateSimpleSlot(); + SimpleSlot slot4 = instance.allocateSimpleSlot(); assertNotNull(slot1); assertNotNull(slot2); @@ -69,7 +74,7 @@ public class InstanceTest { slot3.getSlotNumber() + slot4.getSlotNumber()); // no more slots - assertNull(instance.allocateSimpleSlot(new JobID())); + assertNull(instance.allocateSimpleSlot()); try { instance.returnAllocatedSlot(slot2); fail("instance accepted a non-cancelled slot."); @@ -118,9 +123,9 @@ public class InstanceTest { assertEquals(3, instance.getNumberOfAvailableSlots()); - SimpleSlot slot1 = instance.allocateSimpleSlot(new JobID()); - SimpleSlot slot2 = instance.allocateSimpleSlot(new JobID()); - SimpleSlot slot3 = instance.allocateSimpleSlot(new JobID()); + SimpleSlot slot1 = instance.allocateSimpleSlot(); + SimpleSlot slot2 = instance.allocateSimpleSlot(); + SimpleSlot slot3 = instance.allocateSimpleSlot(); instance.markDead(); @@ -154,9 +159,9 @@ public class InstanceTest { assertEquals(3, instance.getNumberOfAvailableSlots()); - SimpleSlot slot1 = instance.allocateSimpleSlot(new JobID()); - SimpleSlot slot2 = instance.allocateSimpleSlot(new JobID()); - SimpleSlot slot3 = instance.allocateSimpleSlot(new JobID()); + SimpleSlot slot1 = instance.allocateSimpleSlot(); + SimpleSlot slot2 = instance.allocateSimpleSlot(); + SimpleSlot slot3 = instance.allocateSimpleSlot(); instance.cancelAndReleaseAllSlots(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SharedSlotsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SharedSlotsTest.java index 1e2b6af0aa2292dc290f0a731b290f1deae62f82..5104e48d6a4ee0ec3eaa7af40d33b8b076c7c43c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SharedSlotsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SharedSlotsTest.java @@ -18,7 +18,6 @@ package org.apache.flink.runtime.instance; -import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint; @@ -34,7 +33,12 @@ import org.junit.Test; import java.util.Collections; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; /** * Tests for the allocation, properties, and release of shared slots. @@ -46,7 +50,6 @@ public class SharedSlotsTest extends TestLogger { @Test public void allocateAndReleaseEmptySlot() { try { - JobID jobId = new JobID(); JobVertexID vertexId = new JobVertexID(); SlotSharingGroup sharingGroup = new SlotSharingGroup(vertexId); @@ -62,7 +65,7 @@ public class SharedSlotsTest extends TestLogger { assertEquals(2, instance.getNumberOfAvailableSlots()); // allocate a shared slot - SharedSlot slot = instance.allocateSharedSlot(jobId, assignment); + SharedSlot slot = instance.allocateSharedSlot(assignment); assertEquals(2, instance.getTotalNumberOfSlots()); assertEquals(1, instance.getNumberOfAllocatedSlots()); assertEquals(1, instance.getNumberOfAvailableSlots()); @@ -110,7 +113,6 @@ public class SharedSlotsTest extends TestLogger { @Test public void allocateSimpleSlotsAndReleaseFromRoot() { try { - JobID jobId = new JobID(); JobVertexID vid1 = new JobVertexID(); JobVertexID vid2 = new JobVertexID(); JobVertexID vid3 = new JobVertexID(); @@ -122,7 +124,7 @@ public class SharedSlotsTest extends TestLogger { Instance instance = SchedulerTestUtils.getRandomInstance(1); // allocate a shared slot - SharedSlot sharedSlot = instance.allocateSharedSlot(jobId, assignment); + SharedSlot sharedSlot = instance.allocateSharedSlot(assignment); // allocate a series of sub slots @@ -134,7 +136,6 @@ public class SharedSlotsTest extends TestLogger { assertEquals(1, sub1.getNumberLeaves()); assertEquals(vid1, sub1.getGroupID()); assertEquals(instance.getTaskManagerID(), sub1.getTaskManagerID()); - assertEquals(jobId, sub1.getJobID()); assertEquals(sharedSlot, sub1.getParent()); assertEquals(sharedSlot, sub1.getRoot()); assertEquals(0, sub1.getRootSlotNumber()); @@ -153,7 +154,6 @@ public class SharedSlotsTest extends TestLogger { assertEquals(1, sub2.getNumberLeaves()); assertEquals(vid2, sub2.getGroupID()); assertEquals(instance.getTaskManagerID(), sub2.getTaskManagerID()); - assertEquals(jobId, sub2.getJobID()); assertEquals(sharedSlot, sub2.getParent()); assertEquals(sharedSlot, sub2.getRoot()); assertEquals(0, sub2.getRootSlotNumber()); @@ -172,7 +172,6 @@ public class SharedSlotsTest extends TestLogger { assertEquals(1, sub3.getNumberLeaves()); assertEquals(vid3, sub3.getGroupID()); assertEquals(instance.getTaskManagerID(), sub3.getTaskManagerID()); - assertEquals(jobId, sub3.getJobID()); assertEquals(sharedSlot, sub3.getParent()); assertEquals(sharedSlot, sub3.getRoot()); assertEquals(0, sub3.getRootSlotNumber()); @@ -192,7 +191,6 @@ public class SharedSlotsTest extends TestLogger { assertEquals(1, sub4.getNumberLeaves()); assertEquals(vid4, sub4.getGroupID()); assertEquals(instance.getTaskManagerID(), sub4.getTaskManagerID()); - assertEquals(jobId, sub4.getJobID()); assertEquals(sharedSlot, sub4.getParent()); assertEquals(sharedSlot, sub4.getRoot()); assertEquals(0, sub4.getRootSlotNumber()); @@ -235,7 +233,6 @@ public class SharedSlotsTest extends TestLogger { @Test public void allocateSimpleSlotsAndReleaseFromLeaves() { try { - JobID jobId = new JobID(); JobVertexID vid1 = new JobVertexID(); JobVertexID vid2 = new JobVertexID(); JobVertexID vid3 = new JobVertexID(); @@ -246,7 +243,7 @@ public class SharedSlotsTest extends TestLogger { Instance instance = SchedulerTestUtils.getRandomInstance(1); // allocate a shared slot - SharedSlot sharedSlot = instance.allocateSharedSlot(jobId, assignment); + SharedSlot sharedSlot = instance.allocateSharedSlot(assignment); // allocate a series of sub slots @@ -320,7 +317,6 @@ public class SharedSlotsTest extends TestLogger { @Test public void allocateAndReleaseInMixedOrder() { try { - JobID jobId = new JobID(); JobVertexID vid1 = new JobVertexID(); JobVertexID vid2 = new JobVertexID(); JobVertexID vid3 = new JobVertexID(); @@ -331,7 +327,7 @@ public class SharedSlotsTest extends TestLogger { Instance instance = SchedulerTestUtils.getRandomInstance(1); // allocate a shared slot - SharedSlot sharedSlot = instance.allocateSharedSlot(jobId, assignment); + SharedSlot sharedSlot = instance.allocateSharedSlot(assignment); // allocate a series of sub slots @@ -427,7 +423,7 @@ public class SharedSlotsTest extends TestLogger { Instance instance = SchedulerTestUtils.getRandomInstance(1); // allocate a shared slot - SharedSlot sharedSlot = instance.allocateSharedSlot(new JobID(), assignment); + SharedSlot sharedSlot = instance.allocateSharedSlot(assignment); // get the first simple slot SimpleSlot sourceSlot = assignment.addSharedSlotAndAllocateSubSlot(sharedSlot, Locality.LOCAL, sourceId); @@ -563,7 +559,7 @@ public class SharedSlotsTest extends TestLogger { Instance instance = SchedulerTestUtils.getRandomInstance(1); // allocate a shared slot - SharedSlot sharedSlot = instance.allocateSharedSlot(new JobID(), assignment); + SharedSlot sharedSlot = instance.allocateSharedSlot(assignment); // get the first simple slot SimpleSlot sourceSlot = assignment.addSharedSlotAndAllocateSubSlot(sharedSlot, Locality.LOCAL, sourceId); @@ -607,7 +603,6 @@ public class SharedSlotsTest extends TestLogger { @Test public void testImmediateReleaseOneLevel() { try { - JobID jobId = new JobID(); JobVertexID vid = new JobVertexID(); SlotSharingGroup sharingGroup = new SlotSharingGroup(vid); @@ -615,7 +610,7 @@ public class SharedSlotsTest extends TestLogger { Instance instance = SchedulerTestUtils.getRandomInstance(1); - SharedSlot sharedSlot = instance.allocateSharedSlot(jobId, assignment); + SharedSlot sharedSlot = instance.allocateSharedSlot(assignment); SimpleSlot sub = assignment.addSharedSlotAndAllocateSubSlot(sharedSlot, Locality.UNCONSTRAINED, vid); sub.releaseInstanceSlot(); @@ -635,7 +630,6 @@ public class SharedSlotsTest extends TestLogger { @Test public void testImmediateReleaseTwoLevel() { try { - JobID jobId = new JobID(); JobVertexID vid = new JobVertexID(); JobVertex vertex = new JobVertex("vertex", vid); @@ -647,7 +641,7 @@ public class SharedSlotsTest extends TestLogger { Instance instance = SchedulerTestUtils.getRandomInstance(1); - SharedSlot sharedSlot = instance.allocateSharedSlot(jobId, assignment); + SharedSlot sharedSlot = instance.allocateSharedSlot(assignment); SimpleSlot sub = assignment.addSharedSlotAndAllocateSubSlot(sharedSlot, Locality.UNCONSTRAINED, constraint); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SimpleSlotTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SimpleSlotTest.java index 42cbbbf03cbd4b4313c5beb8ba7aede5091addca..6d572ad3dd61f9e00aaa8c6823ca5e3d0c19126a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SimpleSlotTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SimpleSlotTest.java @@ -18,7 +18,6 @@ package org.apache.flink.runtime.instance; -import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.jobmanager.slots.ActorTaskManagerGateway; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; @@ -34,7 +33,7 @@ import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; -public class SimpleSlotTest extends TestLogger { +public class SimpleSlotTest extends TestLogger { @Test public void testStateTransitions() { @@ -137,6 +136,6 @@ public class SimpleSlotTest extends TestLogger { hardwareDescription, 1); - return instance.allocateSimpleSlot(new JobID()); + return instance.allocateSimpleSlot(); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java index 8875e000c7deff28bf74b82d67f202767c701b73..5d82f47c30225873fd40deec3ea20288e9ebffe4 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java @@ -23,11 +23,11 @@ import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.clusterframework.types.AllocationID; -import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway; import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException; import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit; import org.apache.flink.runtime.jobmanager.scheduler.SchedulerTestUtils; -import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot; +import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway; import org.apache.flink.runtime.jobmaster.JobMasterId; import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; @@ -36,6 +36,9 @@ import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGate import org.apache.flink.runtime.rpc.RpcService; import org.apache.flink.runtime.rpc.RpcUtils; import org.apache.flink.runtime.rpc.akka.AkkaRpcService; +import org.apache.flink.runtime.taskexecutor.slot.SlotOffer; +import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation; +import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.runtime.testingUtils.TestingUtils; import org.apache.flink.runtime.util.clock.Clock; import org.apache.flink.runtime.util.clock.SystemClock; @@ -158,7 +161,7 @@ public class SlotPoolRpcTest extends TestLogger { assertEquals(1L, (long) pool.getNumberOfWaitingForResourceRequests().get()); - slotPoolGateway.cancelSlotAllocation(requestId).get(); + slotPoolGateway.cancelSlotRequest(requestId).get(); assertEquals(0L, (long) pool.getNumberOfWaitingForResourceRequests().get()); } finally { @@ -202,7 +205,7 @@ public class SlotPoolRpcTest extends TestLogger { assertEquals(1L, (long) pool.getNumberOfPendingRequests().get()); - slotPoolGateway.cancelSlotAllocation(requestId).get(); + slotPoolGateway.cancelSlotRequest(requestId).get(); assertEquals(0L, (long) pool.getNumberOfPendingRequests().get()); } finally { RpcUtils.terminateRpcEndpoint(pool, timeout); @@ -252,17 +255,22 @@ public class SlotPoolRpcTest extends TestLogger { } AllocationID allocationId = allocationIdFuture.get(); - ResourceID resourceID = ResourceID.generate(); - AllocatedSlot allocatedSlot = SlotPoolTest.createAllocatedSlot(resourceID, allocationId, jid, DEFAULT_TESTING_PROFILE); - slotPoolGateway.registerTaskManager(resourceID).get(); + final SlotOffer slotOffer = new SlotOffer( + allocationId, + 0, + DEFAULT_TESTING_PROFILE); + final TaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation(); + final TaskManagerGateway taskManagerGateway = new SimpleAckingTaskManagerGateway(); - assertTrue(slotPoolGateway.offerSlot(allocatedSlot).get()); + slotPoolGateway.registerTaskManager(taskManagerLocation.getResourceID()).get(); + + assertTrue(slotPoolGateway.offerSlot(taskManagerLocation, taskManagerGateway, slotOffer).get()); assertEquals(0L, (long) pool.getNumberOfPendingRequests().get()); assertTrue(pool.containsAllocatedSlot(allocationId).get()); - pool.cancelSlotAllocation(requestId).get(); + pool.cancelSlotRequest(requestId).get(); assertFalse(pool.containsAllocatedSlot(allocationId).get()); assertTrue(pool.containsAvailableSlot(allocationId).get()); @@ -351,14 +359,14 @@ public class SlotPoolRpcTest extends TestLogger { } @Override - public CompletableFuture cancelSlotAllocation(SlotRequestID slotRequestId) { + public CompletableFuture cancelSlotRequest(SlotRequestID slotRequestId) { final Consumer currentCancelSlotAllocationConsumer = cancelSlotAllocationConsumer; if (currentCancelSlotAllocationConsumer != null) { currentCancelSlotAllocationConsumer.accept(slotRequestId); } - return super.cancelSlotAllocation(slotRequestId); + return super.cancelSlotRequest(slotRequestId); } CompletableFuture containsAllocatedSlot(AllocationID allocationId) { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java index 450d3771621f546215c9cdce9665694353955e21..9d90a122980eae631153baa6d431973937f0777c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java @@ -21,11 +21,11 @@ package org.apache.flink.runtime.instance; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.clusterframework.types.AllocationID; -import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.clusterframework.types.ResourceProfile; import org.apache.flink.runtime.executiongraph.Execution; +import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway; import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit; -import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot; +import org.apache.flink.runtime.jobmanager.slots.SlotContext; import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway; import org.apache.flink.runtime.jobmaster.JobMasterId; import org.apache.flink.runtime.messages.Acknowledge; @@ -35,6 +35,8 @@ import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGate import org.apache.flink.runtime.rpc.RpcService; import org.apache.flink.runtime.rpc.RpcUtils; import org.apache.flink.runtime.rpc.TestingRpcService; +import org.apache.flink.runtime.taskexecutor.slot.SlotOffer; +import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.util.FlinkException; import org.apache.flink.util.TestLogger; @@ -74,10 +76,17 @@ public class SlotPoolTest extends TestLogger { private JobID jobId; + private TaskManagerLocation taskManagerLocation; + + private TaskManagerGateway taskManagerGateway; + @Before public void setUp() throws Exception { this.rpcService = new TestingRpcService(); this.jobId = new JobID(); + + taskManagerLocation = new LocalTaskManagerLocation(); + taskManagerGateway = new SimpleAckingTaskManagerGateway(); } @After @@ -92,8 +101,7 @@ public class SlotPoolTest extends TestLogger { try { SlotPoolGateway slotPoolGateway = setupSlotPool(slotPool, resourceManagerGateway); - ResourceID resourceID = new ResourceID("resource"); - slotPoolGateway.registerTaskManager(resourceID); + slotPoolGateway.registerTaskManager(taskManagerLocation.getResourceID()); SlotPoolGateway.SlotRequestID requestId = new SlotPoolGateway.SlotRequestID(); CompletableFuture future = slotPoolGateway.allocateSlot(requestId, mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout); @@ -104,14 +112,17 @@ public class SlotPoolTest extends TestLogger { final SlotRequest slotRequest = slotRequestArgumentCaptor.getValue(); - AllocatedSlot allocatedSlot = createAllocatedSlot(resourceID, slotRequest.getAllocationId(), jobId, DEFAULT_TESTING_PROFILE); - assertTrue(slotPoolGateway.offerSlot(allocatedSlot).get()); + final SlotOffer slotOffer = new SlotOffer( + slotRequest.getAllocationId(), + 0, + DEFAULT_TESTING_PROFILE); + + assertTrue(slotPoolGateway.offerSlot(taskManagerLocation, taskManagerGateway, slotOffer).get()); LogicalSlot slot = future.get(1, TimeUnit.SECONDS); assertTrue(future.isDone()); assertTrue(slot.isAlive()); - assertEquals(resourceID, slot.getTaskManagerLocation().getResourceID()); - assertEquals(slotPool.getAllocatedSlots().get(slot.getAllocationId()), slot); + assertEquals(taskManagerLocation, slot.getTaskManagerLocation()); } finally { slotPool.shutDown(); } @@ -124,8 +135,7 @@ public class SlotPoolTest extends TestLogger { try { SlotPoolGateway slotPoolGateway = setupSlotPool(slotPool, resourceManagerGateway); - ResourceID resourceID = new ResourceID("resource"); - slotPool.registerTaskManager(resourceID); + slotPool.registerTaskManager(taskManagerLocation.getResourceID()); CompletableFuture future1 = slotPoolGateway.allocateSlot(new SlotPoolGateway.SlotRequestID(), mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout); CompletableFuture future2 = slotPoolGateway.allocateSlot(new SlotPoolGateway.SlotRequestID(), mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout); @@ -139,8 +149,12 @@ public class SlotPoolTest extends TestLogger { final List slotRequests = slotRequestArgumentCaptor.getAllValues(); - AllocatedSlot allocatedSlot = createAllocatedSlot(resourceID, slotRequests.get(0).getAllocationId(), jobId, DEFAULT_TESTING_PROFILE); - assertTrue(slotPoolGateway.offerSlot(allocatedSlot).get()); + final SlotOffer slotOffer = new SlotOffer( + slotRequests.get(0).getAllocationId(), + 0, + DEFAULT_TESTING_PROFILE); + + assertTrue(slotPoolGateway.offerSlot(taskManagerLocation, taskManagerGateway, slotOffer).get()); LogicalSlot slot1 = future1.get(1, TimeUnit.SECONDS); assertTrue(future1.isDone()); @@ -158,7 +172,7 @@ public class SlotPoolTest extends TestLogger { assertTrue(slot2.isAlive()); assertEquals(slot1.getTaskManagerLocation(), slot2.getTaskManagerLocation()); assertEquals(slot1.getPhysicalSlotNumber(), slot2.getPhysicalSlotNumber()); - assertEquals(slotPool.getAllocatedSlots().get(slot1.getAllocationId()), slot2); + assertEquals(slot1.getAllocationId(), slot2.getAllocationId()); } finally { slotPool.shutDown(); } @@ -171,8 +185,7 @@ public class SlotPoolTest extends TestLogger { try { SlotPoolGateway slotPoolGateway = setupSlotPool(slotPool, resourceManagerGateway); - ResourceID resourceID = new ResourceID("resource"); - slotPoolGateway.registerTaskManager(resourceID); + slotPoolGateway.registerTaskManager(taskManagerLocation.getResourceID()); CompletableFuture future1 = slotPoolGateway.allocateSlot(new SlotPoolGateway.SlotRequestID(), mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout); assertFalse(future1.isDone()); @@ -182,8 +195,12 @@ public class SlotPoolTest extends TestLogger { final SlotRequest slotRequest = slotRequestArgumentCaptor.getValue(); - AllocatedSlot allocatedSlot = createAllocatedSlot(resourceID, slotRequest.getAllocationId(), jobId, DEFAULT_TESTING_PROFILE); - assertTrue(slotPoolGateway.offerSlot(allocatedSlot).get()); + final SlotOffer slotOffer = new SlotOffer( + slotRequest.getAllocationId(), + 0, + DEFAULT_TESTING_PROFILE); + + assertTrue(slotPoolGateway.offerSlot(taskManagerLocation, taskManagerGateway, slotOffer).get()); LogicalSlot slot1 = future1.get(1, TimeUnit.SECONDS); assertTrue(future1.isDone()); @@ -214,8 +231,7 @@ public class SlotPoolTest extends TestLogger { try { SlotPoolGateway slotPoolGateway = setupSlotPool(slotPool, resourceManagerGateway); - ResourceID resourceID = new ResourceID("resource"); - slotPoolGateway.registerTaskManager(resourceID); + slotPoolGateway.registerTaskManager(taskManagerLocation.getResourceID()); CompletableFuture future = slotPoolGateway.allocateSlot(new SlotPoolGateway.SlotRequestID(), mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout); assertFalse(future.isDone()); @@ -225,29 +241,36 @@ public class SlotPoolTest extends TestLogger { final SlotRequest slotRequest = slotRequestArgumentCaptor.getValue(); + final SlotOffer slotOffer = new SlotOffer( + slotRequest.getAllocationId(), + 0, + DEFAULT_TESTING_PROFILE); + + final TaskManagerLocation invalidTaskManagerLocation = new LocalTaskManagerLocation(); + // slot from unregistered resource - AllocatedSlot invalid = createAllocatedSlot(new ResourceID("unregistered"), slotRequest.getAllocationId(), jobId, DEFAULT_TESTING_PROFILE); - assertFalse(slotPoolGateway.offerSlot(invalid).get()); + assertFalse(slotPoolGateway.offerSlot(invalidTaskManagerLocation, taskManagerGateway, slotOffer).get()); - AllocatedSlot notRequested = createAllocatedSlot(resourceID, new AllocationID(), jobId, DEFAULT_TESTING_PROFILE); + final SlotOffer nonRequestedSlotOffer = new SlotOffer( + new AllocationID(), + 0, + DEFAULT_TESTING_PROFILE); // we'll also accept non requested slots - assertTrue(slotPoolGateway.offerSlot(notRequested).get()); - - AllocatedSlot allocatedSlot = createAllocatedSlot(resourceID, slotRequest.getAllocationId(), jobId, DEFAULT_TESTING_PROFILE); + assertTrue(slotPoolGateway.offerSlot(taskManagerLocation, taskManagerGateway, nonRequestedSlotOffer).get()); // accepted slot - assertTrue(slotPoolGateway.offerSlot(allocatedSlot).get()); + assertTrue(slotPoolGateway.offerSlot(taskManagerLocation, taskManagerGateway, slotOffer).get()); LogicalSlot slot = future.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS); assertTrue(slot.isAlive()); // duplicated offer with using slot - assertTrue(slotPoolGateway.offerSlot(allocatedSlot).get()); + assertTrue(slotPoolGateway.offerSlot(taskManagerLocation, taskManagerGateway, slotOffer).get()); assertTrue(slot.isAlive()); // duplicated offer with free slot slot.releaseSlot(); - assertTrue(slotPoolGateway.offerSlot(allocatedSlot).get()); + assertTrue(slotPoolGateway.offerSlot(taskManagerLocation, taskManagerGateway, slotOffer).get()); } finally { slotPool.shutDown(); } @@ -261,8 +284,8 @@ public class SlotPoolTest extends TestLogger { final SlotPool slotPool = new SlotPool(rpcService, jobId) { @Override - public void returnAllocatedSlot(Slot slot) { - super.returnAllocatedSlot(slot); + public void returnAllocatedSlot(SlotContext allocatedSlot) { + super.returnAllocatedSlot(allocatedSlot); slotReturnFuture.complete(true); } @@ -270,8 +293,7 @@ public class SlotPoolTest extends TestLogger { try { SlotPoolGateway slotPoolGateway = setupSlotPool(slotPool, resourceManagerGateway); - ResourceID resourceID = new ResourceID("resource"); - slotPoolGateway.registerTaskManager(resourceID); + slotPoolGateway.registerTaskManager(taskManagerLocation.getResourceID()); CompletableFuture future1 = slotPoolGateway.allocateSlot(new SlotPoolGateway.SlotRequestID(), mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout); @@ -282,14 +304,18 @@ public class SlotPoolTest extends TestLogger { CompletableFuture future2 = slotPoolGateway.allocateSlot(new SlotPoolGateway.SlotRequestID(), mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout); - AllocatedSlot allocatedSlot = createAllocatedSlot(resourceID, slotRequest.getAllocationId(), jobId, DEFAULT_TESTING_PROFILE); - assertTrue(slotPoolGateway.offerSlot(allocatedSlot).get()); + final SlotOffer slotOffer = new SlotOffer( + slotRequest.getAllocationId(), + 0, + DEFAULT_TESTING_PROFILE); + + assertTrue(slotPoolGateway.offerSlot(taskManagerLocation, taskManagerGateway, slotOffer).get()); LogicalSlot slot1 = future1.get(1, TimeUnit.SECONDS); assertTrue(future1.isDone()); assertFalse(future2.isDone()); - slotPoolGateway.releaseTaskManager(resourceID); + slotPoolGateway.releaseTaskManager(taskManagerLocation.getResourceID()); // wait until the slot has been returned slotReturnFuture.get(); @@ -378,24 +404,4 @@ public class SlotPoolTest extends TestLogger { return slotPool.getSelfGateway(SlotPoolGateway.class); } - - static AllocatedSlot createAllocatedSlot( - final ResourceID resourceId, - final AllocationID allocationId, - final JobID jobId, - final ResourceProfile resourceProfile) { - TaskManagerLocation mockTaskManagerLocation = mock(TaskManagerLocation.class); - when(mockTaskManagerLocation.getResourceID()).thenReturn(resourceId); - - TaskManagerGateway mockTaskManagerGateway = mock(TaskManagerGateway.class); - - return new AllocatedSlot( - allocationId, - jobId, - mockTaskManagerLocation, - 0, - resourceProfile, - mockTaskManagerGateway); - } - } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignmentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignmentTest.java index dca47d306fb43e1e778434e74624669d971f2942..28cab725a7a408316a468ced9f66598781ddd45d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignmentTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignmentTest.java @@ -18,7 +18,6 @@ package org.apache.flink.runtime.instance; -import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobmanager.scheduler.Locality; @@ -49,14 +48,12 @@ public class SlotSharingGroupAssignmentTest extends TestLogger { final int numberSlots = 2; final JobVertexID sourceId = new JobVertexID(); final JobVertexID sinkId = new JobVertexID(); - final JobID jobId = new JobID(); for (int i = 0; i < numberTaskManagers; i++) { final TaskManagerLocation taskManagerLocation = new TaskManagerLocation(ResourceID.generate(), InetAddress.getLocalHost(), i + 1000); for (int j = 0; j < numberSlots; j++) { final SharedSlot slot = new SharedSlot( - jobId, mock(SlotOwner.class), taskManagerLocation, j, diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraintTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraintTest.java index 3f267ac80299f190d86a1542ca60ac085fc42204..d40ff6173c1a10576bcfcdb7d16876b493bd37d1 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraintTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraintTest.java @@ -90,10 +90,10 @@ public class CoLocationConstraintTest { Instance instance1 = SchedulerTestUtils.getRandomInstance(2); Instance instance2 = SchedulerTestUtils.getRandomInstance(2); - SharedSlot slot1_1 = instance1.allocateSharedSlot(jid, assignment); - SharedSlot slot1_2 = instance1.allocateSharedSlot(jid, assignment); - SharedSlot slot2_1 = instance2.allocateSharedSlot(jid, assignment); - SharedSlot slot2_2 = instance2.allocateSharedSlot(jid, assignment); + SharedSlot slot1_1 = instance1.allocateSharedSlot(assignment); + SharedSlot slot1_2 = instance1.allocateSharedSlot(assignment); + SharedSlot slot2_1 = instance2.allocateSharedSlot(assignment); + SharedSlot slot2_2 = instance2.allocateSharedSlot(assignment); // constraint is still completely unassigned assertFalse(constraint.isAssigned()); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/slots/DummySlotOwner.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/slots/DummySlotOwner.java new file mode 100644 index 0000000000000000000000000000000000000000..689454265b4d16fbaba7c3e3a5416ea8765c047d --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/slots/DummySlotOwner.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobmanager.slots; + +import org.apache.flink.runtime.instance.Slot; + +import java.util.concurrent.CompletableFuture; + +/** + * SlotOwner implementation used for testing purposes only. + */ +public class DummySlotOwner implements SlotOwner { + @Override + public CompletableFuture returnAllocatedSlot(Slot slot) { + return CompletableFuture.completedFuture(false); + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java index 9c12fffc3359925a44f6322e07777d981ac3359b..01f445b8cc4f88b1c12ad2f8ebdb65545331c7c9 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java @@ -58,12 +58,14 @@ import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.runtime.testingUtils.TestingUtils; import org.apache.flink.runtime.util.TestingFatalErrorHandler; import org.apache.flink.util.TestLogger; + import org.hamcrest.Matchers; import org.junit.Test; import org.mockito.Mockito; import java.net.InetAddress; import java.util.Arrays; +import java.util.Collection; import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ScheduledExecutorService; @@ -169,7 +171,7 @@ public class TaskExecutorITCase extends TestLogger { when(jmGateway.getHostname()).thenReturn(jmAddress); when(jmGateway.offerSlots( eq(taskManagerResourceId), - any(Iterable.class), + any(Collection.class), any(Time.class))).thenReturn(mock(CompletableFuture.class, RETURNS_MOCKS)); when(jmGateway.getFencingToken()).thenReturn(jobMasterId); @@ -214,7 +216,7 @@ public class TaskExecutorITCase extends TestLogger { verify(jmGateway, Mockito.timeout(timeout.toMilliseconds())).offerSlots( eq(taskManagerResourceId), - (Iterable)argThat(Matchers.contains(slotOffer)), + (Collection)argThat(Matchers.contains(slotOffer)), any(Time.class)); } finally { if (testingFatalErrorHandler.hasExceptionOccurred()) { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java index 6372792d688701e62de32a743fd40f5da83c846f..29d07fa71607b4e79c9b50811895133434db077a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java @@ -852,7 +852,7 @@ public class TaskExecutorTest extends TestLogger { when(jobMasterGateway.getHostname()).thenReturn(jobManagerAddress); when(jobMasterGateway.offerSlots( any(ResourceID.class), - any(Iterable.class), + any(Collection.class), any(Time.class))).thenReturn(mock(CompletableFuture.class, RETURNS_MOCKS)); rpc.registerGateway(resourceManagerAddress, resourceManagerGateway); @@ -904,7 +904,7 @@ public class TaskExecutorTest extends TestLogger { // the job leader should get the allocation id offered verify(jobMasterGateway, Mockito.timeout(timeout.toMilliseconds())).offerSlots( any(ResourceID.class), - (Iterable)Matchers.argThat(contains(slotOffer)), + (Collection)Matchers.argThat(contains(slotOffer)), any(Time.class)); // check if a concurrent error occurred @@ -975,7 +975,7 @@ public class TaskExecutorTest extends TestLogger { when(jobMasterGateway.getHostname()).thenReturn(jobManagerAddress); when(jobMasterGateway.offerSlots( - any(ResourceID.class), any(Iterable.class), any(Time.class))) + any(ResourceID.class), any(Collection.class), any(Time.class))) .thenReturn(CompletableFuture.completedFuture((Collection)Collections.singleton(offer1))); rpc.registerGateway(resourceManagerAddress, resourceManagerGateway); @@ -1315,7 +1315,7 @@ public class TaskExecutorTest extends TestLogger { when( jobMasterGateway.offerSlots( any(ResourceID.class), - any(Iterable.class), + any(Collection.class), any(Time.class))) .thenReturn(offerResultFuture); @@ -1323,7 +1323,7 @@ public class TaskExecutorTest extends TestLogger { // been properly started. This will also offer the slots to the job master jobLeaderService.addJob(jobId, jobManagerAddress); - verify(jobMasterGateway, Mockito.timeout(timeout.toMilliseconds())).offerSlots(any(ResourceID.class), any(Iterable.class), any(Time.class)); + verify(jobMasterGateway, Mockito.timeout(timeout.toMilliseconds())).offerSlots(any(ResourceID.class), any(Collection.class), any(Time.class)); // submit the task without having acknowledge the offered slots tmGateway.submitTask(tdd, jobMasterId, timeout);