[FLINK-8078] Introduce LogicalSlot interface

The LogicalSlot interface decouples the task deployment from the actual
slot implementation which at the moment is Slot, SimpleSlot and SharedSlot.
This is a helpful step to introduce a different slot implementation for
Flip-6.

This closes #5086.
上级 0d551640
......@@ -24,7 +24,7 @@ import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionEdge;
import org.apache.flink.runtime.executiongraph.ExecutionGraphException;
import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
import org.apache.flink.runtime.instance.SimpleSlot;
import org.apache.flink.runtime.instance.LogicalSlot;
import org.apache.flink.runtime.io.network.ConnectionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
......@@ -85,10 +85,9 @@ public class InputChannelDeploymentDescriptor implements Serializable {
*/
public static InputChannelDeploymentDescriptor[] fromEdges(
ExecutionEdge[] edges,
SimpleSlot consumerSlot,
ResourceID consumerResourceId,
boolean allowLazyDeployment) throws ExecutionGraphException {
final ResourceID consumerTaskManager = consumerSlot.getTaskManagerID();
final InputChannelDeploymentDescriptor[] icdd = new InputChannelDeploymentDescriptor[edges.length];
// Each edge is connected to a different result partition
......@@ -97,7 +96,7 @@ public class InputChannelDeploymentDescriptor implements Serializable {
final Execution producer = consumedPartition.getProducer().getCurrentExecutionAttempt();
final ExecutionState producerState = producer.getState();
final SimpleSlot producerSlot = producer.getAssignedResource();
final LogicalSlot producerSlot = producer.getAssignedResource();
final ResultPartitionLocation partitionLocation;
......@@ -111,7 +110,7 @@ public class InputChannelDeploymentDescriptor implements Serializable {
final TaskManagerLocation partitionTaskManagerLocation = producerSlot.getTaskManagerLocation();
final ResourceID partitionTaskManager = partitionTaskManagerLocation.getResourceID();
if (partitionTaskManager.equals(consumerTaskManager)) {
if (partitionTaskManager.equals(consumerResourceId)) {
// Consuming task is deployed to the same TaskManager as the partition => local
partitionLocation = ResultPartitionLocation.createLocal();
}
......
......@@ -33,7 +33,7 @@ import org.apache.flink.runtime.deployment.PartialInputChannelDeploymentDescript
import org.apache.flink.runtime.deployment.ResultPartitionLocation;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.instance.SimpleSlot;
import org.apache.flink.runtime.instance.LogicalSlot;
import org.apache.flink.runtime.instance.SlotProvider;
import org.apache.flink.runtime.io.network.ConnectionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
......@@ -98,9 +98,9 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
private static final AtomicReferenceFieldUpdater<Execution, ExecutionState> STATE_UPDATER =
AtomicReferenceFieldUpdater.newUpdater(Execution.class, ExecutionState.class, "state");
private static final AtomicReferenceFieldUpdater<Execution, SimpleSlot> ASSIGNED_SLOT_UPDATER = AtomicReferenceFieldUpdater.newUpdater(
private static final AtomicReferenceFieldUpdater<Execution, LogicalSlot> ASSIGNED_SLOT_UPDATER = AtomicReferenceFieldUpdater.newUpdater(
Execution.class,
SimpleSlot.class,
LogicalSlot.class,
"assignedResource");
private static final Logger LOG = ExecutionGraph.LOG;
......@@ -141,7 +141,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
private volatile ExecutionState state = CREATED;
private volatile SimpleSlot assignedResource;
private volatile LogicalSlot assignedResource;
private volatile Throwable failureCause; // once assigned, never changes
......@@ -240,7 +240,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
return taskManagerLocationFuture;
}
public SimpleSlot getAssignedResource() {
public LogicalSlot getAssignedResource() {
return assignedResource;
}
......@@ -248,21 +248,21 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
* Tries to assign the given slot to the execution. The assignment works only if the
* Execution is in state SCHEDULED. Returns true, if the resource could be assigned.
*
* @param slot to assign to this execution
* @param logicalSlot to assign to this execution
* @return true if the slot could be assigned to the execution, otherwise false
*/
@VisibleForTesting
boolean tryAssignResource(final SimpleSlot slot) {
checkNotNull(slot);
boolean tryAssignResource(final LogicalSlot logicalSlot) {
checkNotNull(logicalSlot);
// only allow to set the assigned resource in state SCHEDULED or CREATED
// note: we also accept resource assignment when being in state CREATED for testing purposes
if (state == SCHEDULED || state == CREATED) {
if (ASSIGNED_SLOT_UPDATER.compareAndSet(this, null, slot)) {
if (ASSIGNED_SLOT_UPDATER.compareAndSet(this, null, logicalSlot)) {
// check for concurrent modification (e.g. cancelling call)
if (state == SCHEDULED || state == CREATED) {
checkState(!taskManagerLocationFuture.isDone(), "The TaskManagerLocationFuture should not be set if we haven't assigned a resource yet.");
taskManagerLocationFuture.complete(slot.getTaskManagerLocation());
taskManagerLocationFuture.complete(logicalSlot.getTaskManagerLocation());
return true;
} else {
......@@ -283,7 +283,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
@Override
public TaskManagerLocation getAssignedResourceLocation() {
// returns non-null only when a location is already assigned
final SimpleSlot currentAssignedResource = assignedResource;
final LogicalSlot currentAssignedResource = assignedResource;
return currentAssignedResource != null ? currentAssignedResource.getTaskManagerLocation() : null;
}
......@@ -442,14 +442,14 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
queued,
preferredLocations))
.thenApply(
(SimpleSlot slot) -> {
if (tryAssignResource(slot)) {
(LogicalSlot logicalSlot) -> {
if (tryAssignResource(logicalSlot)) {
return this;
} else {
// release the slot
slot.releaseSlot();
logicalSlot.releaseSlot();
throw new CompletionException(new FlinkException("Could not assign slot " + slot + " to execution " + this + " because it has already been assigned "));
throw new CompletionException(new FlinkException("Could not assign slot " + logicalSlot + " to execution " + this + " because it has already been assigned "));
}
});
}
......@@ -465,7 +465,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
* @throws JobException if the execution cannot be deployed to the assigned resource
*/
public void deploy() throws JobException {
final SimpleSlot slot = assignedResource;
final LogicalSlot slot = assignedResource;
checkNotNull(slot, "In order to deploy the execution we first have to assign a resource via tryAssignResource.");
......@@ -493,7 +493,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
try {
// good, we are allowed to deploy
if (!slot.setExecutedVertex(this)) {
if (!slot.setExecution(this)) {
throw new JobException("Could not assign the ExecutionVertex to the slot " + slot);
}
......@@ -545,7 +545,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
* Sends stop RPC call.
*/
public void stop() {
final SimpleSlot slot = assignedResource;
final LogicalSlot slot = assignedResource;
if (slot != null) {
final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
......@@ -608,7 +608,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
try {
vertex.getExecutionGraph().deregisterExecution(this);
final SimpleSlot slot = assignedResource;
final LogicalSlot slot = assignedResource;
if (slot != null) {
slot.releaseSlot();
......@@ -691,7 +691,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
// ----------------------------------------------------------------
else {
if (consumerState == RUNNING) {
final SimpleSlot consumerSlot = consumer.getAssignedResource();
final LogicalSlot consumerSlot = consumer.getAssignedResource();
if (consumerSlot == null) {
// The consumer has been reset concurrently
......@@ -702,7 +702,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
.getCurrentAssignedResource().getTaskManagerLocation();
final ResourceID partitionTaskManager = partitionTaskManagerLocation.getResourceID();
final ResourceID consumerTaskManager = consumerSlot.getTaskManagerID();
final ResourceID consumerTaskManager = consumerSlot.getTaskManagerLocation().getResourceID();
final ResultPartitionID partitionId = new ResultPartitionID(partition.getPartitionId(), attemptId);
......@@ -778,7 +778,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
int maxStrackTraceDepth,
Time timeout) {
final SimpleSlot slot = assignedResource;
final LogicalSlot slot = assignedResource;
if (slot != null) {
final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
......@@ -802,7 +802,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
* @param timestamp of the completed checkpoint
*/
public void notifyCheckpointComplete(long checkpointId, long timestamp) {
final SimpleSlot slot = assignedResource;
final LogicalSlot slot = assignedResource;
if (slot != null) {
final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
......@@ -822,7 +822,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
* @param checkpointOptions of the checkpoint to trigger
*/
public void triggerCheckpoint(long checkpointId, long timestamp, CheckpointOptions checkpointOptions) {
final SimpleSlot slot = assignedResource;
final LogicalSlot slot = assignedResource;
if (slot != null) {
final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
......@@ -880,7 +880,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
updateAccumulatorsAndMetrics(userAccumulators, metrics);
final SimpleSlot slot = assignedResource;
final LogicalSlot slot = assignedResource;
if (slot != null) {
slot.releaseSlot();
......@@ -938,7 +938,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
if (transitionState(current, CANCELED)) {
try {
final SimpleSlot slot = assignedResource;
final LogicalSlot slot = assignedResource;
if (slot != null) {
slot.releaseSlot();
......@@ -1035,7 +1035,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
updateAccumulatorsAndMetrics(userAccumulators, metrics);
try {
final SimpleSlot slot = assignedResource;
final LogicalSlot slot = assignedResource;
if (slot != null) {
slot.releaseSlot();
}
......@@ -1119,7 +1119,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
* The sending is tried up to NUM_CANCEL_CALL_TRIES times.
*/
private void sendCancelRpcCall() {
final SimpleSlot slot = assignedResource;
final LogicalSlot slot = assignedResource;
if (slot != null) {
final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
......@@ -1140,7 +1140,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
}
private void sendFailIntermediateResultPartitionsRpcCall() {
final SimpleSlot slot = assignedResource;
final LogicalSlot slot = assignedResource;
if (slot != null) {
final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
......@@ -1158,7 +1158,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
private void sendUpdatePartitionInfoRpcCall(
final Iterable<PartitionInfo> partitionInfos) {
final SimpleSlot slot = assignedResource;
final LogicalSlot slot = assignedResource;
if (slot != null) {
final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
......@@ -1318,7 +1318,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
@Override
public String toString() {
final SimpleSlot slot = assignedResource;
final LogicalSlot slot = assignedResource;
return String.format("Attempt #%d (%s) @ %s - [%s]", attemptNumber, vertex.getTaskNameWithSubtaskIndex(),
(slot == null ? "(unassigned)" : slot), state);
......
......@@ -32,6 +32,7 @@ import org.apache.flink.runtime.deployment.PartialInputChannelDeploymentDescript
import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.instance.LogicalSlot;
import org.apache.flink.runtime.instance.SimpleSlot;
import org.apache.flink.runtime.instance.SlotProvider;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
......@@ -272,7 +273,7 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
return currentExecution.getTaskManagerLocationFuture();
}
public SimpleSlot getCurrentAssignedResource() {
public LogicalSlot getCurrentAssignedResource() {
return currentExecution.getAssignedResource();
}
......@@ -744,7 +745,7 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
*/
TaskDeploymentDescriptor createDeploymentDescriptor(
ExecutionAttemptID executionId,
SimpleSlot targetSlot,
LogicalSlot targetSlot,
TaskStateSnapshot taskStateHandles,
int attemptNumber) throws ExecutionGraphException {
......@@ -779,8 +780,10 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
for (ExecutionEdge[] edges : inputEdges) {
InputChannelDeploymentDescriptor[] partitions = InputChannelDeploymentDescriptor
.fromEdges(edges, targetSlot, lazyScheduling);
InputChannelDeploymentDescriptor[] partitions = InputChannelDeploymentDescriptor.fromEdges(
edges,
targetSlot.getTaskManagerLocation().getResourceID(),
lazyScheduling);
// If the produced partition has multiple consumers registered, we
// need to request the one matching our sub task index.
......@@ -829,10 +832,10 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
serializedJobInformation,
serializedTaskInformation,
executionId,
targetSlot.getAllocatedSlot().getSlotAllocationId(),
targetSlot.getAllocationId(),
subTaskIndex,
attemptNumber,
targetSlot.getRoot().getSlotNumber(),
targetSlot.getPhysicalSlotNumber(),
taskStateHandles,
producedPartitions,
consumedPartitions);
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.instance;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
/**
* A logical slot represents a resource on a TaskManager into
* which a single task can be deployed.
*/
public interface LogicalSlot {
/**
* Return the TaskManager location of this slot
*
* @return TaskManager location of this slot
*/
TaskManagerLocation getTaskManagerLocation();
/**
* Return the TaskManager gateway to talk to the TaskManager.
*
* @return TaskManager gateway to talk to the TaskManager
*/
TaskManagerGateway getTaskManagerGateway();
/**
* True if the slot is still alive.
*
* @return True if the slot is still alive, otherwise false
*/
boolean isAlive();
/**
* True if the slot is canceled.
*
* @return True if the slot is canceled, otherwise false
*/
boolean isCanceled();
/**
* True if the slot is released.
*
* @return True if the slot is released, otherwise false
*/
boolean isReleased();
/**
* Sets the execution for this slot.
*
* @param execution to set for this slot
* @return true if the slot could be set, otherwise false
*/
boolean setExecution(Execution execution);
/**
* Releases this slot.
*/
void releaseSlot();
/**
* Gets the slot number on the TaskManager.
*
* @return slot number
*/
int getPhysicalSlotNumber();
/**
* Gets the allocation id of this slot.
*
* @return allocation id of this slot
*/
AllocationID getAllocationId();
}
......@@ -18,6 +18,7 @@
package org.apache.flink.runtime.instance;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.api.common.JobID;
......@@ -37,7 +38,7 @@ import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
* <p>If this slot is part of a {@link SharedSlot}, then the parent attribute will point to that shared slot.
* If not, then the parent attribute is null.
*/
public class SimpleSlot extends Slot {
public class SimpleSlot extends Slot implements LogicalSlot {
/** The updater used to atomically swap in the execution */
private static final AtomicReferenceFieldUpdater<SimpleSlot, Execution> VERTEX_UPDATER =
......@@ -163,7 +164,8 @@ public class SimpleSlot extends Slot {
* @param executedVertex The vertex to assign to this slot.
* @return True, if the vertex was assigned, false, otherwise.
*/
public boolean setExecutedVertex(Execution executedVertex) {
@Override
public boolean setExecution(Execution executedVertex) {
if (executedVertex == null) {
throw new NullPointerException();
}
......@@ -231,6 +233,16 @@ public class SimpleSlot extends Slot {
}
}
@Override
public int getPhysicalSlotNumber() {
return getRootSlotNumber();
}
@Override
public AllocationID getAllocationId() {
return getAllocatedSlot().getSlotAllocationId();
}
// ------------------------------------------------------------------------
// Utilities
// ------------------------------------------------------------------------
......
......@@ -266,7 +266,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway {
// ------------------------------------------------------------------------
@Override
public CompletableFuture<SimpleSlot> allocateSlot(
public CompletableFuture<LogicalSlot> allocateSlot(
SlotRequestID requestId,
ScheduledUnit task,
ResourceProfile resources,
......@@ -303,7 +303,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway {
return CompletableFuture.completedFuture(Acknowledge.get());
}
CompletableFuture<SimpleSlot> internalAllocateSlot(
CompletableFuture<LogicalSlot> internalAllocateSlot(
SlotRequestID requestId,
ScheduledUnit task,
ResourceProfile resources,
......@@ -318,7 +318,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway {
}
// the request will be completed by a future
final CompletableFuture<SimpleSlot> future = new CompletableFuture<>();
final CompletableFuture<LogicalSlot> future = new CompletableFuture<>();
// (2) need to request a slot
if (resourceManagerGateway == null) {
......@@ -433,7 +433,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway {
private void stashRequestWaitingForResourceManager(
final SlotRequestID requestId,
final ResourceProfile resources,
final CompletableFuture<SimpleSlot> future) {
final CompletableFuture<LogicalSlot> future) {
LOG.info("Cannot serve slot request, no ResourceManager connected. " +
"Adding as pending request {}", requestId);
......@@ -1087,15 +1087,15 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway {
}
@Override
public CompletableFuture<SimpleSlot> allocateSlot(
public CompletableFuture<LogicalSlot> allocateSlot(
ScheduledUnit task,
boolean allowQueued,
Collection<TaskManagerLocation> preferredLocations) {
final SlotRequestID requestId = new SlotRequestID();
CompletableFuture<SimpleSlot> slotFuture = gateway.allocateSlot(requestId, task, ResourceProfile.UNKNOWN, preferredLocations, timeout);
CompletableFuture<LogicalSlot> slotFuture = gateway.allocateSlot(requestId, task, ResourceProfile.UNKNOWN, preferredLocations, timeout);
slotFuture.whenComplete(
(SimpleSlot slot, Throwable failure) -> {
(LogicalSlot slot, Throwable failure) -> {
if (failure != null) {
gateway.cancelSlotAllocation(requestId);
}
......@@ -1113,13 +1113,13 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway {
private final SlotRequestID slotRequestId;
private final CompletableFuture<SimpleSlot> future;
private final CompletableFuture<LogicalSlot> future;
private final ResourceProfile resourceProfile;
PendingRequest(
SlotRequestID slotRequestId,
CompletableFuture<SimpleSlot> future,
CompletableFuture<LogicalSlot> future,
ResourceProfile resourceProfile) {
this.slotRequestId = Preconditions.checkNotNull(slotRequestId);
this.future = Preconditions.checkNotNull(future);
......@@ -1130,7 +1130,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway {
return slotRequestId;
}
public CompletableFuture<SimpleSlot> getFuture() {
public CompletableFuture<LogicalSlot> getFuture() {
return future;
}
......
......@@ -86,7 +86,7 @@ public interface SlotPoolGateway extends RpcGateway {
// allocating and disposing slots
// ------------------------------------------------------------------------
CompletableFuture<SimpleSlot> allocateSlot(
CompletableFuture<LogicalSlot> allocateSlot(
SlotRequestID requestId,
ScheduledUnit task,
ResourceProfile resources,
......
......@@ -45,7 +45,7 @@ public interface SlotProvider {
* @param preferredLocations preferred locations for the slot allocation
* @return The future of the allocation
*/
CompletableFuture<SimpleSlot> allocateSlot(
CompletableFuture<LogicalSlot> allocateSlot(
ScheduledUnit task,
boolean allowQueued,
Collection<TaskManagerLocation> preferredLocations);
......
......@@ -24,6 +24,7 @@ import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.runtime.instance.InstanceDiedException;
import org.apache.flink.runtime.instance.InstanceListener;
import org.apache.flink.runtime.instance.LogicalSlot;
import org.apache.flink.runtime.instance.SharedSlot;
import org.apache.flink.runtime.instance.SimpleSlot;
import org.apache.flink.runtime.instance.SlotProvider;
......@@ -133,7 +134,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener, Sl
@Override
public CompletableFuture<SimpleSlot> allocateSlot(
public CompletableFuture<LogicalSlot> allocateSlot(
ScheduledUnit task,
boolean allowQueued,
Collection<TaskManagerLocation> preferredLocations) {
......@@ -146,7 +147,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener, Sl
}
else if (ret instanceof CompletableFuture) {
@SuppressWarnings("unchecked")
CompletableFuture<SimpleSlot> typed = (CompletableFuture<SimpleSlot>) ret;
CompletableFuture<LogicalSlot> typed = (CompletableFuture<LogicalSlot>) ret;
return typed;
}
else {
......@@ -321,7 +322,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener, Sl
else {
// no resource available now, so queue the request
if (queueIfNoResource) {
CompletableFuture<SimpleSlot> future = new CompletableFuture<>();
CompletableFuture<LogicalSlot> future = new CompletableFuture<>();
this.taskQueue.add(new QueuedTask(task, future));
return future;
}
......@@ -837,10 +838,10 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener, Sl
private final ScheduledUnit task;
private final CompletableFuture<SimpleSlot> future;
private final CompletableFuture<LogicalSlot> future;
public QueuedTask(ScheduledUnit task, CompletableFuture<SimpleSlot> future) {
public QueuedTask(ScheduledUnit task, CompletableFuture<LogicalSlot> future) {
this.task = task;
this.future = future;
}
......@@ -849,7 +850,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener, Sl
return task;
}
public CompletableFuture<SimpleSlot> getFuture() {
public CompletableFuture<LogicalSlot> getFuture() {
return future;
}
}
......
......@@ -55,7 +55,7 @@ import org.apache.flink.runtime.heartbeat.HeartbeatManager;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.heartbeat.HeartbeatTarget;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.instance.Slot;
import org.apache.flink.runtime.instance.LogicalSlot;
import org.apache.flink.runtime.instance.SlotPool;
import org.apache.flink.runtime.instance.SlotPoolGateway;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
......@@ -444,7 +444,7 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
return FutureUtils.completedExceptionally(new Exception("No InputSplitAssigner for vertex ID " + vertexID));
}
final Slot slot = execution.getAssignedResource();
final LogicalSlot slot = execution.getAssignedResource();
final int taskId = execution.getVertex().getParallelSubtaskIndex();
final String host = slot != null ? slot.getTaskManagerLocation().getHostname() : null;
final InputSplit nextInputSplit = splitAssigner.getNextInputSplit(host, taskId);
......
......@@ -27,7 +27,7 @@ import org.apache.flink.runtime.executiongraph.ExecutionGraphException;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.IntermediateResult;
import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
import org.apache.flink.runtime.instance.SimpleSlot;
import org.apache.flink.runtime.instance.LogicalSlot;
import org.apache.flink.runtime.io.network.ConnectionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
......@@ -60,7 +60,7 @@ public class InputChannelDeploymentDescriptorTest {
ResourceID consumerResourceId = ResourceID.generate();
ExecutionVertex consumer = mock(ExecutionVertex.class);
SimpleSlot consumerSlot = mockSlot(consumerResourceId);
LogicalSlot consumerSlot = mockSlot(consumerResourceId);
// Local and remote channel are only allowed for certain execution
// states.
......@@ -86,7 +86,7 @@ public class InputChannelDeploymentDescriptorTest {
InputChannelDeploymentDescriptor[] desc = InputChannelDeploymentDescriptor.fromEdges(
new ExecutionEdge[]{localEdge, remoteEdge, unknownEdge},
consumerSlot,
consumerSlot.getTaskManagerLocation().getResourceID(),
allowLazyDeployment);
assertEquals(3, desc.length);
......@@ -124,7 +124,7 @@ public class InputChannelDeploymentDescriptorTest {
public void testUnknownChannelWithoutLazyDeploymentThrows() throws Exception {
ResourceID consumerResourceId = ResourceID.generate();
ExecutionVertex consumer = mock(ExecutionVertex.class);
SimpleSlot consumerSlot = mockSlot(consumerResourceId);
LogicalSlot consumerSlot = mockSlot(consumerResourceId);
// Unknown partition
ExecutionVertex unknownProducer = mockExecutionVertex(ExecutionState.CREATED, null); // no assigned resource
......@@ -137,7 +137,7 @@ public class InputChannelDeploymentDescriptorTest {
InputChannelDeploymentDescriptor[] desc = InputChannelDeploymentDescriptor.fromEdges(
new ExecutionEdge[]{unknownEdge},
consumerSlot,
consumerSlot.getTaskManagerLocation().getResourceID(),
allowLazyDeployment);
assertEquals(1, desc.length);
......@@ -152,7 +152,7 @@ public class InputChannelDeploymentDescriptorTest {
InputChannelDeploymentDescriptor.fromEdges(
new ExecutionEdge[]{unknownEdge},
consumerSlot,
consumerSlot.getTaskManagerLocation().getResourceID(),
allowLazyDeployment);
fail("Did not throw expected ExecutionGraphException");
......@@ -162,10 +162,9 @@ public class InputChannelDeploymentDescriptorTest {
// ------------------------------------------------------------------------
private static SimpleSlot mockSlot(ResourceID resourceId) {
SimpleSlot slot = mock(SimpleSlot.class);
private static LogicalSlot mockSlot(ResourceID resourceId) {
LogicalSlot slot = mock(LogicalSlot.class);
when(slot.getTaskManagerLocation()).thenReturn(new TaskManagerLocation(resourceId, InetAddress.getLoopbackAddress(), 5000));
when(slot.getTaskManagerID()).thenReturn(resourceId);
return slot;
}
......@@ -178,7 +177,7 @@ public class InputChannelDeploymentDescriptorTest {
when(exec.getAttemptId()).thenReturn(new ExecutionAttemptID());
if (resourceId != null) {
SimpleSlot slot = mockSlot(resourceId);
LogicalSlot slot = mockSlot(resourceId);
when(exec.getAssignedResource()).thenReturn(slot);
when(vertex.getCurrentAssignedResource()).thenReturn(slot);
} else {
......
......@@ -40,6 +40,7 @@ import org.apache.flink.runtime.executiongraph.failover.RestartAllStrategy;
import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.runtime.instance.LogicalSlot;
import org.apache.flink.runtime.instance.SimpleSlot;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
......@@ -587,10 +588,10 @@ public class ExecutionGraphDeploymentTest extends TestLogger {
sinkVertex.connectNewDataSetAsInput(sourceVertex, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
final Map<JobVertexID, CompletableFuture<SimpleSlot>[]> slotFutures = new HashMap<>(2);
final Map<JobVertexID, CompletableFuture<LogicalSlot>[]> slotFutures = new HashMap<>(2);
for (JobVertexID jobVertexID : Arrays.asList(sourceVertexId, sinkVertexId)) {
CompletableFuture<SimpleSlot>[] slotFutureArray = new CompletableFuture[parallelism];
CompletableFuture<LogicalSlot>[] slotFutureArray = new CompletableFuture[parallelism];
for (int i = 0; i < parallelism; i++) {
slotFutureArray[i] = new CompletableFuture<>();
......
......@@ -18,38 +18,28 @@
package org.apache.flink.runtime.executiongraph;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.execution.SuppressRestartsException;
import org.apache.flink.runtime.executiongraph.metrics.RestartTimeGauge;
import org.apache.flink.runtime.executiongraph.restart.RestartCallback;
import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.runtime.instance.SimpleSlot;
import org.apache.flink.runtime.instance.Slot;
import org.apache.flink.runtime.instance.LogicalSlot;
import org.apache.flink.runtime.instance.TestingLogicalSlot;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.TestLogger;
import org.junit.Test;
import org.mockito.Matchers;
import java.io.IOException;
import java.util.ArrayList;
......@@ -87,40 +77,9 @@ public class ExecutionGraphMetricsTest extends TestLogger {
Time timeout = Time.seconds(10L);
Scheduler scheduler = mock(Scheduler.class);
ResourceID taskManagerId = ResourceID.generate();
TaskManagerLocation taskManagerLocation = mock(TaskManagerLocation.class);
when(taskManagerLocation.getResourceID()).thenReturn(taskManagerId);
when(taskManagerLocation.getHostname()).thenReturn("localhost");
TaskManagerGateway taskManagerGateway = mock(TaskManagerGateway.class);
Instance instance = mock(Instance.class);
when(instance.getTaskManagerLocation()).thenReturn(taskManagerLocation);
when(instance.getTaskManagerID()).thenReturn(taskManagerId);
when(instance.getTaskManagerGateway()).thenReturn(taskManagerGateway);
Slot rootSlot = mock(Slot.class);
AllocatedSlot mockAllocatedSlot = mock(AllocatedSlot.class);
when(mockAllocatedSlot.getSlotAllocationId()).thenReturn(new AllocationID());
SimpleSlot simpleSlot = mock(SimpleSlot.class);
when(simpleSlot.isAlive()).thenReturn(true);
when(simpleSlot.getTaskManagerLocation()).thenReturn(taskManagerLocation);
when(simpleSlot.getTaskManagerID()).thenReturn(taskManagerId);
when(simpleSlot.getTaskManagerGateway()).thenReturn(taskManagerGateway);
when(simpleSlot.setExecutedVertex(Matchers.any(Execution.class))).thenReturn(true);
when(simpleSlot.getRoot()).thenReturn(rootSlot);
when(simpleSlot.getAllocatedSlot()).thenReturn(mockAllocatedSlot);
CompletableFuture<SimpleSlot> future = new CompletableFuture<>();
future.complete(simpleSlot);
when(scheduler.allocateSlot(any(ScheduledUnit.class), anyBoolean(), any(Collection.class))).thenReturn(future);
when(rootSlot.getSlotNumber()).thenReturn(0);
when(taskManagerGateway.submitTask(any(TaskDeploymentDescriptor.class), any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get()));
CompletableFuture<LogicalSlot> slotFuture1 = CompletableFuture.completedFuture(new TestingLogicalSlot());
CompletableFuture<LogicalSlot> slotFuture2 = CompletableFuture.completedFuture(new TestingLogicalSlot());
when(scheduler.allocateSlot(any(ScheduledUnit.class), anyBoolean(), any(Collection.class))).thenReturn(slotFuture1, slotFuture2);
TestingRestartStrategy testingRestartStrategy = new TestingRestartStrategy();
......@@ -130,7 +89,7 @@ public class ExecutionGraphMetricsTest extends TestLogger {
jobGraph.getJobID(),
jobGraph.getName(),
jobConfig,
new SerializedValue<ExecutionConfig>(null),
new SerializedValue<>(null),
timeout,
testingRestartStrategy,
scheduler);
......
......@@ -29,6 +29,7 @@ import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
import org.apache.flink.runtime.instance.LogicalSlot;
import org.apache.flink.runtime.instance.SimpleSlot;
import org.apache.flink.runtime.instance.Slot;
import org.apache.flink.runtime.instance.SlotProvider;
......@@ -109,8 +110,8 @@ public class ExecutionGraphSchedulingTest extends TestLogger {
final JobID jobId = new JobID();
final JobGraph jobGraph = new JobGraph(jobId, "test", sourceVertex, targetVertex);
final CompletableFuture<SimpleSlot> sourceFuture = new CompletableFuture<>();
final CompletableFuture<SimpleSlot> targetFuture = new CompletableFuture<>();
final CompletableFuture<LogicalSlot> sourceFuture = new CompletableFuture<>();
final CompletableFuture<LogicalSlot> targetFuture = new CompletableFuture<>();
ProgrammedSlotProvider slotProvider = new ProgrammedSlotProvider(parallelism);
slotProvider.addSlot(sourceVertex.getID(), 0, sourceFuture);
......@@ -177,9 +178,9 @@ public class ExecutionGraphSchedulingTest extends TestLogger {
final JobGraph jobGraph = new JobGraph(jobId, "test", sourceVertex, targetVertex);
@SuppressWarnings({"unchecked", "rawtypes"})
final CompletableFuture<SimpleSlot>[] sourceFutures = new CompletableFuture[parallelism];
final CompletableFuture<LogicalSlot>[] sourceFutures = new CompletableFuture[parallelism];
@SuppressWarnings({"unchecked", "rawtypes"})
final CompletableFuture<SimpleSlot>[] targetFutures = new CompletableFuture[parallelism];
final CompletableFuture<LogicalSlot>[] targetFutures = new CompletableFuture[parallelism];
//
// Create the slots, futures, and the slot provider
......@@ -283,9 +284,9 @@ public class ExecutionGraphSchedulingTest extends TestLogger {
final SimpleSlot[] targetSlots = new SimpleSlot[parallelism];
@SuppressWarnings({"unchecked", "rawtypes"})
final CompletableFuture<SimpleSlot>[] sourceFutures = new CompletableFuture[parallelism];
final CompletableFuture<LogicalSlot>[] sourceFutures = new CompletableFuture[parallelism];
@SuppressWarnings({"unchecked", "rawtypes"})
final CompletableFuture<SimpleSlot>[] targetFutures = new CompletableFuture[parallelism];
final CompletableFuture<LogicalSlot>[] targetFutures = new CompletableFuture[parallelism];
for (int i = 0; i < parallelism; i++) {
sourceSlots[i] = createSlot(taskManager, jobId, slotOwner);
......@@ -358,7 +359,7 @@ public class ExecutionGraphSchedulingTest extends TestLogger {
final TaskManagerGateway taskManager = mock(TaskManagerGateway.class);
final SimpleSlot[] slots = new SimpleSlot[parallelism];
@SuppressWarnings({"unchecked", "rawtypes"})
final CompletableFuture<SimpleSlot>[] slotFutures = new CompletableFuture[parallelism];
final CompletableFuture<LogicalSlot>[] slotFutures = new CompletableFuture[parallelism];
for (int i = 0; i < parallelism; i++) {
slots[i] = createSlot(taskManager, jobId, slotOwner);
......@@ -392,7 +393,7 @@ public class ExecutionGraphSchedulingTest extends TestLogger {
// verify that no deployments have happened
verify(taskManager, times(0)).submitTask(any(TaskDeploymentDescriptor.class), any(Time.class));
for (CompletableFuture<SimpleSlot> future : slotFutures) {
for (CompletableFuture<LogicalSlot> future : slotFutures) {
if (future.isDone()) {
assertTrue(future.get().isCanceled());
}
......
......@@ -22,6 +22,7 @@ import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
import org.apache.flink.runtime.instance.LogicalSlot;
import org.apache.flink.runtime.instance.SimpleSlot;
import org.apache.flink.runtime.instance.Slot;
import org.apache.flink.runtime.jobgraph.JobVertex;
......@@ -62,7 +63,7 @@ public class ExecutionTest extends TestLogger {
final JobVertex jobVertex = new JobVertex("Test vertex", jobVertexId);
jobVertex.setInvokableClass(NoOpInvokable.class);
final CompletableFuture<SimpleSlot> slotFuture = new CompletableFuture<>();
final CompletableFuture<LogicalSlot> slotFuture = new CompletableFuture<>();
final ProgrammedSlotProvider slotProvider = new ProgrammedSlotProvider(1);
slotProvider.addSlot(jobVertexId, 0, slotFuture);
......
......@@ -25,8 +25,8 @@ import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.runtime.instance.LogicalSlot;
import org.apache.flink.runtime.instance.SimpleSlot;
import org.apache.flink.runtime.instance.Slot;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
......@@ -374,12 +374,8 @@ public class ExecutionVertexDeploymentTest extends TestLogger {
AllocatedSlot allocatedSlot = mock(AllocatedSlot.class);
when(allocatedSlot.getSlotAllocationId()).thenReturn(new AllocationID());
Slot root = mock(Slot.class);
when(root.getSlotNumber()).thenReturn(1);
SimpleSlot slot = mock(SimpleSlot.class);
when(slot.getRoot()).thenReturn(root);
when(slot.getAllocatedSlot()).thenReturn(allocatedSlot);
when(root.getAllocatedSlot()).thenReturn(allocatedSlot);
LogicalSlot slot = mock(LogicalSlot.class);
when(slot.getAllocationId()).thenReturn(new AllocationID());
for (ScheduleMode mode : ScheduleMode.values()) {
vertex.getExecutionGraph().setScheduleMode(mode);
......
......@@ -18,7 +18,7 @@
package org.apache.flink.runtime.executiongraph;
import org.apache.flink.runtime.instance.SimpleSlot;
import org.apache.flink.runtime.instance.LogicalSlot;
import org.apache.flink.runtime.instance.SlotProvider;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
......@@ -38,7 +38,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
*/
class ProgrammedSlotProvider implements SlotProvider {
private final Map<JobVertexID, CompletableFuture<SimpleSlot>[]> slotFutures = new HashMap<>();
private final Map<JobVertexID, CompletableFuture<LogicalSlot>[]> slotFutures = new HashMap<>();
private final Map<JobVertexID, CompletableFuture<Boolean>[]> slotFutureRequested = new HashMap<>();
......@@ -49,17 +49,17 @@ class ProgrammedSlotProvider implements SlotProvider {
this.parallelism = parallelism;
}
public void addSlot(JobVertexID vertex, int subtaskIndex, CompletableFuture<SimpleSlot> future) {
public void addSlot(JobVertexID vertex, int subtaskIndex, CompletableFuture<LogicalSlot> future) {
checkNotNull(vertex);
checkNotNull(future);
checkArgument(subtaskIndex >= 0 && subtaskIndex < parallelism);
CompletableFuture<SimpleSlot>[] futures = slotFutures.get(vertex);
CompletableFuture<LogicalSlot>[] futures = slotFutures.get(vertex);
CompletableFuture<Boolean>[] requestedFutures = slotFutureRequested.get(vertex);
if (futures == null) {
@SuppressWarnings("unchecked")
CompletableFuture<SimpleSlot>[] newArray = (CompletableFuture<SimpleSlot>[]) new CompletableFuture<?>[parallelism];
CompletableFuture<LogicalSlot>[] newArray = (CompletableFuture<LogicalSlot>[]) new CompletableFuture<?>[parallelism];
futures = newArray;
slotFutures.put(vertex, futures);
......@@ -71,7 +71,7 @@ class ProgrammedSlotProvider implements SlotProvider {
requestedFutures[subtaskIndex] = new CompletableFuture<>();
}
public void addSlots(JobVertexID vertex, CompletableFuture<SimpleSlot>[] futures) {
public void addSlots(JobVertexID vertex, CompletableFuture<LogicalSlot>[] futures) {
checkNotNull(vertex);
checkNotNull(futures);
checkArgument(futures.length == parallelism);
......@@ -92,16 +92,16 @@ class ProgrammedSlotProvider implements SlotProvider {
}
@Override
public CompletableFuture<SimpleSlot> allocateSlot(
public CompletableFuture<LogicalSlot> allocateSlot(
ScheduledUnit task,
boolean allowQueued,
Collection<TaskManagerLocation> preferredLocations) {
JobVertexID vertexId = task.getTaskToExecute().getVertex().getJobvertexId();
int subtask = task.getTaskToExecute().getParallelSubtaskIndex();
CompletableFuture<SimpleSlot>[] forTask = slotFutures.get(vertexId);
CompletableFuture<LogicalSlot>[] forTask = slotFutures.get(vertexId);
if (forTask != null) {
CompletableFuture<SimpleSlot> future = forTask[subtask];
CompletableFuture<LogicalSlot> future = forTask[subtask];
if (future != null) {
slotFutureRequested.get(vertexId)[subtask].complete(true);
......
......@@ -23,6 +23,7 @@ import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.instance.LogicalSlot;
import org.apache.flink.runtime.instance.SimpleSlot;
import org.apache.flink.runtime.instance.Slot;
import org.apache.flink.runtime.instance.SlotProvider;
......@@ -71,7 +72,7 @@ public class SimpleSlotProvider implements SlotProvider, SlotOwner {
}
@Override
public CompletableFuture<SimpleSlot> allocateSlot(
public CompletableFuture<LogicalSlot> allocateSlot(
ScheduledUnit task,
boolean allowQueued,
Collection<TaskManagerLocation> preferredLocations) {
......
......@@ -28,6 +28,7 @@ import org.apache.flink.runtime.jobmanager.scheduler.SchedulerTestUtils;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.util.AbstractID;
import org.apache.flink.util.TestLogger;
import org.junit.Test;
......@@ -38,7 +39,7 @@ import static org.junit.Assert.*;
/**
* Tests for the allocation, properties, and release of shared slots.
*/
public class SharedSlotsTest {
public class SharedSlotsTest extends TestLogger {
private static final Iterable<TaskManagerLocation> NO_LOCATION = Collections.emptySet();
......
......@@ -29,11 +29,13 @@ import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.jobmanager.slots.ActorTaskManagerGateway;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.util.TestLogger;
import org.junit.Test;
import org.mockito.Matchers;
public class SimpleSlotTest {
public class SimpleSlotTest extends TestLogger {
@Test
public void testStateTransitions() {
......@@ -81,11 +83,11 @@ public class SimpleSlotTest {
{
SimpleSlot slot = getSlot();
assertTrue(slot.setExecutedVertex(ev));
assertTrue(slot.setExecution(ev));
assertEquals(ev, slot.getExecutedVertex());
// try to add another one
assertFalse(slot.setExecutedVertex(ev_2));
assertFalse(slot.setExecution(ev_2));
assertEquals(ev, slot.getExecutedVertex());
}
......@@ -94,7 +96,7 @@ public class SimpleSlotTest {
SimpleSlot slot = getSlot();
assertTrue(slot.markCancelled());
assertFalse(slot.setExecutedVertex(ev));
assertFalse(slot.setExecution(ev));
assertNull(slot.getExecutedVertex());
}
......@@ -104,7 +106,7 @@ public class SimpleSlotTest {
assertTrue(slot.markCancelled());
assertTrue(slot.markReleased());
assertFalse(slot.setExecutedVertex(ev));
assertFalse(slot.setExecution(ev));
assertNull(slot.getExecutedVertex());
}
......@@ -113,7 +115,7 @@ public class SimpleSlotTest {
SimpleSlot slot = getSlot();
slot.releaseSlot();
assertFalse(slot.setExecutedVertex(ev));
assertFalse(slot.setExecution(ev));
assertNull(slot.getExecutedVertex());
}
}
......@@ -129,7 +131,7 @@ public class SimpleSlotTest {
Execution ev = mock(Execution.class);
SimpleSlot slot = getSlot();
assertTrue(slot.setExecutedVertex(ev));
assertTrue(slot.setExecution(ev));
assertEquals(ev, slot.getExecutedVertex());
slot.releaseSlot();
......
......@@ -107,7 +107,7 @@ public class SlotPoolRpcTest extends TestLogger {
try {
pool.start(JobMasterId.generate(), "foobar");
CompletableFuture<SimpleSlot> future = pool.allocateSlot(
CompletableFuture<LogicalSlot> future = pool.allocateSlot(
new SlotPoolGateway.SlotRequestID(),
new ScheduledUnit(SchedulerTestUtils.getDummyTask()),
DEFAULT_TESTING_PROFILE,
......@@ -142,7 +142,7 @@ public class SlotPoolRpcTest extends TestLogger {
SlotPoolGateway slotPoolGateway = pool.getSelfGateway(SlotPoolGateway.class);
SlotPoolGateway.SlotRequestID requestId = new SlotPoolGateway.SlotRequestID();
CompletableFuture<SimpleSlot> future = slotPoolGateway.allocateSlot(
CompletableFuture<LogicalSlot> future = slotPoolGateway.allocateSlot(
requestId,
new ScheduledUnit(SchedulerTestUtils.getDummyTask()),
DEFAULT_TESTING_PROFILE,
......@@ -186,7 +186,7 @@ public class SlotPoolRpcTest extends TestLogger {
pool.connectToResourceManager(resourceManagerGateway);
SlotPoolGateway.SlotRequestID requestId = new SlotPoolGateway.SlotRequestID();
CompletableFuture<SimpleSlot> future = slotPoolGateway.allocateSlot(
CompletableFuture<LogicalSlot> future = slotPoolGateway.allocateSlot(
requestId,
new ScheduledUnit(SchedulerTestUtils.getDummyTask()),
DEFAULT_TESTING_PROFILE,
......@@ -237,7 +237,7 @@ public class SlotPoolRpcTest extends TestLogger {
pool.connectToResourceManager(resourceManagerGateway);
SlotPoolGateway.SlotRequestID requestId = new SlotPoolGateway.SlotRequestID();
CompletableFuture<SimpleSlot> future = slotPoolGateway.allocateSlot(
CompletableFuture<LogicalSlot> future = slotPoolGateway.allocateSlot(
requestId,
new ScheduledUnit(SchedulerTestUtils.getDummyTask()),
DEFAULT_TESTING_PROFILE,
......@@ -300,7 +300,7 @@ public class SlotPoolRpcTest extends TestLogger {
ScheduledUnit mockScheduledUnit = new ScheduledUnit(SchedulerTestUtils.getDummyTask());
// test the pending request is clear when timed out
CompletableFuture<SimpleSlot> future = pool.getSlotProvider().allocateSlot(
CompletableFuture<LogicalSlot> future = pool.getSlotProvider().allocateSlot(
mockScheduledUnit,
true,
Collections.emptyList());
......
......@@ -96,7 +96,7 @@ public class SlotPoolTest extends TestLogger {
slotPoolGateway.registerTaskManager(resourceID);
SlotPoolGateway.SlotRequestID requestId = new SlotPoolGateway.SlotRequestID();
CompletableFuture<SimpleSlot> future = slotPoolGateway.allocateSlot(requestId, mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout);
CompletableFuture<LogicalSlot> future = slotPoolGateway.allocateSlot(requestId, mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout);
assertFalse(future.isDone());
ArgumentCaptor<SlotRequest> slotRequestArgumentCaptor = ArgumentCaptor.forClass(SlotRequest.class);
......@@ -107,13 +107,11 @@ public class SlotPoolTest extends TestLogger {
AllocatedSlot allocatedSlot = createAllocatedSlot(resourceID, slotRequest.getAllocationId(), jobId, DEFAULT_TESTING_PROFILE);
assertTrue(slotPoolGateway.offerSlot(allocatedSlot).get());
SimpleSlot slot = future.get(1, TimeUnit.SECONDS);
LogicalSlot slot = future.get(1, TimeUnit.SECONDS);
assertTrue(future.isDone());
assertTrue(slot.isAlive());
assertEquals(resourceID, slot.getTaskManagerID());
assertEquals(jobId, slot.getJobID());
assertEquals(slotPool.getSlotOwner(), slot.getOwner());
assertEquals(slotPool.getAllocatedSlots().get(slot.getAllocatedSlot().getSlotAllocationId()), slot);
assertEquals(resourceID, slot.getTaskManagerLocation().getResourceID());
assertEquals(slotPool.getAllocatedSlots().get(slot.getAllocationId()), slot);
} finally {
slotPool.shutDown();
}
......@@ -129,8 +127,8 @@ public class SlotPoolTest extends TestLogger {
ResourceID resourceID = new ResourceID("resource");
slotPool.registerTaskManager(resourceID);
CompletableFuture<SimpleSlot> future1 = slotPoolGateway.allocateSlot(new SlotPoolGateway.SlotRequestID(), mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout);
CompletableFuture<SimpleSlot> future2 = slotPoolGateway.allocateSlot(new SlotPoolGateway.SlotRequestID(), mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout);
CompletableFuture<LogicalSlot> future1 = slotPoolGateway.allocateSlot(new SlotPoolGateway.SlotRequestID(), mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout);
CompletableFuture<LogicalSlot> future2 = slotPoolGateway.allocateSlot(new SlotPoolGateway.SlotRequestID(), mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout);
assertFalse(future1.isDone());
assertFalse(future2.isDone());
......@@ -144,7 +142,7 @@ public class SlotPoolTest extends TestLogger {
AllocatedSlot allocatedSlot = createAllocatedSlot(resourceID, slotRequests.get(0).getAllocationId(), jobId, DEFAULT_TESTING_PROFILE);
assertTrue(slotPoolGateway.offerSlot(allocatedSlot).get());
SimpleSlot slot1 = future1.get(1, TimeUnit.SECONDS);
LogicalSlot slot1 = future1.get(1, TimeUnit.SECONDS);
assertTrue(future1.isDone());
assertFalse(future2.isDone());
......@@ -152,15 +150,15 @@ public class SlotPoolTest extends TestLogger {
slot1.releaseSlot();
// second allocation fulfilled by previous slot returning
SimpleSlot slot2 = future2.get(1, TimeUnit.SECONDS);
LogicalSlot slot2 = future2.get(1, TimeUnit.SECONDS);
assertTrue(future2.isDone());
assertNotEquals(slot1, slot2);
assertTrue(slot1.isReleased());
assertTrue(slot2.isAlive());
assertEquals(slot1.getTaskManagerID(), slot2.getTaskManagerID());
assertEquals(slot1.getSlotNumber(), slot2.getSlotNumber());
assertEquals(slotPool.getAllocatedSlots().get(slot1.getAllocatedSlot().getSlotAllocationId()), slot2);
assertEquals(slot1.getTaskManagerLocation(), slot2.getTaskManagerLocation());
assertEquals(slot1.getPhysicalSlotNumber(), slot2.getPhysicalSlotNumber());
assertEquals(slotPool.getAllocatedSlots().get(slot1.getAllocationId()), slot2);
} finally {
slotPool.shutDown();
}
......@@ -176,7 +174,7 @@ public class SlotPoolTest extends TestLogger {
ResourceID resourceID = new ResourceID("resource");
slotPoolGateway.registerTaskManager(resourceID);
CompletableFuture<SimpleSlot> future1 = slotPoolGateway.allocateSlot(new SlotPoolGateway.SlotRequestID(), mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout);
CompletableFuture<LogicalSlot> future1 = slotPoolGateway.allocateSlot(new SlotPoolGateway.SlotRequestID(), mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout);
assertFalse(future1.isDone());
ArgumentCaptor<SlotRequest> slotRequestArgumentCaptor = ArgumentCaptor.forClass(SlotRequest.class);
......@@ -187,23 +185,23 @@ public class SlotPoolTest extends TestLogger {
AllocatedSlot allocatedSlot = createAllocatedSlot(resourceID, slotRequest.getAllocationId(), jobId, DEFAULT_TESTING_PROFILE);
assertTrue(slotPoolGateway.offerSlot(allocatedSlot).get());
SimpleSlot slot1 = future1.get(1, TimeUnit.SECONDS);
LogicalSlot slot1 = future1.get(1, TimeUnit.SECONDS);
assertTrue(future1.isDone());
// return this slot to pool
slot1.releaseSlot();
CompletableFuture<SimpleSlot> future2 = slotPoolGateway.allocateSlot(new SlotPoolGateway.SlotRequestID(), mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout);
CompletableFuture<LogicalSlot> future2 = slotPoolGateway.allocateSlot(new SlotPoolGateway.SlotRequestID(), mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout);
// second allocation fulfilled by previous slot returning
SimpleSlot slot2 = future2.get(1, TimeUnit.SECONDS);
LogicalSlot slot2 = future2.get(1, TimeUnit.SECONDS);
assertTrue(future2.isDone());
assertNotEquals(slot1, slot2);
assertTrue(slot1.isReleased());
assertTrue(slot2.isAlive());
assertEquals(slot1.getTaskManagerID(), slot2.getTaskManagerID());
assertEquals(slot1.getSlotNumber(), slot2.getSlotNumber());
assertEquals(slot1.getTaskManagerLocation(), slot2.getTaskManagerLocation());
assertEquals(slot1.getPhysicalSlotNumber(), slot2.getPhysicalSlotNumber());
} finally {
slotPool.shutDown();
}
......@@ -219,7 +217,7 @@ public class SlotPoolTest extends TestLogger {
ResourceID resourceID = new ResourceID("resource");
slotPoolGateway.registerTaskManager(resourceID);
CompletableFuture<SimpleSlot> future = slotPoolGateway.allocateSlot(new SlotPoolGateway.SlotRequestID(), mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout);
CompletableFuture<LogicalSlot> future = slotPoolGateway.allocateSlot(new SlotPoolGateway.SlotRequestID(), mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout);
assertFalse(future.isDone());
ArgumentCaptor<SlotRequest> slotRequestArgumentCaptor = ArgumentCaptor.forClass(SlotRequest.class);
......@@ -240,7 +238,7 @@ public class SlotPoolTest extends TestLogger {
// accepted slot
assertTrue(slotPoolGateway.offerSlot(allocatedSlot).get());
SimpleSlot slot = future.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
LogicalSlot slot = future.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
assertTrue(slot.isAlive());
// duplicated offer with using slot
......@@ -275,19 +273,19 @@ public class SlotPoolTest extends TestLogger {
ResourceID resourceID = new ResourceID("resource");
slotPoolGateway.registerTaskManager(resourceID);
CompletableFuture<SimpleSlot> future1 = slotPoolGateway.allocateSlot(new SlotPoolGateway.SlotRequestID(), mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout);
CompletableFuture<LogicalSlot> future1 = slotPoolGateway.allocateSlot(new SlotPoolGateway.SlotRequestID(), mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout);
ArgumentCaptor<SlotRequest> slotRequestArgumentCaptor = ArgumentCaptor.forClass(SlotRequest.class);
verify(resourceManagerGateway, Mockito.timeout(timeout.toMilliseconds())).requestSlot(any(JobMasterId.class), slotRequestArgumentCaptor.capture(), any(Time.class));
final SlotRequest slotRequest = slotRequestArgumentCaptor.getValue();
CompletableFuture<SimpleSlot> future2 = slotPoolGateway.allocateSlot(new SlotPoolGateway.SlotRequestID(), mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout);
CompletableFuture<LogicalSlot> future2 = slotPoolGateway.allocateSlot(new SlotPoolGateway.SlotRequestID(), mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout);
AllocatedSlot allocatedSlot = createAllocatedSlot(resourceID, slotRequest.getAllocationId(), jobId, DEFAULT_TESTING_PROFILE);
assertTrue(slotPoolGateway.offerSlot(allocatedSlot).get());
SimpleSlot slot1 = future1.get(1, TimeUnit.SECONDS);
LogicalSlot slot1 = future1.get(1, TimeUnit.SECONDS);
assertTrue(future1.isDone());
assertFalse(future2.isDone());
......@@ -332,7 +330,7 @@ public class SlotPoolTest extends TestLogger {
slotPoolGateway.connectToResourceManager(resourceManagerGateway);
CompletableFuture<SimpleSlot> slotFuture = slotPoolGateway.allocateSlot(
CompletableFuture<LogicalSlot> slotFuture = slotPoolGateway.allocateSlot(
new SlotPoolGateway.SlotRequestID(),
scheduledUnit,
ResourceProfile.UNKNOWN,
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.instance;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.util.Preconditions;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
/**
* Simple testing logical slot for testing purposes.
*/
public class TestingLogicalSlot implements LogicalSlot {
private final TaskManagerLocation taskManagerLocation;
private final TaskManagerGateway taskManagerGateway;
private final CompletableFuture<?> releaseFuture;
private final AtomicReference<Execution> executionReference;
private final int slotNumber;
private final AllocationID allocationId;
public TestingLogicalSlot() {
this(
new LocalTaskManagerLocation(),
new SimpleAckingTaskManagerGateway(),
0,
new AllocationID());
}
public TestingLogicalSlot(
TaskManagerLocation taskManagerLocation,
TaskManagerGateway taskManagerGateway,
int slotNumber,
AllocationID allocationId) {
this.taskManagerLocation = Preconditions.checkNotNull(taskManagerLocation);
this.taskManagerGateway = Preconditions.checkNotNull(taskManagerGateway);
this.releaseFuture = new CompletableFuture<>();
this.executionReference = new AtomicReference<>();
this.slotNumber = slotNumber;
this.allocationId = Preconditions.checkNotNull(allocationId);
}
@Override
public TaskManagerLocation getTaskManagerLocation() {
return taskManagerLocation;
}
@Override
public TaskManagerGateway getTaskManagerGateway() {
return taskManagerGateway;
}
@Override
public boolean isAlive() {
return !releaseFuture.isDone();
}
@Override
public boolean isCanceled() {
return releaseFuture.isDone();
}
@Override
public boolean isReleased() {
return releaseFuture.isDone();
}
@Override
public boolean setExecution(Execution execution) {
return executionReference.compareAndSet(null, execution);
}
@Override
public void releaseSlot() {
releaseFuture.complete(null);
}
@Override
public int getPhysicalSlotNumber() {
return slotNumber;
}
@Override
public AllocationID getAllocationId() {
return allocationId;
}
}
......@@ -32,11 +32,13 @@ import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.testingUtils.TestingCluster;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.util.TestLogger;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
public class PartialConsumePipelinedResultTest {
public class PartialConsumePipelinedResultTest extends TestLogger {
// Test configuration
private final static int NUMBER_OF_TMS = 1;
......
......@@ -28,6 +28,7 @@ import org.apache.flink.runtime.jobmanager.SlotCountExceedingParallelismTest;
import org.apache.flink.runtime.testingUtils.TestingCluster;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.types.IntValue;
import org.apache.flink.util.TestLogger;
import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
......@@ -39,7 +40,7 @@ import java.util.List;
import static org.apache.flink.runtime.jobmanager.SlotCountExceedingParallelismTest.SubtaskIndexReceiver.CONFIG_KEY;
public class ScheduleOrUpdateConsumersTest {
public class ScheduleOrUpdateConsumersTest extends TestLogger {
private final static int NUMBER_OF_TMS = 2;
private final static int NUMBER_OF_SLOTS_PER_TM = 2;
......
......@@ -18,8 +18,9 @@
package org.apache.flink.runtime.jobmanager.scheduler;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.runtime.instance.SimpleSlot;
import org.apache.flink.runtime.instance.LogicalSlot;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.junit.Test;
......@@ -30,6 +31,7 @@ import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
......@@ -125,11 +127,11 @@ public class SchedulerIsolatedTasksTest {
assertEquals(5, scheduler.getNumberOfAvailableSlots());
// schedule something into all slots
SimpleSlot s1 = scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false, Collections.emptyList()).get();
SimpleSlot s2 = scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false, Collections.emptyList()).get();
SimpleSlot s3 = scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false, Collections.emptyList()).get();
SimpleSlot s4 = scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false, Collections.emptyList()).get();
SimpleSlot s5 = scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false, Collections.emptyList()).get();
LogicalSlot s1 = scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false, Collections.emptyList()).get();
LogicalSlot s2 = scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false, Collections.emptyList()).get();
LogicalSlot s3 = scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false, Collections.emptyList()).get();
LogicalSlot s4 = scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false, Collections.emptyList()).get();
LogicalSlot s5 = scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false, Collections.emptyList()).get();
// the slots should all be different
assertTrue(areAllDistinct(s1, s2, s3, s4, s5));
......@@ -148,8 +150,8 @@ public class SchedulerIsolatedTasksTest {
assertEquals(2, scheduler.getNumberOfAvailableSlots());
// now we can schedule some more slots
SimpleSlot s6 = scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false, Collections.emptyList()).get();
SimpleSlot s7 = scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false, Collections.emptyList()).get();
LogicalSlot s6 = scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false, Collections.emptyList()).get();
LogicalSlot s7 = scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false, Collections.emptyList()).get();
assertTrue(areAllDistinct(s1, s2, s3, s4, s5, s6, s7));
......@@ -198,10 +200,10 @@ public class SchedulerIsolatedTasksTest {
final int totalSlots = scheduler.getNumberOfAvailableSlots();
// all slots we ever got.
List<CompletableFuture<SimpleSlot>> allAllocatedSlots = new ArrayList<>();
List<CompletableFuture<LogicalSlot>> allAllocatedSlots = new ArrayList<>();
// slots that need to be released
final Set<SimpleSlot> toRelease = new HashSet<SimpleSlot>();
final Set<LogicalSlot> toRelease = new HashSet<>();
// flag to track errors in the concurrent thread
final AtomicBoolean errored = new AtomicBoolean(false);
......@@ -219,8 +221,8 @@ public class SchedulerIsolatedTasksTest {
toRelease.wait();
}
Iterator<SimpleSlot> iter = toRelease.iterator();
SimpleSlot next = iter.next();
Iterator<LogicalSlot> iter = toRelease.iterator();
LogicalSlot next = iter.next();
iter.remove();
next.releaseSlot();
......@@ -237,9 +239,9 @@ public class SchedulerIsolatedTasksTest {
disposeThread.start();
for (int i = 0; i < NUM_TASKS_TO_SCHEDULE; i++) {
CompletableFuture<SimpleSlot> future = scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), true, Collections.emptyList());
CompletableFuture<LogicalSlot> future = scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), true, Collections.emptyList());
future.thenAcceptAsync(
(SimpleSlot slot) -> {
(LogicalSlot slot) -> {
synchronized (toRelease) {
toRelease.add(slot);
toRelease.notifyAll();
......@@ -253,8 +255,8 @@ public class SchedulerIsolatedTasksTest {
assertFalse("The slot releasing thread caused an error.", errored.get());
List<SimpleSlot> slotsAfter = new ArrayList<SimpleSlot>();
for (CompletableFuture<SimpleSlot> future : allAllocatedSlots) {
List<LogicalSlot> slotsAfter = new ArrayList<>();
for (CompletableFuture<LogicalSlot> future : allAllocatedSlots) {
slotsAfter.add(future.get());
}
......@@ -285,7 +287,7 @@ public class SchedulerIsolatedTasksTest {
scheduler.newInstanceAvailable(i2);
scheduler.newInstanceAvailable(i3);
List<SimpleSlot> slots = new ArrayList<SimpleSlot>();
List<LogicalSlot> slots = new ArrayList<>();
slots.add(scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false, Collections.emptyList()).get());
slots.add(scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false, Collections.emptyList()).get());
slots.add(scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false, Collections.emptyList()).get());
......@@ -294,8 +296,8 @@ public class SchedulerIsolatedTasksTest {
i2.markDead();
for (SimpleSlot slot : slots) {
if (slot.getOwner() == i2) {
for (LogicalSlot slot : slots) {
if (Objects.equals(slot.getTaskManagerLocation().getResourceID(), i2.getTaskManagerID())) {
assertTrue(slot.isCanceled());
} else {
assertFalse(slot.isCanceled());
......@@ -346,37 +348,48 @@ public class SchedulerIsolatedTasksTest {
scheduler.newInstanceAvailable(i3);
// schedule something on an arbitrary instance
SimpleSlot s1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(new Instance[0])), false, Collections.emptyList()).get();
LogicalSlot s1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(new Instance[0])), false, Collections.emptyList()).get();
// figure out how we use the location hints
Instance first = (Instance) s1.getOwner();
Instance second = first != i1 ? i1 : i2;
Instance third = first == i3 ? i2 : i3;
ResourceID firstResourceId = s1.getTaskManagerLocation().getResourceID();
List<Instance> instances = Arrays.asList(i1, i2, i3);
int index = 0;
for (; index < instances.size(); index++) {
if (Objects.equals(instances.get(index).getTaskManagerID(), firstResourceId)) {
break;
}
}
Instance first = instances.get(index);
Instance second = instances.get((index + 1) % instances.size());
Instance third = instances.get((index + 2) % instances.size());
// something that needs to go to the first instance again
SimpleSlot s2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(s1.getTaskManagerLocation())), false, Collections.singleton(s1.getTaskManagerLocation())).get();
assertEquals(first, s2.getOwner());
LogicalSlot s2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(s1.getTaskManagerLocation())), false, Collections.singleton(s1.getTaskManagerLocation())).get();
assertEquals(first.getTaskManagerID(), s2.getTaskManagerLocation().getResourceID());
// first or second --> second, because first is full
SimpleSlot s3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(first, second)), false, Arrays.asList(first.getTaskManagerLocation(), second.getTaskManagerLocation())).get();
assertEquals(second, s3.getOwner());
LogicalSlot s3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(first, second)), false, Arrays.asList(first.getTaskManagerLocation(), second.getTaskManagerLocation())).get();
assertEquals(second.getTaskManagerID(), s3.getTaskManagerLocation().getResourceID());
// first or third --> third (because first is full)
SimpleSlot s4 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(first, third)), false, Arrays.asList(first.getTaskManagerLocation(), third.getTaskManagerLocation())).get();
SimpleSlot s5 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(first, third)), false, Arrays.asList(first.getTaskManagerLocation(), third.getTaskManagerLocation())).get();
assertEquals(third, s4.getOwner());
assertEquals(third, s5.getOwner());
LogicalSlot s4 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(first, third)), false, Arrays.asList(first.getTaskManagerLocation(), third.getTaskManagerLocation())).get();
LogicalSlot s5 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(first, third)), false, Arrays.asList(first.getTaskManagerLocation(), third.getTaskManagerLocation())).get();
assertEquals(third.getTaskManagerID(), s4.getTaskManagerLocation().getResourceID());
assertEquals(third.getTaskManagerID(), s5.getTaskManagerLocation().getResourceID());
// first or third --> second, because all others are full
SimpleSlot s6 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(first, third)), false, Arrays.asList(first.getTaskManagerLocation(), third.getTaskManagerLocation())).get();
assertEquals(second, s6.getOwner());
LogicalSlot s6 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(first, third)), false, Arrays.asList(first.getTaskManagerLocation(), third.getTaskManagerLocation())).get();
assertEquals(second.getTaskManagerID(), s6.getTaskManagerLocation().getResourceID());
// release something on the first and second instance
s2.releaseSlot();
s6.releaseSlot();
SimpleSlot s7 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(first, third)), false, Arrays.asList(first.getTaskManagerLocation(), third.getTaskManagerLocation())).get();
assertEquals(first, s7.getOwner());
LogicalSlot s7 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(first, third)), false, Arrays.asList(first.getTaskManagerLocation(), third.getTaskManagerLocation())).get();
assertEquals(first.getTaskManagerID(), s7.getTaskManagerLocation().getResourceID());
assertEquals(1, scheduler.getNumberOfUnconstrainedAssignments());
assertEquals(1, scheduler.getNumberOfNonLocalizedAssignments());
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册