提交 95d035ab 编写于 作者: S Stephan Ewen

[FLINK-2619] [tests] Fix failing ExecutionGraphRestartTest and JobManagerRegistrationTest

上级 c9edd9a8
......@@ -857,20 +857,25 @@ public class ExecutionGraph implements Serializable {
else if (current == JobStatus.FAILING) {
if (numberOfRetriesLeft > 0 && transitionState(current, JobStatus.RESTARTING)) {
numberOfRetriesLeft--;
future(new Callable<Object>() {
@Override
public Object call() throws Exception {
try {
LOG.info("Delaying retry of job execution for {} ms ...", delayBeforeRetrying);
Thread.sleep(delayBeforeRetrying);
if (delayBeforeRetrying > 0) {
future(new Callable<Object>() {
@Override
public Object call() throws Exception {
try {
LOG.info("Delaying retry of job execution for {} ms ...", delayBeforeRetrying);
Thread.sleep(delayBeforeRetrying);
}
catch(InterruptedException e){
// should only happen on shutdown
}
restart();
return null;
}
catch(InterruptedException e){
// should only happen on shutdown
}
restart();
return null;
}
}, executionContext);
}, executionContext);
} else {
restart();
}
break;
}
else if (numberOfRetriesLeft <= 0 && transitionState(current, JobStatus.FAILED, failureCause)) {
......
......@@ -112,7 +112,9 @@ public class ExecutionGraphTestUtils {
return new Instance(gateway, connection, new InstanceID(), hardwareDescription, numberOfSlots);
}
@SuppressWarnings("serial")
public static class SimpleActorGateway extends BaseTestingActorGateway {
public TaskDeploymentDescriptor lastTDD;
public SimpleActorGateway(ExecutionContext executionContext){
......@@ -139,7 +141,9 @@ public class ExecutionGraphTestUtils {
}
}
@SuppressWarnings("serial")
public static class SimpleFailingActorGateway extends BaseTestingActorGateway {
public SimpleFailingActorGateway(ExecutionContext executionContext) {
super(executionContext);
}
......
......@@ -26,10 +26,13 @@ import org.apache.flink.runtime.jobgraph.{JobStatus, JobGraph, JobVertex}
import org.apache.flink.runtime.jobmanager.Tasks
import org.apache.flink.runtime.jobmanager.scheduler.Scheduler
import org.apache.flink.runtime.testingUtils.TestingUtils
import org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner
import org.scalatest.{Matchers, WordSpecLike}
import scala.collection.JavaConverters._
@RunWith(classOf[JUnitRunner])
class ExecutionGraphRestartTest extends WordSpecLike with Matchers {
......@@ -39,7 +42,8 @@ class ExecutionGraphRestartTest extends WordSpecLike with Matchers {
"be manually restartable" in {
try {
val instance = ExecutionGraphTestUtils.getInstance(
new SimpleActorGateway(TestingUtils.directExecutionContext))
new SimpleActorGateway(TestingUtils.directExecutionContext),
NUM_TASKS)
val scheduler = new Scheduler(TestingUtils.defaultExecutionContext)
scheduler.newInstanceAvailable(instance)
......@@ -65,14 +69,18 @@ class ExecutionGraphRestartTest extends WordSpecLike with Matchers {
eg.getState should equal(JobStatus.RUNNING)
eg.getAllExecutionVertices.iterator().next().fail(new Exception("Test Exception"))
for (vertex <- eg.getAllExecutionVertices().asScala) {
vertex.getCurrentExecutionAttempt().cancelingComplete()
}
eg.getState should equal(JobStatus.FAILED)
eg.restart()
eg.getState should equal(JobStatus.RUNNING)
import collection.JavaConverters._
for (vertex <- eg.getAllExecutionVertices.asScala) {
vertex.executionFinished()
vertex.getCurrentExecutionAttempt().markFinished()
}
eg.getState should equal(JobStatus.FINISHED)
......@@ -86,7 +94,8 @@ class ExecutionGraphRestartTest extends WordSpecLike with Matchers {
"restart itself automatically" in {
try {
val instance = ExecutionGraphTestUtils.getInstance(
new SimpleActorGateway(TestingUtils.directExecutionContext))
new SimpleActorGateway(TestingUtils.directExecutionContext),
NUM_TASKS)
val scheduler = new Scheduler(TestingUtils.defaultExecutionContext)
scheduler.newInstanceAvailable(instance)
......@@ -112,15 +121,19 @@ class ExecutionGraphRestartTest extends WordSpecLike with Matchers {
eg.getState should equal(JobStatus.RUNNING)
eg.getAllExecutionVertices.iterator().next().fail(new Exception("Test Exception"))
eg.getState should equal(JobStatus.FAILING)
for (vertex <- eg.getAllExecutionVertices.asScala) {
vertex.getCurrentExecutionAttempt().cancelingComplete()
}
eg.getState should equal(JobStatus.RUNNING)
import collection.JavaConverters._
for (vertex <- eg.getAllExecutionVertices.asScala) {
vertex.executionFinished()
vertex.getCurrentExecutionAttempt().markFinished()
}
eg.getState should equal(JobStatus.FINISHED)
eg.getState() should equal(JobStatus.FINISHED)
} catch {
case t: Throwable =>
t.printStackTrace()
......@@ -128,5 +141,4 @@ class ExecutionGraphRestartTest extends WordSpecLike with Matchers {
}
}
}
}
......@@ -19,18 +19,21 @@
package org.apache.flink.runtime.jobmanager
import java.net.InetAddress
import java.util.UUID
import akka.actor._
import akka.testkit.{ImplicitSender, TestKit}
import org.apache.flink.configuration.Configuration
import org.apache.flink.runtime.StreamingMode
import org.apache.flink.runtime.akka.AkkaUtils
import org.apache.flink.runtime.instance.{HardwareDescription, InstanceConnectionInfo, InstanceID}
import org.apache.flink.runtime.instance._
import org.apache.flink.runtime.jobmanager.JobManagerRegistrationTest.PlainForwardingActor
import org.apache.flink.runtime.messages.JobManagerMessages.LeaderSessionMessage
import org.apache.flink.runtime.messages.RegistrationMessages.{AcknowledgeRegistration, AlreadyRegistered, RegisterTaskManager}
import org.junit.Assert.{assertNotEquals, assertNotNull}
import org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner
import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}
......@@ -56,123 +59,119 @@ ImplicitSender with WordSpecLike with Matchers with BeforeAndAfterAll {
// Sends one RegisterTaskManager message per TaskManager stand-in to the JobManager, captures the
// InstanceID carried by each AcknowledgeRegistration reply, and asserts that both IDs are
// non-null and different from each other.
"assign a TaskManager a unique instance ID" in {
val jm = startTestingJobManager(_system)
val tmDummy1 = _system.actorOf(Props(classOf[JobManagerRegistrationTest.DummyActor]))
val tmDummy2 = _system.actorOf(Props(classOf[JobManagerRegistrationTest.DummyActor]))
try {
val connectionInfo1 = new InstanceConnectionInfo(InetAddress.getLocalHost, 10000)
val connectionInfo2 = new InstanceConnectionInfo(InetAddress.getLocalHost, 10001)
val hardwareDescription = HardwareDescription.extractFromSystem(10)
val leaderSessionID = UUID.randomUUID()
var id1: InstanceID = null
var id2: InstanceID = null
// task manager 1
within(1 second) {
jm.tell(
RegisterTaskManager(
connectionInfo1,
hardwareDescription,
1),
tmDummy1)
val response = receiveOne(1 second)
response match {
case LeaderSessionMessage(leaderSessionID, AcknowledgeRegistration(id, _)) => id1 = id
case _ => fail("Wrong response message: " + response)
}
// Stand-in TaskManagers: each PlainForwardingActor is constructed with testActor as its target,
// so whatever is sent to tm1/tm2 is forwarded to testActor, where expectMsgType reads it.
val tm1 = _system.actorOf(Props(new PlainForwardingActor(testActor)))
val tm2 = _system.actorOf(Props(new PlainForwardingActor(testActor)))
// The two connection infos use the same local host address but different ports (10000 / 10001).
val connectionInfo1 = new InstanceConnectionInfo(InetAddress.getLocalHost, 10000)
val connectionInfo2 = new InstanceConnectionInfo(InetAddress.getLocalHost, 10001)
val hardwareDescription = HardwareDescription.extractFromSystem(10)
// Filled in by the pattern matches below; they stay null if no AcknowledgeRegistration arrives.
var id1: InstanceID = null
var id2: InstanceID = null
// task manager 1
within(1 second) {
jm.tell(
RegisterTaskManager(
connectionInfo1,
hardwareDescription,
1),
new AkkaActorGateway(tm1, null))
val response = expectMsgType[LeaderSessionMessage]
response match {
case LeaderSessionMessage(_, AcknowledgeRegistration(id, _)) => id1 = id
case _ => fail("Wrong response message: " + response)
}
}
// task manager 2
within(1 second) {
jm.tell(
RegisterTaskManager(
connectionInfo2,
hardwareDescription,
1),
tmDummy2)
val response = receiveOne(1 second)
response match {
case LeaderSessionMessage(leaderSessionID, AcknowledgeRegistration(id, _)) => id2 = id
case _ => fail("Wrong response message: " + response)
}
// task manager 2
within(1 second) {
jm.tell(
RegisterTaskManager(
connectionInfo2,
hardwareDescription,
1),
new AkkaActorGateway(tm2, null))
val response = expectMsgType[LeaderSessionMessage]
response match {
// NOTE(review): a lowercase identifier in a Scala pattern is a fresh variable binding, so
// `leaderSessionID` here matches ANY first field (it does not compare against a value).
// The task manager 1 match above uses `_` for the same position; consider `_` here too.
case LeaderSessionMessage(leaderSessionID, AcknowledgeRegistration(id, _)) => id2 = id
case _ => fail("Wrong response message: " + response)
}
assertNotNull(id1)
assertNotNull(id2)
assertNotEquals(id1, id2)
}
finally {
tmDummy1 ! Kill
tmDummy2 ! Kill
jm ! Kill
}
// Both registrations must have produced an ID, and the two IDs must differ.
assertNotNull(id1)
assertNotNull(id2)
assertNotEquals(id1, id2)
}
// Sends the same RegisterTaskManager message three times from a single sender and expects the
// replies in order: one AcknowledgeRegistration followed by two AlreadyRegistered.
"handle repeated registration calls" in {
val jm = startTestingJobManager(_system)
val tmDummy = _system.actorOf(Props(classOf[JobManagerRegistrationTest.DummyActor]))
try {
val connectionInfo = new InstanceConnectionInfo(InetAddress.getLocalHost,1)
val hardwareDescription = HardwareDescription.extractFromSystem(10)
within(1 second) {
jm.tell(
RegisterTaskManager(
connectionInfo,
hardwareDescription,
1),
tmDummy)
jm.tell(
RegisterTaskManager(
connectionInfo,
hardwareDescription,
1),
tmDummy)
jm.tell(
RegisterTaskManager(
connectionInfo,
hardwareDescription,
1),
tmDummy)
expectMsgType[AcknowledgeRegistration]
expectMsgType[AlreadyRegistered]
expectMsgType[AlreadyRegistered]
// Gateway wrapping the TestKit's testActor, used as the sender of every registration below;
// its second constructor argument is null.
val selfGateway = new AkkaActorGateway(testActor, null)
val connectionInfo = new InstanceConnectionInfo(InetAddress.getLocalHost,1)
val hardwareDescription = HardwareDescription.extractFromSystem(10)
// All three sends and all three expected replies must complete inside this 5 second window.
within(5 second) {
jm.tell(
RegisterTaskManager(
connectionInfo,
hardwareDescription,
1),
selfGateway)
jm.tell(
RegisterTaskManager(
connectionInfo,
hardwareDescription,
1),
selfGateway)
jm.tell(
RegisterTaskManager(
connectionInfo,
hardwareDescription,
1),
selfGateway)
// Each reply is expected wrapped in a LeaderSessionMessage. The literal `null` in the patterns
// only matches when the wrapper's first field is null -- presumably the leader session ID,
// which would fit the gateways here being built with a null second argument; TODO confirm.
expectMsgType[LeaderSessionMessage] match {
case LeaderSessionMessage(null, AcknowledgeRegistration(_, _)) =>
case m => fail("Wrong message type: " + m)
}
expectMsgType[LeaderSessionMessage] match {
case LeaderSessionMessage(null, AlreadyRegistered(_, _)) =>
case m => fail("Wrong message type: " + m)
}
expectMsgType[LeaderSessionMessage] match {
case LeaderSessionMessage(null, AlreadyRegistered(_, _)) =>
case m => fail("Wrong message type: " + m)
}
} finally {
tmDummy ! Kill
jm ! Kill
}
}
}
/**
 * Starts the JobManager actors through `JobManager.startJobManagerActors`, using a fresh
 * `Configuration`, two `None` arguments and `StreamingMode.BATCH_ONLY`, and hands back the
 * first element of the returned pair (bound as `jm: ActorRef`). The `ActorGateway` variant
 * wraps that ref in an `AkkaActorGateway` whose second constructor argument is `null`.
 *
 * NOTE(review): the `system` parameter is never read in the body; the enclosing test's
 * `_system` is passed to `startJobManagerActors` instead. The callers visible here pass
 * `_system`, so the two coincide, but a different argument would be silently ignored.
 *
 * @param system actor system supplied by the caller (currently unused, see note above)
 * @return a handle to the started JobManager actor
 */
private def startTestingJobManager(system: ActorSystem): ActorRef = {
private def startTestingJobManager(system: ActorSystem): ActorGateway = {
// Only the first element of the returned pair is needed; the second is discarded.
val (jm: ActorRef, _) = JobManager.startJobManagerActors(
new Configuration(),
_system,
None,
None,
StreamingMode.BATCH_ONLY)
jm
new AkkaActorGateway(jm, null)
}
}
/** Companion object holding the helper actor used by the registration tests. */
object JobManagerRegistrationTest {
/**
 * Helper actor standing in for a TaskManager.
 *
 * `DummyActor` handles every message with an empty `case _ =>`, i.e. it swallows all messages.
 * `PlainForwardingActor` instead passes every message it receives on to `target` via
 * `ActorRef.forward`, so a test can observe the traffic at `target`.
 */
class DummyActor extends Actor {
class PlainForwardingActor(private val target: ActorRef) extends Actor {
override def receive: Receive = {
case _ =>
case message => target.forward(message)
}
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册