提交 c54d9a64 编写于 作者: T tison 提交者: Chesnay Schepler

[FLINK-10569][runtime] Remove Instance usage in ExecutionVertexCancelTest

上级 991a5595
......@@ -47,6 +47,7 @@ import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.jobmaster.LogicalSlot;
import org.apache.flink.runtime.jobmaster.SlotOwner;
import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
import org.apache.flink.runtime.messages.Acknowledge;
......@@ -337,7 +338,7 @@ public class ExecutionGraphTestUtils {
}
}
public static void setVertexResource(ExecutionVertex vertex, SimpleSlot slot) {
public static void setVertexResource(ExecutionVertex vertex, LogicalSlot slot) {
Execution exec = vertex.getCurrentExecutionAttempt();
if(!exec.tryAssignResource(slot)) {
......
......@@ -18,10 +18,11 @@
package org.apache.flink.runtime.executiongraph;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.instance.BaseTestingActorGateway;
import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
import org.apache.flink.runtime.instance.DummyActorGateway;
import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.runtime.instance.SimpleSlot;
......@@ -29,10 +30,9 @@ import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmanager.scheduler.LocationPreferenceConstraint;
import org.apache.flink.runtime.jobmanager.slots.ActorTaskManagerGateway;
import org.apache.flink.runtime.jobmaster.LogicalSlot;
import org.apache.flink.runtime.jobmaster.TestingLogicalSlot;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.messages.TaskMessages.CancelTask;
import org.apache.flink.runtime.messages.TaskMessages.SubmitTask;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
import org.apache.flink.util.TestLogger;
......@@ -40,8 +40,7 @@ import org.junit.Test;
import java.io.IOException;
import java.util.Collections;
import scala.concurrent.ExecutionContext;
import java.util.concurrent.CompletableFuture;
import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.getExecutionVertex;
import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.getInstance;
......@@ -124,12 +123,7 @@ public class ExecutionVertexCancelTest extends TestLogger {
final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0],
AkkaUtils.getDefaultTimeout());
ActorGateway actorGateway = new CancelSequenceActorGateway(
TestingUtils.directExecutionContext(),
1);
Instance instance = getInstance(new ActorTaskManagerGateway(actorGateway));
SimpleSlot slot = instance.allocateSimpleSlot();
LogicalSlot slot = new TestingLogicalSlot(new CancelSequenceSimpleAckingTaskManagerGateway(1));
setVertexResource(vertex, slot);
setVertexState(vertex, ExecutionState.RUNNING);
......@@ -141,7 +135,7 @@ public class ExecutionVertexCancelTest extends TestLogger {
assertEquals(ExecutionState.CANCELED, vertex.getExecutionState());
assertTrue(slot.isReleased());
assertFalse(slot.isAlive());
assertNull(vertex.getFailureCause());
......@@ -164,12 +158,7 @@ public class ExecutionVertexCancelTest extends TestLogger {
final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0],
AkkaUtils.getDefaultTimeout());
final ActorGateway actorGateway = new CancelSequenceActorGateway(
TestingUtils.directExecutionContext(),
1);
Instance instance = getInstance(new ActorTaskManagerGateway(actorGateway));
SimpleSlot slot = instance.allocateSimpleSlot();
LogicalSlot slot = new TestingLogicalSlot(new CancelSequenceSimpleAckingTaskManagerGateway(1));
setVertexResource(vertex, slot);
setVertexState(vertex, ExecutionState.RUNNING);
......@@ -189,7 +178,7 @@ public class ExecutionVertexCancelTest extends TestLogger {
assertEquals(ExecutionState.CANCELED, vertex.getExecutionState());
assertTrue(slot.isReleased());
assertFalse(slot.isAlive());
assertNull(vertex.getFailureCause());
......@@ -212,12 +201,7 @@ public class ExecutionVertexCancelTest extends TestLogger {
final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0],
AkkaUtils.getDefaultTimeout());
final ActorGateway actorGateway = new CancelSequenceActorGateway(
TestingUtils.directExecutionContext(),
1);
Instance instance = getInstance(new ActorTaskManagerGateway(actorGateway));
SimpleSlot slot = instance.allocateSimpleSlot();
LogicalSlot slot = new TestingLogicalSlot(new CancelSequenceSimpleAckingTaskManagerGateway(1));
setVertexResource(vertex, slot);
setVertexState(vertex, ExecutionState.RUNNING);
......@@ -247,10 +231,7 @@ public class ExecutionVertexCancelTest extends TestLogger {
final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0],
AkkaUtils.getDefaultTimeout());
final ActorGateway gateway = new CancelSequenceActorGateway(TestingUtils.directExecutionContext(), 0);
Instance instance = getInstance(new ActorTaskManagerGateway(gateway));
SimpleSlot slot = instance.allocateSimpleSlot();
LogicalSlot slot = new TestingLogicalSlot(new CancelSequenceSimpleAckingTaskManagerGateway(0));
setVertexResource(vertex, slot);
setVertexState(vertex, ExecutionState.RUNNING);
......@@ -262,7 +243,7 @@ public class ExecutionVertexCancelTest extends TestLogger {
// Callback fails, leading to CANCELED
assertEquals(ExecutionState.CANCELED, vertex.getExecutionState());
assertTrue(slot.isReleased());
assertFalse(slot.isAlive());
assertTrue(vertex.getStateTimestamp(ExecutionState.CREATED) > 0);
assertTrue(vertex.getStateTimestamp(ExecutionState.CANCELING) > 0);
......@@ -399,33 +380,24 @@ public class ExecutionVertexCancelTest extends TestLogger {
}
}
public static class CancelSequenceActorGateway extends BaseTestingActorGateway {
public static class CancelSequenceSimpleAckingTaskManagerGateway extends SimpleAckingTaskManagerGateway {
private final int successfulOperations;
private int index = -1;
public CancelSequenceActorGateway(ExecutionContext executionContext, int successfulOperations) {
super(executionContext);
public CancelSequenceSimpleAckingTaskManagerGateway(int successfulOperations) {
super();
this.successfulOperations = successfulOperations;
}
@Override
public Object handleMessage(Object message) throws Exception {
Object result;
if(message instanceof SubmitTask) {
result = Acknowledge.get();
} else if(message instanceof CancelTask) {
index++;
if(index >= successfulOperations){
throw new IOException("RPC call failed.");
} else {
result = Acknowledge.get();
}
public CompletableFuture<Acknowledge> cancelTask(ExecutionAttemptID executionAttemptID, Time timeout) {
index++;
if (index >= successfulOperations) {
return FutureUtils.completedExceptionally(new IOException("Rpc call fails"));
} else {
result = null;
return CompletableFuture.completedFuture(Acknowledge.get());
}
return result;
}
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册