提交 94cfe148 编写于 作者: T Till Rohrmann

Fixes race condition in ExecutionGraph which allowed a job to go into the...

Fixes a race condition in ExecutionGraph that allowed a job to enter the finished state before all job vertices had completed their finalizeOnMaster method.
上级 63ef8e86
......@@ -323,9 +323,8 @@ public class ExecutionJobVertex {
synchronized (stateMonitor) {
if (!finishedSubtasks[subtask]) {
finishedSubtasks[subtask] = true;
numSubtasksInFinalState++;
if (numSubtasksInFinalState == parallelism) {
if (numSubtasksInFinalState+1 == parallelism) {
// call finalizeOnMaster hook
try {
......@@ -334,12 +333,16 @@ public class ExecutionJobVertex {
catch (Throwable t) {
getGraph().fail(t);
}
numSubtasksInFinalState++;
// we are in our final state
stateMonitor.notifyAll();
// tell the graph
graph.jobVertexInFinalState(this);
}else{
numSubtasksInFinalState++;
}
}
}
......
......@@ -793,6 +793,51 @@ public class JobManagerITCase {
fail(e.getMessage());
}
}
/**
 * Checks that a job is not reported as finished before the sink vertex has run its
 * {@code finalizeOnMaster} hook.
 *
 * <p>The job consists of a source running {@link WaitingNoOpInvokable} and a sink of type
 * {@link WaitingOnFinalizeJobVertex} whose finalize hook is given a waiting time of 500.
 * After the job has ended, the sink's {@code finished} flag must be set.
 */
@Test
public void testSubtaskInFinalStateRaceCondition() {
    final int parallelism = 1;

    try {
        // Source vertex: its invokable just waits briefly and returns.
        final AbstractJobVertex source = new AbstractJobVertex("Source");
        source.setInvokableClass(WaitingNoOpInvokable.class);
        source.setParallelism(parallelism);

        // Sink vertex: the task itself is a no-op, but finalizeOnMaster is slow.
        final WaitingOnFinalizeJobVertex sink = new WaitingOnFinalizeJobVertex("Sink", 500);
        sink.setInvokableClass(NoOpInvokable.class);
        sink.setParallelism(parallelism);

        final JobGraph jobGraph =
                new JobGraph("SubtaskInFinalStateRaceCondition", source, sink);

        final JobManager jm = startJobManager(2 * parallelism);
        try {
            final JobSubmissionResult submission = jm.submitJob(jobGraph);
            if (submission.getReturnCode() != AbstractJobResult.ReturnCode.SUCCESS) {
                fail(submission.getDescription());
            }

            // NOTE(review): the graph may be absent here, presumably because the job has
            // already completed and been removed - confirm against JobManager.
            final ExecutionGraph eg = jm.getCurrentJobs().get(jobGraph.getJobID());
            if (eg != null) {
                eg.waitForJobEnd();
                assertEquals(JobStatus.FINISHED, eg.getState());
            }

            // The actual regression check: the finalize hook must have completed by now.
            assertTrue("Sink has to have called finalizeOnMaster before job can finish.",
                    sink.finished);

            waitForTaskThreadsToBeTerminated();
        } finally {
            // Always release the JobManager, even when an assertion above failed.
            jm.shutdown();
        }
    } catch (Exception e) {
        e.printStackTrace();
        fail(e.getMessage());
    }
}
// --------------------------------------------------------------------------------------------
// Simple test tasks
......@@ -886,4 +931,34 @@ public class JobManagerITCase {
}
}
}
/**
 * Job vertex whose {@code finalizeOnMaster} hook sleeps for a configurable time before
 * recording that it has completed. Tests use the {@link #finished} flag to check that the
 * hook ran to completion.
 */
public static final class WaitingOnFinalizeJobVertex extends AbstractJobVertex {

    /** Time in milliseconds that {@link #finalizeOnMaster(ClassLoader)} sleeps. */
    private final long waitingTime;

    /**
     * Set to {@code true} once {@link #finalizeOnMaster(ClassLoader)} has finished sleeping.
     *
     * <p>Declared {@code volatile} because the flag is written inside the finalize hook and
     * read directly by test code; presumably these run on different threads, in which case
     * a plain field would not guarantee that the reader observes the write.
     */
    public volatile boolean finished = false;

    /**
     * @param name        name passed on to the {@code AbstractJobVertex} constructor
     * @param waitingTime time in milliseconds the finalize hook should sleep
     */
    public WaitingOnFinalizeJobVertex(String name, long waitingTime) {
        super(name);
        this.waitingTime = waitingTime;
    }

    /**
     * Sleeps for the configured waiting time and then marks this vertex as finished.
     *
     * @param loader not used by this implementation
     * @throws Exception if the sleeping thread is interrupted
     */
    @Override
    public void finalizeOnMaster(ClassLoader loader) throws Exception {
        Thread.sleep(waitingTime);
        finished = true;
    }
}
/**
 * Invokable that performs no work other than sleeping for a fixed time in
 * {@link #invoke()}.
 */
public static final class WaitingNoOpInvokable extends AbstractInvokable {

    /** Time in milliseconds that {@link #invoke()} sleeps. */
    private static long waitingTime = 100L;

    @Override
    public void registerInputOutput() {
        // Intentionally empty: this task registers no inputs or outputs.
    }

    /**
     * Sleeps for the configured waiting time and returns.
     *
     * @throws Exception if the sleeping thread is interrupted
     */
    @Override
    public void invoke() throws Exception {
        Thread.sleep(waitingTime);
    }
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册