提交 8b904ae2 编写于 作者: A Aljoscha Krettek 提交者: Stephan Ewen

[FLINK-2052] Fix Serialization warnings in Stream Operators

This closes #698
上级 495a5c3c
...@@ -20,6 +20,8 @@ package org.apache.flink.api.java.functions; ...@@ -20,6 +20,8 @@ package org.apache.flink.api.java.functions;
import org.apache.flink.api.common.functions.Function; import org.apache.flink.api.common.functions.Function;
import java.io.Serializable;
/** /**
* The {@link KeySelector} allows to use arbitrary objects for operations such as * The {@link KeySelector} allows to use arbitrary objects for operations such as
* reduce, reduceGroup, join, coGoup, etc. * reduce, reduceGroup, join, coGoup, etc.
...@@ -29,7 +31,7 @@ import org.apache.flink.api.common.functions.Function; ...@@ -29,7 +31,7 @@ import org.apache.flink.api.common.functions.Function;
* @param <IN> Type of objects to extract the key from. * @param <IN> Type of objects to extract the key from.
* @param <KEY> Type of key. * @param <KEY> Type of key.
*/ */
public interface KeySelector<IN, KEY> extends Function, java.io.Serializable { public interface KeySelector<IN, KEY> extends Function, Serializable {
/** /**
* User-defined function that extracts the key from an arbitrary object. * User-defined function that extracts the key from an arbitrary object.
......
...@@ -129,7 +129,7 @@ public class StreamProjection<IN> { ...@@ -129,7 +129,7 @@ public class StreamProjection<IN> {
TupleTypeInfo<Tuple1<T0>> tType = new TupleTypeInfo<Tuple1<T0>>(fTypes); TupleTypeInfo<Tuple1<T0>> tType = new TupleTypeInfo<Tuple1<T0>>(fTypes);
return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple1<T0>>( return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple1<T0>>(
fieldIndexes, tType)); fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig())));
} }
/** /**
...@@ -143,7 +143,7 @@ public class StreamProjection<IN> { ...@@ -143,7 +143,7 @@ public class StreamProjection<IN> {
TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType()); TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType());
TupleTypeInfo<Tuple2<T0, T1>> tType = new TupleTypeInfo<Tuple2<T0, T1>>(fTypes); TupleTypeInfo<Tuple2<T0, T1>> tType = new TupleTypeInfo<Tuple2<T0, T1>>(fTypes);
return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple2<T0, T1>>(fieldIndexes, tType)); return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple2<T0, T1>>(fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig())));
} }
/** /**
...@@ -157,7 +157,7 @@ public class StreamProjection<IN> { ...@@ -157,7 +157,7 @@ public class StreamProjection<IN> {
TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType()); TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType());
TupleTypeInfo<Tuple3<T0, T1, T2>> tType = new TupleTypeInfo<Tuple3<T0, T1, T2>>(fTypes); TupleTypeInfo<Tuple3<T0, T1, T2>> tType = new TupleTypeInfo<Tuple3<T0, T1, T2>>(fTypes);
return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple3<T0, T1, T2>>(fieldIndexes, tType)); return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple3<T0, T1, T2>>(fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig())));
} }
/** /**
...@@ -171,7 +171,7 @@ public class StreamProjection<IN> { ...@@ -171,7 +171,7 @@ public class StreamProjection<IN> {
TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType()); TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType());
TupleTypeInfo<Tuple4<T0, T1, T2, T3>> tType = new TupleTypeInfo<Tuple4<T0, T1, T2, T3>>(fTypes); TupleTypeInfo<Tuple4<T0, T1, T2, T3>> tType = new TupleTypeInfo<Tuple4<T0, T1, T2, T3>>(fTypes);
return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple4<T0, T1, T2, T3>>(fieldIndexes, tType)); return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple4<T0, T1, T2, T3>>(fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig())));
} }
/** /**
...@@ -185,7 +185,7 @@ public class StreamProjection<IN> { ...@@ -185,7 +185,7 @@ public class StreamProjection<IN> {
TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType()); TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType());
TupleTypeInfo<Tuple5<T0, T1, T2, T3, T4>> tType = new TupleTypeInfo<Tuple5<T0, T1, T2, T3, T4>>(fTypes); TupleTypeInfo<Tuple5<T0, T1, T2, T3, T4>> tType = new TupleTypeInfo<Tuple5<T0, T1, T2, T3, T4>>(fTypes);
return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple5<T0, T1, T2, T3, T4>>(fieldIndexes, tType)); return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple5<T0, T1, T2, T3, T4>>(fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig())));
} }
/** /**
...@@ -199,7 +199,7 @@ public class StreamProjection<IN> { ...@@ -199,7 +199,7 @@ public class StreamProjection<IN> {
TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType()); TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType());
TupleTypeInfo<Tuple6<T0, T1, T2, T3, T4, T5>> tType = new TupleTypeInfo<Tuple6<T0, T1, T2, T3, T4, T5>>(fTypes); TupleTypeInfo<Tuple6<T0, T1, T2, T3, T4, T5>> tType = new TupleTypeInfo<Tuple6<T0, T1, T2, T3, T4, T5>>(fTypes);
return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple6<T0, T1, T2, T3, T4, T5>>(fieldIndexes, tType)); return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple6<T0, T1, T2, T3, T4, T5>>(fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig())));
} }
/** /**
...@@ -213,7 +213,7 @@ public class StreamProjection<IN> { ...@@ -213,7 +213,7 @@ public class StreamProjection<IN> {
TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType()); TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType());
TupleTypeInfo<Tuple7<T0, T1, T2, T3, T4, T5, T6>> tType = new TupleTypeInfo<Tuple7<T0, T1, T2, T3, T4, T5, T6>>(fTypes); TupleTypeInfo<Tuple7<T0, T1, T2, T3, T4, T5, T6>> tType = new TupleTypeInfo<Tuple7<T0, T1, T2, T3, T4, T5, T6>>(fTypes);
return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple7<T0, T1, T2, T3, T4, T5, T6>>(fieldIndexes, tType)); return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple7<T0, T1, T2, T3, T4, T5, T6>>(fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig())));
} }
/** /**
...@@ -227,7 +227,7 @@ public class StreamProjection<IN> { ...@@ -227,7 +227,7 @@ public class StreamProjection<IN> {
TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType()); TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType());
TupleTypeInfo<Tuple8<T0, T1, T2, T3, T4, T5, T6, T7>> tType = new TupleTypeInfo<Tuple8<T0, T1, T2, T3, T4, T5, T6, T7>>(fTypes); TupleTypeInfo<Tuple8<T0, T1, T2, T3, T4, T5, T6, T7>> tType = new TupleTypeInfo<Tuple8<T0, T1, T2, T3, T4, T5, T6, T7>>(fTypes);
return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple8<T0, T1, T2, T3, T4, T5, T6, T7>>(fieldIndexes, tType)); return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple8<T0, T1, T2, T3, T4, T5, T6, T7>>(fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig())));
} }
/** /**
...@@ -241,7 +241,7 @@ public class StreamProjection<IN> { ...@@ -241,7 +241,7 @@ public class StreamProjection<IN> {
TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType()); TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType());
TupleTypeInfo<Tuple9<T0, T1, T2, T3, T4, T5, T6, T7, T8>> tType = new TupleTypeInfo<Tuple9<T0, T1, T2, T3, T4, T5, T6, T7, T8>>(fTypes); TupleTypeInfo<Tuple9<T0, T1, T2, T3, T4, T5, T6, T7, T8>> tType = new TupleTypeInfo<Tuple9<T0, T1, T2, T3, T4, T5, T6, T7, T8>>(fTypes);
return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple9<T0, T1, T2, T3, T4, T5, T6, T7, T8>>(fieldIndexes, tType)); return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple9<T0, T1, T2, T3, T4, T5, T6, T7, T8>>(fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig())));
} }
/** /**
...@@ -255,7 +255,7 @@ public class StreamProjection<IN> { ...@@ -255,7 +255,7 @@ public class StreamProjection<IN> {
TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType()); TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType());
TupleTypeInfo<Tuple10<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9>> tType = new TupleTypeInfo<Tuple10<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9>>(fTypes); TupleTypeInfo<Tuple10<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9>> tType = new TupleTypeInfo<Tuple10<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9>>(fTypes);
return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple10<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9>>(fieldIndexes, tType)); return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple10<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9>>(fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig())));
} }
/** /**
...@@ -269,7 +269,7 @@ public class StreamProjection<IN> { ...@@ -269,7 +269,7 @@ public class StreamProjection<IN> {
TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType()); TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType());
TupleTypeInfo<Tuple11<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>> tType = new TupleTypeInfo<Tuple11<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>>(fTypes); TupleTypeInfo<Tuple11<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>> tType = new TupleTypeInfo<Tuple11<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>>(fTypes);
return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple11<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>>(fieldIndexes, tType)); return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple11<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>>(fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig())));
} }
/** /**
...@@ -283,7 +283,7 @@ public class StreamProjection<IN> { ...@@ -283,7 +283,7 @@ public class StreamProjection<IN> {
TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType()); TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType());
TupleTypeInfo<Tuple12<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11>> tType = new TupleTypeInfo<Tuple12<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11>>(fTypes); TupleTypeInfo<Tuple12<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11>> tType = new TupleTypeInfo<Tuple12<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11>>(fTypes);
return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple12<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11>>(fieldIndexes, tType)); return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple12<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11>>(fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig())));
} }
/** /**
...@@ -297,7 +297,7 @@ public class StreamProjection<IN> { ...@@ -297,7 +297,7 @@ public class StreamProjection<IN> {
TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType()); TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType());
TupleTypeInfo<Tuple13<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12>> tType = new TupleTypeInfo<Tuple13<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12>>(fTypes); TupleTypeInfo<Tuple13<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12>> tType = new TupleTypeInfo<Tuple13<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12>>(fTypes);
return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple13<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12>>(fieldIndexes, tType)); return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple13<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12>>(fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig())));
} }
/** /**
...@@ -311,7 +311,7 @@ public class StreamProjection<IN> { ...@@ -311,7 +311,7 @@ public class StreamProjection<IN> {
TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType()); TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType());
TupleTypeInfo<Tuple14<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13>> tType = new TupleTypeInfo<Tuple14<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13>>(fTypes); TupleTypeInfo<Tuple14<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13>> tType = new TupleTypeInfo<Tuple14<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13>>(fTypes);
return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple14<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13>>(fieldIndexes, tType)); return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple14<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13>>(fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig())));
} }
/** /**
...@@ -325,7 +325,7 @@ public class StreamProjection<IN> { ...@@ -325,7 +325,7 @@ public class StreamProjection<IN> {
TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType()); TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType());
TupleTypeInfo<Tuple15<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14>> tType = new TupleTypeInfo<Tuple15<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14>>(fTypes); TupleTypeInfo<Tuple15<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14>> tType = new TupleTypeInfo<Tuple15<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14>>(fTypes);
return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple15<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14>>(fieldIndexes, tType)); return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple15<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14>>(fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig())));
} }
/** /**
...@@ -339,7 +339,7 @@ public class StreamProjection<IN> { ...@@ -339,7 +339,7 @@ public class StreamProjection<IN> {
TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType()); TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType());
TupleTypeInfo<Tuple16<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15>> tType = new TupleTypeInfo<Tuple16<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15>>(fTypes); TupleTypeInfo<Tuple16<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15>> tType = new TupleTypeInfo<Tuple16<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15>>(fTypes);
return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple16<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15>>(fieldIndexes, tType)); return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple16<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15>>(fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig())));
} }
/** /**
...@@ -353,7 +353,7 @@ public class StreamProjection<IN> { ...@@ -353,7 +353,7 @@ public class StreamProjection<IN> {
TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType()); TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType());
TupleTypeInfo<Tuple17<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16>> tType = new TupleTypeInfo<Tuple17<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16>>(fTypes); TupleTypeInfo<Tuple17<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16>> tType = new TupleTypeInfo<Tuple17<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16>>(fTypes);
return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple17<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16>>(fieldIndexes, tType)); return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple17<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16>>(fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig())));
} }
/** /**
...@@ -367,7 +367,7 @@ public class StreamProjection<IN> { ...@@ -367,7 +367,7 @@ public class StreamProjection<IN> {
TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType()); TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType());
TupleTypeInfo<Tuple18<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17>> tType = new TupleTypeInfo<Tuple18<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17>>(fTypes); TupleTypeInfo<Tuple18<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17>> tType = new TupleTypeInfo<Tuple18<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17>>(fTypes);
return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple18<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17>>(fieldIndexes, tType)); return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple18<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17>>(fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig())));
} }
/** /**
...@@ -381,7 +381,7 @@ public class StreamProjection<IN> { ...@@ -381,7 +381,7 @@ public class StreamProjection<IN> {
TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType()); TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType());
TupleTypeInfo<Tuple19<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18>> tType = new TupleTypeInfo<Tuple19<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18>>(fTypes); TupleTypeInfo<Tuple19<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18>> tType = new TupleTypeInfo<Tuple19<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18>>(fTypes);
return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple19<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18>>(fieldIndexes, tType)); return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple19<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18>>(fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig())));
} }
/** /**
...@@ -395,7 +395,7 @@ public class StreamProjection<IN> { ...@@ -395,7 +395,7 @@ public class StreamProjection<IN> {
TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType()); TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType());
TupleTypeInfo<Tuple20<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19>> tType = new TupleTypeInfo<Tuple20<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19>>(fTypes); TupleTypeInfo<Tuple20<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19>> tType = new TupleTypeInfo<Tuple20<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19>>(fTypes);
return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple20<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19>>(fieldIndexes, tType)); return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple20<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19>>(fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig())));
} }
/** /**
...@@ -409,7 +409,7 @@ public class StreamProjection<IN> { ...@@ -409,7 +409,7 @@ public class StreamProjection<IN> {
TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType()); TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType());
TupleTypeInfo<Tuple21<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20>> tType = new TupleTypeInfo<Tuple21<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20>>(fTypes); TupleTypeInfo<Tuple21<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20>> tType = new TupleTypeInfo<Tuple21<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20>>(fTypes);
return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple21<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20>>(fieldIndexes, tType)); return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple21<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20>>(fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig())));
} }
/** /**
...@@ -423,7 +423,7 @@ public class StreamProjection<IN> { ...@@ -423,7 +423,7 @@ public class StreamProjection<IN> {
TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType()); TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType());
TupleTypeInfo<Tuple22<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21>> tType = new TupleTypeInfo<Tuple22<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21>>(fTypes); TupleTypeInfo<Tuple22<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21>> tType = new TupleTypeInfo<Tuple22<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21>>(fTypes);
return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple22<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21>>(fieldIndexes, tType)); return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple22<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21>>(fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig())));
} }
/** /**
...@@ -437,7 +437,7 @@ public class StreamProjection<IN> { ...@@ -437,7 +437,7 @@ public class StreamProjection<IN> {
TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType()); TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType());
TupleTypeInfo<Tuple23<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22>> tType = new TupleTypeInfo<Tuple23<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22>>(fTypes); TupleTypeInfo<Tuple23<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22>> tType = new TupleTypeInfo<Tuple23<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22>>(fTypes);
return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple23<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22>>(fieldIndexes, tType)); return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple23<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22>>(fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig())));
} }
/** /**
...@@ -451,7 +451,7 @@ public class StreamProjection<IN> { ...@@ -451,7 +451,7 @@ public class StreamProjection<IN> {
TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType()); TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType());
TupleTypeInfo<Tuple24<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23>> tType = new TupleTypeInfo<Tuple24<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23>>(fTypes); TupleTypeInfo<Tuple24<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23>> tType = new TupleTypeInfo<Tuple24<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23>>(fTypes);
return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple24<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23>>(fieldIndexes, tType)); return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple24<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23>>(fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig())));
} }
/** /**
...@@ -465,7 +465,7 @@ public class StreamProjection<IN> { ...@@ -465,7 +465,7 @@ public class StreamProjection<IN> {
TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType()); TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType());
TupleTypeInfo<Tuple25<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, T24>> tType = new TupleTypeInfo<Tuple25<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, T24>>(fTypes); TupleTypeInfo<Tuple25<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, T24>> tType = new TupleTypeInfo<Tuple25<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, T24>>(fTypes);
return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple25<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, T24>>(fieldIndexes, tType)); return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple25<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, T24>>(fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig())));
} }
public static TypeInformation<?>[] extractFieldTypes(int[] fields, TypeInformation<?> inType) { public static TypeInformation<?>[] extractFieldTypes(int[] fields, TypeInformation<?> inType) {
......
...@@ -31,11 +31,11 @@ public abstract class AbstractStreamOperator<OUT> implements StreamOperator<OUT> ...@@ -31,11 +31,11 @@ public abstract class AbstractStreamOperator<OUT> implements StreamOperator<OUT>
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
protected RuntimeContext runtimeContext; protected transient RuntimeContext runtimeContext;
protected ExecutionConfig executionConfig; protected transient ExecutionConfig executionConfig;
public Output<OUT> output; public transient Output<OUT> output;
// A sane default for most operators // A sane default for most operators
protected ChainingStrategy chainingStrategy = ChainingStrategy.HEAD; protected ChainingStrategy chainingStrategy = ChainingStrategy.HEAD;
......
...@@ -32,7 +32,9 @@ import java.io.Serializable; ...@@ -32,7 +32,9 @@ import java.io.Serializable;
* @param <OUT> The output type of the operator * @param <OUT> The output type of the operator
* @param <F> The type of the user function * @param <F> The type of the user function
*/ */
public abstract class AbstractUdfStreamOperator<OUT, F extends Function> extends AbstractStreamOperator<OUT> implements StatefulStreamOperator<OUT> { public abstract class AbstractUdfStreamOperator<OUT, F extends Function & Serializable> extends AbstractStreamOperator<OUT> implements StatefulStreamOperator<OUT> {
private static final long serialVersionUID = 1L;
protected final F userFunction; protected final F userFunction;
......
...@@ -19,6 +19,8 @@ package org.apache.flink.streaming.api.operators; ...@@ -19,6 +19,8 @@ package org.apache.flink.streaming.api.operators;
public class StreamCounter<IN> extends AbstractStreamOperator<Long> implements OneInputStreamOperator<IN, Long> { public class StreamCounter<IN> extends AbstractStreamOperator<Long> implements OneInputStreamOperator<IN, Long> {
private static final long serialVersionUID = 1L;
private Long count = 0L; private Long count = 0L;
public StreamCounter() { public StreamCounter() {
......
...@@ -21,6 +21,8 @@ import org.apache.flink.api.common.functions.FilterFunction; ...@@ -21,6 +21,8 @@ import org.apache.flink.api.common.functions.FilterFunction;
public class StreamFilter<IN> extends AbstractUdfStreamOperator<IN, FilterFunction<IN>> implements OneInputStreamOperator<IN, IN> { public class StreamFilter<IN> extends AbstractUdfStreamOperator<IN, FilterFunction<IN>> implements OneInputStreamOperator<IN, IN> {
private static final long serialVersionUID = 1L;
public StreamFilter(FilterFunction<IN> filterFunction) { public StreamFilter(FilterFunction<IN> filterFunction) {
super(filterFunction); super(filterFunction);
chainingStrategy = ChainingStrategy.ALWAYS; chainingStrategy = ChainingStrategy.ALWAYS;
......
...@@ -23,6 +23,8 @@ public class StreamFlatMap<IN, OUT> ...@@ -23,6 +23,8 @@ public class StreamFlatMap<IN, OUT>
extends AbstractUdfStreamOperator<OUT, FlatMapFunction<IN, OUT>> extends AbstractUdfStreamOperator<OUT, FlatMapFunction<IN, OUT>>
implements OneInputStreamOperator<IN, OUT> { implements OneInputStreamOperator<IN, OUT> {
private static final long serialVersionUID = 1L;
public StreamFlatMap(FlatMapFunction<IN, OUT> flatMapper) { public StreamFlatMap(FlatMapFunction<IN, OUT> flatMapper) {
super(flatMapper); super(flatMapper);
chainingStrategy = ChainingStrategy.ALWAYS; chainingStrategy = ChainingStrategy.ALWAYS;
......
...@@ -26,6 +26,8 @@ public class StreamFold<IN, OUT> ...@@ -26,6 +26,8 @@ public class StreamFold<IN, OUT>
extends AbstractUdfStreamOperator<OUT, FoldFunction<IN, OUT>> extends AbstractUdfStreamOperator<OUT, FoldFunction<IN, OUT>>
implements OneInputStreamOperator<IN, OUT> { implements OneInputStreamOperator<IN, OUT> {
private static final long serialVersionUID = 1L;
private OUT accumulator; private OUT accumulator;
protected TypeSerializer<OUT> outTypeSerializer; protected TypeSerializer<OUT> outTypeSerializer;
protected TypeInformation<OUT> outTypeInformation; protected TypeInformation<OUT> outTypeInformation;
......
...@@ -26,6 +26,8 @@ import org.apache.flink.api.java.functions.KeySelector; ...@@ -26,6 +26,8 @@ import org.apache.flink.api.java.functions.KeySelector;
public class StreamGroupedFold<IN, OUT> extends StreamFold<IN, OUT> { public class StreamGroupedFold<IN, OUT> extends StreamFold<IN, OUT> {
private static final long serialVersionUID = 1L;
private KeySelector<IN, ?> keySelector; private KeySelector<IN, ?> keySelector;
private Map<Object, OUT> values; private Map<Object, OUT> values;
private OUT initialValue; private OUT initialValue;
......
...@@ -25,6 +25,8 @@ import org.apache.flink.api.java.functions.KeySelector; ...@@ -25,6 +25,8 @@ import org.apache.flink.api.java.functions.KeySelector;
public class StreamGroupedReduce<IN> extends StreamReduce<IN> { public class StreamGroupedReduce<IN> extends StreamReduce<IN> {
private static final long serialVersionUID = 1L;
private KeySelector<IN, ?> keySelector; private KeySelector<IN, ?> keySelector;
private Map<Object, IN> values; private Map<Object, IN> values;
......
...@@ -23,6 +23,8 @@ public class StreamMap<IN, OUT> ...@@ -23,6 +23,8 @@ public class StreamMap<IN, OUT>
extends AbstractUdfStreamOperator<OUT, MapFunction<IN, OUT>> extends AbstractUdfStreamOperator<OUT, MapFunction<IN, OUT>>
implements OneInputStreamOperator<IN, OUT> { implements OneInputStreamOperator<IN, OUT> {
private static final long serialVersionUID = 1L;
public StreamMap(MapFunction<IN, OUT> mapper) { public StreamMap(MapFunction<IN, OUT> mapper) {
super(mapper); super(mapper);
chainingStrategy = ChainingStrategy.ALWAYS; chainingStrategy = ChainingStrategy.ALWAYS;
......
...@@ -17,7 +17,6 @@ ...@@ -17,7 +17,6 @@
package org.apache.flink.streaming.api.operators; package org.apache.flink.streaming.api.operators;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.Configuration;
...@@ -26,16 +25,18 @@ public class StreamProject<IN, OUT extends Tuple> ...@@ -26,16 +25,18 @@ public class StreamProject<IN, OUT extends Tuple>
extends AbstractStreamOperator<OUT> extends AbstractStreamOperator<OUT>
implements OneInputStreamOperator<IN, OUT> { implements OneInputStreamOperator<IN, OUT> {
transient OUT outTuple; private static final long serialVersionUID = 1L;
TypeSerializer<OUT> outTypeSerializer;
TypeInformation<OUT> outTypeInformation;
int[] fields;
int numFields;
public StreamProject(int[] fields, TypeInformation<OUT> outTypeInformation) { private TypeSerializer<OUT> outSerializer;
private int[] fields;
private int numFields;
private transient OUT outTuple;
public StreamProject(int[] fields, TypeSerializer<OUT> outSerializer) {
this.fields = fields; this.fields = fields;
this.numFields = this.fields.length; this.numFields = this.fields.length;
this.outTypeInformation = outTypeInformation; this.outSerializer = outSerializer;
chainingStrategy = ChainingStrategy.ALWAYS; chainingStrategy = ChainingStrategy.ALWAYS;
} }
...@@ -52,7 +53,6 @@ public class StreamProject<IN, OUT extends Tuple> ...@@ -52,7 +53,6 @@ public class StreamProject<IN, OUT extends Tuple>
@Override @Override
public void open(Configuration config) throws Exception { public void open(Configuration config) throws Exception {
super.open(config); super.open(config);
this.outTypeSerializer = outTypeInformation.createSerializer(executionConfig); outTuple = outSerializer.createInstance();
outTuple = outTypeSerializer.createInstance();
} }
} }
...@@ -22,6 +22,8 @@ import org.apache.flink.api.common.functions.ReduceFunction; ...@@ -22,6 +22,8 @@ import org.apache.flink.api.common.functions.ReduceFunction;
public class StreamReduce<IN> extends AbstractUdfStreamOperator<IN, ReduceFunction<IN>> public class StreamReduce<IN> extends AbstractUdfStreamOperator<IN, ReduceFunction<IN>>
implements OneInputStreamOperator<IN, IN> { implements OneInputStreamOperator<IN, IN> {
private static final long serialVersionUID = 1L;
private IN currentValue; private IN currentValue;
public StreamReduce(ReduceFunction<IN> reducer) { public StreamReduce(ReduceFunction<IN> reducer) {
......
...@@ -22,6 +22,8 @@ import org.apache.flink.streaming.api.functions.sink.SinkFunction; ...@@ -22,6 +22,8 @@ import org.apache.flink.streaming.api.functions.sink.SinkFunction;
public class StreamSink<IN> extends AbstractUdfStreamOperator<Object, SinkFunction<IN>> public class StreamSink<IN> extends AbstractUdfStreamOperator<Object, SinkFunction<IN>>
implements OneInputStreamOperator<IN, Object> { implements OneInputStreamOperator<IN, Object> {
private static final long serialVersionUID = 1L;
public StreamSink(SinkFunction<IN> sinkFunction) { public StreamSink(SinkFunction<IN> sinkFunction) {
super(sinkFunction); super(sinkFunction);
......
...@@ -21,6 +21,8 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction; ...@@ -21,6 +21,8 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction;
public class StreamSource<OUT> extends AbstractUdfStreamOperator<OUT, SourceFunction<OUT>> implements StreamOperator<OUT> { public class StreamSource<OUT> extends AbstractUdfStreamOperator<OUT, SourceFunction<OUT>> implements StreamOperator<OUT> {
private static final long serialVersionUID = 1L;
public StreamSource(SourceFunction<OUT> sourceFunction) { public StreamSource(SourceFunction<OUT> sourceFunction) {
super(sourceFunction); super(sourceFunction);
......
...@@ -25,6 +25,8 @@ public class CoStreamFlatMap<IN1, IN2, OUT> ...@@ -25,6 +25,8 @@ public class CoStreamFlatMap<IN1, IN2, OUT>
extends AbstractUdfStreamOperator<OUT, CoFlatMapFunction<IN1, IN2, OUT>> extends AbstractUdfStreamOperator<OUT, CoFlatMapFunction<IN1, IN2, OUT>>
implements TwoInputStreamOperator<IN1, IN2, OUT> { implements TwoInputStreamOperator<IN1, IN2, OUT> {
private static final long serialVersionUID = 1L;
public CoStreamFlatMap(CoFlatMapFunction<IN1, IN2, OUT> flatMapper) { public CoStreamFlatMap(CoFlatMapFunction<IN1, IN2, OUT> flatMapper) {
super(flatMapper); super(flatMapper);
} }
......
...@@ -24,6 +24,7 @@ import org.apache.flink.api.java.functions.KeySelector; ...@@ -24,6 +24,7 @@ import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.functions.co.CoReduceFunction; import org.apache.flink.streaming.api.functions.co.CoReduceFunction;
public class CoStreamGroupedReduce<IN1, IN2, OUT> extends CoStreamReduce<IN1, IN2, OUT> { public class CoStreamGroupedReduce<IN1, IN2, OUT> extends CoStreamReduce<IN1, IN2, OUT> {
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
protected KeySelector<IN1, ?> keySelector1; protected KeySelector<IN1, ?> keySelector1;
......
...@@ -25,6 +25,8 @@ public class CoStreamMap<IN1, IN2, OUT> ...@@ -25,6 +25,8 @@ public class CoStreamMap<IN1, IN2, OUT>
extends AbstractUdfStreamOperator<OUT, CoMapFunction<IN1, IN2, OUT>> extends AbstractUdfStreamOperator<OUT, CoMapFunction<IN1, IN2, OUT>>
implements TwoInputStreamOperator<IN1, IN2, OUT> { implements TwoInputStreamOperator<IN1, IN2, OUT> {
private static final long serialVersionUID = 1L;
public CoStreamMap(CoMapFunction<IN1, IN2, OUT> mapper) { public CoStreamMap(CoMapFunction<IN1, IN2, OUT> mapper) {
super(mapper); super(mapper);
} }
......
...@@ -25,6 +25,8 @@ public class CoStreamReduce<IN1, IN2, OUT> ...@@ -25,6 +25,8 @@ public class CoStreamReduce<IN1, IN2, OUT>
extends AbstractUdfStreamOperator<OUT, CoReduceFunction<IN1, IN2, OUT>> extends AbstractUdfStreamOperator<OUT, CoReduceFunction<IN1, IN2, OUT>>
implements TwoInputStreamOperator<IN1, IN2, OUT> { implements TwoInputStreamOperator<IN1, IN2, OUT> {
private static final long serialVersionUID = 1L;
protected IN1 currentValue1 = null; protected IN1 currentValue1 = null;
protected IN2 currentValue2 = null; protected IN2 currentValue2 = null;
......
...@@ -33,6 +33,8 @@ public class CoStreamWindow<IN1, IN2, OUT> ...@@ -33,6 +33,8 @@ public class CoStreamWindow<IN1, IN2, OUT>
extends AbstractUdfStreamOperator<OUT, CoWindowFunction<IN1, IN2, OUT>> extends AbstractUdfStreamOperator<OUT, CoWindowFunction<IN1, IN2, OUT>>
implements TwoInputStreamOperator<IN1, IN2, OUT> { implements TwoInputStreamOperator<IN1, IN2, OUT> {
private static final long serialVersionUID = 1L;
protected long windowSize; protected long windowSize;
protected long slideSize; protected long slideSize;
protected CircularFifoList<StreamRecord<IN1>> circularList1; protected CircularFifoList<StreamRecord<IN1>> circularList1;
......
...@@ -25,10 +25,9 @@ import org.slf4j.LoggerFactory; ...@@ -25,10 +25,9 @@ import org.slf4j.LoggerFactory;
public class GroupedActiveDiscretizer<IN> extends GroupedStreamDiscretizer<IN> { public class GroupedActiveDiscretizer<IN> extends GroupedStreamDiscretizer<IN> {
private static final Logger LOG = LoggerFactory.getLogger(GroupedActiveDiscretizer.class); private static final long serialVersionUID = 1L;
private static final long serialVersionUID = -3469545957144404137L; private static final Logger LOG = LoggerFactory.getLogger(GroupedActiveDiscretizer.class);
private volatile IN last; private volatile IN last;
private Thread centralThread; private Thread centralThread;
......
...@@ -36,7 +36,7 @@ import org.apache.flink.streaming.api.windowing.windowbuffer.WindowBuffer; ...@@ -36,7 +36,7 @@ import org.apache.flink.streaming.api.windowing.windowbuffer.WindowBuffer;
*/ */
public class GroupedStreamDiscretizer<IN> extends StreamDiscretizer<IN> { public class GroupedStreamDiscretizer<IN> extends StreamDiscretizer<IN> {
private static final long serialVersionUID = -3469545957144404137L; private static final long serialVersionUID = 1L;
protected KeySelector<IN, ?> keySelector; protected KeySelector<IN, ?> keySelector;
protected Configuration parameters; protected Configuration parameters;
......
...@@ -17,6 +17,8 @@ ...@@ -17,6 +17,8 @@
package org.apache.flink.streaming.api.operators.windowing; package org.apache.flink.streaming.api.operators.windowing;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
...@@ -31,12 +33,21 @@ import org.apache.flink.streaming.api.windowing.windowbuffer.WindowBuffer; ...@@ -31,12 +33,21 @@ import org.apache.flink.streaming.api.windowing.windowbuffer.WindowBuffer;
*/ */
public class GroupedWindowBuffer<T> extends StreamWindowBuffer<T> { public class GroupedWindowBuffer<T> extends StreamWindowBuffer<T> {
private Map<Object, WindowBuffer<T>> windowMap = new HashMap<Object, WindowBuffer<T>>(); private static final long serialVersionUID = 1L;
private KeySelector<T, ?> keySelector; private KeySelector<T, ?> keySelector;
private transient Map<Object, WindowBuffer<T>> windowMap;
public GroupedWindowBuffer(WindowBuffer<T> buffer, KeySelector<T, ?> keySelector) { public GroupedWindowBuffer(WindowBuffer<T> buffer, KeySelector<T, ?> keySelector) {
super(buffer); super(buffer);
this.keySelector = keySelector; this.keySelector = keySelector;
this.windowMap = new HashMap<Object, WindowBuffer<T>>();
}
private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
in.defaultReadObject();
this.windowMap = new HashMap<Object, WindowBuffer<T>>();
} }
@Override @Override
......
...@@ -36,18 +36,16 @@ public class StreamDiscretizer<IN> ...@@ -36,18 +36,16 @@ public class StreamDiscretizer<IN>
extends AbstractStreamOperator<WindowEvent<IN>> extends AbstractStreamOperator<WindowEvent<IN>>
implements OneInputStreamOperator<IN, WindowEvent<IN>> { implements OneInputStreamOperator<IN, WindowEvent<IN>> {
/** private static final long serialVersionUID = 1L;
* Auto-generated serial version UID
*/
private static final long serialVersionUID = -8038984294071650730L;
protected TriggerPolicy<IN> triggerPolicy; protected TriggerPolicy<IN> triggerPolicy;
protected EvictionPolicy<IN> evictionPolicy; protected EvictionPolicy<IN> evictionPolicy;
private boolean isActiveTrigger; private boolean isActiveTrigger;
private boolean isActiveEviction; private boolean isActiveEviction;
private Thread activePolicyThread;
private int bufferSize = 0; private int bufferSize = 0;
private transient Thread activePolicyThread;
protected WindowEvent<IN> windowEvent = new WindowEvent<IN>(); protected WindowEvent<IN> windowEvent = new WindowEvent<IN>();
public StreamDiscretizer(TriggerPolicy<IN> triggerPolicy, EvictionPolicy<IN> evictionPolicy) { public StreamDiscretizer(TriggerPolicy<IN> triggerPolicy, EvictionPolicy<IN> evictionPolicy) {
......
...@@ -30,6 +30,8 @@ public class StreamWindowBuffer<T> ...@@ -30,6 +30,8 @@ public class StreamWindowBuffer<T>
extends AbstractStreamOperator<StreamWindow<T>> extends AbstractStreamOperator<StreamWindow<T>>
implements OneInputStreamOperator<WindowEvent<T>, StreamWindow<T>> { implements OneInputStreamOperator<WindowEvent<T>, StreamWindow<T>> {
private static final long serialVersionUID = 1L;
protected WindowBuffer<T> buffer; protected WindowBuffer<T> buffer;
public StreamWindowBuffer(WindowBuffer<T> buffer) { public StreamWindowBuffer(WindowBuffer<T> buffer) {
......
...@@ -28,6 +28,8 @@ import org.apache.flink.streaming.api.windowing.StreamWindow; ...@@ -28,6 +28,8 @@ import org.apache.flink.streaming.api.windowing.StreamWindow;
public class WindowFlattener<T> extends AbstractStreamOperator<T> public class WindowFlattener<T> extends AbstractStreamOperator<T>
implements OneInputStreamOperator<StreamWindow<T>, T> { implements OneInputStreamOperator<StreamWindow<T>, T> {
private static final long serialVersionUID = 1L;
public WindowFlattener() { public WindowFlattener() {
chainingStrategy = ChainingStrategy.FORCE_ALWAYS; chainingStrategy = ChainingStrategy.FORCE_ALWAYS;
} }
......
...@@ -33,6 +33,7 @@ import org.apache.flink.streaming.api.windowing.StreamWindow; ...@@ -33,6 +33,7 @@ import org.apache.flink.streaming.api.windowing.StreamWindow;
public class WindowFolder<IN, OUT> extends StreamMap<StreamWindow<IN>, StreamWindow<OUT>> { public class WindowFolder<IN, OUT> extends StreamMap<StreamWindow<IN>, StreamWindow<OUT>> {
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
FoldFunction<IN, OUT> folder; FoldFunction<IN, OUT> folder;
public WindowFolder(FoldFunction<IN, OUT> folder, OUT initialValue) { public WindowFolder(FoldFunction<IN, OUT> folder, OUT initialValue) {
......
...@@ -32,6 +32,8 @@ import org.apache.flink.streaming.api.windowing.StreamWindow; ...@@ -32,6 +32,8 @@ import org.apache.flink.streaming.api.windowing.StreamWindow;
public class WindowMerger<T> extends AbstractStreamOperator<StreamWindow<T>> public class WindowMerger<T> extends AbstractStreamOperator<StreamWindow<T>>
implements OneInputStreamOperator<StreamWindow<T>, StreamWindow<T>> { implements OneInputStreamOperator<StreamWindow<T>, StreamWindow<T>> {
private static final long serialVersionUID = 1L;
private Map<Integer, StreamWindow<T>> windows; private Map<Integer, StreamWindow<T>> windows;
public WindowMerger() { public WindowMerger() {
......
...@@ -29,6 +29,8 @@ import org.apache.flink.streaming.api.windowing.StreamWindow; ...@@ -29,6 +29,8 @@ import org.apache.flink.streaming.api.windowing.StreamWindow;
public class WindowPartitioner<T> extends AbstractStreamOperator<StreamWindow<T>> public class WindowPartitioner<T> extends AbstractStreamOperator<StreamWindow<T>>
implements OneInputStreamOperator<StreamWindow<T>, StreamWindow<T>> { implements OneInputStreamOperator<StreamWindow<T>, StreamWindow<T>> {
private static final long serialVersionUID = 1L;
private KeySelector<T, ?> keySelector; private KeySelector<T, ?> keySelector;
private int numberOfSplits; private int numberOfSplits;
......
...@@ -25,6 +25,7 @@ import java.util.ArrayList; ...@@ -25,6 +25,7 @@ import java.util.ArrayList;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.api.java.tuple.Tuple;
...@@ -33,6 +34,7 @@ import org.apache.flink.api.java.tuple.Tuple3; ...@@ -33,6 +34,7 @@ import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple5; import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.api.java.typeutils.TupleTypeInfo; import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
import org.apache.flink.streaming.api.datastream.StreamProjection; import org.apache.flink.streaming.api.datastream.StreamProjection;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction; import org.apache.flink.streaming.api.functions.sink.SinkFunction;
...@@ -51,12 +53,13 @@ public class ProjectTest implements Serializable { ...@@ -51,12 +53,13 @@ public class ProjectTest implements Serializable {
int[] fields = new int[]{4, 4, 3}; int[] fields = new int[]{4, 4, 3};
TupleSerializer<Tuple3<Integer, Integer, String>> serializer =
new TupleTypeInfo<Tuple3<Integer, Integer, String>>(StreamProjection.extractFieldTypes(fields, inType))
.createSerializer(new ExecutionConfig());
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
StreamProject<Tuple5<Integer, String, Integer, String, Integer>, Tuple3<Integer, Integer, String>> operator = StreamProject<Tuple5<Integer, String, Integer, String, Integer>, Tuple3<Integer, Integer, String>> operator =
new StreamProject<Tuple5<Integer, String, Integer, String, Integer>, Tuple3<Integer, Integer, String>>( new StreamProject<Tuple5<Integer, String, Integer, String, Integer>, Tuple3<Integer, Integer, String>>(
fields, fields, serializer);
new TupleTypeInfo<Tuple3<Integer, Integer, String>>(StreamProjection
.extractFieldTypes(fields, inType)));
List<Tuple5<Integer, String, Integer, String, Integer>> input = new ArrayList<Tuple5<Integer, String, Integer, List<Tuple5<Integer, String, Integer, String, Integer>> input = new ArrayList<Tuple5<Integer, String, Integer,
String, Integer>>(); String, Integer>>();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册