From 10642f72f72287063ddb08777e5de349c60ba6ff Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Mon, 1 Aug 2016 18:05:14 +0200 Subject: [PATCH] [FLINK-4296] Fixes failure reporting of consumer task scheduling when producer has already finished This PR changes the failure behaviour such that the consumer task is failed instead of the producer task. The latter is problematic, since a finsihed producer task will simply swallow scheduling exception originating from scheduling the consumer task. This closes #2321. --- .../runtime/executiongraph/Execution.java | 2 +- .../ExecutionGraphDeploymentTest.java | 64 +++++++++++++++++++ 2 files changed, 65 insertions(+), 1 deletion(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java index 1b321006474..fd296c307ca 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java @@ -548,7 +548,7 @@ public class Execution implements Serializable { consumerVertex.getExecutionGraph().getScheduler(), consumerVertex.getExecutionGraph().isQueuedSchedulingAllowed()); } catch (Throwable t) { - fail(new IllegalStateException("Could not schedule consumer " + + consumerVertex.fail(new IllegalStateException("Could not schedule consumer " + "vertex " + consumerVertex, t)); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java index a599f42a58c..2d0ae4175b4 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java @@ -27,11 +27,15 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.Comparator; +import java.util.HashMap; import java.util.List; import java.util.Map; import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.accumulators.Accumulator; import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.accumulators.AccumulatorRegistry; +import org.apache.flink.runtime.accumulators.AccumulatorSnapshot; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor; import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor; @@ -40,12 +44,16 @@ import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy; import org.apache.flink.runtime.instance.Instance; import org.apache.flink.runtime.instance.SimpleSlot; +import org.apache.flink.runtime.io.network.partition.ResultPartitionType; +import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.DistributionPattern; import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException; import org.apache.flink.runtime.jobmanager.scheduler.Scheduler; import org.apache.flink.runtime.operators.BatchTask; +import org.apache.flink.runtime.taskmanager.TaskExecutionState; import org.apache.flink.runtime.testingUtils.TestingUtils; import org.apache.flink.util.SerializedValue; @@ -276,6 +284,62 @@ public class ExecutionGraphDeploymentTest { } } + @Test + /** + * Tests that a blocking batch job fails if there are not enough resources left to schedule the + * succeeding tasks. This test case is related to [FLINK-4296] where finished producing tasks + * swallow the fail exception when scheduling a consumer task. + */ + public void testNoResourceAvailableFailure() throws Exception { + final JobID jobId = new JobID(); + JobVertex v1 = new JobVertex("source"); + JobVertex v2 = new JobVertex("sink"); + + int dop1 = 1; + int dop2 = 1; + + v1.setParallelism(dop1); + v2.setParallelism(dop2); + + v1.setInvokableClass(BatchTask.class); + v2.setInvokableClass(BatchTask.class); + + v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE, ResultPartitionType.BLOCKING, false); + + // execution graph that executes actions synchronously + ExecutionGraph eg = new ExecutionGraph( + TestingUtils.directExecutionContext(), + jobId, + "failing test job", + new Configuration(), + new SerializedValue<>(new ExecutionConfig()), + AkkaUtils.getDefaultTimeout(), + new NoRestartStrategy()); + + eg.setQueuedSchedulingAllowed(false); + + List ordered = Arrays.asList(v1, v2); + eg.attachJobGraph(ordered); + + Scheduler scheduler = new Scheduler(TestingUtils.directExecutionContext()); + for (int i = 0; i < dop1; i++) { + scheduler.newInstanceAvailable( + ExecutionGraphTestUtils.getInstance( + new ExecutionGraphTestUtils.SimpleActorGateway( + TestingUtils.directExecutionContext()))); + } + assertEquals(dop1, scheduler.getNumberOfAvailableSlots()); + + // schedule, this triggers mock deployment + eg.scheduleForExecution(scheduler); + + ExecutionAttemptID attemptID = eg.getJobVertex(v1.getID()).getTaskVertices()[0].getCurrentExecutionAttempt().getAttemptId(); + eg.updateState(new TaskExecutionState(jobId, attemptID, ExecutionState.RUNNING)); + eg.updateState(new TaskExecutionState(jobId, attemptID, ExecutionState.FINISHED, null, new AccumulatorSnapshot(jobId, attemptID, new HashMap>(), new HashMap>()))); + + assertEquals(JobStatus.FAILED, eg.getState()); + } + private Map setupExecution(JobVertex v1, int dop1, JobVertex v2, int dop2) throws Exception { final JobID jobId = new JobID(); -- GitLab