diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java b/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java index 1ee14138435e3bbbaceb5b5b35ad5fde8d6929ac..b55dea0e59a6d3d779ace193ed8d373d07a09490 100644 --- a/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java +++ b/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java @@ -31,6 +31,7 @@ import java.util.Set; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.flink.api.common.InvalidProgramException; import org.apache.flink.api.common.Plan; import org.apache.flink.api.common.operators.Operator; import org.apache.flink.api.common.operators.Union; @@ -704,6 +705,10 @@ public class PactCompiler { n = new BinaryUnionNode((Union) c); } else if (c instanceof PartialSolutionPlaceHolder) { + if (this.parent == null) { + throw new InvalidProgramException("It is currently not supported to create data sinks inside iterations."); + } + final PartialSolutionPlaceHolder holder = (PartialSolutionPlaceHolder) c; final BulkIterationBase enclosingIteration = holder.getContainingBulkIteration(); final BulkIterationNode containingIterationNode = @@ -715,6 +720,10 @@ public class PactCompiler { n = p; } else if (c instanceof WorksetPlaceHolder) { + if (this.parent == null) { + throw new InvalidProgramException("It is currently not supported to create data sinks inside iterations."); + } + final WorksetPlaceHolder holder = (WorksetPlaceHolder) c; final DeltaIterationBase enclosingIteration = holder.getContainingWorksetIteration(); final WorksetIterationNode containingIterationNode = @@ -726,6 +735,10 @@ public class PactCompiler { n = p; } else if (c instanceof SolutionSetPlaceHolder) { + if (this.parent == null) { + throw new InvalidProgramException("It is currently not supported to create data sinks inside iterations."); + } + final SolutionSetPlaceHolder holder = (SolutionSetPlaceHolder) c; final DeltaIterationBase enclosingIteration = holder.getContainingWorksetIteration(); final WorksetIterationNode containingIterationNode =