diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index 1b3210064748e054b38009b2bbc4717fd21a091e..fd296c307ca91eec7e533e6281400bef17f9cc43 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -548,7 +548,7 @@ public class Execution implements Serializable {
 							consumerVertex.getExecutionGraph().getScheduler(),
 							consumerVertex.getExecutionGraph().isQueuedSchedulingAllowed());
 				} catch (Throwable t) {
-					fail(new IllegalStateException("Could not schedule consumer " +
+					consumerVertex.fail(new IllegalStateException("Could not schedule consumer " +
 							"vertex " + consumerVertex, t));
 				}
 			}
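
Note on the fix: the try/catch above runs in the producing task's Execution while it
schedules the consumers of its result. Before this patch the catch block called fail()
on the producer itself, but consumers of a BLOCKING result are scheduled exactly when
the producer finishes, so the producer is already in the terminal FINISHED state and
the fail() request is silently dropped; the job hangs instead of failing. The sketch
below is a minimal, hypothetical model of that swallowing behavior (FailSwallowSketch,
MiniExecution and State are illustrative names, not Flink classes):

    // Minimal model: fail() on an execution in a terminal state is a no-op.
    public class FailSwallowSketch {
        enum State { CREATED, RUNNING, FINISHED, FAILED }

        static class MiniExecution {
            State state = State.CREATED;

            // Returns true if the failure took effect, false if it was swallowed.
            boolean fail(Throwable cause) {
                if (state == State.FINISHED || state == State.FAILED) {
                    return false; // terminal states are final: the error is dropped
                }
                state = State.FAILED;
                return true;
            }
        }

        public static void main(String[] args) {
            Throwable cause = new IllegalStateException("Could not schedule consumer vertex");

            MiniExecution producer = new MiniExecution();
            producer.state = State.FINISHED;          // producer already finished
            System.out.println(producer.fail(cause)); // false: swallowed (old behavior)

            MiniExecution consumer = new MiniExecution(); // still CREATED
            System.out.println(consumer.fail(cause));     // true: surfaces (new behavior)
        }
    }

Calling consumerVertex.fail(...) instead targets an execution that is still in a
non-terminal state, so the scheduling error propagates and the job moves to FAILED.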
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
index a599f42a58c0a6697a716be3d64af3403714db23..2d0ae4175b4dedd97ef308cb4b7ac2e47a4d19dc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
@@ -27,11 +27,15 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
+import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
@@ -40,12 +44,16 @@ import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
 import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.instance.SimpleSlot;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.operators.BatchTask;
+import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.util.SerializedValue;
 
@@ -276,6 +284,62 @@ public class ExecutionGraphDeploymentTest {
 		}
 	}
 
+	/**
+	 * Tests that a blocking batch job fails if there are not enough resources left to schedule the
+	 * succeeding tasks. This test case is related to [FLINK-4296] where finished producing tasks
+	 * swallow the fail exception when scheduling a consumer task.
+	 */
+	@Test
+	public void testNoResourceAvailableFailure() throws Exception {
+		final JobID jobId = new JobID();
+		JobVertex v1 = new JobVertex("source");
+		JobVertex v2 = new JobVertex("sink");
+
+		int dop1 = 1;
+		int dop2 = 1;
+
+		v1.setParallelism(dop1);
+		v2.setParallelism(dop2);
+
+		v1.setInvokableClass(BatchTask.class);
+		v2.setInvokableClass(BatchTask.class);
+
+		v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE, ResultPartitionType.BLOCKING, false);
+
+		// execution graph that executes actions synchronously
+		ExecutionGraph eg = new ExecutionGraph(
+			TestingUtils.directExecutionContext(),
+			jobId,
+			"failing test job",
+			new Configuration(),
+			new SerializedValue<>(new ExecutionConfig()),
+			AkkaUtils.getDefaultTimeout(),
+			new NoRestartStrategy());
+
+		eg.setQueuedSchedulingAllowed(false);
+
+		List<JobVertex> ordered = Arrays.asList(v1, v2);
+		eg.attachJobGraph(ordered);
+
+		Scheduler scheduler = new Scheduler(TestingUtils.directExecutionContext());
+		for (int i = 0; i < dop1; i++) {
+			scheduler.newInstanceAvailable(
+				ExecutionGraphTestUtils.getInstance(
+					new ExecutionGraphTestUtils.SimpleActorGateway(
+						TestingUtils.directExecutionContext())));
+		}
+		assertEquals(dop1, scheduler.getNumberOfAvailableSlots());
+
+		// schedule, this triggers mock deployment
+		eg.scheduleForExecution(scheduler);
+
+		ExecutionAttemptID attemptID = eg.getJobVertex(v1.getID()).getTaskVertices()[0].getCurrentExecutionAttempt().getAttemptId();
+		eg.updateState(new TaskExecutionState(jobId, attemptID, ExecutionState.RUNNING));
+		eg.updateState(new TaskExecutionState(jobId, attemptID, ExecutionState.FINISHED, null, new AccumulatorSnapshot(jobId, attemptID, new HashMap<AccumulatorRegistry.Metric, Accumulator<?, ?>>(), new HashMap<String, Accumulator<?, ?>>())));
+
+		assertEquals(JobStatus.FAILED, eg.getState());
+	}
+
 	private Map<ExecutionAttemptID, Execution> setupExecution(JobVertex v1, int dop1, JobVertex v2, int dop2) throws Exception {
 		final JobID jobId = new JobID();
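
Note on the test: the graph gets exactly one slot (dop1 == 1) and queued scheduling is
disabled. When the source reports FINISHED, the ExecutionGraph immediately tries to
schedule the sink over the BLOCKING result; no free slot is available at that point,
so the scheduler throws (hence the NoResourceAvailableException import), and with the
fix that error fails the sink vertex, driving the job to the asserted JobStatus.FAILED
rather than hanging. A minimal sketch of the slot arithmetic, assuming a hypothetical
one-slot scheduler (SlotShortageSketch and MiniScheduler are illustrative names, not
Flink's API):

    public class SlotShortageSketch {

        static class MiniScheduler {
            private int freeSlots;
            private final boolean queuedSchedulingAllowed;

            MiniScheduler(int freeSlots, boolean queuedSchedulingAllowed) {
                this.freeSlots = freeSlots;
                this.queuedSchedulingAllowed = queuedSchedulingAllowed;
            }

            // Immediate-or-fail allocation, mirroring the behavior the test relies on.
            void allocateSlot(String task) {
                if (freeSlots > 0) {
                    freeSlots--;
                    System.out.println("deployed " + task);
                } else if (queuedSchedulingAllowed) {
                    System.out.println("queued " + task); // would wait for a free slot
                } else {
                    // stands in for the scheduler's NoResourceAvailableException
                    throw new RuntimeException("Not enough free slots to schedule " + task);
                }
            }
        }

        public static void main(String[] args) {
            MiniScheduler scheduler = new MiniScheduler(1, false);
            scheduler.allocateSlot("source"); // takes the only slot
            scheduler.allocateSlot("sink");   // throws: no slot left, queueing disabled
        }
    }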