[FLINK-7956] [flip6] Add support for queued scheduling with slot sharing to SlotPool

This commit adds support for queued scheduling with slot sharing to the
SlotPool. The idea of slot sharing is that multiple tasks can run in the
same slot. Moreover, queued scheduling means that a slot request must not
be completed right away but at a later point in time. This allows to
start new TaskExecutors in case that there are no more slots left.

The main component responsible for the management of shared slots is the
SlotSharingManager. The SlotSharingManager maintains internally a tree-like
structure which stores the SlotContext future of the underlying
AllocatedSlot. Whenever this future is completed potentially pending
LogicalSlot instantiations are executed and sent to the slot requester.

A shared slot is represented by a MultiTaskSlot which can harbour multiple
TaskSlots. A TaskSlot can either be a MultiTaskSlot or a SingleTaskSlot.

In order to represent co-location constraints, we first obtain a root
MultiTaskSlot and then allocate a nested MultiTaskSlot in which the
co-located tasks are allocated. The corresponding SlotRequestID is assigned
to the CoLocationConstraint in order to make the TaskSlot retrievable for
other tasks assigned to the same CoLocationConstraint.

Port SchedulerSlotSharingTest, SchedulerIsolatedTasksTest and
ScheduleWithCoLocationHintTest to run with SlotPool.

Restructure SlotPool components.

Add SlotSharingManagerTest, SlotPoolSlotSharingTest and
SlotPoolCoLocationTest.

This closes #5091.
上级 331ce82c
...@@ -24,11 +24,11 @@ import org.apache.flink.runtime.executiongraph.Execution; ...@@ -24,11 +24,11 @@ import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionEdge; import org.apache.flink.runtime.executiongraph.ExecutionEdge;
import org.apache.flink.runtime.executiongraph.ExecutionGraphException; import org.apache.flink.runtime.executiongraph.ExecutionGraphException;
import org.apache.flink.runtime.executiongraph.IntermediateResultPartition; import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
import org.apache.flink.runtime.instance.LogicalSlot;
import org.apache.flink.runtime.io.network.ConnectionID; import org.apache.flink.runtime.io.network.ConnectionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannel; import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate; import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
import org.apache.flink.runtime.jobmaster.LogicalSlot;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import java.io.Serializable; import java.io.Serializable;
......
...@@ -33,8 +33,9 @@ import org.apache.flink.runtime.deployment.PartialInputChannelDeploymentDescript ...@@ -33,8 +33,9 @@ import org.apache.flink.runtime.deployment.PartialInputChannelDeploymentDescript
import org.apache.flink.runtime.deployment.ResultPartitionLocation; import org.apache.flink.runtime.deployment.ResultPartitionLocation;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor; import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.instance.LogicalSlot; import org.apache.flink.runtime.instance.SlotSharingGroupId;
import org.apache.flink.runtime.instance.SlotProvider; import org.apache.flink.runtime.jobmaster.LogicalSlot;
import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
import org.apache.flink.runtime.io.network.ConnectionID; import org.apache.flink.runtime.io.network.ConnectionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint; import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
...@@ -51,6 +52,8 @@ import org.apache.flink.util.FlinkRuntimeException; ...@@ -51,6 +52,8 @@ import org.apache.flink.util.FlinkRuntimeException;
import org.slf4j.Logger; import org.slf4j.Logger;
import javax.annotation.Nullable;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
...@@ -441,9 +444,11 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution ...@@ -441,9 +444,11 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
// this method only works if the execution is in the state 'CREATED' // this method only works if the execution is in the state 'CREATED'
if (transitionState(CREATED, SCHEDULED)) { if (transitionState(CREATED, SCHEDULED)) {
final SlotSharingGroupId slotSharingGroupId = sharingGroup != null ? sharingGroup.getSlotSharingGroupId() : null;
ScheduledUnit toSchedule = locationConstraint == null ? ScheduledUnit toSchedule = locationConstraint == null ?
new ScheduledUnit(this, sharingGroup) : new ScheduledUnit(this, slotSharingGroupId) :
new ScheduledUnit(this, sharingGroup, locationConstraint); new ScheduledUnit(this, slotSharingGroupId, locationConstraint);
// calculate the preferred locations // calculate the preferred locations
final CompletableFuture<Collection<TaskManagerLocation>> preferredLocationsFuture = calculatePreferredLocations(locationPreferenceConstraint); final CompletableFuture<Collection<TaskManagerLocation>> preferredLocationsFuture = calculatePreferredLocations(locationPreferenceConstraint);
...@@ -461,7 +466,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution ...@@ -461,7 +466,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
return this; return this;
} else { } else {
// release the slot // release the slot
logicalSlot.releaseSlot(); logicalSlot.releaseSlot(new FlinkException("Could not assign logical slot to execution " + this + '.'));
throw new CompletionException(new FlinkException("Could not assign slot " + logicalSlot + " to execution " + this + " because it has already been assigned ")); throw new CompletionException(new FlinkException("Could not assign slot " + logicalSlot + " to execution " + this + " because it has already been assigned "));
} }
...@@ -513,7 +518,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution ...@@ -513,7 +518,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
// race double check, did we fail/cancel and do we need to release the slot? // race double check, did we fail/cancel and do we need to release the slot?
if (this.state != DEPLOYING) { if (this.state != DEPLOYING) {
slot.releaseSlot(); slot.releaseSlot(new FlinkException("Actual state of execution " + this + " (" + state + ") does not match expected state DEPLOYING."));
return; return;
} }
...@@ -622,7 +627,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution ...@@ -622,7 +627,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
try { try {
vertex.getExecutionGraph().deregisterExecution(this); vertex.getExecutionGraph().deregisterExecution(this);
releaseAssignedResource(); releaseAssignedResource(new FlinkException("Execution " + this + " was cancelled."));
} }
finally { finally {
vertex.executionCanceled(this); vertex.executionCanceled(this);
...@@ -890,7 +895,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution ...@@ -890,7 +895,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
updateAccumulatorsAndMetrics(userAccumulators, metrics); updateAccumulatorsAndMetrics(userAccumulators, metrics);
releaseAssignedResource(); releaseAssignedResource(null);
vertex.getExecutionGraph().deregisterExecution(this); vertex.getExecutionGraph().deregisterExecution(this);
} }
...@@ -943,7 +948,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution ...@@ -943,7 +948,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
if (transitionState(current, CANCELED)) { if (transitionState(current, CANCELED)) {
try { try {
releaseAssignedResource(); releaseAssignedResource(new FlinkException("Execution " + this + " was cancelled."));
vertex.getExecutionGraph().deregisterExecution(this); vertex.getExecutionGraph().deregisterExecution(this);
} }
...@@ -1035,7 +1040,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution ...@@ -1035,7 +1040,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
updateAccumulatorsAndMetrics(userAccumulators, metrics); updateAccumulatorsAndMetrics(userAccumulators, metrics);
try { try {
releaseAssignedResource(); releaseAssignedResource(t);
vertex.getExecutionGraph().deregisterExecution(this); vertex.getExecutionGraph().deregisterExecution(this);
} }
finally { finally {
...@@ -1176,12 +1181,14 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution ...@@ -1176,12 +1181,14 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
/** /**
* Releases the assigned resource and completes the release future * Releases the assigned resource and completes the release future
* once the assigned resource has been successfully released * once the assigned resource has been successfully released
*
* @param cause for the resource release, null if none
*/ */
private void releaseAssignedResource() { private void releaseAssignedResource(@Nullable Throwable cause) {
final LogicalSlot slot = assignedResource; final LogicalSlot slot = assignedResource;
if (slot != null) { if (slot != null) {
slot.releaseSlot().whenComplete( slot.releaseSlot(cause).whenComplete(
(Object ignored, Throwable throwable) -> { (Object ignored, Throwable throwable) -> {
if (throwable != null) { if (throwable != null) {
releaseFuture.completeExceptionally(throwable); releaseFuture.completeExceptionally(throwable);
......
...@@ -50,7 +50,7 @@ import org.apache.flink.runtime.executiongraph.failover.RestartAllStrategy; ...@@ -50,7 +50,7 @@ import org.apache.flink.runtime.executiongraph.failover.RestartAllStrategy;
import org.apache.flink.runtime.executiongraph.restart.ExecutionGraphRestartCallback; import org.apache.flink.runtime.executiongraph.restart.ExecutionGraphRestartCallback;
import org.apache.flink.runtime.executiongraph.restart.RestartCallback; import org.apache.flink.runtime.executiongraph.restart.RestartCallback;
import org.apache.flink.runtime.executiongraph.restart.RestartStrategy; import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
import org.apache.flink.runtime.instance.SlotProvider; import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.jobgraph.JobStatus;
......
...@@ -43,7 +43,7 @@ import org.apache.flink.runtime.executiongraph.metrics.NumberOfFullRestartsGauge ...@@ -43,7 +43,7 @@ import org.apache.flink.runtime.executiongraph.metrics.NumberOfFullRestartsGauge
import org.apache.flink.runtime.executiongraph.metrics.RestartTimeGauge; import org.apache.flink.runtime.executiongraph.metrics.RestartTimeGauge;
import org.apache.flink.runtime.executiongraph.metrics.UpTimeGauge; import org.apache.flink.runtime.executiongraph.metrics.UpTimeGauge;
import org.apache.flink.runtime.executiongraph.restart.RestartStrategy; import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
import org.apache.flink.runtime.instance.SlotProvider; import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.JobVertexID;
......
...@@ -35,7 +35,7 @@ import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult; ...@@ -35,7 +35,7 @@ import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
import org.apache.flink.runtime.blob.BlobWriter; import org.apache.flink.runtime.blob.BlobWriter;
import org.apache.flink.runtime.blob.PermanentBlobKey; import org.apache.flink.runtime.blob.PermanentBlobKey;
import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.instance.SlotProvider; import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
import org.apache.flink.runtime.jobgraph.IntermediateDataSet; import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.JobEdge; import org.apache.flink.runtime.jobgraph.JobEdge;
......
...@@ -32,9 +32,9 @@ import org.apache.flink.runtime.deployment.PartialInputChannelDeploymentDescript ...@@ -32,9 +32,9 @@ import org.apache.flink.runtime.deployment.PartialInputChannelDeploymentDescript
import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor; import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor; import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.instance.LogicalSlot; import org.apache.flink.runtime.jobmaster.LogicalSlot;
import org.apache.flink.runtime.instance.SimpleSlot; import org.apache.flink.runtime.instance.SimpleSlot;
import org.apache.flink.runtime.instance.SlotProvider; import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType; import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern; import org.apache.flink.runtime.jobgraph.DistributionPattern;
......
...@@ -20,9 +20,11 @@ package org.apache.flink.runtime.instance; ...@@ -20,9 +20,11 @@ package org.apache.flink.runtime.instance;
import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.jobmanager.scheduler.SlotAvailabilityListener; import org.apache.flink.runtime.jobmanager.scheduler.SlotAvailabilityListener;
import org.apache.flink.runtime.jobmanager.slots.SlotOwner;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway; import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.jobmaster.LogicalSlot;
import org.apache.flink.runtime.jobmaster.SlotOwner;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions; import org.apache.flink.util.Preconditions;
import org.slf4j.Logger; import org.slf4j.Logger;
...@@ -163,8 +165,9 @@ public class Instance implements SlotOwner { ...@@ -163,8 +165,9 @@ public class Instance implements SlotOwner {
* owning the assignment group lock wants to give itself back to the instance which requires * owning the assignment group lock wants to give itself back to the instance which requires
* the instance lock * the instance lock
*/ */
final FlinkException cause = new FlinkException("Instance " + this + " has been marked as dead.");
for (Slot slot : slots) { for (Slot slot : slots) {
slot.releaseInstanceSlot(); slot.releaseSlot(cause);
} }
} }
...@@ -321,8 +324,9 @@ public class Instance implements SlotOwner { ...@@ -321,8 +324,9 @@ public class Instance implements SlotOwner {
copy = new ArrayList<Slot>(this.allocatedSlots); copy = new ArrayList<Slot>(this.allocatedSlots);
} }
final FlinkException cause = new FlinkException("Cancel and release all slots of instance " + this + '.');
for (Slot slot : copy) { for (Slot slot : copy) {
slot.releaseInstanceSlot(); slot.releaseSlot(cause);
} }
} }
......
...@@ -19,12 +19,14 @@ ...@@ -19,12 +19,14 @@
package org.apache.flink.runtime.instance; package org.apache.flink.runtime.instance;
import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.jobmanager.slots.SlotContext; import org.apache.flink.runtime.jobmanager.scheduler.Locality;
import org.apache.flink.runtime.jobmanager.slots.SlotOwner;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway; import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.jobmaster.LogicalSlot;
import org.apache.flink.runtime.jobmaster.SlotContext;
import org.apache.flink.runtime.jobmaster.SlotOwner;
import org.apache.flink.runtime.jobmaster.SlotRequestId;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.util.AbstractID; import org.apache.flink.util.AbstractID;
import org.apache.flink.util.FlinkException;
import javax.annotation.Nullable; import javax.annotation.Nullable;
...@@ -55,8 +57,6 @@ public class SharedSlot extends Slot implements LogicalSlot { ...@@ -55,8 +57,6 @@ public class SharedSlot extends Slot implements LogicalSlot {
/** The set os sub-slots allocated from this shared slot */ /** The set os sub-slots allocated from this shared slot */
private final Set<Slot> subSlots; private final Set<Slot> subSlots;
private final CompletableFuture<?> cancellationFuture = new CompletableFuture<>();
// ------------------------------------------------------------------------ // ------------------------------------------------------------------------
// Old Constructors (prior FLIP-6) // Old Constructors (prior FLIP-6)
// ------------------------------------------------------------------------ // ------------------------------------------------------------------------
...@@ -72,9 +72,9 @@ public class SharedSlot extends Slot implements LogicalSlot { ...@@ -72,9 +72,9 @@ public class SharedSlot extends Slot implements LogicalSlot {
* @param assignmentGroup The assignment group that this shared slot belongs to. * @param assignmentGroup The assignment group that this shared slot belongs to.
*/ */
public SharedSlot( public SharedSlot(
SlotOwner owner, TaskManagerLocation location, int slotNumber, SlotOwner owner, TaskManagerLocation location, int slotNumber,
TaskManagerGateway taskManagerGateway, TaskManagerGateway taskManagerGateway,
SlotSharingGroupAssignment assignmentGroup) { SlotSharingGroupAssignment assignmentGroup) {
this(owner, location, slotNumber, taskManagerGateway, assignmentGroup, null, null); this(owner, location, slotNumber, taskManagerGateway, assignmentGroup, null, null);
} }
...@@ -174,6 +174,11 @@ public class SharedSlot extends Slot implements LogicalSlot { ...@@ -174,6 +174,11 @@ public class SharedSlot extends Slot implements LogicalSlot {
return subSlots.size() > 0; return subSlots.size() > 0;
} }
@Override
public Locality getLocality() {
return Locality.UNKNOWN;
}
@Override @Override
public boolean tryAssignPayload(Payload payload) { public boolean tryAssignPayload(Payload payload) {
throw new UnsupportedOperationException("Cannot assign an execution attempt id to a shared slot."); throw new UnsupportedOperationException("Cannot assign an execution attempt id to a shared slot.");
...@@ -186,9 +191,7 @@ public class SharedSlot extends Slot implements LogicalSlot { ...@@ -186,9 +191,7 @@ public class SharedSlot extends Slot implements LogicalSlot {
} }
@Override @Override
public CompletableFuture<?> releaseSlot() { public CompletableFuture<?> releaseSlot(@Nullable Throwable cause) {
cancellationFuture.completeExceptionally(new FlinkException("Shared slot " + this + " is being released."));
assignmentGroup.releaseSharedSlot(this); assignmentGroup.releaseSharedSlot(this);
if (!(isReleased() && subSlots.isEmpty())) { if (!(isReleased() && subSlots.isEmpty())) {
...@@ -198,11 +201,6 @@ public class SharedSlot extends Slot implements LogicalSlot { ...@@ -198,11 +201,6 @@ public class SharedSlot extends Slot implements LogicalSlot {
return CompletableFuture.completedFuture(null); return CompletableFuture.completedFuture(null);
} }
@Override
public void releaseInstanceSlot() {
releaseSlot();
}
@Override @Override
public int getPhysicalSlotNumber() { public int getPhysicalSlotNumber() {
return getRootSlotNumber(); return getRootSlotNumber();
...@@ -214,8 +212,14 @@ public class SharedSlot extends Slot implements LogicalSlot { ...@@ -214,8 +212,14 @@ public class SharedSlot extends Slot implements LogicalSlot {
} }
@Override @Override
public SlotRequestID getSlotRequestId() { public SlotRequestId getSlotRequestId() {
return getSlotContext().getSlotRequestId(); return NO_SLOT_REQUEST_ID;
}
@Nullable
@Override
public SlotSharingGroupId getSlotSharingGroupId() {
return NO_SLOT_SHARING_GROUP_ID;
} }
/** /**
......
...@@ -20,10 +20,11 @@ package org.apache.flink.runtime.instance; ...@@ -20,10 +20,11 @@ package org.apache.flink.runtime.instance;
import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.jobmanager.scheduler.Locality; import org.apache.flink.runtime.jobmanager.scheduler.Locality;
import org.apache.flink.runtime.jobmanager.slots.SimpleSlotContext;
import org.apache.flink.runtime.jobmanager.slots.SlotContext;
import org.apache.flink.runtime.jobmanager.slots.SlotOwner;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway; import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.jobmaster.LogicalSlot;
import org.apache.flink.runtime.jobmaster.SlotContext;
import org.apache.flink.runtime.jobmaster.SlotOwner;
import org.apache.flink.runtime.jobmaster.SlotRequestId;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.util.AbstractID; import org.apache.flink.util.AbstractID;
import org.apache.flink.util.FlinkException; import org.apache.flink.util.FlinkException;
...@@ -69,8 +70,8 @@ public class SimpleSlot extends Slot implements LogicalSlot { ...@@ -69,8 +70,8 @@ public class SimpleSlot extends Slot implements LogicalSlot {
* @param taskManagerGateway The gateway to communicate with the TaskManager of this slot * @param taskManagerGateway The gateway to communicate with the TaskManager of this slot
*/ */
public SimpleSlot( public SimpleSlot(
SlotOwner owner, TaskManagerLocation location, int slotNumber, SlotOwner owner, TaskManagerLocation location, int slotNumber,
TaskManagerGateway taskManagerGateway) { TaskManagerGateway taskManagerGateway) {
this(owner, location, slotNumber, taskManagerGateway, null, null); this(owner, location, slotNumber, taskManagerGateway, null, null);
} }
...@@ -97,7 +98,6 @@ public class SimpleSlot extends Slot implements LogicalSlot { ...@@ -97,7 +98,6 @@ public class SimpleSlot extends Slot implements LogicalSlot {
parent != null ? parent != null ?
parent.getSlotContext() : parent.getSlotContext() :
new SimpleSlotContext( new SimpleSlotContext(
NO_SLOT_REQUEST_ID,
NO_ALLOCATION_ID, NO_ALLOCATION_ID,
location, location,
slotNumber, slotNumber,
...@@ -218,18 +218,13 @@ public class SimpleSlot extends Slot implements LogicalSlot { ...@@ -218,18 +218,13 @@ public class SimpleSlot extends Slot implements LogicalSlot {
// ------------------------------------------------------------------------ // ------------------------------------------------------------------------
@Override @Override
public void releaseInstanceSlot() { public CompletableFuture<?> releaseSlot(@Nullable Throwable cause) {
releaseSlot();
}
@Override
public CompletableFuture<?> releaseSlot() {
if (!isCanceled()) { if (!isCanceled()) {
final CompletableFuture<?> terminationFuture; final CompletableFuture<?> terminationFuture;
if (payload != null) { if (payload != null) {
// trigger the failure of the slot payload // trigger the failure of the slot payload
payload.fail(new FlinkException("TaskManager was lost/killed: " + getTaskManagerLocation())); payload.fail(cause != null ? cause : new FlinkException("TaskManager was lost/killed: " + getTaskManagerLocation()));
// wait for the termination of the payload before releasing the slot // wait for the termination of the payload before releasing the slot
terminationFuture = payload.getTerminalStateFuture(); terminationFuture = payload.getTerminalStateFuture();
...@@ -276,8 +271,14 @@ public class SimpleSlot extends Slot implements LogicalSlot { ...@@ -276,8 +271,14 @@ public class SimpleSlot extends Slot implements LogicalSlot {
} }
@Override @Override
public SlotRequestID getSlotRequestId() { public SlotRequestId getSlotRequestId() {
return getSlotContext().getSlotRequestId(); return NO_SLOT_REQUEST_ID;
}
@Nullable
@Override
public SlotSharingGroupId getSlotSharingGroupId() {
return NO_SLOT_SHARING_GROUP_ID;
} }
// ------------------------------------------------------------------------ // ------------------------------------------------------------------------
......
...@@ -16,10 +16,11 @@ ...@@ -16,10 +16,11 @@
* limitations under the License. * limitations under the License.
*/ */
package org.apache.flink.runtime.jobmanager.slots; package org.apache.flink.runtime.instance;
import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.instance.SlotRequestID; import org.apache.flink.runtime.jobmaster.SlotContext;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.util.Preconditions; import org.apache.flink.util.Preconditions;
...@@ -28,8 +29,6 @@ import org.apache.flink.util.Preconditions; ...@@ -28,8 +29,6 @@ import org.apache.flink.util.Preconditions;
*/ */
public class SimpleSlotContext implements SlotContext { public class SimpleSlotContext implements SlotContext {
private final SlotRequestID slotRequestId;
private final AllocationID allocationId; private final AllocationID allocationId;
private final TaskManagerLocation taskManagerLocation; private final TaskManagerLocation taskManagerLocation;
...@@ -39,23 +38,16 @@ public class SimpleSlotContext implements SlotContext { ...@@ -39,23 +38,16 @@ public class SimpleSlotContext implements SlotContext {
private final TaskManagerGateway taskManagerGateway; private final TaskManagerGateway taskManagerGateway;
public SimpleSlotContext( public SimpleSlotContext(
SlotRequestID slotRequestId,
AllocationID allocationId, AllocationID allocationId,
TaskManagerLocation taskManagerLocation, TaskManagerLocation taskManagerLocation,
int physicalSlotNumber, int physicalSlotNumber,
TaskManagerGateway taskManagerGateway) { TaskManagerGateway taskManagerGateway) {
this.slotRequestId = Preconditions.checkNotNull(slotRequestId);
this.allocationId = Preconditions.checkNotNull(allocationId); this.allocationId = Preconditions.checkNotNull(allocationId);
this.taskManagerLocation = Preconditions.checkNotNull(taskManagerLocation); this.taskManagerLocation = Preconditions.checkNotNull(taskManagerLocation);
this.physicalSlotNumber = physicalSlotNumber; this.physicalSlotNumber = physicalSlotNumber;
this.taskManagerGateway = Preconditions.checkNotNull(taskManagerGateway); this.taskManagerGateway = Preconditions.checkNotNull(taskManagerGateway);
} }
@Override
public SlotRequestID getSlotRequestId() {
return slotRequestId;
}
@Override @Override
public AllocationID getAllocationId() { public AllocationID getAllocationId() {
return allocationId; return allocationId;
......
...@@ -20,15 +20,16 @@ package org.apache.flink.runtime.instance; ...@@ -20,15 +20,16 @@ package org.apache.flink.runtime.instance;
import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.jobmanager.slots.SimpleSlotContext;
import org.apache.flink.runtime.jobmanager.slots.SlotContext;
import org.apache.flink.runtime.jobmanager.slots.SlotOwner;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway; import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.jobmaster.SlotContext;
import org.apache.flink.runtime.jobmaster.SlotOwner;
import org.apache.flink.runtime.jobmaster.SlotRequestId;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.util.AbstractID; import org.apache.flink.util.AbstractID;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import static org.apache.flink.util.Preconditions.checkArgument; import static org.apache.flink.util.Preconditions.checkArgument;
...@@ -62,7 +63,8 @@ public abstract class Slot { ...@@ -62,7 +63,8 @@ public abstract class Slot {
// temporary placeholder for Slots that are not constructed from an AllocatedSlot (prior to FLIP-6) // temporary placeholder for Slots that are not constructed from an AllocatedSlot (prior to FLIP-6)
protected static final AllocationID NO_ALLOCATION_ID = new AllocationID(0L, 0L); protected static final AllocationID NO_ALLOCATION_ID = new AllocationID(0L, 0L);
protected static final SlotRequestID NO_SLOT_REQUEST_ID = new SlotRequestID(0L, 0L); protected static final SlotRequestId NO_SLOT_REQUEST_ID = new SlotRequestId(0L, 0L);
protected static final SlotSharingGroupId NO_SLOT_SHARING_GROUP_ID = new SlotSharingGroupId(0L, 0L);
// ------------------------------------------------------------------------ // ------------------------------------------------------------------------
...@@ -112,7 +114,6 @@ public abstract class Slot { ...@@ -112,7 +114,6 @@ public abstract class Slot {
// create a simple slot context // create a simple slot context
this.slotContext = new SimpleSlotContext( this.slotContext = new SimpleSlotContext(
NO_SLOT_REQUEST_ID,
NO_ALLOCATION_ID, NO_ALLOCATION_ID,
location, location,
slotNumber, slotNumber,
...@@ -333,7 +334,7 @@ public abstract class Slot { ...@@ -333,7 +334,7 @@ public abstract class Slot {
* If this slot is a simple slot, it will be returned to its instance. If it is a shared slot, * If this slot is a simple slot, it will be returned to its instance. If it is a shared slot,
* it will release all of its sub-slots and release itself. * it will release all of its sub-slots and release itself.
*/ */
public abstract void releaseInstanceSlot(); public abstract CompletableFuture<?> releaseSlot(@Nullable Throwable cause);
// -------------------------------------------------------------------------------------------- // --------------------------------------------------------------------------------------------
......
...@@ -18,28 +18,29 @@ ...@@ -18,28 +18,29 @@
package org.apache.flink.runtime.instance; package org.apache.flink.runtime.instance;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint; import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
import org.apache.flink.runtime.jobmanager.scheduler.Locality; import org.apache.flink.runtime.jobmanager.scheduler.Locality;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.util.AbstractID; import org.apache.flink.util.AbstractID;
import org.apache.flink.runtime.executiongraph.ExecutionVertex; import org.apache.flink.util.FlinkException;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
/** /**
* The SlotSharingGroupAssignment manages a set of shared slots, which are shared between * The SlotSharingGroupAssignment manages a set of shared slots, which are shared between
...@@ -215,7 +216,7 @@ public class SlotSharingGroupAssignment { ...@@ -215,7 +216,7 @@ public class SlotSharingGroupAssignment {
// note that this does implicitly release the slot we have just added // note that this does implicitly release the slot we have just added
// as well, because we release its last child slot. That is expected // as well, because we release its last child slot. That is expected
// and desired. // and desired.
constraintGroupSlot.releaseInstanceSlot(); constraintGroupSlot.releaseSlot(new FlinkException("Could not create a sub slot in this shared slot."));
} }
} }
else { else {
...@@ -273,7 +274,7 @@ public class SlotSharingGroupAssignment { ...@@ -273,7 +274,7 @@ public class SlotSharingGroupAssignment {
*/ */
public SimpleSlot getSlotForTask(JobVertexID vertexID, Iterable<TaskManagerLocation> locationPreferences) { public SimpleSlot getSlotForTask(JobVertexID vertexID, Iterable<TaskManagerLocation> locationPreferences) {
synchronized (lock) { synchronized (lock) {
Tuple2<SharedSlot, Locality> p = getSlotForTaskInternal(vertexID, locationPreferences, false); Tuple2<SharedSlot, Locality> p = getSharedSlotForTask(vertexID, locationPreferences, false);
if (p != null) { if (p != null) {
SharedSlot ss = p.f0; SharedSlot ss = p.f0;
...@@ -324,7 +325,7 @@ public class SlotSharingGroupAssignment { ...@@ -324,7 +325,7 @@ public class SlotSharingGroupAssignment {
} }
TaskManagerLocation location = previous.getTaskManagerLocation(); TaskManagerLocation location = previous.getTaskManagerLocation();
Tuple2<SharedSlot, Locality> p = getSlotForTaskInternal( Tuple2<SharedSlot, Locality> p = getSharedSlotForTask(
constraint.getGroupId(), Collections.singleton(location), true); constraint.getGroupId(), Collections.singleton(location), true);
if (p == null) { if (p == null) {
...@@ -355,7 +356,7 @@ public class SlotSharingGroupAssignment { ...@@ -355,7 +356,7 @@ public class SlotSharingGroupAssignment {
// grab a new slot and initialize the constraint with that one. // grab a new slot and initialize the constraint with that one.
// preferred locations are defined by the vertex // preferred locations are defined by the vertex
Tuple2<SharedSlot, Locality> p = Tuple2<SharedSlot, Locality> p =
getSlotForTaskInternal(constraint.getGroupId(), locationPreferences, false); getSharedSlotForTask(constraint.getGroupId(), locationPreferences, false);
if (p == null) { if (p == null) {
// could not get a shared slot for this co-location-group // could not get a shared slot for this co-location-group
return null; return null;
...@@ -382,9 +383,10 @@ public class SlotSharingGroupAssignment { ...@@ -382,9 +383,10 @@ public class SlotSharingGroupAssignment {
} }
private Tuple2<SharedSlot, Locality> getSlotForTaskInternal( public Tuple2<SharedSlot, Locality> getSharedSlotForTask(
AbstractID groupId, Iterable<TaskManagerLocation> preferredLocations, boolean localOnly) AbstractID groupId,
{ Iterable<TaskManagerLocation> preferredLocations,
boolean localOnly) {
// check if there is anything at all in this group assignment // check if there is anything at all in this group assignment
if (allSlots.isEmpty()) { if (allSlots.isEmpty()) {
return null; return null;
...@@ -507,7 +509,7 @@ public class SlotSharingGroupAssignment { ...@@ -507,7 +509,7 @@ public class SlotSharingGroupAssignment {
} }
/** /**
* Called from {@link org.apache.flink.runtime.instance.SharedSlot#releaseInstanceSlot()}. * Called from {@link org.apache.flink.runtime.instance.SharedSlot#releaseSlot(Throwable)}.
* *
* @param sharedSlot The slot to be released. * @param sharedSlot The slot to be released.
*/ */
...@@ -517,10 +519,11 @@ public class SlotSharingGroupAssignment { ...@@ -517,10 +519,11 @@ public class SlotSharingGroupAssignment {
// we are releasing this slot // we are releasing this slot
if (sharedSlot.hasChildren()) { if (sharedSlot.hasChildren()) {
final FlinkException cause = new FlinkException("Releasing shared slot parent.");
// by simply releasing all children, we should eventually release this slot. // by simply releasing all children, we should eventually release this slot.
Set<Slot> children = sharedSlot.getSubSlots(); Set<Slot> children = sharedSlot.getSubSlots();
while (children.size() > 0) { while (children.size() > 0) {
children.iterator().next().releaseInstanceSlot(); children.iterator().next().releaseSlot(cause);
} }
} }
else { else {
......
...@@ -20,15 +20,13 @@ package org.apache.flink.runtime.instance; ...@@ -20,15 +20,13 @@ package org.apache.flink.runtime.instance;
import org.apache.flink.util.AbstractID; import org.apache.flink.util.AbstractID;
/** public class SlotSharingGroupId extends AbstractID {
* Request ID identifying different slot requests. private static final long serialVersionUID = 8837647978345422042L;
*/
public final class SlotRequestID extends AbstractID {
private static final long serialVersionUID = -6072105912250154283L;
public SlotRequestID(long lowerPart, long upperPart) { public SlotSharingGroupId(long lowerPart, long upperPart) {
super(lowerPart, upperPart); super(lowerPart, upperPart);
} }
public SlotRequestID() {} public SlotSharingGroupId() {
}
} }
...@@ -18,16 +18,20 @@ ...@@ -18,16 +18,20 @@
package org.apache.flink.runtime.jobmanager.scheduler; package org.apache.flink.runtime.jobmanager.scheduler;
import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.runtime.instance.SharedSlot;
import org.apache.flink.runtime.jobmaster.SlotRequestId;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.util.AbstractID; import org.apache.flink.util.AbstractID;
import org.apache.flink.runtime.instance.Instance; import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions; import org.apache.flink.util.Preconditions;
import org.apache.flink.runtime.instance.SharedSlot;
import static org.apache.flink.util.Preconditions.checkState; import javax.annotation.Nullable;
import java.util.Objects;
import static org.apache.flink.util.Preconditions.checkNotNull; import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.Preconditions.checkState;
/** /**
* A CoLocationConstraint manages the location of a set of tasks * A CoLocationConstraint manages the location of a set of tasks
...@@ -43,12 +47,14 @@ public class CoLocationConstraint { ...@@ -43,12 +47,14 @@ public class CoLocationConstraint {
private volatile SharedSlot sharedSlot; private volatile SharedSlot sharedSlot;
private volatile ResourceID lockedLocation; private volatile TaskManagerLocation lockedLocation;
private volatile SlotRequestId slotRequestId;
CoLocationConstraint(CoLocationGroup group) { CoLocationConstraint(CoLocationGroup group) {
Preconditions.checkNotNull(group); Preconditions.checkNotNull(group);
this.group = group; this.group = group;
this.slotRequestId = null;
} }
// ------------------------------------------------------------------------ // ------------------------------------------------------------------------
...@@ -107,7 +113,7 @@ public class CoLocationConstraint { ...@@ -107,7 +113,7 @@ public class CoLocationConstraint {
*/ */
public TaskManagerLocation getLocation() { public TaskManagerLocation getLocation() {
if (lockedLocation != null) { if (lockedLocation != null) {
return sharedSlot.getTaskManagerLocation(); return lockedLocation;
} else { } else {
throw new IllegalStateException("Location not yet locked"); throw new IllegalStateException("Location not yet locked");
} }
...@@ -136,12 +142,12 @@ public class CoLocationConstraint { ...@@ -136,12 +142,12 @@ public class CoLocationConstraint {
this.sharedSlot = newSlot; this.sharedSlot = newSlot;
} }
else if (newSlot != this.sharedSlot){ else if (newSlot != this.sharedSlot){
if (lockedLocation != null && lockedLocation != newSlot.getTaskManagerID()) { if (lockedLocation != null && !Objects.equals(lockedLocation, newSlot.getTaskManagerLocation())) {
throw new IllegalArgumentException( throw new IllegalArgumentException(
"Cannot assign different location to a constraint whose location is locked."); "Cannot assign different location to a constraint whose location is locked.");
} }
if (this.sharedSlot.isAlive()) { if (this.sharedSlot.isAlive()) {
this.sharedSlot.releaseInstanceSlot(); this.sharedSlot.releaseSlot(new FlinkException("Setting new shared slot for co-location constraint."));
} }
this.sharedSlot = newSlot; this.sharedSlot = newSlot;
...@@ -159,7 +165,43 @@ public class CoLocationConstraint { ...@@ -159,7 +165,43 @@ public class CoLocationConstraint {
checkState(lockedLocation == null, "Location is already locked"); checkState(lockedLocation == null, "Location is already locked");
checkState(sharedSlot != null, "Cannot lock location without a slot."); checkState(sharedSlot != null, "Cannot lock location without a slot.");
lockedLocation = sharedSlot.getTaskManagerID(); lockedLocation = sharedSlot.getTaskManagerLocation();
}
/**
* Locks the location of this slot. The location can be locked only once
* and only after a shared slot has been assigned.
*
* <p>Note: This method exists for compatibility reasons with the Flip-6 SlotPool
*
* @param taskManagerLocation to lock this co-location constraint to
*/
public void lockLocation(TaskManagerLocation taskManagerLocation) {
checkNotNull(taskManagerLocation);
checkState(lockedLocation == null, "Location is already locked.");
lockedLocation = taskManagerLocation;
}
/**
* Sets the slot request id of the currently assigned slot to the co-location constraint.
* All other tasks belonging to this co-location constraint will be deployed to the same slot.
*
* @param slotRequestId identifying the assigned slot for this co-location constraint
*/
public void setSlotRequestId(@Nullable SlotRequestId slotRequestId) {
this.slotRequestId = slotRequestId;
}
/**
* Returns the currently assigned slot request id identifying the slot to which tasks
* belonging to this co-location constraint will be deployed to.
*
* @return Slot request id of the assigned slot or null if none
*/
@Nullable
public SlotRequestId getSlotRequestId() {
return slotRequestId;
} }
// ------------------------------------------------------------------------ // ------------------------------------------------------------------------
......
...@@ -44,8 +44,8 @@ public class NoResourceAvailableException extends JobException { ...@@ -44,8 +44,8 @@ public class NoResourceAvailableException extends JobException {
NoResourceAvailableException(ScheduledUnit task, int numInstances, int numSlotsTotal, int availableSlots) { NoResourceAvailableException(ScheduledUnit task, int numInstances, int numSlotsTotal, int availableSlots) {
super(String.format("%s Task to schedule: < %s > with groupID < %s > in sharing group < %s >. Resources available to scheduler: Number of instances=%d, total number of slots=%d, available slots=%d", super(String.format("%s Task to schedule: < %s > with groupID < %s > in sharing group < %s >. Resources available to scheduler: Number of instances=%d, total number of slots=%d, available slots=%d",
BASE_MESSAGE, task.getTaskToExecute(), BASE_MESSAGE, task.getTaskToExecute(),
task.getLocationConstraint() == null ? task.getTaskToExecute().getVertex().getJobvertexId() : task.getLocationConstraint().getGroupId(), task.getCoLocationConstraint() == null ? task.getTaskToExecute().getVertex().getJobvertexId() : task.getCoLocationConstraint().getGroupId(),
task.getSlotSharingGroup(), task.getSlotSharingGroupId(),
numInstances, numInstances,
numSlotsTotal, numSlotsTotal,
availableSlots)); availableSlots));
......
...@@ -19,68 +19,108 @@ ...@@ -19,68 +19,108 @@
package org.apache.flink.runtime.jobmanager.scheduler; package org.apache.flink.runtime.jobmanager.scheduler;
import org.apache.flink.runtime.executiongraph.Execution; import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.instance.SlotSharingGroupId;
import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.util.Preconditions; import org.apache.flink.util.Preconditions;
import javax.annotation.Nullable;
/**
* ScheduledUnit contains the information necessary to allocate a slot for the given
* {@link JobVertexID}.
*/
public class ScheduledUnit { public class ScheduledUnit {
@Nullable
private final Execution vertexExecution; private final Execution vertexExecution;
private final SlotSharingGroup sharingGroup; private final JobVertexID jobVertexId;
private final CoLocationConstraint locationConstraint; @Nullable
private final SlotSharingGroupId slotSharingGroupId;
@Nullable
private final CoLocationConstraint coLocationConstraint;
// -------------------------------------------------------------------------------------------- // --------------------------------------------------------------------------------------------
public ScheduledUnit(Execution task) { public ScheduledUnit(Execution task) {
Preconditions.checkNotNull(task); this(
Preconditions.checkNotNull(task),
this.vertexExecution = task; task.getVertex().getJobvertexId(),
this.sharingGroup = null; null,
this.locationConstraint = null; null);
} }
public ScheduledUnit(Execution task, SlotSharingGroup sharingUnit) { public ScheduledUnit(Execution task, @Nullable SlotSharingGroupId slotSharingGroupId) {
Preconditions.checkNotNull(task); this(
Preconditions.checkNotNull(task),
this.vertexExecution = task; task.getVertex().getJobvertexId(),
this.sharingGroup = sharingUnit; slotSharingGroupId,
this.locationConstraint = null; null);
} }
public ScheduledUnit(Execution task, SlotSharingGroup sharingUnit, CoLocationConstraint locationConstraint) { public ScheduledUnit(
Preconditions.checkNotNull(task); Execution task,
Preconditions.checkNotNull(sharingUnit); @Nullable SlotSharingGroupId slotSharingGroupId,
Preconditions.checkNotNull(locationConstraint); @Nullable CoLocationConstraint coLocationConstraint) {
this(
Preconditions.checkNotNull(task),
task.getVertex().getJobvertexId(),
slotSharingGroupId,
coLocationConstraint);
}
public ScheduledUnit(
JobVertexID jobVertexId,
@Nullable SlotSharingGroupId slotSharingGroupId,
@Nullable CoLocationConstraint coLocationConstraint) {
this(
null,
jobVertexId,
slotSharingGroupId,
coLocationConstraint);
}
public ScheduledUnit(
@Nullable Execution task,
JobVertexID jobVertexId,
@Nullable SlotSharingGroupId slotSharingGroupId,
@Nullable CoLocationConstraint coLocationConstraint) {
this.vertexExecution = task; this.vertexExecution = task;
this.sharingGroup = sharingUnit; this.jobVertexId = Preconditions.checkNotNull(jobVertexId);
this.locationConstraint = locationConstraint; this.slotSharingGroupId = slotSharingGroupId;
this.coLocationConstraint = coLocationConstraint;
} }
// -------------------------------------------------------------------------------------------- // --------------------------------------------------------------------------------------------
public JobVertexID getJobVertexId() { public JobVertexID getJobVertexId() {
return this.vertexExecution.getVertex().getJobvertexId(); return jobVertexId;
} }
@Nullable
public Execution getTaskToExecute() { public Execution getTaskToExecute() {
return vertexExecution; return vertexExecution;
} }
public SlotSharingGroup getSlotSharingGroup() { @Nullable
return sharingGroup; public SlotSharingGroupId getSlotSharingGroupId() {
return slotSharingGroupId;
} }
public CoLocationConstraint getLocationConstraint() { @Nullable
return locationConstraint; public CoLocationConstraint getCoLocationConstraint() {
return coLocationConstraint;
} }
// -------------------------------------------------------------------------------------------- // --------------------------------------------------------------------------------------------
@Override @Override
public String toString() { public String toString() {
return "{task=" + vertexExecution.getVertexWithAttempt() + ", sharingUnit=" + sharingGroup + return "{task=" + vertexExecution.getVertexWithAttempt() + ", sharingUnit=" + slotSharingGroupId +
", locationConstraint=" + locationConstraint + '}'; ", locationConstraint=" + coLocationConstraint + '}';
} }
} }
...@@ -18,20 +18,22 @@ ...@@ -18,20 +18,22 @@
package org.apache.flink.runtime.jobmanager.scheduler; package org.apache.flink.runtime.jobmanager.scheduler;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.executiongraph.ExecutionVertex; import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.instance.Instance; import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.runtime.instance.InstanceDiedException; import org.apache.flink.runtime.instance.InstanceDiedException;
import org.apache.flink.runtime.instance.InstanceListener; import org.apache.flink.runtime.instance.InstanceListener;
import org.apache.flink.runtime.instance.LogicalSlot; import org.apache.flink.runtime.jobmaster.LogicalSlot;
import org.apache.flink.runtime.instance.SharedSlot; import org.apache.flink.runtime.instance.SharedSlot;
import org.apache.flink.runtime.instance.SimpleSlot; import org.apache.flink.runtime.instance.SimpleSlot;
import org.apache.flink.runtime.instance.SlotProvider; import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
import org.apache.flink.runtime.instance.SlotSharingGroupAssignment; import org.apache.flink.runtime.instance.SlotSharingGroupAssignment;
import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions; import org.apache.flink.util.Preconditions;
import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.commons.lang3.tuple.ImmutablePair;
...@@ -39,6 +41,8 @@ import org.apache.commons.lang3.tuple.Pair; ...@@ -39,6 +41,8 @@ import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import java.util.ArrayDeque; import java.util.ArrayDeque;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
...@@ -49,6 +53,7 @@ import java.util.Iterator; ...@@ -49,6 +53,7 @@ import java.util.Iterator;
import java.util.LinkedHashMap; import java.util.LinkedHashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Objects;
import java.util.Queue; import java.util.Queue;
import java.util.Set; import java.util.Set;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
...@@ -177,7 +182,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener, Sl ...@@ -177,7 +182,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener, Sl
synchronized (globalLock) { synchronized (globalLock) {
SlotSharingGroup sharingUnit = task.getSlotSharingGroup(); SlotSharingGroup sharingUnit = vertex.getJobVertex().getSlotSharingGroup();
if (sharingUnit != null) { if (sharingUnit != null) {
...@@ -189,7 +194,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener, Sl ...@@ -189,7 +194,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener, Sl
} }
final SlotSharingGroupAssignment assignment = sharingUnit.getTaskAssignment(); final SlotSharingGroupAssignment assignment = sharingUnit.getTaskAssignment();
final CoLocationConstraint constraint = task.getLocationConstraint(); final CoLocationConstraint constraint = task.getCoLocationConstraint();
// sanity check that we do not use an externally forced location and a co-location constraint together // sanity check that we do not use an externally forced location and a co-location constraint together
if (constraint != null && forceExternalLocation) { if (constraint != null && forceExternalLocation) {
...@@ -274,7 +279,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener, Sl ...@@ -274,7 +279,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener, Sl
// if there is no slot from the group, or the new slot is local, // if there is no slot from the group, or the new slot is local,
// then we use the new slot // then we use the new slot
if (slotFromGroup != null) { if (slotFromGroup != null) {
slotFromGroup.releaseInstanceSlot(); slotFromGroup.releaseSlot(null);
} }
toUse = newSlot; toUse = newSlot;
} }
...@@ -282,7 +287,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener, Sl ...@@ -282,7 +287,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener, Sl
// both are available and usable. neither is local. in that case, we may // both are available and usable. neither is local. in that case, we may
// as well use the slot from the sharing group, to minimize the number of // as well use the slot from the sharing group, to minimize the number of
// instances that the job occupies // instances that the job occupies
newSlot.releaseInstanceSlot(); newSlot.releaseSlot(null);
toUse = slotFromGroup; toUse = slotFromGroup;
} }
...@@ -299,10 +304,10 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener, Sl ...@@ -299,10 +304,10 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener, Sl
} }
catch (Throwable t) { catch (Throwable t) {
if (slotFromGroup != null) { if (slotFromGroup != null) {
slotFromGroup.releaseInstanceSlot(); slotFromGroup.releaseSlot(t);
} }
if (newSlot != null) { if (newSlot != null) {
newSlot.releaseInstanceSlot(); newSlot.releaseSlot(t);
} }
ExceptionUtils.rethrow(t, "An error occurred while allocating a slot in a sharing group"); ExceptionUtils.rethrow(t, "An error occurred while allocating a slot in a sharing group");
...@@ -444,7 +449,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener, Sl ...@@ -444,7 +449,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener, Sl
} }
else { else {
// could not add and allocate the sub-slot, so release shared slot // could not add and allocate the sub-slot, so release shared slot
sharedSlot.releaseInstanceSlot(); sharedSlot.releaseSlot(new FlinkException("Could not allocate sub-slot."));
} }
} }
} }
...@@ -854,4 +859,19 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener, Sl ...@@ -854,4 +859,19 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener, Sl
return future; return future;
} }
} }
// ------------------------------------------------------------------------
// Testing methods
// ------------------------------------------------------------------------
@VisibleForTesting
@Nullable
public Instance getInstance(ResourceID resourceId) {
for (Instance instance : allInstances) {
if (Objects.equals(resourceId, instance.getTaskManagerID())) {
return instance;
}
}
return null;
}
} }
...@@ -23,6 +23,7 @@ import java.util.Set; ...@@ -23,6 +23,7 @@ import java.util.Set;
import java.util.TreeSet; import java.util.TreeSet;
import org.apache.flink.runtime.instance.SlotSharingGroupAssignment; import org.apache.flink.runtime.instance.SlotSharingGroupAssignment;
import org.apache.flink.runtime.instance.SlotSharingGroupId;
import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.JobVertexID;
/** /**
...@@ -39,7 +40,8 @@ public class SlotSharingGroup implements java.io.Serializable { ...@@ -39,7 +40,8 @@ public class SlotSharingGroup implements java.io.Serializable {
/** Mapping of tasks to subslots. This field is only needed inside the JobManager, and is not RPCed. */ /** Mapping of tasks to subslots. This field is only needed inside the JobManager, and is not RPCed. */
private transient SlotSharingGroupAssignment taskAssignment; private transient SlotSharingGroupAssignment taskAssignment;
private final SlotSharingGroupId slotSharingGroupId = new SlotSharingGroupId();
public SlotSharingGroup() {} public SlotSharingGroup() {}
...@@ -62,8 +64,11 @@ public class SlotSharingGroup implements java.io.Serializable { ...@@ -62,8 +64,11 @@ public class SlotSharingGroup implements java.io.Serializable {
public Set<JobVertexID> getJobVertexIds() { public Set<JobVertexID> getJobVertexIds() {
return Collections.unmodifiableSet(ids); return Collections.unmodifiableSet(ids);
} }
public SlotSharingGroupId getSlotSharingGroupId() {
return slotSharingGroupId;
}
public SlotSharingGroupAssignment getTaskAssignment() { public SlotSharingGroupAssignment getTaskAssignment() {
if (this.taskAssignment == null) { if (this.taskAssignment == null) {
this.taskAssignment = new SlotSharingGroupAssignment(); this.taskAssignment = new SlotSharingGroupAssignment();
......
...@@ -18,7 +18,7 @@ ...@@ -18,7 +18,7 @@
package org.apache.flink.runtime.jobmanager.slots; package org.apache.flink.runtime.jobmanager.slots;
import org.apache.flink.runtime.instance.AllocatedSlot; import org.apache.flink.runtime.jobmaster.slotpool.AllocatedSlot;
import org.apache.flink.runtime.jobmanager.scheduler.Locality; import org.apache.flink.runtime.jobmanager.scheduler.Locality;
import static org.apache.flink.util.Preconditions.checkNotNull; import static org.apache.flink.util.Preconditions.checkNotNull;
...@@ -39,11 +39,11 @@ public class SlotAndLocality { ...@@ -39,11 +39,11 @@ public class SlotAndLocality {
// ------------------------------------------------------------------------ // ------------------------------------------------------------------------
public AllocatedSlot slot() { public AllocatedSlot getSlot() {
return slot; return slot;
} }
public Locality locality() { public Locality getLocality() {
return locality; return locality;
} }
......
...@@ -55,9 +55,8 @@ import org.apache.flink.runtime.heartbeat.HeartbeatManager; ...@@ -55,9 +55,8 @@ import org.apache.flink.runtime.heartbeat.HeartbeatManager;
import org.apache.flink.runtime.heartbeat.HeartbeatServices; import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.heartbeat.HeartbeatTarget; import org.apache.flink.runtime.heartbeat.HeartbeatTarget;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.instance.LogicalSlot; import org.apache.flink.runtime.jobmaster.slotpool.SlotPool;
import org.apache.flink.runtime.instance.SlotPool; import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolGateway;
import org.apache.flink.runtime.instance.SlotPoolGateway;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobGraph;
......
...@@ -16,9 +16,11 @@ ...@@ -16,9 +16,11 @@
* limitations under the License. * limitations under the License.
*/ */
package org.apache.flink.runtime.instance; package org.apache.flink.runtime.jobmaster;
import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.instance.SlotSharingGroupId;
import org.apache.flink.runtime.jobmanager.scheduler.Locality;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway; import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
...@@ -32,8 +34,22 @@ import java.util.concurrent.CompletableFuture; ...@@ -32,8 +34,22 @@ import java.util.concurrent.CompletableFuture;
*/ */
public interface LogicalSlot { public interface LogicalSlot {
Payload TERMINATED_PAYLOAD = new Payload() {
private final CompletableFuture<?> completedTerminationFuture = CompletableFuture.completedFuture(null);
@Override
public void fail(Throwable cause) {
// ignore
}
@Override
public CompletableFuture<?> getTerminalStateFuture() {
return completedTerminationFuture;
}
};
/** /**
* Return the TaskManager location of this slot * Return the TaskManager location of this slot.
* *
* @return TaskManager location of this slot * @return TaskManager location of this slot
*/ */
...@@ -47,18 +63,25 @@ public interface LogicalSlot { ...@@ -47,18 +63,25 @@ public interface LogicalSlot {
TaskManagerGateway getTaskManagerGateway(); TaskManagerGateway getTaskManagerGateway();
/** /**
* True if the slot is still alive. * Gets the locality of this slot.
* *
* @return True if the slot is still alive, otherwise false * @return locality of this slot
*/
Locality getLocality();
/**
* True if the slot is alive and has not been released.
*
* @return True if the slot is alive, otherwise false if the slot is released
*/ */
boolean isAlive(); boolean isAlive();
/** /**
* Tries to assign a payload to this slot. This can only happens * Tries to assign a payload to this slot. One can only assign a single
* exactly once. * payload once.
* *
* @param payload to be assigned to this slot. * @param payload to be assigned to this slot.
* @return true if the payload could be set, otherwise false * @return true if the payload could be assigned, otherwise false
*/ */
boolean tryAssignPayload(Payload payload); boolean tryAssignPayload(Payload payload);
...@@ -75,8 +98,19 @@ public interface LogicalSlot { ...@@ -75,8 +98,19 @@ public interface LogicalSlot {
* *
* @return Future which is completed once the slot has been released, * @return Future which is completed once the slot has been released,
* in case of a failure it is completed exceptionally * in case of a failure it is completed exceptionally
* @deprecated Added because extended the actual releaseSlot method with cause parameter.
*/
default CompletableFuture<?> releaseSlot() {
return releaseSlot(null);
}
/**
* Releases this slot.
*
* @param cause why the slot was released or null if none
* @return future which is completed once the slot has been released
*/ */
CompletableFuture<?> releaseSlot(); CompletableFuture<?> releaseSlot(@Nullable Throwable cause);
/** /**
* Gets the slot number on the TaskManager. * Gets the slot number on the TaskManager.
...@@ -98,7 +132,15 @@ public interface LogicalSlot { ...@@ -98,7 +132,15 @@ public interface LogicalSlot {
* *
* @return Unique id identifying the slot request with which this slot was allocated * @return Unique id identifying the slot request with which this slot was allocated
*/ */
SlotRequestID getSlotRequestId(); SlotRequestId getSlotRequestId();
/**
* Gets the slot sharing group id to which this slot belongs.
*
* @return slot sharing group id of this slot or null, if none.
*/
@Nullable
SlotSharingGroupId getSlotSharingGroupId();
/** /**
* Payload for a logical slot. * Payload for a logical slot.
......
...@@ -16,27 +16,18 @@ ...@@ -16,27 +16,18 @@
* limitations under the License. * limitations under the License.
*/ */
package org.apache.flink.runtime.jobmanager.slots; package org.apache.flink.runtime.jobmaster;
import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.instance.Slot; import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.instance.SlotRequestID;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
/** /**
* Interface for the context of a logical {@link Slot}. This context contains information * Interface for the context of a {@link LogicalSlot}. This context contains information
* about the underlying allocated slot and how to communicate with the TaskManager on which * about the underlying allocated slot and how to communicate with the TaskManager on which
* it was allocated. * it was allocated.
*/ */
public interface SlotContext { public interface SlotContext {
/**
* Gets the slot request id under which the slot has been requested. This id uniquely identifies the logical slot.
*
* @return The id under which the slot has been requested
*/
SlotRequestID getSlotRequestId();
/** /**
* Gets the id under which the slot has been allocated on the TaskManager. This id uniquely identifies the * Gets the id under which the slot has been allocated on the TaskManager. This id uniquely identifies the
* physical slot. * physical slot.
......
...@@ -16,9 +16,7 @@ ...@@ -16,9 +16,7 @@
* limitations under the License. * limitations under the License.
*/ */
package org.apache.flink.runtime.jobmanager.slots; package org.apache.flink.runtime.jobmaster;
import org.apache.flink.runtime.instance.LogicalSlot;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.jobmaster;
import org.apache.flink.runtime.jobmaster.slotpool.SlotPool;
import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
import org.apache.flink.util.AbstractID;
/**
* Request id identifying slot requests made by the {@link SlotProvider} towards the
* {@link SlotPool}.
*/
public final class SlotRequestId extends AbstractID {
private static final long serialVersionUID = -6072105912250154283L;
public SlotRequestId(long lowerPart, long upperPart) {
super(lowerPart, upperPart);
}
public SlotRequestId() {}
}
...@@ -16,36 +16,32 @@ ...@@ -16,36 +16,32 @@
* limitations under the License. * limitations under the License.
*/ */
package org.apache.flink.runtime.instance; package org.apache.flink.runtime.jobmaster.slotpool;
import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile; import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.jobmanager.scheduler.Locality;
import org.apache.flink.runtime.jobmanager.slots.SlotContext;
import org.apache.flink.runtime.jobmanager.slots.SlotException;
import org.apache.flink.runtime.jobmanager.slots.SlotOwner;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway; import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.jobmaster.SlotContext;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.util.Preconditions;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import static org.apache.flink.util.Preconditions.checkNotNull; import static org.apache.flink.util.Preconditions.checkNotNull;
/** /**
* The {@code AllocatedSlot} represents a slot that the JobManager allocated from a TaskManager. * The {@code AllocatedSlot} represents a slot that the JobMaster allocated from a TaskExecutor.
* It represents a slice of allocated resources from the TaskManager. * It represents a slice of allocated resources from the TaskExecutor.
* *
* <p>To allocate an {@code AllocatedSlot}, the requests a slot from the ResourceManager. The * <p>To allocate an {@code AllocatedSlot}, the requests a slot from the ResourceManager. The
* ResourceManager picks (or starts) a TaskManager that will then allocate the slot to the * ResourceManager picks (or starts) a TaskExecutor that will then allocate the slot to the
* JobManager and notify the JobManager. * JobMaster and notify the JobMaster.
* *
* <p>Note: Prior to the resource management changes introduced in (Flink Improvement Proposal 6), * <p>Note: Prior to the resource management changes introduced in (Flink Improvement Proposal 6),
* an AllocatedSlot was allocated to the JobManager as soon as the TaskManager registered at the * an AllocatedSlot was allocated to the JobManager as soon as the TaskManager registered at the
* JobManager. All slots had a default unknown resource profile. * JobManager. All slots had a default unknown resource profile.
*/ */
public class AllocatedSlot { public class AllocatedSlot implements SlotContext {
/** The ID under which the slot is allocated. Uniquely identifies the slot. */ /** The ID under which the slot is allocated. Uniquely identifies the slot. */
private final AllocationID allocationId; private final AllocationID allocationId;
...@@ -62,9 +58,7 @@ public class AllocatedSlot { ...@@ -62,9 +58,7 @@ public class AllocatedSlot {
/** The number of the slot on the TaskManager to which slot belongs. Purely informational. */ /** The number of the slot on the TaskManager to which slot belongs. Purely informational. */
private final int physicalSlotNumber; private final int physicalSlotNumber;
private final SlotOwner slotOwner; private final AtomicReference<Payload> payloadReference;
private final AtomicReference<LogicalSlot> logicalSlotReference;
// ------------------------------------------------------------------------ // ------------------------------------------------------------------------
...@@ -73,16 +67,14 @@ public class AllocatedSlot { ...@@ -73,16 +67,14 @@ public class AllocatedSlot {
TaskManagerLocation location, TaskManagerLocation location,
int physicalSlotNumber, int physicalSlotNumber,
ResourceProfile resourceProfile, ResourceProfile resourceProfile,
TaskManagerGateway taskManagerGateway, TaskManagerGateway taskManagerGateway) {
SlotOwner slotOwner) {
this.allocationId = checkNotNull(allocationId); this.allocationId = checkNotNull(allocationId);
this.taskManagerLocation = checkNotNull(location); this.taskManagerLocation = checkNotNull(location);
this.physicalSlotNumber = physicalSlotNumber; this.physicalSlotNumber = physicalSlotNumber;
this.resourceProfile = checkNotNull(resourceProfile); this.resourceProfile = checkNotNull(resourceProfile);
this.taskManagerGateway = checkNotNull(taskManagerGateway); this.taskManagerGateway = checkNotNull(taskManagerGateway);
this.slotOwner = checkNotNull(slotOwner);
logicalSlotReference = new AtomicReference<>(null); payloadReference = new AtomicReference<>(null);
} }
// ------------------------------------------------------------------------ // ------------------------------------------------------------------------
...@@ -137,91 +129,55 @@ public class AllocatedSlot { ...@@ -137,91 +129,55 @@ public class AllocatedSlot {
} }
/** /**
* Returns true if this slot is not being used (e.g. a logical slot is allocated from this slot). * Returns the physical slot number of the allocated slot. The physical slot number corresponds
* to the slot index on the TaskExecutor.
* *
* @return true if a logical slot is allocated from this slot, otherwise false * @return Physical slot number of the allocated slot
*/
public boolean isUsed() {
return logicalSlotReference.get() != null;
}
/**
* Triggers the release of the logical slot.
*/ */
public void triggerLogicalSlotRelease() { public int getPhysicalSlotNumber() {
final LogicalSlot logicalSlot = logicalSlotReference.get(); return physicalSlotNumber;
if (logicalSlot != null) {
logicalSlot.releaseSlot();
}
} }
/** /**
* Releases the logical slot. * Returns true if this slot is not being used (e.g. a logical slot is allocated from this slot).
* *
* @return true if the logical slot could be released, false otherwise. * @return true if a logical slot is allocated from this slot, otherwise false
*/ */
public boolean releaseLogicalSlot() { public boolean isUsed() {
final LogicalSlot logicalSlot = logicalSlotReference.get(); return payloadReference.get() != null;
if (logicalSlot != null) {
if (logicalSlot instanceof Slot) {
final Slot slot = (Slot) logicalSlot;
if (slot.markReleased()) {
logicalSlotReference.set(null);
return true;
}
} else {
throw new RuntimeException("Unsupported logical slot type encountered " + logicalSlot.getClass());
}
}
return false;
} }
/** /**
* Allocates a logical {@link SimpleSlot}. * Tries to assign the given payload to this allocated slot. This only works if there has not
* been another payload assigned to this slot.
* *
* @param slotRequestId identifying the corresponding slot request * @param payload to assign to this slot
* @param locality specifying the locality of the allocated slot * @return true if the payload could be assigned, otherwise false
* @return an allocated logical simple slot
* @throws SlotException if we could not allocate a simple slot
*/ */
public SimpleSlot allocateSimpleSlot(SlotRequestID slotRequestId, Locality locality) throws SlotException { public boolean tryAssignPayload(Payload payload) {
final AllocatedSlotContext allocatedSlotContext = new AllocatedSlotContext( return payloadReference.compareAndSet(null, payload);
slotRequestId);
final SimpleSlot simpleSlot = new SimpleSlot(allocatedSlotContext, slotOwner, physicalSlotNumber);
if (logicalSlotReference.compareAndSet(null, simpleSlot)) {
simpleSlot.setLocality(locality);
return simpleSlot;
} else {
throw new SlotException("Could not allocate logical simple slot because the allocated slot is already used.");
}
} }
/** /**
* Allocates a logical {@link SharedSlot}. * Triggers the release of the assigned payload. If the payload could be released,
* then it is removed from the slot.
* *
* @param slotRequestId identifying the corresponding slot request * @param cause of the release operation
* @param slotSharingGroupAssignment the slot sharing group to which the shared slot shall belong * @return true if the payload could be released and was removed from the slot, otherwise false
* @return an allocated logical shared slot
* @throws SlotException if we could not allocate a shared slot
*/ */
public SharedSlot allocateSharedSlot(SlotRequestID slotRequestId, SlotSharingGroupAssignment slotSharingGroupAssignment) throws SlotException { public boolean releasePayload(Throwable cause) {
final Payload payload = payloadReference.get();
final AllocatedSlotContext allocatedSlotContext = new AllocatedSlotContext(
slotRequestId);
final SharedSlot sharedSlot = new SharedSlot(allocatedSlotContext, slotOwner, slotSharingGroupAssignment);
if (logicalSlotReference.compareAndSet(null, sharedSlot)) { if (payload != null) {
if (payload.release(cause)) {
payloadReference.set(null);
return true;
return sharedSlot; } else {
return false;
}
} else { } else {
throw new SlotException("Could not allocate logical shared slot because the allocated slot is already used."); return true;
} }
} }
...@@ -248,40 +204,22 @@ public class AllocatedSlot { ...@@ -248,40 +204,22 @@ public class AllocatedSlot {
return "AllocatedSlot " + allocationId + " @ " + taskManagerLocation + " - " + physicalSlotNumber; return "AllocatedSlot " + allocationId + " @ " + taskManagerLocation + " - " + physicalSlotNumber;
} }
// -----------------------------------------------------------------------
// Interfaces
// -----------------------------------------------------------------------
/** /**
* Slot context for {@link AllocatedSlot}. * Payload which can be assigned to an {@link AllocatedSlot}.
*/ */
private final class AllocatedSlotContext implements SlotContext { interface Payload {
private final SlotRequestID slotRequestId; /**
* Releases the payload. If the payload could be released, then it returns true,
private AllocatedSlotContext(SlotRequestID slotRequestId) { * otherwise false.
this.slotRequestId = Preconditions.checkNotNull(slotRequestId); *
} * @param cause of the payload release
* @return true if the payload could be released, otherwise false
@Override */
public SlotRequestID getSlotRequestId() { boolean release(Throwable cause);
return slotRequestId;
}
@Override
public AllocationID getAllocationId() {
return allocationId;
}
@Override
public TaskManagerLocation getTaskManagerLocation() {
return taskManagerLocation;
}
@Override
public int getPhysicalSlotNumber() {
return physicalSlotNumber;
}
@Override
public TaskManagerGateway getTaskManagerGateway() {
return taskManagerGateway;
}
} }
} }
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.jobmaster.slotpool;
import org.apache.flink.runtime.jobmaster.SlotRequestId;
import org.apache.flink.runtime.instance.SlotSharingGroupId;
import org.apache.flink.runtime.messages.Acknowledge;
import javax.annotation.Nullable;
import java.util.concurrent.CompletableFuture;
/**
* Interface for components which have to perform actions on allocated slots.
*/
public interface AllocatedSlotActions {
/**
* Releases the slot with the given {@link SlotRequestId}. If the slot belonged to a
* slot sharing group, then the corresponding {@link SlotSharingGroupId} has to be
* provided. Additionally, one can provide a cause for the slot release.
*
* @param slotRequestId identifying the slot to release
* @param slotSharingGroupId identifying the slot sharing group to which the slot belongs, null if none
* @param cause of the slot release, null if none
* @return Acknowledge (future) after the slot has been released
*/
CompletableFuture<Acknowledge> releaseSlot(
SlotRequestId slotRequestId,
@Nullable SlotSharingGroupId slotSharingGroupId,
@Nullable Throwable cause);
}
...@@ -16,7 +16,7 @@ ...@@ -16,7 +16,7 @@
* limitations under the License. * limitations under the License.
*/ */
package org.apache.flink.runtime.instance; package org.apache.flink.runtime.jobmaster.slotpool;
import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple2;
...@@ -130,6 +130,13 @@ public class DualKeyMap<A, B, V> { ...@@ -130,6 +130,13 @@ public class DualKeyMap<A, B, V> {
bMap.clear(); bMap.clear();
} }
// -----------------------------------------------------------------------
// Inner classes
// -----------------------------------------------------------------------
/**
* Collection which contains the values of the dual key map.
*/
private final class Values extends AbstractCollection<V> { private final class Values extends AbstractCollection<V> {
@Override @Override
...@@ -143,6 +150,9 @@ public class DualKeyMap<A, B, V> { ...@@ -143,6 +150,9 @@ public class DualKeyMap<A, B, V> {
} }
} }
/**
* Iterator which iterates over the values of the dual key map.
*/
private final class ValueIterator implements Iterator<V> { private final class ValueIterator implements Iterator<V> {
private final Iterator<Tuple2<B, V>> iterator = aMap.values().iterator(); private final Iterator<Tuple2<B, V>> iterator = aMap.values().iterator();
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.jobmaster.slotpool;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.jobmaster.LogicalSlot;
import org.apache.flink.runtime.jobmaster.SlotContext;
import org.apache.flink.runtime.jobmaster.SlotOwner;
import org.apache.flink.runtime.jobmaster.SlotRequestId;
import org.apache.flink.runtime.instance.SlotSharingGroupId;
import org.apache.flink.runtime.jobmanager.scheduler.Locality;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.util.Preconditions;
import javax.annotation.Nullable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Function;
/**
* Implementation of the {@link LogicalSlot} which is used by the {@link SlotPool}.
*/
public class SingleLogicalSlot implements LogicalSlot, AllocatedSlot.Payload {
private static final AtomicReferenceFieldUpdater<SingleLogicalSlot, Payload> PAYLOAD_UPDATER = AtomicReferenceFieldUpdater.newUpdater(
SingleLogicalSlot.class,
Payload.class,
"payload");
private final SlotRequestId slotRequestId;
private final SlotContext slotContext;
// null if the logical slot does not belong to a slot sharing group, otherwise non-null
@Nullable
private final SlotSharingGroupId slotSharingGroupId;
// locality of this slot wrt the requested preferred locations
private final Locality locality;
// owner of this slot to which it is returned upon release
private final SlotOwner slotOwner;
// LogicalSlot.Payload of this slot
private volatile Payload payload;
public SingleLogicalSlot(
SlotRequestId slotRequestId,
SlotContext slotContext,
@Nullable SlotSharingGroupId slotSharingGroupId,
Locality locality,
SlotOwner slotOwner) {
this.slotRequestId = Preconditions.checkNotNull(slotRequestId);
this.slotContext = Preconditions.checkNotNull(slotContext);
this.slotSharingGroupId = slotSharingGroupId;
this.locality = Preconditions.checkNotNull(locality);
this.slotOwner = Preconditions.checkNotNull(slotOwner);
payload = null;
}
@Override
public TaskManagerLocation getTaskManagerLocation() {
return slotContext.getTaskManagerLocation();
}
@Override
public TaskManagerGateway getTaskManagerGateway() {
return slotContext.getTaskManagerGateway();
}
@Override
public Locality getLocality() {
return locality;
}
@Override
public boolean isAlive() {
final Payload currentPayload = payload;
if (currentPayload != null) {
return !currentPayload.getTerminalStateFuture().isDone();
} else {
// We are always alive if there is no payload assigned yet.
// If this slot is released and no payload is assigned, then the TERMINATED_PAYLOAD is assigned
return true;
}
}
@Override
public boolean tryAssignPayload(Payload payload) {
Preconditions.checkNotNull(payload);
return PAYLOAD_UPDATER.compareAndSet(this, null, payload);
}
@Nullable
@Override
public Payload getPayload() {
return payload;
}
@Override
public CompletableFuture<?> releaseSlot(@Nullable Throwable cause) {
// set an already terminated payload if the payload of this slot is still empty
tryAssignPayload(TERMINATED_PAYLOAD);
// notify the payload that the slot will be released
payload.fail(cause);
// Wait until the payload has been terminated. Only then, we return the slot to its rightful owner
return payload.getTerminalStateFuture()
.handle((Object ignored, Throwable throwable) -> slotOwner.returnAllocatedSlot(this))
.thenApply(Function.identity());
}
@Override
public int getPhysicalSlotNumber() {
return slotContext.getPhysicalSlotNumber();
}
@Override
public AllocationID getAllocationId() {
return slotContext.getAllocationId();
}
@Override
public SlotRequestId getSlotRequestId() {
return slotRequestId;
}
@Nullable
@Override
public SlotSharingGroupId getSlotSharingGroupId() {
return slotSharingGroupId;
}
// -------------------------------------------------------------------------
// AllocatedSlot.Payload implementation
// -------------------------------------------------------------------------
/**
* A release of the payload by the {@link AllocatedSlot} triggers a release of the payload of
* the logical slot.
*
* @param cause of the payload release
* @return true if the logical slot's payload could be released, otherwise false
*/
@Override
public boolean release(Throwable cause) {
return releaseSlot(cause).isDone();
}
}
...@@ -16,14 +16,17 @@ ...@@ -16,14 +16,17 @@
* limitations under the License. * limitations under the License.
*/ */
package org.apache.flink.runtime.instance; package org.apache.flink.runtime.jobmaster.slotpool;
import org.apache.flink.api.common.time.Time; import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile; import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.instance.SlotSharingGroupId;
import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit; import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway; import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.jobmaster.LogicalSlot;
import org.apache.flink.runtime.jobmaster.SlotRequestId;
import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.rpc.RpcGateway; import org.apache.flink.runtime.rpc.RpcGateway;
...@@ -37,7 +40,7 @@ import java.util.concurrent.CompletableFuture; ...@@ -37,7 +40,7 @@ import java.util.concurrent.CompletableFuture;
/** /**
* The gateway for calls on the {@link SlotPool}. * The gateway for calls on the {@link SlotPool}.
*/ */
public interface SlotPoolGateway extends RpcGateway { public interface SlotPoolGateway extends AllocatedSlotActions, RpcGateway {
// ------------------------------------------------------------------------ // ------------------------------------------------------------------------
// shutdown // shutdown
...@@ -70,41 +73,87 @@ public interface SlotPoolGateway extends RpcGateway { ...@@ -70,41 +73,87 @@ public interface SlotPoolGateway extends RpcGateway {
// registering / un-registering TaskManagers and slots // registering / un-registering TaskManagers and slots
// ------------------------------------------------------------------------ // ------------------------------------------------------------------------
/**
* Registers a TaskExecutor with the given {@link ResourceID} at {@link SlotPool}.
*
* @param resourceID identifying the TaskExecutor to register
* @return Future acknowledge which is completed after the TaskExecutor has been registered
*/
CompletableFuture<Acknowledge> registerTaskManager(ResourceID resourceID); CompletableFuture<Acknowledge> registerTaskManager(ResourceID resourceID);
/**
* Releases a TaskExecutor with the given {@link ResourceID} from the {@link SlotPool}.
*
* @param resourceID identifying the TaskExecutor which shall be released from the SlotPool
* @return Future acknowledge which is completed after the TaskExecutor has been released
*/
CompletableFuture<Acknowledge> releaseTaskManager(ResourceID resourceID); CompletableFuture<Acknowledge> releaseTaskManager(ResourceID resourceID);
/**
* Offers a slot to the {@link SlotPool}. The slot offer can be accepted or
* rejected.
*
* @param taskManagerLocation from which the slot offer originates
* @param taskManagerGateway to talk to the slot offerer
* @param slotOffer slot which is offered to the {@link SlotPool}
* @return True (future) if the slot has been accepted, otherwise false (future)
*/
CompletableFuture<Boolean> offerSlot( CompletableFuture<Boolean> offerSlot(
TaskManagerLocation taskManagerLocation, TaskManagerLocation taskManagerLocation,
TaskManagerGateway taskManagerGateway, TaskManagerGateway taskManagerGateway,
SlotOffer slotOffer); SlotOffer slotOffer);
/**
* Offers multiple slots to the {@link SlotPool}. The slot offerings can be
* individually accepted or rejected by returning the collection of accepted
* slot offers.
*
* @param taskManagerLocation from which the slot offeres originate
* @param taskManagerGateway to talk to the slot offerer
* @param offers slot offers which are offered to the {@link SlotPool}
* @return A collection of accepted slot offers (future). The remaining slot offers are
* implicitly rejected.
*/
CompletableFuture<Collection<SlotOffer>> offerSlots( CompletableFuture<Collection<SlotOffer>> offerSlots(
TaskManagerLocation taskManagerLocation, TaskManagerLocation taskManagerLocation,
TaskManagerGateway taskManagerGateway, TaskManagerGateway taskManagerGateway,
Collection<SlotOffer> offers); Collection<SlotOffer> offers);
/**
* Fails the slot with the given allocation id.
*
* @param allocationID identifying the slot which is being failed
* @param cause of the failure
*/
void failAllocation(AllocationID allocationID, Exception cause); void failAllocation(AllocationID allocationID, Exception cause);
// ------------------------------------------------------------------------ // ------------------------------------------------------------------------
// allocating and disposing slots // allocating and disposing slots
// ------------------------------------------------------------------------ // ------------------------------------------------------------------------
CompletableFuture<LogicalSlot> allocateSlot(
SlotRequestID requestId,
ScheduledUnit task,
ResourceProfile resources,
Iterable<TaskManagerLocation> locationPreferences,
@RpcTimeout Time timeout);
void returnAllocatedSlot(SlotRequestID slotRequestId);
/** /**
* Cancel a slot allocation request. * Requests to allocate a slot for the given {@link ScheduledUnit}. The request
* is uniquely identified by the provided {@link SlotRequestId} which can also
* be used to release the slot via {@link #releaseSlot(SlotRequestId, SlotSharingGroupId, Throwable)}.
* The allocated slot will fulfill the requested {@link ResourceProfile} and it
* is tried to place it on one of the location preferences.
*
* <p>If the returned future must not be completed right away (a.k.a. the slot request
* can be queued), allowQueuedScheduling must be set to true.
* *
* @param slotRequestId identifying the slot allocation request * @param slotRequestId identifying the requested slot
* @return Future acknowledge if the slot allocation has been cancelled * @param scheduledUnit for which to allocate slot
* @param resourceProfile which the allocated slot must fulfill
* @param locationPreferences which define where the allocated slot should be placed, this can also be empty
* @param allowQueuedScheduling true if the slot request can be queued (e.g. the returned future must not be completed)
* @param timeout for the operation
* @return
*/ */
CompletableFuture<Acknowledge> cancelSlotRequest(SlotRequestID slotRequestId); CompletableFuture<LogicalSlot> allocateSlot(
SlotRequestId slotRequestId,
ScheduledUnit scheduledUnit,
ResourceProfile resourceProfile,
Collection<TaskManagerLocation> locationPreferences,
boolean allowQueuedScheduling,
@RpcTimeout Time timeout);
} }
...@@ -16,8 +16,9 @@ ...@@ -16,8 +16,9 @@
* limitations under the License. * limitations under the License.
*/ */
package org.apache.flink.runtime.instance; package org.apache.flink.runtime.jobmaster.slotpool;
import org.apache.flink.runtime.jobmaster.LogicalSlot;
import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit; import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
......
...@@ -31,7 +31,7 @@ import org.apache.flink.runtime.execution.Environment; ...@@ -31,7 +31,7 @@ import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.executiongraph.ExecutionGraph; import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder; import org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder;
import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy; import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
import org.apache.flink.runtime.instance.SlotProvider; import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings; import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
......
...@@ -27,10 +27,10 @@ import org.apache.flink.runtime.executiongraph.ExecutionGraphException; ...@@ -27,10 +27,10 @@ import org.apache.flink.runtime.executiongraph.ExecutionGraphException;
import org.apache.flink.runtime.executiongraph.ExecutionVertex; import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.IntermediateResult; import org.apache.flink.runtime.executiongraph.IntermediateResult;
import org.apache.flink.runtime.executiongraph.IntermediateResultPartition; import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
import org.apache.flink.runtime.instance.LogicalSlot;
import org.apache.flink.runtime.io.network.ConnectionID; import org.apache.flink.runtime.io.network.ConnectionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.jobmaster.LogicalSlot;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.junit.Test; import org.junit.Test;
......
...@@ -36,7 +36,7 @@ import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter; ...@@ -36,7 +36,7 @@ import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
import org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore; import org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore;
import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy; import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
import org.apache.flink.runtime.instance.SlotProvider; import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.JobVertexID;
......
...@@ -40,7 +40,7 @@ import org.apache.flink.runtime.executiongraph.failover.RestartAllStrategy; ...@@ -40,7 +40,7 @@ import org.apache.flink.runtime.executiongraph.failover.RestartAllStrategy;
import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy; import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway; import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
import org.apache.flink.runtime.instance.Instance; import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.runtime.instance.LogicalSlot; import org.apache.flink.runtime.jobmaster.LogicalSlot;
import org.apache.flink.runtime.instance.SimpleSlot; import org.apache.flink.runtime.instance.SimpleSlot;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType; import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern; import org.apache.flink.runtime.jobgraph.DistributionPattern;
...@@ -54,7 +54,7 @@ import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings; ...@@ -54,7 +54,7 @@ import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings; import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
import org.apache.flink.runtime.jobmanager.scheduler.Scheduler; import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
import org.apache.flink.runtime.jobmanager.slots.ActorTaskManagerGateway; import org.apache.flink.runtime.jobmanager.slots.ActorTaskManagerGateway;
import org.apache.flink.runtime.jobmanager.slots.SlotOwner; import org.apache.flink.runtime.jobmaster.SlotOwner;
import org.apache.flink.runtime.operators.BatchTask; import org.apache.flink.runtime.operators.BatchTask;
import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation; import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
import org.apache.flink.runtime.taskmanager.TaskExecutionState; import org.apache.flink.runtime.taskmanager.TaskExecutionState;
......
...@@ -27,8 +27,8 @@ import org.apache.flink.runtime.execution.SuppressRestartsException; ...@@ -27,8 +27,8 @@ import org.apache.flink.runtime.execution.SuppressRestartsException;
import org.apache.flink.runtime.executiongraph.metrics.RestartTimeGauge; import org.apache.flink.runtime.executiongraph.metrics.RestartTimeGauge;
import org.apache.flink.runtime.executiongraph.restart.RestartCallback; import org.apache.flink.runtime.executiongraph.restart.RestartCallback;
import org.apache.flink.runtime.executiongraph.restart.RestartStrategy; import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
import org.apache.flink.runtime.instance.LogicalSlot; import org.apache.flink.runtime.jobmaster.LogicalSlot;
import org.apache.flink.runtime.instance.TestingLogicalSlot; import org.apache.flink.runtime.jobmaster.TestingLogicalSlot;
import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.JobVertex;
......
...@@ -41,7 +41,7 @@ import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider; ...@@ -41,7 +41,7 @@ import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider;
import org.apache.flink.runtime.instance.HardwareDescription; import org.apache.flink.runtime.instance.HardwareDescription;
import org.apache.flink.runtime.instance.Instance; import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.runtime.instance.InstanceID; import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.instance.SlotProvider; import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType; import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern; import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobGraph;
......
...@@ -28,18 +28,17 @@ import org.apache.flink.runtime.clusterframework.types.AllocationID; ...@@ -28,18 +28,17 @@ import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor; import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy; import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
import org.apache.flink.runtime.instance.LogicalSlot; import org.apache.flink.runtime.jobmaster.LogicalSlot;
import org.apache.flink.runtime.instance.SimpleSlot; import org.apache.flink.runtime.instance.SimpleSlot;
import org.apache.flink.runtime.instance.SlotProvider; import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
import org.apache.flink.runtime.instance.SlotRequestID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType; import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern; import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.ScheduleMode; import org.apache.flink.runtime.jobgraph.ScheduleMode;
import org.apache.flink.runtime.jobmanager.slots.SimpleSlotContext; import org.apache.flink.runtime.instance.SimpleSlotContext;
import org.apache.flink.runtime.jobmanager.slots.SlotOwner; import org.apache.flink.runtime.jobmaster.SlotOwner;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway; import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.jobmanager.slots.TestingSlotOwner; import org.apache.flink.runtime.jobmanager.slots.TestingSlotOwner;
import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.messages.Acknowledge;
...@@ -448,7 +447,6 @@ public class ExecutionGraphSchedulingTest extends TestLogger { ...@@ -448,7 +447,6 @@ public class ExecutionGraphSchedulingTest extends TestLogger {
ResourceID.generate(), InetAddress.getLoopbackAddress(), 12345); ResourceID.generate(), InetAddress.getLoopbackAddress(), 12345);
SimpleSlotContext slot = new SimpleSlotContext( SimpleSlotContext slot = new SimpleSlotContext(
new SlotRequestID(),
new AllocationID(), new AllocationID(),
location, location,
0, 0,
......
...@@ -25,7 +25,7 @@ import org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy ...@@ -25,7 +25,7 @@ import org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy
import org.apache.flink.runtime.executiongraph.restart.InfiniteDelayRestartStrategy; import org.apache.flink.runtime.executiongraph.restart.InfiniteDelayRestartStrategy;
import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway; import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider; import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider;
import org.apache.flink.runtime.instance.SlotProvider; import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway; import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
......
...@@ -41,16 +41,15 @@ import org.apache.flink.runtime.instance.HardwareDescription; ...@@ -41,16 +41,15 @@ import org.apache.flink.runtime.instance.HardwareDescription;
import org.apache.flink.runtime.instance.Instance; import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.runtime.instance.InstanceID; import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.instance.SimpleSlot; import org.apache.flink.runtime.instance.SimpleSlot;
import org.apache.flink.runtime.instance.SlotProvider; import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
import org.apache.flink.runtime.instance.SlotRequestID;
import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobmanager.scheduler.Scheduler; import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
import org.apache.flink.runtime.jobmanager.slots.SimpleSlotContext; import org.apache.flink.runtime.instance.SimpleSlotContext;
import org.apache.flink.runtime.jobmanager.slots.SlotOwner; import org.apache.flink.runtime.jobmaster.SlotOwner;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway; import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.messages.TaskMessages.CancelTask; import org.apache.flink.runtime.messages.TaskMessages.CancelTask;
...@@ -245,7 +244,6 @@ public class ExecutionGraphTestUtils { ...@@ -245,7 +244,6 @@ public class ExecutionGraphTestUtils {
ResourceID.generate(), InetAddress.getLoopbackAddress(), 6572); ResourceID.generate(), InetAddress.getLoopbackAddress(), 6572);
final SimpleSlotContext allocatedSlot = new SimpleSlotContext( final SimpleSlotContext allocatedSlot = new SimpleSlotContext(
new SlotRequestID(),
new AllocationID(), new AllocationID(),
location, location,
0, 0,
......
...@@ -22,13 +22,13 @@ import org.apache.flink.api.common.JobID; ...@@ -22,13 +22,13 @@ import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy; import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway; import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
import org.apache.flink.runtime.instance.LogicalSlot; import org.apache.flink.runtime.jobmaster.LogicalSlot;
import org.apache.flink.runtime.instance.SimpleSlot; import org.apache.flink.runtime.instance.SimpleSlot;
import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmanager.scheduler.LocationPreferenceConstraint; import org.apache.flink.runtime.jobmanager.scheduler.LocationPreferenceConstraint;
import org.apache.flink.runtime.jobmanager.scheduler.SchedulerTestUtils; import org.apache.flink.runtime.jobmanager.scheduler.SchedulerTestUtils;
import org.apache.flink.runtime.jobmanager.slots.SlotOwner; import org.apache.flink.runtime.jobmaster.SlotOwner;
import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation; import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.testingUtils.TestingUtils; import org.apache.flink.runtime.testingUtils.TestingUtils;
......
...@@ -25,14 +25,14 @@ import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor; ...@@ -25,14 +25,14 @@ import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor; import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.instance.Instance; import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.runtime.instance.LogicalSlot; import org.apache.flink.runtime.jobmaster.LogicalSlot;
import org.apache.flink.runtime.instance.SimpleSlot; import org.apache.flink.runtime.instance.SimpleSlot;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType; import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.ScheduleMode; import org.apache.flink.runtime.jobgraph.ScheduleMode;
import org.apache.flink.runtime.jobmanager.slots.ActorTaskManagerGateway; import org.apache.flink.runtime.jobmanager.slots.ActorTaskManagerGateway;
import org.apache.flink.runtime.jobmanager.slots.SlotContext; import org.apache.flink.runtime.jobmaster.SlotContext;
import org.apache.flink.runtime.testingUtils.TestingUtils; import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.testutils.DirectScheduledExecutorService; import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
import org.apache.flink.util.TestLogger; import org.apache.flink.util.TestLogger;
......
...@@ -30,16 +30,15 @@ import org.apache.flink.runtime.clusterframework.types.ResourceID; ...@@ -30,16 +30,15 @@ import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy; import org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy;
import org.apache.flink.runtime.instance.SimpleSlot; import org.apache.flink.runtime.instance.SimpleSlot;
import org.apache.flink.runtime.instance.SlotProvider; import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
import org.apache.flink.runtime.instance.SlotRequestID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType; import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern; import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmanager.slots.SimpleSlotContext; import org.apache.flink.runtime.instance.SimpleSlotContext;
import org.apache.flink.runtime.jobmanager.slots.SlotContext; import org.apache.flink.runtime.jobmaster.SlotContext;
import org.apache.flink.runtime.jobmanager.slots.SlotOwner; import org.apache.flink.runtime.jobmaster.SlotOwner;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway; import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.testingUtils.TestingUtils; import org.apache.flink.runtime.testingUtils.TestingUtils;
...@@ -235,7 +234,6 @@ public class ExecutionVertexLocalityTest extends TestLogger { ...@@ -235,7 +234,6 @@ public class ExecutionVertexLocalityTest extends TestLogger {
// - exposing test methods in the ExecutionVertex leads to undesirable setters // - exposing test methods in the ExecutionVertex leads to undesirable setters
SlotContext slot = new SimpleSlotContext( SlotContext slot = new SimpleSlotContext(
new SlotRequestID(),
new AllocationID(), new AllocationID(),
location, location,
0, 0,
......
...@@ -59,7 +59,7 @@ public class ExecutionVertexSchedulingTest { ...@@ -59,7 +59,7 @@ public class ExecutionVertexSchedulingTest {
final Instance instance = getInstance(new ActorTaskManagerGateway(DummyActorGateway.INSTANCE)); final Instance instance = getInstance(new ActorTaskManagerGateway(DummyActorGateway.INSTANCE));
final SimpleSlot slot = instance.allocateSimpleSlot(); final SimpleSlot slot = instance.allocateSimpleSlot();
slot.releaseInstanceSlot(); slot.releaseSlot();
assertTrue(slot.isReleased()); assertTrue(slot.isReleased());
Scheduler scheduler = mock(Scheduler.class); Scheduler scheduler = mock(Scheduler.class);
...@@ -91,7 +91,7 @@ public class ExecutionVertexSchedulingTest { ...@@ -91,7 +91,7 @@ public class ExecutionVertexSchedulingTest {
final Instance instance = getInstance(new ActorTaskManagerGateway(DummyActorGateway.INSTANCE)); final Instance instance = getInstance(new ActorTaskManagerGateway(DummyActorGateway.INSTANCE));
final SimpleSlot slot = instance.allocateSimpleSlot(); final SimpleSlot slot = instance.allocateSimpleSlot();
slot.releaseInstanceSlot(); slot.releaseSlot();
assertTrue(slot.isReleased()); assertTrue(slot.isReleased());
final CompletableFuture<SimpleSlot> future = new CompletableFuture<>(); final CompletableFuture<SimpleSlot> future = new CompletableFuture<>();
......
...@@ -31,7 +31,7 @@ import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy; ...@@ -31,7 +31,7 @@ import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
import org.apache.flink.runtime.executiongraph.restart.RestartStrategy; import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider; import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider;
import org.apache.flink.runtime.instance.Instance; import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.runtime.instance.SlotProvider; import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType; import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern; import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.jobgraph.JobStatus;
......
...@@ -36,7 +36,7 @@ import org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy ...@@ -36,7 +36,7 @@ import org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy
import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy; import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
import org.apache.flink.runtime.executiongraph.restart.RestartStrategy; import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider; import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider;
import org.apache.flink.runtime.instance.SlotProvider; import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.JobVertex;
......
...@@ -22,7 +22,7 @@ import org.apache.flink.api.common.JobID; ...@@ -22,7 +22,7 @@ import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time; import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.executiongraph.restart.RestartStrategy; import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
import org.apache.flink.runtime.instance.SlotProvider; import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.jobgraph.OperatorID;
......
...@@ -30,7 +30,7 @@ import org.apache.flink.runtime.executiongraph.failover.RestartPipelinedRegionSt ...@@ -30,7 +30,7 @@ import org.apache.flink.runtime.executiongraph.failover.RestartPipelinedRegionSt
import org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy; import org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy;
import org.apache.flink.runtime.executiongraph.restart.RestartStrategy; import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider; import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider;
import org.apache.flink.runtime.instance.SlotProvider; import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.JobVertex;
......
...@@ -18,8 +18,8 @@ ...@@ -18,8 +18,8 @@
package org.apache.flink.runtime.executiongraph; package org.apache.flink.runtime.executiongraph;
import org.apache.flink.runtime.instance.LogicalSlot; import org.apache.flink.runtime.jobmaster.LogicalSlot;
import org.apache.flink.runtime.instance.SlotProvider; import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit; import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
......
...@@ -30,7 +30,7 @@ import org.apache.flink.runtime.executiongraph.ExecutionGraph; ...@@ -30,7 +30,7 @@ import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder; import org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder;
import org.apache.flink.runtime.executiongraph.ExecutionVertex; import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy; import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
import org.apache.flink.runtime.instance.SlotProvider; import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType; import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern; import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobGraph;
......
...@@ -22,16 +22,15 @@ import org.apache.flink.api.common.JobID; ...@@ -22,16 +22,15 @@ import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.instance.LogicalSlot; import org.apache.flink.runtime.jobmaster.LogicalSlot;
import org.apache.flink.runtime.instance.SimpleSlot; import org.apache.flink.runtime.instance.SimpleSlot;
import org.apache.flink.runtime.instance.Slot; import org.apache.flink.runtime.instance.Slot;
import org.apache.flink.runtime.instance.SlotProvider; import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
import org.apache.flink.runtime.instance.SlotRequestID;
import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException; import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit; import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
import org.apache.flink.runtime.jobmanager.slots.SlotContext; import org.apache.flink.runtime.instance.SimpleSlotContext;
import org.apache.flink.runtime.jobmanager.slots.SimpleSlotContext; import org.apache.flink.runtime.jobmaster.SlotContext;
import org.apache.flink.runtime.jobmanager.slots.SlotOwner; import org.apache.flink.runtime.jobmaster.SlotOwner;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway; import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.util.Preconditions; import org.apache.flink.util.Preconditions;
...@@ -63,7 +62,6 @@ public class SimpleSlotProvider implements SlotProvider, SlotOwner { ...@@ -63,7 +62,6 @@ public class SimpleSlotProvider implements SlotProvider, SlotOwner {
for (int i = 0; i < numSlots; i++) { for (int i = 0; i < numSlots; i++) {
SimpleSlotContext as = new SimpleSlotContext( SimpleSlotContext as = new SimpleSlotContext(
new SlotRequestID(),
new AllocationID(), new AllocationID(),
new TaskManagerLocation(ResourceID.generate(), InetAddress.getLoopbackAddress(), 10000 + i), new TaskManagerLocation(ResourceID.generate(), InetAddress.getLoopbackAddress(), 10000 + i),
0, 0,
......
...@@ -84,10 +84,10 @@ public class InstanceTest { ...@@ -84,10 +84,10 @@ public class InstanceTest {
} }
// release the slots. this returns them to the instance // release the slots. this returns them to the instance
slot1.releaseInstanceSlot(); slot1.releaseSlot();
slot2.releaseInstanceSlot(); slot2.releaseSlot();
slot3.releaseInstanceSlot(); slot3.releaseSlot();
slot4.releaseInstanceSlot(); slot4.releaseSlot();
assertEquals(4, instance.getNumberOfAvailableSlots()); assertEquals(4, instance.getNumberOfAvailableSlots());
assertEquals(0, instance.getNumberOfAllocatedSlots()); assertEquals(0, instance.getNumberOfAllocatedSlots());
......
...@@ -83,7 +83,7 @@ public class SharedSlotsTest extends TestLogger { ...@@ -83,7 +83,7 @@ public class SharedSlotsTest extends TestLogger {
assertEquals(0, slot.getRootSlotNumber()); assertEquals(0, slot.getRootSlotNumber());
// release the slot immediately. // release the slot immediately.
slot.releaseInstanceSlot(); slot.releaseSlot();
assertTrue(slot.isCanceled()); assertTrue(slot.isCanceled());
assertTrue(slot.isReleased()); assertTrue(slot.isReleased());
...@@ -202,7 +202,7 @@ public class SharedSlotsTest extends TestLogger { ...@@ -202,7 +202,7 @@ public class SharedSlotsTest extends TestLogger {
assertEquals(0, assignment.getNumberOfAvailableSlotsForGroup(vid4)); assertEquals(0, assignment.getNumberOfAvailableSlotsForGroup(vid4));
// release from the root. // release from the root.
sharedSlot.releaseInstanceSlot(); sharedSlot.releaseSlot();
assertTrue(sharedSlot.isReleased()); assertTrue(sharedSlot.isReleased());
assertTrue(sub1.isReleased()); assertTrue(sub1.isReleased());
...@@ -261,7 +261,7 @@ public class SharedSlotsTest extends TestLogger { ...@@ -261,7 +261,7 @@ public class SharedSlotsTest extends TestLogger {
// release from the leaves. // release from the leaves.
sub2.releaseInstanceSlot(); sub2.releaseSlot();
assertTrue(sharedSlot.isAlive()); assertTrue(sharedSlot.isAlive());
assertTrue(sub1.isAlive()); assertTrue(sub1.isAlive());
...@@ -276,7 +276,7 @@ public class SharedSlotsTest extends TestLogger { ...@@ -276,7 +276,7 @@ public class SharedSlotsTest extends TestLogger {
assertEquals(2, sharedSlot.getNumberLeaves()); assertEquals(2, sharedSlot.getNumberLeaves());
sub1.releaseInstanceSlot(); sub1.releaseSlot();
assertTrue(sharedSlot.isAlive()); assertTrue(sharedSlot.isAlive());
assertTrue(sub1.isReleased()); assertTrue(sub1.isReleased());
...@@ -290,7 +290,7 @@ public class SharedSlotsTest extends TestLogger { ...@@ -290,7 +290,7 @@ public class SharedSlotsTest extends TestLogger {
assertEquals(1, sharedSlot.getNumberLeaves()); assertEquals(1, sharedSlot.getNumberLeaves());
sub3.releaseInstanceSlot(); sub3.releaseSlot();
assertTrue(sharedSlot.isReleased()); assertTrue(sharedSlot.isReleased());
assertTrue(sub1.isReleased()); assertTrue(sub1.isReleased());
...@@ -344,7 +344,7 @@ public class SharedSlotsTest extends TestLogger { ...@@ -344,7 +344,7 @@ public class SharedSlotsTest extends TestLogger {
assertEquals(1, assignment.getNumberOfSlots()); assertEquals(1, assignment.getNumberOfSlots());
sub2.releaseInstanceSlot(); sub2.releaseSlot();
assertEquals(1, sharedSlot.getNumberLeaves()); assertEquals(1, sharedSlot.getNumberLeaves());
assertEquals(0, assignment.getNumberOfAvailableSlotsForGroup(vid1)); assertEquals(0, assignment.getNumberOfAvailableSlotsForGroup(vid1));
...@@ -362,8 +362,8 @@ public class SharedSlotsTest extends TestLogger { ...@@ -362,8 +362,8 @@ public class SharedSlotsTest extends TestLogger {
assertEquals(0, assignment.getNumberOfAvailableSlotsForGroup(vid3)); assertEquals(0, assignment.getNumberOfAvailableSlotsForGroup(vid3));
assertEquals(1, assignment.getNumberOfSlots()); assertEquals(1, assignment.getNumberOfSlots());
sub3.releaseInstanceSlot(); sub3.releaseSlot();
sub1.releaseInstanceSlot(); sub1.releaseSlot();
assertTrue(sharedSlot.isReleased()); assertTrue(sharedSlot.isReleased());
assertEquals(0, sharedSlot.getNumberLeaves()); assertEquals(0, sharedSlot.getNumberLeaves());
...@@ -439,7 +439,7 @@ public class SharedSlotsTest extends TestLogger { ...@@ -439,7 +439,7 @@ public class SharedSlotsTest extends TestLogger {
assertFalse(constraint.isAssigned()); assertFalse(constraint.isAssigned());
// we do not immediately lock the location // we do not immediately lock the location
headSlot.releaseInstanceSlot(); headSlot.releaseSlot();
assertEquals(1, sharedSlot.getNumberLeaves()); assertEquals(1, sharedSlot.getNumberLeaves());
assertNotNull(constraint.getSharedSlot()); assertNotNull(constraint.getSharedSlot());
...@@ -464,8 +464,8 @@ public class SharedSlotsTest extends TestLogger { ...@@ -464,8 +464,8 @@ public class SharedSlotsTest extends TestLogger {
assertEquals(4, sharedSlot.getNumberLeaves()); assertEquals(4, sharedSlot.getNumberLeaves());
// we release our co-location constraint tasks // we release our co-location constraint tasks
headSlot.releaseInstanceSlot(); headSlot.releaseSlot();
tailSlot.releaseInstanceSlot(); tailSlot.releaseSlot();
assertEquals(2, sharedSlot.getNumberLeaves()); assertEquals(2, sharedSlot.getNumberLeaves());
assertTrue(headSlot.isReleased()); assertTrue(headSlot.isReleased());
...@@ -497,10 +497,10 @@ public class SharedSlotsTest extends TestLogger { ...@@ -497,10 +497,10 @@ public class SharedSlotsTest extends TestLogger {
assertEquals(constraint.getGroupId(), constraint.getSharedSlot().getGroupID()); assertEquals(constraint.getGroupId(), constraint.getSharedSlot().getGroupID());
// release all // release all
sourceSlot.releaseInstanceSlot(); sourceSlot.releaseSlot();
headSlot.releaseInstanceSlot(); headSlot.releaseSlot();
tailSlot.releaseInstanceSlot(); tailSlot.releaseSlot();
sinkSlot.releaseInstanceSlot(); sinkSlot.releaseSlot();
assertTrue(sharedSlot.isReleased()); assertTrue(sharedSlot.isReleased());
assertTrue(sourceSlot.isReleased()); assertTrue(sourceSlot.isReleased());
...@@ -573,10 +573,10 @@ public class SharedSlotsTest extends TestLogger { ...@@ -573,10 +573,10 @@ public class SharedSlotsTest extends TestLogger {
assertEquals(4, sharedSlot.getNumberLeaves()); assertEquals(4, sharedSlot.getNumberLeaves());
// release all // release all
sourceSlot.releaseInstanceSlot(); sourceSlot.releaseSlot();
headSlot.releaseInstanceSlot(); headSlot.releaseSlot();
tailSlot.releaseInstanceSlot(); tailSlot.releaseSlot();
sinkSlot.releaseInstanceSlot(); sinkSlot.releaseSlot();
assertTrue(sharedSlot.isReleased()); assertTrue(sharedSlot.isReleased());
assertTrue(sourceSlot.isReleased()); assertTrue(sourceSlot.isReleased());
...@@ -613,7 +613,7 @@ public class SharedSlotsTest extends TestLogger { ...@@ -613,7 +613,7 @@ public class SharedSlotsTest extends TestLogger {
SharedSlot sharedSlot = instance.allocateSharedSlot(assignment); SharedSlot sharedSlot = instance.allocateSharedSlot(assignment);
SimpleSlot sub = assignment.addSharedSlotAndAllocateSubSlot(sharedSlot, Locality.UNCONSTRAINED, vid); SimpleSlot sub = assignment.addSharedSlotAndAllocateSubSlot(sharedSlot, Locality.UNCONSTRAINED, vid);
sub.releaseInstanceSlot(); sub.releaseSlot();
assertTrue(sub.isReleased()); assertTrue(sub.isReleased());
assertTrue(sharedSlot.isReleased()); assertTrue(sharedSlot.isReleased());
...@@ -648,7 +648,7 @@ public class SharedSlotsTest extends TestLogger { ...@@ -648,7 +648,7 @@ public class SharedSlotsTest extends TestLogger {
assertNull(sub.getGroupID()); assertNull(sub.getGroupID());
assertEquals(constraint.getSharedSlot(), sub.getParent()); assertEquals(constraint.getSharedSlot(), sub.getParent());
sub.releaseInstanceSlot(); sub.releaseSlot();
assertTrue(sub.isReleased()); assertTrue(sub.isReleased());
assertTrue(sharedSlot.isReleased()); assertTrue(sharedSlot.isReleased());
......
...@@ -20,6 +20,7 @@ package org.apache.flink.runtime.instance; ...@@ -20,6 +20,7 @@ package org.apache.flink.runtime.instance;
import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.jobmanager.slots.ActorTaskManagerGateway; import org.apache.flink.runtime.jobmanager.slots.ActorTaskManagerGateway;
import org.apache.flink.runtime.jobmaster.TestingPayload;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.util.TestLogger; import org.apache.flink.util.TestLogger;
...@@ -43,7 +44,7 @@ public class SimpleSlotTest extends TestLogger { ...@@ -43,7 +44,7 @@ public class SimpleSlotTest extends TestLogger {
SimpleSlot slot = getSlot(); SimpleSlot slot = getSlot();
assertTrue(slot.isAlive()); assertTrue(slot.isAlive());
slot.releaseInstanceSlot(); slot.releaseSlot();
assertFalse(slot.isAlive()); assertFalse(slot.isAlive());
assertTrue(slot.isCanceled()); assertTrue(slot.isCanceled());
assertTrue(slot.isReleased()); assertTrue(slot.isReleased());
...@@ -111,7 +112,7 @@ public class SimpleSlotTest extends TestLogger { ...@@ -111,7 +112,7 @@ public class SimpleSlotTest extends TestLogger {
// assign to released // assign to released
{ {
SimpleSlot slot = getSlot(); SimpleSlot slot = getSlot();
slot.releaseInstanceSlot(); slot.releaseSlot();
assertFalse(slot.tryAssignPayload(payload1)); assertFalse(slot.tryAssignPayload(payload1));
assertNull(slot.getPayload()); assertNull(slot.getPayload());
......
...@@ -21,8 +21,8 @@ package org.apache.flink.runtime.instance; ...@@ -21,8 +21,8 @@ package org.apache.flink.runtime.instance;
import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmanager.scheduler.Locality; import org.apache.flink.runtime.jobmanager.scheduler.Locality;
import org.apache.flink.runtime.jobmanager.slots.SlotOwner;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway; import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.jobmaster.SlotOwner;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.util.TestLogger; import org.apache.flink.util.TestLogger;
......
...@@ -143,7 +143,7 @@ public class CoLocationConstraintTest { ...@@ -143,7 +143,7 @@ public class CoLocationConstraintTest {
assertEquals(instance2.getTaskManagerLocation(), constraint.getLocation()); assertEquals(instance2.getTaskManagerLocation(), constraint.getLocation());
// release the slot // release the slot
slot2_1.releaseInstanceSlot(); slot2_1.releaseSlot();
// we should still have a location // we should still have a location
assertTrue(constraint.isAssigned()); assertTrue(constraint.isAssigned());
......
...@@ -18,7 +18,8 @@ ...@@ -18,7 +18,8 @@
package org.apache.flink.runtime.jobmanager.slots; package org.apache.flink.runtime.jobmanager.slots;
import org.apache.flink.runtime.instance.LogicalSlot; import org.apache.flink.runtime.jobmaster.LogicalSlot;
import org.apache.flink.runtime.jobmaster.SlotOwner;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册