提交 11b021b0 编写于 作者: R Robert Metzger

[FLINK-2079] Add TaskManager deathwatch thread for YARN case

上级 b2b0fe79
......@@ -406,6 +406,11 @@ public final class ConfigConstants {
* Timeout for all blocking calls that look up remote actors
*/
public static final String AKKA_LOOKUP_TIMEOUT = "akka.lookup.timeout";
/**
* Exit JVM on fatal Akka errors
*/
public static final String AKKA_JVM_EXIT_ON_FATAL_ERROR = "akka.jvm-exit-on-fatal-error";
// ----------------------------- Streaming --------------------------------
......
......@@ -56,7 +56,7 @@ public class ProcessReaper extends UntypedActor {
Thread.sleep(100);
}
catch (InterruptedException e) {
// not really problem if we don't sleep...
// not really a problem if we don't sleep...
}
}
}
......
......@@ -139,6 +139,13 @@ object AkkaUtils {
val lifecycleEvents = configuration.getBoolean(ConfigConstants.AKKA_LOG_LIFECYCLE_EVENTS,
ConfigConstants.DEFAULT_AKKA_LOG_LIFECYCLE_EVENTS)
val jvmExitOnFatalError = if (
configuration.getBoolean(ConfigConstants.AKKA_JVM_EXIT_ON_FATAL_ERROR, false)){
"on"
} else {
"off"
}
val logLifecycleEvents = if (lifecycleEvents) "on" else "off"
val logLevel = getLogLevel
......@@ -152,7 +159,7 @@ object AkkaUtils {
| logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"
| log-config-on-start = off
|
| jvm-exit-on-fatal-error = off
| jvm-exit-on-fatal-error = $jvmExitOnFatalError
|
| serialize-messages = off
|
......
......@@ -158,7 +158,6 @@ extends Actor with ActorLogMessages with ActorSynchronousLogging {
private var heartbeatScheduler: Option[Cancellable] = None
// --------------------------------------------------------------------------
// Actor messages and life cycle
// --------------------------------------------------------------------------
......@@ -192,7 +191,7 @@ extends Actor with ActorLogMessages with ActorSynchronousLogging {
*/
override def postStop(): Unit = {
log.info(s"Stopping TaskManager ${self.path.toSerializationFormat}.")
cancelAndClearEverything(new Exception("TaskManager is shutting down."))
if (isConnected) {
......@@ -1289,7 +1288,7 @@ object TaskManager {
streamingMode,
taskManagerClass)
// start a process reaper that watches the JobManager. If the JobManager actor dies,
// start a process reaper that watches the JobManager. If the TaskManager actor dies,
// the process reaper will kill the JVM process (to ensure easy failure detection)
LOG.debug("Starting TaskManager process reaper")
taskManagerSystem.actorOf(
......
......@@ -85,6 +85,9 @@ public class YarnTaskManagerRunner {
LOG.info("YARN daemon runs as '" + UserGroupInformation.getCurrentUser().getShortUserName()
+"' setting user to execute Flink TaskManager to '"+yarnClientUsername+"'");
// tell akka to die in case of an error
configuration.setBoolean(ConfigConstants.AKKA_JVM_EXIT_ON_FATAL_ERROR, true);
UserGroupInformation ugi = UserGroupInformation.createRemoteUser(yarnClientUsername);
for (Token<? extends TokenIdentifier> toks : UserGroupInformation.getCurrentUser().getTokens()) {
ugi.addToken(toks);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册