From 8b904ae21d319c6cd26d160f7c5cc91b5f081577 Mon Sep 17 00:00:00 2001 From: Aljoscha Krettek Date: Wed, 20 May 2015 10:48:43 +0200 Subject: [PATCH] [FLINK-2052] Fix Serialization warnings in Stream Operators This closes #698 --- .../flink/api/java/functions/KeySelector.java | 4 +- .../api/datastream/StreamProjection.java | 50 +++++++++---------- .../api/operators/AbstractStreamOperator.java | 6 +-- .../operators/AbstractUdfStreamOperator.java | 4 +- .../api/operators/StreamCounter.java | 2 + .../streaming/api/operators/StreamFilter.java | 2 + .../api/operators/StreamFlatMap.java | 2 + .../streaming/api/operators/StreamFold.java | 2 + .../api/operators/StreamGroupedFold.java | 2 + .../api/operators/StreamGroupedReduce.java | 2 + .../streaming/api/operators/StreamMap.java | 2 + .../api/operators/StreamProject.java | 20 ++++---- .../streaming/api/operators/StreamReduce.java | 2 + .../streaming/api/operators/StreamSink.java | 2 + .../streaming/api/operators/StreamSource.java | 2 + .../api/operators/co/CoStreamFlatMap.java | 2 + .../operators/co/CoStreamGroupedReduce.java | 1 + .../api/operators/co/CoStreamMap.java | 2 + .../api/operators/co/CoStreamReduce.java | 2 + .../api/operators/co/CoStreamWindow.java | 2 + .../windowing/GroupedActiveDiscretizer.java | 5 +- .../windowing/GroupedStreamDiscretizer.java | 2 +- .../windowing/GroupedWindowBuffer.java | 13 ++++- .../windowing/StreamDiscretizer.java | 8 ++- .../windowing/StreamWindowBuffer.java | 2 + .../operators/windowing/WindowFlattener.java | 2 + .../api/operators/windowing/WindowFolder.java | 1 + .../api/operators/windowing/WindowMerger.java | 2 + .../windowing/WindowPartitioner.java | 2 + .../streaming/api/operators/ProjectTest.java | 9 ++-- 30 files changed, 106 insertions(+), 53 deletions(-) diff --git a/flink-java/src/main/java/org/apache/flink/api/java/functions/KeySelector.java b/flink-java/src/main/java/org/apache/flink/api/java/functions/KeySelector.java index 21ecc24e9cf..3d06c598e7f 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/functions/KeySelector.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/functions/KeySelector.java @@ -20,6 +20,8 @@ package org.apache.flink.api.java.functions; import org.apache.flink.api.common.functions.Function; +import java.io.Serializable; + /** * The {@link KeySelector} allows to use arbitrary objects for operations such as * reduce, reduceGroup, join, coGoup, etc. @@ -29,7 +31,7 @@ import org.apache.flink.api.common.functions.Function; * @param Type of objects to extract the key from. * @param Type of key. */ -public interface KeySelector extends Function, java.io.Serializable { +public interface KeySelector extends Function, Serializable { /** * User-defined function that extracts the key from an arbitrary object. diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamProjection.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamProjection.java index 447b1fdc089..16e9deb725f 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamProjection.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamProjection.java @@ -129,7 +129,7 @@ public class StreamProjection { TupleTypeInfo> tType = new TupleTypeInfo>(fTypes); return dataStream.transform("Projection", tType, new StreamProject>( - fieldIndexes, tType)); + fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig()))); } /** @@ -143,7 +143,7 @@ public class StreamProjection { TypeInformation[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType()); TupleTypeInfo> tType = new TupleTypeInfo>(fTypes); - return dataStream.transform("Projection", tType, new StreamProject>(fieldIndexes, tType)); + return dataStream.transform("Projection", tType, new StreamProject>(fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig()))); } /** @@ -157,7 +157,7 @@ public class StreamProjection { TypeInformation[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType()); TupleTypeInfo> tType = new TupleTypeInfo>(fTypes); - return dataStream.transform("Projection", tType, new StreamProject>(fieldIndexes, tType)); + return dataStream.transform("Projection", tType, new StreamProject>(fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig()))); } /** @@ -171,7 +171,7 @@ public class StreamProjection { TypeInformation[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType()); TupleTypeInfo> tType = new TupleTypeInfo>(fTypes); - return dataStream.transform("Projection", tType, new StreamProject>(fieldIndexes, tType)); + return dataStream.transform("Projection", tType, new StreamProject>(fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig()))); } /** @@ -185,7 +185,7 @@ public class StreamProjection { TypeInformation[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType()); TupleTypeInfo> tType = new TupleTypeInfo>(fTypes); - return dataStream.transform("Projection", tType, new StreamProject>(fieldIndexes, tType)); + return dataStream.transform("Projection", tType, new StreamProject>(fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig()))); } /** @@ -199,7 +199,7 @@ public class StreamProjection { TypeInformation[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType()); TupleTypeInfo> tType = new TupleTypeInfo>(fTypes); - return dataStream.transform("Projection", tType, new StreamProject>(fieldIndexes, tType)); + return dataStream.transform("Projection", tType, new StreamProject>(fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig()))); } /** @@ -213,7 +213,7 @@ public class StreamProjection { TypeInformation[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType()); TupleTypeInfo> tType = new TupleTypeInfo>(fTypes); - return dataStream.transform("Projection", tType, new StreamProject>(fieldIndexes, tType)); + return dataStream.transform("Projection", tType, new StreamProject>(fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig()))); } /** @@ -227,7 +227,7 @@ public class StreamProjection { TypeInformation[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType()); TupleTypeInfo> tType = new TupleTypeInfo>(fTypes); - return dataStream.transform("Projection", tType, new StreamProject>(fieldIndexes, tType)); + return dataStream.transform("Projection", tType, new StreamProject>(fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig()))); } /** @@ -241,7 +241,7 @@ public class StreamProjection { TypeInformation[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType()); TupleTypeInfo> tType = new TupleTypeInfo>(fTypes); - return dataStream.transform("Projection", tType, new StreamProject>(fieldIndexes, tType)); + return dataStream.transform("Projection", tType, new StreamProject>(fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig()))); } /** @@ -255,7 +255,7 @@ public class StreamProjection { TypeInformation[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType()); TupleTypeInfo> tType = new TupleTypeInfo>(fTypes); - return dataStream.transform("Projection", tType, new StreamProject>(fieldIndexes, tType)); + return dataStream.transform("Projection", tType, new StreamProject>(fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig()))); } /** @@ -269,7 +269,7 @@ public class StreamProjection { TypeInformation[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType()); TupleTypeInfo> tType = new TupleTypeInfo>(fTypes); - return dataStream.transform("Projection", tType, new StreamProject>(fieldIndexes, tType)); + return dataStream.transform("Projection", tType, new StreamProject>(fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig()))); } /** @@ -283,7 +283,7 @@ public class StreamProjection { TypeInformation[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType()); TupleTypeInfo> tType = new TupleTypeInfo>(fTypes); - return dataStream.transform("Projection", tType, new StreamProject>(fieldIndexes, tType)); + return dataStream.transform("Projection", tType, new StreamProject>(fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig()))); } /** @@ -297,7 +297,7 @@ public class StreamProjection { TypeInformation[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType()); TupleTypeInfo> tType = new TupleTypeInfo>(fTypes); - return dataStream.transform("Projection", tType, new StreamProject>(fieldIndexes, tType)); + return dataStream.transform("Projection", tType, new StreamProject>(fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig()))); } /** @@ -311,7 +311,7 @@ public class StreamProjection { TypeInformation[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType()); TupleTypeInfo> tType = new TupleTypeInfo>(fTypes); - return dataStream.transform("Projection", tType, new StreamProject>(fieldIndexes, tType)); + return dataStream.transform("Projection", tType, new StreamProject>(fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig()))); } /** @@ -325,7 +325,7 @@ public class StreamProjection { TypeInformation[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType()); TupleTypeInfo> tType = new TupleTypeInfo>(fTypes); - return dataStream.transform("Projection", tType, new StreamProject>(fieldIndexes, tType)); + return dataStream.transform("Projection", tType, new StreamProject>(fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig()))); } /** @@ -339,7 +339,7 @@ public class StreamProjection { TypeInformation[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType()); TupleTypeInfo> tType = new TupleTypeInfo>(fTypes); - return dataStream.transform("Projection", tType, new StreamProject>(fieldIndexes, tType)); + return dataStream.transform("Projection", tType, new StreamProject>(fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig()))); } /** @@ -353,7 +353,7 @@ public class StreamProjection { TypeInformation[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType()); TupleTypeInfo> tType = new TupleTypeInfo>(fTypes); - return dataStream.transform("Projection", tType, new StreamProject>(fieldIndexes, tType)); + return dataStream.transform("Projection", tType, new StreamProject>(fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig()))); } /** @@ -367,7 +367,7 @@ public class StreamProjection { TypeInformation[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType()); TupleTypeInfo> tType = new TupleTypeInfo>(fTypes); - return dataStream.transform("Projection", tType, new StreamProject>(fieldIndexes, tType)); + return dataStream.transform("Projection", tType, new StreamProject>(fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig()))); } /** @@ -381,7 +381,7 @@ public class StreamProjection { TypeInformation[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType()); TupleTypeInfo> tType = new TupleTypeInfo>(fTypes); - return dataStream.transform("Projection", tType, new StreamProject>(fieldIndexes, tType)); + return dataStream.transform("Projection", tType, new StreamProject>(fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig()))); } /** @@ -395,7 +395,7 @@ public class StreamProjection { TypeInformation[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType()); TupleTypeInfo> tType = new TupleTypeInfo>(fTypes); - return dataStream.transform("Projection", tType, new StreamProject>(fieldIndexes, tType)); + return dataStream.transform("Projection", tType, new StreamProject>(fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig()))); } /** @@ -409,7 +409,7 @@ public class StreamProjection { TypeInformation[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType()); TupleTypeInfo> tType = new TupleTypeInfo>(fTypes); - return dataStream.transform("Projection", tType, new StreamProject>(fieldIndexes, tType)); + return dataStream.transform("Projection", tType, new StreamProject>(fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig()))); } /** @@ -423,7 +423,7 @@ public class StreamProjection { TypeInformation[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType()); TupleTypeInfo> tType = new TupleTypeInfo>(fTypes); - return dataStream.transform("Projection", tType, new StreamProject>(fieldIndexes, tType)); + return dataStream.transform("Projection", tType, new StreamProject>(fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig()))); } /** @@ -437,7 +437,7 @@ public class StreamProjection { TypeInformation[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType()); TupleTypeInfo> tType = new TupleTypeInfo>(fTypes); - return dataStream.transform("Projection", tType, new StreamProject>(fieldIndexes, tType)); + return dataStream.transform("Projection", tType, new StreamProject>(fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig()))); } /** @@ -451,7 +451,7 @@ public class StreamProjection { TypeInformation[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType()); TupleTypeInfo> tType = new TupleTypeInfo>(fTypes); - return dataStream.transform("Projection", tType, new StreamProject>(fieldIndexes, tType)); + return dataStream.transform("Projection", tType, new StreamProject>(fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig()))); } /** @@ -465,7 +465,7 @@ public class StreamProjection { TypeInformation[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType()); TupleTypeInfo> tType = new TupleTypeInfo>(fTypes); - return dataStream.transform("Projection", tType, new StreamProject>(fieldIndexes, tType)); + return dataStream.transform("Projection", tType, new StreamProject>(fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig()))); } public static TypeInformation[] extractFieldTypes(int[] fields, TypeInformation inType) { diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java index a8dc8c54856..b55e5d67baa 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java @@ -31,11 +31,11 @@ public abstract class AbstractStreamOperator implements StreamOperator private static final long serialVersionUID = 1L; - protected RuntimeContext runtimeContext; + protected transient RuntimeContext runtimeContext; - protected ExecutionConfig executionConfig; + protected transient ExecutionConfig executionConfig; - public Output output; + public transient Output output; // A sane default for most operators protected ChainingStrategy chainingStrategy = ChainingStrategy.HEAD; diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java index dbd93b544ce..09d1ef6e1c6 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java @@ -32,7 +32,9 @@ import java.io.Serializable; * @param The output type of the operator * @param The type of the user function */ -public abstract class AbstractUdfStreamOperator extends AbstractStreamOperator implements StatefulStreamOperator { +public abstract class AbstractUdfStreamOperator extends AbstractStreamOperator implements StatefulStreamOperator { + + private static final long serialVersionUID = 1L; protected final F userFunction; diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamCounter.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamCounter.java index d9a67ddd133..240e2b1aba3 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamCounter.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamCounter.java @@ -19,6 +19,8 @@ package org.apache.flink.streaming.api.operators; public class StreamCounter extends AbstractStreamOperator implements OneInputStreamOperator { + private static final long serialVersionUID = 1L; + private Long count = 0L; public StreamCounter() { diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFilter.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFilter.java index e5575dbbd59..a54a4eaece8 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFilter.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFilter.java @@ -21,6 +21,8 @@ import org.apache.flink.api.common.functions.FilterFunction; public class StreamFilter extends AbstractUdfStreamOperator> implements OneInputStreamOperator { + private static final long serialVersionUID = 1L; + public StreamFilter(FilterFunction filterFunction) { super(filterFunction); chainingStrategy = ChainingStrategy.ALWAYS; diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFlatMap.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFlatMap.java index 1e836b17313..e8da2c7ee2c 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFlatMap.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFlatMap.java @@ -23,6 +23,8 @@ public class StreamFlatMap extends AbstractUdfStreamOperator> implements OneInputStreamOperator { + private static final long serialVersionUID = 1L; + public StreamFlatMap(FlatMapFunction flatMapper) { super(flatMapper); chainingStrategy = ChainingStrategy.ALWAYS; diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFold.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFold.java index 6f956caeb45..580477a7c3f 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFold.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFold.java @@ -26,6 +26,8 @@ public class StreamFold extends AbstractUdfStreamOperator> implements OneInputStreamOperator { + private static final long serialVersionUID = 1L; + private OUT accumulator; protected TypeSerializer outTypeSerializer; protected TypeInformation outTypeInformation; diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java index 75217be3ecd..08107a926eb 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java @@ -26,6 +26,8 @@ import org.apache.flink.api.java.functions.KeySelector; public class StreamGroupedFold extends StreamFold { + private static final long serialVersionUID = 1L; + private KeySelector keySelector; private Map values; private OUT initialValue; diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java index e3980cecf4b..8269be71286 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java @@ -25,6 +25,8 @@ import org.apache.flink.api.java.functions.KeySelector; public class StreamGroupedReduce extends StreamReduce { + private static final long serialVersionUID = 1L; + private KeySelector keySelector; private Map values; diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamMap.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamMap.java index a379c566ba2..08dc9813af0 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamMap.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamMap.java @@ -23,6 +23,8 @@ public class StreamMap extends AbstractUdfStreamOperator> implements OneInputStreamOperator { + private static final long serialVersionUID = 1L; + public StreamMap(MapFunction mapper) { super(mapper); chainingStrategy = ChainingStrategy.ALWAYS; diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamProject.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamProject.java index d039144bf2e..83613d83d9e 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamProject.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamProject.java @@ -17,7 +17,6 @@ package org.apache.flink.streaming.api.operators; -import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.configuration.Configuration; @@ -26,16 +25,18 @@ public class StreamProject extends AbstractStreamOperator implements OneInputStreamOperator { - transient OUT outTuple; - TypeSerializer outTypeSerializer; - TypeInformation outTypeInformation; - int[] fields; - int numFields; + private static final long serialVersionUID = 1L; - public StreamProject(int[] fields, TypeInformation outTypeInformation) { + private TypeSerializer outSerializer; + private int[] fields; + private int numFields; + + private transient OUT outTuple; + + public StreamProject(int[] fields, TypeSerializer outSerializer) { this.fields = fields; this.numFields = this.fields.length; - this.outTypeInformation = outTypeInformation; + this.outSerializer = outSerializer; chainingStrategy = ChainingStrategy.ALWAYS; } @@ -52,7 +53,6 @@ public class StreamProject @Override public void open(Configuration config) throws Exception { super.open(config); - this.outTypeSerializer = outTypeInformation.createSerializer(executionConfig); - outTuple = outTypeSerializer.createInstance(); + outTuple = outSerializer.createInstance(); } } diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamReduce.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamReduce.java index 8205fe6128b..97cebc18e0b 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamReduce.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamReduce.java @@ -22,6 +22,8 @@ import org.apache.flink.api.common.functions.ReduceFunction; public class StreamReduce extends AbstractUdfStreamOperator> implements OneInputStreamOperator { + private static final long serialVersionUID = 1L; + private IN currentValue; public StreamReduce(ReduceFunction reducer) { diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamSink.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamSink.java index b1a021257f5..53993020050 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamSink.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamSink.java @@ -22,6 +22,8 @@ import org.apache.flink.streaming.api.functions.sink.SinkFunction; public class StreamSink extends AbstractUdfStreamOperator> implements OneInputStreamOperator { + private static final long serialVersionUID = 1L; + public StreamSink(SinkFunction sinkFunction) { super(sinkFunction); diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java index 2e4d313daa7..9cdfb01a15f 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java @@ -21,6 +21,8 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction; public class StreamSource extends AbstractUdfStreamOperator> implements StreamOperator { + private static final long serialVersionUID = 1L; + public StreamSource(SourceFunction sourceFunction) { super(sourceFunction); diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamFlatMap.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamFlatMap.java index 0be8c90ba2d..e3662d620a5 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamFlatMap.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamFlatMap.java @@ -25,6 +25,8 @@ public class CoStreamFlatMap extends AbstractUdfStreamOperator> implements TwoInputStreamOperator { + private static final long serialVersionUID = 1L; + public CoStreamFlatMap(CoFlatMapFunction flatMapper) { super(flatMapper); } diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamGroupedReduce.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamGroupedReduce.java index d13671967fb..3dc509a027f 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamGroupedReduce.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamGroupedReduce.java @@ -24,6 +24,7 @@ import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.streaming.api.functions.co.CoReduceFunction; public class CoStreamGroupedReduce extends CoStreamReduce { + private static final long serialVersionUID = 1L; protected KeySelector keySelector1; diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamMap.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamMap.java index 9a98c6669ad..a8e57e35ba8 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamMap.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamMap.java @@ -25,6 +25,8 @@ public class CoStreamMap extends AbstractUdfStreamOperator> implements TwoInputStreamOperator { + private static final long serialVersionUID = 1L; + public CoStreamMap(CoMapFunction mapper) { super(mapper); } diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamReduce.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamReduce.java index 81da1895bf9..7157b1df6b0 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamReduce.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamReduce.java @@ -25,6 +25,8 @@ public class CoStreamReduce extends AbstractUdfStreamOperator> implements TwoInputStreamOperator { + private static final long serialVersionUID = 1L; + protected IN1 currentValue1 = null; protected IN2 currentValue2 = null; diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamWindow.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamWindow.java index 8f2a0b8d3be..e7b069e6cb4 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamWindow.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamWindow.java @@ -33,6 +33,8 @@ public class CoStreamWindow extends AbstractUdfStreamOperator> implements TwoInputStreamOperator { + private static final long serialVersionUID = 1L; + protected long windowSize; protected long slideSize; protected CircularFifoList> circularList1; diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/GroupedActiveDiscretizer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/GroupedActiveDiscretizer.java index 2ff8496d64b..fd951100728 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/GroupedActiveDiscretizer.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/GroupedActiveDiscretizer.java @@ -25,10 +25,9 @@ import org.slf4j.LoggerFactory; public class GroupedActiveDiscretizer extends GroupedStreamDiscretizer { - private static final Logger LOG = LoggerFactory.getLogger(GroupedActiveDiscretizer.class); - + private static final long serialVersionUID = 1L; - private static final long serialVersionUID = -3469545957144404137L; + private static final Logger LOG = LoggerFactory.getLogger(GroupedActiveDiscretizer.class); private volatile IN last; private Thread centralThread; diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/GroupedStreamDiscretizer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/GroupedStreamDiscretizer.java index 7f6a9171d4a..e80b6ab60d4 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/GroupedStreamDiscretizer.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/GroupedStreamDiscretizer.java @@ -36,7 +36,7 @@ import org.apache.flink.streaming.api.windowing.windowbuffer.WindowBuffer; */ public class GroupedStreamDiscretizer extends StreamDiscretizer { - private static final long serialVersionUID = -3469545957144404137L; + private static final long serialVersionUID = 1L; protected KeySelector keySelector; protected Configuration parameters; diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/GroupedWindowBuffer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/GroupedWindowBuffer.java index 4e4350dedc7..c6b2499f128 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/GroupedWindowBuffer.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/GroupedWindowBuffer.java @@ -17,6 +17,8 @@ package org.apache.flink.streaming.api.operators.windowing; +import java.io.IOException; +import java.io.ObjectInputStream; import java.util.HashMap; import java.util.Map; @@ -31,12 +33,21 @@ import org.apache.flink.streaming.api.windowing.windowbuffer.WindowBuffer; */ public class GroupedWindowBuffer extends StreamWindowBuffer { - private Map> windowMap = new HashMap>(); + private static final long serialVersionUID = 1L; + private KeySelector keySelector; + private transient Map> windowMap; + public GroupedWindowBuffer(WindowBuffer buffer, KeySelector keySelector) { super(buffer); this.keySelector = keySelector; + this.windowMap = new HashMap>(); + } + + private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { + in.defaultReadObject(); + this.windowMap = new HashMap>(); } @Override diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/StreamDiscretizer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/StreamDiscretizer.java index fbe6e442bf9..4ab31cb960a 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/StreamDiscretizer.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/StreamDiscretizer.java @@ -36,18 +36,16 @@ public class StreamDiscretizer extends AbstractStreamOperator> implements OneInputStreamOperator> { - /** - * Auto-generated serial version UID - */ - private static final long serialVersionUID = -8038984294071650730L; + private static final long serialVersionUID = 1L; protected TriggerPolicy triggerPolicy; protected EvictionPolicy evictionPolicy; private boolean isActiveTrigger; private boolean isActiveEviction; - private Thread activePolicyThread; private int bufferSize = 0; + private transient Thread activePolicyThread; + protected WindowEvent windowEvent = new WindowEvent(); public StreamDiscretizer(TriggerPolicy triggerPolicy, EvictionPolicy evictionPolicy) { diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/StreamWindowBuffer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/StreamWindowBuffer.java index b9de6986fef..f890b6964f0 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/StreamWindowBuffer.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/StreamWindowBuffer.java @@ -30,6 +30,8 @@ public class StreamWindowBuffer extends AbstractStreamOperator> implements OneInputStreamOperator, StreamWindow> { + private static final long serialVersionUID = 1L; + protected WindowBuffer buffer; public StreamWindowBuffer(WindowBuffer buffer) { diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowFlattener.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowFlattener.java index f1b0ee2410d..3afc50fab2b 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowFlattener.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowFlattener.java @@ -28,6 +28,8 @@ import org.apache.flink.streaming.api.windowing.StreamWindow; public class WindowFlattener extends AbstractStreamOperator implements OneInputStreamOperator, T> { + private static final long serialVersionUID = 1L; + public WindowFlattener() { chainingStrategy = ChainingStrategy.FORCE_ALWAYS; } diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowFolder.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowFolder.java index 29a68dbe88f..bdf67827c5c 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowFolder.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowFolder.java @@ -33,6 +33,7 @@ import org.apache.flink.streaming.api.windowing.StreamWindow; public class WindowFolder extends StreamMap, StreamWindow> { private static final long serialVersionUID = 1L; + FoldFunction folder; public WindowFolder(FoldFunction folder, OUT initialValue) { diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowMerger.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowMerger.java index 12dd2395b16..9b3347405ec 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowMerger.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowMerger.java @@ -32,6 +32,8 @@ import org.apache.flink.streaming.api.windowing.StreamWindow; public class WindowMerger extends AbstractStreamOperator> implements OneInputStreamOperator, StreamWindow> { + private static final long serialVersionUID = 1L; + private Map> windows; public WindowMerger() { diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowPartitioner.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowPartitioner.java index 14a055a565e..b86caaa0661 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowPartitioner.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowPartitioner.java @@ -29,6 +29,8 @@ import org.apache.flink.streaming.api.windowing.StreamWindow; public class WindowPartitioner extends AbstractStreamOperator> implements OneInputStreamOperator, StreamWindow> { + private static final long serialVersionUID = 1L; + private KeySelector keySelector; private int numberOfSplits; diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/ProjectTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/ProjectTest.java index 035abe67803..d9cc607fb4d 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/ProjectTest.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/ProjectTest.java @@ -25,6 +25,7 @@ import java.util.ArrayList; import java.util.HashSet; import java.util.List; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.tuple.Tuple; @@ -33,6 +34,7 @@ import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.api.java.tuple.Tuple5; import org.apache.flink.api.java.typeutils.TupleTypeInfo; import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.api.java.typeutils.runtime.TupleSerializer; import org.apache.flink.streaming.api.datastream.StreamProjection; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.SinkFunction; @@ -51,12 +53,13 @@ public class ProjectTest implements Serializable { int[] fields = new int[]{4, 4, 3}; + TupleSerializer> serializer = + new TupleTypeInfo>(StreamProjection.extractFieldTypes(fields, inType)) + .createSerializer(new ExecutionConfig()); @SuppressWarnings("unchecked") StreamProject, Tuple3> operator = new StreamProject, Tuple3>( - fields, - new TupleTypeInfo>(StreamProjection - .extractFieldTypes(fields, inType))); + fields, serializer); List> input = new ArrayList>(); -- GitLab