提交 2a49eaaf 编写于 作者: M Maximilian Michels

[FLINK-3300] fix concurrency bug in YarnJobManager

Adds message passing between Hadoop's async resource manager client and
the YarnJobManager actor.

This closes #1561.
上级 086acf68
......@@ -180,6 +180,7 @@ class YarnJobManager(
_ ! decorateMessage(JobManagerStopped)
// Shutdown and discard all queued messages
case RegisterApplicationClient =>
......@@ -220,6 +221,85 @@ class YarnJobManager(
case StartYarnSession(hadoopConfig, webServerPort) =>
startYarnSession(hadoopConfig, webServerPort)
case YarnContainersAllocated(containers: JavaList[Container]) =>
val newlyAllocatedContainers = containers.asScala
newlyAllocatedContainers.foreach {
container => log.info(s"Got new container for allocation: ${container.getId}")
allocatedContainersList ++= containers.asScala
numPendingRequests = math.max(0, numPendingRequests - newlyAllocatedContainers.length)
if (runningContainers >= numTaskManagers && allocatedContainersList.nonEmpty) {
log.info(s"Flink has ${allocatedContainersList.size} allocated containers which " +
s"are not needed right now. Returning them")
for (container <- allocatedContainersList) {
rmClientOption match {
case Some(client) => client.releaseAssignedContainer(container.getId)
case None =>
allocatedContainersList = List()
case YarnContainersCompleted(statuses: JavaList[ContainerStatus]) =>
val completedContainerStatuses = statuses.asScala
val idStatusMap = completedContainerStatuses
.map(status => (status.getContainerId, status)).toMap
completedContainerStatuses.foreach {
status => log.info(s"Container ${status.getContainerId} is completed " +
s"with diagnostics: ${status.getDiagnostics}")
// get failed containers (returned containers are also completed, so we have to
// distinguish if it was running before).
val (completedContainers, remainingRunningContainers) = runningContainersList
.partition(idStatusMap contains _.getId)
completedContainers.foreach {
container =>
val status = idStatusMap(container.getId)
failedContainers += 1
runningContainers -= 1
log.info(s"Container ${status.getContainerId} was a running container. " +
s"Total failed containers $failedContainers.")
val detail = status.getExitStatus match {
case -103 => "Vmem limit exceeded";
case -104 => "Pmem limit exceeded";
case _ => ""
messageListener foreach {
_ ! decorateMessage(
YarnMessage(s"Diagnostics for containerID=${status.getContainerId} in " +
s"state=${status.getState}.\n${status.getDiagnostics} $detail")
runningContainersList = remainingRunningContainers
// maxFailedContainers == -1 is infinite number of retries.
if (maxFailedContainers != -1 && failedContainers >= maxFailedContainers) {
val msg = s"Stopping YARN session because the number of failed " +
s"containers ($failedContainers) exceeded the maximum failed container " +
s"count ($maxFailedContainers). This number is controlled by " +
s"the '${ConfigConstants.YARN_MAX_FAILED_CONTAINERS}' configuration " +
s"setting. By default its the number of requested containers"
self ! decorateMessage(StopYarnSession(FinalApplicationStatus.FAILED, msg))
case jnf: JobNotFound =>
log.warn(s"Job with ID ${jnf.jobID} not found in JobManager")
if (stopWhenJobFinished == null) {
......@@ -315,9 +395,6 @@ class YarnJobManager(
// inject client into handler to adjust the heartbeat interval and make requests
......@@ -531,16 +608,135 @@ class YarnJobManager(
* Heartbeats with the resource manager and handles container updates.
* Allocates new containers if necessary.
object AMRMClientAsyncHandler extends AMRMClientAsync.CallbackHandler {
private def allocateContainers() : Unit = {
// check if we want to start some of our allocated containers.
if (runningContainers < numTaskManagers) {
val missingContainers = numTaskManagers - runningContainers
log.info(s"The user requested $numTaskManagers containers, $runningContainers " +
s"running. $missingContainers containers missing")
val numStartedContainers = startTMsInAllocatedContainers(missingContainers)
// if there are still containers missing, request them from YARN
val toAllocateFromYarn = Math.max(
missingContainers - numStartedContainers - numPendingRequests,
if (toAllocateFromYarn > 0) {
val reallocate = flinkConfiguration
.getBoolean(ConfigConstants.YARN_REALLOCATE_FAILED_CONTAINERS, true)
log.info(s"There are $missingContainers containers missing." +
s" $numPendingRequests are already requested. " +
s"Requesting $toAllocateFromYarn additional container(s) from YARN. " +
s"Reallocation of failed containers is enabled=$reallocate " +
// there are still containers missing. Request them from YARN
if (reallocate) {
for (i <- 1 to toAllocateFromYarn) {
val containerRequest = getContainerRequest(memoryPerTaskManager)
rmClientOption match {
case Some(client) => client.addContainerRequest(containerRequest)
case None =>
numPendingRequests += 1
log.info("Requested additional container from YARN. Pending requests " +
/** Starts min(numTMsToStart, allocatedContainersList.size) TaskManager in the available
* allocated containers. The number of successfully started TaskManagers is returned.
* @param numTMsToStart Number of TaskManagers to start if enough allocated containers are
* available. If not, then all allocated containers are used
* @return Number of successfully started TaskManagers
private def startTMsInAllocatedContainers(numTMsToStart: Int): Int = {
// not enough containers running
if (allocatedContainersList.nonEmpty) {
log.info(s"${allocatedContainersList.size} containers already allocated by YARN. " +
nmClientOption match {
case Some(nmClient) =>
containerLaunchContext match {
case Some(ctx) =>
val (containersToBeStarted, remainingContainers) = allocatedContainersList
val startedContainers = containersToBeStarted.flatMap {
container =>
try {
nmClient.startContainer(container, ctx)
val message = s"Launching container (${container.getId} on host " +
messageListener foreach {
_ ! decorateMessage(YarnMessage(message))
} catch {
case e: YarnException =>
log.error(s"Exception while starting YARN " +
s"container ${container.getId} on " +
s"host ${container.getNodeId.getHost}", e)
runningContainers += startedContainers.length
runningContainersList :::= startedContainers
allocatedContainersList = remainingContainers
val heartbeatInterval =
if (runningContainers < numTaskManagers) {
} else {
* Asynchronous client to make requests to the RM.
* Must be set via setClient(..) before its service is started.
private var client : AMRMClientAsync[ContainerRequest] = null
rmClientOption match {
case Some(client) => client.setHeartbeatInterval(heartbeatInterval.toMillis.toInt)
case None =>
case None =>
log.error("The ContainerLaunchContext was not set.")
self ! decorateMessage(
"Fatal error in AM: The ContainerLaunchContext was not set."))
case None =>
log.error("The NMClient was not set.")
self ! decorateMessage(
"Fatal error in AM: The NMClient was not set."))
} else {
* Heartbeats with the resource manager and informs of container updates.
object AMRMClientAsyncHandler extends AMRMClientAsync.CallbackHandler {
override def onError(e: Throwable): Unit = {
self ! decorateMessage(
......@@ -563,213 +759,16 @@ class YarnJobManager(
override def onContainersCompleted(statuses: JavaList[ContainerStatus]): Unit = {
val completedContainerStatuses = statuses.asScala
val idStatusMap = completedContainerStatuses
.map(status => (status.getContainerId, status)).toMap
completedContainerStatuses.foreach {
status => log.info(s"Container ${status.getContainerId} is completed " +
s"with diagnostics: ${status.getDiagnostics}")
// get failed containers (returned containers are also completed, so we have to
// distinguish if it was running before).
val (completedContainers, remainingRunningContainers) = runningContainersList
.partition(idStatusMap contains _.getId)
completedContainers.foreach {
container =>
val status = idStatusMap(container.getId)
failedContainers += 1
runningContainers -= 1
log.info(s"Container ${status.getContainerId} was a running container. " +
s"Total failed containers $failedContainers.")
val detail = status.getExitStatus match {
case -103 => "Vmem limit exceeded";
case -104 => "Pmem limit exceeded";
case _ => ""
messageListener foreach {
_ ! decorateMessage(
YarnMessage(s"Diagnostics for containerID=${status.getContainerId} in " +
s"state=${status.getState}.\n${status.getDiagnostics} $detail")
runningContainersList = remainingRunningContainers
// maxFailedContainers == -1 is infinite number of retries.
if (maxFailedContainers != -1 && failedContainers >= maxFailedContainers) {
val msg = s"Stopping YARN session because the number of failed " +
s"containers ($failedContainers) exceeded the maximum failed container " +
s"count ($maxFailedContainers). This number is controlled by " +
s"the '${ConfigConstants.YARN_MAX_FAILED_CONTAINERS}' configuration " +
s"setting. By default its the number of requested containers"
self ! decorateMessage(StopYarnSession(FinalApplicationStatus.FAILED, msg))
self ! decorateMessage(
override def onContainersAllocated(containers: JavaList[Container]): Unit = {
val newlyAllocatedContainers = containers.asScala
newlyAllocatedContainers.foreach {
container => log.info(s"Got new container for allocation: ${container.getId}")
allocatedContainersList ++= containers.asScala
numPendingRequests = math.max(0, numPendingRequests - newlyAllocatedContainers.length)
if (runningContainers >= numTaskManagers && allocatedContainersList.nonEmpty) {
log.info(s"Flink has ${allocatedContainersList.size} allocated containers which " +
s"are not needed right now. Returning them")
for (container <- allocatedContainersList) {
allocatedContainersList = List()
* Allocates new containers if necessary.
private def allocateContainers() : Unit = {
// check if we want to start some of our allocated containers.
if (runningContainers < numTaskManagers) {
val missingContainers = numTaskManagers - runningContainers
log.info(s"The user requested $numTaskManagers containers, $runningContainers " +
s"running. $missingContainers containers missing")
val numStartedContainers = startTMsInAllocatedContainers(missingContainers)
// if there are still containers missing, request them from YARN
val toAllocateFromYarn = Math.max(
missingContainers - numStartedContainers - numPendingRequests,
if (toAllocateFromYarn > 0) {
val reallocate = flinkConfiguration
.getBoolean(ConfigConstants.YARN_REALLOCATE_FAILED_CONTAINERS, true)
log.info(s"There are $missingContainers containers missing." +
s" $numPendingRequests are already requested. " +
s"Requesting $toAllocateFromYarn additional container(s) from YARN. " +
s"Reallocation of failed containers is enabled=$reallocate " +
// there are still containers missing. Request them from YARN
if (reallocate) {
for (i <- 1 to toAllocateFromYarn) {
val containerRequest = getContainerRequest(memoryPerTaskManager)
numPendingRequests += 1
log.info("Requested additional container from YARN. Pending requests " +
/** Starts min(numTMsToStart, allocatedContainersList.size) TaskManager in the available
* allocated containers. The number of successfully started TaskManagers is returned.
* @param numTMsToStart Number of TaskManagers to start if enough allocated containers are
* available. If not, then all allocated containers are used
* @return Number of successfully started TaskManagers
private def startTMsInAllocatedContainers(numTMsToStart: Int): Int = {
// not enough containers running
if (allocatedContainersList.nonEmpty) {
log.info(s"${allocatedContainersList.size} containers already allocated by YARN. " +
nmClientOption match {
case Some(nmClient) =>
containerLaunchContext match {
case Some(ctx) =>
val (containersToBeStarted, remainingContainers) = allocatedContainersList
val startedContainers = containersToBeStarted.flatMap {
container =>
try {
nmClient.startContainer(container, ctx)
val message = s"Launching container (${container.getId} on host " +
messageListener foreach {
_ ! decorateMessage(YarnMessage(message))
} catch {
case e: YarnException =>
log.error(s"Exception while starting YARN " +
s"container ${container.getId} on " +
s"host ${container.getNodeId.getHost}", e)
runningContainers += startedContainers.length
runningContainersList :::= startedContainers
allocatedContainersList = remainingContainers
if (runningContainers < numTaskManagers) {
} else {
case None =>
log.error("The ContainerLaunchContext was not set.")
self ! decorateMessage(
"Fatal error in AM: The ContainerLaunchContext was not set."))
case None =>
log.error("The NMClient was not set.")
self ! decorateMessage(
"Fatal error in AM: The NMClient was not set."))
} else {
* Adjusts the heartbeat interval of the asynchronous client.
* @param interval The interval between the heartbeats.
private def setHeartbeatRate(interval : FiniteDuration): Unit = {
* Register the client with the CallbackHandler. Must be called before the client is started.
* @param clientAsync The AMRM client to make requests with.
def setClient(clientAsync: AMRMClientAsync[ContainerRequest]) = {
client = clientAsync
self ! decorateMessage(
......@@ -18,12 +18,12 @@
package org.apache.flink.yarn
import java.util.{UUID, Date}
import java.util.{List => JavaList, UUID, Date}
import org.apache.flink.api.common.JobID
import org.apache.flink.runtime.messages.RequiresLeaderSessionID
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus
import org.apache.hadoop.yarn.api.records.{ContainerStatus, Container, FinalApplicationStatus}
import scala.concurrent.duration.{Deadline, FiniteDuration}
......@@ -40,8 +40,25 @@ object YarnMessages {
case object JobManagerStopped
* Entry point to start a new YarnSession.
* @param config The configuration to start the YarnSession with.
* @param webServerPort The port of the web server to bind to.
case class StartYarnSession(config: Configuration, webServerPort: Int)
* Callback from the async resource manager client when containers were allocated.
* @param containers List of containers which were allocated.
case class YarnContainersAllocated(containers: JavaList[Container])
* Callback from the async resource manager client when containers were completed.
* @param statuses List of the completed containers' status.
case class YarnContainersCompleted(statuses: JavaList[ContainerStatus])
/** Triggers the registration of the ApplicationClient to the YarnJobManager
* @param jobManagerAkkaURL JobManager's Akka URL
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
想要评论请 注册