From ae0975c16b997fe792790216820a559d01a01894 Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Tue, 29 Nov 2016 16:02:29 +0100 Subject: [PATCH] [FLINK-5197] [jm] Ignore outdated JobStatusChanged messages Outdated JobStatusChanged messages no longer trigger a RemoveJob message but are logged and ignored. This has the advantage, that an outdated JobStatusChanged message cannot interfere with a recovered job which can have the same job id. --- .../apache/flink/runtime/jobmanager/JobManager.scala | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala index 233dbda03d4..7cddb2b644f 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala @@ -836,7 +836,7 @@ class JobManager( } }(context.dispatcher) - case JobStatusChanged(jobID, newJobStatus, timeStamp, error) => + case msg @ JobStatusChanged(jobID, newJobStatus, timeStamp, error) => currentJobs.get(jobID) match { case Some((executionGraph, jobInfo)) => executionGraph.getJobName @@ -911,8 +911,7 @@ class JobManager( } }(context.dispatcher) } - case None => - self ! decorateMessage(RemoveJob(jobID, removeJobFromStateBackend = true)) + case None => log.debug(s"Received $msg for nonexistent job $jobID.") } case ScheduleOrUpdateConsumers(jobId, partitionId) => @@ -1077,7 +1076,7 @@ class JobManager( futuresToComplete = Some(futuresToComplete.getOrElse(Seq()) :+ futureToComplete) case None => } - case None => + case None => log.debug(s"Tried to remove nonexistent job $jobID.") } case RemoveCachedJob(jobID) => @@ -1711,14 +1710,14 @@ class JobManager( // shutdown to release all resources. submittedJobGraphs.removeJobGraph(jobID) } catch { - case t: Throwable => log.error(s"Could not remove submitted job graph $jobID.", t) + case t: Throwable => log.warn(s"Could not remove submitted job graph $jobID.", t) } }(context.dispatcher)) try { archive ! decorateMessage(ArchiveExecutionGraph(jobID, eg.archive())) } catch { - case t: Throwable => log.error(s"Could not archive the execution graph $eg.", t) + case t: Throwable => log.warn(s"Could not archive the execution graph $eg.", t) } futureOption -- GitLab