提交 65bf092d 编写于 作者: S Stephan Ewen

[FLINK-1110] Adjust collection based runtime and tests for classloaders in runtime context

上级 ff5ddd50
......@@ -66,6 +66,8 @@ public class CollectionExecutor {
private final Map<String, Aggregator<?>> aggregators;
private final ClassLoader classLoader;
private final boolean mutableObjectSafeMode;
// --------------------------------------------------------------------------------------------
......@@ -81,6 +83,8 @@ public class CollectionExecutor {
this.accumulators = new HashMap<String, Accumulator<?,?>>();
this.previousAggregates = new HashMap<String, Value>();
this.aggregators = new HashMap<String, Aggregator<?>>();
this.classLoader = getClass().getClassLoader();
}
// --------------------------------------------------------------------------------------------
......@@ -181,8 +185,8 @@ public class CollectionExecutor {
// build the runtime context and compute broadcast variables, if necessary
RuntimeUDFContext ctx;
if (RichFunction.class.isAssignableFrom(typedOp.getUserCodeWrapper().getUserCodeClass())) {
ctx = superStep == 0 ? new RuntimeUDFContext(operator.getName(), 1, 0) :
new IterationRuntimeUDFContext(operator.getName(), 1, 0, superStep);
ctx = superStep == 0 ? new RuntimeUDFContext(operator.getName(), 1, 0, getClass().getClassLoader()) :
new IterationRuntimeUDFContext(operator.getName(), 1, 0, superStep, classLoader);
for (Map.Entry<String, Operator<?>> bcInputs : operator.getBroadcastInputs().entrySet()) {
List<?> bcData = execute(bcInputs.getValue());
......@@ -223,8 +227,8 @@ public class CollectionExecutor {
// build the runtime context and compute broadcast variables, if necessary
RuntimeUDFContext ctx;
if (RichFunction.class.isAssignableFrom(typedOp.getUserCodeWrapper().getUserCodeClass())) {
ctx = superStep == 0 ? new RuntimeUDFContext(operator.getName(), 1, 0) :
new IterationRuntimeUDFContext(operator.getName(), 1, 0, superStep);
ctx = superStep == 0 ? new RuntimeUDFContext(operator.getName(), 1, 0, classLoader) :
new IterationRuntimeUDFContext(operator.getName(), 1, 0, superStep, classLoader);
for (Map.Entry<String, Operator<?>> bcInputs : operator.getBroadcastInputs().entrySet()) {
List<?> bcData = execute(bcInputs.getValue());
......@@ -478,8 +482,8 @@ public class CollectionExecutor {
private final int superstep;
public IterationRuntimeUDFContext(String name, int numParallelSubtasks, int subtaskIndex, int superstep) {
super(name, numParallelSubtasks, subtaskIndex);
public IterationRuntimeUDFContext(String name, int numParallelSubtasks, int subtaskIndex, int superstep, ClassLoader classloader) {
super(name, numParallelSubtasks, subtaskIndex, classloader);
this.superstep = superstep;
}
......
......@@ -65,7 +65,7 @@ public class FlatMapOperatorCollectionTest implements Serializable {
private void testExecuteOnCollection(FlatMapFunction<String, String> udf, List<String> input, boolean mutableSafe) throws Exception {
// run on collections
final List<String> result = getTestFlatMapOperator(udf)
.executeOnCollections(input, new RuntimeUDFContext("Test UDF", 4, 0), mutableSafe);
.executeOnCollections(input, new RuntimeUDFContext("Test UDF", 4, 0, null), mutableSafe);
Assert.assertEquals(input.size(), result.size());
Assert.assertEquals(input, result);
......
......@@ -110,8 +110,8 @@ public class JoinOperatorBaseTest implements Serializable {
try {
List<Integer> resultSafe = base.executeOnCollections(inputData1, inputData2, new RuntimeUDFContext(taskName, 1, 0), true);
List<Integer> resultRegular = base.executeOnCollections(inputData1, inputData2, new RuntimeUDFContext(taskName, 1, 0), false);
List<Integer> resultSafe = base.executeOnCollections(inputData1, inputData2, new RuntimeUDFContext(taskName, 1, 0, null), true);
List<Integer> resultRegular = base.executeOnCollections(inputData1, inputData2, new RuntimeUDFContext(taskName, 1, 0, null), false);
assertEquals(expected, resultSafe);
assertEquals(expected, resultRegular);
......
......@@ -97,8 +97,8 @@ public class MapOperatorTest implements java.io.Serializable {
parser, new UnaryOperatorInformation<String, Integer>(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO), taskName);
List<String> input = new ArrayList<String>(asList("1", "2", "3", "4", "5", "6"));
List<Integer> resultMutableSafe = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0), true);
List<Integer> resultRegular = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0), false);
List<Integer> resultMutableSafe = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0, null), true);
List<Integer> resultRegular = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0, null), false);
assertEquals(asList(1, 2, 3, 4, 5, 6), resultMutableSafe);
assertEquals(asList(1, 2, 3, 4, 5, 6), resultRegular);
......
......@@ -75,8 +75,8 @@ public class PartitionMapOperatorTest implements java.io.Serializable {
List<String> input = new ArrayList<String>(asList("1", "2", "3", "4", "5", "6"));
List<Integer> resultMutableSafe = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0), true);
List<Integer> resultRegular = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0), false);
List<Integer> resultMutableSafe = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0, null), true);
List<Integer> resultRegular = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0, null), false);
assertEquals(asList(1, 2, 3, 4, 5, 6), resultMutableSafe);
assertEquals(asList(1, 2, 3, 4, 5, 6), resultRegular);
......
......@@ -66,7 +66,7 @@ public class CoGroupOperatorCollectionTest implements Serializable {
.build()
);
final RuntimeContext ctx = new RuntimeUDFContext("Test UDF", 4, 0);
final RuntimeContext ctx = new RuntimeUDFContext("Test UDF", 4, 0, null);
{
SumCoGroup udf1 = new SumCoGroup();
......
......@@ -155,8 +155,8 @@ public class GroupReduceOperatorTest implements java.io.Serializable {
Integer>("foo", 3), new Tuple2<String, Integer>("bar", 2), new Tuple2<String,
Integer>("bar", 4)));
List<Tuple2<String, Integer>> resultMutableSafe = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0), true);
List<Tuple2<String, Integer>> resultRegular = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0), false);
List<Tuple2<String, Integer>> resultMutableSafe = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0, null), true);
List<Tuple2<String, Integer>> resultRegular = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0, null), false);
Set<Tuple2<String, Integer>> resultSetMutableSafe = new HashSet<Tuple2<String, Integer>>(resultMutableSafe);
......
......@@ -101,8 +101,8 @@ public class JoinOperatorBaseTest implements Serializable {
));
try {
List<Tuple2<Double, String>> resultSafe = base.executeOnCollections(inputData1, inputData2, new RuntimeUDFContext("op", 1, 0), true);
List<Tuple2<Double, String>> resultRegular = base.executeOnCollections(inputData1, inputData2, new RuntimeUDFContext("op", 1, 0), false);
List<Tuple2<Double, String>> resultSafe = base.executeOnCollections(inputData1, inputData2, new RuntimeUDFContext("op", 1, 0, null), true);
List<Tuple2<Double, String>> resultRegular = base.executeOnCollections(inputData1, inputData2, new RuntimeUDFContext("op", 1, 0, null), false);
assertEquals(expected, new HashSet<Tuple2<Double, String>>(resultSafe));
assertEquals(expected, new HashSet<Tuple2<Double, String>>(resultRegular));
......
......@@ -132,8 +132,8 @@ public class ReduceOperatorTest implements java.io.Serializable {
Integer>("foo", 3), new Tuple2<String, Integer>("bar", 2), new Tuple2<String,
Integer>("bar", 4)));
List<Tuple2<String, Integer>> resultMutableSafe = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0), true);
List<Tuple2<String, Integer>> resultRegular = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0), false);
List<Tuple2<String, Integer>> resultMutableSafe = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0, null), true);
List<Tuple2<String, Integer>> resultRegular = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0, null), false);
Set<Tuple2<String, Integer>> resultSetMutableSafe = new HashSet<Tuple2<String, Integer>>(resultMutableSafe);
Set<Tuple2<String, Integer>> resultSetRegular = new HashSet<Tuple2<String, Integer>>(resultRegular);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册