提交 57615aaa 编写于 作者: F Fabian Hueske 提交者: Stephan Ewen

[FLINK-785] Fixed ObjectReuseITCase

This closes #370
上级 a3a7350d
...@@ -43,18 +43,26 @@ import java.util.List; ...@@ -43,18 +43,26 @@ import java.util.List;
@RunWith(Parameterized.class) @RunWith(Parameterized.class)
public class ObjectReuseITCase extends JavaProgramTestBase { public class ObjectReuseITCase extends JavaProgramTestBase {
private static int NUM_PROGRAMS = 3; private static int NUM_PROGRAMS = 4;
private int curProgId = config.getInteger("ProgramId", -1); private int curProgId = config.getInteger("ProgramId", -1);
private String resultPath; private String resultPath;
private String expectedResult; private String expectedResult;
private static String inReducePath;
private static String inGroupReducePath;
private String IN_REDUCE = "a,1\na,2\na,3\na,4\na,50\n";
private String IN_GROUP_REDUCE = "a,1\na,2\na,3\na,4\na,5\n";
public ObjectReuseITCase(Configuration config) { public ObjectReuseITCase(Configuration config) {
super(config); super(config);
} }
@Override @Override
protected void preSubmit() throws Exception { protected void preSubmit() throws Exception {
inReducePath = createTempFile("in_reduce.txt", IN_REDUCE);
inGroupReducePath = createTempFile("in_group_reduce.txt", IN_GROUP_REDUCE);
resultPath = getTempDirPath("result"); resultPath = getTempDirPath("result");
} }
...@@ -100,13 +108,7 @@ public class ObjectReuseITCase extends JavaProgramTestBase { ...@@ -100,13 +108,7 @@ public class ObjectReuseITCase extends JavaProgramTestBase {
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.getConfig().enableObjectReuse(); env.getConfig().enableObjectReuse();
DataSet<Tuple2<String, Integer>> input = env.fromElements( DataSet<Tuple2<String, Integer>> input = env.readCsvFile(inReducePath).types(String.class, Integer.class).setParallelism(1);
new Tuple2<String, Integer>("a", 1),
new Tuple2<String, Integer>("a", 2),
new Tuple2<String, Integer>("a", 3),
new Tuple2<String, Integer>("a", 4),
new Tuple2<String, Integer>("a", 50));
DataSet<Tuple2<String, Integer>> result = input.groupBy(0).reduce(new ReduceFunction<Tuple2<String, Integer>>() { DataSet<Tuple2<String, Integer>> result = input.groupBy(0).reduce(new ReduceFunction<Tuple2<String, Integer>>() {
@Override @Override
...@@ -131,24 +133,20 @@ public class ObjectReuseITCase extends JavaProgramTestBase { ...@@ -131,24 +133,20 @@ public class ObjectReuseITCase extends JavaProgramTestBase {
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.getConfig().enableObjectReuse(); env.getConfig().enableObjectReuse();
DataSet<Tuple2<String, Integer>> input = env.fromElements( DataSet<Tuple2<String, Integer>> input = env.readCsvFile(inReducePath).types(String.class, Integer.class).setParallelism(1);
new Tuple2<String, Integer>("a", 1),
new Tuple2<String, Integer>("a", 2),
new Tuple2<String, Integer>("a", 3),
new Tuple2<String, Integer>("a", 4),
new Tuple2<String, Integer>("a", 50));
DataSet<Tuple2<String, Integer>> result = input.reduce(new ReduceFunction<Tuple2<String, Integer>>() { DataSet<Tuple2<String, Integer>> result = input
.reduce(new ReduceFunction<Tuple2<String, Integer>>() {
@Override @Override
public Tuple2<String, Integer> reduce( public Tuple2<String, Integer> reduce(
Tuple2<String, Integer> value1, Tuple2<String, Integer> value1,
Tuple2<String, Integer> value2) throws Exception { Tuple2<String, Integer> value2) throws Exception {
value2.f1 += value1.f1; value2.f1 += value1.f1;
return value2; return value2;
} }
}); });
result.writeAsCsv(resultPath); result.writeAsCsv(resultPath);
env.execute(); env.execute();
...@@ -163,12 +161,7 @@ public class ObjectReuseITCase extends JavaProgramTestBase { ...@@ -163,12 +161,7 @@ public class ObjectReuseITCase extends JavaProgramTestBase {
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.getConfig().enableObjectReuse(); env.getConfig().enableObjectReuse();
DataSet<Tuple2<String, Integer>> input = env.fromElements( DataSet<Tuple2<String, Integer>> input = env.readCsvFile(inGroupReducePath).types(String.class, Integer.class).setParallelism(1);
new Tuple2<String, Integer>("a", 1),
new Tuple2<String, Integer>("a", 2),
new Tuple2<String, Integer>("a", 3),
new Tuple2<String, Integer>("a", 4),
new Tuple2<String, Integer>("a", 5));
DataSet<Tuple2<String, Integer>> result = input.reduceGroup(new GroupReduceFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() { DataSet<Tuple2<String, Integer>> result = input.reduceGroup(new GroupReduceFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
...@@ -203,12 +196,7 @@ public class ObjectReuseITCase extends JavaProgramTestBase { ...@@ -203,12 +196,7 @@ public class ObjectReuseITCase extends JavaProgramTestBase {
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.getConfig().enableObjectReuse(); env.getConfig().enableObjectReuse();
DataSet<Tuple2<String, Integer>> input = env.fromElements( DataSet<Tuple2<String, Integer>> input = env.readCsvFile(inGroupReducePath).types(String.class, Integer.class).setParallelism(1);
new Tuple2<String, Integer>("a", 1),
new Tuple2<String, Integer>("a", 2),
new Tuple2<String, Integer>("a", 3),
new Tuple2<String, Integer>("a", 4),
new Tuple2<String, Integer>("a", 5));
DataSet<Tuple2<String, Integer>> result = input.reduceGroup(new GroupReduceFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() { DataSet<Tuple2<String, Integer>> result = input.reduceGroup(new GroupReduceFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册