提交 730e056a 编写于 作者: T Till Rohrmann

[FLINK-1352] [runtime] Fix buggy registration of TaskManager to JobManager by...

[FLINK-1352] [runtime] Fix buggy registration of TaskManager to JobManager by introducing dedicated RefuseRegistration messages

Adds exponential backoff strategy for TaskManager registration. Introduces AlreadyRegistered and RefuseRegistration messages.

This closes #328.
上级 a5150a90
......@@ -169,6 +169,11 @@ public final class ConfigConstants {
*/
public static final String TASK_MANAGER_DEBUG_MEMORY_USAGE_LOG_INTERVAL_MS = "taskmanager.debug.memory.logIntervalMs";
/**
 * The config parameter defining the maximum duration for which a task manager keeps
 * trying to register at the job manager. The value is a string that is parsed into a
 * Scala {@code Duration} when the task manager configuration is read (for example
 * "5 minutes"). Once the accumulated registration time exceeds this duration, the
 * task manager logs a warning and shuts itself down.
 */
public static final String TASK_MANAGER_MAX_REGISTRATION_DURATION = "taskmanager.maxRegistrationDuration";
/**
* Parameter for the maximum fan for out-of-core algorithms.
* Corresponds to the maximum fan-in for merge-sorts and the maximum fan-out
......@@ -488,6 +493,11 @@ public final class ConfigConstants {
* The default number of task slots per task manager
*/
public static final int DEFAULT_TASK_MANAGER_NUM_TASK_SLOTS = -1;
/**
 * The default task manager's maximum registration duration. The string is parsed into
 * a Scala {@code Duration}; "Inf" denotes an infinite duration, i.e. by default the
 * task manager never gives up trying to register at the job manager.
 */
public static final String DEFAULT_TASK_MANAGER_MAX_REGISTRATION_DURATION = "Inf";
/**
* The default value for the JobClient's polling interval. 2 Seconds.
......
......@@ -226,6 +226,10 @@ public class InstanceManager {
return new HashSet<Instance>(registeredHostsById.values());
}
}
/**
 * Looks up the instance that is registered for the given task manager actor.
 *
 * NOTE(review): the lookup result is returned unchanged; if registeredHostsByConnection
 * is a java.util.Map this is null for an unknown actor reference - confirm and make
 * sure callers handle that case.
 *
 * @param ref actor reference of the task manager whose instance is requested
 * @return the entry stored in registeredHostsByConnection for the given reference
 */
public Instance getRegisteredInstance(ActorRef ref) {
	final Instance registeredInstance = registeredHostsByConnection.get(ref);
	return registeredInstance;
}
// --------------------------------------------------------------------------------------------
......
......@@ -122,10 +122,16 @@ class JobManager(val configuration: Configuration)
val instanceID = instanceManager.registerTaskManager(taskManager, connectionInfo,
hardwareInformation, numberOfSlots)
// to be notified when the taskManager is no longer reachable
context.watch(taskManager)
taskManager ! AcknowledgeRegistration(instanceID, libraryCacheManager.getBlobServerPort)
// TaskManager is already registered
if(instanceID == null){
val instanceID = instanceManager.getRegisteredInstance(taskManager).getId
taskManager ! AlreadyRegistered(instanceID, libraryCacheManager.getBlobServerPort)
} else {
// to be notified when the taskManager is no longer reachable
context.watch(taskManager)
taskManager ! AcknowledgeRegistration(instanceID, libraryCacheManager.getBlobServerPort)
}
}
case RequestNumberRegisteredTaskManager => {
......
......@@ -43,4 +43,20 @@ object RegistrationMessages {
*/
case class AcknowledgeRegistration(instanceID: InstanceID, blobPort: Int)
/**
 * Denotes that the TaskManager has already been registered at the JobManager. The
 * JobManager sends this message instead of [[AcknowledgeRegistration]] when the
 * registering TaskManager is already known to it.
 *
 * @param instanceID ID of the instance under which the TaskManager is already registered
 * @param blobPort Port of the JobManager's blob server
 */
case class AlreadyRegistered(instanceID: InstanceID, blobPort: Int)
/**
 * Denotes the unsuccessful registration of a task manager at the job manager. This is the
 * response triggered by the [[RegisterTaskManager]] message. A task manager which has not
 * yet completed its registration shuts itself down upon receiving this message; an already
 * registered task manager ignores it.
 *
 * @param reason Reason why the task manager registration was refused
 */
case class RefuseRegistration(reason: String)
}
......@@ -46,7 +46,8 @@ import org.apache.flink.runtime.jobgraph.{IntermediateDataSetID, JobID}
import org.apache.flink.runtime.jobmanager.JobManager
import org.apache.flink.runtime.memorymanager.DefaultMemoryManager
import org.apache.flink.runtime.messages.JobManagerMessages.UpdateTaskExecutionState
import org.apache.flink.runtime.messages.RegistrationMessages.{AcknowledgeRegistration, RegisterTaskManager}
import org.apache.flink.runtime.messages.RegistrationMessages.{AlreadyRegistered,
RefuseRegistration, AcknowledgeRegistration, RegisterTaskManager}
import org.apache.flink.runtime.messages.TaskManagerMessages._
import org.apache.flink.runtime.messages.TaskManagerProfilerMessages.{MonitorTask, RegisterProfilingListener, UnmonitorTask}
import org.apache.flink.runtime.net.NetUtils
......@@ -90,11 +91,11 @@ import scala.collection.JavaConverters._
log.info("Creating {} task slot(s).", numberOfSlots)
log.info("TaskManager connection information {}.", connectionInfo)
val REGISTRATION_DELAY = 0 seconds
val REGISTRATION_INTERVAL = 10 seconds
val MAX_REGISTRATION_ATTEMPTS = 10
val HEARTBEAT_INTERVAL = 5000 millisecond
var registrationDelay = 50 milliseconds
var registrationDuration = 0 seconds
TaskManager.checkTempDirs(tmpDirPaths)
val ioManager = new IOManagerAsync(tmpDirPaths)
val memoryManager = new DefaultMemoryManager(memorySize, numberOfSlots, pageSize)
......@@ -121,7 +122,6 @@ import scala.collection.JavaConverters._
var libraryCacheManager: LibraryCacheManager = null
var networkEnvironment: Option[NetworkEnvironment] = None
var registrationScheduler: Option[Cancellable] = None
var registrationAttempts: Int = 0
var registered: Boolean = false
var currentJobManager = ActorRef.noSender
......@@ -175,62 +175,79 @@ import scala.collection.JavaConverters._
}
private def tryJobManagerRegistration(): Unit = {
registrationAttempts = 0
import context.dispatcher
registrationScheduler = Some(context.system.scheduler.schedule(
TaskManager.REGISTRATION_DELAY, TaskManager.REGISTRATION_INTERVAL,
self, RegisterAtJobManager))
registrationDuration = 0 seconds
registered = false
context.system.scheduler.scheduleOnce(registrationDelay, self, RegisterAtJobManager)
}
override def receiveWithLogMessages: Receive = {
case RegisterAtJobManager => {
registrationAttempts += 1
if(!registered) {
registrationDuration += registrationDelay
// double delay for exponential backoff
registrationDelay *= 2
if (registered) {
registrationScheduler.foreach(_.cancel())
}
else if (registrationAttempts <= TaskManager.MAX_REGISTRATION_ATTEMPTS) {
if (registrationDuration > maxRegistrationDuration) {
log.warning("TaskManager could not register at JobManager {} after {}.", jobManagerAkkaURL,
log.info("Try to register at master {}. Attempt #{}", jobManagerAkkaURL,
registrationAttempts)
val jobManager = context.actorSelection(jobManagerAkkaURL)
maxRegistrationDuration)
jobManager ! RegisterTaskManager(connectionInfo, hardwareDescription, numberOfSlots)
}
else {
log.error("TaskManager could not register at JobManager.");
self ! PoisonPill
self ! PoisonPill
} else if (!registered) {
log.info(s"Try to register at master ${jobManagerAkkaURL}. ${registrationAttempts}. " +
s"Attempt")
val jobManager = context.actorSelection(jobManagerAkkaURL)
jobManager ! RegisterTaskManager(connectionInfo, hardwareDescription, numberOfSlots)
context.system.scheduler.scheduleOnce(registrationDelay, self, RegisterAtJobManager)
}
}
}
case AcknowledgeRegistration(id, blobPort) => {
if (!registered) {
if(!registered) {
finishRegistration(id, blobPort)
registered = true
currentJobManager = sender
instanceID = id
context.watch(currentJobManager)
log.info("TaskManager successfully registered at JobManager {}.",
currentJobManager.path.toString)
setupNetworkEnvironment()
setupLibraryCacheManager(blobPort)
} else {
if (log.isDebugEnabled) {
log.debug("The TaskManager {} is already registered at the JobManager {}, but received " +
"another AcknowledgeRegistration message.", self.path, currentJobManager.path)
}
}
}
heartbeatScheduler = Some(context.system.scheduler.schedule(
TaskManager.HEARTBEAT_INTERVAL, TaskManager.HEARTBEAT_INTERVAL, self, SendHeartbeat))
case AlreadyRegistered(id, blobPort) =>
if(!registered) {
log.warning("The TaskManager {} seems to be already registered at the JobManager {} even" +
"though it has not yet finished the registration process.", self.path, sender.path)
profiler foreach {
_.tell(RegisterProfilingListener, JobManager.getProfiler(currentJobManager))
finishRegistration(id, blobPort)
registered = true
} else {
// ignore AlreadyRegistered messages which arrived after AcknowledgeRegistration
if(log.isDebugEnabled){
log.debug("The TaskManager {} has already been registered at the JobManager {}.",
self.path, sender.path)
}
}
for (listener <- waitForRegistration) {
listener ! RegisteredAtJobManager
}
case RefuseRegistration(reason) =>
if(!registered) {
log.error("The registration of task manager {} was refused by the job manager {} " +
"because {}.", self.path, jobManagerAkkaURL, reason)
waitForRegistration.clear()
// Shut task manager down
self ! PoisonPill
} else {
// ignore RefuseRegistration messages which arrived after AcknowledgeRegistration
if(log.isDebugEnabled) {
log.debug("Received RefuseRegistration from the JobManager even though being already " +
"registered")
}
}
}
case SubmitTask(tdd) => {
submitTask(tdd)
......@@ -454,7 +471,34 @@ import scala.collection.JavaConverters._
}
}
def setupNetworkEnvironment(): Unit = {
/**
 * Completes the registration at the JobManager: remembers the sender of the currently
 * processed message as the JobManager, watches it, sets up the network environment and
 * the library cache manager, starts the periodic heartbeat, registers the profiler (if
 * any) and notifies all actors waiting for the registration.
 *
 * Must be called while a registration response is being processed, because it reads
 * `sender`.
 *
 * @param id Instance ID under which this TaskManager is registered at the JobManager
 * @param blobPort Port passed on to the library cache manager setup
 */
private def finishRegistration(id: InstanceID, blobPort: Int): Unit = {
  // the actor which answered the registration request is treated as the JobManager
  currentJobManager = sender
  instanceID = id

  context.watch(currentJobManager)

  val jobManagerPath = currentJobManager.path.toString
  log.info(s"TaskManager successfully registered at JobManager ${jobManagerPath}.")

  setupNetworkEnvironment()
  setupLibraryCacheManager(blobPort)

  // periodically trigger heartbeats, starting one interval from now
  val heartbeats = context.system.scheduler.schedule(
    TaskManager.HEARTBEAT_INTERVAL, TaskManager.HEARTBEAT_INTERVAL, self, SendHeartbeat)
  heartbeatScheduler = Some(heartbeats)

  for (taskManagerProfiler <- profiler) {
    taskManagerProfiler.tell(RegisterProfilingListener,
      JobManager.getProfiler(currentJobManager))
  }

  // wake up everybody who waited for the registration, then forget about them
  waitForRegistration.foreach(_ ! RegisteredAtJobManager)
  waitForRegistration.clear()
}
private def setupNetworkEnvironment(): Unit = {
//shutdown existing network environment
networkEnvironment foreach {
ne =>
......@@ -730,8 +774,13 @@ object TaskManager {
val timeout = FiniteDuration(configuration.getInteger(ConfigConstants.AKKA_ASK_TIMEOUT,
ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT), TimeUnit.SECONDS)
val maxRegistrationDuration = Duration(configuration.getString(
ConfigConstants.TASK_MANAGER_MAX_REGISTRATION_DURATION,
ConfigConstants.DEFAULT_TASK_MANAGER_MAX_REGISTRATION_DURATION))
val taskManagerConfig = TaskManagerConfiguration(numberOfSlots, memorySize, pageSize,
tmpDirs, cleanupInterval, memoryLoggingIntervalMs, profilingInterval, timeout)
tmpDirs, cleanupInterval, memoryLoggingIntervalMs, profilingInterval, timeout,
maxRegistrationDuration)
(connectionInfo, jobManagerURL, taskManagerConfig, networkConfig)
}
......
......@@ -18,10 +18,10 @@
package org.apache.flink.runtime.taskmanager
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.duration.{Duration, FiniteDuration}
case class TaskManagerConfiguration(numberOfSlots: Int, memorySize: Long, pageSize: Int,
tmpDirPaths: Array[String], cleanupInterval: Long,
memoryLogggingIntervalMs: Option[Long],
profilingInterval: Option[Long],
timeout: FiniteDuration)
timeout: FiniteDuration, maxRegistrationDuration: Duration)
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.jobmanager
import java.net.InetAddress
import akka.actor._
import akka.testkit.{TestKit, ImplicitSender}
import org.apache.flink.runtime.instance.{InstanceID, HardwareDescription, InstanceConnectionInfo}
import org.apache.flink.runtime.messages.RegistrationMessages.{AlreadyRegistered,
RefuseRegistration, AcknowledgeRegistration, RegisterTaskManager}
import org.apache.flink.runtime.messages.TaskManagerMessages.Heartbeat
import org.apache.flink.runtime.testingUtils.TestingUtils
import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}
/**
 * Tests the registration handshake between TaskManager and JobManager.
 *
 * The test kit's `testActor` plays the counterpart of the actor under test: it acts as
 * the registering TaskManager in the JobManager test and as the JobManager in the
 * TaskManager tests.
 */
class TaskManagerRegistrationITCase(_system: ActorSystem) extends TestKit(_system) with
  ImplicitSender with WordSpecLike with Matchers with BeforeAndAfterAll {

  def this() = this(ActorSystem("TestingActorSystem", TestingUtils.testConfig))

  override def afterAll(): Unit = {
    TestKit.shutdownActorSystem(system)
  }

  "The JobManager" should {
    "notify already registered TaskManagers" in {
      val jm = TestingUtils.startTestingJobManager

      val connectionInfo = new InstanceConnectionInfo(InetAddress.getLocalHost,1)
      val hardwareDescription = HardwareDescription.extractFromSystem(10)

      try {
        within(TestingUtils.TESTING_DURATION) {
          // registering twice from the same actor must be answered differently
          jm ! RegisterTaskManager(connectionInfo, hardwareDescription, 1)
          jm ! RegisterTaskManager(connectionInfo, hardwareDescription, 1)

          expectMsgType[AcknowledgeRegistration]
          expectMsgType[AlreadyRegistered]
        }
      } finally {
        jm ! Kill
      }
    }
  }

  "The TaskManager" should {
    "shutdown if its registration is refused by the JobManager" in {
      val tm = TestingUtils.startTestingTaskManager(self)

      watch(tm)

      try {
        within(TestingUtils.TESTING_DURATION) {
          expectMsgType[RegisterTaskManager]

          tm ! RefuseRegistration("Testing connection refusal")

          expectTerminated(tm)
        }
      } finally {
        // Make sure that a failing test does not leave a TaskManager behind which keeps
        // sending registration messages to the test actor. If the TaskManager has already
        // terminated, the message simply ends up in the dead letters.
        tm ! Kill
      }
    }

    "ignore RefuseRegistration messages after it has been successfully registered" in {
      val tm = TestingUtils.startTestingTaskManager(self)

      try {
        within(TestingUtils.TESTING_DURATION) {
          expectMsgType[RegisterTaskManager]

          tm ! AcknowledgeRegistration(new InstanceID(), 42)

          tm ! RefuseRegistration("Should be ignored")

          // Check if the TaskManager is still alive. Identify is a case class and has to be
          // instantiated; sending the bare companion object would never trigger an
          // ActorIdentity response.
          val identifyId = "taskmanager-liveness-check"
          tm ! Identify(identifyId)

          // Skip everything else the TaskManager might send in the meantime (heartbeats,
          // repeated registration attempts) and wait for the identity of the living actor.
          fishForMessage(hint = "ActorIdentity of the TaskManager") {
            case ActorIdentity(`identifyId`, Some(_)) => true
            case _ => false
          }
        }
      } finally {
        tm ! Kill
      }
    }
  }
}
......@@ -18,7 +18,7 @@
package org.apache.flink.runtime.testingUtils
import akka.actor.{Props, ActorSystem}
import akka.actor.{ActorRef, Props, ActorSystem}
import akka.testkit.CallingThreadDispatcher
import com.typesafe.config.ConfigFactory
import org.apache.flink.configuration.{ConfigConstants, Configuration}
......@@ -26,6 +26,7 @@ import org.apache.flink.core.io.IOReadableWritable
import org.apache.flink.runtime.akka.AkkaUtils
import org.apache.flink.runtime.akka.serialization.IOReadableWritableSerializer
import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.ActionQueue
import org.apache.flink.runtime.jobmanager.JobManager
import org.apache.flink.runtime.minicluster.FlinkMiniCluster
import org.apache.flink.runtime.taskmanager.TaskManager
import scala.concurrent.duration._
......@@ -98,6 +99,23 @@ object TestingUtils {
networkConnectionConfig) with TestingTaskManager))
}
/**
 * Starts a JobManager with the TestingJobManager trait mixed in, using an empty
 * configuration.
 *
 * @param system Actor system in which the JobManager actor is created
 * @return Reference to the started JobManager actor
 */
def startTestingJobManager(implicit system: ActorSystem): ActorRef = {
  val jobManagerConfig = new Configuration()
  val jobManagerProps = Props(new JobManager(jobManagerConfig) with TestingJobManager)

  system.actorOf(jobManagerProps)
}
/**
 * Starts a TaskManager with the TestingTaskManager trait mixed in. The TaskManager is
 * configured with the path of the given actor as its JobManager URL; all other settings
 * are derived from an otherwise empty configuration for the host "LOCALHOST".
 *
 * @param jobManager Actor whose path is used as the JobManager URL
 * @param system Actor system in which the TaskManager actor is created
 * @return Reference to the started TaskManager actor
 */
def startTestingTaskManager(jobManager: ActorRef)(implicit system: ActorSystem): ActorRef = {
  val taskManagerSettings = new Configuration()
  taskManagerSettings.setString(ConfigConstants.JOB_MANAGER_AKKA_URL,
    jobManager.path.toString)

  val (tmConnectionInfo, jmAkkaURL, tmConfig, netConfig) =
    TaskManager.parseConfiguration("LOCALHOST", taskManagerSettings)

  val taskManagerProps = Props(
    new TaskManager(tmConnectionInfo, jmAkkaURL, tmConfig, netConfig) with TestingTaskManager)

  system.actorOf(taskManagerProps)
}
def startTestingCluster(numSlots: Int, numTMs: Int = 1, timeout: Int = DEFAULT_AKKA_ASK_TIMEOUT):
FlinkMiniCluster = {
val config = new Configuration()
......
......@@ -39,6 +39,7 @@ object ApplicationMaster {
val CONF_FILE = "flink-conf.yaml"
val MODIFIED_CONF_FILE = "flink-conf-modified.yaml"
val MAX_REGISTRATION_DURATION = "5 minutes"
def main(args: Array[String]): Unit ={
val yarnClientUsername = System.getenv(FlinkYarnClient.ENV_CLIENT_USERNAME)
......@@ -148,6 +149,9 @@ object ApplicationMaster {
s"${ConfigConstants.DEFAULT_PARALLELIZATION_DEGREE_KEY}: ${slots*taskManagerCount}")
}
output.println(s"${ConfigConstants.TASK_MANAGER_MAX_REGISTRATION_DURATION}: " +
s"$MAX_REGISTRATION_DURATION")
// add dynamic properties
val dynamicProperties = CliFrontend.getDynamicProperties(dynamicPropertiesEncodedString)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册