diff --git a/flink-java/src/main/java/org/apache/flink/api/java/functions/KeySelector.java b/flink-java/src/main/java/org/apache/flink/api/java/functions/KeySelector.java index 21ecc24e9cfb430b5e93f73da683939dabf4e4b1..3d06c598e7fd4ef61364436322693d10a8a32e31 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/functions/KeySelector.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/functions/KeySelector.java @@ -20,6 +20,8 @@ package org.apache.flink.api.java.functions; import org.apache.flink.api.common.functions.Function; +import java.io.Serializable; + /** * The {@link KeySelector} allows to use arbitrary objects for operations such as * reduce, reduceGroup, join, coGoup, etc. @@ -29,7 +31,7 @@ import org.apache.flink.api.common.functions.Function; * @param Type of objects to extract the key from. * @param Type of key. */ -public interface KeySelector extends Function, java.io.Serializable { +public interface KeySelector extends Function, Serializable { /** * User-defined function that extracts the key from an arbitrary object. diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamProjection.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamProjection.java index 447b1fdc08991249693107117ac0b7068094011e..16e9deb725f6dc98d39694ea741039fad2c8a8b3 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamProjection.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamProjection.java @@ -129,7 +129,7 @@ public class StreamProjection { TupleTypeInfo> tType = new TupleTypeInfo>(fTypes); return dataStream.transform("Projection", tType, new StreamProject>( - fieldIndexes, tType)); + fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig()))); } /** @@ -143,7 +143,7 @@ public class StreamProjection { TypeInformation[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType()); TupleTypeInfo> tType = new TupleTypeInfo>(fTypes); - return dataStream.transform("Projection", tType, new StreamProject>(fieldIndexes, tType)); + return dataStream.transform("Projection", tType, new StreamProject>(fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig()))); } /** @@ -157,7 +157,7 @@ public class StreamProjection { TypeInformation[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType()); TupleTypeInfo> tType = new TupleTypeInfo>(fTypes); - return dataStream.transform("Projection", tType, new StreamProject>(fieldIndexes, tType)); + return dataStream.transform("Projection", tType, new StreamProject>(fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig()))); } /** @@ -171,7 +171,7 @@ public class StreamProjection { TypeInformation[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType()); TupleTypeInfo> tType = new TupleTypeInfo>(fTypes); - return dataStream.transform("Projection", tType, new StreamProject>(fieldIndexes, tType)); + return dataStream.transform("Projection", tType, new StreamProject>(fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig()))); } /** @@ -185,7 +185,7 @@ public class StreamProjection { TypeInformation[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType()); TupleTypeInfo> tType = new TupleTypeInfo>(fTypes); - return dataStream.transform("Projection", tType, new StreamProject>(fieldIndexes, tType)); + return dataStream.transform("Projection", tType, new StreamProject>(fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig()))); } /** @@ -199,7 +199,7 @@ public class StreamProjection { TypeInformation[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType()); TupleTypeInfo> tType = new TupleTypeInfo>(fTypes); - return dataStream.transform("Projection", tType, new StreamProject>(fieldIndexes, tType)); + return dataStream.transform("Projection", tType, new StreamProject>(fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig()))); } /** @@ -213,7 +213,7 @@ public class StreamProjection { TypeInformation[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType()); TupleTypeInfo> tType = new TupleTypeInfo>(fTypes); - return dataStream.transform("Projection", tType, new StreamProject>(fieldIndexes, tType)); + return dataStream.transform("Projection", tType, new StreamProject>(fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig()))); } /** @@ -227,7 +227,7 @@ public class StreamProjection { TypeInformation[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType()); TupleTypeInfo> tType = new TupleTypeInfo>(fTypes); - return dataStream.transform("Projection", tType, new StreamProject>(fieldIndexes, tType)); + return dataStream.transform("Projection", tType, new StreamProject>(fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig()))); } /** @@ -241,7 +241,7 @@ public class StreamProjection { TypeInformation[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType()); TupleTypeInfo> tType = new TupleTypeInfo>(fTypes); - return dataStream.transform("Projection", tType, new StreamProject>(fieldIndexes, tType)); + return dataStream.transform("Projection", tType, new StreamProject>(fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig()))); } /** @@ -255,7 +255,7 @@ public class StreamProjection { TypeInformation[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType()); TupleTypeInfo> tType = new TupleTypeInfo>(fTypes); - return dataStream.transform("Projection", tType, new StreamProject>(fieldIndexes, tType)); + return dataStream.transform("Projection", tType, new StreamProject>(fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig()))); } /** @@ -269,7 +269,7 @@ public class StreamProjection { TypeInformation[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType()); TupleTypeInfo> tType = new TupleTypeInfo>(fTypes); - return dataStream.transform("Projection", tType, new StreamProject>(fieldIndexes, tType)); + return dataStream.transform("Projection", tType, new StreamProject>(fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig()))); } /** @@ -283,7 +283,7 @@ public class StreamProjection { TypeInformation[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType()); TupleTypeInfo> tType = new TupleTypeInfo>(fTypes); - return dataStream.transform("Projection", tType, new StreamProject>(fieldIndexes, tType)); + return dataStream.transform("Projection", tType, new StreamProject>(fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig()))); } /** @@ -297,7 +297,7 @@ public class StreamProjection { TypeInformation[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType()); TupleTypeInfo> tType = new TupleTypeInfo>(fTypes); - return dataStream.transform("Projection", tType, new StreamProject>(fieldIndexes, tType)); + return dataStream.transform("Projection", tType, new StreamProject>(fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig()))); } /** @@ -311,7 +311,7 @@ public class StreamProjection { TypeInformation[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType()); TupleTypeInfo> tType = new TupleTypeInfo>(fTypes); - return dataStream.transform("Projection", tType, new StreamProject>(fieldIndexes, tType)); + return dataStream.transform("Projection", tType, new StreamProject>(fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig()))); } /** @@ -325,7 +325,7 @@ public class StreamProjection { TypeInformation[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType()); TupleTypeInfo> tType = new TupleTypeInfo>(fTypes); - return dataStream.transform("Projection", tType, new StreamProject>(fieldIndexes, tType)); + return dataStream.transform("Projection", tType, new StreamProject>(fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig()))); } /** @@ -339,7 +339,7 @@ public class StreamProjection { TypeInformation[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType()); TupleTypeInfo> tType = new TupleTypeInfo>(fTypes); - return dataStream.transform("Projection", tType, new StreamProject>(fieldIndexes, tType)); + return dataStream.transform("Projection", tType, new StreamProject>(fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig()))); } /** @@ -353,7 +353,7 @@ public class StreamProjection { TypeInformation[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType()); TupleTypeInfo> tType = new TupleTypeInfo>(fTypes); - return dataStream.transform("Projection", tType, new StreamProject>(fieldIndexes, tType)); + return dataStream.transform("Projection", tType, new StreamProject>(fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig()))); } /** @@ -367,7 +367,7 @@ public class StreamProjection { TypeInformation[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType()); TupleTypeInfo> tType = new TupleTypeInfo>(fTypes); - return dataStream.transform("Projection", tType, new StreamProject>(fieldIndexes, tType)); + return dataStream.transform("Projection", tType, new StreamProject>(fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig()))); } /** @@ -381,7 +381,7 @@ public class StreamProjection { TypeInformation[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType()); TupleTypeInfo> tType = new TupleTypeInfo>(fTypes); - return dataStream.transform("Projection", tType, new StreamProject>(fieldIndexes, tType)); + return dataStream.transform("Projection", tType, new StreamProject>(fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig()))); } /** @@ -395,7 +395,7 @@ public class StreamProjection { TypeInformation[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType()); TupleTypeInfo> tType = new TupleTypeInfo>(fTypes); - return dataStream.transform("Projection", tType, new StreamProject>(fieldIndexes, tType)); + return dataStream.transform("Projection", tType, new StreamProject>(fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig()))); } /** @@ -409,7 +409,7 @@ public class StreamProjection { TypeInformation[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType()); TupleTypeInfo> tType = new TupleTypeInfo>(fTypes); - return dataStream.transform("Projection", tType, new StreamProject>(fieldIndexes, tType)); + return dataStream.transform("Projection", tType, new StreamProject>(fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig()))); } /** @@ -423,7 +423,7 @@ public class StreamProjection { TypeInformation[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType()); TupleTypeInfo> tType = new TupleTypeInfo>(fTypes); - return dataStream.transform("Projection", tType, new StreamProject>(fieldIndexes, tType)); + return dataStream.transform("Projection", tType, new StreamProject>(fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig()))); } /** @@ -437,7 +437,7 @@ public class StreamProjection { TypeInformation[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType()); TupleTypeInfo> tType = new TupleTypeInfo>(fTypes); - return dataStream.transform("Projection", tType, new StreamProject>(fieldIndexes, tType)); + return dataStream.transform("Projection", tType, new StreamProject>(fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig()))); } /** @@ -451,7 +451,7 @@ public class StreamProjection { TypeInformation[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType()); TupleTypeInfo> tType = new TupleTypeInfo>(fTypes); - return dataStream.transform("Projection", tType, new StreamProject>(fieldIndexes, tType)); + return dataStream.transform("Projection", tType, new StreamProject>(fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig()))); } /** @@ -465,7 +465,7 @@ public class StreamProjection { TypeInformation[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType()); TupleTypeInfo> tType = new TupleTypeInfo>(fTypes); - return dataStream.transform("Projection", tType, new StreamProject>(fieldIndexes, tType)); + return dataStream.transform("Projection", tType, new StreamProject>(fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig()))); } public static TypeInformation[] extractFieldTypes(int[] fields, TypeInformation inType) { diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java index a8dc8c548566b61161ffb0a287279923a561dee5..b55e5d67baad63effbae890618e05bf805001394 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java @@ -31,11 +31,11 @@ public abstract class AbstractStreamOperator implements StreamOperator private static final long serialVersionUID = 1L; - protected RuntimeContext runtimeContext; + protected transient RuntimeContext runtimeContext; - protected ExecutionConfig executionConfig; + protected transient ExecutionConfig executionConfig; - public Output output; + public transient Output output; // A sane default for most operators protected ChainingStrategy chainingStrategy = ChainingStrategy.HEAD; diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java index dbd93b544ce230fcf58e91a53674d4720c7794d4..09d1ef6e1c64bec563179d3fd0b526c805ad33e3 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java @@ -32,7 +32,9 @@ import java.io.Serializable; * @param The output type of the operator * @param The type of the user function */ -public abstract class AbstractUdfStreamOperator extends AbstractStreamOperator implements StatefulStreamOperator { +public abstract class AbstractUdfStreamOperator extends AbstractStreamOperator implements StatefulStreamOperator { + + private static final long serialVersionUID = 1L; protected final F userFunction; diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamCounter.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamCounter.java index d9a67ddd13358335fd7280425b5a5cc8b0b2c2f8..240e2b1aba310b40961a3235448b2994603f48e7 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamCounter.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamCounter.java @@ -19,6 +19,8 @@ package org.apache.flink.streaming.api.operators; public class StreamCounter extends AbstractStreamOperator implements OneInputStreamOperator { + private static final long serialVersionUID = 1L; + private Long count = 0L; public StreamCounter() { diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFilter.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFilter.java index e5575dbbd595482f21e5453411e4bced24e55245..a54a4eaece8ccee055b27f5541ca5f547f6b7b3e 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFilter.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFilter.java @@ -21,6 +21,8 @@ import org.apache.flink.api.common.functions.FilterFunction; public class StreamFilter extends AbstractUdfStreamOperator> implements OneInputStreamOperator { + private static final long serialVersionUID = 1L; + public StreamFilter(FilterFunction filterFunction) { super(filterFunction); chainingStrategy = ChainingStrategy.ALWAYS; diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFlatMap.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFlatMap.java index 1e836b17313fc01acfc875c9910f9c000da27830..e8da2c7ee2c70b493e16dce2e924c9f27a447521 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFlatMap.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFlatMap.java @@ -23,6 +23,8 @@ public class StreamFlatMap extends AbstractUdfStreamOperator> implements OneInputStreamOperator { + private static final long serialVersionUID = 1L; + public StreamFlatMap(FlatMapFunction flatMapper) { super(flatMapper); chainingStrategy = ChainingStrategy.ALWAYS; diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFold.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFold.java index 6f956caeb4528c2d3e07d0e2c7d284e394a85cfb..580477a7c3f4424dd1c340a03dfd9daa98931011 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFold.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFold.java @@ -26,6 +26,8 @@ public class StreamFold extends AbstractUdfStreamOperator> implements OneInputStreamOperator { + private static final long serialVersionUID = 1L; + private OUT accumulator; protected TypeSerializer outTypeSerializer; protected TypeInformation outTypeInformation; diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java index 75217be3ecd1d114d10f53f4ab1cd98634c73162..08107a926eb3931876cd539f7cdfba1254ae0677 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java @@ -26,6 +26,8 @@ import org.apache.flink.api.java.functions.KeySelector; public class StreamGroupedFold extends StreamFold { + private static final long serialVersionUID = 1L; + private KeySelector keySelector; private Map values; private OUT initialValue; diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java index e3980cecf4b2336b9ef9dcdd2d7bdd9ffd84ff18..8269be71286774b7a5ee83936cdcec62d2bc9f4f 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java @@ -25,6 +25,8 @@ import org.apache.flink.api.java.functions.KeySelector; public class StreamGroupedReduce extends StreamReduce { + private static final long serialVersionUID = 1L; + private KeySelector keySelector; private Map values; diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamMap.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamMap.java index a379c566ba206aa5a203c110e080ebab0c78e633..08dc9813af0155b27d74067c5f5ed6ea71fb2866 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamMap.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamMap.java @@ -23,6 +23,8 @@ public class StreamMap extends AbstractUdfStreamOperator> implements OneInputStreamOperator { + private static final long serialVersionUID = 1L; + public StreamMap(MapFunction mapper) { super(mapper); chainingStrategy = ChainingStrategy.ALWAYS; diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamProject.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamProject.java index d039144bf2edbcd92b8b916edea8a71a89936b26..83613d83d9e7eeb00604d82e2419a2f92c59fe9e 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamProject.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamProject.java @@ -17,7 +17,6 @@ package org.apache.flink.streaming.api.operators; -import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.configuration.Configuration; @@ -26,16 +25,18 @@ public class StreamProject extends AbstractStreamOperator implements OneInputStreamOperator { - transient OUT outTuple; - TypeSerializer outTypeSerializer; - TypeInformation outTypeInformation; - int[] fields; - int numFields; + private static final long serialVersionUID = 1L; - public StreamProject(int[] fields, TypeInformation outTypeInformation) { + private TypeSerializer outSerializer; + private int[] fields; + private int numFields; + + private transient OUT outTuple; + + public StreamProject(int[] fields, TypeSerializer outSerializer) { this.fields = fields; this.numFields = this.fields.length; - this.outTypeInformation = outTypeInformation; + this.outSerializer = outSerializer; chainingStrategy = ChainingStrategy.ALWAYS; } @@ -52,7 +53,6 @@ public class StreamProject @Override public void open(Configuration config) throws Exception { super.open(config); - this.outTypeSerializer = outTypeInformation.createSerializer(executionConfig); - outTuple = outTypeSerializer.createInstance(); + outTuple = outSerializer.createInstance(); } } diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamReduce.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamReduce.java index 8205fe6128b28e6c6d7ff3e58bad02fe80afd72c..97cebc18e0bc2a78664f45fd159b4d9ee4689f3a 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamReduce.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamReduce.java @@ -22,6 +22,8 @@ import org.apache.flink.api.common.functions.ReduceFunction; public class StreamReduce extends AbstractUdfStreamOperator> implements OneInputStreamOperator { + private static final long serialVersionUID = 1L; + private IN currentValue; public StreamReduce(ReduceFunction reducer) { diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamSink.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamSink.java index b1a021257f51676216a420156d2763840ee8cece..539930200508331547a3e10786ea4e525cb4bdfc 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamSink.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamSink.java @@ -22,6 +22,8 @@ import org.apache.flink.streaming.api.functions.sink.SinkFunction; public class StreamSink extends AbstractUdfStreamOperator> implements OneInputStreamOperator { + private static final long serialVersionUID = 1L; + public StreamSink(SinkFunction sinkFunction) { super(sinkFunction); diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java index 2e4d313daa7abe386dca150c195e8be1cefaff3e..9cdfb01a15fe15cf2b96701b012edb6b46cc0fdc 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java @@ -21,6 +21,8 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction; public class StreamSource extends AbstractUdfStreamOperator> implements StreamOperator { + private static final long serialVersionUID = 1L; + public StreamSource(SourceFunction sourceFunction) { super(sourceFunction); diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamFlatMap.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamFlatMap.java index 0be8c90ba2d19d4e82c05f3f8317604d85156807..e3662d620a5c2c57608d3128f464730764e5146a 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamFlatMap.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamFlatMap.java @@ -25,6 +25,8 @@ public class CoStreamFlatMap extends AbstractUdfStreamOperator> implements TwoInputStreamOperator { + private static final long serialVersionUID = 1L; + public CoStreamFlatMap(CoFlatMapFunction flatMapper) { super(flatMapper); } diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamGroupedReduce.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamGroupedReduce.java index d13671967fb638f86a0e69e0eb9aacd3368441a3..3dc509a027f9f5a546dba8b0a1371f8177e20572 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamGroupedReduce.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamGroupedReduce.java @@ -24,6 +24,7 @@ import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.streaming.api.functions.co.CoReduceFunction; public class CoStreamGroupedReduce extends CoStreamReduce { + private static final long serialVersionUID = 1L; protected KeySelector keySelector1; diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamMap.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamMap.java index 9a98c6669adce3ecdf7e5044e2400fd7e3da459c..a8e57e35ba8d0bcd7103fb9f8785bc8c0cc5e21e 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamMap.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamMap.java @@ -25,6 +25,8 @@ public class CoStreamMap extends AbstractUdfStreamOperator> implements TwoInputStreamOperator { + private static final long serialVersionUID = 1L; + public CoStreamMap(CoMapFunction mapper) { super(mapper); } diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamReduce.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamReduce.java index 81da1895bf992a5e28d396b626530b74e8bb91ff..7157b1df6b09b64c47fe94457c21268a4b6cb6ab 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamReduce.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamReduce.java @@ -25,6 +25,8 @@ public class CoStreamReduce extends AbstractUdfStreamOperator> implements TwoInputStreamOperator { + private static final long serialVersionUID = 1L; + protected IN1 currentValue1 = null; protected IN2 currentValue2 = null; diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamWindow.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamWindow.java index 8f2a0b8d3bef5cfac5b56b450370135e4ac59d8f..e7b069e6cb4642422327a80a937bfd739e09c41c 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamWindow.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamWindow.java @@ -33,6 +33,8 @@ public class CoStreamWindow extends AbstractUdfStreamOperator> implements TwoInputStreamOperator { + private static final long serialVersionUID = 1L; + protected long windowSize; protected long slideSize; protected CircularFifoList> circularList1; diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/GroupedActiveDiscretizer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/GroupedActiveDiscretizer.java index 2ff8496d64b4c10a28ea430572f6ac267e73f22d..fd951100728de4ebb600b63e2c1a9f22ad27b881 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/GroupedActiveDiscretizer.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/GroupedActiveDiscretizer.java @@ -25,10 +25,9 @@ import org.slf4j.LoggerFactory; public class GroupedActiveDiscretizer extends GroupedStreamDiscretizer { - private static final Logger LOG = LoggerFactory.getLogger(GroupedActiveDiscretizer.class); - + private static final long serialVersionUID = 1L; - private static final long serialVersionUID = -3469545957144404137L; + private static final Logger LOG = LoggerFactory.getLogger(GroupedActiveDiscretizer.class); private volatile IN last; private Thread centralThread; diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/GroupedStreamDiscretizer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/GroupedStreamDiscretizer.java index 7f6a9171d4a57c44a479e28766073f6eb3832c55..e80b6ab60d4f7d28e6fd9b6ccec9d131d2bad30b 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/GroupedStreamDiscretizer.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/GroupedStreamDiscretizer.java @@ -36,7 +36,7 @@ import org.apache.flink.streaming.api.windowing.windowbuffer.WindowBuffer; */ public class GroupedStreamDiscretizer extends StreamDiscretizer { - private static final long serialVersionUID = -3469545957144404137L; + private static final long serialVersionUID = 1L; protected KeySelector keySelector; protected Configuration parameters; diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/GroupedWindowBuffer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/GroupedWindowBuffer.java index 4e4350dedc7bdbb0cdf0d1fdaa4b6f8d6095261f..c6b2499f12816b2e696368fa57ab541a1f565eed 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/GroupedWindowBuffer.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/GroupedWindowBuffer.java @@ -17,6 +17,8 @@ package org.apache.flink.streaming.api.operators.windowing; +import java.io.IOException; +import java.io.ObjectInputStream; import java.util.HashMap; import java.util.Map; @@ -31,12 +33,21 @@ import org.apache.flink.streaming.api.windowing.windowbuffer.WindowBuffer; */ public class GroupedWindowBuffer extends StreamWindowBuffer { - private Map> windowMap = new HashMap>(); + private static final long serialVersionUID = 1L; + private KeySelector keySelector; + private transient Map> windowMap; + public GroupedWindowBuffer(WindowBuffer buffer, KeySelector keySelector) { super(buffer); this.keySelector = keySelector; + this.windowMap = new HashMap>(); + } + + private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { + in.defaultReadObject(); + this.windowMap = new HashMap>(); } @Override diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/StreamDiscretizer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/StreamDiscretizer.java index fbe6e442bf92dbe9c26dddf989f8d8e449ecc2f8..4ab31cb960a5ab32bf8cb15bd30363cc617e1929 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/StreamDiscretizer.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/StreamDiscretizer.java @@ -36,18 +36,16 @@ public class StreamDiscretizer extends AbstractStreamOperator> implements OneInputStreamOperator> { - /** - * Auto-generated serial version UID - */ - private static final long serialVersionUID = -8038984294071650730L; + private static final long serialVersionUID = 1L; protected TriggerPolicy triggerPolicy; protected EvictionPolicy evictionPolicy; private boolean isActiveTrigger; private boolean isActiveEviction; - private Thread activePolicyThread; private int bufferSize = 0; + private transient Thread activePolicyThread; + protected WindowEvent windowEvent = new WindowEvent(); public StreamDiscretizer(TriggerPolicy triggerPolicy, EvictionPolicy evictionPolicy) { diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/StreamWindowBuffer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/StreamWindowBuffer.java index b9de6986fef9ed213083670e98bdcaf61c116676..f890b6964f09224727eec32477f30a216040bef0 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/StreamWindowBuffer.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/StreamWindowBuffer.java @@ -30,6 +30,8 @@ public class StreamWindowBuffer extends AbstractStreamOperator> implements OneInputStreamOperator, StreamWindow> { + private static final long serialVersionUID = 1L; + protected WindowBuffer buffer; public StreamWindowBuffer(WindowBuffer buffer) { diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowFlattener.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowFlattener.java index f1b0ee2410d016712bc9824441f838cafc49b40d..3afc50fab2b696cfdb7e09e45f58c993f2edd4a3 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowFlattener.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowFlattener.java @@ -28,6 +28,8 @@ import org.apache.flink.streaming.api.windowing.StreamWindow; public class WindowFlattener extends AbstractStreamOperator implements OneInputStreamOperator, T> { + private static final long serialVersionUID = 1L; + public WindowFlattener() { chainingStrategy = ChainingStrategy.FORCE_ALWAYS; } diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowFolder.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowFolder.java index 29a68dbe88f2147749e564d4119e81df5a5b32bb..bdf67827c5c1fb3b8b6601dcbd515b2a4edee92c 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowFolder.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowFolder.java @@ -33,6 +33,7 @@ import org.apache.flink.streaming.api.windowing.StreamWindow; public class WindowFolder extends StreamMap, StreamWindow> { private static final long serialVersionUID = 1L; + FoldFunction folder; public WindowFolder(FoldFunction folder, OUT initialValue) { diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowMerger.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowMerger.java index 12dd2395b160e56d66ae3b9eea319af7d8790f12..9b3347405ecae1c181157303e7a14ef2e5b2e36c 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowMerger.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowMerger.java @@ -32,6 +32,8 @@ import org.apache.flink.streaming.api.windowing.StreamWindow; public class WindowMerger extends AbstractStreamOperator> implements OneInputStreamOperator, StreamWindow> { + private static final long serialVersionUID = 1L; + private Map> windows; public WindowMerger() { diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowPartitioner.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowPartitioner.java index 14a055a565ed64a6c258a0f2462b5b6635398068..b86caaa0661efa1fb704d574bc34697c28086f31 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowPartitioner.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowPartitioner.java @@ -29,6 +29,8 @@ import org.apache.flink.streaming.api.windowing.StreamWindow; public class WindowPartitioner extends AbstractStreamOperator> implements OneInputStreamOperator, StreamWindow> { + private static final long serialVersionUID = 1L; + private KeySelector keySelector; private int numberOfSplits; diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/ProjectTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/ProjectTest.java index 035abe678033940385901fa2f64a99b7dc26023d..d9cc607fb4d26e77a3cebeec780d84c12e15ce56 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/ProjectTest.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/ProjectTest.java @@ -25,6 +25,7 @@ import java.util.ArrayList; import java.util.HashSet; import java.util.List; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.tuple.Tuple; @@ -33,6 +34,7 @@ import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.api.java.tuple.Tuple5; import org.apache.flink.api.java.typeutils.TupleTypeInfo; import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.api.java.typeutils.runtime.TupleSerializer; import org.apache.flink.streaming.api.datastream.StreamProjection; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.SinkFunction; @@ -51,12 +53,13 @@ public class ProjectTest implements Serializable { int[] fields = new int[]{4, 4, 3}; + TupleSerializer> serializer = + new TupleTypeInfo>(StreamProjection.extractFieldTypes(fields, inType)) + .createSerializer(new ExecutionConfig()); @SuppressWarnings("unchecked") StreamProject, Tuple3> operator = new StreamProject, Tuple3>( - fields, - new TupleTypeInfo>(StreamProjection - .extractFieldTypes(fields, inType))); + fields, serializer); List> input = new ArrayList>();