[FLINK-8087] Decouple Slot from AllocatedSlot

This commit introduces the SlotContext which is an abstraction for the SimpleSlot
to obtain the relevant slot information to do the communication with the
TaskManager without relying on the AllocatedSlot which is now only used by the
SlotPool.

This closes #5088.
上级 627bcda6
......@@ -16,14 +16,20 @@
* limitations under the License.
*/
package org.apache.flink.runtime.jobmanager.slots;
package org.apache.flink.runtime.instance;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.jobmanager.scheduler.Locality;
import org.apache.flink.runtime.jobmanager.slots.SlotContext;
import org.apache.flink.runtime.jobmanager.slots.SlotException;
import org.apache.flink.runtime.jobmanager.slots.SlotOwner;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import java.util.concurrent.atomic.AtomicReference;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
......@@ -38,14 +44,11 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
* an AllocatedSlot was allocated to the JobManager as soon as the TaskManager registered at the
* JobManager. All slots had a default unknown resource profile.
*/
public class AllocatedSlot {
public class AllocatedSlot implements SlotContext {
/** The ID under which the slot is allocated. Uniquely identifies the slot. */
private final AllocationID slotAllocationId;
/** The ID of the job this slot is allocated for */
private final JobID jobID;
/** The location information of the TaskManager to which this slot belongs */
private final TaskManagerLocation taskManagerLocation;
......@@ -56,23 +59,29 @@ public class AllocatedSlot {
private final TaskManagerGateway taskManagerGateway;
/** The number of the slot on the TaskManager to which slot belongs. Purely informational. */
private final int slotNumber;
private final int physicalSlotNumber;
private final SlotOwner slotOwner;
private final AtomicReference<LogicalSlot> logicalSlotReference;
// ------------------------------------------------------------------------
public AllocatedSlot(
AllocationID slotAllocationId,
JobID jobID,
TaskManagerLocation location,
int slotNumber,
int physicalSlotNumber,
ResourceProfile resourceProfile,
TaskManagerGateway taskManagerGateway) {
TaskManagerGateway taskManagerGateway,
SlotOwner slotOwner) {
this.slotAllocationId = checkNotNull(slotAllocationId);
this.jobID = checkNotNull(jobID);
this.taskManagerLocation = checkNotNull(location);
this.slotNumber = slotNumber;
this.physicalSlotNumber = physicalSlotNumber;
this.resourceProfile = checkNotNull(resourceProfile);
this.taskManagerGateway = checkNotNull(taskManagerGateway);
this.slotOwner = checkNotNull(slotOwner);
logicalSlotReference = new AtomicReference<>(null);
}
// ------------------------------------------------------------------------
......@@ -82,7 +91,7 @@ public class AllocatedSlot {
*
* @return The ID under which the slot is allocated
*/
public AllocationID getSlotAllocationId() {
public AllocationID getAllocationId() {
return slotAllocationId;
}
......@@ -97,22 +106,13 @@ public class AllocatedSlot {
return getTaskManagerLocation().getResourceID();
}
/**
* Returns the ID of the job this allocated slot belongs to.
*
* @return the ID of the job this allocated slot belongs to
*/
public JobID getJobID() {
return jobID;
}
/**
* Gets the number of the slot.
*
* @return The number of the slot on the TaskManager.
*/
public int getSlotNumber() {
return slotNumber;
public int getPhysicalSlotNumber() {
return physicalSlotNumber;
}
/**
......@@ -144,6 +144,78 @@ public class AllocatedSlot {
return taskManagerGateway;
}
/**
* Triggers the release of the logical slot.
*/
public void triggerLogicalSlotRelease() {
final LogicalSlot logicalSlot = logicalSlotReference.get();
if (logicalSlot != null) {
logicalSlot.releaseSlot();
}
}
/**
* Releases the logical slot.
*
* @return true if the logical slot could be released, false otherwise.
*/
public boolean releaseLogicalSlot() {
final LogicalSlot logicalSlot = logicalSlotReference.get();
if (logicalSlot != null) {
if (logicalSlot instanceof Slot) {
final Slot slot = (Slot) logicalSlot;
if (slot.markReleased()) {
logicalSlotReference.set(null);
return true;
}
} else {
throw new RuntimeException("Unsupported logical slot type encountered " + logicalSlot.getClass());
}
}
return false;
}
/**
* Allocates a logical {@link SimpleSlot}.
*
* @return an allocated logical simple slot
* @throws SlotException if we could not allocate a simple slot
*/
public SimpleSlot allocateSimpleSlot(Locality locality) throws SlotException {
final SimpleSlot simpleSlot = new SimpleSlot(this, slotOwner, physicalSlotNumber);
if (logicalSlotReference.compareAndSet(null, simpleSlot)) {
simpleSlot.setLocality(locality);
return simpleSlot;
} else {
throw new SlotException("Could not allocate logical simple slot because the allocated slot is already used.");
}
}
/**
* Allocates a logical {@link SharedSlot}.
*
* @param slotSharingGroupAssignment the slot sharing group to which the shared slot shall belong
* @return an allocated logical shared slot
* @throws SlotException if we could not allocate a shared slot
*/
public SharedSlot allocateSharedSlot(SlotSharingGroupAssignment slotSharingGroupAssignment) throws SlotException {
final SharedSlot sharedSlot = new SharedSlot(this, slotOwner, slotSharingGroupAssignment);
if (logicalSlotReference.compareAndSet(null, sharedSlot)) {
return sharedSlot;
} else {
throw new SlotException("Could not allocate logical shared slot because the allocated slot is already used.");
}
}
// ------------------------------------------------------------------------
/**
......@@ -164,6 +236,6 @@ public class AllocatedSlot {
@Override
public String toString() {
return "AllocatedSlot " + slotAllocationId + " @ " + taskManagerLocation + " - " + slotNumber;
return "AllocatedSlot " + slotAllocationId + " @ " + taskManagerLocation + " - " + physicalSlotNumber;
}
}
......@@ -18,7 +18,6 @@
package org.apache.flink.runtime.instance;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.jobmanager.scheduler.SlotAvailabilityListener;
import org.apache.flink.runtime.jobmanager.slots.SlotOwner;
......@@ -210,19 +209,13 @@ public class Instance implements SlotOwner {
* Allocates a simple slot on this TaskManager instance. This method returns {@code null}, if no slot
* is available at the moment.
*
* @param jobID The ID of the job that the slot is allocated for.
*
* @return A simple slot that represents a task slot on this TaskManager instance, or null, if the
* TaskManager instance has no more slots available.
*
* @throws InstanceDiedException Thrown if the instance is no longer alive by the time the
* slot is allocated.
*/
public SimpleSlot allocateSimpleSlot(JobID jobID) throws InstanceDiedException {
if (jobID == null) {
throw new IllegalArgumentException();
}
public SimpleSlot allocateSimpleSlot() throws InstanceDiedException {
synchronized (instanceLock) {
if (isDead) {
throw new InstanceDiedException(this);
......@@ -233,7 +226,7 @@ public class Instance implements SlotOwner {
return null;
}
else {
SimpleSlot slot = new SimpleSlot(jobID, this, location, nextSlot, taskManagerGateway);
SimpleSlot slot = new SimpleSlot(this, location, nextSlot, taskManagerGateway);
allocatedSlots.add(slot);
return slot;
}
......@@ -244,7 +237,6 @@ public class Instance implements SlotOwner {
* Allocates a shared slot on this TaskManager instance. This method returns {@code null}, if no slot
* is available at the moment. The shared slot will be managed by the given SlotSharingGroupAssignment.
*
* @param jobID The ID of the job that the slot is allocated for.
* @param sharingGroupAssignment The assignment group that manages this shared slot.
*
* @return A shared slot that represents a task slot on this TaskManager instance and can hold other
......@@ -252,13 +244,8 @@ public class Instance implements SlotOwner {
*
* @throws InstanceDiedException Thrown if the instance is no longer alive by the time the slot is allocated.
*/
public SharedSlot allocateSharedSlot(JobID jobID, SlotSharingGroupAssignment sharingGroupAssignment)
throws InstanceDiedException
{
// the slot needs to be in the returned to taskManager state
if (jobID == null) {
throw new IllegalArgumentException();
}
public SharedSlot allocateSharedSlot(SlotSharingGroupAssignment sharingGroupAssignment)
throws InstanceDiedException {
synchronized (instanceLock) {
if (isDead) {
......@@ -271,7 +258,11 @@ public class Instance implements SlotOwner {
}
else {
SharedSlot slot = new SharedSlot(
jobID, this, location, nextSlot, taskManagerGateway, sharingGroupAssignment);
this,
location,
nextSlot,
taskManagerGateway,
sharingGroupAssignment);
allocatedSlots.add(slot);
return slot;
}
......
......@@ -18,17 +18,20 @@
package org.apache.flink.runtime.instance;
import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.jobmanager.slots.SlotContext;
import org.apache.flink.runtime.jobmanager.slots.SlotOwner;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.util.AbstractID;
import org.apache.flink.api.common.JobID;
import org.apache.flink.util.FlinkException;
import javax.annotation.Nullable;
import java.util.ConcurrentModificationException;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import static org.apache.flink.util.Preconditions.checkNotNull;
......@@ -44,7 +47,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
* passed through a {@link SlotSharingGroupAssignment} object which is responsible for
* synchronization.
*/
public class SharedSlot extends Slot {
public class SharedSlot extends Slot implements LogicalSlot {
/** The assignment group os shared slots that manages the availability and release of the slots */
private final SlotSharingGroupAssignment assignmentGroup;
......@@ -52,6 +55,8 @@ public class SharedSlot extends Slot {
/** The set os sub-slots allocated from this shared slot */
private final Set<Slot> subSlots;
private final CompletableFuture<?> cancellationFuture = new CompletableFuture<>();
// ------------------------------------------------------------------------
// Old Constructors (prior FLIP-6)
// ------------------------------------------------------------------------
......@@ -60,7 +65,6 @@ public class SharedSlot extends Slot {
* Creates a new shared slot that has no parent (is a root slot) and does not belong to any task group.
* This constructor is used to create a slot directly from an instance.
*
* @param jobID The ID of the job that the slot is created for.
* @param owner The component from which this slot is allocated.
* @param location The location info of the TaskManager where the slot was allocated from
* @param slotNumber The number of the slot.
......@@ -68,18 +72,17 @@ public class SharedSlot extends Slot {
* @param assignmentGroup The assignment group that this shared slot belongs to.
*/
public SharedSlot(
JobID jobID, SlotOwner owner, TaskManagerLocation location, int slotNumber,
SlotOwner owner, TaskManagerLocation location, int slotNumber,
TaskManagerGateway taskManagerGateway,
SlotSharingGroupAssignment assignmentGroup) {
this(jobID, owner, location, slotNumber, taskManagerGateway, assignmentGroup, null, null);
this(owner, location, slotNumber, taskManagerGateway, assignmentGroup, null, null);
}
/**
* Creates a new shared slot that has is a sub-slot of the given parent shared slot, and that belongs
* to the given task group.
*
* @param jobID The ID of the job that the slot is created for.
* @param owner The component from which this slot is allocated.
* @param location The location info of the TaskManager where the slot was allocated from
* @param slotNumber The number of the slot.
......@@ -89,7 +92,6 @@ public class SharedSlot extends Slot {
* @param groupId The assignment group of this slot.
*/
public SharedSlot(
JobID jobID,
SlotOwner owner,
TaskManagerLocation location,
int slotNumber,
......@@ -98,7 +100,7 @@ public class SharedSlot extends Slot {
@Nullable SharedSlot parent,
@Nullable AbstractID groupId) {
super(jobID, owner, location, slotNumber, taskManagerGateway, parent, groupId);
super(owner, location, slotNumber, taskManagerGateway, parent, groupId);
this.assignmentGroup = checkNotNull(assignmentGroup);
this.subSlots = new HashSet<Slot>();
......@@ -112,38 +114,23 @@ public class SharedSlot extends Slot {
* Creates a new shared slot that has no parent (is a root slot) and does not belong to any task group.
* This constructor is used to create a slot directly from an instance.
*
* @param allocatedSlot The allocated slot that this slot represents.
* @param slotContext The slot context of this shared slot
* @param owner The component from which this slot is allocated.
* @param assignmentGroup The assignment group that this shared slot belongs to.
*/
public SharedSlot(AllocatedSlot allocatedSlot, SlotOwner owner, SlotSharingGroupAssignment assignmentGroup) {
this(allocatedSlot, owner, allocatedSlot.getSlotNumber(), assignmentGroup, null, null);
}
/**
* Creates a new shared slot that is a sub-slot of the given parent shared slot, and that belongs
* to the given task group.
*
* @param parent The parent slot of this slot.
* @param owner The component from which this slot is allocated.
* @param slotNumber The number of the slot.
* @param assignmentGroup The assignment group that this shared slot belongs to.
* @param groupId The assignment group of this slot.
*/
public SharedSlot(
SharedSlot parent, SlotOwner owner, int slotNumber,
SlotSharingGroupAssignment assignmentGroup,
AbstractID groupId) {
this(parent.getAllocatedSlot(), owner, slotNumber, assignmentGroup, parent, groupId);
public SharedSlot(SlotContext slotContext, SlotOwner owner, SlotSharingGroupAssignment assignmentGroup) {
this(slotContext, owner, slotContext.getPhysicalSlotNumber(), assignmentGroup, null, null);
}
private SharedSlot(
AllocatedSlot allocatedSlot, SlotOwner owner, int slotNumber,
SlotContext slotInformation,
SlotOwner owner,
int slotNumber,
SlotSharingGroupAssignment assignmentGroup,
@Nullable SharedSlot parent, @Nullable AbstractID groupId) {
@Nullable SharedSlot parent,
@Nullable AbstractID groupId) {
super(allocatedSlot, owner, slotNumber, parent, groupId);
super(slotInformation, owner, slotNumber, parent, groupId);
this.assignmentGroup = checkNotNull(assignmentGroup);
this.subSlots = new HashSet<Slot>();
......@@ -186,14 +173,44 @@ public class SharedSlot extends Slot {
public boolean hasChildren() {
return subSlots.size() > 0;
}
@Override
public void releaseInstanceSlot() {
public boolean tryAssignPayload(Payload payload) {
throw new UnsupportedOperationException("Cannot assign an execution attempt id to a shared slot.");
}
@Nullable
@Override
public Payload getPayload() {
return null;
}
@Override
public CompletableFuture<?> releaseSlot() {
cancellationFuture.completeExceptionally(new FlinkException("Shared slot " + this + " is being released."));
assignmentGroup.releaseSharedSlot(this);
if (!(isReleased() && subSlots.isEmpty())) {
throw new IllegalStateException("Bug: SharedSlot is not empty and released after call to releaseSlot()");
}
return CompletableFuture.completedFuture(null);
}
@Override
public void releaseInstanceSlot() {
releaseSlot();
}
@Override
public int getPhysicalSlotNumber() {
return getRootSlotNumber();
}
@Override
public AllocationID getAllocationId() {
return getSlotContext().getAllocationId();
}
/**
......@@ -222,8 +239,12 @@ public class SharedSlot extends Slot {
SimpleSlot allocateSubSlot(AbstractID groupId) {
if (isAlive()) {
SimpleSlot slot = new SimpleSlot(
getJobID(), getOwner(), getTaskManagerLocation(), subSlots.size(),
getTaskManagerGateway(), this, groupId);
getOwner(),
getTaskManagerLocation(),
subSlots.size(),
getTaskManagerGateway(),
this,
groupId);
subSlots.add(slot);
return slot;
}
......@@ -244,8 +265,13 @@ public class SharedSlot extends Slot {
SharedSlot allocateSharedSlot(AbstractID groupId){
if (isAlive()) {
SharedSlot slot = new SharedSlot(
getJobID(), getOwner(), getTaskManagerLocation(), subSlots.size(),
getTaskManagerGateway(), assignmentGroup, this, groupId);
getOwner(),
getTaskManagerLocation(),
subSlots.size(),
getTaskManagerGateway(),
assignmentGroup,
this,
groupId);
subSlots.add(slot);
return slot;
}
......
......@@ -18,11 +18,10 @@
package org.apache.flink.runtime.instance;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.jobmanager.scheduler.Locality;
import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot;
import org.apache.flink.runtime.jobmanager.slots.SimpleSlotContext;
import org.apache.flink.runtime.jobmanager.slots.SlotContext;
import org.apache.flink.runtime.jobmanager.slots.SlotOwner;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
......@@ -64,23 +63,21 @@ public class SimpleSlot extends Slot implements LogicalSlot {
/**
* Creates a new simple slot that stands alone and does not belong to shared slot.
*
* @param jobID The ID of the job that the slot is allocated for.
* @param owner The component from which this slot is allocated.
* @param location The location info of the TaskManager where the slot was allocated from
* @param slotNumber The number of the task slot on the instance.
* @param taskManagerGateway The gateway to communicate with the TaskManager of this slot
*/
public SimpleSlot(
JobID jobID, SlotOwner owner, TaskManagerLocation location, int slotNumber,
SlotOwner owner, TaskManagerLocation location, int slotNumber,
TaskManagerGateway taskManagerGateway) {
this(jobID, owner, location, slotNumber, taskManagerGateway, null, null);
this(owner, location, slotNumber, taskManagerGateway, null, null);
}
/**
* Creates a new simple slot that belongs to the given shared slot and
* is identified by the given ID.
*
* @param jobID The ID of the job that the slot is allocated for.
* @param owner The component from which this slot is allocated.
* @param location The location info of the TaskManager where the slot was allocated from
* @param slotNumber The number of the simple slot in its parent shared slot.
......@@ -89,15 +86,25 @@ public class SimpleSlot extends Slot implements LogicalSlot {
* @param groupID The ID that identifies the group that the slot belongs to.
*/
public SimpleSlot(
JobID jobID, SlotOwner owner, TaskManagerLocation location, int slotNumber,
SlotOwner owner,
TaskManagerLocation location,
int slotNumber,
TaskManagerGateway taskManagerGateway,
@Nullable SharedSlot parent, @Nullable AbstractID groupID) {
super(parent != null ?
parent.getAllocatedSlot() :
new AllocatedSlot(NO_ALLOCATION_ID, jobID, location, slotNumber,
ResourceProfile.UNKNOWN, taskManagerGateway),
owner, slotNumber, parent, groupID);
@Nullable SharedSlot parent,
@Nullable AbstractID groupID) {
super(
parent != null ?
parent.getSlotContext() :
new SimpleSlotContext(
NO_ALLOCATION_ID,
location,
slotNumber,
taskManagerGateway),
owner,
slotNumber,
parent,
groupID);
}
// ------------------------------------------------------------------------
......@@ -107,12 +114,11 @@ public class SimpleSlot extends Slot implements LogicalSlot {
/**
* Creates a new simple slot that stands alone and does not belong to shared slot.
*
* @param allocatedSlot The allocated slot that this slot represents.
* @param slotContext The slot context of this simple slot
* @param owner The component from which this slot is allocated.
* @param slotNumber The number of the task slot on the instance.
*/
public SimpleSlot(AllocatedSlot allocatedSlot, SlotOwner owner, int slotNumber) {
this(allocatedSlot, owner, slotNumber, null, null);
public SimpleSlot(SlotContext slotContext, SlotOwner owner, int slotNumber) {
this(slotContext, owner, slotNumber, null, null);
}
/**
......@@ -121,27 +127,29 @@ public class SimpleSlot extends Slot implements LogicalSlot {
*
* @param parent The parent shared slot.
* @param owner The component from which this slot is allocated.
* @param slotNumber The number of the simple slot in its parent shared slot.
* @param groupID The ID that identifies the group that the slot belongs to.
*/
public SimpleSlot(SharedSlot parent, SlotOwner owner, int slotNumber, AbstractID groupID) {
this(parent.getAllocatedSlot(), owner, slotNumber, parent, groupID);
this(parent.getSlotContext(), owner, slotNumber, parent, groupID);
}
/**
* Creates a new simple slot that belongs to the given shared slot and
* is identified by the given ID..
*
* @param allocatedSlot The allocated slot that this slot represents.
* @param slotContext The slot context of this simple slot
* @param owner The component from which this slot is allocated.
* @param slotNumber The number of the simple slot in its parent shared slot.
* @param parent The parent shared slot.
* @param groupID The ID that identifies the group that the slot belongs to.
*/
private SimpleSlot(
AllocatedSlot allocatedSlot, SlotOwner owner, int slotNumber,
@Nullable SharedSlot parent, @Nullable AbstractID groupID) {
super(allocatedSlot, owner, slotNumber, parent, groupID);
SlotContext slotContext,
SlotOwner owner,
int slotNumber,
@Nullable SharedSlot parent,
@Nullable AbstractID groupID) {
super(slotContext, owner, slotNumber, parent, groupID);
}
// ------------------------------------------------------------------------
......@@ -263,7 +271,7 @@ public class SimpleSlot extends Slot implements LogicalSlot {
@Override
public AllocationID getAllocationId() {
return getAllocatedSlot().getSlotAllocationId();
return getSlotContext().getAllocationId();
}
// ------------------------------------------------------------------------
......
......@@ -18,11 +18,10 @@
package org.apache.flink.runtime.instance;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot;
import org.apache.flink.runtime.jobmanager.slots.SimpleSlotContext;
import org.apache.flink.runtime.jobmanager.slots.SlotContext;
import org.apache.flink.runtime.jobmanager.slots.SlotOwner;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
......@@ -66,8 +65,8 @@ public abstract class Slot {
// ------------------------------------------------------------------------
/** The allocated slot that this slot represents. */
private final AllocatedSlot allocatedSlot;
/** Context of this logical slot. */
private final SlotContext slotContext;
/** The owner of this slot - the slot was taken from that owner and must be disposed to it */
private final SlotOwner owner;
......@@ -80,7 +79,6 @@ public abstract class Slot {
@Nullable
private final AbstractID groupID;
/** The number of the slot on which the task is deployed */
private final int slotNumber;
/** The state of the vertex, only atomically updated */
......@@ -93,7 +91,6 @@ public abstract class Slot {
*
* <p>This is the old way of constructing slots, prior to the FLIP-6 resource management refactoring.
*
* @param jobID The ID of the job that this slot is allocated for.
* @param owner The component from which this slot is allocated.
* @param location The location info of the TaskManager where the slot was allocated from
* @param slotNumber The number of this slot.
......@@ -103,7 +100,6 @@ public abstract class Slot {
* if the slot does not belong to any task group.
*/
protected Slot(
JobID jobID,
SlotOwner owner,
TaskManagerLocation location,
int slotNumber,
......@@ -113,12 +109,11 @@ public abstract class Slot {
checkArgument(slotNumber >= 0);
this.allocatedSlot = new AllocatedSlot(
// create a simple slot context
this.slotContext = new SimpleSlotContext(
NO_ALLOCATION_ID,
jobID,
location,
slotNumber,
ResourceProfile.UNKNOWN,
taskManagerGateway);
this.owner = checkNotNull(owner);
......@@ -130,7 +125,7 @@ public abstract class Slot {
/**
* Base constructor for slots.
*
* @param allocatedSlot The allocated slot that this slot represents.
* @param slotContext The slot context of this slot.
* @param owner The component from which this slot is allocated.
* @param slotNumber The number of this slot.
* @param parent The parent slot that contains this slot. May be null, if this slot is the root.
......@@ -138,12 +133,13 @@ public abstract class Slot {
* if the slot does not belong to any task group.
*/
protected Slot(
AllocatedSlot allocatedSlot, SlotOwner owner, int slotNumber,
@Nullable SharedSlot parent, @Nullable AbstractID groupID) {
checkArgument(slotNumber >= 0);
SlotContext slotContext,
SlotOwner owner,
int slotNumber,
@Nullable SharedSlot parent,
@Nullable AbstractID groupID) {
this.allocatedSlot = checkNotNull(allocatedSlot);
this.slotContext = checkNotNull(slotContext);
this.owner = checkNotNull(owner);
this.parent = parent; // may be null
this.groupID = groupID; // may be null
......@@ -157,17 +153,8 @@ public abstract class Slot {
*
* @return This slot's allocated slot.
*/
public AllocatedSlot getAllocatedSlot() {
return allocatedSlot;
}
/**
* Returns the ID of the job this allocated slot belongs to.
*
* @return the ID of the job this allocated slot belongs to
*/
public JobID getJobID() {
return allocatedSlot.getJobID();
public SlotContext getSlotContext() {
return slotContext;
}
/**
......@@ -176,7 +163,7 @@ public abstract class Slot {
* @return The ID of the TaskManager that offers this slot
*/
public ResourceID getTaskManagerID() {
return allocatedSlot.getTaskManagerLocation().getResourceID();
return slotContext.getTaskManagerLocation().getResourceID();
}
/**
......@@ -185,7 +172,7 @@ public abstract class Slot {
* @return The location info of the TaskManager that offers this slot
*/
public TaskManagerLocation getTaskManagerLocation() {
return allocatedSlot.getTaskManagerLocation();
return slotContext.getTaskManagerLocation();
}
/**
......@@ -196,7 +183,7 @@ public abstract class Slot {
* @return The actor gateway that can be used to send messages to the TaskManager.
*/
public TaskManagerGateway getTaskManagerGateway() {
return allocatedSlot.getTaskManagerGateway();
return slotContext.getTaskManagerGateway();
}
/**
......@@ -373,7 +360,7 @@ public abstract class Slot {
}
protected String hierarchy() {
return (getParent() != null ? getParent().hierarchy() : "") + '(' + slotNumber + ')';
return (getParent() != null ? getParent().hierarchy() : "") + '(' + getSlotNumber() + ')';
}
private static String getStateName(int state) {
......
......@@ -21,7 +21,6 @@ package org.apache.flink.runtime.instance;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
......@@ -29,9 +28,11 @@ import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.jobmanager.scheduler.Locality;
import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot;
import org.apache.flink.runtime.jobmanager.slots.SlotContext;
import org.apache.flink.runtime.jobmanager.slots.SlotAndLocality;
import org.apache.flink.runtime.jobmanager.slots.SlotException;
import org.apache.flink.runtime.jobmanager.slots.SlotOwner;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.jobmaster.JobMasterId;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
......@@ -59,11 +60,11 @@ import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
......@@ -249,7 +250,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway {
// work on all slots waiting for this connection
for (PendingRequest pendingRequest : waitingForResourceManager.values()) {
requestSlotFromResourceManager(pendingRequest);
requestSlotFromResourceManager(resourceManagerGateway, pendingRequest);
}
// all sent off
......@@ -277,24 +278,23 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway {
}
@Override
public void returnAllocatedSlot(Slot slot) {
internalReturnAllocatedSlot(slot);
public void returnAllocatedSlot(SlotContext allocatedSlot) {
internalReturnAllocatedSlot(allocatedSlot.getAllocationId());
}
@Override
public CompletableFuture<Acknowledge> cancelSlotAllocation(SlotRequestID requestId) {
public CompletableFuture<Acknowledge> cancelSlotRequest(SlotRequestID requestId) {
final PendingRequest pendingRequest = removePendingRequest(requestId);
if (pendingRequest != null) {
failPendingRequest(pendingRequest, new CancellationException("Allocation with request id" + requestId + " cancelled."));
} else {
final Slot slot = allocatedSlots.get(requestId);
final AllocatedSlot allocatedSlot = allocatedSlots.get(requestId);
if (slot != null) {
LOG.info("Returning allocated slot {} because the corresponding allocation request {} was cancelled.", slot, requestId);
if (slot.markCancelled()) {
internalReturnAllocatedSlot(slot);
}
if (allocatedSlot != null) {
LOG.info("Returning allocated slot {} because the corresponding allocation request {} was cancelled.", allocatedSlot, requestId);
// TODO: Avoid having to send another message to do the slot releasing (e.g. introduce Slot#cancelExecution) and directly return slot
allocatedSlot.triggerLogicalSlotRelease();
} else {
LOG.debug("There was no slot allocation with {} to be cancelled.", requestId);
}
......@@ -312,24 +312,36 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway {
// (1) do we have a slot available already?
SlotAndLocality slotFromPool = availableSlots.poll(resources, locationPreferences);
if (slotFromPool != null) {
SimpleSlot slot = createSimpleSlot(slotFromPool.slot(), slotFromPool.locality());
allocatedSlots.add(requestId, slot);
return CompletableFuture.completedFuture(slot);
}
final AllocatedSlot allocatedSlot = slotFromPool.slot();
// the request will be completed by a future
final CompletableFuture<LogicalSlot> future = new CompletableFuture<>();
final SimpleSlot simpleSlot;
try {
simpleSlot = allocatedSlot.allocateSimpleSlot(slotFromPool.locality());
} catch (SlotException e) {
availableSlots.add(allocatedSlot, clock.relativeTimeMillis());
// (2) need to request a slot
if (resourceManagerGateway == null) {
// no slot available, and no resource manager connection
stashRequestWaitingForResourceManager(requestId, resources, future);
} else {
// we have a resource manager connection, so let's ask it for more resources
requestSlotFromResourceManager(new PendingRequest(requestId, future, resources));
return FutureUtils.completedExceptionally(e);
}
allocatedSlots.add(requestId, allocatedSlot);
return CompletableFuture.completedFuture(simpleSlot);
}
return future;
// we have to request a new allocated slot
CompletableFuture<AllocatedSlot> allocatedSlotFuture = requestSlot(
requestId,
resources);
return allocatedSlotFuture.thenApply(
(AllocatedSlot allocatedSlot) -> {
try {
return allocatedSlot.allocateSimpleSlot(Locality.UNKNOWN);
} catch (SlotException e) {
returnAllocatedSlot(allocatedSlot);
throw new CompletionException("Could not allocate a logical simple slot.", e);
}
});
}
/**
......@@ -354,16 +366,37 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway {
}
}
private CompletableFuture<AllocatedSlot> requestSlot(
SlotRequestID slotRequestId,
ResourceProfile resourceProfile) {
final PendingRequest pendingRequest = new PendingRequest(
slotRequestId,
resourceProfile);
if (resourceManagerGateway == null) {
stashRequestWaitingForResourceManager(pendingRequest);
} else {
requestSlotFromResourceManager(resourceManagerGateway, pendingRequest);
}
return pendingRequest.getAllocatedSlotFuture();
}
private void requestSlotFromResourceManager(
final ResourceManagerGateway resourceManagerGateway,
final PendingRequest pendingRequest) {
Preconditions.checkNotNull(resourceManagerGateway);
Preconditions.checkNotNull(pendingRequest);
LOG.info("Requesting slot with profile {} from resource manager (request = {}).", pendingRequest.getResourceProfile(), pendingRequest.getSlotRequestId());
final AllocationID allocationId = new AllocationID();
pendingRequests.put(pendingRequest.getSlotRequestId(), allocationId, pendingRequest);
pendingRequest.getFuture().whenComplete(
pendingRequest.getAllocatedSlotFuture().whenComplete(
(value, throwable) -> {
if (throwable != null) {
resourceManagerGateway.cancelSlotRequest(allocationId);
......@@ -405,7 +438,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway {
private void slotRequestToResourceManagerFailed(SlotRequestID slotRequestID, Throwable failure) {
PendingRequest request = pendingRequests.removeKeyA(slotRequestID);
if (request != null) {
request.getFuture().completeExceptionally(new NoResourceAvailableException(
request.getAllocatedSlotFuture().completeExceptionally(new NoResourceAvailableException(
"No pooled slot available and request to ResourceManager for new slot failed", failure));
} else {
if (LOG.isDebugEnabled()) {
......@@ -425,25 +458,22 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway {
Preconditions.checkNotNull(pendingRequest);
Preconditions.checkNotNull(e);
if (!pendingRequest.getFuture().isDone()) {
pendingRequest.getFuture().completeExceptionally(e);
if (!pendingRequest.getAllocatedSlotFuture().isDone()) {
pendingRequest.getAllocatedSlotFuture().completeExceptionally(e);
}
}
private void stashRequestWaitingForResourceManager(
final SlotRequestID requestId,
final ResourceProfile resources,
final CompletableFuture<LogicalSlot> future) {
private void stashRequestWaitingForResourceManager(final PendingRequest pendingRequest) {
LOG.info("Cannot serve slot request, no ResourceManager connected. " +
"Adding as pending request {}", requestId);
"Adding as pending request {}", pendingRequest.getSlotRequestId());
waitingForResourceManager.put(requestId, new PendingRequest(requestId, future, resources));
waitingForResourceManager.put(pendingRequest.getSlotRequestId(), pendingRequest);
scheduleRunAsync(new Runnable() {
@Override
public void run() {
checkTimeoutRequestWaitingForResourceManager(requestId);
checkTimeoutRequestWaitingForResourceManager(pendingRequest.getSlotRequestId());
}
}, resourceManagerRequestsTimeout);
}
......@@ -465,38 +495,31 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway {
* Return the slot back to this pool without releasing it. It's mainly called by failed / cancelled tasks, and the
* slot can be reused by other pending requests if the resource profile matches.n
*
* @param slot The slot needs to be returned
* @param allocationId identifying the slot which is returned
*/
private void internalReturnAllocatedSlot(Slot slot) {
checkNotNull(slot);
checkArgument(!slot.isAlive(), "slot is still alive");
checkArgument(slot.getOwner() == providerAndOwner, "slot belongs to the wrong pool.");
// markReleased() is an atomic check-and-set operation, so that the slot is guaranteed
// to be returned only once
if (slot.markReleased()) {
if (allocatedSlots.remove(slot)) {
// this slot allocation is still valid, use the slot to fulfill another request
// or make it available again
final AllocatedSlot taskManagerSlot = slot.getAllocatedSlot();
final PendingRequest pendingRequest = pollMatchingPendingRequest(taskManagerSlot);
private void internalReturnAllocatedSlot(AllocationID allocationId) {
final AllocatedSlot allocatedSlot = allocatedSlots.remove(allocationId);
if (allocatedSlot != null) {
if (allocatedSlot.releaseLogicalSlot()) {
final PendingRequest pendingRequest = pollMatchingPendingRequest(allocatedSlot);
if (pendingRequest != null) {
LOG.debug("Fulfilling pending request [{}] early with returned slot [{}]",
pendingRequest.getSlotRequestId(), taskManagerSlot.getSlotAllocationId());
pendingRequest.getSlotRequestId(), allocatedSlot.getAllocationId());
SimpleSlot newSlot = createSimpleSlot(taskManagerSlot, Locality.UNKNOWN);
allocatedSlots.add(pendingRequest.getSlotRequestId(), newSlot);
pendingRequest.getFuture().complete(newSlot);
allocatedSlots.add(pendingRequest.getSlotRequestId(), allocatedSlot);
pendingRequest.getAllocatedSlotFuture().complete(allocatedSlot);
} else {
LOG.debug("Adding returned slot [{}] to available slots", allocatedSlot.getAllocationId());
availableSlots.add(allocatedSlot, clock.relativeTimeMillis());
}
else {
LOG.debug("Adding returned slot [{}] to available slots", taskManagerSlot.getSlotAllocationId());
availableSlots.add(taskManagerSlot, clock.relativeTimeMillis());
}
}
else {
LOG.debug("Returned slot's allocation has been failed. Dropping slot.");
} else {
LOG.debug("Failed to mark the logical slot of {} as released.", allocatedSlot);
}
} else {
LOG.debug("Could not find allocated slot {}. Ignoring returning slot.", allocationId);
}
}
......@@ -524,19 +547,26 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway {
}
@Override
public CompletableFuture<Collection<SlotOffer>> offerSlots(Collection<Tuple2<AllocatedSlot, SlotOffer>> offers) {
public CompletableFuture<Collection<SlotOffer>> offerSlots(
TaskManagerLocation taskManagerLocation,
TaskManagerGateway taskManagerGateway,
Collection<SlotOffer> offers) {
validateRunsInMainThread();
List<CompletableFuture<Optional<SlotOffer>>> acceptedSlotOffers = offers.stream().map(
offer -> {
CompletableFuture<Optional<SlotOffer>> acceptedSlotOffer = offerSlot(offer.f0).thenApply(
(acceptedSlot) -> {
if (acceptedSlot) {
return Optional.of(offer.f1);
} else {
return Optional.empty();
}
});
CompletableFuture<Optional<SlotOffer>> acceptedSlotOffer = offerSlot(
taskManagerLocation,
taskManagerGateway,
offer)
.thenApply(
(acceptedSlot) -> {
if (acceptedSlot) {
return Optional.of(offer);
} else {
return Optional.empty();
}
});
return acceptedSlotOffer;
}
......@@ -564,20 +594,25 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway {
* we issued. Slot offering may be rejected if we find something mismatching or there is actually no pending
* request waiting for this slot (maybe fulfilled by some other returned slot).
*
* @param slot The offered slot
* @param taskManagerLocation location from where the offer comes from
* @param taskManagerGateway TaskManager gateway
* @param slotOffer the offered slot
* @return True if we accept the offering
*/
@Override
public CompletableFuture<Boolean> offerSlot(final AllocatedSlot slot) {
public CompletableFuture<Boolean> offerSlot(
final TaskManagerLocation taskManagerLocation,
final TaskManagerGateway taskManagerGateway,
final SlotOffer slotOffer) {
validateRunsInMainThread();
// check if this TaskManager is valid
final ResourceID resourceID = slot.getTaskManagerId();
final AllocationID allocationID = slot.getSlotAllocationId();
final ResourceID resourceID = taskManagerLocation.getResourceID();
final AllocationID allocationID = slotOffer.getAllocationId();
if (!registeredTaskManagers.contains(resourceID)) {
LOG.debug("Received outdated slot offering [{}] from unregistered TaskManager: {}",
slot.getSlotAllocationId(), slot);
slotOffer.getAllocationId(), taskManagerLocation);
return CompletableFuture.completedFuture(false);
}
......@@ -590,19 +625,26 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway {
return CompletableFuture.completedFuture(true);
}
final AllocatedSlot allocatedSlot = new AllocatedSlot(
slotOffer.getAllocationId(),
taskManagerLocation,
slotOffer.getSlotIndex(),
slotOffer.getResourceProfile(),
taskManagerGateway,
providerAndOwner);
// check whether we have request waiting for this slot
PendingRequest pendingRequest = pendingRequests.removeKeyB(allocationID);
if (pendingRequest != null) {
// we were waiting for this!
SimpleSlot resultSlot = createSimpleSlot(slot, Locality.UNKNOWN);
pendingRequest.getFuture().complete(resultSlot);
allocatedSlots.add(pendingRequest.getSlotRequestId(), resultSlot);
allocatedSlots.add(pendingRequest.getSlotRequestId(), allocatedSlot);
pendingRequest.getAllocatedSlotFuture().complete(allocatedSlot);
}
else {
// we were actually not waiting for this:
// - could be that this request had been fulfilled
// - we are receiving the slots from TaskManagers after becoming leaders
availableSlots.add(slot, clock.relativeTimeMillis());
availableSlots.add(allocatedSlot, clock.relativeTimeMillis());
}
// we accepted the request in any case. slot will be released after it idled for
......@@ -639,11 +681,11 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway {
LOG.debug("Failed available slot [{}] with ", allocationID, cause);
}
else {
Slot slot = allocatedSlots.remove(allocationID);
if (slot != null) {
AllocatedSlot allocatedSlot = allocatedSlots.remove(allocationID);
if (allocatedSlot != null) {
// release the slot.
// since it is not in 'allocatedSlots' any more, it will be dropped o return'
slot.releaseInstanceSlot();
allocatedSlot.triggerLogicalSlotRelease();
}
else {
LOG.debug("Outdated request to fail slot [{}] with ", allocationID, cause);
......@@ -681,27 +723,17 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway {
if (registeredTaskManagers.remove(resourceID)) {
availableSlots.removeAllForTaskManager(resourceID);
final Set<Slot> allocatedSlotsForResource = allocatedSlots.removeSlotsForTaskManager(resourceID);
for (Slot slot : allocatedSlotsForResource) {
slot.releaseInstanceSlot();
final Set<AllocatedSlot> allocatedSlotsForResource = allocatedSlots.removeSlotsForTaskManager(resourceID);
for (AllocatedSlot allocatedSlot : allocatedSlotsForResource) {
allocatedSlot.triggerLogicalSlotRelease();
// TODO: This is a work-around to mark the logical slot as released. We should split up the internalReturnSlot method to not poll pending requests
allocatedSlot.releaseLogicalSlot();
}
}
return CompletableFuture.completedFuture(Acknowledge.get());
}
// ------------------------------------------------------------------------
// Utilities
// ------------------------------------------------------------------------
private SimpleSlot createSimpleSlot(AllocatedSlot slot, Locality locality) {
SimpleSlot result = new SimpleSlot(slot, providerAndOwner, slot.getSlotNumber());
if (locality != null) {
result.setLocality(locality);
}
return result;
}
// ------------------------------------------------------------------------
// Methods for tests
// ------------------------------------------------------------------------
......@@ -736,10 +768,10 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway {
static class AllocatedSlots {
/** All allocated slots organized by TaskManager's id */
private final Map<ResourceID, Set<Slot>> allocatedSlotsByTaskManager;
private final Map<ResourceID, Set<AllocatedSlot>> allocatedSlotsByTaskManager;
/** All allocated slots organized by AllocationID */
private final DualKeyMap<AllocationID, SlotRequestID, Slot> allocatedSlotsById;
private final DualKeyMap<AllocationID, SlotRequestID, AllocatedSlot> allocatedSlotsById;
AllocatedSlots() {
this.allocatedSlotsByTaskManager = new HashMap<>(16);
......@@ -749,18 +781,18 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway {
/**
* Adds a new slot to this collection.
*
* @param slot The allocated slot
* @param allocatedSlot The allocated slot
*/
void add(SlotRequestID slotRequestId, Slot slot) {
allocatedSlotsById.put(slot.getAllocatedSlot().getSlotAllocationId(), slotRequestId, slot);
final ResourceID resourceID = slot.getTaskManagerID();
Set<Slot> slotsForTaskManager = allocatedSlotsByTaskManager.get(resourceID);
if (slotsForTaskManager == null) {
slotsForTaskManager = new HashSet<>();
allocatedSlotsByTaskManager.put(resourceID, slotsForTaskManager);
}
slotsForTaskManager.add(slot);
void add(SlotRequestID slotRequestId, AllocatedSlot allocatedSlot) {
allocatedSlotsById.put(allocatedSlot.getAllocationId(), slotRequestId, allocatedSlot);
final ResourceID resourceID = allocatedSlot.getTaskManagerLocation().getResourceID();
Set<AllocatedSlot> slotsForTaskManager = allocatedSlotsByTaskManager.computeIfAbsent(
resourceID,
resourceId -> new HashSet<>(4));
slotsForTaskManager.add(allocatedSlot);
}
/**
......@@ -769,11 +801,11 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway {
* @param allocationID The allocation id
* @return The allocated slot, null if we can't find a match
*/
Slot get(final AllocationID allocationID) {
AllocatedSlot get(final AllocationID allocationID) {
return allocatedSlotsById.getKeyA(allocationID);
}
Slot get(final SlotRequestID slotRequestId) {
AllocatedSlot get(final SlotRequestID slotRequestId) {
return allocatedSlotsById.getKeyB(slotRequestId);
}
......@@ -787,30 +819,23 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway {
return allocatedSlotsById.containsKeyA(slotAllocationId);
}
/**
* Remove an allocation with slot.
*
* @param slot The slot needs to be removed
*/
boolean remove(final Slot slot) {
return remove(slot.getAllocatedSlot().getSlotAllocationId()) != null;
}
/**
* Remove an allocation with slot.
*
* @param slotId The ID of the slot to be removed
*/
Slot remove(final AllocationID slotId) {
Slot slot = allocatedSlotsById.removeKeyA(slotId);
if (slot != null) {
final ResourceID taskManagerId = slot.getTaskManagerID();
Set<Slot> slotsForTM = allocatedSlotsByTaskManager.get(taskManagerId);
slotsForTM.remove(slot);
AllocatedSlot remove(final AllocationID slotId) {
AllocatedSlot allocatedSlot = allocatedSlotsById.removeKeyA(slotId);
if (allocatedSlot != null) {
final ResourceID taskManagerId = allocatedSlot.getTaskManagerLocation().getResourceID();
Set<AllocatedSlot> slotsForTM = allocatedSlotsByTaskManager.get(taskManagerId);
slotsForTM.remove(allocatedSlot);
if (slotsForTM.isEmpty()) {
allocatedSlotsByTaskManager.remove(taskManagerId);
}
return slot;
return allocatedSlot;
}
else {
return null;
......@@ -823,11 +848,11 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway {
* @param resourceID The id of the TaskManager
* @return Set of slots which are allocated from the same TaskManager
*/
Set<Slot> removeSlotsForTaskManager(final ResourceID resourceID) {
Set<Slot> slotsForTaskManager = allocatedSlotsByTaskManager.remove(resourceID);
Set<AllocatedSlot> removeSlotsForTaskManager(final ResourceID resourceID) {
Set<AllocatedSlot> slotsForTaskManager = allocatedSlotsByTaskManager.remove(resourceID);
if (slotsForTaskManager != null) {
for (Slot slot : slotsForTaskManager) {
allocatedSlotsById.removeKeyA(slot.getAllocatedSlot().getSlotAllocationId());
for (AllocatedSlot allocatedSlot : slotsForTaskManager) {
allocatedSlotsById.removeKeyA(allocatedSlot.getAllocationId());
}
return slotsForTaskManager;
}
......@@ -852,7 +877,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway {
}
@VisibleForTesting
Set<Slot> getSlotsForTaskManager(ResourceID resourceId) {
Set<AllocatedSlot> getSlotsForTaskManager(ResourceID resourceId) {
if (allocatedSlotsByTaskManager.containsKey(resourceId)) {
return allocatedSlotsByTaskManager.get(resourceId);
} else {
......@@ -892,7 +917,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway {
checkNotNull(slot);
SlotAndTimestamp previous = availableSlots.put(
slot.getSlotAllocationId(), new SlotAndTimestamp(slot, timestamp));
slot.getAllocationId(), new SlotAndTimestamp(slot, timestamp));
if (previous == null) {
final ResourceID resourceID = slot.getTaskManagerLocation().getResourceID();
......@@ -951,7 +976,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway {
if (onTaskManager != null) {
for (AllocatedSlot candidate : onTaskManager) {
if (candidate.getResourceProfile().isMatching(resourceProfile)) {
remove(candidate.getSlotAllocationId());
remove(candidate.getAllocationId());
return new SlotAndLocality(candidate, Locality.LOCAL);
}
}
......@@ -964,7 +989,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway {
if (onHost != null) {
for (AllocatedSlot candidate : onHost) {
if (candidate.getResourceProfile().isMatching(resourceProfile)) {
remove(candidate.getSlotAllocationId());
remove(candidate.getAllocationId());
return new SlotAndLocality(candidate, Locality.HOST_LOCAL);
}
}
......@@ -977,7 +1002,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway {
final AllocatedSlot slot = candidate.slot();
if (slot.getResourceProfile().isMatching(resourceProfile)) {
remove(slot.getSlotAllocationId());
remove(slot.getAllocationId());
return new SlotAndLocality(
slot, hadLocationPreference ? Locality.NON_LOCAL : Locality.UNCONSTRAINED);
}
......@@ -1002,7 +1027,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway {
// remove from the base set and the by-host view
for (AllocatedSlot slot : slotsForTm) {
availableSlots.remove(slot.getSlotAllocationId());
availableSlots.remove(slot.getAllocationId());
slotsForHost.remove(slot);
}
......@@ -1082,7 +1107,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway {
@Override
public CompletableFuture<Boolean> returnAllocatedSlot(Slot slot) {
gateway.returnAllocatedSlot(slot);
gateway.returnAllocatedSlot(slot.getSlotContext());
return CompletableFuture.completedFuture(true);
}
......@@ -1097,7 +1122,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway {
slotFuture.whenComplete(
(LogicalSlot slot, Throwable failure) -> {
if (failure != null) {
gateway.cancelSlotAllocation(requestId);
gateway.cancelSlotRequest(requestId);
}
});
return slotFuture;
......@@ -1113,25 +1138,25 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway {
private final SlotRequestID slotRequestId;
private final CompletableFuture<LogicalSlot> future;
private final ResourceProfile resourceProfile;
private final CompletableFuture<AllocatedSlot> allocatedSlotFuture;
PendingRequest(
SlotRequestID slotRequestId,
CompletableFuture<LogicalSlot> future,
ResourceProfile resourceProfile) {
this.slotRequestId = Preconditions.checkNotNull(slotRequestId);
this.future = Preconditions.checkNotNull(future);
this.resourceProfile = Preconditions.checkNotNull(resourceProfile);
allocatedSlotFuture = new CompletableFuture<>();
}
public SlotRequestID getSlotRequestId() {
return slotRequestId;
}
public CompletableFuture<LogicalSlot> getFuture() {
return future;
public CompletableFuture<AllocatedSlot> getAllocatedSlotFuture() {
return allocatedSlotFuture;
}
public ResourceProfile getResourceProfile() {
......
......@@ -19,12 +19,12 @@
package org.apache.flink.runtime.instance;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot;
import org.apache.flink.runtime.jobmanager.slots.SlotContext;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.rpc.RpcGateway;
......@@ -76,9 +76,15 @@ public interface SlotPoolGateway extends RpcGateway {
CompletableFuture<Acknowledge> releaseTaskManager(ResourceID resourceID);
CompletableFuture<Boolean> offerSlot(AllocatedSlot slot);
CompletableFuture<Boolean> offerSlot(
TaskManagerLocation taskManagerLocation,
TaskManagerGateway taskManagerGateway,
SlotOffer slotOffer);
CompletableFuture<Collection<SlotOffer>> offerSlots(Collection<Tuple2<AllocatedSlot, SlotOffer>> offers);
CompletableFuture<Collection<SlotOffer>> offerSlots(
TaskManagerLocation taskManagerLocation,
TaskManagerGateway taskManagerGateway,
Collection<SlotOffer> offers);
void failAllocation(AllocationID allocationID, Exception cause);
......@@ -93,7 +99,7 @@ public interface SlotPoolGateway extends RpcGateway {
Iterable<TaskManagerLocation> locationPreferences,
@RpcTimeout Time timeout);
void returnAllocatedSlot(Slot slot);
void returnAllocatedSlot(SlotContext slotInformation);
/**
* Cancel a slot allocation request.
......@@ -101,7 +107,7 @@ public interface SlotPoolGateway extends RpcGateway {
* @param requestId identifying the slot allocation request
* @return Future acknowledge if the slot allocation has been cancelled
*/
CompletableFuture<Acknowledge> cancelSlotAllocation(SlotRequestID requestId);
CompletableFuture<Acknowledge> cancelSlotRequest(SlotRequestID requestId);
/**
* Request ID identifying different slot requests.
......
......@@ -364,7 +364,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener, Sl
Locality locality = instanceLocalityPair.getRight();
try {
SimpleSlot slot = instanceToUse.allocateSimpleSlot(vertex.getJobId());
SimpleSlot slot = instanceToUse.allocateSimpleSlot();
// if the instance has further available slots, re-add it to the set of available resources.
if (instanceToUse.hasResourcesAvailable()) {
......@@ -426,7 +426,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener, Sl
JobVertexID groupID = vertex.getJobvertexId();
// allocate a shared slot from the instance
SharedSlot sharedSlot = instanceToUse.allocateSharedSlot(vertex.getJobId(), groupAssignment);
SharedSlot sharedSlot = instanceToUse.allocateSharedSlot(groupAssignment);
// if the instance has further available slots, re-add it to the set of available resources.
if (instanceToUse.hasResourcesAvailable()) {
......@@ -562,7 +562,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener, Sl
ExecutionVertex vertex = task.getTaskToExecute().getVertex();
try {
SimpleSlot newSlot = instance.allocateSimpleSlot(vertex.getJobId());
SimpleSlot newSlot = instance.allocateSimpleSlot();
if (newSlot != null) {
// success, remove from the task queue and notify the future
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.jobmanager.slots;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.util.Preconditions;
/**
* Simple implementation of the {@link SlotContext} interface for the legacy code.
*/
public class SimpleSlotContext implements SlotContext {
private final AllocationID allocationId;
private final TaskManagerLocation taskManagerLocation;
private final int physicalSlotNumber;
private final TaskManagerGateway taskManagerGateway;
public SimpleSlotContext(
AllocationID allocationId,
TaskManagerLocation taskManagerLocation,
int physicalSlotNumber,
TaskManagerGateway taskManagerGateway) {
this.allocationId = Preconditions.checkNotNull(allocationId);
this.taskManagerLocation = Preconditions.checkNotNull(taskManagerLocation);
this.physicalSlotNumber = physicalSlotNumber;
this.taskManagerGateway = Preconditions.checkNotNull(taskManagerGateway);
}
@Override
public AllocationID getAllocationId() {
return allocationId;
}
@Override
public TaskManagerLocation getTaskManagerLocation() {
return taskManagerLocation;
}
@Override
public int getPhysicalSlotNumber() {
return physicalSlotNumber;
}
@Override
public TaskManagerGateway getTaskManagerGateway() {
return taskManagerGateway;
}
}
......@@ -18,6 +18,7 @@
package org.apache.flink.runtime.jobmanager.slots;
import org.apache.flink.runtime.instance.AllocatedSlot;
import org.apache.flink.runtime.jobmanager.scheduler.Locality;
import static org.apache.flink.util.Preconditions.checkNotNull;
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.jobmanager.slots;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.instance.Slot;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
/**
* Interface for the context of a logical {@link Slot}. This context contains information
* about the underlying allocated slot and how to communicate with the TaskManager on which
* it was allocated.
*/
public interface SlotContext {
/**
* Gets the ID under which the slot is allocated, which uniquely identifies the slot.
*
* @return The ID under which the slot is allocated
*/
AllocationID getAllocationId();
/**
* Gets the location info of the TaskManager that offers this slot.
*
* @return The location info of the TaskManager that offers this slot
*/
TaskManagerLocation getTaskManagerLocation();
/**
* Gets the number of the slot.
*
* @return The number of the slot on the TaskManager.
*/
int getPhysicalSlotNumber();
/**
* Gets the actor gateway that can be used to send messages to the TaskManager.
* <p>
* This method should be removed once the new interface-based RPC abstraction is in place
*
* @return The gateway that can be used to send messages to the TaskManager.
*/
TaskManagerGateway getTaskManagerGateway();
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.jobmanager.slots;
import org.apache.flink.util.FlinkException;
/**
* Base class for slot related exceptions.
*/
public class SlotException extends FlinkException {
private static final long serialVersionUID = -8009227041400667546L;
public SlotException(String message) {
super(message);
}
public SlotException(Throwable cause) {
super(cause);
}
public SlotException(String message, Throwable cause) {
super(message, cause);
}
}
......@@ -65,7 +65,6 @@ import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmanager.OnCompletionActions;
import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException;
import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot;
import org.apache.flink.runtime.jobmaster.message.ClassloadingProps;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
......@@ -109,7 +108,6 @@ import javax.annotation.Nullable;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
......@@ -649,7 +647,7 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
@Override
public CompletableFuture<Collection<SlotOffer>> offerSlots(
final ResourceID taskManagerId,
final Iterable<SlotOffer> slots,
final Collection<SlotOffer> slots,
final Time timeout) {
Tuple2<TaskManagerLocation, TaskExecutorGateway> taskManager = registeredTaskManagers.get(taskManagerId);
......@@ -658,27 +656,15 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
return FutureUtils.completedExceptionally(new Exception("Unknown TaskManager " + taskManagerId));
}
final JobID jid = jobGraph.getJobID();
final TaskManagerLocation taskManagerLocation = taskManager.f0;
final TaskExecutorGateway taskExecutorGateway = taskManager.f1;
final ArrayList<Tuple2<AllocatedSlot, SlotOffer>> slotsAndOffers = new ArrayList<>();
final RpcTaskManagerGateway rpcTaskManagerGateway = new RpcTaskManagerGateway(taskExecutorGateway, getFencingToken());
for (SlotOffer slotOffer : slots) {
final AllocatedSlot slot = new AllocatedSlot(
slotOffer.getAllocationId(),
jid,
taskManagerLocation,
slotOffer.getSlotIndex(),
slotOffer.getResourceProfile(),
rpcTaskManagerGateway);
slotsAndOffers.add(new Tuple2<>(slot, slotOffer));
}
return slotPoolGateway.offerSlots(slotsAndOffers);
return slotPoolGateway.offerSlots(
taskManagerLocation,
rpcTaskManagerGateway,
slots);
}
@Override
......
......@@ -195,7 +195,7 @@ public interface JobMasterGateway extends CheckpointCoordinatorGateway, FencedRp
*/
CompletableFuture<Collection<SlotOffer>> offerSlots(
final ResourceID taskManagerId,
final Iterable<SlotOffer> slots,
final Collection<SlotOffer> slots,
@RpcTimeout final Time timeout);
/**
......
......@@ -184,7 +184,7 @@ public class ExecutionGraphDeploymentTest extends TestLogger {
final Instance instance = getInstance(new ActorTaskManagerGateway(instanceGateway));
final SimpleSlot slot = instance.allocateSimpleSlot(jobId);
final SimpleSlot slot = instance.allocateSimpleSlot();
assertEquals(ExecutionState.CREATED, vertex.getExecutionState());
......@@ -631,13 +631,13 @@ public class ExecutionGraphDeploymentTest extends TestLogger {
final TaskManagerLocation localTaskManagerLocation = new LocalTaskManagerLocation();
final SimpleSlot sourceSlot1 = createSlot(executionGraph.getJobID(), localTaskManagerLocation, 0);
final SimpleSlot sourceSlot1 = createSlot(localTaskManagerLocation, 0);
final SimpleSlot sourceSlot2 = createSlot(executionGraph.getJobID(), localTaskManagerLocation, 1);
final SimpleSlot sourceSlot2 = createSlot(localTaskManagerLocation, 1);
final SimpleSlot sinkSlot1 = createSlot(executionGraph.getJobID(), localTaskManagerLocation, 0);
final SimpleSlot sinkSlot1 = createSlot(localTaskManagerLocation, 0);
final SimpleSlot sinkSlot2 = createSlot(executionGraph.getJobID(), localTaskManagerLocation, 1);
final SimpleSlot sinkSlot2 = createSlot(localTaskManagerLocation, 1);
slotFutures.get(sourceVertexId)[0].complete(sourceSlot1);
slotFutures.get(sourceVertexId)[1].complete(sourceSlot2);
......@@ -654,9 +654,8 @@ public class ExecutionGraphDeploymentTest extends TestLogger {
}
}
private SimpleSlot createSlot(JobID jobId, TaskManagerLocation taskManagerLocation, int index) {
private SimpleSlot createSlot(TaskManagerLocation taskManagerLocation, int index) {
return new SimpleSlot(
jobId,
mock(SlotOwner.class),
taskManagerLocation,
index,
......
......@@ -26,7 +26,6 @@ import org.apache.flink.runtime.blob.VoidBlobWriter;
import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
import org.apache.flink.runtime.instance.LogicalSlot;
......@@ -39,7 +38,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.ScheduleMode;
import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot;
import org.apache.flink.runtime.jobmanager.slots.SimpleSlotContext;
import org.apache.flink.runtime.jobmanager.slots.SlotOwner;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.jobmanager.slots.TestingSlotOwner;
......@@ -285,7 +284,7 @@ public class ExecutionGraphSchedulingTest extends TestLogger {
final BlockingQueue<AllocationID> returnedSlots = new ArrayBlockingQueue<>(parallelism);
final TestingSlotOwner slotOwner = new TestingSlotOwner();
slotOwner.setReturnAllocatedSlotConsumer(
(Slot slot) -> returnedSlots.offer(slot.getAllocatedSlot().getSlotAllocationId()));
(Slot slot) -> returnedSlots.offer(slot.getSlotContext().getAllocationId()));
final SimpleSlot[] sourceSlots = new SimpleSlot[parallelism];
final SimpleSlot[] targetSlots = new SimpleSlot[parallelism];
......@@ -366,7 +365,7 @@ public class ExecutionGraphSchedulingTest extends TestLogger {
final BlockingQueue<AllocationID> returnedSlots = new ArrayBlockingQueue<>(2);
final TestingSlotOwner slotOwner = new TestingSlotOwner();
slotOwner.setReturnAllocatedSlotConsumer(
(Slot slot) -> returnedSlots.offer(slot.getAllocatedSlot().getSlotAllocationId()));
(Slot slot) -> returnedSlots.offer(slot.getSlotContext().getAllocationId()));
final TaskManagerGateway taskManager = mock(TaskManagerGateway.class);
final SimpleSlot[] slots = new SimpleSlot[parallelism];
......@@ -448,8 +447,11 @@ public class ExecutionGraphSchedulingTest extends TestLogger {
TaskManagerLocation location = new TaskManagerLocation(
ResourceID.generate(), InetAddress.getLoopbackAddress(), 12345);
AllocatedSlot slot = new AllocatedSlot(
new AllocationID(), jobId, location, 0, ResourceProfile.UNKNOWN, taskManager);
SimpleSlotContext slot = new SimpleSlotContext(
new AllocationID(),
location,
0,
taskManager);
return new SimpleSlot(slot, slotOwner, 0);
}
......
......@@ -111,28 +111,28 @@ public class ExecutionGraphStopTest extends TestLogger {
// deploy source 1
for (ExecutionVertex ev : eg.getJobVertex(source1.getID()).getTaskVertices()) {
SimpleSlot slot = ExecutionGraphTestUtils.createMockSimpleSlot(jid, sourceGateway);
SimpleSlot slot = ExecutionGraphTestUtils.createMockSimpleSlot(sourceGateway);
ev.getCurrentExecutionAttempt().tryAssignResource(slot);
ev.getCurrentExecutionAttempt().deploy();
}
// deploy source 2
for (ExecutionVertex ev : eg.getJobVertex(source2.getID()).getTaskVertices()) {
SimpleSlot slot = ExecutionGraphTestUtils.createMockSimpleSlot(jid, sourceGateway);
SimpleSlot slot = ExecutionGraphTestUtils.createMockSimpleSlot(sourceGateway);
ev.getCurrentExecutionAttempt().tryAssignResource(slot);
ev.getCurrentExecutionAttempt().deploy();
}
// deploy non-source 1
for (ExecutionVertex ev : eg.getJobVertex(nonSource1.getID()).getTaskVertices()) {
SimpleSlot slot = ExecutionGraphTestUtils.createMockSimpleSlot(jid, nonSourceGateway);
SimpleSlot slot = ExecutionGraphTestUtils.createMockSimpleSlot(nonSourceGateway);
ev.getCurrentExecutionAttempt().tryAssignResource(slot);
ev.getCurrentExecutionAttempt().deploy();
}
// deploy non-source 2
for (ExecutionVertex ev : eg.getJobVertex(nonSource2.getID()).getTaskVertices()) {
SimpleSlot slot = ExecutionGraphTestUtils.createMockSimpleSlot(jid, nonSourceGateway);
SimpleSlot slot = ExecutionGraphTestUtils.createMockSimpleSlot(nonSourceGateway);
ev.getCurrentExecutionAttempt().tryAssignResource(slot);
ev.getCurrentExecutionAttempt().deploy();
}
......@@ -164,7 +164,7 @@ public class ExecutionGraphStopTest extends TestLogger {
when(gateway.stopTask(any(ExecutionAttemptID.class), any(Time.class)))
.thenReturn(CompletableFuture.completedFuture(Acknowledge.get()));
final SimpleSlot slot = ExecutionGraphTestUtils.createMockSimpleSlot(jid, gateway);
final SimpleSlot slot = ExecutionGraphTestUtils.createMockSimpleSlot(gateway);
exec.tryAssignResource(slot);
exec.deploy();
......
......@@ -29,7 +29,6 @@ import org.apache.flink.runtime.blob.VoidBlobWriter;
import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.failover.FailoverRegion;
......@@ -49,7 +48,7 @@ import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot;
import org.apache.flink.runtime.jobmanager.slots.SimpleSlotContext;
import org.apache.flink.runtime.jobmanager.slots.SlotOwner;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.messages.Acknowledge;
......@@ -240,19 +239,20 @@ public class ExecutionGraphTestUtils {
// Mocking Slots
// ------------------------------------------------------------------------
public static SimpleSlot createMockSimpleSlot(JobID jid, TaskManagerGateway gateway) {
public static SimpleSlot createMockSimpleSlot(TaskManagerGateway gateway) {
final TaskManagerLocation location = new TaskManagerLocation(
ResourceID.generate(), InetAddress.getLoopbackAddress(), 6572);
final AllocatedSlot allocatedSlot = new AllocatedSlot(
final SimpleSlotContext allocatedSlot = new SimpleSlotContext(
new AllocationID(),
jid,
location,
0,
ResourceProfile.UNKNOWN,
gateway);
return new SimpleSlot(allocatedSlot, mock(SlotOwner.class), 0);
return new SimpleSlot(
allocatedSlot,
mock(SlotOwner.class),
0);
}
// ------------------------------------------------------------------------
......
......@@ -81,7 +81,6 @@ public class ExecutionTest extends TestLogger {
final SingleSlotTestingSlotOwner slotOwner = new SingleSlotTestingSlotOwner();
final SimpleSlot slot = new SimpleSlot(
new JobID(),
slotOwner,
new LocalTaskManagerLocation(),
0,
......@@ -121,7 +120,6 @@ public class ExecutionTest extends TestLogger {
final SingleSlotTestingSlotOwner slotOwner = new SingleSlotTestingSlotOwner();
final SimpleSlot slot = new SimpleSlot(
new JobID(),
slotOwner,
new LocalTaskManagerLocation(),
0,
......@@ -171,7 +169,6 @@ public class ExecutionTest extends TestLogger {
final SingleSlotTestingSlotOwner slotOwner = new SingleSlotTestingSlotOwner();
final SimpleSlot slot = new SimpleSlot(
new JobID(),
slotOwner,
new LocalTaskManagerLocation(),
0,
......@@ -285,11 +282,12 @@ public class ExecutionTest extends TestLogger {
final SingleSlotTestingSlotOwner slotOwner = new SingleSlotTestingSlotOwner();
final SimpleSlot slot = new SimpleSlot(
new JobID(),
slotOwner,
new LocalTaskManagerLocation(),
0,
new SimpleAckingTaskManagerGateway());
new SimpleAckingTaskManagerGateway(),
null,
null);
final ProgrammedSlotProvider slotProvider = new ProgrammedSlotProvider(1);
slotProvider.addSlot(jobVertexId, 0, CompletableFuture.completedFuture(slot));
......
......@@ -18,36 +18,42 @@
package org.apache.flink.runtime.executiongraph;
import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.*;
import static org.junit.Assert.*;
import static org.mockito.Mockito.mock;
import java.io.IOException;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.instance.BaseTestingActorGateway;
import org.apache.flink.runtime.instance.DummyActorGateway;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.instance.SimpleSlot;
import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.instance.SimpleSlot;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmanager.scheduler.LocationPreferenceConstraint;
import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
import org.apache.flink.runtime.jobmanager.slots.ActorTaskManagerGateway;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.messages.TaskMessages.SubmitTask;
import org.apache.flink.runtime.messages.TaskMessages.CancelTask;
import org.apache.flink.runtime.messages.TaskMessages.SubmitTask;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
import org.apache.flink.util.TestLogger;
import org.junit.Test;
import java.io.IOException;
import scala.concurrent.ExecutionContext;
import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.getExecutionVertex;
import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.getInstance;
import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.setVertexResource;
import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.setVertexState;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
@SuppressWarnings("serial")
public class ExecutionVertexCancelTest extends TestLogger {
......@@ -134,7 +140,7 @@ public class ExecutionVertexCancelTest extends TestLogger {
executionContext, 2);
Instance instance = getInstance(new ActorTaskManagerGateway(actorGateway));
SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
SimpleSlot slot = instance.allocateSimpleSlot();
vertex.deployToSlot(slot);
......@@ -202,7 +208,7 @@ public class ExecutionVertexCancelTest extends TestLogger {
2);
Instance instance = getInstance(new ActorTaskManagerGateway(actorGateway));
SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
SimpleSlot slot = instance.allocateSimpleSlot();
vertex.deployToSlot(slot);
......@@ -262,7 +268,7 @@ public class ExecutionVertexCancelTest extends TestLogger {
1);
Instance instance = getInstance(new ActorTaskManagerGateway(actorGateway));
SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
SimpleSlot slot = instance.allocateSimpleSlot();
setVertexResource(vertex, slot);
setVertexState(vertex, ExecutionState.RUNNING);
......@@ -302,7 +308,7 @@ public class ExecutionVertexCancelTest extends TestLogger {
1);
Instance instance = getInstance(new ActorTaskManagerGateway(actorGateway));
SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
SimpleSlot slot = instance.allocateSimpleSlot();
setVertexResource(vertex, slot);
setVertexState(vertex, ExecutionState.RUNNING);
......@@ -350,7 +356,7 @@ public class ExecutionVertexCancelTest extends TestLogger {
1);
Instance instance = getInstance(new ActorTaskManagerGateway(actorGateway));
SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
SimpleSlot slot = instance.allocateSimpleSlot();
setVertexResource(vertex, slot);
setVertexState(vertex, ExecutionState.RUNNING);
......@@ -383,7 +389,7 @@ public class ExecutionVertexCancelTest extends TestLogger {
final ActorGateway gateway = new CancelSequenceActorGateway(TestingUtils.directExecutionContext(), 0);
Instance instance = getInstance(new ActorTaskManagerGateway(gateway));
SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
SimpleSlot slot = instance.allocateSimpleSlot();
setVertexResource(vertex, slot);
setVertexState(vertex, ExecutionState.RUNNING);
......@@ -458,7 +464,7 @@ public class ExecutionVertexCancelTest extends TestLogger {
// the scheduler (or any caller) needs to know that the slot should be released
try {
Instance instance = getInstance(new ActorTaskManagerGateway(DummyActorGateway.INSTANCE));
SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
SimpleSlot slot = instance.allocateSimpleSlot();
vertex.deployToSlot(slot);
fail("Method should throw an exception");
......@@ -501,7 +507,7 @@ public class ExecutionVertexCancelTest extends TestLogger {
setVertexState(vertex, ExecutionState.CANCELING);
Instance instance = getInstance(new ActorTaskManagerGateway(DummyActorGateway.INSTANCE));
SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
SimpleSlot slot = instance.allocateSimpleSlot();
vertex.deployToSlot(slot);
fail("Method should throw an exception");
......@@ -517,7 +523,7 @@ public class ExecutionVertexCancelTest extends TestLogger {
AkkaUtils.getDefaultTimeout());
Instance instance = getInstance(new ActorTaskManagerGateway(DummyActorGateway.INSTANCE));
SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
SimpleSlot slot = instance.allocateSimpleSlot();
setVertexResource(vertex, slot);
setVertexState(vertex, ExecutionState.CANCELING);
......
......@@ -32,7 +32,7 @@ import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.ScheduleMode;
import org.apache.flink.runtime.jobmanager.slots.ActorTaskManagerGateway;
import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot;
import org.apache.flink.runtime.instance.AllocatedSlot;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
import org.apache.flink.util.TestLogger;
......@@ -67,7 +67,7 @@ public class ExecutionVertexDeploymentTest extends TestLogger {
Instance instance = getInstance(
new ActorTaskManagerGateway(
new SimpleActorGateway(TestingUtils.directExecutionContext())));
final SimpleSlot slot = instance.allocateSimpleSlot(ejv.getJobId());
final SimpleSlot slot = instance.allocateSimpleSlot();
final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0],
AkkaUtils.getDefaultTimeout());
......@@ -104,7 +104,7 @@ public class ExecutionVertexDeploymentTest extends TestLogger {
final Instance instance = getInstance(
new ActorTaskManagerGateway(
new SimpleActorGateway(TestingUtils.directExecutionContext())));
final SimpleSlot slot = instance.allocateSimpleSlot(ejv.getJobId());
final SimpleSlot slot = instance.allocateSimpleSlot();
final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0],
AkkaUtils.getDefaultTimeout());
......@@ -146,7 +146,7 @@ public class ExecutionVertexDeploymentTest extends TestLogger {
final Instance instance = getInstance(
new ActorTaskManagerGateway(
new SimpleActorGateway(TestingUtils.defaultExecutionContext())));
final SimpleSlot slot = instance.allocateSimpleSlot(ejv.getJobId());
final SimpleSlot slot = instance.allocateSimpleSlot();
assertEquals(ExecutionState.CREATED, vertex.getExecutionState());
......@@ -191,7 +191,7 @@ public class ExecutionVertexDeploymentTest extends TestLogger {
final Instance instance = getInstance(
new ActorTaskManagerGateway(
new SimpleFailingActorGateway(TestingUtils.directExecutionContext())));
final SimpleSlot slot = instance.allocateSimpleSlot(ejv.getJobId());
final SimpleSlot slot = instance.allocateSimpleSlot();
assertEquals(ExecutionState.CREATED, vertex.getExecutionState());
......@@ -221,7 +221,7 @@ public class ExecutionVertexDeploymentTest extends TestLogger {
final Instance instance = getInstance(
new ActorTaskManagerGateway(
new SimpleFailingActorGateway(TestingUtils.directExecutionContext())));
final SimpleSlot slot = instance.allocateSimpleSlot(ejv.getJobId());
final SimpleSlot slot = instance.allocateSimpleSlot();
assertEquals(ExecutionState.CREATED, vertex.getExecutionState());
......@@ -265,7 +265,7 @@ public class ExecutionVertexDeploymentTest extends TestLogger {
final Instance instance = getInstance(
new ActorTaskManagerGateway(
new SimpleActorGateway(TestingUtils.directExecutionContext())));
final SimpleSlot slot = instance.allocateSimpleSlot(ejv.getJobId());
final SimpleSlot slot = instance.allocateSimpleSlot();
assertEquals(ExecutionState.CREATED, vertex.getExecutionState());
vertex.deployToSlot(slot);
......@@ -310,7 +310,7 @@ public class ExecutionVertexDeploymentTest extends TestLogger {
context,
2)));
final SimpleSlot slot = instance.allocateSimpleSlot(ejv.getJobId());
final SimpleSlot slot = instance.allocateSimpleSlot();
assertEquals(ExecutionState.CREATED, vertex.getExecutionState());
......@@ -372,7 +372,7 @@ public class ExecutionVertexDeploymentTest extends TestLogger {
result.getPartitions()[0].addConsumer(mockEdge, 0);
AllocatedSlot allocatedSlot = mock(AllocatedSlot.class);
when(allocatedSlot.getSlotAllocationId()).thenReturn(new AllocationID());
when(allocatedSlot.getAllocationId()).thenReturn(new AllocationID());
LogicalSlot slot = mock(LogicalSlot.class);
when(slot.getAllocationId()).thenReturn(new AllocationID());
......
......@@ -27,7 +27,6 @@ import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy;
import org.apache.flink.runtime.instance.SimpleSlot;
......@@ -37,7 +36,8 @@ import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot;
import org.apache.flink.runtime.jobmanager.slots.SlotContext;
import org.apache.flink.runtime.jobmanager.slots.SimpleSlotContext;
import org.apache.flink.runtime.jobmanager.slots.SlotOwner;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
......@@ -233,8 +233,8 @@ public class ExecutionVertexLocalityTest extends TestLogger {
// - mocking the scheduler created fragile tests that break whenever the scheduler is adjusted
// - exposing test methods in the ExecutionVertex leads to undesirable setters
AllocatedSlot slot = new AllocatedSlot(
new AllocationID(), jobId, location, 0, ResourceProfile.UNKNOWN, mock(TaskManagerGateway.class));
SlotContext slot = new SimpleSlotContext(
new AllocationID(), location, 0, mock(TaskManagerGateway.class));
SimpleSlot simpleSlot = new SimpleSlot(slot, mock(SlotOwner.class), 0);
......
......@@ -57,7 +57,7 @@ public class ExecutionVertexSchedulingTest {
// a slot than cannot be deployed to
final Instance instance = getInstance(new ActorTaskManagerGateway(DummyActorGateway.INSTANCE));
final SimpleSlot slot = instance.allocateSimpleSlot(ejv.getJobId());
final SimpleSlot slot = instance.allocateSimpleSlot();
slot.releaseInstanceSlot();
assertTrue(slot.isReleased());
......@@ -89,7 +89,7 @@ public class ExecutionVertexSchedulingTest {
// a slot than cannot be deployed to
final Instance instance = getInstance(new ActorTaskManagerGateway(DummyActorGateway.INSTANCE));
final SimpleSlot slot = instance.allocateSimpleSlot(ejv.getJobId());
final SimpleSlot slot = instance.allocateSimpleSlot();
slot.releaseInstanceSlot();
assertTrue(slot.isReleased());
......@@ -126,7 +126,7 @@ public class ExecutionVertexSchedulingTest {
final Instance instance = getInstance(new ActorTaskManagerGateway(
new ExecutionGraphTestUtils.SimpleActorGateway(TestingUtils.defaultExecutionContext())));
final SimpleSlot slot = instance.allocateSimpleSlot(ejv.getJobId());
final SimpleSlot slot = instance.allocateSimpleSlot();
Scheduler scheduler = mock(Scheduler.class);
CompletableFuture<SimpleSlot> future = new CompletableFuture<>();
......
......@@ -21,7 +21,6 @@ package org.apache.flink.runtime.executiongraph.utils;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.instance.LogicalSlot;
import org.apache.flink.runtime.instance.SimpleSlot;
......@@ -29,7 +28,8 @@ import org.apache.flink.runtime.instance.Slot;
import org.apache.flink.runtime.instance.SlotProvider;
import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot;
import org.apache.flink.runtime.jobmanager.slots.SlotContext;
import org.apache.flink.runtime.jobmanager.slots.SimpleSlotContext;
import org.apache.flink.runtime.jobmanager.slots.SlotOwner;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
......@@ -47,7 +47,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
*/
public class SimpleSlotProvider implements SlotProvider, SlotOwner {
private final ArrayDeque<AllocatedSlot> slots;
private final ArrayDeque<SlotContext> slots;
public SimpleSlotProvider(JobID jobId, int numSlots) {
this(jobId, numSlots, new SimpleAckingTaskManagerGateway());
......@@ -60,12 +60,10 @@ public class SimpleSlotProvider implements SlotProvider, SlotOwner {
this.slots = new ArrayDeque<>(numSlots);
for (int i = 0; i < numSlots; i++) {
AllocatedSlot as = new AllocatedSlot(
SimpleSlotContext as = new SimpleSlotContext(
new AllocationID(),
jobId,
new TaskManagerLocation(ResourceID.generate(), InetAddress.getLoopbackAddress(), 10000 + i),
0,
ResourceProfile.UNKNOWN,
taskManagerGateway);
slots.add(as);
}
......@@ -76,7 +74,7 @@ public class SimpleSlotProvider implements SlotProvider, SlotOwner {
ScheduledUnit task,
boolean allowQueued,
Collection<TaskManagerLocation> preferredLocations) {
final AllocatedSlot slot;
final SlotContext slot;
synchronized (slots) {
if (slots.isEmpty()) {
......@@ -98,7 +96,7 @@ public class SimpleSlotProvider implements SlotProvider, SlotOwner {
@Override
public CompletableFuture<Boolean> returnAllocatedSlot(Slot slot) {
synchronized (slots) {
slots.add(slot.getAllocatedSlot());
slots.add(slot.getSlotContext());
}
return CompletableFuture.completedFuture(true);
}
......
......@@ -20,7 +20,12 @@ package org.apache.flink.runtime.instance;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
import org.apache.flink.runtime.jobmanager.slots.DummySlotOwner;
import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.util.TestLogger;
import org.junit.Test;
......@@ -28,10 +33,8 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class AllocatedSlotsTest {
public class AllocatedSlotsTest extends TestLogger {
@Test
public void testOperations() throws Exception {
......@@ -39,12 +42,13 @@ public class AllocatedSlotsTest {
final AllocationID allocation1 = new AllocationID();
final SlotPoolGateway.SlotRequestID slotRequestID = new SlotPoolGateway.SlotRequestID();
final ResourceID resource1 = new ResourceID("resource1");
final Slot slot1 = createSlot(resource1, allocation1);
final TaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation();
final ResourceID resource1 = taskManagerLocation.getResourceID();
final AllocatedSlot slot1 = createSlot(allocation1, taskManagerLocation);
allocatedSlots.add(slotRequestID, slot1);
assertTrue(allocatedSlots.contains(slot1.getAllocatedSlot().getSlotAllocationId()));
assertTrue(allocatedSlots.contains(slot1.getAllocationId()));
assertTrue(allocatedSlots.containResource(resource1));
assertEquals(slot1, allocatedSlots.get(allocation1));
......@@ -53,12 +57,12 @@ public class AllocatedSlotsTest {
final AllocationID allocation2 = new AllocationID();
final SlotPoolGateway.SlotRequestID slotRequestID2 = new SlotPoolGateway.SlotRequestID();
final Slot slot2 = createSlot(resource1, allocation2);
final AllocatedSlot slot2 = createSlot(allocation2, taskManagerLocation);
allocatedSlots.add(slotRequestID2, slot2);
assertTrue(allocatedSlots.contains(slot1.getAllocatedSlot().getSlotAllocationId()));
assertTrue(allocatedSlots.contains(slot2.getAllocatedSlot().getSlotAllocationId()));
assertTrue(allocatedSlots.contains(slot1.getAllocationId()));
assertTrue(allocatedSlots.contains(slot2.getAllocationId()));
assertTrue(allocatedSlots.containResource(resource1));
assertEquals(slot1, allocatedSlots.get(allocation1));
......@@ -68,14 +72,15 @@ public class AllocatedSlotsTest {
final AllocationID allocation3 = new AllocationID();
final SlotPoolGateway.SlotRequestID slotRequestID3 = new SlotPoolGateway.SlotRequestID();
final ResourceID resource2 = new ResourceID("resource2");
final Slot slot3 = createSlot(resource2, allocation3);
final TaskManagerLocation taskManagerLocation2 = new LocalTaskManagerLocation();
final ResourceID resource2 = taskManagerLocation2.getResourceID();
final AllocatedSlot slot3 = createSlot(allocation3, taskManagerLocation2);
allocatedSlots.add(slotRequestID3, slot3);
assertTrue(allocatedSlots.contains(slot1.getAllocatedSlot().getSlotAllocationId()));
assertTrue(allocatedSlots.contains(slot2.getAllocatedSlot().getSlotAllocationId()));
assertTrue(allocatedSlots.contains(slot3.getAllocatedSlot().getSlotAllocationId()));
assertTrue(allocatedSlots.contains(slot1.getAllocationId()));
assertTrue(allocatedSlots.contains(slot2.getAllocationId()));
assertTrue(allocatedSlots.contains(slot3.getAllocationId()));
assertTrue(allocatedSlots.containResource(resource1));
assertTrue(allocatedSlots.containResource(resource2));
......@@ -86,11 +91,11 @@ public class AllocatedSlotsTest {
assertEquals(1, allocatedSlots.getSlotsForTaskManager(resource2).size());
assertEquals(3, allocatedSlots.size());
allocatedSlots.remove(slot2);
allocatedSlots.remove(slot2.getAllocationId());
assertTrue(allocatedSlots.contains(slot1.getAllocatedSlot().getSlotAllocationId()));
assertFalse(allocatedSlots.contains(slot2.getAllocatedSlot().getSlotAllocationId()));
assertTrue(allocatedSlots.contains(slot3.getAllocatedSlot().getSlotAllocationId()));
assertTrue(allocatedSlots.contains(slot1.getAllocationId()));
assertFalse(allocatedSlots.contains(slot2.getAllocationId()));
assertTrue(allocatedSlots.contains(slot3.getAllocationId()));
assertTrue(allocatedSlots.containResource(resource1));
assertTrue(allocatedSlots.containResource(resource2));
......@@ -101,11 +106,11 @@ public class AllocatedSlotsTest {
assertEquals(1, allocatedSlots.getSlotsForTaskManager(resource2).size());
assertEquals(2, allocatedSlots.size());
allocatedSlots.remove(slot1);
allocatedSlots.remove(slot1.getAllocationId());
assertFalse(allocatedSlots.contains(slot1.getAllocatedSlot().getSlotAllocationId()));
assertFalse(allocatedSlots.contains(slot2.getAllocatedSlot().getSlotAllocationId()));
assertTrue(allocatedSlots.contains(slot3.getAllocatedSlot().getSlotAllocationId()));
assertFalse(allocatedSlots.contains(slot1.getAllocationId()));
assertFalse(allocatedSlots.contains(slot2.getAllocationId()));
assertTrue(allocatedSlots.contains(slot3.getAllocationId()));
assertFalse(allocatedSlots.containResource(resource1));
assertTrue(allocatedSlots.containResource(resource2));
......@@ -116,11 +121,11 @@ public class AllocatedSlotsTest {
assertEquals(1, allocatedSlots.getSlotsForTaskManager(resource2).size());
assertEquals(1, allocatedSlots.size());
allocatedSlots.remove(slot3);
allocatedSlots.remove(slot3.getAllocationId());
assertFalse(allocatedSlots.contains(slot1.getAllocatedSlot().getSlotAllocationId()));
assertFalse(allocatedSlots.contains(slot2.getAllocatedSlot().getSlotAllocationId()));
assertFalse(allocatedSlots.contains(slot3.getAllocatedSlot().getSlotAllocationId()));
assertFalse(allocatedSlots.contains(slot1.getAllocationId()));
assertFalse(allocatedSlots.contains(slot2.getAllocationId()));
assertFalse(allocatedSlots.contains(slot3.getAllocationId()));
assertFalse(allocatedSlots.containResource(resource1));
assertFalse(allocatedSlots.containResource(resource2));
......@@ -132,13 +137,13 @@ public class AllocatedSlotsTest {
assertEquals(0, allocatedSlots.size());
}
private Slot createSlot(final ResourceID resourceId, final AllocationID allocationId) {
AllocatedSlot mockAllocatedSlot = mock(AllocatedSlot.class);
Slot slot = mock(Slot.class);
when(slot.getTaskManagerID()).thenReturn(resourceId);
when(slot.getAllocatedSlot()).thenReturn(mockAllocatedSlot);
when(mockAllocatedSlot.getSlotAllocationId()).thenReturn(allocationId);
return slot;
private AllocatedSlot createSlot(final AllocationID allocationId, final TaskManagerLocation taskManagerLocation) {
return new AllocatedSlot(
allocationId,
taskManagerLocation,
0,
ResourceProfile.UNKNOWN,
new SimpleAckingTaskManagerGateway(),
new DummySlotOwner());
}
}
......@@ -18,14 +18,15 @@
package org.apache.flink.runtime.instance;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot;
import org.apache.flink.runtime.jobmanager.slots.DummySlotOwner;
import org.apache.flink.runtime.jobmanager.slots.SlotAndLocality;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.util.TestLogger;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
......@@ -35,7 +36,7 @@ import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class AvailableSlotsTest {
public class AvailableSlotsTest extends TestLogger {
static final ResourceProfile DEFAULT_TESTING_PROFILE = new ResourceProfile(1.0, 512);
......@@ -57,27 +58,27 @@ public class AvailableSlotsTest {
availableSlots.add(slot3, 3L);
assertEquals(3, availableSlots.size());
assertTrue(availableSlots.contains(slot1.getSlotAllocationId()));
assertTrue(availableSlots.contains(slot2.getSlotAllocationId()));
assertTrue(availableSlots.contains(slot3.getSlotAllocationId()));
assertTrue(availableSlots.contains(slot1.getAllocationId()));
assertTrue(availableSlots.contains(slot2.getAllocationId()));
assertTrue(availableSlots.contains(slot3.getAllocationId()));
assertTrue(availableSlots.containsTaskManager(resource1));
assertTrue(availableSlots.containsTaskManager(resource2));
availableSlots.removeAllForTaskManager(resource1);
assertEquals(1, availableSlots.size());
assertFalse(availableSlots.contains(slot1.getSlotAllocationId()));
assertFalse(availableSlots.contains(slot2.getSlotAllocationId()));
assertTrue(availableSlots.contains(slot3.getSlotAllocationId()));
assertFalse(availableSlots.contains(slot1.getAllocationId()));
assertFalse(availableSlots.contains(slot2.getAllocationId()));
assertTrue(availableSlots.contains(slot3.getAllocationId()));
assertFalse(availableSlots.containsTaskManager(resource1));
assertTrue(availableSlots.containsTaskManager(resource2));
availableSlots.removeAllForTaskManager(resource2);
assertEquals(0, availableSlots.size());
assertFalse(availableSlots.contains(slot1.getSlotAllocationId()));
assertFalse(availableSlots.contains(slot2.getSlotAllocationId()));
assertFalse(availableSlots.contains(slot3.getSlotAllocationId()));
assertFalse(availableSlots.contains(slot1.getAllocationId()));
assertFalse(availableSlots.contains(slot2.getAllocationId()));
assertFalse(availableSlots.contains(slot3.getAllocationId()));
assertFalse(availableSlots.containsTaskManager(resource1));
assertFalse(availableSlots.containsTaskManager(resource2));
}
......@@ -92,7 +93,7 @@ public class AvailableSlotsTest {
availableSlots.add(slot1, 1L);
assertEquals(1, availableSlots.size());
assertTrue(availableSlots.contains(slot1.getSlotAllocationId()));
assertTrue(availableSlots.contains(slot1.getAllocationId()));
assertTrue(availableSlots.containsTaskManager(resource1));
assertNull(availableSlots.poll(DEFAULT_TESTING_BIG_PROFILE, null));
......@@ -100,7 +101,7 @@ public class AvailableSlotsTest {
SlotAndLocality slotAndLocality = availableSlots.poll(DEFAULT_TESTING_PROFILE, null);
assertEquals(slot1, slotAndLocality.slot());
assertEquals(0, availableSlots.size());
assertFalse(availableSlots.contains(slot1.getSlotAllocationId()));
assertFalse(availableSlots.contains(slot1.getAllocationId()));
assertFalse(availableSlots.containsTaskManager(resource1));
}
......@@ -112,10 +113,10 @@ public class AvailableSlotsTest {
return new AllocatedSlot(
new AllocationID(),
new JobID(),
mockTaskManagerLocation,
0,
DEFAULT_TESTING_PROFILE,
mockTaskManagerGateway);
mockTaskManagerGateway,
new DummySlotOwner());
}
}
......@@ -18,17 +18,22 @@
package org.apache.flink.runtime.instance;
import static org.junit.Assert.*;
import java.lang.reflect.Method;
import java.net.InetAddress;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.jobmanager.slots.ActorTaskManagerGateway;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.junit.Test;
import java.lang.reflect.Method;
import java.net.InetAddress;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
/**
* Tests for the {@link Instance} class.
*/
......@@ -53,10 +58,10 @@ public class InstanceTest {
assertEquals(4, instance.getNumberOfAvailableSlots());
assertEquals(0, instance.getNumberOfAllocatedSlots());
SimpleSlot slot1 = instance.allocateSimpleSlot(new JobID());
SimpleSlot slot2 = instance.allocateSimpleSlot(new JobID());
SimpleSlot slot3 = instance.allocateSimpleSlot(new JobID());
SimpleSlot slot4 = instance.allocateSimpleSlot(new JobID());
SimpleSlot slot1 = instance.allocateSimpleSlot();
SimpleSlot slot2 = instance.allocateSimpleSlot();
SimpleSlot slot3 = instance.allocateSimpleSlot();
SimpleSlot slot4 = instance.allocateSimpleSlot();
assertNotNull(slot1);
assertNotNull(slot2);
......@@ -69,7 +74,7 @@ public class InstanceTest {
slot3.getSlotNumber() + slot4.getSlotNumber());
// no more slots
assertNull(instance.allocateSimpleSlot(new JobID()));
assertNull(instance.allocateSimpleSlot());
try {
instance.returnAllocatedSlot(slot2);
fail("instance accepted a non-cancelled slot.");
......@@ -118,9 +123,9 @@ public class InstanceTest {
assertEquals(3, instance.getNumberOfAvailableSlots());
SimpleSlot slot1 = instance.allocateSimpleSlot(new JobID());
SimpleSlot slot2 = instance.allocateSimpleSlot(new JobID());
SimpleSlot slot3 = instance.allocateSimpleSlot(new JobID());
SimpleSlot slot1 = instance.allocateSimpleSlot();
SimpleSlot slot2 = instance.allocateSimpleSlot();
SimpleSlot slot3 = instance.allocateSimpleSlot();
instance.markDead();
......@@ -154,9 +159,9 @@ public class InstanceTest {
assertEquals(3, instance.getNumberOfAvailableSlots());
SimpleSlot slot1 = instance.allocateSimpleSlot(new JobID());
SimpleSlot slot2 = instance.allocateSimpleSlot(new JobID());
SimpleSlot slot3 = instance.allocateSimpleSlot(new JobID());
SimpleSlot slot1 = instance.allocateSimpleSlot();
SimpleSlot slot2 = instance.allocateSimpleSlot();
SimpleSlot slot3 = instance.allocateSimpleSlot();
instance.cancelAndReleaseAllSlots();
......
......@@ -18,7 +18,6 @@
package org.apache.flink.runtime.instance;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
......@@ -34,7 +33,12 @@ import org.junit.Test;
import java.util.Collections;
import static org.junit.Assert.*;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
/**
* Tests for the allocation, properties, and release of shared slots.
......@@ -46,7 +50,6 @@ public class SharedSlotsTest extends TestLogger {
@Test
public void allocateAndReleaseEmptySlot() {
try {
JobID jobId = new JobID();
JobVertexID vertexId = new JobVertexID();
SlotSharingGroup sharingGroup = new SlotSharingGroup(vertexId);
......@@ -62,7 +65,7 @@ public class SharedSlotsTest extends TestLogger {
assertEquals(2, instance.getNumberOfAvailableSlots());
// allocate a shared slot
SharedSlot slot = instance.allocateSharedSlot(jobId, assignment);
SharedSlot slot = instance.allocateSharedSlot(assignment);
assertEquals(2, instance.getTotalNumberOfSlots());
assertEquals(1, instance.getNumberOfAllocatedSlots());
assertEquals(1, instance.getNumberOfAvailableSlots());
......@@ -110,7 +113,6 @@ public class SharedSlotsTest extends TestLogger {
@Test
public void allocateSimpleSlotsAndReleaseFromRoot() {
try {
JobID jobId = new JobID();
JobVertexID vid1 = new JobVertexID();
JobVertexID vid2 = new JobVertexID();
JobVertexID vid3 = new JobVertexID();
......@@ -122,7 +124,7 @@ public class SharedSlotsTest extends TestLogger {
Instance instance = SchedulerTestUtils.getRandomInstance(1);
// allocate a shared slot
SharedSlot sharedSlot = instance.allocateSharedSlot(jobId, assignment);
SharedSlot sharedSlot = instance.allocateSharedSlot(assignment);
// allocate a series of sub slots
......@@ -134,7 +136,6 @@ public class SharedSlotsTest extends TestLogger {
assertEquals(1, sub1.getNumberLeaves());
assertEquals(vid1, sub1.getGroupID());
assertEquals(instance.getTaskManagerID(), sub1.getTaskManagerID());
assertEquals(jobId, sub1.getJobID());
assertEquals(sharedSlot, sub1.getParent());
assertEquals(sharedSlot, sub1.getRoot());
assertEquals(0, sub1.getRootSlotNumber());
......@@ -153,7 +154,6 @@ public class SharedSlotsTest extends TestLogger {
assertEquals(1, sub2.getNumberLeaves());
assertEquals(vid2, sub2.getGroupID());
assertEquals(instance.getTaskManagerID(), sub2.getTaskManagerID());
assertEquals(jobId, sub2.getJobID());
assertEquals(sharedSlot, sub2.getParent());
assertEquals(sharedSlot, sub2.getRoot());
assertEquals(0, sub2.getRootSlotNumber());
......@@ -172,7 +172,6 @@ public class SharedSlotsTest extends TestLogger {
assertEquals(1, sub3.getNumberLeaves());
assertEquals(vid3, sub3.getGroupID());
assertEquals(instance.getTaskManagerID(), sub3.getTaskManagerID());
assertEquals(jobId, sub3.getJobID());
assertEquals(sharedSlot, sub3.getParent());
assertEquals(sharedSlot, sub3.getRoot());
assertEquals(0, sub3.getRootSlotNumber());
......@@ -192,7 +191,6 @@ public class SharedSlotsTest extends TestLogger {
assertEquals(1, sub4.getNumberLeaves());
assertEquals(vid4, sub4.getGroupID());
assertEquals(instance.getTaskManagerID(), sub4.getTaskManagerID());
assertEquals(jobId, sub4.getJobID());
assertEquals(sharedSlot, sub4.getParent());
assertEquals(sharedSlot, sub4.getRoot());
assertEquals(0, sub4.getRootSlotNumber());
......@@ -235,7 +233,6 @@ public class SharedSlotsTest extends TestLogger {
@Test
public void allocateSimpleSlotsAndReleaseFromLeaves() {
try {
JobID jobId = new JobID();
JobVertexID vid1 = new JobVertexID();
JobVertexID vid2 = new JobVertexID();
JobVertexID vid3 = new JobVertexID();
......@@ -246,7 +243,7 @@ public class SharedSlotsTest extends TestLogger {
Instance instance = SchedulerTestUtils.getRandomInstance(1);
// allocate a shared slot
SharedSlot sharedSlot = instance.allocateSharedSlot(jobId, assignment);
SharedSlot sharedSlot = instance.allocateSharedSlot(assignment);
// allocate a series of sub slots
......@@ -320,7 +317,6 @@ public class SharedSlotsTest extends TestLogger {
@Test
public void allocateAndReleaseInMixedOrder() {
try {
JobID jobId = new JobID();
JobVertexID vid1 = new JobVertexID();
JobVertexID vid2 = new JobVertexID();
JobVertexID vid3 = new JobVertexID();
......@@ -331,7 +327,7 @@ public class SharedSlotsTest extends TestLogger {
Instance instance = SchedulerTestUtils.getRandomInstance(1);
// allocate a shared slot
SharedSlot sharedSlot = instance.allocateSharedSlot(jobId, assignment);
SharedSlot sharedSlot = instance.allocateSharedSlot(assignment);
// allocate a series of sub slots
......@@ -427,7 +423,7 @@ public class SharedSlotsTest extends TestLogger {
Instance instance = SchedulerTestUtils.getRandomInstance(1);
// allocate a shared slot
SharedSlot sharedSlot = instance.allocateSharedSlot(new JobID(), assignment);
SharedSlot sharedSlot = instance.allocateSharedSlot(assignment);
// get the first simple slot
SimpleSlot sourceSlot = assignment.addSharedSlotAndAllocateSubSlot(sharedSlot, Locality.LOCAL, sourceId);
......@@ -563,7 +559,7 @@ public class SharedSlotsTest extends TestLogger {
Instance instance = SchedulerTestUtils.getRandomInstance(1);
// allocate a shared slot
SharedSlot sharedSlot = instance.allocateSharedSlot(new JobID(), assignment);
SharedSlot sharedSlot = instance.allocateSharedSlot(assignment);
// get the first simple slot
SimpleSlot sourceSlot = assignment.addSharedSlotAndAllocateSubSlot(sharedSlot, Locality.LOCAL, sourceId);
......@@ -607,7 +603,6 @@ public class SharedSlotsTest extends TestLogger {
@Test
public void testImmediateReleaseOneLevel() {
try {
JobID jobId = new JobID();
JobVertexID vid = new JobVertexID();
SlotSharingGroup sharingGroup = new SlotSharingGroup(vid);
......@@ -615,7 +610,7 @@ public class SharedSlotsTest extends TestLogger {
Instance instance = SchedulerTestUtils.getRandomInstance(1);
SharedSlot sharedSlot = instance.allocateSharedSlot(jobId, assignment);
SharedSlot sharedSlot = instance.allocateSharedSlot(assignment);
SimpleSlot sub = assignment.addSharedSlotAndAllocateSubSlot(sharedSlot, Locality.UNCONSTRAINED, vid);
sub.releaseInstanceSlot();
......@@ -635,7 +630,6 @@ public class SharedSlotsTest extends TestLogger {
@Test
public void testImmediateReleaseTwoLevel() {
try {
JobID jobId = new JobID();
JobVertexID vid = new JobVertexID();
JobVertex vertex = new JobVertex("vertex", vid);
......@@ -647,7 +641,7 @@ public class SharedSlotsTest extends TestLogger {
Instance instance = SchedulerTestUtils.getRandomInstance(1);
SharedSlot sharedSlot = instance.allocateSharedSlot(jobId, assignment);
SharedSlot sharedSlot = instance.allocateSharedSlot(assignment);
SimpleSlot sub = assignment.addSharedSlotAndAllocateSubSlot(sharedSlot, Locality.UNCONSTRAINED, constraint);
......
......@@ -18,7 +18,6 @@
package org.apache.flink.runtime.instance;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.jobmanager.slots.ActorTaskManagerGateway;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
......@@ -34,7 +33,7 @@ import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
public class SimpleSlotTest extends TestLogger {
public class SimpleSlotTest extends TestLogger {
@Test
public void testStateTransitions() {
......@@ -137,6 +136,6 @@ public class SimpleSlotTest extends TestLogger {
hardwareDescription,
1);
return instance.allocateSimpleSlot(new JobID());
return instance.allocateSimpleSlot();
}
}
......@@ -23,11 +23,11 @@ import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
import org.apache.flink.runtime.jobmanager.scheduler.SchedulerTestUtils;
import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.jobmaster.JobMasterId;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
......@@ -36,6 +36,9 @@ import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGate
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.util.clock.Clock;
import org.apache.flink.runtime.util.clock.SystemClock;
......@@ -158,7 +161,7 @@ public class SlotPoolRpcTest extends TestLogger {
assertEquals(1L, (long) pool.getNumberOfWaitingForResourceRequests().get());
slotPoolGateway.cancelSlotAllocation(requestId).get();
slotPoolGateway.cancelSlotRequest(requestId).get();
assertEquals(0L, (long) pool.getNumberOfWaitingForResourceRequests().get());
} finally {
......@@ -202,7 +205,7 @@ public class SlotPoolRpcTest extends TestLogger {
assertEquals(1L, (long) pool.getNumberOfPendingRequests().get());
slotPoolGateway.cancelSlotAllocation(requestId).get();
slotPoolGateway.cancelSlotRequest(requestId).get();
assertEquals(0L, (long) pool.getNumberOfPendingRequests().get());
} finally {
RpcUtils.terminateRpcEndpoint(pool, timeout);
......@@ -252,17 +255,22 @@ public class SlotPoolRpcTest extends TestLogger {
}
AllocationID allocationId = allocationIdFuture.get();
ResourceID resourceID = ResourceID.generate();
AllocatedSlot allocatedSlot = SlotPoolTest.createAllocatedSlot(resourceID, allocationId, jid, DEFAULT_TESTING_PROFILE);
slotPoolGateway.registerTaskManager(resourceID).get();
final SlotOffer slotOffer = new SlotOffer(
allocationId,
0,
DEFAULT_TESTING_PROFILE);
final TaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation();
final TaskManagerGateway taskManagerGateway = new SimpleAckingTaskManagerGateway();
assertTrue(slotPoolGateway.offerSlot(allocatedSlot).get());
slotPoolGateway.registerTaskManager(taskManagerLocation.getResourceID()).get();
assertTrue(slotPoolGateway.offerSlot(taskManagerLocation, taskManagerGateway, slotOffer).get());
assertEquals(0L, (long) pool.getNumberOfPendingRequests().get());
assertTrue(pool.containsAllocatedSlot(allocationId).get());
pool.cancelSlotAllocation(requestId).get();
pool.cancelSlotRequest(requestId).get();
assertFalse(pool.containsAllocatedSlot(allocationId).get());
assertTrue(pool.containsAvailableSlot(allocationId).get());
......@@ -351,14 +359,14 @@ public class SlotPoolRpcTest extends TestLogger {
}
@Override
public CompletableFuture<Acknowledge> cancelSlotAllocation(SlotRequestID slotRequestId) {
public CompletableFuture<Acknowledge> cancelSlotRequest(SlotRequestID slotRequestId) {
final Consumer<SlotRequestID> currentCancelSlotAllocationConsumer = cancelSlotAllocationConsumer;
if (currentCancelSlotAllocationConsumer != null) {
currentCancelSlotAllocationConsumer.accept(slotRequestId);
}
return super.cancelSlotAllocation(slotRequestId);
return super.cancelSlotRequest(slotRequestId);
}
CompletableFuture<Boolean> containsAllocatedSlot(AllocationID allocationId) {
......
......@@ -21,11 +21,11 @@ package org.apache.flink.runtime.instance;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot;
import org.apache.flink.runtime.jobmanager.slots.SlotContext;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.jobmaster.JobMasterId;
import org.apache.flink.runtime.messages.Acknowledge;
......@@ -35,6 +35,8 @@ import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGate
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.rpc.TestingRpcService;
import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.TestLogger;
......@@ -74,10 +76,17 @@ public class SlotPoolTest extends TestLogger {
private JobID jobId;
private TaskManagerLocation taskManagerLocation;
private TaskManagerGateway taskManagerGateway;
@Before
public void setUp() throws Exception {
this.rpcService = new TestingRpcService();
this.jobId = new JobID();
taskManagerLocation = new LocalTaskManagerLocation();
taskManagerGateway = new SimpleAckingTaskManagerGateway();
}
@After
......@@ -92,8 +101,7 @@ public class SlotPoolTest extends TestLogger {
try {
SlotPoolGateway slotPoolGateway = setupSlotPool(slotPool, resourceManagerGateway);
ResourceID resourceID = new ResourceID("resource");
slotPoolGateway.registerTaskManager(resourceID);
slotPoolGateway.registerTaskManager(taskManagerLocation.getResourceID());
SlotPoolGateway.SlotRequestID requestId = new SlotPoolGateway.SlotRequestID();
CompletableFuture<LogicalSlot> future = slotPoolGateway.allocateSlot(requestId, mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout);
......@@ -104,14 +112,17 @@ public class SlotPoolTest extends TestLogger {
final SlotRequest slotRequest = slotRequestArgumentCaptor.getValue();
AllocatedSlot allocatedSlot = createAllocatedSlot(resourceID, slotRequest.getAllocationId(), jobId, DEFAULT_TESTING_PROFILE);
assertTrue(slotPoolGateway.offerSlot(allocatedSlot).get());
final SlotOffer slotOffer = new SlotOffer(
slotRequest.getAllocationId(),
0,
DEFAULT_TESTING_PROFILE);
assertTrue(slotPoolGateway.offerSlot(taskManagerLocation, taskManagerGateway, slotOffer).get());
LogicalSlot slot = future.get(1, TimeUnit.SECONDS);
assertTrue(future.isDone());
assertTrue(slot.isAlive());
assertEquals(resourceID, slot.getTaskManagerLocation().getResourceID());
assertEquals(slotPool.getAllocatedSlots().get(slot.getAllocationId()), slot);
assertEquals(taskManagerLocation, slot.getTaskManagerLocation());
} finally {
slotPool.shutDown();
}
......@@ -124,8 +135,7 @@ public class SlotPoolTest extends TestLogger {
try {
SlotPoolGateway slotPoolGateway = setupSlotPool(slotPool, resourceManagerGateway);
ResourceID resourceID = new ResourceID("resource");
slotPool.registerTaskManager(resourceID);
slotPool.registerTaskManager(taskManagerLocation.getResourceID());
CompletableFuture<LogicalSlot> future1 = slotPoolGateway.allocateSlot(new SlotPoolGateway.SlotRequestID(), mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout);
CompletableFuture<LogicalSlot> future2 = slotPoolGateway.allocateSlot(new SlotPoolGateway.SlotRequestID(), mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout);
......@@ -139,8 +149,12 @@ public class SlotPoolTest extends TestLogger {
final List<SlotRequest> slotRequests = slotRequestArgumentCaptor.getAllValues();
AllocatedSlot allocatedSlot = createAllocatedSlot(resourceID, slotRequests.get(0).getAllocationId(), jobId, DEFAULT_TESTING_PROFILE);
assertTrue(slotPoolGateway.offerSlot(allocatedSlot).get());
final SlotOffer slotOffer = new SlotOffer(
slotRequests.get(0).getAllocationId(),
0,
DEFAULT_TESTING_PROFILE);
assertTrue(slotPoolGateway.offerSlot(taskManagerLocation, taskManagerGateway, slotOffer).get());
LogicalSlot slot1 = future1.get(1, TimeUnit.SECONDS);
assertTrue(future1.isDone());
......@@ -158,7 +172,7 @@ public class SlotPoolTest extends TestLogger {
assertTrue(slot2.isAlive());
assertEquals(slot1.getTaskManagerLocation(), slot2.getTaskManagerLocation());
assertEquals(slot1.getPhysicalSlotNumber(), slot2.getPhysicalSlotNumber());
assertEquals(slotPool.getAllocatedSlots().get(slot1.getAllocationId()), slot2);
assertEquals(slot1.getAllocationId(), slot2.getAllocationId());
} finally {
slotPool.shutDown();
}
......@@ -171,8 +185,7 @@ public class SlotPoolTest extends TestLogger {
try {
SlotPoolGateway slotPoolGateway = setupSlotPool(slotPool, resourceManagerGateway);
ResourceID resourceID = new ResourceID("resource");
slotPoolGateway.registerTaskManager(resourceID);
slotPoolGateway.registerTaskManager(taskManagerLocation.getResourceID());
CompletableFuture<LogicalSlot> future1 = slotPoolGateway.allocateSlot(new SlotPoolGateway.SlotRequestID(), mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout);
assertFalse(future1.isDone());
......@@ -182,8 +195,12 @@ public class SlotPoolTest extends TestLogger {
final SlotRequest slotRequest = slotRequestArgumentCaptor.getValue();
AllocatedSlot allocatedSlot = createAllocatedSlot(resourceID, slotRequest.getAllocationId(), jobId, DEFAULT_TESTING_PROFILE);
assertTrue(slotPoolGateway.offerSlot(allocatedSlot).get());
final SlotOffer slotOffer = new SlotOffer(
slotRequest.getAllocationId(),
0,
DEFAULT_TESTING_PROFILE);
assertTrue(slotPoolGateway.offerSlot(taskManagerLocation, taskManagerGateway, slotOffer).get());
LogicalSlot slot1 = future1.get(1, TimeUnit.SECONDS);
assertTrue(future1.isDone());
......@@ -214,8 +231,7 @@ public class SlotPoolTest extends TestLogger {
try {
SlotPoolGateway slotPoolGateway = setupSlotPool(slotPool, resourceManagerGateway);
ResourceID resourceID = new ResourceID("resource");
slotPoolGateway.registerTaskManager(resourceID);
slotPoolGateway.registerTaskManager(taskManagerLocation.getResourceID());
CompletableFuture<LogicalSlot> future = slotPoolGateway.allocateSlot(new SlotPoolGateway.SlotRequestID(), mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout);
assertFalse(future.isDone());
......@@ -225,29 +241,36 @@ public class SlotPoolTest extends TestLogger {
final SlotRequest slotRequest = slotRequestArgumentCaptor.getValue();
final SlotOffer slotOffer = new SlotOffer(
slotRequest.getAllocationId(),
0,
DEFAULT_TESTING_PROFILE);
final TaskManagerLocation invalidTaskManagerLocation = new LocalTaskManagerLocation();
// slot from unregistered resource
AllocatedSlot invalid = createAllocatedSlot(new ResourceID("unregistered"), slotRequest.getAllocationId(), jobId, DEFAULT_TESTING_PROFILE);
assertFalse(slotPoolGateway.offerSlot(invalid).get());
assertFalse(slotPoolGateway.offerSlot(invalidTaskManagerLocation, taskManagerGateway, slotOffer).get());
AllocatedSlot notRequested = createAllocatedSlot(resourceID, new AllocationID(), jobId, DEFAULT_TESTING_PROFILE);
final SlotOffer nonRequestedSlotOffer = new SlotOffer(
new AllocationID(),
0,
DEFAULT_TESTING_PROFILE);
// we'll also accept non requested slots
assertTrue(slotPoolGateway.offerSlot(notRequested).get());
AllocatedSlot allocatedSlot = createAllocatedSlot(resourceID, slotRequest.getAllocationId(), jobId, DEFAULT_TESTING_PROFILE);
assertTrue(slotPoolGateway.offerSlot(taskManagerLocation, taskManagerGateway, nonRequestedSlotOffer).get());
// accepted slot
assertTrue(slotPoolGateway.offerSlot(allocatedSlot).get());
assertTrue(slotPoolGateway.offerSlot(taskManagerLocation, taskManagerGateway, slotOffer).get());
LogicalSlot slot = future.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
assertTrue(slot.isAlive());
// duplicated offer with using slot
assertTrue(slotPoolGateway.offerSlot(allocatedSlot).get());
assertTrue(slotPoolGateway.offerSlot(taskManagerLocation, taskManagerGateway, slotOffer).get());
assertTrue(slot.isAlive());
// duplicated offer with free slot
slot.releaseSlot();
assertTrue(slotPoolGateway.offerSlot(allocatedSlot).get());
assertTrue(slotPoolGateway.offerSlot(taskManagerLocation, taskManagerGateway, slotOffer).get());
} finally {
slotPool.shutDown();
}
......@@ -261,8 +284,8 @@ public class SlotPoolTest extends TestLogger {
final SlotPool slotPool = new SlotPool(rpcService, jobId) {
@Override
public void returnAllocatedSlot(Slot slot) {
super.returnAllocatedSlot(slot);
public void returnAllocatedSlot(SlotContext allocatedSlot) {
super.returnAllocatedSlot(allocatedSlot);
slotReturnFuture.complete(true);
}
......@@ -270,8 +293,7 @@ public class SlotPoolTest extends TestLogger {
try {
SlotPoolGateway slotPoolGateway = setupSlotPool(slotPool, resourceManagerGateway);
ResourceID resourceID = new ResourceID("resource");
slotPoolGateway.registerTaskManager(resourceID);
slotPoolGateway.registerTaskManager(taskManagerLocation.getResourceID());
CompletableFuture<LogicalSlot> future1 = slotPoolGateway.allocateSlot(new SlotPoolGateway.SlotRequestID(), mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout);
......@@ -282,14 +304,18 @@ public class SlotPoolTest extends TestLogger {
CompletableFuture<LogicalSlot> future2 = slotPoolGateway.allocateSlot(new SlotPoolGateway.SlotRequestID(), mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout);
AllocatedSlot allocatedSlot = createAllocatedSlot(resourceID, slotRequest.getAllocationId(), jobId, DEFAULT_TESTING_PROFILE);
assertTrue(slotPoolGateway.offerSlot(allocatedSlot).get());
final SlotOffer slotOffer = new SlotOffer(
slotRequest.getAllocationId(),
0,
DEFAULT_TESTING_PROFILE);
assertTrue(slotPoolGateway.offerSlot(taskManagerLocation, taskManagerGateway, slotOffer).get());
LogicalSlot slot1 = future1.get(1, TimeUnit.SECONDS);
assertTrue(future1.isDone());
assertFalse(future2.isDone());
slotPoolGateway.releaseTaskManager(resourceID);
slotPoolGateway.releaseTaskManager(taskManagerLocation.getResourceID());
// wait until the slot has been returned
slotReturnFuture.get();
......@@ -378,24 +404,4 @@ public class SlotPoolTest extends TestLogger {
return slotPool.getSelfGateway(SlotPoolGateway.class);
}
static AllocatedSlot createAllocatedSlot(
final ResourceID resourceId,
final AllocationID allocationId,
final JobID jobId,
final ResourceProfile resourceProfile) {
TaskManagerLocation mockTaskManagerLocation = mock(TaskManagerLocation.class);
when(mockTaskManagerLocation.getResourceID()).thenReturn(resourceId);
TaskManagerGateway mockTaskManagerGateway = mock(TaskManagerGateway.class);
return new AllocatedSlot(
allocationId,
jobId,
mockTaskManagerLocation,
0,
resourceProfile,
mockTaskManagerGateway);
}
}
......@@ -18,7 +18,6 @@
package org.apache.flink.runtime.instance;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmanager.scheduler.Locality;
......@@ -49,14 +48,12 @@ public class SlotSharingGroupAssignmentTest extends TestLogger {
final int numberSlots = 2;
final JobVertexID sourceId = new JobVertexID();
final JobVertexID sinkId = new JobVertexID();
final JobID jobId = new JobID();
for (int i = 0; i < numberTaskManagers; i++) {
final TaskManagerLocation taskManagerLocation = new TaskManagerLocation(ResourceID.generate(), InetAddress.getLocalHost(), i + 1000);
for (int j = 0; j < numberSlots; j++) {
final SharedSlot slot = new SharedSlot(
jobId,
mock(SlotOwner.class),
taskManagerLocation,
j,
......
......@@ -90,10 +90,10 @@ public class CoLocationConstraintTest {
Instance instance1 = SchedulerTestUtils.getRandomInstance(2);
Instance instance2 = SchedulerTestUtils.getRandomInstance(2);
SharedSlot slot1_1 = instance1.allocateSharedSlot(jid, assignment);
SharedSlot slot1_2 = instance1.allocateSharedSlot(jid, assignment);
SharedSlot slot2_1 = instance2.allocateSharedSlot(jid, assignment);
SharedSlot slot2_2 = instance2.allocateSharedSlot(jid, assignment);
SharedSlot slot1_1 = instance1.allocateSharedSlot(assignment);
SharedSlot slot1_2 = instance1.allocateSharedSlot(assignment);
SharedSlot slot2_1 = instance2.allocateSharedSlot(assignment);
SharedSlot slot2_2 = instance2.allocateSharedSlot(assignment);
// constraint is still completely unassigned
assertFalse(constraint.isAssigned());
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.jobmanager.slots;
import org.apache.flink.runtime.instance.Slot;
import java.util.concurrent.CompletableFuture;
/**
* SlotOwner implementation used for testing purposes only.
*/
public class DummySlotOwner implements SlotOwner {
@Override
public CompletableFuture<Boolean> returnAllocatedSlot(Slot slot) {
return CompletableFuture.completedFuture(false);
}
}
......@@ -58,12 +58,14 @@ import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.util.TestingFatalErrorHandler;
import org.apache.flink.util.TestLogger;
import org.hamcrest.Matchers;
import org.junit.Test;
import org.mockito.Mockito;
import java.net.InetAddress;
import java.util.Arrays;
import java.util.Collection;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
......@@ -169,7 +171,7 @@ public class TaskExecutorITCase extends TestLogger {
when(jmGateway.getHostname()).thenReturn(jmAddress);
when(jmGateway.offerSlots(
eq(taskManagerResourceId),
any(Iterable.class),
any(Collection.class),
any(Time.class))).thenReturn(mock(CompletableFuture.class, RETURNS_MOCKS));
when(jmGateway.getFencingToken()).thenReturn(jobMasterId);
......@@ -214,7 +216,7 @@ public class TaskExecutorITCase extends TestLogger {
verify(jmGateway, Mockito.timeout(timeout.toMilliseconds())).offerSlots(
eq(taskManagerResourceId),
(Iterable<SlotOffer>)argThat(Matchers.contains(slotOffer)),
(Collection<SlotOffer>)argThat(Matchers.contains(slotOffer)),
any(Time.class));
} finally {
if (testingFatalErrorHandler.hasExceptionOccurred()) {
......
......@@ -852,7 +852,7 @@ public class TaskExecutorTest extends TestLogger {
when(jobMasterGateway.getHostname()).thenReturn(jobManagerAddress);
when(jobMasterGateway.offerSlots(
any(ResourceID.class),
any(Iterable.class),
any(Collection.class),
any(Time.class))).thenReturn(mock(CompletableFuture.class, RETURNS_MOCKS));
rpc.registerGateway(resourceManagerAddress, resourceManagerGateway);
......@@ -904,7 +904,7 @@ public class TaskExecutorTest extends TestLogger {
// the job leader should get the allocation id offered
verify(jobMasterGateway, Mockito.timeout(timeout.toMilliseconds())).offerSlots(
any(ResourceID.class),
(Iterable<SlotOffer>)Matchers.argThat(contains(slotOffer)),
(Collection<SlotOffer>)Matchers.argThat(contains(slotOffer)),
any(Time.class));
// check if a concurrent error occurred
......@@ -975,7 +975,7 @@ public class TaskExecutorTest extends TestLogger {
when(jobMasterGateway.getHostname()).thenReturn(jobManagerAddress);
when(jobMasterGateway.offerSlots(
any(ResourceID.class), any(Iterable.class), any(Time.class)))
any(ResourceID.class), any(Collection.class), any(Time.class)))
.thenReturn(CompletableFuture.completedFuture((Collection<SlotOffer>)Collections.singleton(offer1)));
rpc.registerGateway(resourceManagerAddress, resourceManagerGateway);
......@@ -1315,7 +1315,7 @@ public class TaskExecutorTest extends TestLogger {
when(
jobMasterGateway.offerSlots(
any(ResourceID.class),
any(Iterable.class),
any(Collection.class),
any(Time.class)))
.thenReturn(offerResultFuture);
......@@ -1323,7 +1323,7 @@ public class TaskExecutorTest extends TestLogger {
// been properly started. This will also offer the slots to the job master
jobLeaderService.addJob(jobId, jobManagerAddress);
verify(jobMasterGateway, Mockito.timeout(timeout.toMilliseconds())).offerSlots(any(ResourceID.class), any(Iterable.class), any(Time.class));
verify(jobMasterGateway, Mockito.timeout(timeout.toMilliseconds())).offerSlots(any(ResourceID.class), any(Collection.class), any(Time.class));
// submit the task without having acknowledge the offered slots
tmGateway.submitTask(tdd, jobMasterId, timeout);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册