Commit 031f988c authored by Till Rohrmann

Added scala docs to actor messages.

Parent f96bca0b
......@@ -156,7 +156,7 @@ public class CliFrontendListCancelTest {
@Override
public void onReceive(Object message) throws Exception {
-if(message instanceof JobManagerMessages.RequestAvailableSlots$){
+if(message instanceof JobManagerMessages.RequestTotalNumberOfSlots$){
getSender().tell(1, getSelf());
}else if(message instanceof JobManagerMessages.CancelJob){
JobManagerMessages.CancelJob cancelJob = (JobManagerMessages.CancelJob) message;
......
......@@ -41,7 +41,7 @@ import org.apache.flink.runtime.messages.JobManagerMessages.AccumulatorResultsRe
import org.apache.flink.runtime.messages.JobManagerMessages.AccumulatorResultsFound;
import org.apache.flink.runtime.messages.JobManagerMessages.RunningJobs;
import org.apache.flink.runtime.messages.JobManagerMessages.RequestRunningJobs$;
-import org.apache.flink.runtime.messages.JobManagerMessages.RequestAvailableSlots$;
+import org.apache.flink.runtime.messages.JobManagerMessages.RequestTotalNumberOfSlots$;
import org.apache.flink.runtime.messages.JobManagerMessages.RequestNumberRegisteredTaskManager$;
import org.apache.flink.runtime.messages.JobManagerMessages.CancellationResponse;
import org.apache.flink.runtime.messages.JobManagerMessages.CancelJob;
......@@ -128,7 +128,7 @@ public class JobmanagerInfoServlet extends HttpServlet {
int numberOfTaskManagrs = AkkaUtils.<Integer>ask(jobmanager,
RequestNumberRegisteredTaskManager$.MODULE$);
int numberOfRegisteredSltos = AkkaUtils.<Integer>ask(jobmanager,
-RequestAvailableSlots$.MODULE$);
+RequestTotalNumberOfSlots$.MODULE$);
resp.getWriter().write("{\"taskmanagers\": " + numberOfTaskManagrs +", " +
"\"slots\": "+numberOfRegisteredSltos+"}");
......
......@@ -135,7 +135,7 @@ abstract public class FlinkMiniCluster {
List<Future<Object>> responses = new ArrayList<Future<Object>>();
for(ActorRef taskManager: taskManagerActors){
-Future<Object> response = Patterns.ask(taskManager, TaskManagerMessages.NotifyWhenRegisteredAtMaster$
+Future<Object> response = Patterns.ask(taskManager, TaskManagerMessages.NotifyWhenRegisteredAtJobManager$
.MODULE$, AkkaUtils.FUTURE_TIMEOUT());
responses.add(response);
}
......
......@@ -44,13 +44,13 @@ class JobClient(jobManagerURL: String) extends Actor with ActorLogMessages with
val jobManager = AkkaUtils.getReference(jobManagerURL)
override def receiveWithLogMessages: Receive = {
-case SubmitJobDetached(jobGraph, listen) =>
-  jobManager.tell(SubmitJob(jobGraph, listenToEvents = listen, detach = true), sender())
+case SubmitJobDetached(jobGraph) =>
+  jobManager.tell(SubmitJob(jobGraph, registerForEvents = false, detach = true), sender())
case cancelJob: CancelJob =>
jobManager forward cancelJob
case SubmitJobAndWait(jobGraph, listen) =>
val listener = context.actorOf(Props(classOf[JobClientListener], sender()))
-jobManager.tell(SubmitJob(jobGraph, listenToEvents = listen, detach = false), listener)
+jobManager.tell(SubmitJob(jobGraph, registerForEvents = listen, detach = false), listener)
case RequestBlobManagerPort =>
jobManager forward RequestBlobManagerPort
}
......
......@@ -56,8 +56,10 @@ Actor with ActorLogMessages with ActorLogging with WrapAsScala {
val (archiveCount, profiling, cleanupInterval) = JobManager.parseConfiguration(configuration)
// Props for the profiler actor
def profilerProps: Props = Props(classOf[JobManagerProfiler])
// Props for the archive actor
def archiveProps: Props = Props(classOf[MemoryArchivist], archiveCount)
val profiler = profiling match {
......@@ -65,7 +67,6 @@ Actor with ActorLogMessages with ActorLogging with WrapAsScala {
case false => None
}
// will be removed
val archive = context.actorOf(archiveProps, JobManager.ARCHIVE_NAME)
val accumulatorManager = new AccumulatorManager(Math.min(1, archiveCount))
......@@ -73,7 +74,10 @@ Actor with ActorLogMessages with ActorLogging with WrapAsScala {
val scheduler = new FlinkScheduler()
val libraryCacheManager = new BlobLibraryCacheManager(new BlobServer(), cleanupInterval)
// List of current jobs running
val currentJobs = scala.collection.mutable.HashMap[JobID, (ExecutionGraph, JobInfo)]()
// Map of actors which want to be notified once a specific job terminates
val finalJobStatusListener = scala.collection.mutable.HashMap[JobID, Set[ActorRef]]()
instanceManager.addInstanceListener(scheduler)
......@@ -92,7 +96,10 @@ Actor with ActorLogMessages with ActorLogging with WrapAsScala {
val taskManager = sender()
val instanceID = instanceManager.registerTaskManager(taskManager, connectionInfo,
hardwareInformation, numberOfSlots)
// to be notified when the taskManager is no longer reachable
context.watch(taskManager);
taskManager ! AcknowledgeRegistration(instanceID, libraryCacheManager.getBlobServerPort)
}
......@@ -100,7 +107,7 @@ Actor with ActorLogMessages with ActorLogging with WrapAsScala {
sender() ! instanceManager.getNumberOfRegisteredTaskManagers
}
-case RequestAvailableSlots => {
+case RequestTotalNumberOfSlots => {
sender() ! instanceManager.getTotalNumberOfSlots
}
......@@ -151,13 +158,13 @@ Actor with ActorLogMessages with ActorLogging with WrapAsScala {
log.debug(s"Successfully created execution graph from job graph ${jobGraph.getJobID} " +
s"(${jobGraph.getName}).")
// should the job fail if a vertex cannot be deployed immediately (streams,
// closed iterations)
executionGraph.setQueuedSchedulingAllowed(jobGraph.getAllowQueuedScheduling)
// get notified about job status changes
executionGraph.registerJobStatusListener(self)
if(listenToEvents){
// the sender will be notified about state changes
executionGraph.registerExecutionListener(sender())
executionGraph.registerJobStatusListener(sender())
}
......@@ -178,7 +185,8 @@ Actor with ActorLogMessages with ActorLogging with WrapAsScala {
case Some((executionGraph, jobInfo)) =>
executionGraph.fail(t)
-// don't send the client the final job status
+// don't send the client the final job status because we have already sent a
+// SubmissionFailure
jobInfo.detach = true
val status = Patterns.ask(self, RequestFinalJobStatus(jobGraph.getJobID), 10 second)
......@@ -213,14 +221,16 @@ Actor with ActorLogMessages with ActorLogging with WrapAsScala {
}
case UpdateTaskExecutionState(taskExecutionState) => {
-Preconditions.checkNotNull(taskExecutionState)
-currentJobs.get(taskExecutionState.getJobID) match {
-  case Some((executionGraph, _)) => sender() ! executionGraph.updateState(taskExecutionState)
-  case None => log.error(s"Cannot find execution graph for ID ${taskExecutionState
-    .getJobID} to change state to" +
-    s" ${taskExecutionState.getExecutionState}.")
-    sender() ! false
-}
+if(taskExecutionState == null){
+  sender() ! false
+}else {
+  currentJobs.get(taskExecutionState.getJobID) match {
+    case Some((executionGraph, _)) => sender() ! executionGraph.updateState(taskExecutionState)
+    case None => log.error(s"Cannot find execution graph for ID ${taskExecutionState
+      .getJobID} to change state to ${taskExecutionState.getExecutionState}.")
+      sender() ! false
+  }
+}
}
......
......@@ -21,10 +21,21 @@ package org.apache.flink.runtime.messages
import org.apache.flink.runtime.executiongraph.ExecutionGraph
import org.apache.flink.runtime.jobgraph.JobID
/**
* This object contains the archive specific messages.
*/
object ArchiveMessages {
case class ArchiveExecutionGraph(jobID: JobID, graph: ExecutionGraph)
/**
* Requests the currently archived jobs from the archiver. The response to this message is [[ArchivedJobs]].
*/
case object RequestArchivedJobs
/**
* Response to [[RequestArchivedJobs]] message. The response contains the archived jobs.
* @param jobs
*/
case class ArchivedJobs(val jobs: Iterable[ExecutionGraph]){
def asJavaIterable: java.lang.Iterable[ExecutionGraph] = {
import scala.collection.JavaConverters._
......
......@@ -25,20 +25,44 @@ import org.apache.flink.runtime.execution.{ExecutionState}
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID
import org.apache.flink.runtime.jobgraph.{JobStatus, JobVertexID, JobID}
/**
* This object contains the execution graph specific messages.
*/
object ExecutionGraphMessages {
/**
* Denotes the execution state change of an [[org.apache.flink.runtime.executiongraph.ExecutionVertex]]
*
* @param jobID to which the vertex belongs
* @param vertexID of the ExecutionJobVertex to which the ExecutionVertex belongs
* @param taskName
* @param totalNumberOfSubTasks denotes the number of parallel subtasks
* @param subtaskIndex denotes the index of the ExecutionVertex
* @param executionID
* @param newExecutionState
* @param timestamp of the execution state change
* @param optionalMessage
*/
case class ExecutionStateChanged(jobID: JobID, vertexID: JobVertexID,
-taskName: String, totalNumberOfSubTasks: Int, subtask: Int,
+taskName: String, totalNumberOfSubTasks: Int, subtaskIndex: Int,
executionID: ExecutionAttemptID,
newExecutionState: ExecutionState, timestamp: Long,
optionalMessage: String){
override def toString: String = {
s"${timestampToString(timestamp)}\t$taskName(${subtask +
s"${timestampToString(timestamp)}\t$taskName(${subtaskIndex +
1}/${totalNumberOfSubTasks}) switched to $newExecutionState ${if(optionalMessage != null)
s"\n${optionalMessage}" else ""}"
}
}
/**
* Denotes the job state change of a job.
*
* @param jobID identifying the corresponding job
* @param newJobStatus
* @param timestamp
* @param optionalMessage
*/
case class JobStatusChanged(jobID: JobID, newJobStatus: JobStatus, timestamp: Long,
optionalMessage: String){
override def toString: String = {
......@@ -46,7 +70,6 @@ object ExecutionGraphMessages {
}
}
private val DATE_FORMATTER: SimpleDateFormat = new SimpleDateFormat("MM/dd/yyyy HH:mm:ss")
private def timestampToString(timestamp: Long): String = {
......
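
Since these notifications are ordinary messages, any actor can consume them. Below is a minimal sketch of a listener that simply prints both event types, relying on the toString implementations above; the actor itself is a hypothetical illustration, not part of this commit.

    import akka.actor.Actor
    import org.apache.flink.runtime.messages.ExecutionGraphMessages.{ExecutionStateChanged, JobStatusChanged}

    // hypothetical listener; could be registered via executionGraph.registerJobStatusListener(self)
    class StateChangePrinter extends Actor {
      override def receive: Receive = {
        case msg: ExecutionStateChanged => println(msg)
        case msg: JobStatusChanged => println(msg)
      }
    }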
......@@ -20,7 +20,31 @@ package org.apache.flink.runtime.messages
import org.apache.flink.runtime.jobgraph.JobGraph
/**
* This object contains the [[org.apache.flink.runtime.client.JobClient]] specific messages
*/
object JobClientMessages {
/**
* This message submits a jobGraph to the JobClient which sends it to the JobManager. The
* JobClient waits until the job has been executed. If listenToEvents is true,
* then the JobClient prints all state change messages to the console. The
* JobClient sends the result of the execution back to the sender. If the execution is
* successful then a [[org.apache.flink.runtime.messages.JobManagerMessages.JobResult]] is sent
* back. If a [[org.apache.flink.runtime.messages.JobManagerMessages.SubmissionFailure]]
* happens, then the cause is sent back to the sender().
*
* @param jobGraph containing the job description
* @param listenToEvents if true then print state change messages
*/
case class SubmitJobAndWait(jobGraph: JobGraph, listenToEvents: Boolean = false)
-case class SubmitJobDetached(jobGraph: JobGraph, listenToEvents: Boolean = false)
/**
* This message submits a jobGraph to the JobClient which sends it to the JobManager. The
* JobClient awaits the [[org.apache.flink.runtime.messages.JobManagerMessages.SubmissionResponse]]
* from the JobManager and sends it back to the sender().
*
* @param jobGraph containing the job description
*/
+case class SubmitJobDetached(jobGraph: JobGraph)
}
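
To make the two submission modes concrete, here is a hedged client-side sketch; the `jobClient` ActorRef, the `wait` flag, and the timeout are assumptions for illustration.

    import akka.actor.ActorRef
    import akka.pattern.ask
    import akka.util.Timeout
    import scala.concurrent.Await
    import scala.concurrent.duration._
    import org.apache.flink.runtime.jobgraph.JobGraph
    import org.apache.flink.runtime.messages.JobClientMessages.{SubmitJobAndWait, SubmitJobDetached}

    def submit(jobClient: ActorRef, jobGraph: JobGraph, wait: Boolean): Unit = {
      implicit val timeout: Timeout = Timeout(1.minute)
      if (wait) {
        // blocks until the JobManager reports the job result back through the JobClient
        val result = Await.result(
          jobClient ? SubmitJobAndWait(jobGraph, listenToEvents = true), timeout.duration)
        println(result)
      } else {
        // detached: only the SubmissionResponse travels back to the sender
        jobClient ! SubmitJobDetached(jobGraph)
      }
    }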
......@@ -20,7 +20,15 @@ package org.apache.flink.runtime.messages
import org.apache.flink.runtime.profiling.impl.types.ProfilingDataContainer
/**
* This object contains the job manager profiler messages
*/
object JobManagerProfilerMessages {
/**
* Reports profiling data to the profiler.
* @param profilingDataContainer
*/
case class ReportProfilingData(profilingDataContainer: ProfilingDataContainer)
}
......@@ -26,30 +26,99 @@ import org.apache.flink.runtime.io.network.channels.ChannelID
import org.apache.flink.runtime.jobgraph.{JobStatus, JobVertexID, JobID, JobGraph}
import org.apache.flink.runtime.taskmanager.TaskExecutionState
/**
* The job manager specific messages
*/
object JobManagerMessages {
-case class SubmitJob(jobGraph: JobGraph, listenToEvents: Boolean = false,
/**
* Submits a job to the job manager. If [[registerForEvents]] is true,
* then the sender will be registered as a listener for the state change messages. If [[detach]]
* is set to true, then the sender will detach from the job execution. Consequently,
* it will not receive the job execution result [[JobResult]]. The submission result will be
* sent back to the sender as a [[SubmissionResponse]] message.
*
* @param jobGraph
* @param registerForEvents if true, then register for state change events
* @param detach if true, then detach from the job execution
*/
+case class SubmitJob(jobGraph: JobGraph, registerForEvents: Boolean = false,
detach: Boolean = false)
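
A minimal sketch of handling the [[SubmissionResponse]] on the sender's side; `jobManager`, `jobGraph`, and the timeout are assumptions.

    import akka.pattern.ask
    import akka.util.Timeout
    import scala.concurrent.Await
    import scala.concurrent.duration._

    implicit val timeout: Timeout = Timeout(1.minute)
    Await.result(jobManager ? SubmitJob(jobGraph, registerForEvents = false, detach = true),
      timeout.duration) match {
      case SubmissionSuccess(id) => println(s"Job $id was submitted successfully.")
      case SubmissionFailure(id, cause) => println(s"Submission of job $id failed: $cause")
    }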
/**
* Cancels a job with the given [[jobID]] at the JobManager. The result of the cancellation is
* sent back to the sender as a [[CancellationResponse]] message.
*
* @param jobID
*/
case class CancelJob(jobID: JobID)
/**
* Denotes a state change of a task at the JobManager. The update success is acknowledged by a
* boolean value which is sent back to the sender.
*
* @param taskExecutionState
*/
case class UpdateTaskExecutionState(taskExecutionState: TaskExecutionState)
/**
* Requests the next input split for the [[org.apache.flink.runtime.executiongraph.ExecutionJobVertex]]
* of the job specified by [[jobID]]. The next input split is sent back to the sender as a
* [[org.apache.flink.runtime.messages.TaskManagerMessages.NextInputSplit]] message.
*
* @param jobID
* @param vertexID
*/
case class RequestNextInputSplit(jobID: JobID, vertexID: JobVertexID)
/**
* Looks up the connection information of the task that produces the channel specified by
* [[sourceChannelID]]. The [[caller]] denotes the instance on which the requesting task runs.
* The connection information is sent back to the sender as a
* [[ConnectionInformation]] message.
*
* @param caller instance on which the task requesting the connection information runs
* @param jobID
* @param sourceChannelID denoting the channel whose producer shall be found
*/
case class LookupConnectionInformation(caller: InstanceConnectionInfo, jobID: JobID,
sourceChannelID: ChannelID)
/**
* Contains the connection lookup information of a lookup request triggered by
* [[LookupConnectionInformation]].
*
* @param response
*/
case class ConnectionInformation(response: ConnectionInfoLookupResponse)
/**
* Reports the accumulator results of the individual tasks to the job manager.
*
* @param accumulatorEvent
*/
case class ReportAccumulatorResult(accumulatorEvent: AccumulatorEvent)
/**
* Requests the accumulator results of the job identified by [[jobID]] from the job manager.
* The result is sent back to the sender as an [[AccumulatorResultsResponse]] message.
*
* @param jobID
*/
case class RequestAccumulatorResults(jobID: JobID)
sealed trait AccumulatorResultsResponse{
val jobID: JobID
}
/**
* Contains the retrieved accumulator results from the job manager. This response is triggered
* by [[RequestAccumulatorResults]].
*
* @param jobID
* @param results
*/
case class AccumulatorResultsFound(jobID: JobID, results: Map[String,
Object]) extends AccumulatorResultsResponse{
def asJavaMap: java.util.Map[String, Object] = {
......@@ -58,53 +127,134 @@ object JobManagerMessages {
}
}
/**
* Denotes that no accumulator results for [[jobID]] could be found at the job manager.
* @param jobID
*/
case class AccumulatorResultsNotFound(jobID: JobID) extends AccumulatorResultsResponse
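
A hedged sketch of the full accumulator round trip, matching on the sealed [[AccumulatorResultsResponse]] trait; `jobManager`, `jobID`, and the timeout are assumptions.

    import akka.pattern.ask
    import akka.util.Timeout
    import scala.concurrent.Await
    import scala.concurrent.duration._

    implicit val timeout: Timeout = Timeout(10.seconds)
    Await.result(jobManager ? RequestAccumulatorResults(jobID), timeout.duration) match {
      case AccumulatorResultsFound(id, results) =>
        results.foreach { case (name, value) => println(s"$name = $value") }
      case AccumulatorResultsNotFound(id) =>
        println(s"No accumulator results found for job $id.")
    }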
/**
* Requests the current [[JobStatus]] of the job identified by [[jobID]]. The response to this
* message is a [[JobStatusResponse]].
*
* @param jobID
*/
case class RequestJobStatus(jobID: JobID)
sealed trait JobStatusResponse {
def jobID: JobID
}
/**
* Denotes the current [[JobStatus]] of the job with [[jobID]].
*
* @param jobID
* @param status
*/
case class CurrentJobStatus(jobID: JobID, status: JobStatus) extends JobStatusResponse
case object RequestInstances
/**
* Requests the number of task managers currently registered at the job manager. The result is
* sent back to the sender as an [[Int]].
*/
case object RequestNumberRegisteredTaskManager
-case object RequestAvailableSlots
/**
* Requests the total number of slots registered at the job manager. The result is sent back
* to the sender as an [[Int]].
*/
+case object RequestTotalNumberOfSlots
/**
* Requests the port of the blob manager from the job manager. The result is sent back to the
* sender as an [[Int]].
*/
case object RequestBlobManagerPort
/**
* Requests the final job status of the job with [[jobID]]. If the job has not terminated yet,
* then the result is sent back upon termination of the job. The result is a
* [[JobStatusResponse]] message.
*
* @param jobID
*/
case class RequestFinalJobStatus(jobID: JobID)
sealed trait JobResult{
def jobID: JobID
}
/**
* Denotes a successful job execution.
*
* @param jobID
* @param runtime
* @param accumulatorResults
*/
case class JobResultSuccess(jobID: JobID, runtime: Long, accumulatorResults: java.util.Map[String,
AnyRef]) extends JobResult {}
/**
* Denotes a cancellation of the job.
* @param jobID
* @param msg
*/
case class JobResultCanceled(jobID: JobID, msg: String)
/**
* Denotes a failed job execution.
* @param jobID
* @param msg
*/
case class JobResultFailed(jobID: JobID, msg:String)
sealed trait SubmissionResponse{
def jobID: JobID
}
/**
* Denotes a successful job submission.
* @param jobID
*/
case class SubmissionSuccess(jobID: JobID) extends SubmissionResponse
/**
* Denotes a failed job submission. The cause of the failure is denoted by [[cause]].
*
* @param jobID
* @param cause of the submission failure
*/
case class SubmissionFailure(jobID: JobID, cause: Throwable) extends SubmissionResponse
sealed trait CancellationResponse{
def jobID: JobID
}
/**
* Denotes a successful job cancellation
* @param jobID
*/
case class CancellationSuccess(jobID: JobID) extends CancellationResponse
/**
* Denotes a failed job cancellation
* @param jobID
* @param cause
*/
case class CancellationFailure(jobID: JobID, cause: Throwable) extends CancellationResponse
/**
* Requests all currently running jobs from the job manager. This message triggers a
* [[RunningJobs]] response.
*/
case object RequestRunningJobs
/**
* This message is the response to the [[RequestRunningJobs]] message. It contains all
* execution graphs of the currently running jobs.
*
* @param runningJobs
*/
case class RunningJobs(runningJobs: Iterable[ExecutionGraph]) {
def this() = this(Seq())
def asJavaIterable: java.lang.Iterable[ExecutionGraph] = {
......@@ -113,16 +263,45 @@ object JobManagerMessages {
}
}
/**
* Requests the execution graph of a specific job identified by [[jobID]]. The result is sent
* back to the sender as a [[JobResponse]].
* @param jobID
*/
case class RequestJob(jobID: JobID)
sealed trait JobResponse{
def jobID: JobID
}
/**
* Contains the [[executionGraph]] of a job with [[jobID]]. This is the response to
* [[RequestJob]] if the job is running or has been archived.
*
* @param jobID
* @param executionGraph
*/
case class JobFound(jobID: JobID, executionGraph: ExecutionGraph) extends JobResponse
/**
* Denotes that no job with [[jobID]] could be retrieved. This message can be the response to
* [[RequestJob]] or [[RequestJobStatus]].
*
* @param jobID
*/
case class JobNotFound(jobID: JobID) extends JobResponse with JobStatusResponse
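
Both branches of the [[JobResponse]] can be handled in one match. A hedged sketch follows, with `jobManager`, `jobID`, and the timeout assumed; ExecutionGraph's getState accessor is used purely for illustration.

    import akka.pattern.ask
    import akka.util.Timeout
    import scala.concurrent.Await
    import scala.concurrent.duration._

    implicit val timeout: Timeout = Timeout(10.seconds)
    Await.result(jobManager ? RequestJob(jobID), timeout.duration) match {
      case JobFound(id, executionGraph) =>
        println(s"Job $id is in state ${executionGraph.getState}.")
      case JobNotFound(id) =>
        println(s"Job $id is neither running nor archived.")
    }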
/**
* Requests the instances of all registered task managers.
*/
case object RequestRegisteredTaskManagers
/**
* Contains the [[Instance]] objects of all registered task managers. It is the response to the
* message [[RequestRegisteredTaskManagers]].
*
* @param taskManagers
*/
case class RegisteredTaskManagers(taskManagers: Iterable[Instance]){
def asJavaIterable: java.lang.Iterable[Instance] = {
import scala.collection.JavaConverters._
......
......@@ -22,10 +22,25 @@ import org.apache.flink.runtime.instance.{InstanceConnectionInfo, InstanceID, Ha
object RegistrationMessages {
/**
* Registers a task manager at the job manager. A successful registration is acknowledged by
* [[AcknowledgeRegistration]].
*
* @param connectionInfo
* @param hardwareDescription
* @param numberOfSlots
*/
case class RegisterTaskManager(connectionInfo: InstanceConnectionInfo,
hardwareDescription: HardwareDescription,
numberOfSlots: Int)
/**
* Denotes the successful registration of a task manager at the job manager. This is the
* response triggered by the [[RegisterTaskManager]] message.
*
* @param instanceID
* @param blobPort
*/
case class AcknowledgeRegistration(instanceID: InstanceID, blobPort: Int)
}
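
Seen from the task manager side, the handshake boils down to the following hedged sketch; the stripped-down actor and its constructor parameters are hypothetical, modeled on the TaskManager changes further down in this commit.

    import akka.actor.{Actor, ActorLogging, ActorRef}
    import org.apache.flink.runtime.instance.{HardwareDescription, InstanceConnectionInfo}
    import org.apache.flink.runtime.messages.RegistrationMessages.{AcknowledgeRegistration, RegisterTaskManager}

    // hypothetical, minimal task manager that only performs the registration handshake
    class RegisteringActor(jobManager: ActorRef, connectionInfo: InstanceConnectionInfo,
                           hardware: HardwareDescription, slots: Int)
      extends Actor with ActorLogging {

      override def preStart(): Unit =
        jobManager ! RegisterTaskManager(connectionInfo, hardware, slots)

      override def receive: Receive = {
        case AcknowledgeRegistration(instanceID, blobPort) =>
          log.info(s"Registered with instance ID $instanceID (blob server port $blobPort)")
      }
    }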
......@@ -24,21 +24,85 @@ import org.apache.flink.runtime.executiongraph.ExecutionAttemptID
import org.apache.flink.runtime.instance.InstanceID
object TaskManagerMessages {
/**
* Cancels the task associated with [[attemptID]]. The result is sent back to the sender as a
* [[TaskOperationResult]] message.
*
* @param attemptID
*/
case class CancelTask(attemptID: ExecutionAttemptID)
/**
* Submits a task to the task manager. The submission result is sent back to the sender as a
* [[TaskOperationResult]] message.
*
* @param tasks task deployment descriptor which contains the task relevant information
*/
case class SubmitTask(tasks: TaskDeploymentDescriptor)
/**
* Contains the next input split for a task. This message is a response to
* [[org.apache.flink.runtime.messages.JobManagerMessages.RequestNextInputSplit]].
*
* @param inputSplit
*/
case class NextInputSplit(inputSplit: InputSplit)
/**
* Unregisters a task identified by [[executionID]] from the task manager.
*
* @param executionID
*/
case class UnregisterTask(executionID: ExecutionAttemptID)
/**
* Reports whether a task manager operation has been successful or not. This message will be
* sent to the sender as a response to [[SubmitTask]] and [[CancelTask]].
*
* @param executionID identifying the respective task
* @param success indicating whether the operation has been successful
* @param description
*/
case class TaskOperationResult(executionID: ExecutionAttemptID, success: Boolean,
description: String = ""){
def this(executionID: ExecutionAttemptID, success: Boolean) = this(executionID, success, "")
}
/**
* Reports liveliness of an instance with [[instanceID]] to the
* [[org.apache.flink.runtime.instance.InstanceManager]]. This message is sent to the job
* manager which forwards it to the InstanceManager.
*
* @param instanceID
*/
case class Heartbeat(instanceID: InstanceID)
-case object NotifyWhenRegisteredAtMaster
-case object RegisteredAtMaster
-case object RegisterAtMaster
/**
* Requests a notification from the task manager as soon as the task manager has been
* registered at the job manager. Once the task manager is registered at the job manager a
* [[RegisteredAtJobManager]] message will be sent to the sender.
*/
case object NotifyWhenRegisteredAtJobManager
/**
* Acknowledges that the task manager has been successfully registered at the job manager. This
* message is a response to [[NotifyWhenRegisteredAtJobManager]].
*/
case object RegisteredAtJobManager
/**
* Registers the sender as task manager at the job manager.
*/
case object RegisterAtJobManager
/**
* Makes the task manager send a heartbeat message to the job manager.
*/
case object SendHeartbeat
case object AcknowledgeLibraryCacheUpdate
/**
* Logs the current memory usage as debug level output.
*/
case object LogMemoryUsage
}
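
A startup routine or test can block on this notification, mirroring the FlinkMiniCluster change above; a hedged sketch in which `taskManager` and the timeout are assumptions.

    import akka.actor.ActorRef
    import akka.pattern.ask
    import akka.util.Timeout
    import scala.concurrent.Await
    import scala.concurrent.duration._
    import org.apache.flink.runtime.messages.TaskManagerMessages.NotifyWhenRegisteredAtJobManager

    def awaitRegistration(taskManager: ActorRef): Unit = {
      implicit val timeout: Timeout = Timeout(30.seconds)
      // the future completes with RegisteredAtJobManager once registration has happened
      Await.result(taskManager ? NotifyWhenRegisteredAtJobManager, timeout.duration)
    }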
......@@ -22,11 +22,34 @@ import org.apache.flink.runtime.executiongraph.ExecutionAttemptID
import org.apache.flink.runtime.taskmanager.Task
object TaskManagerProfilerMessages {
/**
* Requests to monitor the specified [[task]].
*
* @param task
*/
case class MonitorTask(task: Task)
/**
* Requests to unmonitor the task associated with [[executionID]].
*
* @param executionID
*/
case class UnmonitorTask(executionID: ExecutionAttemptID)
/**
* Registers the sender as a profiling event listener.
*/
case object RegisterProfilingListener
/**
* Unregisters the sender as a profiling event listener.
*/
case object UnregisterProfilingListener
/**
* Makes the task manager profile the running tasks.
*/
case object ProfileTasks
}
......@@ -76,7 +76,7 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo, val jobManagerAkka
val REGISTRATION_DELAY = 0 seconds
val REGISTRATION_INTERVAL = 10 seconds
-val MAX_REGISTRATION_ATTEMPTS = 1
+val MAX_REGISTRATION_ATTEMPTS = 10
val HEARTBEAT_INTERVAL = 1000 millisecond
TaskManager.checkTempDirs(tmpDirPaths)
......@@ -84,7 +84,9 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo, val jobManagerAkka
val memoryManager = new DefaultMemoryManager(memorySize, numberOfSlots, pageSize)
val hardwareDescription = HardwareDescription.extractFromSystem(memoryManager.getMemorySize)
val fileCache = new FileCache()
-val runningTasks = scala.collection.concurrent.TrieMap[ExecutionAttemptID, Task]()
+val runningTasks = scala.collection.mutable.HashMap[ExecutionAttemptID, Task]()
// Actors which want to be notified once this task manager has been registered at the job manager
val waitForRegistration = scala.collection.mutable.Set[ActorRef]();
val profiler = profilingInterval match {
......@@ -151,13 +153,12 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo, val jobManagerAkka
registrationAttempts = 0
import context.dispatcher
registrationScheduler = Some(context.system.scheduler.schedule(REGISTRATION_DELAY,
-  REGISTRATION_INTERVAL,
-  self, RegisterAtMaster))
+  REGISTRATION_INTERVAL, self, RegisterAtJobManager))
}
override def receiveWithLogMessages: Receive = {
-case RegisterAtMaster => {
+case RegisterAtJobManager => {
registrationAttempts += 1
if (registered) {
......@@ -198,7 +199,7 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo, val jobManagerAkka
}
for (listener <- waitForRegistration) {
-listener ! RegisteredAtMaster
+listener ! RegisteredAtJobManager
}
waitForRegistration.clear()
......@@ -240,62 +241,57 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo, val jobManagerAkka
val task = new Task(jobID, vertexID, taskIndex, numSubtasks, executionID,
tdd.getTaskName, this)
-runningTasks.putIfAbsent(executionID, task) match {
+runningTasks.put(executionID, task) match {
case Some(_) => throw new RuntimeException(s"TaskManager contains already a task with " +
s"executionID ${executionID}.")
case None =>
}
-var success = false
 try {
-  val splitProvider = new TaskInputSplitProvider(currentJobManager, jobID, vertexID)
-  val env = new RuntimeEnvironment(task, tdd, userCodeClassLoader, memoryManager,
-    ioManager, splitProvider,
-    currentJobManager)
-  task.setEnvironment(env)
-  // register the task with the network stack and profilers
-  channelManager match {
-    case Some(cm) => cm.register(task)
-    case None => throw new RuntimeException("ChannelManager has not been properly " +
-      "instantiated.")
-  }
-  val cpTasks = new util.HashMap[String, FutureTask[Path]]()
-  val jobConfig = tdd.getJobConfiguration
-  for (entry <- DistributedCache.readFileInfoFromConfig(tdd.getJobConfiguration)) {
-    val cp = fileCache.createTmpFile(entry.getKey, entry.getValue, jobID)
-    cpTasks.put(entry.getKey, cp)
-  }
-  if (jobConfig.getBoolean(ProfilingUtils.PROFILE_JOB_KEY, true)) {
-    profiler match {
-      case Some(profiler) => profiler ! MonitorTask(task)
-      case None => log.info("There is no profiling enabled for the task manager.")
-    }
-  }
-  env.addCopyTasksForCacheFile(cpTasks)
-  if (!task.startExecution()) {
-    throw new RuntimeException("Cannot start task. Task was canceled or failed.")
-  }
-  success = true
-  sender() ! TaskOperationResult(executionID, true)
-} finally {
-  if (!success) {
-    runningTasks.remove(executionID)
-    for (entry <- DistributedCache.readFileInfoFromConfig(tdd.getJobConfiguration)) {
-      fileCache.deleteTmpFile(entry.getKey, entry.getValue, jobID)
-    }
-  }
+  val splitProvider = new TaskInputSplitProvider(currentJobManager, jobID, vertexID)
+  val env = new RuntimeEnvironment(task, tdd, userCodeClassLoader, memoryManager,
+    ioManager, splitProvider, currentJobManager)
+  val jobConfig = tdd.getJobConfiguration
+  task.setEnvironment(env)
+  if (jobConfig.getBoolean(ProfilingUtils.PROFILE_JOB_KEY, true)) {
+    profiler match {
+      case Some(profiler) => profiler ! MonitorTask(task)
+      case None => log.info("There is no profiling enabled for the task manager.")
+    }
+  }
+  // register the task with the network stack and profilers
+  channelManager match {
+    case Some(cm) => cm.register(task)
+    case None => throw new RuntimeException("ChannelManager has not been properly " +
+      "instantiated.")
+  }
+  val cpTasks = new util.HashMap[String, FutureTask[Path]]()
+  for (entry <- DistributedCache.readFileInfoFromConfig(tdd.getJobConfiguration)) {
+    val cp = fileCache.createTmpFile(entry.getKey, entry.getValue, jobID)
+    cpTasks.put(entry.getKey, cp)
+  }
+  env.addCopyTasksForCacheFile(cpTasks)
+  if (!task.startExecution()) {
+    throw new RuntimeException("Cannot start task. Task was canceled or failed.")
+  }
+  sender() ! TaskOperationResult(executionID, true)
+} catch {
+  case t: Throwable =>
+    log.error(t, s"Could not instantiate task with execution ID ${executionID}.")
+    runningTasks.remove(executionID)
+    for (entry <- DistributedCache.readFileInfoFromConfig(tdd.getJobConfiguration)) {
+      fileCache.deleteTmpFile(entry.getKey, entry.getValue, jobID)
+    }
if (jarsRegistered) {
try {
libraryCacheManager.unregister(jobID)
......@@ -324,9 +320,9 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo, val jobManagerAkka
}
}
-case NotifyWhenRegisteredAtMaster => {
+case NotifyWhenRegisteredAtJobManager => {
registered match {
-case true => sender() ! RegisteredAtMaster
+case true => sender() ! RegisteredAtJobManager
case false => waitForRegistration += sender()
}
}
......@@ -384,7 +380,7 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo, val jobManagerAkka
def setupChannelManager(): Unit = {
//shutdown existing channel manager
-channelManager map {
+channelManager foreach {
cm =>
try {
cm.shutdown()
......
......@@ -55,7 +55,7 @@ import org.apache.flink.runtime.messages.RegistrationMessages;
import org.apache.flink.runtime.messages.TaskManagerMessages.TaskOperationResult;
import org.apache.flink.runtime.messages.TaskManagerMessages.SubmitTask;
import org.apache.flink.runtime.messages.TaskManagerMessages.CancelTask;
-import org.apache.flink.runtime.messages.TaskManagerMessages.NotifyWhenRegisteredAtMaster$;
+import org.apache.flink.runtime.messages.TaskManagerMessages.NotifyWhenRegisteredAtJobManager$;
import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.types.IntegerRecord;
......@@ -537,7 +537,7 @@ public class TaskManagerTest {
ActorRef taskManager = TestingUtils.startTestingTaskManagerWithConfiguration("localhost", cfg, system);
-Future<Object> response = Patterns.ask(taskManager, NotifyWhenRegisteredAtMaster$.MODULE$,
+Future<Object> response = Patterns.ask(taskManager, NotifyWhenRegisteredAtJobManager$.MODULE$,
AkkaUtils.FUTURE_TIMEOUT());
try {
......
......@@ -55,7 +55,7 @@ WordSpecLike with Matchers with BeforeAndAfterAll {
val jm = cluster.getJobManager
try {
-val availableSlots = AkkaUtils.ask[Int](jm, RequestAvailableSlots)
+val availableSlots = AkkaUtils.ask[Int](jm, RequestTotalNumberOfSlots)
availableSlots should equal(1)
within(1 second) {
......@@ -91,7 +91,7 @@ WordSpecLike with Matchers with BeforeAndAfterAll {
val jm = cluster.getJobManager
try {
-val availableSlots = AkkaUtils.ask[Int](jm, RequestAvailableSlots)
+val availableSlots = AkkaUtils.ask[Int](jm, RequestTotalNumberOfSlots)
availableSlots should equal(num_tasks)
within(TestingUtils.TESTING_DURATION) {
......@@ -337,7 +337,7 @@ WordSpecLike with Matchers with BeforeAndAfterAll {
try {
within(TestingUtils.TESTING_DURATION) {
-jm ! RequestAvailableSlots
+jm ! RequestTotalNumberOfSlots
expectMsg(num_tasks)
}
......@@ -380,7 +380,7 @@ WordSpecLike with Matchers with BeforeAndAfterAll {
try {
within(TestingUtils.TESTING_DURATION) {
-jm ! RequestAvailableSlots
+jm ! RequestTotalNumberOfSlots
expectMsg(num_tasks)
}
......@@ -461,7 +461,7 @@ WordSpecLike with Matchers with BeforeAndAfterAll {
try {
within(TestingUtils.TESTING_DURATION) {
-jm ! RequestAvailableSlots
+jm ! RequestTotalNumberOfSlots
expectMsg(num_tasks)
jm ! SubmitJob(jobGraph)
......@@ -502,7 +502,7 @@ WordSpecLike with Matchers with BeforeAndAfterAll {
try {
within(TestingUtils.TESTING_DURATION) {
-jm ! RequestAvailableSlots
+jm ! RequestTotalNumberOfSlots
expectMsg(num_tasks)
jm ! SubmitJob(jobGraph)
......