diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/util/NoOpBinaryUdfOp.java b/flink-compiler/src/main/java/org/apache/flink/compiler/util/NoOpBinaryUdfOp.java index 1abaeac9abb0c610b3274f3f1bc521496c550e49..4b48ec711a34190f50fb80ea3c45c557cb94b533 100644 --- a/flink-compiler/src/main/java/org/apache/flink/compiler/util/NoOpBinaryUdfOp.java +++ b/flink-compiler/src/main/java/org/apache/flink/compiler/util/NoOpBinaryUdfOp.java @@ -43,7 +43,7 @@ public class NoOpBinaryUdfOp extends DualInputOperator executeOnCollections(List inputData1, List inputData2, RuntimeContext runtimeContext) { + protected List executeOnCollections(List inputData1, List inputData2, RuntimeContext runtimeContext, boolean mutables) { throw new UnsupportedOperationException(); } } diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/util/NoOpUnaryUdfOp.java b/flink-compiler/src/main/java/org/apache/flink/compiler/util/NoOpUnaryUdfOp.java index 8f88168799bc8ef8802633c79cd0ef9f30e0cc4e..474d3a4b3a255ceba59698fca39358a3c36f8f36 100644 --- a/flink-compiler/src/main/java/org/apache/flink/compiler/util/NoOpUnaryUdfOp.java +++ b/flink-compiler/src/main/java/org/apache/flink/compiler/util/NoOpUnaryUdfOp.java @@ -54,7 +54,7 @@ public class NoOpUnaryUdfOp extends SingleInputOperator executeOnCollections(List inputData, RuntimeContext runtimeContext) { + protected List executeOnCollections(List inputData, RuntimeContext runtimeContext, boolean mutables) { return inputData; } } diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/CopyingIterator.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/CopyingIterator.java new file mode 100644 index 0000000000000000000000000000000000000000..b7a8dc9bc2c4e0c1cd1d192827ff84217e0e9174 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/CopyingIterator.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.functions.util; + +import java.util.Iterator; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.util.TraversableOnceException; + +public class CopyingIterator implements Iterator, Iterable { + + private final Iterator source; + private final TypeSerializer serializer; + + private boolean available = true; + + public CopyingIterator(Iterator source, TypeSerializer serializer) { + this.source = source; + this.serializer = serializer; + } + + @Override + public Iterator iterator() { + if (available) { + available = false; + return this; + } else { + throw new TraversableOnceException(); + } + } + + @Override + public boolean hasNext() { + return source.hasNext(); + } + + @Override + public E next() { + E next = source.next(); + return serializer.copy(next); + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } +} diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/CopyingListCollector.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/CopyingListCollector.java new file mode 100644 index 0000000000000000000000000000000000000000..862098126b8b4e46939d6d6ffe502637bed81526 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/CopyingListCollector.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.functions.util; + +import java.util.List; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.util.Collector; + +public class CopyingListCollector implements Collector { + + private final List list; + private final TypeSerializer serializer; + + public CopyingListCollector(List list, TypeSerializer serializer) { + this.list = list; + this.serializer = serializer; + } + + @Override + public void collect(T record) { + list.add(serializer.copy(record)); + } + + @Override + public void close() {} +} diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java index a748a5ed60d153f7ae168394e50058cc487a424a..5d30df423e84738322c9695713ada8ad74447cb6 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java @@ -52,13 +52,22 @@ import org.apache.flink.util.Visitor; */ public class CollectionExecutor { + private static final boolean DEFAULT_MUTABLE_OBJECT_SAFE_MODE = true; + private final Map, List> intermediateResults; private final Map> accumulators; + private final boolean mutableObjectSafeMode; + // -------------------------------------------------------------------------------------------- public CollectionExecutor() { + this(DEFAULT_MUTABLE_OBJECT_SAFE_MODE); + } + + public CollectionExecutor(boolean mutableObjectSafeMode) { + this.mutableObjectSafeMode = mutableObjectSafeMode; this.intermediateResults = new HashMap, List>(); this.accumulators = new HashMap>(); } @@ -172,7 +181,7 @@ public class CollectionExecutor { ctx = null; } - List result = typedOp.executeOnCollections(inputData, ctx); + List result = typedOp.executeOnCollections(inputData, ctx, mutableObjectSafeMode); if (ctx != null) { AccumulatorHelper.mergeInto(this.accumulators, ctx.getAllAccumulators()); @@ -214,7 +223,7 @@ public class CollectionExecutor { ctx = null; } - List result = typedOp.executeOnCollections(inputData1, inputData2, ctx); + List result = typedOp.executeOnCollections(inputData1, inputData2, ctx, mutableObjectSafeMode); if (ctx != null) { AccumulatorHelper.mergeInto(this.accumulators, ctx.getAllAccumulators()); diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/DualInputOperator.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/DualInputOperator.java index 2d281806f928f742f0bc7b41177f3eac94c42317..6325788d65ba868850600293b378311a8397ee1a 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/operators/DualInputOperator.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/DualInputOperator.java @@ -286,5 +286,5 @@ public abstract class DualInputOperator exte // -------------------------------------------------------------------------------------------- - protected abstract List executeOnCollections(List inputData1, List inputData2, RuntimeContext runtimeContext) throws Exception; + protected abstract List executeOnCollections(List inputData1, List inputData2, RuntimeContext runtimeContext, boolean mutableObjectSafeMode) throws Exception; } diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/SingleInputOperator.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/SingleInputOperator.java index fa923e7f7c2979832e84c32d0c195adb3423004b..f1bf2ad8d1ab9c855dd884272235c3199e64fe7e 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/operators/SingleInputOperator.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/SingleInputOperator.java @@ -209,5 +209,5 @@ public abstract class SingleInputOperator extends // -------------------------------------------------------------------------------------------- - protected abstract List executeOnCollections(List inputData, RuntimeContext runtimeContext) throws Exception; + protected abstract List executeOnCollections(List inputData, RuntimeContext runtimeContext, boolean mutableObjectSafeMode) throws Exception; } diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/Union.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/Union.java index f4b3c51766bc4d910ecbf3a3e73d80f280e65f76..fb8626d8f42daf041c6c27b3df5937fe45f6d2cb 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/operators/Union.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/Union.java @@ -48,7 +48,7 @@ public class Union extends DualInputOperator { } @Override - protected List executeOnCollections(List inputData1, List inputData2, RuntimeContext runtimeContext) { + protected List executeOnCollections(List inputData1, List inputData2, RuntimeContext runtimeContext, boolean mutableObjectSafeMode) { ArrayList result = new ArrayList(inputData1.size() + inputData2.size()); result.addAll(inputData1); result.addAll(inputData2); diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/BulkIterationBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/BulkIterationBase.java index 31b5191f0eb1ef640ea422caf059584a84e56c96..4fbf65e789e19ad9719a876127646acd2315744e 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/BulkIterationBase.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/BulkIterationBase.java @@ -303,7 +303,7 @@ public class BulkIterationBase extends SingleInputOperator executeOnCollections(List inputData, RuntimeContext runtimeContext) { + protected List executeOnCollections(List inputData, RuntimeContext runtimeContext, boolean mutableObjectSafeMode) { throw new UnsupportedOperationException(); } } diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CoGroupOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CoGroupOperatorBase.java index 4b6d639f42f62db1c36b76692cfe8292b56cfcb9..6aa3da0f02b0bc1e6cd80a6786794814b23f15b5 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CoGroupOperatorBase.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CoGroupOperatorBase.java @@ -21,6 +21,7 @@ package org.apache.flink.api.common.operators.base; import org.apache.flink.api.common.InvalidProgramException; import org.apache.flink.api.common.functions.CoGroupFunction; import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.common.functions.util.CopyingListCollector; import org.apache.flink.api.common.functions.util.FunctionUtils; import org.apache.flink.api.common.functions.util.ListCollector; import org.apache.flink.api.common.operators.BinaryOperatorInformation; @@ -35,6 +36,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.GenericPairComparator; import org.apache.flink.api.common.typeutils.TypeComparator; import org.apache.flink.api.common.typeutils.TypePairComparator; +import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.util.Collector; import java.io.IOException; @@ -177,7 +179,7 @@ public class CoGroupOperatorBase executeOnCollections(List input1, List input2, RuntimeContext ctx) throws Exception { + protected List executeOnCollections(List input1, List input2, RuntimeContext ctx, boolean mutableObjectSafe) throws Exception { // -------------------------------------------------------------------- // Setup // -------------------------------------------------------------------- @@ -193,11 +195,15 @@ public class CoGroupOperatorBase inputSerializer1 = inputType1.createSerializer(); + final TypeSerializer inputSerializer2 = inputType2.createSerializer(); + final TypeComparator inputComparator1 = getTypeComparator(inputType1, inputKeys1, inputSortDirections1); final TypeComparator inputComparator2 = getTypeComparator(inputType2, inputKeys2, inputSortDirections2); CoGroupSortListIterator coGroupIterator = - new CoGroupSortListIterator(input1, inputComparator1, input2, inputComparator2); + new CoGroupSortListIterator(input1, inputComparator1, inputSerializer1, + input2, inputComparator2, inputSerializer2, mutableObjectSafe); // -------------------------------------------------------------------- // Run UDF @@ -208,7 +214,9 @@ public class CoGroupOperatorBase result = new ArrayList(); - Collector resultCollector = new ListCollector(result); + Collector resultCollector = mutableObjectSafe ? + new CopyingListCollector(result, getOperatorInfo().getOutputType().createSerializer()) : + new ListCollector(result); while (coGroupIterator.next()) { function.coGroup(coGroupIterator.getValues1(), coGroupIterator.getValues2(), resultCollector); @@ -247,13 +255,14 @@ public class CoGroupOperatorBase secondReturn; private CoGroupSortListIterator( - List input1, final TypeComparator inputComparator1, - List input2, final TypeComparator inputComparator2) { - + List input1, final TypeComparator inputComparator1, TypeSerializer serializer1, + List input2, final TypeComparator inputComparator2, TypeSerializer serializer2, + boolean copyElements) + { this.pairComparator = new GenericPairComparator(inputComparator1, inputComparator2); - this.iterator1 = new ListKeyGroupedIterator(input1, inputComparator1); - this.iterator2 = new ListKeyGroupedIterator(input2, inputComparator2); + this.iterator1 = new ListKeyGroupedIterator(input1, serializer1, inputComparator1, copyElements); + this.iterator2 = new ListKeyGroupedIterator(input2, serializer2, inputComparator2, copyElements); // ---------------------------------------------------------------- // Sort diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CollectorMapOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CollectorMapOperatorBase.java index c3e00bc0ffcf2bcebfab104d166512e28a41a867..8ad91c68783b511b88f52f384fc816056d193eb2 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CollectorMapOperatorBase.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CollectorMapOperatorBase.java @@ -52,7 +52,7 @@ public class CollectorMapOperatorBase executeOnCollections(List inputData, RuntimeContext ctx) { + protected List executeOnCollections(List inputData, RuntimeContext ctx, boolean mutableObjectSafeMode) { throw new UnsupportedOperationException(); } } diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CrossOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CrossOperatorBase.java index c70460a8cebcb9f5f7160d0bbcf4f660a346eee5..8f4951888a15eac1b5ca8378037a157292014ca0 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CrossOperatorBase.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CrossOperatorBase.java @@ -29,6 +29,7 @@ import org.apache.flink.api.common.operators.DualInputOperator; import org.apache.flink.api.common.operators.util.UserCodeClassWrapper; import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper; import org.apache.flink.api.common.operators.util.UserCodeWrapper; +import org.apache.flink.api.common.typeutils.TypeSerializer; /** * @see org.apache.flink.api.common.functions.CrossFunction @@ -50,21 +51,37 @@ public class CrossOperatorBase executeOnCollections(List inputData1, List inputData2, RuntimeContext ctx) throws Exception { + protected List executeOnCollections(List inputData1, List inputData2, RuntimeContext ctx, boolean mutableObjectSafeMode) throws Exception { CrossFunction function = this.userFunction.getUserCodeObject(); FunctionUtils.setFunctionRuntimeContext(function, ctx); FunctionUtils.openFunction(function, this.parameters); ArrayList result = new ArrayList(inputData1.size() * inputData2.size()); - for (IN1 element1 : inputData1) { - for (IN2 element2 : inputData2) { - result.add(function.cross(element1, element2)); + + if (mutableObjectSafeMode) { + TypeSerializer inSerializer1 = getOperatorInfo().getFirstInputType().createSerializer(); + TypeSerializer inSerializer2 = getOperatorInfo().getSecondInputType().createSerializer(); + TypeSerializer outSerializer = getOperatorInfo().getOutputType().createSerializer(); + + for (IN1 element1 : inputData1) { + for (IN2 element2 : inputData2) { + IN1 copy1 = inSerializer1.copy(element1); + IN2 copy2 = inSerializer2.copy(element2); + OUT o = function.cross(copy1, copy2); + result.add(outSerializer.copy(o)); + } + } + } + else { + for (IN1 element1 : inputData1) { + for (IN2 element2 : inputData2) { + result.add(function.cross(element1, element2)); + } } } FunctionUtils.closeFunction(function); - return result; } diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/DeltaIterationBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/DeltaIterationBase.java index fddf6e798cbf7bc2b26fe8311bc25c2f718a3327..f945b1d4551588a3e982eb5486821d82839f6f01 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/DeltaIterationBase.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/DeltaIterationBase.java @@ -16,7 +16,6 @@ * limitations under the License. */ - package org.apache.flink.api.common.operators.base; import java.util.Collections; @@ -333,7 +332,7 @@ public class DeltaIterationBase extends DualInputOperator executeOnCollections(List inputData1, List inputData2, RuntimeContext runtimeContext) throws Exception { + protected List executeOnCollections(List inputData1, List inputData2, RuntimeContext runtimeContext, boolean mutableObjectSafeMode) { throw new UnsupportedOperationException(); } } diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/FileDataSinkBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/FileDataSinkBase.java index a36d44be43da3c76c4532dc9d5a47db27809b6c3..8f5c4e3eb17a92122cde1061b0f8b3afb18250d9 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/FileDataSinkBase.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/FileDataSinkBase.java @@ -16,7 +16,6 @@ * limitations under the License. */ - package org.apache.flink.api.common.operators.base; import org.apache.flink.api.common.io.FileOutputFormat; @@ -72,8 +71,7 @@ public class FileDataSinkBase extends GenericDataSinkBase { * * @return The path to which the output shall be written. */ - public String getFilePath() - { + public String getFilePath() { return this.filePath; } @@ -82,5 +80,4 @@ public class FileDataSinkBase extends GenericDataSinkBase { public String toString() { return this.filePath; } - } diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/FilterOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/FilterOperatorBase.java index 39f898450ef65821e984aac79d18699d0478075b..7f7add21080e05f266e282dcd29b09b90291e12a 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/FilterOperatorBase.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/FilterOperatorBase.java @@ -50,7 +50,7 @@ public class FilterOperatorBase> extends Sin } @Override - protected List executeOnCollections(List inputData, RuntimeContext ctx) throws Exception { + protected List executeOnCollections(List inputData, RuntimeContext ctx, boolean mutableObjectSafeMode) throws Exception { FlatMapFunction function = this.userFunction.getUserCodeObject(); FunctionUtils.openFunction(function, this.parameters); diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/FlatMapOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/FlatMapOperatorBase.java index 81f3bcf507ec4687b1c3c37951888936d394625a..8312a994a937fd42e12dd2940941742f09526e84 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/FlatMapOperatorBase.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/FlatMapOperatorBase.java @@ -20,6 +20,7 @@ package org.apache.flink.api.common.operators.base; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.common.functions.util.CopyingListCollector; import org.apache.flink.api.common.functions.util.FunctionUtils; import org.apache.flink.api.common.functions.util.ListCollector; import org.apache.flink.api.common.operators.SingleInputOperator; @@ -27,6 +28,7 @@ import org.apache.flink.api.common.operators.UnaryOperatorInformation; import org.apache.flink.api.common.operators.util.UserCodeClassWrapper; import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper; import org.apache.flink.api.common.operators.util.UserCodeWrapper; +import org.apache.flink.api.common.typeutils.TypeSerializer; import java.util.ArrayList; import java.util.List; @@ -51,17 +53,29 @@ public class FlatMapOperatorBase> e // ------------------------------------------------------------------------ @Override - protected List executeOnCollections(List input, RuntimeContext ctx) throws Exception { + protected List executeOnCollections(List input, RuntimeContext ctx, boolean mutableObjectSafeMode) throws Exception { FlatMapFunction function = userFunction.getUserCodeObject(); - + FunctionUtils.setFunctionRuntimeContext(function, ctx); FunctionUtils.openFunction(function, parameters); ArrayList result = new ArrayList(input.size()); - ListCollector resultCollector = new ListCollector(result); - - for (IN element : input) { - function.flatMap(element, resultCollector); + + if (mutableObjectSafeMode) { + TypeSerializer inSerializer = getOperatorInfo().getInputType().createSerializer(); + TypeSerializer outSerializer = getOperatorInfo().getOutputType().createSerializer(); + + CopyingListCollector resultCollector = new CopyingListCollector(result, outSerializer); + + for (IN element : input) { + IN inCopy = inSerializer.copy(element); + function.flatMap(inCopy, resultCollector); + } + } else { + ListCollector resultCollector = new ListCollector(result); + for (IN element : input) { + function.flatMap(element, resultCollector); + } } FunctionUtils.closeFunction(function); diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorBase.java index 6abd7b523770715cfe5b526fb2c151aae8115fc1..5d3b92dd3504320bdbd97e7f2e7d9e52f1ad0f35 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorBase.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorBase.java @@ -18,11 +18,11 @@ package org.apache.flink.api.common.operators.base; - import org.apache.flink.api.common.InvalidProgramException; import org.apache.flink.api.common.functions.FlatCombineFunction; import org.apache.flink.api.common.functions.GroupReduceFunction; import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.common.functions.util.CopyingListCollector; import org.apache.flink.api.common.functions.util.FunctionUtils; import org.apache.flink.api.common.functions.util.ListCollector; import org.apache.flink.api.common.operators.Ordering; @@ -35,6 +35,7 @@ import org.apache.flink.api.common.operators.util.UserCodeWrapper; import org.apache.flink.api.common.typeinfo.CompositeType; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.TypeSerializer; import java.util.ArrayList; import java.util.Collections; @@ -131,20 +132,21 @@ public class GroupReduceOperatorBase executeOnCollections(List inputData, RuntimeContext ctx) - throws Exception { + protected List executeOnCollections(List inputData, RuntimeContext ctx, boolean mutableObjectSafeMode) throws Exception { GroupReduceFunction function = this.userFunction.getUserCodeObject(); UnaryOperatorInformation operatorInfo = getOperatorInfo(); TypeInformation inputType = operatorInfo.getInputType(); if (!(inputType instanceof CompositeType)) { - throw new InvalidProgramException("Input type of groupReduce operation must be" + - " composite type."); + throw new InvalidProgramException("Input type of groupReduce operation must be a composite type."); } int[] inputColumns = getKeyColumns(0); boolean[] inputOrderings = new boolean[inputColumns.length]; + + final TypeSerializer inputSerializer = inputType.createSerializer(); + @SuppressWarnings("unchecked") final TypeComparator inputComparator = ((CompositeType) inputType).createComparator(inputColumns, inputOrderings); @@ -152,26 +154,34 @@ public class GroupReduceOperatorBase result = new ArrayList(inputData.size()); - ListCollector collector = new ListCollector(result); - Collections.sort(inputData, new Comparator() { @Override public int compare(IN o1, IN o2) { return inputComparator.compare(o2, o1); } }); - ListKeyGroupedIterator keyedIterator = - new ListKeyGroupedIterator(inputData, inputComparator); - - while (keyedIterator.nextKey()) { - function.reduce(keyedIterator.getValues(), collector); + + ListKeyGroupedIterator keyedIterator = new ListKeyGroupedIterator( + inputData, inputSerializer, inputComparator, mutableObjectSafeMode); + + ArrayList result = new ArrayList(); + + if (mutableObjectSafeMode) { + TypeSerializer outSerializer = getOperatorInfo().getOutputType().createSerializer(); + CopyingListCollector collector = new CopyingListCollector(result, outSerializer); + + while (keyedIterator.nextKey()) { + function.reduce(keyedIterator.getValues(), collector); + } + } + else { + ListCollector collector = new ListCollector(result); + while (keyedIterator.nextKey()) { + function.reduce(keyedIterator.getValues(), collector); + } } FunctionUtils.closeFunction(function); - return result; } - } diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/JoinOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/JoinOperatorBase.java index 34ede6529487ec73835136191be85415cca2c682..3d5cf725075e87e02e8beb0d8bed5b7aa78e2359 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/JoinOperatorBase.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/JoinOperatorBase.java @@ -16,11 +16,11 @@ * limitations under the License. */ - package org.apache.flink.api.common.operators.base; import org.apache.flink.api.common.functions.FlatJoinFunction; import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.common.functions.util.CopyingListCollector; import org.apache.flink.api.common.functions.util.FunctionUtils; import org.apache.flink.api.common.functions.util.ListCollector; import org.apache.flink.api.common.operators.BinaryOperatorInformation; @@ -34,6 +34,8 @@ import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.GenericPairComparator; import org.apache.flink.api.common.typeutils.TypeComparator; import org.apache.flink.api.common.typeutils.TypePairComparator; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.util.Collector; import java.util.ArrayList; import java.util.Arrays; @@ -60,7 +62,7 @@ public class JoinOperatorBase executeOnCollections(List inputData1, List inputData2, RuntimeContext runtimeContext) throws Exception { + protected List executeOnCollections(List inputData1, List inputData2, RuntimeContext runtimeContext, boolean mutableObjectSafe) throws Exception { FlatJoinFunction function = userFunction.getUserCodeObject(); FunctionUtils.setFunctionRuntimeContext(function, runtimeContext); @@ -68,13 +70,18 @@ public class JoinOperatorBase leftInformation = getOperatorInfo().getFirstInputType(); TypeInformation rightInformation = getOperatorInfo().getSecondInputType(); - + TypeInformation outInformation = getOperatorInfo().getOutputType(); + + TypeSerializer leftSerializer = mutableObjectSafe ? leftInformation.createSerializer() : null; + TypeSerializer rightSerializer = mutableObjectSafe ? rightInformation.createSerializer() : null; + TypeComparator leftComparator; TypeComparator rightComparator; - if(leftInformation instanceof AtomicType){ + if (leftInformation instanceof AtomicType){ leftComparator = ((AtomicType) leftInformation).createComparator(true); - }else if(leftInformation instanceof CompositeType){ + } + else if(leftInformation instanceof CompositeType){ int[] keyPositions = getKeyColumns(0); boolean[] orders = new boolean[keyPositions.length]; Arrays.fill(orders, true); @@ -102,12 +109,13 @@ public class JoinOperatorBase result = new ArrayList(); - ListCollector collector = new ListCollector(result); + Collector collector = mutableObjectSafe ? new CopyingListCollector(result, outInformation.createSerializer()) + : new ListCollector(result); Map> probeTable = new HashMap>(); - //Build probe table - for(IN2 element: inputData2){ + //Build hash table + for (IN2 element: inputData2){ List list = probeTable.get(rightComparator.hash(element)); if(list == null){ list = new ArrayList(); @@ -118,15 +126,18 @@ public class JoinOperatorBase matchingHashes = probeTable.get(leftComparator.hash(left)); - pairComparator.setReference(left); - - if(matchingHashes != null){ - for(IN2 right: matchingHashes){ - if(pairComparator.equalToReference(right)){ - function.join(left, right, collector); + if (matchingHashes != null) { + pairComparator.setReference(left); + for (IN2 right : matchingHashes){ + if (pairComparator.equalToReference(right)) { + if (mutableObjectSafe) { + function.join(leftSerializer.copy(left), rightSerializer.copy(right), collector); + } else { + function.join(left, right, collector); + } } } } diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/MapOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/MapOperatorBase.java index d545676935b9a4ac6cf3e066716cbc6adf8f0563..0218bfa252d692b2b32bb0b4d5cdb8e3096dcdfa 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/MapOperatorBase.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/MapOperatorBase.java @@ -16,7 +16,6 @@ * limitations under the License. */ - package org.apache.flink.api.common.operators.base; import java.util.ArrayList; @@ -30,7 +29,7 @@ import org.apache.flink.api.common.operators.UnaryOperatorInformation; import org.apache.flink.api.common.operators.util.UserCodeClassWrapper; import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper; import org.apache.flink.api.common.operators.util.UserCodeWrapper; - +import org.apache.flink.api.common.typeutils.TypeSerializer; /** * @@ -55,15 +54,27 @@ public class MapOperatorBase> extends S // -------------------------------------------------------------------------------------------- @Override - protected List executeOnCollections(List inputData, RuntimeContext ctx) throws Exception { + protected List executeOnCollections(List inputData, RuntimeContext ctx, boolean mutableObjectSafeMode) throws Exception { MapFunction function = this.userFunction.getUserCodeObject(); FunctionUtils.setFunctionRuntimeContext(function, ctx); FunctionUtils.openFunction(function, this.parameters); ArrayList result = new ArrayList(inputData.size()); - for (IN element : inputData) { - result.add(function.map(element)); + + if (mutableObjectSafeMode) { + TypeSerializer inSerializer = getOperatorInfo().getInputType().createSerializer(); + TypeSerializer outSerializer = getOperatorInfo().getOutputType().createSerializer(); + + for (IN element : inputData) { + IN inCopy = inSerializer.copy(element); + OUT out = function.map(inCopy); + result.add(outSerializer.copy(out)); + } + } else { + for (IN element : inputData) { + result.add(function.map(element)); + } } FunctionUtils.closeFunction(function); diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/MapPartitionOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/MapPartitionOperatorBase.java index 21fa9be6cf2f3ba9a7585ba8446fc2ab150f52ab..7c1fcef1684cc1fb26166af0edd4fd1d58db91a0 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/MapPartitionOperatorBase.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/MapPartitionOperatorBase.java @@ -23,6 +23,8 @@ import java.util.List; import org.apache.flink.api.common.functions.MapPartitionFunction; import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.common.functions.util.CopyingIterator; +import org.apache.flink.api.common.functions.util.CopyingListCollector; import org.apache.flink.api.common.functions.util.FunctionUtils; import org.apache.flink.api.common.functions.util.ListCollector; import org.apache.flink.api.common.operators.SingleInputOperator; @@ -30,6 +32,7 @@ import org.apache.flink.api.common.operators.UnaryOperatorInformation; import org.apache.flink.api.common.operators.util.UserCodeClassWrapper; import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper; import org.apache.flink.api.common.operators.util.UserCodeWrapper; +import org.apache.flink.api.common.typeutils.TypeSerializer; /** * @@ -54,18 +57,28 @@ public class MapPartitionOperatorBase executeOnCollections(List inputData, RuntimeContext ctx) throws Exception { + protected List executeOnCollections(List inputData, RuntimeContext ctx, boolean mutableObjectSafeMode) throws Exception { MapPartitionFunction function = this.userFunction.getUserCodeObject(); FunctionUtils.setFunctionRuntimeContext(function, ctx); FunctionUtils.openFunction(function, this.parameters); ArrayList result = new ArrayList(inputData.size() / 4); - ListCollector resultCollector = new ListCollector(result); - function.mapPartition(inputData, resultCollector); + if (mutableObjectSafeMode) { + TypeSerializer inSerializer = getOperatorInfo().getInputType().createSerializer(); + TypeSerializer outSerializer = getOperatorInfo().getOutputType().createSerializer(); + + CopyingIterator source = new CopyingIterator(inputData.iterator(), inSerializer); + CopyingListCollector resultCollector = new CopyingListCollector(result, outSerializer); + + function.mapPartition(source, resultCollector); + } else { + ListCollector resultCollector = new ListCollector(result); + function.mapPartition(inputData, resultCollector); + } + result.trimToSize(); - FunctionUtils.closeFunction(function); return result; } diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/PartitionOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/PartitionOperatorBase.java index 1f17db09091abb4b785ae13e3a6e4d7643055634..af8a111a27c868607d37bee925728de903f13b9c 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/PartitionOperatorBase.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/PartitionOperatorBase.java @@ -18,6 +18,9 @@ package org.apache.flink.api.common.operators.base; +import java.util.List; + +import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.api.common.functions.util.NoOpFunction; import org.apache.flink.api.common.operators.SingleInputOperator; import org.apache.flink.api.common.operators.UnaryOperatorInformation; @@ -51,4 +54,8 @@ public class PartitionOperatorBase extends SingleInputOperator executeOnCollections(List inputData, RuntimeContext runtimeContext, boolean mutableObjectSafeMode) { + return inputData; + } } diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/ReduceOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/ReduceOperatorBase.java index f7975a19f9d75f2dc4955d00d19b218641a2d6fb..1f192f8ce3feea362130337185e4fa15e5f3e972 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/ReduceOperatorBase.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/ReduceOperatorBase.java @@ -16,7 +16,6 @@ * limitations under the License. */ - package org.apache.flink.api.common.operators.base; import org.apache.flink.api.common.InvalidProgramException; @@ -32,13 +31,14 @@ import org.apache.flink.api.common.operators.util.UserCodeWrapper; import org.apache.flink.api.common.typeinfo.CompositeType; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.TypeSerializer; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; - /** * Base data flow operator for Reduce user-defined functions. Accepts reduce functions * and key positions. The key positions are expected in the flattened common data model. @@ -123,20 +123,23 @@ public class ReduceOperatorBase> extends SingleI super(new UserCodeClassWrapper(udf), operatorInfo, name); } -// -------------------------------------------------------------------------------------------- + // -------------------------------------------------------------------------------------------- @SuppressWarnings("unchecked") @Override - protected List executeOnCollections(List inputData, RuntimeContext ctx) - throws Exception { + protected List executeOnCollections(List inputData, RuntimeContext ctx, boolean mutableObjectSafeMode) throws Exception { + // make sure we can handle empty inputs + if (inputData.isEmpty()) { + return Collections.emptyList(); + } + ReduceFunction function = this.userFunction.getUserCodeObject(); UnaryOperatorInformation operatorInfo = getOperatorInfo(); TypeInformation inputType = operatorInfo.getInputType(); if (!(inputType instanceof CompositeType)) { - throw new InvalidProgramException("Input type of groupReduce operation must be" + - " composite type."); + throw new InvalidProgramException("Input type of groupReduce operation must be" + " composite type."); } FunctionUtils.setFunctionRuntimeContext(function, ctx); @@ -161,22 +164,30 @@ public class ReduceOperatorBase> extends SingleI aggregateMap.put(wrapper, result); } - List result = new ArrayList(aggregateMap.values().size()); - result.addAll(aggregateMap.values()); - FunctionUtils.closeFunction(function); - return result; - } else { + return new ArrayList(aggregateMap.values()); + } + else { T aggregate = inputData.get(0); - for (int i = 1; i < inputData.size(); i++) { - aggregate = function.reduce(aggregate, inputData.get(i)); + + if (mutableObjectSafeMode) { + TypeSerializer serializer = getOperatorInfo().getInputType().createSerializer(); + aggregate = serializer.copy(aggregate); + + for (int i = 1; i < inputData.size(); i++) { + T next = function.reduce(aggregate, serializer.copy(inputData.get(i))); + aggregate = serializer.copy(next); + } + } + else { + for (int i = 1; i < inputData.size(); i++) { + aggregate = function.reduce(aggregate, inputData.get(i)); + } } - List result = new ArrayList(1); - result.add(aggregate); FunctionUtils.setFunctionRuntimeContext(function, ctx); - return result; + + return Collections.singletonList(aggregate); } - } } diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/util/ListKeyGroupedIterator.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/util/ListKeyGroupedIterator.java index 8a4c919a6620caf6dd2f2db0bb4ecd1be37b8658..e530f8a3794564c93baf94a9ac3ff972290475ee 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/operators/util/ListKeyGroupedIterator.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/util/ListKeyGroupedIterator.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -19,6 +19,7 @@ package org.apache.flink.api.common.operators.util; import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.TypeSerializer; import java.io.IOException; import java.util.Iterator; @@ -32,14 +33,16 @@ public final class ListKeyGroupedIterator { private final List input; + private final TypeSerializer serializer; // != null if the elements should be copied + private final TypeComparator comparator; private ValuesIterator valuesIterator; private int currentPosition = 0; - private E lookahead = null; - + private E lookahead; + private boolean done; /** @@ -48,13 +51,13 @@ public final class ListKeyGroupedIterator { * @param input The list with the input elements. * @param comparator The comparator for the data type iterated over. */ - public ListKeyGroupedIterator(List input, TypeComparator comparator) - { + public ListKeyGroupedIterator(List input, TypeSerializer serializer, TypeComparator comparator, boolean copy) { if (input == null || comparator == null) { throw new NullPointerException(); } this.input = input; + this.serializer = copy ? serializer : null; this.comparator = comparator; this.done = input.isEmpty() ? true : false; @@ -109,7 +112,7 @@ public final class ListKeyGroupedIterator { E first = input.get(currentPosition++); if (first != null) { this.comparator.setReference(first); - this.valuesIterator = new ValuesIterator(first); + this.valuesIterator = new ValuesIterator(first, serializer); return true; } else { @@ -155,9 +158,12 @@ public final class ListKeyGroupedIterator { public final class ValuesIterator implements Iterator, Iterable { private E next; + + private final TypeSerializer serializer; - private ValuesIterator(E first) { + private ValuesIterator(E first, TypeSerializer serializer) { this.next = first; + this.serializer = serializer; } @Override @@ -170,7 +176,7 @@ public final class ListKeyGroupedIterator { if (this.next != null) { E current = this.next; this.next = ListKeyGroupedIterator.this.advanceToNext(); - return current; + return serializer != null ? serializer.copy(current) : current; } else { throw new NoSuchElementException(); } diff --git a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/FlatMapOperatorCollectionTest.java b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/FlatMapOperatorCollectionTest.java index 8227c2218d400ef8f1aa917030c6f0d064ad54e6..74dc889d678bb729c97f30a6822b98f5ab8a191f 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/FlatMapOperatorCollectionTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/FlatMapOperatorCollectionTest.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -30,8 +30,8 @@ import org.junit.Assert; import org.junit.Test; import java.io.Serializable; -import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.List; @SuppressWarnings("serial") @@ -41,25 +41,34 @@ public class FlatMapOperatorCollectionTest implements Serializable { public void testExecuteOnCollection() { try { IdRichFlatMap udf = new IdRichFlatMap(); - testExecuteOnCollection(udf, Arrays.asList("f", "l", "i", "n", "k")); + testExecuteOnCollection(udf, Arrays.asList("f", "l", "i", "n", "k"), true); Assert.assertTrue(udf.isClosed); - testExecuteOnCollection(new IdRichFlatMap(), new ArrayList()); - } catch (Throwable t) { - Assert.fail(t.getMessage()); + udf = new IdRichFlatMap(); + testExecuteOnCollection(udf, Arrays.asList("f", "l", "i", "n", "k"), false); + Assert.assertTrue(udf.isClosed); + + udf = new IdRichFlatMap(); + testExecuteOnCollection(udf, Collections.emptyList(), true); + Assert.assertTrue(udf.isClosed); + + udf = new IdRichFlatMap(); + testExecuteOnCollection(udf, Collections.emptyList(), false); + Assert.assertTrue(udf.isClosed); + } + catch (Exception e) { + e.printStackTrace(); + Assert.fail(e.getMessage()); } } - private void testExecuteOnCollection(FlatMapFunction udf, List input) throws Exception { + private void testExecuteOnCollection(FlatMapFunction udf, List input, boolean mutableSafe) throws Exception { // run on collections final List result = getTestFlatMapOperator(udf) - .executeOnCollections(input, new RuntimeUDFContext("Test UDF", 4, 0)); + .executeOnCollections(input, new RuntimeUDFContext("Test UDF", 4, 0), mutableSafe); Assert.assertEquals(input.size(), result.size()); - - for (int i = 0; i < input.size(); i++) { - Assert.assertEquals(input.get(i), result.get(i)); - } + Assert.assertEquals(input, result); } diff --git a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/JoinOperatorBaseTest.java b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/JoinOperatorBaseTest.java index b048cc5afdd7505a82b730dea108089e09ade44a..8834989f9a3f1a3a8e424b03f0df802d945ad716 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/JoinOperatorBaseTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/JoinOperatorBaseTest.java @@ -60,10 +60,13 @@ public class JoinOperatorBaseTest implements Serializable { List expected = new ArrayList(Arrays.asList(3, 3, 6 ,6)); try { - List result = base.executeOnCollections(inputData1, inputData2, null); + List resultSafe = base.executeOnCollections(inputData1, inputData2, null, true); + List resultRegular = base.executeOnCollections(inputData1, inputData2, null, false); - assertEquals(expected, result); - } catch (Exception e) { + assertEquals(expected, resultSafe); + assertEquals(expected, resultRegular); + } + catch (Exception e) { e.printStackTrace(); fail(e.getMessage()); } @@ -107,11 +110,13 @@ public class JoinOperatorBaseTest implements Serializable { try { - List result = base.executeOnCollections(inputData1, inputData2, new RuntimeUDFContext(taskName, - 1, 0)); + List resultSafe = base.executeOnCollections(inputData1, inputData2, new RuntimeUDFContext(taskName, 1, 0), true); + List resultRegular = base.executeOnCollections(inputData1, inputData2, new RuntimeUDFContext(taskName, 1, 0), false); - assertEquals(expected, result); - } catch (Exception e) { + assertEquals(expected, resultSafe); + assertEquals(expected, resultRegular); + } + catch (Exception e) { e.printStackTrace(); fail(e.getMessage()); } diff --git a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/MapOperatorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/MapOperatorTest.java index bb263ad348435ff4747d9b32015e094400dc36d4..82778c575b77a249a97093b7cf13433cf435def0 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/MapOperatorTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/MapOperatorTest.java @@ -52,9 +52,11 @@ public class MapOperatorTest implements java.io.Serializable { parser, new UnaryOperatorInformation(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO), "TestMapper"); List input = new ArrayList(asList("1", "2", "3", "4", "5", "6")); - List result = op.executeOnCollections(input, null); + List resultMutableSafe = op.executeOnCollections(input, null, true); + List resultRegular = op.executeOnCollections(input, null, false); - assertEquals(asList(1, 2, 3, 4, 5, 6), result); + assertEquals(asList(1, 2, 3, 4, 5, 6), resultMutableSafe); + assertEquals(asList(1, 2, 3, 4, 5, 6), resultRegular); } catch (Exception e) { e.printStackTrace(); @@ -95,9 +97,11 @@ public class MapOperatorTest implements java.io.Serializable { parser, new UnaryOperatorInformation(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO), taskName); List input = new ArrayList(asList("1", "2", "3", "4", "5", "6")); - List result = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0)); + List resultMutableSafe = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0), true); + List resultRegular = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0), false); - assertEquals(asList(1, 2, 3, 4, 5, 6), result); + assertEquals(asList(1, 2, 3, 4, 5, 6), resultMutableSafe); + assertEquals(asList(1, 2, 3, 4, 5, 6), resultRegular); assertTrue(opened.get()); assertTrue(closed.get()); diff --git a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/PartitionMapOperatorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/PartitionMapOperatorTest.java index 0657ac10df01e17c59db120f7e21f1ead6bde05d..1c17fdee63212ad022dae14169d17a5f61485df2 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/PartitionMapOperatorTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/PartitionMapOperatorTest.java @@ -74,9 +74,12 @@ public class PartitionMapOperatorTest implements java.io.Serializable { parser, new UnaryOperatorInformation(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO), taskName); List input = new ArrayList(asList("1", "2", "3", "4", "5", "6")); - List result = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0)); - assertEquals(asList(1, 2, 3, 4, 5, 6), result); + List resultMutableSafe = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0), true); + List resultRegular = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0), false); + + assertEquals(asList(1, 2, 3, 4, 5, 6), resultMutableSafe); + assertEquals(asList(1, 2, 3, 4, 5, 6), resultRegular); assertTrue(opened.get()); assertTrue(closed.get()); diff --git a/flink-java/src/test/java/org/apache/flink/api/common/operators/CollectionExecutionAccumulatorsTest.java b/flink-java/src/test/java/org/apache/flink/api/common/operators/CollectionExecutionAccumulatorsTest.java index 7c89f1325dc06d248a3980543dccc82f18191c53..f83669255a7d48f2e8fcc748a04a6a62204a6f96 100644 --- a/flink-java/src/test/java/org/apache/flink/api/common/operators/CollectionExecutionAccumulatorsTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/common/operators/CollectionExecutionAccumulatorsTest.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information diff --git a/flink-java/src/test/java/org/apache/flink/api/common/operators/CollectionExecutionIterationTest.java b/flink-java/src/test/java/org/apache/flink/api/common/operators/CollectionExecutionIterationTest.java index 0f6acbef91dccddfb2b68a595e9e8a8b75679539..34f9137aea92a7d69d46e70c46939d759b4e33ba 100644 --- a/flink-java/src/test/java/org/apache/flink/api/common/operators/CollectionExecutionIterationTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/common/operators/CollectionExecutionIterationTest.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -20,7 +20,6 @@ package org.apache.flink.api.common.operators; //CHECKSTYLE.OFF: AvoidStarImport - Needed for TupleGenerator import static org.junit.Assert.*; -//CHECKSTYLE.ON: AvoidStarImport import java.util.ArrayList; import java.util.List; @@ -102,11 +101,14 @@ public class CollectionExecutionIterationTest implements java.io.Serializable { try { ExecutionEnvironment env = new CollectionEnvironment(); + @SuppressWarnings("unchecked") DataSet> solInput = env.fromElements( new Tuple2(1, 0), new Tuple2(2, 0), new Tuple2(3, 0), new Tuple2(4, 0)); + + @SuppressWarnings("unchecked") DataSet> workInput = env.fromElements( new Tuple1(1), new Tuple1(2), diff --git a/flink-java/src/test/java/org/apache/flink/api/common/operators/CollectionExecutionWithBroadcastVariableTest.java b/flink-java/src/test/java/org/apache/flink/api/common/operators/CollectionExecutionWithBroadcastVariableTest.java index adf2d960156a3406b8624c23949564c4f2180559..c2db7c9217fd581863cc20fb5235410d3f6fbe0c 100644 --- a/flink-java/src/test/java/org/apache/flink/api/common/operators/CollectionExecutionWithBroadcastVariableTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/common/operators/CollectionExecutionWithBroadcastVariableTest.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information diff --git a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/CoGroupOperatorCollectionTest.java b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/CoGroupOperatorCollectionTest.java index fe2551dfe892b59b9480f1c0376d31a774d4efd1..053b8e2c5cdb59d449f87286cc54c38b62faf659 100644 --- a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/CoGroupOperatorCollectionTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/CoGroupOperatorCollectionTest.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -38,6 +38,7 @@ import java.util.HashSet; import java.util.List; import java.util.Set; +@SuppressWarnings("serial") public class CoGroupOperatorCollectionTest implements Serializable { @Test @@ -68,12 +69,17 @@ public class CoGroupOperatorCollectionTest implements Serializable { final RuntimeContext ctx = new RuntimeUDFContext("Test UDF", 4, 0); { - SumCoGroup udf = new SumCoGroup(); - List> result = getCoGroupOperator(udf) - .executeOnCollections(input1, input2, ctx); - - Assert.assertTrue(udf.isClosed); - + SumCoGroup udf1 = new SumCoGroup(); + SumCoGroup udf2 = new SumCoGroup(); + + List> resultSafe = getCoGroupOperator(udf1) + .executeOnCollections(input1, input2, ctx, true); + List> resultRegular = getCoGroupOperator(udf2) + .executeOnCollections(input1, input2, ctx, false); + + Assert.assertTrue(udf1.isClosed); + Assert.assertTrue(udf2.isClosed); + Set> expected = new HashSet>( Arrays.asList(new Tuple2Builder() .add("foo", 8) @@ -84,14 +90,21 @@ public class CoGroupOperatorCollectionTest implements Serializable { ) ); - Assert.assertEquals(expected, new HashSet(result)); + Assert.assertEquals(expected, new HashSet>(resultSafe)); + Assert.assertEquals(expected, new HashSet>(resultRegular)); } { - List> result = getCoGroupOperator(new SumCoGroup()) - .executeOnCollections(Collections.EMPTY_LIST, Collections.EMPTY_LIST, ctx); - - Assert.assertEquals(0, result.size()); + List> resultSafe = getCoGroupOperator(new SumCoGroup()) + .executeOnCollections(Collections.>emptyList(), + Collections.>emptyList(), ctx, true); + + List> resultRegular = getCoGroupOperator(new SumCoGroup()) + .executeOnCollections(Collections.>emptyList(), + Collections.>emptyList(), ctx, false); + + Assert.assertEquals(0, resultSafe.size()); + Assert.assertEquals(0, resultRegular.size()); } } catch (Throwable t) { t.printStackTrace(); diff --git a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorTest.java b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorTest.java index f77b29260172750d511416b0af4b59a1452fe6b4..5d1ca178a2d9623ddd4072d56ab18f71f8adf923 100644 --- a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorTest.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -79,15 +79,21 @@ public class GroupReduceOperatorTest implements java.io.Serializable { Integer>("foo", 3), new Tuple2("bar", 2), new Tuple2("bar", 4))); - List> result = op.executeOnCollections(input, null); - Set> resultSet = new HashSet>(result); + + List> resultMutableSafe = op.executeOnCollections(input, null, true); + List> resultRegular = op.executeOnCollections(input, null, false); + + Set> resultSetMutableSafe = new HashSet>(resultMutableSafe); + Set> resultSetRegular = new HashSet>(resultRegular); Set> expectedResult = new HashSet>(asList(new Tuple2("foo", 4), new Tuple2("bar", 6))); - assertEquals(expectedResult, resultSet); - } catch (Exception e) { + assertEquals(expectedResult, resultSetMutableSafe); + assertEquals(expectedResult, resultSetRegular); + } + catch (Exception e) { e.printStackTrace(); fail(e.getMessage()); } @@ -149,15 +155,19 @@ public class GroupReduceOperatorTest implements java.io.Serializable { Integer>("foo", 3), new Tuple2("bar", 2), new Tuple2("bar", 4))); - List> result = op.executeOnCollections(input, - new RuntimeUDFContext(taskName, 1, 0)); - Set> resultSet = new HashSet>(result); + List> resultMutableSafe = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0), true); + List> resultRegular = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0), false); + + + Set> resultSetMutableSafe = new HashSet>(resultMutableSafe); + Set> resultSetRegular = new HashSet>(resultRegular); Set> expectedResult = new HashSet>(asList(new Tuple2("foo", 4), new Tuple2("bar", 6))); - assertEquals(expectedResult, resultSet); + assertEquals(expectedResult, resultSetMutableSafe); + assertEquals(expectedResult, resultSetRegular); assertTrue(opened.get()); assertTrue(closed.get()); diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operators/base/JoinOperatorBaseTest.java b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/JoinOperatorBaseTest.java similarity index 81% rename from flink-java/src/test/java/org/apache/flink/api/java/operators/base/JoinOperatorBaseTest.java rename to flink-java/src/test/java/org/apache/flink/api/common/operators/base/JoinOperatorBaseTest.java index f332832ec73eab8a237e067a0d0100be0ccf3567..d9abf149cce0eaeab93dc4d782586e9623ee4cbf 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/operators/base/JoinOperatorBaseTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/JoinOperatorBaseTest.java @@ -16,12 +16,12 @@ * limitations under the License. */ -package org.apache.flink.api.java.operators.base; +package org.apache.flink.api.common.operators.base; import static org.junit.Assert.*; import org.apache.flink.api.common.functions.FlatJoinFunction; -import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.common.functions.util.RuntimeUDFContext; import org.apache.flink.api.common.operators.BinaryOperatorInformation; import org.apache.flink.api.common.operators.base.JoinOperatorBase; import org.apache.flink.api.java.tuple.Tuple2; @@ -31,21 +31,23 @@ import org.apache.flink.util.Collector; import org.junit.Test; import java.io.Serializable; -import java.lang.reflect.Method; import java.util.ArrayList; import java.util.Arrays; import java.util.HashSet; import java.util.List; import java.util.Set; +@SuppressWarnings({ "unchecked", "serial" }) public class JoinOperatorBaseTest implements Serializable { + @Test public void testTupleBaseJoiner(){ - final FlatJoinFunction, Tuple2, Tuple2> joiner = new FlatJoinFunction() { + final FlatJoinFunction, Tuple2, Tuple2> joiner = + new FlatJoinFunction, Tuple2, Tuple2>() + { @Override - public void join(Object first, Object second, Collector out) throws Exception { + public void join(Tuple3 first, Tuple2 second, Collector> out) { Tuple3 fst = (Tuple3)first; Tuple2 snd = (Tuple2)second; @@ -99,18 +101,15 @@ public class JoinOperatorBaseTest implements Serializable { )); try { - Method executeOnCollections = base.getClass().getDeclaredMethod("executeOnCollections", List.class, - List.class, RuntimeContext.class); - executeOnCollections.setAccessible(true); + List> resultSafe = base.executeOnCollections(inputData1, inputData2, new RuntimeUDFContext("op", 1, 0), true); + List> resultRegular = base.executeOnCollections(inputData1, inputData2, new RuntimeUDFContext("op", 1, 0), false); - Object result = executeOnCollections.invoke(base, inputData1, inputData2, null); - - assertEquals(expected, new HashSet>((List>)result)); - - } catch (Exception e) { + assertEquals(expected, new HashSet>(resultSafe)); + assertEquals(expected, new HashSet>(resultRegular)); + } + catch (Exception e) { e.printStackTrace(); fail(e.getMessage()); } - } } diff --git a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/ReduceOperatorTest.java b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/ReduceOperatorTest.java index a4de40b9b5afe1682b69ca1222992628c4b897cc..2baf57e2ad4e5ef8eb2bb73a066b082d8d1ed74c 100644 --- a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/ReduceOperatorTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/ReduceOperatorTest.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -68,15 +68,20 @@ public class ReduceOperatorTest implements java.io.Serializable { Integer>("foo", 3), new Tuple2("bar", 2), new Tuple2("bar", 4))); - List> result = op.executeOnCollections(input, null); - Set> resultSet = new HashSet>(result); + List> resultMutableSafe = op.executeOnCollections(input, null, true); + List> resultRegular = op.executeOnCollections(input, null, false); + + Set> resultSetMutableSafe = new HashSet>(resultMutableSafe); + Set> resultSetRegular = new HashSet>(resultRegular); Set> expectedResult = new HashSet>(asList(new Tuple2("foo", 4), new Tuple2("bar", 6))); - assertEquals(expectedResult, resultSet); - } catch (Exception e) { + assertEquals(expectedResult, resultSetMutableSafe); + assertEquals(expectedResult, resultSetRegular); + } + catch (Exception e) { e.printStackTrace(); fail(e.getMessage()); } @@ -127,20 +132,23 @@ public class ReduceOperatorTest implements java.io.Serializable { Integer>("foo", 3), new Tuple2("bar", 2), new Tuple2("bar", 4))); - List> result = op.executeOnCollections(input, - new RuntimeUDFContext(taskName, 1, 0)); + List> resultMutableSafe = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0), true); + List> resultRegular = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0), false); - Set> resultSet = new HashSet>(result); + Set> resultSetMutableSafe = new HashSet>(resultMutableSafe); + Set> resultSetRegular = new HashSet>(resultRegular); Set> expectedResult = new HashSet>(asList(new Tuple2("foo", 4), new Tuple2("bar", 6))); - assertEquals(expectedResult, resultSet); + assertEquals(expectedResult, resultSetMutableSafe); + assertEquals(expectedResult, resultSetRegular); assertTrue(opened.get()); assertTrue(closed.get()); - } catch (Exception e) { + } + catch (Exception e) { e.printStackTrace(); fail(e.getMessage()); }