提交 a34869c0 编写于 作者: P Paris Carbone 提交者: Stephan Ewen

[FLINK-1638] [streaming] Fault tolerance logic in JM and TM

上级 5061edb8
......@@ -41,7 +41,7 @@ import org.apache.flink.runtime.ActorLogMessages
import org.apache.flink.runtime.akka.AkkaUtils
import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
import org.apache.flink.runtime.instance.InstanceManager
import org.apache.flink.runtime.jobgraph.{JobGraph, JobStatus, JobID}
import org.apache.flink.runtime.jobgraph.{ScheduleMode,JobGraph,JobStatus,JobID}
import org.apache.flink.runtime.jobmanager.accumulators.AccumulatorManager
import org.apache.flink.runtime.jobmanager.scheduler.{Scheduler => FlinkScheduler}
import org.apache.flink.runtime.messages.JobManagerMessages._
......@@ -97,12 +97,14 @@ class JobManager(val configuration: Configuration,
val delayBetweenRetries: Long,
val timeout: FiniteDuration)
extends Actor with ActorLogMessages with ActorLogging {
/** Reference to the log, for debugging */
protected val LOG = JobManager.LOG
val LOG = JobManager.LOG
/** List of current jobs running jobs */
protected val currentJobs = scala.collection.mutable.HashMap[JobID, (ExecutionGraph, JobInfo)]()
val currentJobs = scala.collection.mutable.HashMap[JobID, (ExecutionGraph, JobInfo)]()
val barrierMonitors = scala.collection.mutable.HashMap[JobID, ActorRef]()
/**
* Run when the job manager is started. Simply logs an informational message.
......@@ -299,13 +301,38 @@ class JobManager(val configuration: Configuration,
jobInfo.client ! Failure(exception)
throw exception
}
barrierMonitors.get(jobID) match {
case Some(monitor) =>
monitor ! PoisonPill
barrierMonitors.remove(jobID)
case None =>
}
removeJob(jobID)
}
else {
newJobStatus match {
case JobStatus.RUNNING => currentJobs.get(jobID) match {
case Some((executionGraph, _)) =>
//FIXME this is just a fast n dirty check for determining streaming jobs
if (executionGraph.getScheduleMode == ScheduleMode.ALL) {
barrierMonitors += jobID -> StreamStateMonitor.props(context, executionGraph)
}
case None =>
log.error("Cannot create state monitor for job ID {}.", jobID)
new IllegalStateException("Cannot find execution graph for job ID " + jobID)
}
}
}
case None =>
removeJob(jobID)
}
case BarrierAck(jobID, jobVertex, instanceID, checkpoint) =>
barrierMonitors.get(jobID) match {
case Some(monitor) => monitor ! BarrierAck(jobID, jobVertex, instanceID, checkpoint)
case None =>
}
case ScheduleOrUpdateConsumers(jobId, executionId, partitionIndex) =>
currentJobs.get(jobId) match {
case Some((executionGraph, _)) =>
......
......@@ -43,7 +43,8 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync
import org.apache.flink.runtime.io.network.NetworkEnvironment
import org.apache.flink.runtime.io.network.netty.NettyConfig
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID
import org.apache.flink.runtime.jobmanager.JobManager
import org.apache.flink.runtime.jobgraph.tasks.BarrierTransceiver
import org.apache.flink.runtime.jobmanager.{BarrierReq,JobManager}
import org.apache.flink.runtime.memorymanager.DefaultMemoryManager
import org.apache.flink.runtime.messages.JobManagerMessages.UpdateTaskExecutionState
import org.apache.flink.runtime.messages.Messages.{Disconnect, Acknowledge}
......@@ -349,6 +350,20 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo,
networkEnvironment foreach {
_.getPartitionManager.failIntermediateResultPartitions(executionID)
}
case BarrierReq(attemptID, checkpointID) =>
log.debug("[FT-TaskManager] Barrier request received for attempt {}", attemptID)
runningTasks.get(attemptID) match {
case Some(i) =>
if (i.getExecutionState == ExecutionState.RUNNING) {
i.getEnvironment.getInvokable match {
case barrierTransceiver: BarrierTransceiver =>
barrierTransceiver.broadcastBarrier(checkpointID)
case _ => log.error("[FT-TaskManager] Received a barrier for the wrong vertex")
}
}
case None => log.error("[FT-TaskManager] Received a barrier for an unknown vertex")
}
}
/**
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册