提交 a3b9d971 编写于 作者: U Ufuk Celebi

[tests] Try to improve CI test stability

Squashes the following commits:
- [tests] Wait for task managers in JobManagerFailsITCase
  Possible fix for: https://s3.amazonaws.com/archive.travis-ci.org/jobs/110235128/log.txt
- [tests] Move ITCase from runtime to tests
- [tests] Merge abstract and sub type class
- [tests] Determine JobManagerProcess ports from logs

This closes #1676.
上级 59f978f8
......@@ -25,7 +25,6 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.jobmanager.JobManager;
import org.apache.flink.runtime.jobmanager.JobManagerMode;
import org.apache.flink.util.NetUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;
......@@ -36,6 +35,8 @@ import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
......@@ -47,51 +48,37 @@ public class JobManagerProcess extends TestJvmProcess {
private static final Logger LOG = LoggerFactory.getLogger(JobManagerProcess.class);
/** Pattern to parse the job manager port from the logs. */
private static final Pattern PORT_PATTERN = Pattern.compile(".*Starting JobManager at akka\\.tcp://flink@.*:(\\d+).*");
/** ID for this JobManager */
private final int id;
/** The port the JobManager listens on */
private final int jobManagerPort;
/** The configuration for the JobManager */
private final Configuration config;
/** Configuration parsed as args for {@link JobManagerProcess.JobManagerProcessEntryPoint} */
private final String[] jvmArgs;
/** The port the JobManager listens on */
private int jobManagerPort;
private ActorRef jobManagerRef;
/**
* Creates a {@link JobManager} running in a separate JVM.
*
* <p>See {@link #JobManagerProcess(int, Configuration, int)} for a more
* detailed
* description.
*
* @param id ID for the JobManager
* @param config Configuration for the job manager process
* @throws Exception
*/
public JobManagerProcess(int id, Configuration config) throws Exception {
this(id, config, 0);
}
/**
* Creates a {@link JobManager} running in a separate JVM.
*
* @param id ID for the JobManager
* @param config Configuration for the job manager process
* @param jobManagerPort Job manager port (if <code>0</code>, pick any available port)
* @throws Exception
*/
public JobManagerProcess(int id, Configuration config, int jobManagerPort) throws Exception {
public JobManagerProcess(int id, Configuration config) throws Exception {
checkArgument(id >= 0, "Negative ID");
this.id = id;
this.config = checkNotNull(config, "Configuration");
this.jobManagerPort = jobManagerPort <= 0 ? NetUtils.getAvailablePort() : jobManagerPort;
ArrayList<String> args = new ArrayList<>();
args.add("--port");
args.add(String.valueOf(this.jobManagerPort));
for (Map.Entry<String, String> entry : config.toMap().entrySet()) {
args.add("--" + entry.getKey());
......@@ -117,20 +104,50 @@ public class JobManagerProcess extends TestJvmProcess {
return JobManagerProcessEntryPoint.class.getName();
}
/**
 * Returns the value of the {@code jobManagerPort} field, i.e. the port the
 * JobManager listens on.
 */
public int getJobManagerPort() {
return jobManagerPort;
}
/**
 * Returns the configuration for the JobManager that was passed to the
 * constructor.
 */
public Configuration getConfig() {
return config;
}
/**
 * Parses the port from the job manager logs and returns it.
 *
 * <p>If a call to this method succeeds, the parsed port is cached and
 * successive calls return it directly without re-parsing the logs.
 *
 * <p>The process output is inspected at least once, even if the given
 * timeout has already elapsed, and one final time after the last wait, so
 * that a port logged right before the deadline is not missed.
 *
 * @param timeout Timeout for log parsing.
 * @return The port of the job manager
 * @throws InterruptedException If interrupted while waiting before
 * retrying to parse the logs
 * @throws NumberFormatException If the parsed port cannot be converted to
 * an {@code int}
 * @throws RuntimeException If no port could be parsed from the logs
 * before the timeout elapsed
 */
public int getJobManagerPort(FiniteDuration timeout) throws InterruptedException, NumberFormatException {
	// Fast path: the port has already been parsed by an earlier call.
	if (jobManagerPort > 0) {
		return jobManagerPort;
	}

	Deadline deadline = timeout.fromNow();
	while (true) {
		Matcher matcher = PORT_PATTERN.matcher(getProcessOutput());
		if (matcher.find()) {
			jobManagerPort = Integer.parseInt(matcher.group(1));
			return jobManagerPort;
		}

		// Check the deadline only after an attempt, so that the logs are
		// always inspected at least once and once more after the final sleep.
		if (!deadline.hasTimeLeft()) {
			break;
		}

		Thread.sleep(100);
	}

	throw new RuntimeException("Could not parse port from logs");
}
/**
* Returns the Akka URL of this JobManager.
*/
public String getJobManagerAkkaURL() {
public String getJobManagerAkkaURL(FiniteDuration timeout) throws InterruptedException {
int port = getJobManagerPort(timeout);
return JobManager.getRemoteJobManagerAkkaURL(
new InetSocketAddress("localhost", jobManagerPort),
new InetSocketAddress("localhost", port),
Option.<String>empty());
}
......@@ -166,7 +183,7 @@ public class JobManagerProcess extends TestJvmProcess {
// If the Actor is not reachable yet, this throws an Exception. Retry until the
// deadline passes.
this.jobManagerRef = AkkaUtils.getActorRef(
getJobManagerAkkaURL(),
getJobManagerAkkaURL(deadline.timeLeft()),
actorSystem,
deadline.timeLeft());
......@@ -201,14 +218,11 @@ public class JobManagerProcess extends TestJvmProcess {
public static void main(String[] args) {
try {
ParameterTool params = ParameterTool.fromArgs(args);
final int port = Integer.valueOf(params.getRequired("port"));
LOG.info("Running on port {}.", port);
Configuration config = params.getConfiguration();
LOG.info("Configuration: {}.", config);
// Run the JobManager
JobManager.runJobManager(config, JobManagerMode.CLUSTER, "localhost", port);
JobManager.runJobManager(config, JobManagerMode.CLUSTER, "localhost", 0);
// Run forever. Forever, ever? Forever, ever!
new CountDownLatch(1).await();
......@@ -219,5 +233,4 @@ public class JobManagerProcess extends TestJvmProcess {
}
}
}
}
......@@ -42,7 +42,7 @@ public abstract class TestJvmProcess {
private static final Logger LOG = LoggerFactory.getLogger(TestJvmProcess.class);
/** Lock to guard {@link #createAndStart()} and {@link #destroy()} calls. */
/** Lock to guard {@link #startProcess()} and {@link #destroy()} calls. */
private final Object createDestroyLock = new Object();
/** The java command path */
......@@ -122,7 +122,7 @@ public abstract class TestJvmProcess {
* resource leaks. The created process will be child process and is not guaranteed to
* terminate when the parent process terminates.
*/
public void createAndStart() throws IOException {
public void startProcess() throws IOException {
String[] cmd = new String[] {
javaCommandPath,
"-Dlog.level=DEBUG",
......
......@@ -679,7 +679,7 @@ public class ChaosMonkeyITCase extends TestLogger {
for (int i = 0; i < jobManagerProcesses.size(); i++) {
JobManagerProcess jobManager = jobManagerProcesses.get(i);
if (jobManager.getJobManagerAkkaURL().equals(currentLeader)) {
if (jobManager.getJobManagerAkkaURL(timeout).equals(currentLeader)) {
leaderIndex = i;
break;
}
......@@ -696,7 +696,7 @@ public class ChaosMonkeyITCase extends TestLogger {
throws Exception {
JobManagerProcess jobManager = new JobManagerProcess(jobManagerPid++, config);
jobManager.createAndStart();
jobManager.startProcess();
LOG.info("Created and started {}.", jobManager);
return jobManager;
......@@ -706,7 +706,7 @@ public class ChaosMonkeyITCase extends TestLogger {
throws Exception {
TaskManagerProcess taskManager = new TaskManagerProcess(taskManagerPid++, config);
taskManager.createAndStart();
taskManager.startProcess();
LOG.info("Created and started {}.", taskManager);
return taskManager;
......
......@@ -173,8 +173,8 @@ public class JobManagerCheckpointRecoveryITCase extends TestLogger {
jobManagerProcess[0] = new JobManagerProcess(0, config);
jobManagerProcess[1] = new JobManagerProcess(1, config);
jobManagerProcess[0].createAndStart();
jobManagerProcess[1].createAndStart();
jobManagerProcess[0].startProcess();
jobManagerProcess[1].startProcess();
// Leader listener
TestingListener leaderListener = new TestingListener();
......@@ -210,7 +210,7 @@ public class JobManagerCheckpointRecoveryITCase extends TestLogger {
// Who's the boss?
JobManagerProcess leadingJobManagerProcess;
if (jobManagerProcess[0].getJobManagerAkkaURL().equals(leaderListener.getAddress())) {
if (jobManagerProcess[0].getJobManagerAkkaURL(deadline.timeLeft()).equals(leaderListener.getAddress())) {
leadingJobManagerProcess = jobManagerProcess[0];
}
else {
......@@ -324,8 +324,8 @@ public class JobManagerCheckpointRecoveryITCase extends TestLogger {
jobManagerProcess[0] = new JobManagerProcess(0, config);
jobManagerProcess[1] = new JobManagerProcess(1, config);
jobManagerProcess[0].createAndStart();
jobManagerProcess[1].createAndStart();
jobManagerProcess[0].startProcess();
jobManagerProcess[1].startProcess();
// Leader listener
TestingListener leaderListener = new TestingListener();
......@@ -354,7 +354,7 @@ public class JobManagerCheckpointRecoveryITCase extends TestLogger {
// Who's the boss?
JobManagerProcess leadingJobManagerProcess;
JobManagerProcess nonLeadingJobManagerProcess;
if (jobManagerProcess[0].getJobManagerAkkaURL().equals(leaderListener.getAddress())) {
if (jobManagerProcess[0].getJobManagerAkkaURL(testDeadline.timeLeft()).equals(leaderListener.getAddress())) {
leadingJobManagerProcess = jobManagerProcess[0];
nonLeadingJobManagerProcess = jobManagerProcess[1];
}
......@@ -409,7 +409,7 @@ public class JobManagerCheckpointRecoveryITCase extends TestLogger {
assertTrue("Did not find expected output in logs.", success);
}
catch (Throwable t) {
// Print early (in some situtations the process logs get too big
// Print early (in some situations the process logs get too big
// for Travis and the root problem is not shown)
t.printStackTrace();
......
......@@ -16,7 +16,7 @@
* limitations under the License.
*/
package org.apache.flink.runtime.jobmanager;
package org.apache.flink.test.recovery;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
......@@ -34,6 +34,8 @@ import org.apache.flink.runtime.instance.AkkaActorGateway;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobmanager.SubmittedJobGraph;
import org.apache.flink.runtime.jobmanager.Tasks;
import org.apache.flink.runtime.leaderelection.TestingListener;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.messages.JobManagerMessages;
......@@ -75,7 +77,7 @@ import static org.junit.Assert.fail;
/**
* Tests recovery of {@link SubmittedJobGraph} instances.
*/
public class JobManagerSubmittedJobGraphsRecoveryITCase extends TestLogger {
public class JobManagerHAJobGraphRecoveryITCase extends TestLogger {
private final static ZooKeeperTestEnvironment ZooKeeper = new ZooKeeperTestEnvironment(1);
......@@ -253,8 +255,8 @@ public class JobManagerSubmittedJobGraphsRecoveryITCase extends TestLogger {
jobManagerProcess[0] = new JobManagerProcess(0, config);
jobManagerProcess[1] = new JobManagerProcess(1, config);
jobManagerProcess[0].createAndStart();
jobManagerProcess[1].createAndStart();
jobManagerProcess[0].startProcess();
jobManagerProcess[1].startProcess();
// Leader listener
TestingListener leaderListener = new TestingListener();
......@@ -299,7 +301,7 @@ public class JobManagerSubmittedJobGraphsRecoveryITCase extends TestLogger {
// Who's the boss?
JobManagerProcess leadingJobManagerProcess;
if (jobManagerProcess[0].getJobManagerAkkaURL().equals(leaderListener.getAddress())) {
if (jobManagerProcess[0].getJobManagerAkkaURL(deadline.timeLeft()).equals(leaderListener.getAddress())) {
leadingJobManagerProcess = jobManagerProcess[0];
}
else {
......
......@@ -21,6 +21,13 @@ package org.apache.flink.test.recovery;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import org.apache.commons.io.FileUtils;
import org.apache.flink.api.common.ExecutionMode;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
......@@ -35,16 +42,21 @@ import org.apache.flink.runtime.testutils.JobManagerProcess;
import org.apache.flink.runtime.testutils.ZooKeeperTestUtils;
import org.apache.flink.runtime.util.ZooKeeperUtils;
import org.apache.flink.runtime.zookeeper.ZooKeeperTestEnvironment;
import org.apache.flink.util.Collector;
import org.apache.flink.util.TestLogger;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import scala.Option;
import scala.concurrent.duration.Deadline;
import scala.concurrent.duration.FiniteDuration;
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
......@@ -67,7 +79,9 @@ import static org.junit.Assert.fail;
*
* <p>This follows the same structure as {@link AbstractTaskManagerProcessFailureRecoveryTest}.
*/
public abstract class AbstractJobManagerProcessFailureRecoveryITCase extends TestLogger {
@SuppressWarnings("serial")
@RunWith(Parameterized.class)
public class JobManagerHAProcessFailureBatchRecoveryITCase extends TestLogger {
private final static ZooKeeperTestEnvironment ZooKeeper = new ZooKeeperTestEnvironment(1);
......@@ -108,6 +122,23 @@ public abstract class AbstractJobManagerProcessFailureRecoveryITCase extends Tes
protected static final int PARALLELISM = 4;
// --------------------------------------------------------------------------------------------
// Parametrization (run pipelined and batch)
// --------------------------------------------------------------------------------------------
private final ExecutionMode executionMode;
/**
 * Creates a test instance for one parameterized run.
 *
 * @param executionMode execution mode that the test program sets on its
 * execution config
 */
public JobManagerHAProcessFailureBatchRecoveryITCase(ExecutionMode executionMode) {
this.executionMode = executionMode;
}
/**
 * Supplies the constructor arguments of the parameterized runs: one
 * single-element argument array per execution mode (pipelined, then batch).
 */
@Parameterized.Parameters
public static Collection<Object[]> executionMode() {
	Object[] pipelined = {ExecutionMode.PIPELINED};
	Object[] batch = {ExecutionMode.BATCH};
	return Arrays.asList(pipelined, batch);
}
/**
* Test program with JobManager failure.
*
......@@ -115,7 +146,75 @@ public abstract class AbstractJobManagerProcessFailureRecoveryITCase extends Tes
* @param coordinateDir Coordination directory
* @throws Exception
*/
public abstract void testJobManagerFailure(String zkQuorum, File coordinateDir) throws Exception;
public void testJobManagerFailure(String zkQuorum, final File coordinateDir) throws Exception {
	// Client configuration: ZOOKEEPER recovery mode with the given quorum.
	final Configuration clientConfig = new Configuration();
	clientConfig.setString(ConfigConstants.RECOVERY_MODE, "ZOOKEEPER");
	clientConfig.setString(ConfigConstants.ZOOKEEPER_QUORUM_KEY, zkQuorum);

	final ExecutionEnvironment environment =
			ExecutionEnvironment.createRemoteEnvironment("leader", 1, clientConfig);
	environment.setParallelism(PARALLELISM);
	environment.setNumberOfExecutionRetries(1);
	environment.getConfig().setExecutionMode(executionMode);
	environment.getConfig().disableSysoutLogging();

	final long NUM_ELEMENTS = 100000L;

	// The majority of the behavior lives in this mapper: it announces itself through a
	// ready marker file and then makes slow progress until the proceed marker shows up.
	final RichMapFunction<Long, Long> throttlingMapper = new RichMapFunction<Long, Long>() {

		private final File proceedFile = new File(coordinateDir, PROCEED_MARKER_FILE);

		private boolean readyMarkerWritten = false;

		private boolean awaitingProceedFile = true;

		@Override
		public Long map(Long value) throws Exception {
			if (!readyMarkerWritten) {
				int subtask = getRuntimeContext().getIndexOfThisSubtask();
				File readyMarker = new File(coordinateDir, READY_MARKER_FILE_PREFIX + subtask);
				AbstractTaskManagerProcessFailureRecoveryTest.touchFile(readyMarker);
				readyMarkerWritten = true;
			}

			if (awaitingProceedFile) {
				// Stop polling once the proceed file exists; until then sleep per record.
				awaitingProceedFile = !proceedFile.exists();
				if (awaitingProceedFile) {
					Thread.sleep(100);
				}
			}

			return value;
		}
	};

	final ReduceFunction<Long> summer = new ReduceFunction<Long>() {
		@Override
		public Long reduce(Long left, Long right) {
			return left + right;
		}
	};

	// The result is verified inside the job, because the client can currently not handle
	// job manager losses/reconnects.
	final RichFlatMapFunction<Long, Long> sumVerifier = new RichFlatMapFunction<Long, Long>() {
		@Override
		public void flatMap(Long value, Collector<Long> out) throws Exception {
			assertEquals(NUM_ELEMENTS * (NUM_ELEMENTS + 1L) / 2L, (long) value);

			int subtask = getRuntimeContext().getIndexOfThisSubtask();
			File finishMarker = new File(coordinateDir, FINISH_MARKER_FILE_PREFIX + subtask);
			AbstractTaskManagerProcessFailureRecoveryTest.touchFile(finishMarker);
		}
	};

	// rebalance() makes sure every mapper is involved (no one is skipped because of lazy
	// split assignment).
	final DataSet<Long> sequence = environment.generateSequence(1, NUM_ELEMENTS);
	final DataSet<Long> result = sequence
			.rebalance()
			.map(throttlingMapper)
			.reduce(summer)
			.flatMap(sumVerifier);

	result.output(new DiscardingOutputFormat<Long>());

	environment.execute();
}
@Test
public void testJobManagerProcessFailure() throws Exception {
......@@ -154,7 +253,7 @@ public abstract class AbstractJobManagerProcessFailureRecoveryITCase extends Tes
// Start first process
jmProcess[0] = new JobManagerProcess(0, config);
jmProcess[0].createAndStart();
jmProcess[0].startProcess();
// Task manager configuration
config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 4);
......@@ -222,7 +321,7 @@ public abstract class AbstractJobManagerProcessFailureRecoveryITCase extends Tes
jmProcess[0].destroy();
jmProcess[1] = new JobManagerProcess(1, config);
jmProcess[1].createAndStart();
jmProcess[1].startProcess();
jmProcess[1].getActorRef(testActorSystem, deadline.timeLeft());
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.test.recovery;
import org.apache.flink.api.common.ExecutionMode;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.io.File;
import java.util.Arrays;
import java.util.Collection;
import static org.junit.Assert.assertEquals;
/**
 * Tests that a simple batch program recovers when the JobManager process fails.
 *
 * <p>The test runs once for every {@link ExecutionMode} returned by
 * {@link #executionMode()}.
 */
@SuppressWarnings("serial")
@RunWith(Parameterized.class)
public class JobManagerProcessFailureBatchRecoveryITCase extends AbstractJobManagerProcessFailureRecoveryITCase {

	// --------------------------------------------------------------------------------------------
	//  Parametrization (run pipelined and batch)
	// --------------------------------------------------------------------------------------------

	private final ExecutionMode executionMode;

	public JobManagerProcessFailureBatchRecoveryITCase(ExecutionMode executionMode) {
		this.executionMode = executionMode;
	}

	/**
	 * Supplies one single-element argument array per execution mode (pipelined, then batch).
	 */
	@Parameterized.Parameters
	public static Collection<Object[]> executionMode() {
		Object[] pipelined = {ExecutionMode.PIPELINED};
		Object[] batch = {ExecutionMode.BATCH};
		return Arrays.asList(pipelined, batch);
	}

	// --------------------------------------------------------------------------------------------
	//  Test the program
	// --------------------------------------------------------------------------------------------

	// This is a slightly modified copy of the task manager process failure program.
	@Override
	public void testJobManagerFailure(String zkQuorum, final File coordinateDir) throws Exception {
		// Client configuration: ZOOKEEPER recovery mode with the given quorum.
		final Configuration clientConfig = new Configuration();
		clientConfig.setString(ConfigConstants.RECOVERY_MODE, "ZOOKEEPER");
		clientConfig.setString(ConfigConstants.ZOOKEEPER_QUORUM_KEY, zkQuorum);

		final ExecutionEnvironment environment =
				ExecutionEnvironment.createRemoteEnvironment("leader", 1, clientConfig);
		environment.setParallelism(PARALLELISM);
		environment.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 1000));
		environment.getConfig().setExecutionMode(executionMode);
		environment.getConfig().disableSysoutLogging();

		final long NUM_ELEMENTS = 100000L;

		// The majority of the behavior lives in this mapper: it announces itself through a
		// ready marker file and then makes slow progress until the proceed marker shows up.
		final RichMapFunction<Long, Long> throttlingMapper = new RichMapFunction<Long, Long>() {

			private final File proceedFile = new File(coordinateDir, PROCEED_MARKER_FILE);

			private boolean readyMarkerWritten = false;

			private boolean awaitingProceedFile = true;

			@Override
			public Long map(Long value) throws Exception {
				if (!readyMarkerWritten) {
					int subtask = getRuntimeContext().getIndexOfThisSubtask();
					File readyMarker = new File(coordinateDir, READY_MARKER_FILE_PREFIX + subtask);
					AbstractTaskManagerProcessFailureRecoveryTest.touchFile(readyMarker);
					readyMarkerWritten = true;
				}

				if (awaitingProceedFile) {
					// Stop polling once the proceed file exists; until then sleep per record.
					awaitingProceedFile = !proceedFile.exists();
					if (awaitingProceedFile) {
						Thread.sleep(100);
					}
				}

				return value;
			}
		};

		final ReduceFunction<Long> summer = new ReduceFunction<Long>() {
			@Override
			public Long reduce(Long left, Long right) {
				return left + right;
			}
		};

		// The result is verified inside the job, because the client can currently not handle
		// job manager losses/reconnects.
		final RichFlatMapFunction<Long, Long> sumVerifier = new RichFlatMapFunction<Long, Long>() {
			@Override
			public void flatMap(Long value, Collector<Long> out) throws Exception {
				assertEquals(NUM_ELEMENTS * (NUM_ELEMENTS + 1L) / 2L, (long) value);

				int subtask = getRuntimeContext().getIndexOfThisSubtask();
				File finishMarker = new File(coordinateDir, FINISH_MARKER_FILE_PREFIX + subtask);
				AbstractTaskManagerProcessFailureRecoveryTest.touchFile(finishMarker);
			}
		};

		// rebalance() makes sure every mapper is involved (no one is skipped because of lazy
		// split assignment).
		final DataSet<Long> sequence = environment.generateSequence(1, NUM_ELEMENTS);
		final DataSet<Long> result = sequence
				.rebalance()
				.map(throttlingMapper)
				.reduce(summer)
				.flatMap(sumVerifier);

		result.output(new DiscardingOutputFormat<Long>());

		environment.execute();
	}
}
......@@ -20,21 +20,20 @@ package org.apache.flink.api.scala.runtime.jobmanager
import akka.actor.{ActorSystem, PoisonPill}
import akka.testkit.{ImplicitSender, TestKit}
import org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner
import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}
import org.apache.flink.configuration.{ConfigConstants, Configuration}
import org.apache.flink.runtime.akka.{ListeningBehaviour, AkkaUtils}
import org.apache.flink.runtime.jobgraph.{JobVertex, JobGraph}
import org.apache.flink.runtime.akka.{AkkaUtils, ListeningBehaviour}
import org.apache.flink.runtime.jobgraph.{JobGraph, JobVertex}
import org.apache.flink.runtime.jobmanager.Tasks.{BlockingNoOpInvokable, NoOpInvokable}
import org.apache.flink.runtime.messages.JobManagerMessages._
import org.apache.flink.runtime.messages.Messages.Acknowledge
import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.NotifyWhenAtLeastNumTaskManagerAreRegistered
import org.apache.flink.runtime.testingUtils.TestingMessages.DisableDisconnect
import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages.{JobManagerTerminated, NotifyWhenJobManagerTerminated}
import org.apache.flink.runtime.testingUtils.{ScalaTestingUtils, TestingUtils}
import org.apache.flink.test.util.ForkableFlinkMiniCluster
import org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner
import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}
@RunWith(classOf[JUnitRunner])
class JobManagerFailsITCase(_system: ActorSystem)
......@@ -57,13 +56,13 @@ class JobManagerFailsITCase(_system: ActorSystem)
val num_slots = 13
val cluster = startDeathwatchCluster(num_slots, 1)
val tm = cluster.getTaskManagers(0)
val jmGateway = cluster.getLeaderGateway(TestingUtils.TESTING_DURATION)
try {
val tm = cluster.getTaskManagers(0)
val jmGateway = cluster.getLeaderGateway(TestingUtils.TESTING_DURATION)
// disable disconnect message to test death watch
tm ! DisableDisconnect
// disable disconnect message to test death watch
tm ! DisableDisconnect
try {
within(TestingUtils.TESTING_DURATION) {
jmGateway.tell(RequestNumberRegisteredTaskManager, self)
expectMsg(1)
......@@ -103,10 +102,10 @@ class JobManagerFailsITCase(_system: ActorSystem)
val cluster = startDeathwatchCluster(num_slots / 2, 2)
var jmGateway = cluster.getLeaderGateway(TestingUtils.TESTING_DURATION)
val tm = cluster.getTaskManagers(0)
try {
var jmGateway = cluster.getLeaderGateway(TestingUtils.TESTING_DURATION)
val tm = cluster.getTaskManagers(0)
within(TestingUtils.TESTING_DURATION) {
jmGateway.tell(SubmitJob(jobGraph, ListeningBehaviour.DETACHED), self)
expectMsg(JobSubmitSuccess(jobGraph.getJobID))
......@@ -119,10 +118,13 @@ class JobManagerFailsITCase(_system: ActorSystem)
cluster.restartLeadingJobManager()
cluster.waitForTaskManagersToBeRegistered()
jmGateway = cluster.getLeaderGateway(TestingUtils.TESTING_DURATION)
// Ask the job manager for the TMs. Don't ask the TMs, because they
// can still have state associated with the old job manager.
jmGateway.tell(NotifyWhenAtLeastNumTaskManagerAreRegistered(2), self)
expectMsg(Acknowledge)
jmGateway.tell(SubmitJob(jobGraph2, ListeningBehaviour.EXECUTION_RESULT), self)
expectMsg(JobSubmitSuccess(jobGraph2.getJobID()))
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册