From 65bf092da77ca0d416a3abbcac21b641cf038101 Mon Sep 17 00:00:00 2001 From: Stephan Ewen Date: Fri, 3 Oct 2014 16:42:12 +0200 Subject: [PATCH] [FLINK-1110] Adjust collection based runtime and tests for classloaders in runtime context --- .../api/common/operators/CollectionExecutor.java | 16 ++++++++++------ .../base/FlatMapOperatorCollectionTest.java | 2 +- .../operators/base/JoinOperatorBaseTest.java | 4 ++-- .../common/operators/base/MapOperatorTest.java | 4 ++-- .../operators/base/PartitionMapOperatorTest.java | 4 ++-- .../base/CoGroupOperatorCollectionTest.java | 2 +- .../operators/base/GroupReduceOperatorTest.java | 4 ++-- .../operators/base/JoinOperatorBaseTest.java | 4 ++-- .../operators/base/ReduceOperatorTest.java | 4 ++-- 9 files changed, 24 insertions(+), 20 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java index d204491041f..ac8e1f3eb22 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java @@ -66,6 +66,8 @@ public class CollectionExecutor { private final Map> aggregators; + private final ClassLoader classLoader; + private final boolean mutableObjectSafeMode; // -------------------------------------------------------------------------------------------- @@ -81,6 +83,8 @@ public class CollectionExecutor { this.accumulators = new HashMap>(); this.previousAggregates = new HashMap(); this.aggregators = new HashMap>(); + + this.classLoader = getClass().getClassLoader(); } // -------------------------------------------------------------------------------------------- @@ -181,8 +185,8 @@ public class CollectionExecutor { // build the runtime context and compute broadcast variables, if necessary RuntimeUDFContext ctx; if (RichFunction.class.isAssignableFrom(typedOp.getUserCodeWrapper().getUserCodeClass())) { - ctx = superStep == 0 ? new RuntimeUDFContext(operator.getName(), 1, 0) : - new IterationRuntimeUDFContext(operator.getName(), 1, 0, superStep); + ctx = superStep == 0 ? new RuntimeUDFContext(operator.getName(), 1, 0, getClass().getClassLoader()) : + new IterationRuntimeUDFContext(operator.getName(), 1, 0, superStep, classLoader); for (Map.Entry> bcInputs : operator.getBroadcastInputs().entrySet()) { List bcData = execute(bcInputs.getValue()); @@ -223,8 +227,8 @@ public class CollectionExecutor { // build the runtime context and compute broadcast variables, if necessary RuntimeUDFContext ctx; if (RichFunction.class.isAssignableFrom(typedOp.getUserCodeWrapper().getUserCodeClass())) { - ctx = superStep == 0 ? new RuntimeUDFContext(operator.getName(), 1, 0) : - new IterationRuntimeUDFContext(operator.getName(), 1, 0, superStep); + ctx = superStep == 0 ? new RuntimeUDFContext(operator.getName(), 1, 0, classLoader) : + new IterationRuntimeUDFContext(operator.getName(), 1, 0, superStep, classLoader); for (Map.Entry> bcInputs : operator.getBroadcastInputs().entrySet()) { List bcData = execute(bcInputs.getValue()); @@ -478,8 +482,8 @@ public class CollectionExecutor { private final int superstep; - public IterationRuntimeUDFContext(String name, int numParallelSubtasks, int subtaskIndex, int superstep) { - super(name, numParallelSubtasks, subtaskIndex); + public IterationRuntimeUDFContext(String name, int numParallelSubtasks, int subtaskIndex, int superstep, ClassLoader classloader) { + super(name, numParallelSubtasks, subtaskIndex, classloader); this.superstep = superstep; } diff --git a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/FlatMapOperatorCollectionTest.java b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/FlatMapOperatorCollectionTest.java index 74dc889d678..9a0a2b5a051 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/FlatMapOperatorCollectionTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/FlatMapOperatorCollectionTest.java @@ -65,7 +65,7 @@ public class FlatMapOperatorCollectionTest implements Serializable { private void testExecuteOnCollection(FlatMapFunction udf, List input, boolean mutableSafe) throws Exception { // run on collections final List result = getTestFlatMapOperator(udf) - .executeOnCollections(input, new RuntimeUDFContext("Test UDF", 4, 0), mutableSafe); + .executeOnCollections(input, new RuntimeUDFContext("Test UDF", 4, 0, null), mutableSafe); Assert.assertEquals(input.size(), result.size()); Assert.assertEquals(input, result); diff --git a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/JoinOperatorBaseTest.java b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/JoinOperatorBaseTest.java index 8834989f9a3..0ab8e72a136 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/JoinOperatorBaseTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/JoinOperatorBaseTest.java @@ -110,8 +110,8 @@ public class JoinOperatorBaseTest implements Serializable { try { - List resultSafe = base.executeOnCollections(inputData1, inputData2, new RuntimeUDFContext(taskName, 1, 0), true); - List resultRegular = base.executeOnCollections(inputData1, inputData2, new RuntimeUDFContext(taskName, 1, 0), false); + List resultSafe = base.executeOnCollections(inputData1, inputData2, new RuntimeUDFContext(taskName, 1, 0, null), true); + List resultRegular = base.executeOnCollections(inputData1, inputData2, new RuntimeUDFContext(taskName, 1, 0, null), false); assertEquals(expected, resultSafe); assertEquals(expected, resultRegular); diff --git a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/MapOperatorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/MapOperatorTest.java index 82778c575b7..1a742b6c18b 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/MapOperatorTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/MapOperatorTest.java @@ -97,8 +97,8 @@ public class MapOperatorTest implements java.io.Serializable { parser, new UnaryOperatorInformation(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO), taskName); List input = new ArrayList(asList("1", "2", "3", "4", "5", "6")); - List resultMutableSafe = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0), true); - List resultRegular = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0), false); + List resultMutableSafe = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0, null), true); + List resultRegular = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0, null), false); assertEquals(asList(1, 2, 3, 4, 5, 6), resultMutableSafe); assertEquals(asList(1, 2, 3, 4, 5, 6), resultRegular); diff --git a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/PartitionMapOperatorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/PartitionMapOperatorTest.java index 1c17fdee632..dadd1caf8e8 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/PartitionMapOperatorTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/PartitionMapOperatorTest.java @@ -75,8 +75,8 @@ public class PartitionMapOperatorTest implements java.io.Serializable { List input = new ArrayList(asList("1", "2", "3", "4", "5", "6")); - List resultMutableSafe = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0), true); - List resultRegular = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0), false); + List resultMutableSafe = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0, null), true); + List resultRegular = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0, null), false); assertEquals(asList(1, 2, 3, 4, 5, 6), resultMutableSafe); assertEquals(asList(1, 2, 3, 4, 5, 6), resultRegular); diff --git a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/CoGroupOperatorCollectionTest.java b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/CoGroupOperatorCollectionTest.java index 053b8e2c5cd..51d4b0e1e7a 100644 --- a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/CoGroupOperatorCollectionTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/CoGroupOperatorCollectionTest.java @@ -66,7 +66,7 @@ public class CoGroupOperatorCollectionTest implements Serializable { .build() ); - final RuntimeContext ctx = new RuntimeUDFContext("Test UDF", 4, 0); + final RuntimeContext ctx = new RuntimeUDFContext("Test UDF", 4, 0, null); { SumCoGroup udf1 = new SumCoGroup(); diff --git a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorTest.java b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorTest.java index 5d1ca178a2d..cfca5aa1eac 100644 --- a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorTest.java @@ -155,8 +155,8 @@ public class GroupReduceOperatorTest implements java.io.Serializable { Integer>("foo", 3), new Tuple2("bar", 2), new Tuple2("bar", 4))); - List> resultMutableSafe = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0), true); - List> resultRegular = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0), false); + List> resultMutableSafe = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0, null), true); + List> resultRegular = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0, null), false); Set> resultSetMutableSafe = new HashSet>(resultMutableSafe); diff --git a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/JoinOperatorBaseTest.java b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/JoinOperatorBaseTest.java index d9abf149cce..b4ef54f8e3f 100644 --- a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/JoinOperatorBaseTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/JoinOperatorBaseTest.java @@ -101,8 +101,8 @@ public class JoinOperatorBaseTest implements Serializable { )); try { - List> resultSafe = base.executeOnCollections(inputData1, inputData2, new RuntimeUDFContext("op", 1, 0), true); - List> resultRegular = base.executeOnCollections(inputData1, inputData2, new RuntimeUDFContext("op", 1, 0), false); + List> resultSafe = base.executeOnCollections(inputData1, inputData2, new RuntimeUDFContext("op", 1, 0, null), true); + List> resultRegular = base.executeOnCollections(inputData1, inputData2, new RuntimeUDFContext("op", 1, 0, null), false); assertEquals(expected, new HashSet>(resultSafe)); assertEquals(expected, new HashSet>(resultRegular)); diff --git a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/ReduceOperatorTest.java b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/ReduceOperatorTest.java index 2baf57e2ad4..90bbe41d14b 100644 --- a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/ReduceOperatorTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/ReduceOperatorTest.java @@ -132,8 +132,8 @@ public class ReduceOperatorTest implements java.io.Serializable { Integer>("foo", 3), new Tuple2("bar", 2), new Tuple2("bar", 4))); - List> resultMutableSafe = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0), true); - List> resultRegular = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0), false); + List> resultMutableSafe = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0, null), true); + List> resultRegular = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0, null), false); Set> resultSetMutableSafe = new HashSet>(resultMutableSafe); Set> resultSetRegular = new HashSet>(resultRegular); -- GitLab