From 94cfe14879bf6d399d816d267b00c8a99eee8165 Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Thu, 18 Dec 2014 11:37:23 +0100 Subject: [PATCH] Fixes race condition in ExecutionGraph which allowed a job to go into the finished state without all job vertices having properly processed the finalizeOnMaster method. --- .../executiongraph/ExecutionJobVertex.java | 7 +- .../runtime/jobmanager/JobManagerITCase.java | 75 +++++++++++++++++++ 2 files changed, 80 insertions(+), 2 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java index 9f8b56a486f..3ac0962b28e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java @@ -323,9 +323,8 @@ public class ExecutionJobVertex { synchronized (stateMonitor) { if (!finishedSubtasks[subtask]) { finishedSubtasks[subtask] = true; - numSubtasksInFinalState++; - if (numSubtasksInFinalState == parallelism) { + if (numSubtasksInFinalState+1 == parallelism) { // call finalizeOnMaster hook try { @@ -334,12 +333,16 @@ public class ExecutionJobVertex { catch (Throwable t) { getGraph().fail(t); } + + numSubtasksInFinalState++; // we are in our final state stateMonitor.notifyAll(); // tell the graph graph.jobVertexInFinalState(this); + }else{ + numSubtasksInFinalState++; } } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerITCase.java index ae7857f9c72..3f1c3b6941a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerITCase.java @@ -793,6 +793,51 @@ public class JobManagerITCase { fail(e.getMessage()); } } + + @Test + public void testSubtaskInFinalStateRaceCondition() { + final int NUM_TASKS = 1; + + try{ + final AbstractJobVertex source = new AbstractJobVertex("Source"); + final WaitingOnFinalizeJobVertex sink = new WaitingOnFinalizeJobVertex("Sink", 500); + source.setInvokableClass(WaitingNoOpInvokable.class); + sink.setInvokableClass(NoOpInvokable.class); + + source.setParallelism(NUM_TASKS); + sink.setParallelism(NUM_TASKS); + + final JobGraph jobGraph = new JobGraph("SubtaskInFinalStateRaceCondition", source, + sink); + + final JobManager jm = startJobManager(2*NUM_TASKS); + + try{ + JobSubmissionResult result = jm.submitJob(jobGraph); + + if(result.getReturnCode() != AbstractJobResult.ReturnCode.SUCCESS){ + fail(result.getDescription()); + } + + ExecutionGraph eg = jm.getCurrentJobs().get(jobGraph.getJobID()); + + if(eg != null){ + eg.waitForJobEnd(); + assertEquals(JobStatus.FINISHED, eg.getState()); + } + + assertTrue("Sink has to have called finalizeOnMaster before job can finish.", sink + .finished); + + waitForTaskThreadsToBeTerminated(); + }finally{ + jm.shutdown(); + } + }catch(Exception e){ + e.printStackTrace(); + fail(e.getMessage()); + } + } // -------------------------------------------------------------------------------------------- // Simple test tasks @@ -886,4 +931,34 @@ public class JobManagerITCase { } } } + + public static final class WaitingOnFinalizeJobVertex extends AbstractJobVertex { + private final long waitingTime; + public boolean finished = false; + + public WaitingOnFinalizeJobVertex(String name, long waitingTime) { + super(name); + this.waitingTime = waitingTime; + } + + @Override + public void finalizeOnMaster(ClassLoader loader) throws Exception { + Thread.sleep(waitingTime); + finished = true; + } + } + + public static final class WaitingNoOpInvokable extends AbstractInvokable{ + private static long waitingTime = 100; + + @Override + public void registerInputOutput() { + + } + + @Override + public void invoke() throws Exception { + Thread.sleep(waitingTime); + } + } } -- GitLab