Commit db4d139d authored by Till Rohrmann


[FLINK-3184] [timeouts] Set default cluster side timeout to 10 s and the client side timeout to 60 s

Add missing param descriptions to FlinkYarnCluster, remove implicit timeout from ApplicationClient

This closes #1468
Parent adbeec2f
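
For reference, here is a minimal sketch (not part of the commit) of how the two timeouts touched by this change can be set and read programmatically. The class name TimeoutConfigExample is hypothetical; the imports assume the usual Flink packages (org.apache.flink.configuration, org.apache.flink.runtime.akka), and the values shown are the new defaults introduced here: 10 s for the cluster-side ask timeout, 60 s for the client-side timeout.

import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;

import scala.concurrent.duration.FiniteDuration;

public class TimeoutConfigExample {

    public static void main(String[] args) {
        Configuration config = new Configuration();

        // Cluster-side timeout for blocking calls ("akka.ask.timeout", default now "10 s").
        config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, "10 s");
        // Client-side timeout ("akka.client.timeout", newly introduced, default "60 s").
        config.setString(ConfigConstants.AKKA_CLIENT_TIMEOUT, "60 s");

        // AkkaUtils parses the duration strings into FiniteDurations,
        // falling back to the defaults above when the keys are not set.
        FiniteDuration askTimeout = AkkaUtils.getTimeout(config);
        FiniteDuration clientTimeout = AkkaUtils.getClientTimeout(config);

        System.out.println("ask timeout:    " + askTimeout);
        System.out.println("client timeout: " + clientTimeout);
    }
}

In flink-conf.yaml the same settings correspond to the keys akka.ask.timeout and akka.client.timeout.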
@@ -139,7 +139,7 @@ public class CliFrontend {
private final Configuration config;
private final FiniteDuration askTimeout;
private final FiniteDuration clientTimeout;
private final FiniteDuration lookupTimeout;
@@ -226,7 +226,7 @@ public class CliFrontend {
}
}
this.askTimeout = AkkaUtils.getTimeout(config);
this.clientTimeout = AkkaUtils.getClientTimeout(config);
this.lookupTimeout = AkkaUtils.getLookupTimeout(config);
}
@@ -482,12 +482,12 @@ public class CliFrontend {
LOG.info("Connecting to JobManager to retrieve list of jobs");
Future<Object> response = jobManagerGateway.ask(
JobManagerMessages.getRequestRunningJobsStatus(),
askTimeout);
JobManagerMessages.getRequestRunningJobsStatus(),
clientTimeout);
Object result;
try {
result = Await.result(response, askTimeout);
result = Await.result(response, clientTimeout);
}
catch (Exception e) {
throw new Exception("Could not retrieve running jobs from the JobManager.", e);
@@ -614,10 +614,10 @@ public class CliFrontend {
try {
ActorGateway jobManager = getJobManagerGateway(options);
Future<Object> response = jobManager.ask(new CancelJob(jobId), askTimeout);
Future<Object> response = jobManager.ask(new CancelJob(jobId), clientTimeout);
try {
Await.result(response, askTimeout);
Await.result(response, clientTimeout);
return 0;
}
catch (Exception e) {
@@ -733,12 +733,12 @@ public class CliFrontend {
try {
ActorGateway jobManager = getJobManagerGateway(options);
logAndSysout("Disposing savepoint '" + savepointPath + "'.");
Future<Object> response = jobManager.ask(new DisposeSavepoint(savepointPath), askTimeout);
Future<Object> response = jobManager.ask(new DisposeSavepoint(savepointPath), clientTimeout);
Object result;
try {
logAndSysout("Waiting for response...");
result = Await.result(response, askTimeout);
result = Await.result(response, clientTimeout);
}
catch (Exception e) {
throw new Exception("Disposing the savepoint with path" + savepointPath + " failed.", e);
@@ -141,8 +141,8 @@ public class Client {
throw new IOException("Could start client actor system.", e);
}
timeout = AkkaUtils.getTimeout(config);
lookupTimeout = AkkaUtils.getTimeout(config);
timeout = AkkaUtils.getClientTimeout(config);
lookupTimeout = AkkaUtils.getLookupTimeout(config);
}
// ------------------------------------------------------------------------
@@ -315,7 +315,7 @@ public class FlinkClient {
configuration.setString(ConfigConstants.AKKA_ASK_TIMEOUT, this.timeout);
}
return AkkaUtils.getTimeout(configuration);
return AkkaUtils.getClientTimeout(configuration);
}
private ActorRef getJobManager() throws IOException {
@@ -42,13 +42,13 @@ public final class ConfigConstants {
* Config parameter for the number of re-tries for failed tasks. Setting this
* value to 0 effectively disables fault tolerance.
*/
public static final String DEFAULT_EXECUTION_RETRIES_KEY = "execution-retries.default";
public static final String EXECUTION_RETRIES_KEY = "execution-retries.default";
/**
* Config parameter for the delay between execution retries. The value must be specified in the
* notation "10 s" or "1 min" (style of Scala Finite Durations)
*/
public static final String DEFAULT_EXECUTION_RETRY_DELAY_KEY = "execution-retries.delay";
public static final String EXECUTION_RETRY_DELAY_KEY = "execution-retries.delay";
// -------------------------------- Runtime -------------------------------
@@ -417,7 +417,7 @@ public final class ConfigConstants {
public static final String AKKA_LOG_LIFECYCLE_EVENTS = "akka.log.lifecycle.events";
/**
* Timeout for all blocking calls
* Timeout for all blocking calls on the cluster side
*/
public static final String AKKA_ASK_TIMEOUT = "akka.ask.timeout";
@@ -426,6 +426,11 @@ public final class ConfigConstants {
*/
public static final String AKKA_LOOKUP_TIMEOUT = "akka.lookup.timeout";
/**
* Timeout for all blocking calls on the client side
*/
public static final String AKKA_CLIENT_TIMEOUT = "akka.client.timeout";
/**
* Exit JVM on fatal Akka errors
*/
@@ -704,9 +709,11 @@ public final class ConfigConstants {
public static String DEFAULT_AKKA_FRAMESIZE = "10485760b";
public static String DEFAULT_AKKA_ASK_TIMEOUT = "100 s";
public static String DEFAULT_AKKA_ASK_TIMEOUT = "10 s";
public static String DEFAULT_AKKA_LOOKUP_TIMEOUT = "10 s";
public static String DEFAULT_AKKA_CLIENT_TIMEOUT = "60 s";
// ----------------------------- Streaming Values --------------------------
@@ -82,10 +82,10 @@ public class JobClientActor extends FlinkUntypedActor implements LeaderRetrieval
public JobClientActor(
LeaderRetrievalService leaderRetrievalService,
FiniteDuration submissionTimeout,
FiniteDuration timeout,
boolean sysoutUpdates) {
this.leaderRetrievalService = Preconditions.checkNotNull(leaderRetrievalService);
this.timeout = Preconditions.checkNotNull(submissionTimeout);
this.timeout = Preconditions.checkNotNull(timeout);
this.sysoutUpdates = sysoutUpdates;
}
@@ -315,6 +315,7 @@ public class JobClientActor extends FlinkUntypedActor implements LeaderRetrieval
}
private void disconnectFromJobManager() {
LOG.info("Disconnect from JobManager {}.", jobManager);
if (jobManager != ActorRef.noSender()) {
getContext().unwatch(jobManager);
jobManager = ActorRef.noSender();
@@ -322,6 +323,7 @@ public class JobClientActor extends FlinkUntypedActor implements LeaderRetrieval
}
private void connectToJobManager(ActorRef jobManager) {
LOG.info("Connect to JobManager {}.", jobManager);
if (jobManager != ActorRef.noSender()) {
getContext().unwatch(jobManager);
}
@@ -401,6 +403,7 @@ public class JobClientActor extends FlinkUntypedActor implements LeaderRetrieval
}
private void terminate() {
LOG.info("Terminate JobClientActor.");
terminated = true;
disconnectFromJobManager();
getSelf().tell(decorateMessage(PoisonPill.getInstance()), ActorRef.noSender());
@@ -206,7 +206,7 @@ object AkkaUtils {
val startupTimeout = configuration.getString(
ConfigConstants.AKKA_STARTUP_TIMEOUT,
akkaAskTimeout.toString)
(akkaAskTimeout * 10).toString)
val transportHeartbeatInterval = configuration.getString(
ConfigConstants.AKKA_TRANSPORT_HEARTBEAT_INTERVAL,
@@ -222,11 +222,11 @@ object AkkaUtils {
val watchHeartbeatInterval = configuration.getString(
ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL,
(akkaAskTimeout/10).toString)
(akkaAskTimeout).toString)
val watchHeartbeatPause = configuration.getString(
ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE,
akkaAskTimeout.toString)
(akkaAskTimeout * 10).toString)
val watchThreshold = configuration.getDouble(
ConfigConstants.AKKA_WATCH_THRESHOLD,
@@ -234,7 +234,7 @@ object AkkaUtils {
val akkaTCPTimeout = configuration.getString(
ConfigConstants.AKKA_TCP_TIMEOUT,
akkaAskTimeout.toString)
(akkaAskTimeout * 10).toString)
val akkaFramesize = configuration.getString(
ConfigConstants.AKKA_FRAMESIZE,
@@ -476,6 +476,22 @@ object AkkaUtils {
new FiniteDuration(duration.toMillis, TimeUnit.MILLISECONDS)
}
def getClientTimeout(config: Configuration): FiniteDuration = {
val duration = Duration(
config.getString(
ConfigConstants.AKKA_CLIENT_TIMEOUT,
ConfigConstants.DEFAULT_AKKA_CLIENT_TIMEOUT
))
new FiniteDuration(duration.toMillis, TimeUnit.MILLISECONDS)
}
def getDefaultClientTimeout: FiniteDuration = {
val duration = Duration(ConfigConstants.DEFAULT_AKKA_CLIENT_TIMEOUT)
new FiniteDuration(duration.toMillis, TimeUnit.MILLISECONDS)
}
/** Returns the address of the given [[ActorSystem]]. The [[Address]] object contains
* the port and the host under which the actor system is reachable
*
@@ -1993,7 +1993,7 @@ object JobManager {
ConfigConstants.DEFAULT_LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL) * 1000
val executionRetries = configuration.getInteger(
ConfigConstants.DEFAULT_EXECUTION_RETRIES_KEY,
ConfigConstants.EXECUTION_RETRIES_KEY,
ConfigConstants.DEFAULT_EXECUTION_RETRIES)
val archiveCount = configuration.getInteger(ConfigConstants.JOB_MANAGER_WEB_ARCHIVE_COUNT,
@@ -2003,7 +2003,7 @@ object JobManager {
// unless explicitly specified, this is dependent on the heartbeat timeout
val pauseString = configuration.getString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE,
ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT)
val delayString = configuration.getString(ConfigConstants.DEFAULT_EXECUTION_RETRY_DELAY_KEY,
val delayString = configuration.getString(ConfigConstants.EXECUTION_RETRY_DELAY_KEY,
pauseString)
val delayBetweenRetries: Long = try {
@@ -2011,7 +2011,7 @@ object JobManager {
}
catch {
case n: NumberFormatException => throw new Exception(
s"Invalid config value for ${ConfigConstants.DEFAULT_EXECUTION_RETRY_DELAY_KEY}: " +
s"Invalid config value for ${ConfigConstants.EXECUTION_RETRY_DELAY_KEY}: " +
s"$pauseString. Value must be a valid duration (such as 100 milli or 1 min)");
}
@@ -1835,8 +1835,7 @@ object TaskManager {
val timeout = try {
AkkaUtils.getTimeout(configuration)
}
catch {
} catch {
case e: Exception => throw new IllegalArgumentException(
s"Invalid format for '${ConfigConstants.AKKA_ASK_TIMEOUT}'. " +
s"Use formats like '50 s' or '1 min' to specify the timeout.")
@@ -87,7 +87,7 @@ public class ZooKeeperTestUtils {
config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL, "1000 ms");
config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "6 s");
config.setInteger(ConfigConstants.AKKA_WATCH_THRESHOLD, 9);
config.setString(ConfigConstants.DEFAULT_EXECUTION_RETRY_DELAY_KEY, "10 s");
config.setString(ConfigConstants.EXECUTION_RETRY_DELAY_KEY, "10 s");
return config;
}
@@ -57,7 +57,7 @@ class RecoveryITCase(_system: ActorSystem)
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlots)
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTaskManagers)
config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, heartbeatTimeout)
config.setString(ConfigConstants.DEFAULT_EXECUTION_RETRY_DELAY_KEY, heartbeatTimeout)
config.setString(ConfigConstants.EXECUTION_RETRY_DELAY_KEY, heartbeatTimeout)
new TestingCluster(config)
}
@@ -44,8 +44,6 @@ import org.apache.flink.test.util.ForkableFlinkMiniCluster;
import org.apache.flink.util.NetUtils;
import org.apache.flink.util.TestLogger;
import org.apache.kafka.common.PartitionInfo;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@@ -186,7 +184,7 @@ public abstract class KafkaTestBase extends TestLogger {
flinkConfig.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 8);
flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 16);
flinkConfig.setString(ConfigConstants.DEFAULT_EXECUTION_RETRY_DELAY_KEY, "0 s");
flinkConfig.setString(ConfigConstants.EXECUTION_RETRY_DELAY_KEY, "0 s");
flink = new ForkableFlinkMiniCluster(flinkConfig, false);
flink.start();
@@ -78,7 +78,7 @@ public class TimestampITCase {
Configuration config = new Configuration();
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TASK_MANAGERS);
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUM_TASK_SLOTS);
config.setString(ConfigConstants.DEFAULT_EXECUTION_RETRY_DELAY_KEY, "0 ms");
config.setString(ConfigConstants.EXECUTION_RETRY_DELAY_KEY, "0 ms");
config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 12);
cluster = new ForkableFlinkMiniCluster(config, false);
@@ -67,7 +67,7 @@ public class EventTimeAllWindowCheckpointingITCase extends TestLogger {
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, PARALLELISM / 2);
config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 48);
config.setString(ConfigConstants.DEFAULT_EXECUTION_RETRY_DELAY_KEY, "0 ms");
config.setString(ConfigConstants.EXECUTION_RETRY_DELAY_KEY, "0 ms");
cluster = new ForkableFlinkMiniCluster(config, false);
cluster.start();
@@ -68,7 +68,7 @@ public class EventTimeWindowCheckpointingITCase extends TestLogger {
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, PARALLELISM / 2);
config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 48);
config.setString(ConfigConstants.DEFAULT_EXECUTION_RETRY_DELAY_KEY, "0 ms");
config.setString(ConfigConstants.EXECUTION_RETRY_DELAY_KEY, "0 ms");
cluster = new ForkableFlinkMiniCluster(config, false);
cluster.start();
@@ -706,7 +706,7 @@ public class SavepointITCase extends TestLogger {
// Long delay to ensure that the test times out if the job
// manager tries to restart the job.
config.setString(ConfigConstants.DEFAULT_EXECUTION_RETRY_DELAY_KEY, "1 hour");
config.setString(ConfigConstants.EXECUTION_RETRY_DELAY_KEY, "1 hour");
LOG.info("Flink configuration: " + config + ".");
@@ -84,7 +84,7 @@ public class StreamCheckpointNotifierITCase {
Configuration config = new Configuration();
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TASK_MANAGERS);
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUM_TASK_SLOTS);
config.setString(ConfigConstants.DEFAULT_EXECUTION_RETRY_DELAY_KEY, "0 ms");
config.setString(ConfigConstants.EXECUTION_RETRY_DELAY_KEY, "0 ms");
config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 12);
cluster = new ForkableFlinkMiniCluster(config, false);
@@ -49,7 +49,7 @@ public abstract class StreamFaultToleranceTestBase extends TestLogger {
Configuration config = new Configuration();
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TASK_MANAGERS);
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUM_TASK_SLOTS);
config.setString(ConfigConstants.DEFAULT_EXECUTION_RETRY_DELAY_KEY, "0 ms");
config.setString(ConfigConstants.EXECUTION_RETRY_DELAY_KEY, "0 ms");
config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 12);
cluster = new ForkableFlinkMiniCluster(config, false);
@@ -79,7 +79,7 @@ public class WindowCheckpointingITCase extends TestLogger {
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, PARALLELISM / 2);
config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 48);
config.setString(ConfigConstants.DEFAULT_EXECUTION_RETRY_DELAY_KEY, "0 ms");
config.setString(ConfigConstants.EXECUTION_RETRY_DELAY_KEY, "0 ms");
cluster = new ForkableFlinkMiniCluster(config, false);
cluster.start();
@@ -54,7 +54,7 @@ public class ClassLoaderITCase {
Configuration config = new Configuration();
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 2);
config.setString(ConfigConstants.DEFAULT_EXECUTION_RETRY_DELAY_KEY, "0 s");
config.setString(ConfigConstants.EXECUTION_RETRY_DELAY_KEY, "0 s");
// we need to use the "filesystem" state backend to ensure FLINK-2543 is not happening again.
config.setString(ConfigConstants.STATE_BACKEND, "filesystem");
@@ -121,7 +121,7 @@ public abstract class AbstractTaskManagerProcessFailureRecoveryTest extends Test
jmConfig.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL, "1000 ms");
jmConfig.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "6 s");
jmConfig.setInteger(ConfigConstants.AKKA_WATCH_THRESHOLD, 9);
jmConfig.setString(ConfigConstants.DEFAULT_EXECUTION_RETRY_DELAY_KEY, "10 s");
jmConfig.setString(ConfigConstants.EXECUTION_RETRY_DELAY_KEY, "10 s");
jmActorSystem = AkkaUtils.createActorSystem(jmConfig, new Some<Tuple2<String, Object>>(localAddress));
ActorRef jmActor = JobManager.startJobManagerActors(
@@ -51,7 +51,7 @@ public class SimpleRecoveryITCase {
Configuration config = new Configuration();
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 2);
config.setString(ConfigConstants.DEFAULT_EXECUTION_RETRY_DELAY_KEY, "100 ms");
config.setString(ConfigConstants.EXECUTION_RETRY_DELAY_KEY, "100 ms");
cluster = new ForkableFlinkMiniCluster(config, false);
@@ -148,7 +148,7 @@ public class ZooKeeperLeaderElectionITCase extends TestLogger {
// we "effectively" disable the automatic RecoverAllJobs message and sent it manually to make
// sure that all TMs have registered to the JM prior to issueing the RecoverAllJobs message
configuration.setString(ConfigConstants.DEFAULT_EXECUTION_RETRY_DELAY_KEY, AkkaUtils.INF_TIMEOUT().toString());
configuration.setString(ConfigConstants.EXECUTION_RETRY_DELAY_KEY, AkkaUtils.INF_TIMEOUT().toString());
Tasks.BlockingOnceReceiver$.MODULE$.blocking_$eq(true);
@@ -18,7 +18,7 @@
# Set root logger level to OFF to not flood build logs
# set manually to INFO for debugging purposes
log4j.rootLogger=INFO, testlogger
log4j.rootLogger=OFF, testlogger
# A1 is set to be a ConsoleAppender.
log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
@@ -97,11 +97,11 @@ public class FlinkYarnCluster extends AbstractFlinkYarnCluster {
/**
* Create a new Flink on YARN cluster.
*
* @param yarnClient
* @param yarnClient Client to talk to YARN
* @param appId the YARN application ID
* @param hadoopConfig
* @param flinkConfig
* @param sessionFilesDir
* @param hadoopConfig Hadoop configuration
* @param flinkConfig Flink configuration
* @param sessionFilesDir Location of files required for YARN session
* @param detached Set to true if no actor system or RPC communication with the cluster should be established
* @throws IOException
* @throws YarnException
@@ -149,7 +149,7 @@ public class FlinkYarnCluster extends AbstractFlinkYarnCluster {
// try to find address for 2 seconds. log after 400 ms.
InetAddress ownHostname = ConnectionUtils.findConnectingAddress(jobManagerAddress, 2000, 400);
actorSystem = AkkaUtils.createActorSystem(flinkConfig,
new Some(new Tuple2<String, Integer>(ownHostname.getCanonicalHostName(), 0)));
new Some<Tuple2<String, Object>>(new Tuple2<String, Object>(ownHostname.getCanonicalHostName(), 0)));
// Create the leader election service
flinkConfig.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, this.jobManagerAddress.getHostName());
@@ -373,7 +373,7 @@ public class FlinkYarnCluster extends AbstractFlinkYarnCluster {
// get messages from ApplicationClient (locally)
while(true) {
Object result = null;
Object result;
try {
Future<Object> response = Patterns.ask(applicationClient,
YarnMessages.getLocalGetYarnMessage(), new Timeout(akkaDuration));
@@ -60,7 +60,7 @@ class ApplicationClient(
var yarnJobManager: Option[ActorRef] = None
var pollingTimer: Option[Cancellable] = None
implicit val timeout: FiniteDuration = AkkaUtils.getTimeout(flinkConfig)
var running = false
var messagesQueue : mutable.Queue[YarnMessage] = mutable.Queue[YarnMessage]()
var latestClusterStatus : Option[FlinkYarnClusterStatus] = None
@@ -20,7 +20,7 @@ package org.apache.flink.yarn
import org.apache.flink.runtime.jobmanager.{MemoryArchivist, JobManager}
/** Default implemenation of the [[ApplicationMasterBase]] which starts a [[YarnJobManager]] and a
/** Default implementation of the [[ApplicationMasterBase]] which starts a [[YarnJobManager]] and a
* [[MemoryArchivist]].
*/
class ApplicationMaster extends ApplicationMasterBase {