From 0190dd24622169a98be1a6ef518b0fdd018e2d44 Mon Sep 17 00:00:00 2001 From: Stephan Ewen Date: Thu, 18 Dec 2014 19:58:45 +0100 Subject: [PATCH] [FLINK-1357] [compiler] Add union between static and dynamic path --- .../apache/flink/compiler/PactCompiler.java | 72 ++++++--- .../compiler/plan/BinaryUnionPlanNode.java | 23 ++- .../compiler/plan/NAryUnionPlanNode.java | 7 +- .../UnionBetweenDynamicAndStaticPathTest.java | 143 ++++++++++++++++++ .../flink/compiler/UnionReplacementTest.java | 34 ++--- .../src/test/resources/log4j.properties | 27 ++++ .../runtime/operators/DriverStrategy.java | 5 +- .../operators/UnionWithTempOperator.java | 124 +++++++-------- .../UnionStaticDynamicIterationITCase.java | 55 +++++++ .../src/test/resources/log4j.properties | 27 ++++ 10 files changed, 412 insertions(+), 105 deletions(-) create mode 100644 flink-compiler/src/test/java/org/apache/flink/compiler/UnionBetweenDynamicAndStaticPathTest.java create mode 100644 flink-compiler/src/test/resources/log4j.properties create mode 100644 flink-tests/src/test/java/org/apache/flink/test/iterative/UnionStaticDynamicIterationITCase.java create mode 100644 flink-tests/src/test/resources/log4j.properties diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java b/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java index a63cfd107f6..4411d3e8133 100644 --- a/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java +++ b/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java @@ -95,6 +95,7 @@ import org.apache.flink.compiler.plan.WorksetPlanNode; import org.apache.flink.compiler.postpass.OptimizerPostPass; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.flink.runtime.operators.DriverStrategy; import org.apache.flink.runtime.operators.shipping.ShipStrategyType; import org.apache.flink.runtime.operators.util.LocalStrategy; import org.apache.flink.util.InstantiationUtil; @@ -580,9 +581,7 @@ public class PactCompiler { // finalize the plan OptimizedPlan plan = new PlanFinalizer().createFinalPlan(bestPlanSinks, program.getJobName(), program); - - // swap the binary unions for n-ary unions. this changes no strategies or memory consumers whatsoever, so - // we can do this after the plan finalization + plan.accept(new BinaryUnionReplacer()); // post pass the plan. this is the phase where the serialization and comparator code is set @@ -1029,7 +1028,6 @@ public class PactCompiler { } } - @Override public void postVisit(OptimizerNode visitable) {} } @@ -1057,8 +1055,11 @@ public class PactCompiler { } /** - * Utility class that traverses a plan to collect all nodes and add them to the OptimizedPlan. - * Besides collecting all nodes, this traversal assigns the memory to the nodes. + * Finalization of the plan: + * - The graph of nodes is double-linked (links from child to parent are inserted) + * - If unions join static and dynamic paths, the cache is marked as a memory consumer + * - Relative memory fractions are assigned to all nodes. + * - All nodes are collected into a set. 
*/ private static final class PlanFinalizer implements Visitor { @@ -1119,9 +1120,7 @@ public class PactCompiler { c.setRelativeTempMemory(relativeMem); if (LOG.isDebugEnabled()) { LOG.debug("Assigned " + relativeMem + " of total memory to each instance of the temp " + - "table" + - " " + - "for " + c + "."); + "table for " + c + "."); } } } @@ -1143,6 +1142,12 @@ public class PactCompiler { else if (visitable instanceof SourcePlanNode) { this.sources.add((SourcePlanNode) visitable); } + else if (visitable instanceof BinaryUnionPlanNode) { + BinaryUnionPlanNode unionNode = (BinaryUnionPlanNode) visitable; + if (unionNode.unionsStaticAndDynamicPath()) { + unionNode.setDriverStrategy(DriverStrategy.UNION_WITH_CACHED); + } + } else if (visitable instanceof BulkPartialSolutionPlanNode) { // tell the partial solution about the iteration node that contains it final BulkPartialSolutionPlanNode pspn = (BulkPartialSolutionPlanNode) visitable; @@ -1229,7 +1234,6 @@ public class PactCompiler { @Override public void postVisit(PlanNode visitable) {} } - /** * A visitor that traverses the graph and collects cascading binary unions into a single n-ary @@ -1256,24 +1260,50 @@ public class PactCompiler { public void postVisit(PlanNode visitable) { if (visitable instanceof BinaryUnionPlanNode) { + final BinaryUnionPlanNode unionNode = (BinaryUnionPlanNode) visitable; final Channel in1 = unionNode.getInput1(); final Channel in2 = unionNode.getInput2(); - PlanNode newUnionNode; + if (!unionNode.unionsStaticAndDynamicPath()) { + + // both on static path, or both on dynamic path. we can collapse them + NAryUnionPlanNode newUnionNode; - List inputs = new ArrayList(); - collect(in1, inputs); - collect(in2, inputs); + List inputs = new ArrayList(); + collect(in1, inputs); + collect(in2, inputs); - newUnionNode = new NAryUnionPlanNode(unionNode.getOptimizerNode(), inputs, unionNode.getGlobalProperties()); + newUnionNode = new NAryUnionPlanNode(unionNode.getOptimizerNode(), inputs, + unionNode.getGlobalProperties(), unionNode.getCumulativeCosts()); + + newUnionNode.setDegreeOfParallelism(unionNode.getDegreeOfParallelism()); - for (Channel c : inputs) { - c.setTarget(newUnionNode); - } + for (Channel c : inputs) { + c.setTarget(newUnionNode); + } - for(Channel channel : unionNode.getOutgoingChannels()){ - channel.swapUnionNodes(newUnionNode); + for (Channel channel : unionNode.getOutgoingChannels()) { + channel.swapUnionNodes(newUnionNode); + newUnionNode.addOutgoingChannel(channel); + } + } + else { + // union between the static and the dynamic path. 
we need to handle this for now + // through a special union operator + + // make sure that the first input is the cached (static) and the second input is the dynamic + if (in1.isOnDynamicPath()) { + BinaryUnionPlanNode newUnionNode = new BinaryUnionPlanNode(unionNode); + + in1.setTarget(newUnionNode); + in2.setTarget(newUnionNode); + + for (Channel channel : unionNode.getOutgoingChannels()) { + channel.swapUnionNodes(newUnionNode); + newUnionNode.addOutgoingChannel(channel); + } + } } } } @@ -1290,7 +1320,7 @@ public class PactCompiler { inputs.addAll(((NAryUnionPlanNode) in.getSource()).getListOfInputs()); } else { - // is not a union node, so we take the channel directly + // is not a collapsed union node, so we take the channel directly inputs.add(in); } } diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/plan/BinaryUnionPlanNode.java b/flink-compiler/src/main/java/org/apache/flink/compiler/plan/BinaryUnionPlanNode.java index 8e5bb81c170..039952df7fe 100644 --- a/flink-compiler/src/main/java/org/apache/flink/compiler/plan/BinaryUnionPlanNode.java +++ b/flink-compiler/src/main/java/org/apache/flink/compiler/plan/BinaryUnionPlanNode.java @@ -16,10 +16,8 @@ * limitations under the License. */ - package org.apache.flink.compiler.plan; - import org.apache.flink.compiler.dag.BinaryUnionNode; import org.apache.flink.runtime.operators.DriverStrategy; @@ -35,7 +33,28 @@ public class BinaryUnionPlanNode extends DualInputPlanNode { super(template, "Union", in1, in2, DriverStrategy.UNION); } + public BinaryUnionPlanNode(BinaryUnionPlanNode toSwapFrom) { + super(toSwapFrom.getOptimizerNode(), "Union-With-Cached", toSwapFrom.getInput2(), toSwapFrom.getInput1(), + DriverStrategy.UNION_WITH_CACHED); + + this.globalProps = toSwapFrom.globalProps; + this.localProps = toSwapFrom.localProps; + this.nodeCosts = toSwapFrom.nodeCosts; + this.cumulativeCosts = toSwapFrom.cumulativeCosts; + + setDegreeOfParallelism(toSwapFrom.getDegreeOfParallelism()); + } + public BinaryUnionNode getOptimizerNode() { return (BinaryUnionNode) this.template; } + + public boolean unionsStaticAndDynamicPath() { + return getInput1().isOnDynamicPath() != getInput2().isOnDynamicPath(); + } + + @Override + public int getMemoryConsumerWeight() { + return 0; + } } diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/plan/NAryUnionPlanNode.java b/flink-compiler/src/main/java/org/apache/flink/compiler/plan/NAryUnionPlanNode.java index b7ed02310c0..a1b131289bf 100644 --- a/flink-compiler/src/main/java/org/apache/flink/compiler/plan/NAryUnionPlanNode.java +++ b/flink-compiler/src/main/java/org/apache/flink/compiler/plan/NAryUnionPlanNode.java @@ -23,6 +23,7 @@ import java.util.Collections; import java.util.Iterator; import java.util.List; +import org.apache.flink.compiler.costs.Costs; import org.apache.flink.compiler.dag.BinaryUnionNode; import org.apache.flink.compiler.dataproperties.GlobalProperties; import org.apache.flink.compiler.dataproperties.LocalProperties; @@ -40,12 +41,16 @@ public class NAryUnionPlanNode extends PlanNode { /** * @param template */ - public NAryUnionPlanNode(BinaryUnionNode template, List inputs, GlobalProperties gProps) { + public NAryUnionPlanNode(BinaryUnionNode template, List inputs, GlobalProperties gProps, + Costs cumulativeCosts) + { super(template, "Union", DriverStrategy.NONE); this.inputs = inputs; this.globalProps = gProps; this.localProps = new LocalProperties(); + this.nodeCosts = new Costs(); + this.cumulativeCosts = cumulativeCosts; } @Override diff --git 
a/flink-compiler/src/test/java/org/apache/flink/compiler/UnionBetweenDynamicAndStaticPathTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/UnionBetweenDynamicAndStaticPathTest.java new file mode 100644 index 00000000000..9cb6b2e6412 --- /dev/null +++ b/flink-compiler/src/test/java/org/apache/flink/compiler/UnionBetweenDynamicAndStaticPathTest.java @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.compiler; + +import static org.junit.Assert.*; + +import org.apache.flink.api.common.Plan; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.operators.IterativeDataSet; +import org.apache.flink.compiler.plan.BinaryUnionPlanNode; +import org.apache.flink.compiler.plan.BulkIterationPlanNode; +import org.apache.flink.compiler.plan.Channel; +import org.apache.flink.compiler.plan.NAryUnionPlanNode; +import org.apache.flink.compiler.plan.OptimizedPlan; +import org.apache.flink.compiler.plan.SingleInputPlanNode; +import org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator; +import org.junit.Test; + +@SuppressWarnings("serial") +public class UnionBetweenDynamicAndStaticPathTest extends CompilerTestBase { + + @Test + public void testUnionStaticFirst() { + try { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet input1 = env.generateSequence(1, 10); + DataSet input2 = env.generateSequence(1, 10); + + IterativeDataSet iteration = input1.iterate(10); + + DataSet result = iteration.closeWith( + input2.union(input2).union(iteration.union(iteration))); + + result.print(); + result.print(); + + Plan p = env.createProgramPlan(); + OptimizedPlan op = compileNoStats(p); + + assertEquals(2, op.getDataSinks().size()); + + BulkIterationPlanNode iterPlan = (BulkIterationPlanNode) op.getDataSinks().iterator().next().getInput().getSource(); + + SingleInputPlanNode noopNode = (SingleInputPlanNode) iterPlan.getRootOfStepFunction(); + BinaryUnionPlanNode mixedUnion = (BinaryUnionPlanNode) noopNode.getInput().getSource(); + NAryUnionPlanNode staticUnion = (NAryUnionPlanNode) mixedUnion.getInput1().getSource(); + NAryUnionPlanNode dynamicUnion = (NAryUnionPlanNode) mixedUnion.getInput2().getSource(); + + assertTrue(mixedUnion.unionsStaticAndDynamicPath()); + assertFalse(mixedUnion.getInput1().isOnDynamicPath()); + assertTrue(mixedUnion.getInput2().isOnDynamicPath()); + assertTrue(mixedUnion.getInput1().getTempMode().isCached()); + + for (Channel c : staticUnion.getInputs()) { + assertFalse(c.isOnDynamicPath()); + } + for (Channel c : dynamicUnion.getInputs()) { + assertTrue(c.isOnDynamicPath()); + } + + assertEquals(0.5, iterPlan.getRelativeMemoryPerSubTask(), 0.0); + 
assertEquals(0.5, mixedUnion.getInput1().getRelativeTempMemory(), 0.0); + assertEquals(0.0, mixedUnion.getInput2().getRelativeTempMemory(), 0.0); + + new NepheleJobGraphGenerator().compileJobGraph(op); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testUnionStaticSecond() { + try { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet input1 = env.generateSequence(1, 10); + DataSet input2 = env.generateSequence(1, 10); + + IterativeDataSet iteration = input1.iterate(10); + + DataSet iterResult = iteration + .closeWith(iteration.union(iteration).union(input2.union(input2))); + + iterResult.print(); + iterResult.print(); + + + Plan p = env.createProgramPlan(); + OptimizedPlan op = compileNoStats(p); + + assertEquals(2, op.getDataSinks().size()); + + BulkIterationPlanNode iterPlan = (BulkIterationPlanNode) op.getDataSinks().iterator().next().getInput().getSource(); + + SingleInputPlanNode noopNode = (SingleInputPlanNode) iterPlan.getRootOfStepFunction(); + BinaryUnionPlanNode mixedUnion = (BinaryUnionPlanNode) noopNode.getInput().getSource(); + NAryUnionPlanNode staticUnion = (NAryUnionPlanNode) mixedUnion.getInput1().getSource(); + NAryUnionPlanNode dynamicUnion = (NAryUnionPlanNode) mixedUnion.getInput2().getSource(); + + assertTrue(mixedUnion.unionsStaticAndDynamicPath()); + assertFalse(mixedUnion.getInput1().isOnDynamicPath()); + assertTrue(mixedUnion.getInput2().isOnDynamicPath()); + assertTrue(mixedUnion.getInput1().getTempMode().isCached()); + + assertEquals(0.5, iterPlan.getRelativeMemoryPerSubTask(), 0.0); + assertEquals(0.5, mixedUnion.getInput1().getRelativeTempMemory(), 0.0); + assertEquals(0.0, mixedUnion.getInput2().getRelativeTempMemory(), 0.0); + + for (Channel c : staticUnion.getInputs()) { + assertFalse(c.isOnDynamicPath()); + } + for (Channel c : dynamicUnion.getInputs()) { + assertTrue(c.isOnDynamicPath()); + } + + new NepheleJobGraphGenerator().compileJobGraph(op); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } +} diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/UnionReplacementTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/UnionReplacementTest.java index 0ce6468cc24..5cdac1b6c6e 100644 --- a/flink-compiler/src/test/java/org/apache/flink/compiler/UnionReplacementTest.java +++ b/flink-compiler/src/test/java/org/apache/flink/compiler/UnionReplacementTest.java @@ -21,7 +21,6 @@ package org.apache.flink.compiler; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.common.Plan; -import org.apache.flink.compiler.CompilerException; import org.apache.flink.compiler.plan.OptimizedPlan; import org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator; import org.junit.Test; @@ -32,24 +31,25 @@ import static org.junit.Assert.fail; public class UnionReplacementTest extends CompilerTestBase { @Test - public void testUnionReplacement(){ - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - DataSet input1 = env.fromElements("test1"); - DataSet input2 = env.fromElements("test2"); - - DataSet union = input1.union(input2); - - union.print(); - union.print(); - - Plan plan = env.createProgramPlan(); - try{ - OptimizedPlan oPlan = this.compileNoStats(plan); + public void testUnionReplacement() { + try { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + DataSet input1 = 
env.fromElements("test1"); + DataSet input2 = env.fromElements("test2"); + + DataSet union = input1.union(input2); + + union.print(); + union.print(); + + Plan plan = env.createProgramPlan(); + OptimizedPlan oPlan = compileNoStats(plan); NepheleJobGraphGenerator jobGen = new NepheleJobGraphGenerator(); jobGen.compileJobGraph(oPlan); - }catch(CompilerException co){ - co.printStackTrace(); - fail("The Pact compiler is unable to compile this plan correctly."); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); } } } diff --git a/flink-compiler/src/test/resources/log4j.properties b/flink-compiler/src/test/resources/log4j.properties new file mode 100644 index 00000000000..fa3f9373077 --- /dev/null +++ b/flink-compiler/src/test/resources/log4j.properties @@ -0,0 +1,27 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +# This file ensures that tests executed from the IDE show log output + +log4j.rootLogger=INFO, console + +# Log all infos in the given file +log4j.appender.console=org.apache.log4j.ConsoleAppender +log4j.appender.console.target = System.err +log4j.appender.console.layout=org.apache.log4j.PatternLayout +log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n \ No newline at end of file diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DriverStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DriverStrategy.java index a133c6cd1a6..ae9b4744904 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DriverStrategy.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DriverStrategy.java @@ -16,7 +16,6 @@ * limitations under the License. */ - package org.apache.flink.runtime.operators; import static org.apache.flink.runtime.operators.DamBehavior.FULL_DAM; @@ -93,9 +92,9 @@ public enum DriverStrategy { NESTEDLOOP_STREAMED_OUTER_SECOND(CrossDriver.class, null, FULL_DAM, PIPELINED, 0), // union utility op. 
unions happen implicitly on the network layer (in the readers) when bundeling streams - UNION(null, null, FULL_DAM, FULL_DAM, 0); + UNION(null, null, PIPELINED, PIPELINED, 0), // explicit binary union between a streamed and a cached input -// UNION_WITH_CACHED(UnionWithTempOperator.class, null, FULL_DAM, PIPELINED, false); + UNION_WITH_CACHED(UnionWithTempOperator.class, null, FULL_DAM, PIPELINED, 0); // -------------------------------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/UnionWithTempOperator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/UnionWithTempOperator.java index 515b1e3c2c5..d8437a9dc23 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/UnionWithTempOperator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/UnionWithTempOperator.java @@ -16,66 +16,68 @@ * limitations under the License. */ - package org.apache.flink.runtime.operators; -//public class UnionWithTempOperator implements PactDriver { -// -// private PactTaskContext taskContext; -// -// private volatile boolean running; -// -// -// @Override -// public void setup(PactTaskContext context) { -// this.taskContext = context; -// this.running = true; -// } -// -// @Override -// public int getNumberOfInputs() { -// return 2; -// } -// -// @Override -// public Class getStubType() { -// return Function.class; -// } -// -// @Override -// public boolean requiresComparatorOnInput() { -// return false; -// } -// -// @Override -// public void prepare() {} -// -// @Override -// public void run() throws Exception { -// -// final int tempedInput = 0; -// final int streamedInput = 1; -// -// final MutableObjectIterator cache = this.taskContext.getInput(tempedInput); -// final MutableObjectIterator input = this.taskContext.getInput(streamedInput); -// -// final Collector output = this.taskContext.getOutputCollector(); -// -// T record = this.taskContext.getInputSerializer(streamedInput).createInstance(); -// -// while (this.running && ((record = input.next(record)) != null)) { -// output.collect(record); -// } -// while (this.running && ((record = cache.next(record)) != null)) { -// output.collect(record); -// } -// } -// -// @Override -// public void cleanup() {} -// -// @Override -// public void cancel() { -// this.running = false; -// } -//} +import org.apache.flink.api.common.functions.Function; +import org.apache.flink.util.Collector; +import org.apache.flink.util.MutableObjectIterator; + +public class UnionWithTempOperator implements PactDriver { + + private static final int CACHED_INPUT = 0; + private static final int STREAMED_INPUT = 1; + + private PactTaskContext taskContext; + + private volatile boolean running; + + + @Override + public void setup(PactTaskContext context) { + this.taskContext = context; + this.running = true; + } + + @Override + public int getNumberOfInputs() { + return 2; + } + + @Override + public int getNumberOfDriverComparators() { + return 0; + } + + @Override + public Class getStubType() { + return null; // no UDF + } + + @Override + public void prepare() {} + + @Override + public void run() throws Exception { + + final Collector output = this.taskContext.getOutputCollector(); + T record = this.taskContext.getInputSerializer(STREAMED_INPUT).getSerializer().createInstance(); + + final MutableObjectIterator input = this.taskContext.getInput(STREAMED_INPUT); + while (this.running && ((record = input.next(record)) != null)) { + 
output.collect(record); + } + + final MutableObjectIterator cache = this.taskContext.getInput(CACHED_INPUT); + while (this.running && ((record = cache.next(record)) != null)) { + output.collect(record); + } + } + + @Override + public void cleanup() {} + + @Override + public void cancel() { + this.running = false; + } +} diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/UnionStaticDynamicIterationITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/UnionStaticDynamicIterationITCase.java new file mode 100644 index 00000000000..fa8643f27b4 --- /dev/null +++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/UnionStaticDynamicIterationITCase.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.iterative; + +import static org.junit.Assert.*; + +import java.util.ArrayList; + +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.io.LocalCollectionOutputFormat; +import org.apache.flink.api.java.operators.IterativeDataSet; +import org.apache.flink.test.util.JavaProgramTestBase; + +public class UnionStaticDynamicIterationITCase extends JavaProgramTestBase { + + private final ArrayList result = new ArrayList(); + + @Override + protected void testProgram() throws Exception { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet inputStatic = env.generateSequence(1, 4); + DataSet inputIteration = env.generateSequence(1, 4); + + IterativeDataSet iteration = inputIteration.iterate(3); + + DataSet result = iteration.closeWith(inputStatic.union(inputStatic).union(iteration.union(iteration))); + + result.output(new LocalCollectionOutputFormat(this.result)); + + env.execute(); + } + + @Override + protected void postSubmit() throws Exception { + assertEquals(88, result.size()); + } +} diff --git a/flink-tests/src/test/resources/log4j.properties b/flink-tests/src/test/resources/log4j.properties new file mode 100644 index 00000000000..6bf344a98aa --- /dev/null +++ b/flink-tests/src/test/resources/log4j.properties @@ -0,0 +1,27 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +# This file ensures that tests executed from the IDE show log output + +log4j.rootLogger=INFO, console + +# Log all infos in the given file +log4j.appender.console=org.apache.log4j.ConsoleAppender +log4j.appender.console.target = System.err +log4j.appender.console.layout=org.apache.log4j.PatternLayout +log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n \ No newline at end of file -- GitLab
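
A quick sanity check on the expected record count in `UnionStaticDynamicIterationITCase` — a sketch by the editor, not part of the patch, under the reading that the static side contributes its full 8 records (`generateSequence(1, 4)` unioned with itself) in every superstep, while `iteration.union(iteration)` doubles the current partial solution:

```java
// Sketch only: reproduces the arithmetic behind assertEquals(88, result.size()).
// The partial solution starts with 4 records (generateSequence(1, 4)); each of the
// 3 supersteps emits 8 static records (4 + 4) plus twice the current partial solution:
// 4 -> 16 -> 40 -> 88.
public class UnionCountSketch {
    public static void main(String[] args) {
        long partialSolution = 4;                      // initial dynamic input
        for (int superstep = 1; superstep <= 3; superstep++) {
            partialSolution = 8 + 2 * partialSolution; // static union + doubled dynamic path
        }
        System.out.println(partialSolution);           // prints 88
    }
}
```

As I read the patch, this is exactly the split that `UnionWithTempOperator` and `DriverStrategy.UNION_WITH_CACHED` implement: the static 8 records are written to the temp table once and replayed from the cache in each superstep, while the dynamic records are streamed straight through.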