From fac773488a77c0c70ed74a2dca09224f275ec77a Mon Sep 17 00:00:00 2001 From: Gyula Fora Date: Fri, 2 Jan 2015 20:04:51 +0100 Subject: [PATCH] [scala] [streaming] Added scala window helpers + timestamp rework for lambda support --- .../api/datastream/CoWindowDataStream.java | 20 +++--- .../api/datastream/ConnectedDataStream.java | 30 ++++---- .../api/datastream/TemporalOperator.java | 39 +++++----- .../co/CoGroupedWindowReduceInvokable.java | 4 +- .../operator/co/CoWindowInvokable.java | 8 +-- .../operator/co/CoWindowReduceInvokable.java | 15 ++-- .../streaming/api/windowing/helper/Delta.java | 4 +- .../helper/SystemTimestamp.java} | 18 +++-- .../streaming/api/windowing/helper/Time.java | 72 +++++++++++++++---- .../helper/Timestamp.java} | 11 +-- .../windowing/helper/TimestampWrapper.java | 44 ++++++++++++ .../windowing/policy/ActiveTriggerPolicy.java | 6 +- .../windowing/policy/TimeEvictionPolicy.java | 31 ++++---- .../windowing/policy/TimeTriggerPolicy.java | 42 +++++------ .../streaming/api/WindowCrossJoinTest.java | 30 ++++---- .../operator/CoGroupedWindowReduceTest.java | 5 +- .../operator/CoWindowReduceTest.java | 5 +- .../api/invokable/operator/CoWindowTest.java | 23 +++--- .../operator/GroupedWindowInvokableTest.java | 21 +++--- .../operator/WindowInvokableTest.java | 16 ++--- .../policy/TimeEvictionPolicyTest.java | 12 ++-- .../policy/TimeTriggerPolicyTest.java | 28 +++----- .../windowing/DeltaExtractExample.java | 6 +- .../api/scala/streaming/DataStream.scala | 19 +---- .../scala/streaming/StreamCrossOperator.scala | 2 +- .../StreamExecutionEnvironment.scala | 12 +++- .../scala/streaming/StreamJoinOperator.scala | 1 + .../scala/streaming/WindowedDataStream.scala | 6 +- .../api/scala/streaming/windowing/Delta.scala | 47 ++++++++++++ .../api/scala/streaming/windowing/Time.scala | 55 ++++++++++++++ 30 files changed, 388 insertions(+), 244 deletions(-) rename 
flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/{invokable/util/DefaultTimeStamp.java => windowing/helper/SystemTimestamp.java} (71%) rename flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/{invokable/util/TimeStamp.java => windowing/helper/Timestamp.java} (83%) create mode 100644 flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/TimestampWrapper.java create mode 100644 flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/windowing/Delta.scala create mode 100644 flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/windowing/Time.scala diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/CoWindowDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/CoWindowDataStream.java index c8c634aae5f..9129f9e8311 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/CoWindowDataStream.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/CoWindowDataStream.java @@ -22,33 +22,33 @@ import org.apache.flink.streaming.api.function.co.CoReduceFunction; import org.apache.flink.streaming.api.invokable.operator.co.CoGroupedWindowReduceInvokable; import org.apache.flink.streaming.api.invokable.operator.co.CoInvokable; import org.apache.flink.streaming.api.invokable.operator.co.CoWindowReduceInvokable; -import org.apache.flink.streaming.api.invokable.util.TimeStamp; +import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper; /** * A {@link CoWindowDataStream} represents two data streams whose elements are * batched together into sliding windows. Operation * {@link #reduce(CoReduceFunction)} can be applied for each window. 
- * + * * @param * The type of the first input data stream * @param * The type of the second input data stream */ public class CoWindowDataStream extends CoBatchedDataStream { - TimeStamp timeStamp1; - TimeStamp timeStamp2; + TimestampWrapper timeStamp1; + TimestampWrapper timeStamp2; protected CoWindowDataStream(DataStream dataStream1, DataStream dataStream2, long windowSize1, long windowSize2, long slideInterval1, long slideInterval2, - TimeStamp timeStamp1, TimeStamp timeStamp2) { + TimestampWrapper timeStamp1, TimestampWrapper timeStamp2) { super(dataStream1, dataStream2, windowSize1, windowSize2, slideInterval1, slideInterval2); this.timeStamp1 = timeStamp1; this.timeStamp2 = timeStamp2; } protected CoWindowDataStream(ConnectedDataStream coDataStream, long windowSize1, - long windowSize2, long slideInterval1, long slideInterval2, TimeStamp timeStamp1, - TimeStamp timeStamp2) { + long windowSize2, long slideInterval1, long slideInterval2, + TimestampWrapper timeStamp1, TimestampWrapper timeStamp2) { super(coDataStream, windowSize1, windowSize2, slideInterval1, slideInterval2); this.timeStamp1 = timeStamp1; this.timeStamp2 = timeStamp2; @@ -96,9 +96,9 @@ public class CoWindowDataStream extends CoBatchedDataStream CoReduceFunction coReducer) { CoWindowReduceInvokable invokable; if (isGrouped) { - invokable = new CoGroupedWindowReduceInvokable(clean(coReducer), batchSize1, - batchSize2, slideSize1, slideSize2, keySelector1, keySelector2, timeStamp1, - timeStamp2); + invokable = new CoGroupedWindowReduceInvokable(clean(coReducer), + batchSize1, batchSize2, slideSize1, slideSize2, keySelector1, keySelector2, + timeStamp1, timeStamp2); } else { invokable = new CoWindowReduceInvokable(clean(coReducer), batchSize1, batchSize2, slideSize1, slideSize2, timeStamp1, timeStamp2); diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java index 39b6460f01b..efd95314d61 100755 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java @@ -36,8 +36,8 @@ import org.apache.flink.streaming.api.invokable.operator.co.CoInvokable; import org.apache.flink.streaming.api.invokable.operator.co.CoMapInvokable; import org.apache.flink.streaming.api.invokable.operator.co.CoReduceInvokable; import org.apache.flink.streaming.api.invokable.operator.co.CoWindowInvokable; -import org.apache.flink.streaming.api.invokable.util.DefaultTimeStamp; -import org.apache.flink.streaming.api.invokable.util.TimeStamp; +import org.apache.flink.streaming.api.windowing.helper.SystemTimestamp; +import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper; /** * The ConnectedDataStream represents a stream for two different data types. 
It @@ -305,8 +305,8 @@ public class ConnectedDataStream { * @return The transformed {@link ConnectedDataStream} */ public CoWindowDataStream window(long windowSize1, long windowSize2, - long slideInterval1, long slideInterval2, TimeStamp timeStamp1, - TimeStamp timeStamp2) { + long slideInterval1, long slideInterval2, TimestampWrapper timeStamp1, + TimestampWrapper timeStamp2) { if (windowSize1 < 1 || windowSize2 < 1) { throw new IllegalArgumentException("Window size must be positive"); } @@ -338,10 +338,12 @@ public class ConnectedDataStream { * second input data stream are slid by after each transformation * @return The transformed {@link ConnectedDataStream} */ + @SuppressWarnings("unchecked") public CoWindowDataStream window(long windowSize1, long windowSize2, long slideInterval1, long slideInterval2) { return window(windowSize1, windowSize2, slideInterval1, slideInterval2, - new DefaultTimeStamp(), new DefaultTimeStamp()); + (TimestampWrapper) SystemTimestamp.getWrapper(), + (TimestampWrapper) SystemTimestamp.getWrapper()); } /** @@ -365,7 +367,7 @@ public class ConnectedDataStream { * @return The transformed {@link ConnectedDataStream} */ public CoWindowDataStream window(long windowSize1, long windowSize2, - TimeStamp timeStamp1, TimeStamp timeStamp2) { + TimestampWrapper timeStamp1, TimestampWrapper timeStamp2) { return window(windowSize1, windowSize2, windowSize1, windowSize2, timeStamp1, timeStamp2); } @@ -384,9 +386,11 @@ public class ConnectedDataStream { * milliseconds * @return The transformed {@link ConnectedDataStream} */ + @SuppressWarnings("unchecked") public CoWindowDataStream window(long windowSize1, long windowSize2) { return window(windowSize1, windowSize2, windowSize1, windowSize2, - new DefaultTimeStamp(), new DefaultTimeStamp()); + (TimestampWrapper) SystemTimestamp.getWrapper(), + (TimestampWrapper) SystemTimestamp.getWrapper()); } /** @@ -479,10 +483,12 @@ public class ConnectedDataStream { * * @return The transformed {@link DataStream}. 
*/ + @SuppressWarnings("unchecked") public SingleOutputStreamOperator windowReduce( CoWindowFunction coWindowFunction, long windowSize, long slideInterval) { return windowReduce(coWindowFunction, windowSize, slideInterval, - new DefaultTimeStamp(), new DefaultTimeStamp()); + (TimestampWrapper) SystemTimestamp.getWrapper(), + (TimestampWrapper) SystemTimestamp.getWrapper()); } /** @@ -510,7 +516,7 @@ public class ConnectedDataStream { */ public SingleOutputStreamOperator windowReduce( CoWindowFunction coWindowFunction, long windowSize, long slideInterval, - TimeStamp timestamp1, TimeStamp timestamp2) { + TimestampWrapper timestamp1, TimestampWrapper timestamp2) { if (windowSize < 1) { throw new IllegalArgumentException("Window size must be positive"); @@ -541,8 +547,8 @@ public class ConnectedDataStream { public SingleOutputStreamOperator addGeneralWindowCombine( CoWindowFunction coWindowFunction, TypeInformation outTypeInfo, - long windowSize, long slideInterval, TimeStamp timestamp1, - TimeStamp timestamp2) { + long windowSize, long slideInterval, TimestampWrapper timestamp1, + TimestampWrapper timestamp2) { if (windowSize < 1) { throw new IllegalArgumentException("Window size must be positive"); @@ -550,7 +556,7 @@ public class ConnectedDataStream { if (slideInterval < 1) { throw new IllegalArgumentException("Slide interval must be positive"); } - + return addCoFunction("coWindowReduce", outTypeInfo, new CoWindowInvokable( clean(coWindowFunction), windowSize, slideInterval, timestamp1, timestamp2)); diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/TemporalOperator.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/TemporalOperator.java index cd8aabd6dc6..e5385f07736 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/TemporalOperator.java +++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/TemporalOperator.java @@ -18,8 +18,8 @@ package org.apache.flink.streaming.api.datastream; -import org.apache.flink.streaming.api.invokable.util.DefaultTimeStamp; -import org.apache.flink.streaming.api.invokable.util.TimeStamp; +import org.apache.flink.streaming.api.windowing.helper.SystemTimestamp; +import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper; public abstract class TemporalOperator { @@ -29,8 +29,8 @@ public abstract class TemporalOperator { public long windowSize; public long slideInterval; - public TimeStamp timeStamp1; - public TimeStamp timeStamp2; + public TimestampWrapper timeStamp1; + public TimestampWrapper timeStamp2; public TemporalOperator(DataStream input1, DataStream input2) { if (input1 == null || input2 == null) { @@ -41,37 +41,37 @@ public abstract class TemporalOperator { } /** - * Continues a temporal Join transformation.
- * Defines the window size on which the two DataStreams will be joined. + * Continues a temporal transformation.
+ * Defines the window size on which the two DataStreams will be transformed. * * @param windowSize * The size of the window in milliseconds. - * @return An incomplete Join transformation. Call {@link JoinWindow#where} - * to continue the Join. + * @return An incomplete temporal transformation. */ public OP onWindow(long windowSize) { return onWindow(windowSize, windowSize); } /** - * Continues a temporal Join transformation.
- * Defines the window size on which the two DataStreams will be joined. + * Continues a temporal transformation.
+ * Defines the window size on which the two DataStreams will be transformed. * * @param windowSize * The size of the window in milliseconds. * @param slideInterval * The slide size of the window. - * @return An incomplete Join transformation. Call {@link JoinWindow#where} - * to continue the Join. + * @return An incomplete temporal transformation. */ + @SuppressWarnings("unchecked") public OP onWindow(long windowSize, long slideInterval) { - return onWindow(windowSize, slideInterval, new DefaultTimeStamp(), - new DefaultTimeStamp()); + return onWindow(windowSize, slideInterval, + (TimestampWrapper) SystemTimestamp.getWrapper(), + (TimestampWrapper) SystemTimestamp.getWrapper()); } /** - * Continues a temporal Join transformation.
- * Defines the window size on which the two DataStreams will be joined. + * Continues a temporal transformation.
+ * Defines the window size on which the two DataStreams will be transformed. * * @param windowSize * The size of the window in milliseconds. @@ -83,11 +83,10 @@ public abstract class TemporalOperator { * @param timeStamp2 * The timestamp used to extract time from the elements of the * second data stream. - * @return An incomplete Join transformation. Call {@link JoinWindow#where} - * to continue the Join. + * @return An incomplete temporal transformation. */ - public OP onWindow(long windowSize, long slideInterval, TimeStamp timeStamp1, - TimeStamp timeStamp2) { + public OP onWindow(long windowSize, long slideInterval, TimestampWrapper timeStamp1, + TimestampWrapper timeStamp2) { this.windowSize = windowSize; this.slideInterval = slideInterval; diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoGroupedWindowReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoGroupedWindowReduceInvokable.java index 4905566c133..736239f5cc7 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoGroupedWindowReduceInvokable.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoGroupedWindowReduceInvokable.java @@ -22,8 +22,8 @@ import java.util.Map; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.streaming.api.function.co.CoReduceFunction; -import org.apache.flink.streaming.api.invokable.util.TimeStamp; import org.apache.flink.streaming.api.streamrecord.StreamRecord; +import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper; public class CoGroupedWindowReduceInvokable extends CoWindowReduceInvokable { @@ -38,7 +38,7 @@ public class CoGroupedWindowReduceInvokable extends public 
CoGroupedWindowReduceInvokable(CoReduceFunction coReducer, long windowSize1, long windowSize2, long slideInterval1, long slideInterval2, KeySelector keySelector1, KeySelector keySelector2, - TimeStamp timestamp1, TimeStamp timestamp2) { + TimestampWrapper timestamp1, TimestampWrapper timestamp2) { super(coReducer, windowSize1, windowSize2, slideInterval1, slideInterval2, timestamp1, timestamp2); this.keySelector1 = keySelector1; diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoWindowInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoWindowInvokable.java index 7df56688c6c..59552f41f16 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoWindowInvokable.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoWindowInvokable.java @@ -23,8 +23,8 @@ import java.util.List; import org.apache.commons.math.util.MathUtils; import org.apache.flink.streaming.api.function.co.CoWindowFunction; -import org.apache.flink.streaming.api.invokable.util.TimeStamp; import org.apache.flink.streaming.api.streamrecord.StreamRecord; +import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper; import org.apache.flink.streaming.state.CircularFifoList; public class CoWindowInvokable extends CoInvokable { @@ -35,8 +35,8 @@ public class CoWindowInvokable extends CoInvokable protected long slideSize; protected CircularFifoList> circularList1; protected CircularFifoList> circularList2; - protected TimeStamp timeStamp1; - protected TimeStamp timeStamp2; + protected TimestampWrapper timeStamp1; + protected TimestampWrapper timeStamp2; protected StreamWindow window; @@ -44,7 +44,7 @@ public class CoWindowInvokable extends CoInvokable protected long nextRecordTime; public 
CoWindowInvokable(CoWindowFunction coWindowFunction, long windowSize, - long slideInterval, TimeStamp timeStamp1, TimeStamp timeStamp2) { + long slideInterval, TimestampWrapper timeStamp1, TimestampWrapper timeStamp2) { super(coWindowFunction); this.coWindowFunction = coWindowFunction; this.windowSize = windowSize; diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoWindowReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoWindowReduceInvokable.java index fa477610913..0c8598ffb80 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoWindowReduceInvokable.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoWindowReduceInvokable.java @@ -19,8 +19,7 @@ package org.apache.flink.streaming.api.invokable.operator.co; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.function.co.CoReduceFunction; -import org.apache.flink.streaming.api.invokable.util.DefaultTimeStamp; -import org.apache.flink.streaming.api.invokable.util.TimeStamp; +import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper; public class CoWindowReduceInvokable extends CoBatchReduceInvokable { private static final long serialVersionUID = 1L; @@ -28,14 +27,14 @@ public class CoWindowReduceInvokable extends CoBatchReduceInvokab protected long startTime2; protected long nextRecordTime1; protected long nextRecordTime2; - protected TimeStamp timestamp1; - protected TimeStamp timestamp2; + protected TimestampWrapper timestamp1; + protected TimestampWrapper timestamp2; protected StreamWindow window1; protected StreamWindow window2; public CoWindowReduceInvokable(CoReduceFunction coReducer, long windowSize1, - long windowSize2, long 
slideInterval1, long slideInterval2, TimeStamp timestamp1, - TimeStamp timestamp2) { + long windowSize2, long slideInterval1, long slideInterval2, + TimestampWrapper timestamp1, TimestampWrapper timestamp2) { super(coReducer, windowSize1, windowSize2, slideInterval1, slideInterval2); this.timestamp1 = timestamp1; this.timestamp2 = timestamp2; @@ -51,10 +50,10 @@ public class CoWindowReduceInvokable extends CoBatchReduceInvokab this.window2 = new StreamWindow(batchSize2, slideSize2); this.batch1 = this.window1; this.batch2 = this.window2; - if (timestamp1 instanceof DefaultTimeStamp) { + if (timestamp1.isDefaultTimestamp()) { (new TimeCheck1()).start(); } - if (timestamp2 instanceof DefaultTimeStamp) { + if (timestamp2.isDefaultTimestamp()) { (new TimeCheck2()).start(); } } diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Delta.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Delta.java index 9c8c5ca4c54..5434a4e6f86 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Delta.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Delta.java @@ -84,8 +84,8 @@ public class Delta implements WindowingHelper { * The threshold used by the delta function. 
* @return Helper representing a delta trigger or eviction policy */ - public static Delta of(DeltaFunction deltaFunction, DATA initVal, - double threshold) { + public static Delta of(double threshold, DeltaFunction deltaFunction, + DATA initVal) { return new Delta(deltaFunction, initVal, threshold); } } diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/util/DefaultTimeStamp.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/SystemTimestamp.java similarity index 71% rename from flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/util/DefaultTimeStamp.java rename to flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/SystemTimestamp.java index 2f22e8eb638..8581ac55c9f 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/util/DefaultTimeStamp.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/SystemTimestamp.java @@ -15,16 +15,14 @@ * limitations under the License. */ -package org.apache.flink.streaming.api.invokable.util; +package org.apache.flink.streaming.api.windowing.helper; /** - * Default timestamp function that uses the Java System.currentTimeMillis() - * method to retrieve a timestamp. - * - * @param - * Type of the inputs of the reducing function. 
+ * {@link Timestamp} implementation to be used when system time is needed to + * determine windows */ -public class DefaultTimeStamp implements TimeStamp { +public class SystemTimestamp implements Timestamp { + private static final long serialVersionUID = 1L; @Override @@ -32,8 +30,8 @@ public class DefaultTimeStamp implements TimeStamp { return System.currentTimeMillis(); } - @Override - public long getStartTime() { - return System.currentTimeMillis(); + public static TimestampWrapper getWrapper() { + return new TimestampWrapper(new SystemTimestamp(), System.currentTimeMillis()); } + } diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Time.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Time.java index d987e32470c..9dc1c8c5020 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Time.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Time.java @@ -19,8 +19,6 @@ package org.apache.flink.streaming.api.windowing.helper; import java.util.concurrent.TimeUnit; -import org.apache.flink.streaming.api.invokable.util.DefaultTimeStamp; -import org.apache.flink.streaming.api.invokable.util.TimeStamp; import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy; import org.apache.flink.streaming.api.windowing.policy.TimeEvictionPolicy; import org.apache.flink.streaming.api.windowing.policy.TimeTriggerPolicy; @@ -39,7 +37,7 @@ public class Time implements WindowingHelper { private long length; private TimeUnit granularity; - private TimeStamp timeStamp; + private TimestampWrapper timestampWrapper; private long delay; /** @@ -53,31 +51,52 @@ public class Time implements WindowingHelper { * the smallest possible granularity is milliseconds. 
Any smaller * time unit might cause an error at runtime due to conversion * problems. - * @param timeStamp + * @param timestamp * The user defined timestamp that will be used to extract time * information from the incoming elements + * @param startTime + * The startTime of the stream for computing the first window */ - private Time(long length, TimeUnit timeUnit, TimeStamp timeStamp) { + private Time(long length, TimeUnit timeUnit, Timestamp timestamp, long startTime) { + this(length, timeUnit, new TimestampWrapper(timestamp, startTime)); + } + + /** + * Creates a helper representing a trigger which triggers every given + * length or an eviction which evicts all elements older than length. + * + * @param length + * The number of time units + * @param timeUnit + * The unit of time such as minute or millisecond. Note that + * the smallest possible granularity is milliseconds. Any smaller + * time unit might cause an error at runtime due to conversion + * problems. + * @param timestampWrapper + * The user defined {@link TimestampWrapper} that will be used to + * extract time information from the incoming elements + */ + private Time(long length, TimeUnit timeUnit, TimestampWrapper timestampWrapper) { this.length = length; this.granularity = timeUnit; - this.timeStamp = timeStamp; + this.timestampWrapper = timestampWrapper; this.delay = 0; } @Override public EvictionPolicy toEvict() { - return new TimeEvictionPolicy(granularityInMillis(), timeStamp); + return new TimeEvictionPolicy(granularityInMillis(), timestampWrapper); } @Override public TriggerPolicy toTrigger() { - return new TimeTriggerPolicy(granularityInMillis(), timeStamp, delay); + return new TimeTriggerPolicy(granularityInMillis(), timestampWrapper, delay); } /** * Creates a helper representing a time trigger which triggers every given * length (slide size) or a time eviction which evicts all elements older - * than length (window size). + * than length (window size) using System time. 
* * @param length * The number of time units @@ -88,8 +107,10 @@ public class Time implements WindowingHelper { * problems. * @return Helper representing the time based trigger and eviction policy */ + @SuppressWarnings("unchecked") public static Time of(long length, TimeUnit timeUnit) { - return new Time(length, timeUnit, new DefaultTimeStamp()); + return new Time(length, timeUnit, + (TimestampWrapper) SystemTimestamp.getWrapper()); } /** @@ -99,13 +120,32 @@ public class Time implements WindowingHelper { * * @param length * The number of time units - * @param timeStamp + * @param timestamp + * The user defined timestamp that will be used to extract time + * information from the incoming elements + * @param startTime + * The startTime used to compute the first window + * @return Helper representing the time based trigger and eviction policy + */ + public static Time of(long length, Timestamp timestamp, long startTime) { + return new Time(length, null, timestamp, startTime); + } + + /** + * Creates a helper representing a time trigger which triggers every given + * length (slide size) or a time eviction which evicts all elements older + * than length (window size) using a user defined timestamp extractor. By + * default the start time is set to 0. 
+ * + * @param length + * The number of time units + * @param timestamp * The user defined timestamp that will be used to extract time * information from the incoming elements * @return Helper representing the time based trigger and eviction policy */ - public static Time of(long length, TimeStamp timeStamp) { - return new Time(length, TimeUnit.MILLISECONDS, timeStamp); + public static Time of(long length, Timestamp timestamp) { + return of(length, timestamp, 0); } /** @@ -121,6 +161,10 @@ public class Time implements WindowingHelper { } private long granularityInMillis() { - return this.granularity.toMillis(this.length); + if (granularity != null) { + return this.granularity.toMillis(this.length); + } else { + return this.length; + } } } diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/util/TimeStamp.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Timestamp.java similarity index 83% rename from flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/util/TimeStamp.java rename to flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Timestamp.java index 86fa101ddf0..fea60202ae0 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/util/TimeStamp.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Timestamp.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.flink.streaming.api.invokable.util; +package org.apache.flink.streaming.api.windowing.helper; import java.io.Serializable; @@ -26,7 +26,7 @@ import java.io.Serializable; * @param * Type of the value to create the timestamp from. 
*/ -public interface TimeStamp extends Serializable { +public interface Timestamp extends Serializable { /** * Values @@ -36,11 +36,4 @@ public interface TimeStamp extends Serializable { * @return The timestamp */ public long getTimestamp(T value); - - /** - * Function to define the starting time for reference - * - * @return The starting timestamp - */ - public long getStartTime(); } diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/TimestampWrapper.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/TimestampWrapper.java new file mode 100644 index 00000000000..8c3a09d7a72 --- /dev/null +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/TimestampWrapper.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.windowing.helper; + +import java.io.Serializable; + +public class TimestampWrapper implements Serializable { + + private static final long serialVersionUID = 1L; + private long startTime; + private Timestamp timestamp; + + public TimestampWrapper(Timestamp timeStamp, long startTime) { + this.timestamp = timeStamp; + this.startTime = startTime; + } + + public long getTimestamp(T in) { + return timestamp.getTimestamp(in); + } + + public long getStartTime() { + return startTime; + } + + public boolean isDefaultTimestamp() { + return timestamp instanceof SystemTimestamp; + } +} diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/ActiveTriggerPolicy.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/ActiveTriggerPolicy.java index a8a704d6107..414250cc47f 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/ActiveTriggerPolicy.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/ActiveTriggerPolicy.java @@ -17,7 +17,7 @@ package org.apache.flink.streaming.api.windowing.policy; -import org.apache.flink.streaming.api.invokable.util.TimeStamp; +import org.apache.flink.streaming.api.windowing.helper.Timestamp; /** * This interface extends the {@link TriggerPolicy} interface with functionality @@ -28,7 +28,7 @@ import org.apache.flink.streaming.api.invokable.util.TimeStamp; * first. It can return zero ore more fake data points which will be added * before the the currently arrived real element gets processed. This allows to * handle empty windows in time based windowing with an user defined - * {@link TimeStamp}. Triggers are not called on fake datapoint. A fake + * {@link Timestamp}. Triggers are not called on fake datapoint. 
A fake * datapoint is always considered as triggered. * * 2) An active trigger has a factory method for a runnable. This factory method @@ -49,7 +49,7 @@ public interface ActiveTriggerPolicy extends TriggerPolicy { * first. It can return zero ore more fake data points which will be added * before the the currently arrived real element gets processed. This allows * to handle empty windows in time based windowing with an user defined - * {@link TimeStamp}. Triggers are not called on fake datapoints. A fake + * {@link Timestamp}. Triggers are not called on fake datapoints. A fake * datapoint is always considered as triggered. * * @param datapoint diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeEvictionPolicy.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeEvictionPolicy.java index aca1deeab5c..16c30fc32bd 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeEvictionPolicy.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeEvictionPolicy.java @@ -19,14 +19,15 @@ package org.apache.flink.streaming.api.windowing.policy; import java.util.LinkedList; -import org.apache.flink.streaming.api.invokable.util.TimeStamp; +import org.apache.flink.streaming.api.windowing.helper.Timestamp; +import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper; /** * This eviction policy evicts all elements which are older then a specified - * time. The time is measured using a given {@link TimeStamp} implementation. A + * time. The time is measured using a given {@link Timestamp} implementation. A * point in time is always represented as long. Therefore, the granularity can * be set as long value as well. 
- * + * * @param * The type of the incoming data points which are processed by this * policy. @@ -40,12 +41,12 @@ public class TimeEvictionPolicy implements ActiveEvictionPolicy, private static final long serialVersionUID = -1457476766124518220L; private long granularity; - private TimeStamp timestamp; + private TimestampWrapper timestampWrapper; private LinkedList buffer = new LinkedList(); /** * This eviction policy evicts all elements which are older than a specified - * time. The time is measured using a given {@link TimeStamp} + * time. The time is measured using a given {@link Timestamp} * implementation. A point in time is always represented as long. Therefore, * the granularity can be set as long value as well. If this value is set to * 2 the policy will evict all elements which are older as 2. @@ -60,12 +61,12 @@ public class TimeEvictionPolicy implements ActiveEvictionPolicy, * The granularity of the eviction. If this value is set to 2 the * policy will evict all elements which are older as 2(if * (time(X) timestamp) { - this.timestamp = timestamp; + public TimeEvictionPolicy(long granularity, TimestampWrapper timestampWrapper) { + this.timestampWrapper = timestampWrapper; this.granularity = granularity; } @@ -78,7 +79,7 @@ public class TimeEvictionPolicy implements ActiveEvictionPolicy, try { threshold = (Long) datapoint - granularity; } catch (ClassCastException e) { - threshold = timestamp.getTimestamp((DATA) datapoint) - granularity; + threshold = timestampWrapper.getTimestamp((DATA) datapoint) - granularity; } // return result @@ -91,9 +92,9 @@ public class TimeEvictionPolicy implements ActiveEvictionPolicy, checkForDeleted(bufferSize); - //remember timestamp - long time=timestamp.getTimestamp(datapoint); - + // remember timestamp + long time = timestampWrapper.getTimestamp(datapoint); + // delete and count expired tuples long threshold = time - granularity; int counter = deleteAndCountExpired(threshold); @@ -130,7 +131,7 @@ public class 
TimeEvictionPolicy implements ActiveEvictionPolicy, @Override public TimeEvictionPolicy clone() { - return new TimeEvictionPolicy(granularity, timestamp); + return new TimeEvictionPolicy(granularity, timestampWrapper); } } diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicy.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicy.java index 57bccf2327f..1e91b8eee9a 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicy.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicy.java @@ -19,12 +19,12 @@ package org.apache.flink.streaming.api.windowing.policy; import java.util.LinkedList; -import org.apache.flink.streaming.api.invokable.util.DefaultTimeStamp; -import org.apache.flink.streaming.api.invokable.util.TimeStamp; +import org.apache.flink.streaming.api.windowing.helper.Timestamp; +import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper; /** * This trigger policy triggers with regard to the time. The is measured using a - * given {@link TimeStamp} implementation. A point in time is always represented + * given {@link Timestamp} implementation. A point in time is always represented * as long. Therefore, parameters such as granularity and delay can be set as * long value as well. * @@ -42,12 +42,12 @@ public class TimeTriggerPolicy implements ActiveTriggerPolicy, protected long startTime; protected long granularity; - protected TimeStamp timestamp; + protected TimestampWrapper timestampWrapper; protected long delay; /** * This trigger policy triggers with regard to the time. The is measured - * using a given {@link TimeStamp} implementation. 
A point in time is always + * using a given {@link Timestamp} implementation. A point in time is always * represented as long. Therefore, parameters such as granularity can be set * as long value as well. If this value for the granularity is set to 2 for * example, the policy will trigger at every second point in time. @@ -55,22 +55,22 @@ public class TimeTriggerPolicy implements ActiveTriggerPolicy, * @param granularity * The granularity of the trigger. If this value is set to x the * policy will trigger at every x-th time point - * @param timestamp - * The {@link TimeStamp} to measure the time with. This can be - * either user defined of provided by the API. + * @param timestampWrapper + * The {@link TimestampWrapper} to measure the time with. This + * can be either user defined or provided by the API. * @param timeWrapper * This policy creates fake elements to not miss windows in case * no element arrived within the duration of the window. This * extractor should wrap a long into such an element of type * DATA. */ - public TimeTriggerPolicy(long granularity, TimeStamp timestamp) { - this(granularity, timestamp, 0); + public TimeTriggerPolicy(long granularity, TimestampWrapper timestampWrapper) { + this(granularity, timestampWrapper, 0); } /** * This is mostly the same as - * {@link TimeTriggerPolicy#TimeTriggerPolicy(long, TimeStamp)}. In addition + * {@link TimeTriggerPolicy#TimeTriggerPolicy(long, TimestampWrapper)}. In addition * to granularity and timestamp a delay can be specified for the first * trigger. If the start time given by the timestamp is x, the delay is y, * and the granularity is z, the first trigger will happen at x+y+z. @@ -78,9 +78,9 @@ public class TimeTriggerPolicy implements ActiveTriggerPolicy, * @param granularity * The granularity of the trigger. If this value is set to 2 the * policy will trigger at every second time point - * @param timestamp - * The {@link TimeStamp} to measure the time with. 
This can be - * either user defined of provided by the API. + * @param timestampWrapper + * The {@link TimestampWrapper} to measure the time with. This + * can be either user defined or provided by the API. * @param delay * A delay for the first trigger. If the start time given by the * timestamp is x, the delay is y, and the granularity is z, the @@ -91,9 +91,9 @@ public class TimeTriggerPolicy implements ActiveTriggerPolicy, * extractor should wrap a long into such an element of type * DATA. */ - public TimeTriggerPolicy(long granularity, TimeStamp timestamp, long delay) { - this.startTime = timestamp.getStartTime() + delay; - this.timestamp = timestamp; + public TimeTriggerPolicy(long granularity, TimestampWrapper timestampWrapper, long delay) { + this.startTime = timestampWrapper.getStartTime() + delay; + this.timestampWrapper = timestampWrapper; this.granularity = granularity; this.delay = delay; } @@ -107,7 +107,7 @@ public class TimeTriggerPolicy implements ActiveTriggerPolicy, LinkedList fakeElements = new LinkedList(); // check if there is more then one window border missed // use > here. In case >= would fit, the regular call will do the job. 
- while (timestamp.getTimestamp(datapoint) >= startTime + granularity) { + while (timestampWrapper.getTimestamp(datapoint) >= startTime + granularity) { startTime += granularity; fakeElements.add(startTime - 1); } @@ -127,7 +127,7 @@ public class TimeTriggerPolicy implements ActiveTriggerPolicy, */ @Override public Runnable createActiveTriggerRunnable(ActiveTriggerCallback callback) { - if (this.timestamp instanceof DefaultTimeStamp) { + if (this.timestampWrapper.isDefaultTimestamp()) { return new TimeCheck(callback); } else { return null; @@ -177,7 +177,7 @@ public class TimeTriggerPolicy implements ActiveTriggerPolicy, @Override public synchronized boolean notifyTrigger(DATA datapoint) { - long recordTime = timestamp.getTimestamp(datapoint); + long recordTime = timestampWrapper.getTimestamp(datapoint); if (recordTime >= startTime + granularity) { if (granularity != 0) { startTime = recordTime - ((recordTime - startTime) % granularity); @@ -190,7 +190,7 @@ public class TimeTriggerPolicy implements ActiveTriggerPolicy, @Override public TimeTriggerPolicy clone() { - return new TimeTriggerPolicy(granularity, timestamp, delay); + return new TimeTriggerPolicy(granularity, timestampWrapper, delay); } } diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java index cd6232a192f..e856f07eb15 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java @@ -28,7 +28,7 @@ import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import 
org.apache.flink.streaming.api.function.sink.SinkFunction; -import org.apache.flink.streaming.api.invokable.util.TimeStamp; +import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper; import org.apache.flink.streaming.util.TestStreamEnvironment; import org.junit.Test; @@ -96,12 +96,16 @@ public class WindowCrossJoinTest implements Serializable { DataStream> inStream1 = env.fromCollection(in1); DataStream> inStream2 = env.fromCollection(in2); - inStream1.join(inStream2).onWindow(1000, 1000, new MyTimestamp1(), new MyTimestamp2()) - .where(0).equalTo(0).addSink(new JoinResultSink()); + inStream1 + .join(inStream2) + .onWindow(1000, 1000, new MyTimestamp>(), + new MyTimestamp>()).where(0).equalTo(0) + .addSink(new JoinResultSink()); inStream1 .cross(inStream2) - .onWindow(1000, 1000, new MyTimestamp1(), new MyTimestamp2()) + .onWindow(1000, 1000, new MyTimestamp>(), + new MyTimestamp>()) .with(new CrossFunction, Tuple1, Tuple2, Tuple1>>() { private static final long serialVersionUID = 1L; @@ -119,25 +123,15 @@ public class WindowCrossJoinTest implements Serializable { assertEquals(crossExpectedResults, crossResults); } - private static class MyTimestamp1 implements TimeStamp> { - private static final long serialVersionUID = 1L; - - @Override - public long getTimestamp(Tuple2 value) { - return 101L; + private static class MyTimestamp extends TimestampWrapper { + public MyTimestamp() { + super(null, 0); } - @Override - public long getStartTime() { - return 100L; - } - } - - private static class MyTimestamp2 implements TimeStamp> { private static final long serialVersionUID = 1L; @Override - public long getTimestamp(Tuple1 value) { + public long getTimestamp(T value) { return 101L; } diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedWindowReduceTest.java 
b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedWindowReduceTest.java index e3f2a1b84dd..508366cd2a6 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedWindowReduceTest.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedWindowReduceTest.java @@ -29,7 +29,7 @@ import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.function.co.CoReduceFunction; import org.apache.flink.streaming.api.invokable.operator.co.CoGroupedWindowReduceInvokable; -import org.apache.flink.streaming.api.invokable.util.TimeStamp; +import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper; import org.apache.flink.streaming.util.MockCoContext; import org.junit.Test; @@ -82,13 +82,14 @@ public class CoGroupedWindowReduceTest { } } - public static final class MyTimeStamp implements TimeStamp { + public static final class MyTimeStamp extends TimestampWrapper { private static final long serialVersionUID = 1L; private Iterator timestamps; private long start; public MyTimeStamp(List timestamps) { + super(null, 0); this.timestamps = timestamps.iterator(); this.start = timestamps.get(0); } diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoWindowReduceTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoWindowReduceTest.java index 90ad4833765..035a02102c9 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoWindowReduceTest.java +++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoWindowReduceTest.java @@ -27,7 +27,7 @@ import java.util.List; import org.apache.flink.streaming.api.function.co.CoReduceFunction; import org.apache.flink.streaming.api.invokable.operator.co.CoWindowReduceInvokable; -import org.apache.flink.streaming.api.invokable.util.TimeStamp; +import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper; import org.apache.flink.streaming.util.MockCoContext; import org.junit.Test; @@ -57,13 +57,14 @@ public class CoWindowReduceTest { } } - public static final class MyTimeStamp implements TimeStamp { + public static final class MyTimeStamp extends TimestampWrapper { private static final long serialVersionUID = 1L; private Iterator timestamps; private long start; public MyTimeStamp(List timestamps) { + super(null, 0); this.timestamps = timestamps.iterator(); this.start = timestamps.get(0); } diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoWindowTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoWindowTest.java index c6d446a44d1..4ab3492f1fd 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoWindowTest.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoWindowTest.java @@ -27,7 +27,8 @@ import java.util.Set; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.function.co.CoWindowFunction; import org.apache.flink.streaming.api.invokable.operator.co.CoWindowInvokable; -import org.apache.flink.streaming.api.invokable.util.TimeStamp; +import org.apache.flink.streaming.api.windowing.helper.Timestamp; +import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper; 
import org.apache.flink.streaming.util.MockCoContext; import org.apache.flink.util.Collector; import org.junit.Test; @@ -80,7 +81,7 @@ public class CoWindowTest { } - private static final class MyTS1 implements TimeStamp { + private static final class MyTS1 implements Timestamp { private static final long serialVersionUID = 1L; @@ -89,14 +90,9 @@ public class CoWindowTest { return value; } - @Override - public long getStartTime() { - return 1; - } - } - private static final class MyTS2 implements TimeStamp> { + private static final class MyTS2 implements Timestamp> { private static final long serialVersionUID = 1L; @@ -105,18 +101,14 @@ public class CoWindowTest { return value.f0; } - @Override - public long getStartTime() { - return 1; - } - } @Test public void coWindowGroupReduceTest2() throws Exception { CoWindowInvokable invokable1 = new CoWindowInvokable( - new MyCoGroup1(), 2, 1, new MyTS1(), new MyTS1()); + new MyCoGroup1(), 2, 1, new TimestampWrapper(new MyTS1(), 1), + new TimestampWrapper(new MyTS1(), 1)); // Windowsize 2, slide 1 // 1,2|2,3|3,4|4,5 @@ -152,7 +144,8 @@ public class CoWindowTest { assertEquals(expected1, actual1); CoWindowInvokable, Tuple2, Integer> invokable2 = new CoWindowInvokable, Tuple2, Integer>( - new MyCoGroup2(), 2, 3, new MyTS2(), new MyTS2()); + new MyCoGroup2(), 2, 3, new TimestampWrapper>(new MyTS2(), + 1), new TimestampWrapper>(new MyTS2(), 1)); // WindowSize 2, slide 3 // 1,2|4,5|7,8| diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowInvokableTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowInvokableTest.java index d97cadc220b..f38d5c199f3 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowInvokableTest.java +++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowInvokableTest.java @@ -28,7 +28,8 @@ import java.util.List; import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.streaming.api.invokable.util.TimeStamp; +import org.apache.flink.streaming.api.windowing.helper.Timestamp; +import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper; import org.apache.flink.streaming.api.windowing.policy.ActiveCloneableEvictionPolicyWrapper; import org.apache.flink.streaming.api.windowing.policy.CloneableEvictionPolicy; import org.apache.flink.streaming.api.windowing.policy.CloneableTriggerPolicy; @@ -360,20 +361,18 @@ public class GroupedWindowInvokableTest { expected.add(new Tuple2(32, "b")); expected.add(new Tuple2(32, "c")); - TimeStamp> myTimeStamp = new TimeStamp>() { + Timestamp> myTimeStamp = new Timestamp>() { private static final long serialVersionUID = 1L; @Override public long getTimestamp(Tuple2 value) { return value.f0; } - - @Override - public long getStartTime() { - return 1; - } }; + TimestampWrapper> myTimeStampWrapper = new TimestampWrapper>( + myTimeStamp, 1); + ReduceFunction> myReduceFunction = new ReduceFunction>() { private static final long serialVersionUID = 1L; @@ -387,11 +386,11 @@ public class GroupedWindowInvokableTest { LinkedList>> triggers = new LinkedList>>(); // Trigger every 2 time units but delay the first trigger by 2 (First // trigger after 4, then every 2) - triggers.add(new TimeTriggerPolicy>(2L, myTimeStamp, 2L)); + triggers.add(new TimeTriggerPolicy>(2L, myTimeStampWrapper, 2L)); LinkedList>> evictions = new LinkedList>>(); // Always delete all elements older then 4 - evictions.add(new TimeEvictionPolicy>(4L, myTimeStamp)); + evictions.add(new TimeEvictionPolicy>(4L, myTimeStampWrapper)); LinkedList>> distributedTriggers 
= new LinkedList>>(); @@ -409,10 +408,10 @@ public class GroupedWindowInvokableTest { // repeat the test with central eviction. The result should be the same. triggers.clear(); - triggers.add(new TimeTriggerPolicy>(2L, myTimeStamp, 2L)); + triggers.add(new TimeTriggerPolicy>(2L, myTimeStampWrapper, 2L)); evictions.clear(); LinkedList>> centralEvictions = new LinkedList>>(); - centralEvictions.add(new TimeEvictionPolicy>(4L, myTimeStamp)); + centralEvictions.add(new TimeEvictionPolicy>(4L, myTimeStampWrapper)); invokable = new GroupedWindowInvokable, Tuple2>( myReduceFunction, keySelector, distributedTriggers, evictions, triggers, diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/WindowInvokableTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/WindowInvokableTest.java index 421a99968cf..83b45961ba9 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/WindowInvokableTest.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/WindowInvokableTest.java @@ -24,7 +24,8 @@ import java.util.LinkedList; import java.util.List; import org.apache.flink.api.common.functions.ReduceFunction; -import org.apache.flink.streaming.api.invokable.util.TimeStamp; +import org.apache.flink.streaming.api.windowing.helper.Timestamp; +import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper; import org.apache.flink.streaming.api.windowing.policy.CountEvictionPolicy; import org.apache.flink.streaming.api.windowing.policy.CountTriggerPolicy; import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy; @@ -62,18 +63,13 @@ public class WindowInvokableTest { expected.add(10); expected.add(32); - TimeStamp myTimeStamp = new TimeStamp() { + Timestamp myTimeStamp = new Timestamp() { 
private static final long serialVersionUID = 1L; @Override public long getTimestamp(Integer value) { return value; } - - @Override - public long getStartTime() { - return 1; - } }; ReduceFunction myReduceFunction = new ReduceFunction() { @@ -88,11 +84,13 @@ public class WindowInvokableTest { LinkedList> triggers = new LinkedList>(); // Trigger every 2 time units but delay the first trigger by 2 (First // trigger after 4, then every 2) - triggers.add(new TimeTriggerPolicy(2L, myTimeStamp, 2L)); + triggers.add(new TimeTriggerPolicy(2L, new TimestampWrapper(myTimeStamp, + 1), 2L)); LinkedList> evictions = new LinkedList>(); // Always delete all elements older then 4 - evictions.add(new TimeEvictionPolicy(4L, myTimeStamp)); + evictions.add(new TimeEvictionPolicy(4L, new TimestampWrapper( + myTimeStamp, 1))); WindowInvokable invokable = new WindowReduceInvokable( myReduceFunction, triggers, evictions); diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/TimeEvictionPolicyTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/TimeEvictionPolicyTest.java index 82c8841cf03..b5d502b12aa 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/TimeEvictionPolicyTest.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/TimeEvictionPolicyTest.java @@ -19,7 +19,8 @@ package org.apache.flink.streaming.api.windowing.policy; import java.util.LinkedList; -import org.apache.flink.streaming.api.invokable.util.TimeStamp; +import org.apache.flink.streaming.api.windowing.helper.Timestamp; +import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper; import org.junit.Test; import static org.junit.Assert.*; @@ -35,25 +36,20 @@ public class TimeEvictionPolicyTest { // create a timestamp 
@SuppressWarnings("serial") - TimeStamp timeStamp = new TimeStamp() { + Timestamp timeStamp = new Timestamp() { @Override public long getTimestamp(Integer value) { return value; } - @Override - public long getStartTime() { - return 0; - } - }; // test different granularity for (long granularity = 0; granularity < 40; granularity++) { // create policy TimeEvictionPolicy policy = new TimeEvictionPolicy(granularity, - timeStamp); + new TimestampWrapper(timeStamp, 0)); // The trigger status should not effect the policy. Therefore, it's // value is changed after each usage. diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicyTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicyTest.java index 9c77a55b11f..2bdbd96ab84 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicyTest.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicyTest.java @@ -21,7 +21,8 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; -import org.apache.flink.streaming.api.invokable.util.TimeStamp; +import org.apache.flink.streaming.api.windowing.helper.Timestamp; +import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper; import org.junit.Test; public class TimeTriggerPolicyTest { @@ -33,29 +34,24 @@ public class TimeTriggerPolicyTest { // create a timestamp @SuppressWarnings("serial") - TimeStamp timeStamp = new TimeStamp() { + Timestamp timeStamp = new Timestamp() { @Override public long getTimestamp(Integer value) { return value; } - @Override - public long getStartTime() { - return 0; - } - }; // test different granularity for (long granularity = 0; 
granularity < 31; granularity++) { // create policy - TriggerPolicy policy = new TimeTriggerPolicy(granularity, timeStamp); + + TriggerPolicy policy = new TimeTriggerPolicy(granularity, + new TimestampWrapper(timeStamp, 0)); // remember window border - // Remark: This might NOT work in case the timeStamp uses - // System.getCurrentTimeMillis to determine the start time. - long currentTime = timeStamp.getStartTime(); + long currentTime = 0; // test by adding values for (int i = 0; i < times.length; i++) { @@ -85,22 +81,18 @@ public class TimeTriggerPolicyTest { // create a timestamp @SuppressWarnings("serial") - TimeStamp timeStamp = new TimeStamp() { + Timestamp timeStamp = new Timestamp() { @Override public long getTimestamp(Integer value) { return value; } - @Override - public long getStartTime() { - return 0; - } - }; // create policy - TimeTriggerPolicy policy = new TimeTriggerPolicy(5, timeStamp); + TimeTriggerPolicy policy = new TimeTriggerPolicy(5, + new TimestampWrapper(timeStamp, 0)); // expected result Long[][] result = { {}, {}, { 4L, 9L, 14L, 19L }, { 24L } }; diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/DeltaExtractExample.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/DeltaExtractExample.java index 1013e6f041e..d6a9ac0c362 100644 --- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/DeltaExtractExample.java +++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/DeltaExtractExample.java @@ -49,10 +49,8 @@ public class DeltaExtractExample { @SuppressWarnings({ "unchecked", "rawtypes" }) DataStream dstream = env .addSource(new CountingSource()) - .window(Delta.of(new EuclideanDistance(new FieldsFromTuple(0, 1)), new Tuple3(0d, - 0d, "foo"), 1.2)) - .every(Count.of(2)) - 
.reduce(new ConcatStrings()); + .window(Delta.of(1.2, new EuclideanDistance(new FieldsFromTuple(0, 1)), new Tuple3( + 0d, 0d, "foo"))).every(Count.of(2)).reduce(new ConcatStrings()); // emit result if (fileOutput) { diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala index 38ad384a163..dc1e5b3151c 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala @@ -18,6 +18,7 @@ package org.apache.flink.api.scala.streaming import org.apache.flink.api.scala._ +import org.apache.flink.api.scala.streaming.StreamExecutionEnvironment.clean import org.apache.flink.streaming.api.datastream.{ DataStream => JavaStream } import org.apache.flink.api.common.typeinfo.TypeInformation import scala.reflect.ClassTag @@ -53,24 +54,6 @@ import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo class DataStream[T](javaStream: JavaStream[T]) { - /* This code is originally from the Apache Spark project. */ - /** - * Clean a closure to make it ready to serialized and send to tasks - * (removes unreferenced variables in $outer's, updates REPL variables) - * If checkSerializable is set, clean will also proactively - * check to see if f is serializable and throw a SparkException - * if not. - * - * @param f the closure to clean - * @param checkSerializable whether or not to immediately check f for serializability - * @throws SparkException if checkSerializable is set but f is not - * serializable - */ - private[flink] def clean[F <: AnyRef](f: F, checkSerializable: Boolean = true): F = { - ClosureCleaner.clean(f, checkSerializable) - f - } - /** * Gets the underlying java DataStream object. 
*/ diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamCrossOperator.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamCrossOperator.scala index 72052b9e6f8..e9010c8b18b 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamCrossOperator.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamCrossOperator.scala @@ -32,7 +32,7 @@ import org.apache.flink.streaming.api.function.co.CrossWindowFunction import org.apache.flink.api.common.functions.CrossFunction import org.apache.flink.api.scala.typeutils.CaseClassSerializer import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo - +import org.apache.flink.api.scala.streaming.StreamExecutionEnvironment.clean class StreamCrossOperator[I1, I2](i1: JavaStream[I1], i2: JavaStream[I2]) extends TemporalOperator[I1, I2, StreamCrossOperator.CrossWindow[I1, I2]](i1, i2) { diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamExecutionEnvironment.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamExecutionEnvironment.scala index 2489a640945..a7a471f21e2 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamExecutionEnvironment.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamExecutionEnvironment.scala @@ -161,9 +161,9 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) { */ def addSource[T: ClassTag: TypeInformation](function: SourceFunction[T]): DataStream[T] = { Validate.notNull(function, "Function must not be null.") - ClosureCleaner.clean(function, true) + val cleanFun = StreamExecutionEnvironment.clean(function) val typeInfo = implicitly[TypeInformation[T]] - new DataStream[T](javaEnv.addSource(function, typeInfo)) + new DataStream[T](javaEnv.addSource(cleanFun, typeInfo)) } /** @@ -174,8 +174,9 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) { def addSource[T: ClassTag: 
TypeInformation](function: Collector[T] => Unit): DataStream[T] = { Validate.notNull(function, "Function must not be null.") val sourceFunction = new SourceFunction[T] { + val cleanFun = StreamExecutionEnvironment.clean(function) override def invoke(out: Collector[T]) { - function(out) + cleanFun(out) } } addSource(sourceFunction) @@ -205,6 +206,11 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) { } object StreamExecutionEnvironment { + + private[flink] def clean[F <: AnyRef](f: F, checkSerializable: Boolean = true): F = { + ClosureCleaner.clean(f, checkSerializable) + f + } /** * Creates an execution environment that represents the context in which the program is diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamJoinOperator.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamJoinOperator.scala index f47d79e158a..4ed50825343 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamJoinOperator.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamJoinOperator.scala @@ -33,6 +33,7 @@ import org.apache.commons.lang.Validate import org.apache.flink.streaming.api.invokable.operator.co.CoWindowInvokable import org.apache.flink.streaming.util.keys.KeySelectorUtil import org.apache.flink.api.java.operators.Keys +import org.apache.flink.api.scala.streaming.StreamExecutionEnvironment.clean class StreamJoinOperator[I1, I2](i1: JavaStream[I1], i2: JavaStream[I2]) extends TemporalOperator[I1, I2, StreamJoinOperator.JoinWindow[I1, I2]](i1, i2) { diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/WindowedDataStream.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/WindowedDataStream.scala index 5346c4ce8c7..e33368c254b 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/WindowedDataStream.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/WindowedDataStream.scala 
@@ -40,14 +40,10 @@ import org.apache.flink.streaming.api.function.aggregation.AggregationFunction.A import org.apache.flink.api.java.typeutils.TupleTypeInfoBase import org.apache.flink.streaming.api.function.aggregation.SumFunction import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo +import org.apache.flink.api.scala.streaming.StreamExecutionEnvironment.clean class WindowedDataStream[T](javaStream: JavaWStream[T]) { - private[flink] def clean[F <: AnyRef](f: F, checkSerializable: Boolean = true): F = { - ClosureCleaner.clean(f, checkSerializable) - f - } - /** * Defines the slide size (trigger frequency) for the windowed data stream. * This controls how often the user defined function will be triggered on diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/windowing/Delta.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/windowing/Delta.scala new file mode 100644 index 00000000000..b7d1546bf46 --- /dev/null +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/windowing/Delta.scala @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.scala.streaming.windowing + +import org.apache.flink.streaming.api.windowing.helper.{ Delta => JavaDelta } +import org.apache.commons.lang.Validate +import org.apache.flink.api.scala.streaming.StreamExecutionEnvironment.clean +import org.apache.flink.streaming.api.windowing.deltafunction.DeltaFunction + +object Delta { + + /** + * Creates a delta helper representing a delta trigger or eviction policy. + *

This policy calculates a delta between the data point which + * triggered last and the currently arrived data point. It triggers if the + * delta is higher than a specified threshold.

In case it gets + * used for eviction, this policy starts from the first element of the + * buffer and removes all elements from the buffer which have a higher delta + * than the threshold. As soon as there is an element with a lower delta, + * the eviction stops. + */ + def of[T](threshold: Double, deltaFunction: (T, T) => Double, initVal: T): JavaDelta[T] = { + Validate.notNull(deltaFunction, "Delta function must not be null") + val df = new DeltaFunction[T] { + val cleanFun = clean(deltaFunction) + override def getDelta(first: T, second: T) = cleanFun(first, second) + } + JavaDelta.of(threshold, df, initVal) + } + +} diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/windowing/Time.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/windowing/Time.scala new file mode 100644 index 00000000000..62a47c2a6a3 --- /dev/null +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/windowing/Time.scala @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.api.scala.streaming.windowing + +import java.util.concurrent.TimeUnit +import org.apache.flink.streaming.api.windowing.helper.{ Time => JavaTime } +import org.apache.flink.api.scala.ClosureCleaner +import org.apache.commons.net.ntp.TimeStamp +import org.apache.flink.streaming.api.windowing.helper.Timestamp +import org.apache.flink.api.scala.streaming.StreamExecutionEnvironment.clean +import org.apache.commons.lang.Validate + +object Time { + + /** + * Creates a helper representing a time trigger which triggers every given + * length (slide size) or a time eviction which evicts all elements older + * than length (window size) using System time. + * + */ + def of(windowSize: Long, timeUnit: TimeUnit): JavaTime[_] = + JavaTime.of(windowSize, timeUnit) + + /** + * Creates a helper representing a time trigger which triggers every given + * length (slide size) or a time eviction which evicts all elements older + * than length (window size) using a user defined timestamp extractor. + * + */ + def of[R](windowSize: Long, timestamp: R => Long, startTime: Long = 0): JavaTime[R] = { + Validate.notNull(timestamp, "Timestamp must not be null.") + val ts = new Timestamp[R] { + val fun = clean(timestamp, true) + override def getTimestamp(in: R) = fun(in) + } + JavaTime.of(windowSize, ts, startTime) + } + +} -- GitLab