diff --git a/flink-tests/src/test/java/org/apache/flink/test/operators/ObjectReuseITCase.java b/flink-tests/src/test/java/org/apache/flink/test/operators/ObjectReuseITCase.java index fa1d58a5011176ad68a807934078c877007afd3f..faf6de5c8d8b2dd47a8341be3c2fc5211e8d4e48 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/operators/ObjectReuseITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/operators/ObjectReuseITCase.java @@ -43,18 +43,26 @@ import java.util.List; @RunWith(Parameterized.class) public class ObjectReuseITCase extends JavaProgramTestBase { - private static int NUM_PROGRAMS = 3; + private static int NUM_PROGRAMS = 4; private int curProgId = config.getInteger("ProgramId", -1); private String resultPath; private String expectedResult; + private static String inReducePath; + private static String inGroupReducePath; + + private String IN_REDUCE = "a,1\na,2\na,3\na,4\na,50\n"; + private String IN_GROUP_REDUCE = "a,1\na,2\na,3\na,4\na,5\n"; + public ObjectReuseITCase(Configuration config) { super(config); } @Override protected void preSubmit() throws Exception { + inReducePath = createTempFile("in_reduce.txt", IN_REDUCE); + inGroupReducePath = createTempFile("in_group_reduce.txt", IN_GROUP_REDUCE); resultPath = getTempDirPath("result"); } @@ -100,13 +108,7 @@ public class ObjectReuseITCase extends JavaProgramTestBase { final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); env.getConfig().enableObjectReuse(); - DataSet> input = env.fromElements( - new Tuple2("a", 1), - new Tuple2("a", 2), - new Tuple2("a", 3), - new Tuple2("a", 4), - new Tuple2("a", 50)); - + DataSet> input = env.readCsvFile(inReducePath).types(String.class, Integer.class).setParallelism(1); DataSet> result = input.groupBy(0).reduce(new ReduceFunction>() { @Override @@ -131,24 +133,20 @@ public class ObjectReuseITCase extends JavaProgramTestBase { final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); env.getConfig().enableObjectReuse(); - DataSet> input = env.fromElements( - new Tuple2("a", 1), - new Tuple2("a", 2), - new Tuple2("a", 3), - new Tuple2("a", 4), - new Tuple2("a", 50)); + DataSet> input = env.readCsvFile(inReducePath).types(String.class, Integer.class).setParallelism(1); - DataSet> result = input.reduce(new ReduceFunction>() { + DataSet> result = input + .reduce(new ReduceFunction>() { - @Override - public Tuple2 reduce( - Tuple2 value1, - Tuple2 value2) throws Exception { - value2.f1 += value1.f1; - return value2; - } + @Override + public Tuple2 reduce( + Tuple2 value1, + Tuple2 value2) throws Exception { + value2.f1 += value1.f1; + return value2; + } - }); + }); result.writeAsCsv(resultPath); env.execute(); @@ -163,12 +161,7 @@ public class ObjectReuseITCase extends JavaProgramTestBase { final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); env.getConfig().enableObjectReuse(); - DataSet> input = env.fromElements( - new Tuple2("a", 1), - new Tuple2("a", 2), - new Tuple2("a", 3), - new Tuple2("a", 4), - new Tuple2("a", 5)); + DataSet> input = env.readCsvFile(inGroupReducePath).types(String.class, Integer.class).setParallelism(1); DataSet> result = input.reduceGroup(new GroupReduceFunction, Tuple2>() { @@ -203,12 +196,7 @@ public class ObjectReuseITCase extends JavaProgramTestBase { final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); env.getConfig().enableObjectReuse(); - DataSet> input = env.fromElements( - new Tuple2("a", 1), - new Tuple2("a", 2), - new Tuple2("a", 3), - new Tuple2("a", 4), - new Tuple2("a", 5)); + DataSet> input = env.readCsvFile(inGroupReducePath).types(String.class, Integer.class).setParallelism(1); DataSet> result = input.reduceGroup(new GroupReduceFunction, Tuple2>() {