提交 78d3c61c 编写于 作者: S Stefan Richter 提交者: Ufuk Celebi

[FLINK-4078] [dataSet] Introduce closure cleaning in CoGroup.where() and equaltTo()

This closes #2116.
上级 6a1144e5
......@@ -455,7 +455,7 @@ public class CoGroupOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OU
*/
public <K> CoGroupOperatorSetsPredicate where(KeySelector<I1, K> keyExtractor) {
TypeInformation<K> keyType = TypeExtractor.getKeySelectorTypes(keyExtractor, input1.getType());
return new CoGroupOperatorSetsPredicate(new SelectorFunctionKeys<>(keyExtractor, input1.getType(), keyType));
return new CoGroupOperatorSetsPredicate(new SelectorFunctionKeys<>(input1.clean(keyExtractor), input1.getType(), keyType));
}
// ----------------------------------------------------------------------------------------
......@@ -520,7 +520,7 @@ public class CoGroupOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OU
*/
public <K> CoGroupOperatorWithoutFunction equalTo(KeySelector<I2, K> keyExtractor) {
TypeInformation<K> keyType = TypeExtractor.getKeySelectorTypes(keyExtractor, input2.getType());
return createCoGroupOperator(new SelectorFunctionKeys<>(keyExtractor, input2.getType(), keyType));
return createCoGroupOperator(new SelectorFunctionKeys<>(input1.clean(keyExtractor), input2.getType(), keyType));
}
/**
......
......@@ -18,6 +18,7 @@
package org.apache.flink.test.javaApiOperators;
import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.distributions.DataDistribution;
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.functions.RichCoGroupFunction;
......@@ -308,7 +309,7 @@ public class CoGroupITCase extends MultipleProgramsTestBase {
}
@Test
public void testCoGroupWithMultipleKeyFieldsWithKeyExtractor() throws Exception {
public void testCoGroupWithMultipleKeyFieldsWithStaticClassKeyExtractor() throws Exception {
/*
* CoGroup with multiple key fields
*/
......@@ -334,6 +335,118 @@ public class CoGroupITCase extends MultipleProgramsTestBase {
compareResultAsTuples(result, expected);
}
@Test
public void testCoGroupWithMultipleKeyFieldsWithInnerClassKeyExtractorWithClosureCleaner() throws Exception {
/*
* CoGroup with multiple key fields, test working closure cleaner for inner classes
*/
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds1 = CollectionDataSets.get5TupleDataSet(env);
DataSet<Tuple3<Integer, Long, String>> ds2 = CollectionDataSets.get3TupleDataSet(env);
DataSet<Tuple3<Integer, Long, String>> coGrouped = ds1.coGroup(ds2).
where(new KeySelector<Tuple5<Integer, Long, Integer, String, Long>,
Tuple2<Integer, Long>>() {
@Override
public Tuple2<Integer, Long> getKey(Tuple5<Integer, Long, Integer, String, Long> t) throws Exception {
return new Tuple2<Integer, Long>(t.f0, t.f4);
}
}).
equalTo(new KeySelector<Tuple3<Integer,Long,String>, Tuple2<Integer, Long>>() {
@Override
public Tuple2<Integer, Long> getKey(Tuple3<Integer,Long,String> t) {
return new Tuple2<Integer, Long>(t.f0, t.f1);
}
}).
with(new CoGroupFunction<Tuple5<Integer, Long, Integer, String, Long>, Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>>() {
@Override
public void coGroup(Iterable<Tuple5<Integer, Long, Integer, String, Long>> first,
Iterable<Tuple3<Integer, Long, String>> second,
Collector<Tuple3<Integer, Long, String>> out)
{
List<String> strs = new ArrayList<String>();
for (Tuple5<Integer, Long, Integer, String, Long> t : first) {
strs.add(t.f3);
}
for(Tuple3<Integer, Long, String> t : second) {
for(String s : strs) {
out.collect(new Tuple3<Integer, Long, String>(t.f0, t.f1, s));
}
}
}
});
List<Tuple3<Integer, Long, String>> result = coGrouped.collect();
String expected = "1,1,Hallo\n" +
"2,2,Hallo Welt\n" +
"3,2,Hallo Welt wie gehts?\n" +
"3,2,ABC\n" +
"5,3,HIJ\n" +
"5,3,IJK\n";
compareResultAsTuples(result, expected);
}
@Test
public void testCoGroupWithMultipleKeyFieldsWithInnerClassKeyExtractorWithoutClosureCleaner() throws Exception {
/*
* CoGroup with multiple key fields, test that disabling closure cleaner leads to an exception when using inner
* classes.
*/
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.getConfig().disableClosureCleaner();
DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds1 = CollectionDataSets.get5TupleDataSet(env);
DataSet<Tuple3<Integer, Long, String>> ds2 = CollectionDataSets.get3TupleDataSet(env);
boolean correctExceptionTriggered = false;
try {
DataSet<Tuple3<Integer, Long, String>> coGrouped = ds1.coGroup(ds2).
where(new KeySelector<Tuple5<Integer, Long, Integer, String, Long>,
Tuple2<Integer, Long>>() {
@Override
public Tuple2<Integer, Long> getKey(Tuple5<Integer, Long, Integer, String, Long> t) throws Exception {
return new Tuple2<Integer, Long>(t.f0, t.f4);
}
}).
equalTo(new KeySelector<Tuple3<Integer, Long, String>, Tuple2<Integer, Long>>() {
@Override
public Tuple2<Integer, Long> getKey(Tuple3<Integer, Long, String> t) {
return new Tuple2<Integer, Long>(t.f0, t.f1);
}
}).
with(new CoGroupFunction<Tuple5<Integer, Long, Integer, String, Long>, Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>>() {
@Override
public void coGroup(Iterable<Tuple5<Integer, Long, Integer, String, Long>> first,
Iterable<Tuple3<Integer, Long, String>> second,
Collector<Tuple3<Integer, Long, String>> out) {
List<String> strs = new ArrayList<String>();
for (Tuple5<Integer, Long, Integer, String, Long> t : first) {
strs.add(t.f3);
}
for (Tuple3<Integer, Long, String> t : second) {
for (String s : strs) {
out.collect(new Tuple3<Integer, Long, String>(t.f0, t.f1, s));
}
}
}
});
} catch (InvalidProgramException ex) {
correctExceptionTriggered = (ex.getCause() instanceof java.io.NotSerializableException);
}
Assert.assertTrue(correctExceptionTriggered);
}
public static class KeySelector7 implements KeySelector<Tuple5<Integer,Long,Integer,String,Long>,
Tuple2<Integer, Long>> {
private static final long serialVersionUID = 1L;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册