提交 fac77348 编写于 作者: G Gyula Fora

[scala] [streaming] Added scala window helpers + timestamp rework for lambda support

上级 8bf9416d
......@@ -22,33 +22,33 @@ import org.apache.flink.streaming.api.function.co.CoReduceFunction;
import org.apache.flink.streaming.api.invokable.operator.co.CoGroupedWindowReduceInvokable;
import org.apache.flink.streaming.api.invokable.operator.co.CoInvokable;
import org.apache.flink.streaming.api.invokable.operator.co.CoWindowReduceInvokable;
import org.apache.flink.streaming.api.invokable.util.TimeStamp;
import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
/**
* A {@link CoWindowDataStream} represents two data streams whose elements are
* batched together into sliding windows. Operation
* {@link #reduce(CoReduceFunction)} can be applied for each window.
*
*
* @param <IN1>
* The type of the first input data stream
* @param <IN2>
* The type of the second input data stream
*/
public class CoWindowDataStream<IN1, IN2> extends CoBatchedDataStream<IN1, IN2> {
TimeStamp<IN1> timeStamp1;
TimeStamp<IN2> timeStamp2;
TimestampWrapper<IN1> timeStamp1;
TimestampWrapper<IN2> timeStamp2;
protected CoWindowDataStream(DataStream<IN1> dataStream1, DataStream<IN2> dataStream2,
long windowSize1, long windowSize2, long slideInterval1, long slideInterval2,
TimeStamp<IN1> timeStamp1, TimeStamp<IN2> timeStamp2) {
TimestampWrapper<IN1> timeStamp1, TimestampWrapper<IN2> timeStamp2) {
super(dataStream1, dataStream2, windowSize1, windowSize2, slideInterval1, slideInterval2);
this.timeStamp1 = timeStamp1;
this.timeStamp2 = timeStamp2;
}
protected CoWindowDataStream(ConnectedDataStream<IN1, IN2> coDataStream, long windowSize1,
long windowSize2, long slideInterval1, long slideInterval2, TimeStamp<IN1> timeStamp1,
TimeStamp<IN2> timeStamp2) {
long windowSize2, long slideInterval1, long slideInterval2,
TimestampWrapper<IN1> timeStamp1, TimestampWrapper<IN2> timeStamp2) {
super(coDataStream, windowSize1, windowSize2, slideInterval1, slideInterval2);
this.timeStamp1 = timeStamp1;
this.timeStamp2 = timeStamp2;
......@@ -96,9 +96,9 @@ public class CoWindowDataStream<IN1, IN2> extends CoBatchedDataStream<IN1, IN2>
CoReduceFunction<IN1, IN2, OUT> coReducer) {
CoWindowReduceInvokable<IN1, IN2, OUT> invokable;
if (isGrouped) {
invokable = new CoGroupedWindowReduceInvokable<IN1, IN2, OUT>(clean(coReducer), batchSize1,
batchSize2, slideSize1, slideSize2, keySelector1, keySelector2, timeStamp1,
timeStamp2);
invokable = new CoGroupedWindowReduceInvokable<IN1, IN2, OUT>(clean(coReducer),
batchSize1, batchSize2, slideSize1, slideSize2, keySelector1, keySelector2,
timeStamp1, timeStamp2);
} else {
invokable = new CoWindowReduceInvokable<IN1, IN2, OUT>(clean(coReducer), batchSize1,
batchSize2, slideSize1, slideSize2, timeStamp1, timeStamp2);
......
......@@ -36,8 +36,8 @@ import org.apache.flink.streaming.api.invokable.operator.co.CoInvokable;
import org.apache.flink.streaming.api.invokable.operator.co.CoMapInvokable;
import org.apache.flink.streaming.api.invokable.operator.co.CoReduceInvokable;
import org.apache.flink.streaming.api.invokable.operator.co.CoWindowInvokable;
import org.apache.flink.streaming.api.invokable.util.DefaultTimeStamp;
import org.apache.flink.streaming.api.invokable.util.TimeStamp;
import org.apache.flink.streaming.api.windowing.helper.SystemTimestamp;
import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
/**
* The ConnectedDataStream represents a stream for two different data types. It
......@@ -305,8 +305,8 @@ public class ConnectedDataStream<IN1, IN2> {
* @return The transformed {@link ConnectedDataStream}
*/
public CoWindowDataStream<IN1, IN2> window(long windowSize1, long windowSize2,
long slideInterval1, long slideInterval2, TimeStamp<IN1> timeStamp1,
TimeStamp<IN2> timeStamp2) {
long slideInterval1, long slideInterval2, TimestampWrapper<IN1> timeStamp1,
TimestampWrapper<IN2> timeStamp2) {
if (windowSize1 < 1 || windowSize2 < 1) {
throw new IllegalArgumentException("Window size must be positive");
}
......@@ -338,10 +338,12 @@ public class ConnectedDataStream<IN1, IN2> {
* second input data stream are slid by after each transformation
* @return The transformed {@link ConnectedDataStream}
*/
@SuppressWarnings("unchecked")
public CoWindowDataStream<IN1, IN2> window(long windowSize1, long windowSize2,
long slideInterval1, long slideInterval2) {
return window(windowSize1, windowSize2, slideInterval1, slideInterval2,
new DefaultTimeStamp<IN1>(), new DefaultTimeStamp<IN2>());
(TimestampWrapper<IN1>) SystemTimestamp.getWrapper(),
(TimestampWrapper<IN2>) SystemTimestamp.getWrapper());
}
/**
......@@ -365,7 +367,7 @@ public class ConnectedDataStream<IN1, IN2> {
* @return The transformed {@link ConnectedDataStream}
*/
public CoWindowDataStream<IN1, IN2> window(long windowSize1, long windowSize2,
TimeStamp<IN1> timeStamp1, TimeStamp<IN2> timeStamp2) {
TimestampWrapper<IN1> timeStamp1, TimestampWrapper<IN2> timeStamp2) {
return window(windowSize1, windowSize2, windowSize1, windowSize2, timeStamp1, timeStamp2);
}
......@@ -384,9 +386,11 @@ public class ConnectedDataStream<IN1, IN2> {
* milliseconds
* @return The transformed {@link ConnectedDataStream}
*/
@SuppressWarnings("unchecked")
public CoWindowDataStream<IN1, IN2> window(long windowSize1, long windowSize2) {
return window(windowSize1, windowSize2, windowSize1, windowSize2,
new DefaultTimeStamp<IN1>(), new DefaultTimeStamp<IN2>());
(TimestampWrapper<IN1>) SystemTimestamp.getWrapper(),
(TimestampWrapper<IN2>) SystemTimestamp.getWrapper());
}
/**
......@@ -479,10 +483,12 @@ public class ConnectedDataStream<IN1, IN2> {
*
* @return The transformed {@link DataStream}.
*/
@SuppressWarnings("unchecked")
public <OUT> SingleOutputStreamOperator<OUT, ?> windowReduce(
CoWindowFunction<IN1, IN2, OUT> coWindowFunction, long windowSize, long slideInterval) {
return windowReduce(coWindowFunction, windowSize, slideInterval,
new DefaultTimeStamp<IN1>(), new DefaultTimeStamp<IN2>());
(TimestampWrapper<IN1>) SystemTimestamp.getWrapper(),
(TimestampWrapper<IN2>) SystemTimestamp.getWrapper());
}
/**
......@@ -510,7 +516,7 @@ public class ConnectedDataStream<IN1, IN2> {
*/
public <OUT> SingleOutputStreamOperator<OUT, ?> windowReduce(
CoWindowFunction<IN1, IN2, OUT> coWindowFunction, long windowSize, long slideInterval,
TimeStamp<IN1> timestamp1, TimeStamp<IN2> timestamp2) {
TimestampWrapper<IN1> timestamp1, TimestampWrapper<IN2> timestamp2) {
if (windowSize < 1) {
throw new IllegalArgumentException("Window size must be positive");
......@@ -541,8 +547,8 @@ public class ConnectedDataStream<IN1, IN2> {
public <OUT> SingleOutputStreamOperator<OUT, ?> addGeneralWindowCombine(
CoWindowFunction<IN1, IN2, OUT> coWindowFunction, TypeInformation<OUT> outTypeInfo,
long windowSize, long slideInterval, TimeStamp<IN1> timestamp1,
TimeStamp<IN2> timestamp2) {
long windowSize, long slideInterval, TimestampWrapper<IN1> timestamp1,
TimestampWrapper<IN2> timestamp2) {
if (windowSize < 1) {
throw new IllegalArgumentException("Window size must be positive");
......@@ -550,7 +556,7 @@ public class ConnectedDataStream<IN1, IN2> {
if (slideInterval < 1) {
throw new IllegalArgumentException("Slide interval must be positive");
}
return addCoFunction("coWindowReduce", outTypeInfo, new CoWindowInvokable<IN1, IN2, OUT>(
clean(coWindowFunction), windowSize, slideInterval, timestamp1, timestamp2));
......
......@@ -18,8 +18,8 @@
package org.apache.flink.streaming.api.datastream;
import org.apache.flink.streaming.api.invokable.util.DefaultTimeStamp;
import org.apache.flink.streaming.api.invokable.util.TimeStamp;
import org.apache.flink.streaming.api.windowing.helper.SystemTimestamp;
import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
public abstract class TemporalOperator<I1, I2, OP> {
......@@ -29,8 +29,8 @@ public abstract class TemporalOperator<I1, I2, OP> {
public long windowSize;
public long slideInterval;
public TimeStamp<I1> timeStamp1;
public TimeStamp<I2> timeStamp2;
public TimestampWrapper<I1> timeStamp1;
public TimestampWrapper<I2> timeStamp2;
public TemporalOperator(DataStream<I1> input1, DataStream<I2> input2) {
if (input1 == null || input2 == null) {
......@@ -41,37 +41,37 @@ public abstract class TemporalOperator<I1, I2, OP> {
}
/**
* Continues a temporal Join transformation.<br/>
* Defines the window size on which the two DataStreams will be joined.
* Continues a temporal transformation.<br/>
* Defines the window size on which the two DataStreams will be transformed.
*
* @param windowSize
* The size of the window in milliseconds.
* @return An incomplete Join transformation. Call {@link JoinWindow#where}
* to continue the Join.
* @return An incomplete temporal transformation.
*/
public OP onWindow(long windowSize) {
    // No slide interval was given, so the window slides by its own size.
    final long slideInterval = windowSize;
    return onWindow(windowSize, slideInterval);
}
/**
* Continues a temporal Join transformation.<br/>
* Defines the window size on which the two DataStreams will be joined.
* Continues a temporal transformation.<br/>
* Defines the window size on which the two DataStreams will be transformed.
*
* @param windowSize
* The size of the window in milliseconds.
* @param slideInterval
* The slide size of the window.
* @return An incomplete Join transformation. Call {@link JoinWindow#where}
* to continue the Join.
* @return An incomplete temporal transformation.
*/
@SuppressWarnings("unchecked")
public OP onWindow(long windowSize, long slideInterval) {
return onWindow(windowSize, slideInterval, new DefaultTimeStamp<I1>(),
new DefaultTimeStamp<I2>());
return onWindow(windowSize, slideInterval,
(TimestampWrapper<I1>) SystemTimestamp.getWrapper(),
(TimestampWrapper<I2>) SystemTimestamp.getWrapper());
}
/**
* Continues a temporal Join transformation.<br/>
* Defines the window size on which the two DataStreams will be joined.
* Continues a temporal transformation.<br/>
* Defines the window size on which the two DataStreams will be transformed.
*
* @param windowSize
* The size of the window in milliseconds.
......@@ -83,11 +83,10 @@ public abstract class TemporalOperator<I1, I2, OP> {
* @param timeStamp2
* The timestamp used to extract time from the elements of the
* second data stream.
* @return An incomplete Join transformation. Call {@link JoinWindow#where}
* to continue the Join.
* @return An incomplete temporal transformation.
*/
public OP onWindow(long windowSize, long slideInterval, TimeStamp<I1> timeStamp1,
TimeStamp<I2> timeStamp2) {
public OP onWindow(long windowSize, long slideInterval, TimestampWrapper<I1> timeStamp1,
TimestampWrapper<I2> timeStamp2) {
this.windowSize = windowSize;
this.slideInterval = slideInterval;
......
......@@ -22,8 +22,8 @@ import java.util.Map;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.function.co.CoReduceFunction;
import org.apache.flink.streaming.api.invokable.util.TimeStamp;
import org.apache.flink.streaming.api.streamrecord.StreamRecord;
import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
public class CoGroupedWindowReduceInvokable<IN1, IN2, OUT> extends
CoWindowReduceInvokable<IN1, IN2, OUT> {
......@@ -38,7 +38,7 @@ public class CoGroupedWindowReduceInvokable<IN1, IN2, OUT> extends
public CoGroupedWindowReduceInvokable(CoReduceFunction<IN1, IN2, OUT> coReducer,
long windowSize1, long windowSize2, long slideInterval1, long slideInterval2,
KeySelector<IN1, ?> keySelector1, KeySelector<IN2, ?> keySelector2,
TimeStamp<IN1> timestamp1, TimeStamp<IN2> timestamp2) {
TimestampWrapper<IN1> timestamp1, TimestampWrapper<IN2> timestamp2) {
super(coReducer, windowSize1, windowSize2, slideInterval1, slideInterval2, timestamp1,
timestamp2);
this.keySelector1 = keySelector1;
......
......@@ -23,8 +23,8 @@ import java.util.List;
import org.apache.commons.math.util.MathUtils;
import org.apache.flink.streaming.api.function.co.CoWindowFunction;
import org.apache.flink.streaming.api.invokable.util.TimeStamp;
import org.apache.flink.streaming.api.streamrecord.StreamRecord;
import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
import org.apache.flink.streaming.state.CircularFifoList;
public class CoWindowInvokable<IN1, IN2, OUT> extends CoInvokable<IN1, IN2, OUT> {
......@@ -35,8 +35,8 @@ public class CoWindowInvokable<IN1, IN2, OUT> extends CoInvokable<IN1, IN2, OUT>
protected long slideSize;
protected CircularFifoList<StreamRecord<IN1>> circularList1;
protected CircularFifoList<StreamRecord<IN2>> circularList2;
protected TimeStamp<IN1> timeStamp1;
protected TimeStamp<IN2> timeStamp2;
protected TimestampWrapper<IN1> timeStamp1;
protected TimestampWrapper<IN2> timeStamp2;
protected StreamWindow window;
......@@ -44,7 +44,7 @@ public class CoWindowInvokable<IN1, IN2, OUT> extends CoInvokable<IN1, IN2, OUT>
protected long nextRecordTime;
public CoWindowInvokable(CoWindowFunction<IN1, IN2, OUT> coWindowFunction, long windowSize,
long slideInterval, TimeStamp<IN1> timeStamp1, TimeStamp<IN2> timeStamp2) {
long slideInterval, TimestampWrapper<IN1> timeStamp1, TimestampWrapper<IN2> timeStamp2) {
super(coWindowFunction);
this.coWindowFunction = coWindowFunction;
this.windowSize = windowSize;
......
......@@ -19,8 +19,7 @@ package org.apache.flink.streaming.api.invokable.operator.co;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.function.co.CoReduceFunction;
import org.apache.flink.streaming.api.invokable.util.DefaultTimeStamp;
import org.apache.flink.streaming.api.invokable.util.TimeStamp;
import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
public class CoWindowReduceInvokable<IN1, IN2, OUT> extends CoBatchReduceInvokable<IN1, IN2, OUT> {
private static final long serialVersionUID = 1L;
......@@ -28,14 +27,14 @@ public class CoWindowReduceInvokable<IN1, IN2, OUT> extends CoBatchReduceInvokab
protected long startTime2;
protected long nextRecordTime1;
protected long nextRecordTime2;
protected TimeStamp<IN1> timestamp1;
protected TimeStamp<IN2> timestamp2;
protected TimestampWrapper<IN1> timestamp1;
protected TimestampWrapper<IN2> timestamp2;
protected StreamWindow<IN1> window1;
protected StreamWindow<IN2> window2;
public CoWindowReduceInvokable(CoReduceFunction<IN1, IN2, OUT> coReducer, long windowSize1,
long windowSize2, long slideInterval1, long slideInterval2, TimeStamp<IN1> timestamp1,
TimeStamp<IN2> timestamp2) {
long windowSize2, long slideInterval1, long slideInterval2,
TimestampWrapper<IN1> timestamp1, TimestampWrapper<IN2> timestamp2) {
super(coReducer, windowSize1, windowSize2, slideInterval1, slideInterval2);
this.timestamp1 = timestamp1;
this.timestamp2 = timestamp2;
......@@ -51,10 +50,10 @@ public class CoWindowReduceInvokable<IN1, IN2, OUT> extends CoBatchReduceInvokab
this.window2 = new StreamWindow<IN2>(batchSize2, slideSize2);
this.batch1 = this.window1;
this.batch2 = this.window2;
if (timestamp1 instanceof DefaultTimeStamp) {
if (timestamp1.isDefaultTimestamp()) {
(new TimeCheck1()).start();
}
if (timestamp2 instanceof DefaultTimeStamp) {
if (timestamp2.isDefaultTimestamp()) {
(new TimeCheck2()).start();
}
}
......
......@@ -84,8 +84,8 @@ public class Delta<DATA> implements WindowingHelper<DATA> {
* The threshold used by the delta function.
* @return Helper representing a delta trigger or eviction policy
*/
public static <DATA> Delta<DATA> of(DeltaFunction<DATA> deltaFunction, DATA initVal,
double threshold) {
public static <DATA> Delta<DATA> of(double threshold, DeltaFunction<DATA> deltaFunction,
DATA initVal) {
return new Delta<DATA>(deltaFunction, initVal, threshold);
}
}
......@@ -15,16 +15,14 @@
* limitations under the License.
*/
package org.apache.flink.streaming.api.invokable.util;
package org.apache.flink.streaming.api.windowing.helper;
/**
* Default timestamp function that uses the Java System.currentTimeMillis()
* method to retrieve a timestamp.
*
* @param <T>
* Type of the inputs of the reducing function.
* {@link Timestamp} implementation to be used when system time is needed to
* determine windows
*/
public class DefaultTimeStamp<T> implements TimeStamp<T> {
public class SystemTimestamp<T> implements Timestamp<T> {
private static final long serialVersionUID = 1L;
@Override
......@@ -32,8 +30,8 @@ public class DefaultTimeStamp<T> implements TimeStamp<T> {
return System.currentTimeMillis();
}
@Override
public long getStartTime() {
return System.currentTimeMillis();
public static <R> TimestampWrapper<R> getWrapper() {
return new TimestampWrapper<R>(new SystemTimestamp<R>(), System.currentTimeMillis());
}
}
......@@ -19,8 +19,6 @@ package org.apache.flink.streaming.api.windowing.helper;
import java.util.concurrent.TimeUnit;
import org.apache.flink.streaming.api.invokable.util.DefaultTimeStamp;
import org.apache.flink.streaming.api.invokable.util.TimeStamp;
import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy;
import org.apache.flink.streaming.api.windowing.policy.TimeEvictionPolicy;
import org.apache.flink.streaming.api.windowing.policy.TimeTriggerPolicy;
......@@ -39,7 +37,7 @@ public class Time<DATA> implements WindowingHelper<DATA> {
private long length;
private TimeUnit granularity;
private TimeStamp<DATA> timeStamp;
private TimestampWrapper<DATA> timestampWrapper;
private long delay;
/**
......@@ -53,31 +51,52 @@ public class Time<DATA> implements WindowingHelper<DATA> {
* the smallest possible granularity is milliseconds. Any smaller
* time unit might cause an error at runtime due to conversion
* problems.
* @param timeStamp
* @param timestamp
* The user defined timestamp that will be used to extract time
* information from the incoming elements
* @param startTime
* The startTime of the stream for computing the first window
*/
private Time(long length, TimeUnit timeUnit, TimeStamp<DATA> timeStamp) {
private Time(long length, TimeUnit timeUnit, Timestamp<DATA> timestamp, long startTime) {
this(length, timeUnit, new TimestampWrapper<DATA>(timestamp, startTime));
}
/**
* Creates a helper representing a trigger which triggers every given
* length or an eviction which evicts all elements older than length.
*
* @param length
* The number of time units
* @param timeUnit
* The unit of time such as minute or millisecond. Note that
* the smallest possible granularity is milliseconds. Any smaller
* time unit might cause an error at runtime due to conversion
* problems.
* @param timestampWrapper
* The user defined {@link TimestampWrapper} that will be used to
* extract time information from the incoming elements
*/
private Time(long length, TimeUnit timeUnit, TimestampWrapper<DATA> timestampWrapper) {
this.length = length;
this.granularity = timeUnit;
this.timeStamp = timeStamp;
this.timestampWrapper = timestampWrapper;
this.delay = 0;
}
@Override
public EvictionPolicy<DATA> toEvict() {
return new TimeEvictionPolicy<DATA>(granularityInMillis(), timeStamp);
return new TimeEvictionPolicy<DATA>(granularityInMillis(), timestampWrapper);
}
@Override
public TriggerPolicy<DATA> toTrigger() {
return new TimeTriggerPolicy<DATA>(granularityInMillis(), timeStamp, delay);
return new TimeTriggerPolicy<DATA>(granularityInMillis(), timestampWrapper, delay);
}
/**
* Creates a helper representing a time trigger which triggers every given
* length (slide size) or a time eviction which evicts all elements older
* than length (window size).
* than length (window size) using System time.
*
* @param length
* The number of time units
......@@ -88,8 +107,10 @@ public class Time<DATA> implements WindowingHelper<DATA> {
* problems.
* @return Helper representing the time based trigger and eviction policy
*/
@SuppressWarnings("unchecked")
public static <DATA> Time<DATA> of(long length, TimeUnit timeUnit) {
return new Time<DATA>(length, timeUnit, new DefaultTimeStamp<DATA>());
return new Time<DATA>(length, timeUnit,
(TimestampWrapper<DATA>) SystemTimestamp.getWrapper());
}
/**
......@@ -99,13 +120,32 @@ public class Time<DATA> implements WindowingHelper<DATA> {
*
* @param length
* The number of time units
* @param timeStamp
* @param timestamp
* The user defined timestamp that will be used to extract time
* information from the incoming elements
* @param startTime
* The startTime used to compute the first window
* @return Helper representing the time based trigger and eviction policy
*/
public static <DATA> Time<DATA> of(long length, Timestamp<DATA> timestamp, long startTime) {
    // A user defined timestamp carries no TimeUnit. With a null unit,
    // granularityInMillis() returns length unchanged instead of converting it.
    final TimeUnit noUnit = null;
    return new Time<DATA>(length, noUnit, timestamp, startTime);
}
/**
* Creates a helper representing a time trigger which triggers every given
* length (slide size) or a time eviction which evicts all elements older
* than length (window size) using a user defined timestamp extractor. By
* default the start time is set to 0.
*
* @param length
* The number of time units
* @param timestamp
* The user defined timestamp that will be used to extract time
* information from the incoming elements
* @return Helper representing the time based trigger and eviction policy
*/
public static <DATA> Time<DATA> of(long length, TimeStamp<DATA> timeStamp) {
return new Time<DATA>(length, TimeUnit.MILLISECONDS, timeStamp);
public static <DATA> Time<DATA> of(long length, Timestamp<DATA> timestamp) {
return of(length, timestamp, 0);
}
/**
......@@ -121,6 +161,10 @@ public class Time<DATA> implements WindowingHelper<DATA> {
}
private long granularityInMillis() {
return this.granularity.toMillis(this.length);
if (granularity != null) {
return this.granularity.toMillis(this.length);
} else {
return this.length;
}
}
}
......@@ -15,7 +15,7 @@
* limitations under the License.
*/
package org.apache.flink.streaming.api.invokable.util;
package org.apache.flink.streaming.api.windowing.helper;
import java.io.Serializable;
......@@ -26,7 +26,7 @@ import java.io.Serializable;
* @param <T>
* Type of the value to create the timestamp from.
*/
public interface TimeStamp<T> extends Serializable {
public interface Timestamp<T> extends Serializable {
/**
* Values
......@@ -36,11 +36,4 @@ public interface TimeStamp<T> extends Serializable {
* @return The timestamp
*/
public long getTimestamp(T value);
/**
* Function to define the starting time for reference
*
* @return The starting timestamp
*/
public long getStartTime();
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.api.windowing.helper;
import java.io.Serializable;
/**
 * Pairs a {@link Timestamp} extractor with a fixed start time, so that the
 * extractor itself only has to implement {@link Timestamp#getTimestamp}.
 *
 * @param <T>
 *            Type of the values the timestamps are extracted from.
 */
public class TimestampWrapper<T> implements Serializable {

    private static final long serialVersionUID = 1L;

    // Both fields are assigned exactly once, in the constructor.
    private final long startTime;
    private final Timestamp<T> timestamp;

    /**
     * Creates a wrapper around the given extractor.
     *
     * @param timeStamp
     *            The extractor that {@link #getTimestamp} delegates to.
     * @param startTime
     *            The value returned by {@link #getStartTime()}.
     */
    public TimestampWrapper(Timestamp<T> timeStamp, long startTime) {
        this.timestamp = timeStamp;
        this.startTime = startTime;
    }

    /**
     * Extracts the timestamp of the given value using the wrapped
     * {@link Timestamp}.
     *
     * @param in
     *            The value to extract the timestamp from.
     * @return The timestamp returned by the wrapped extractor.
     */
    public long getTimestamp(T in) {
        return timestamp.getTimestamp(in);
    }

    /**
     * @return The start time this wrapper was created with.
     */
    public long getStartTime() {
        return startTime;
    }

    /**
     * @return true if the wrapped extractor is a {@link SystemTimestamp},
     *         false otherwise.
     */
    public boolean isDefaultTimestamp() {
        return timestamp instanceof SystemTimestamp;
    }
}
......@@ -17,7 +17,7 @@
package org.apache.flink.streaming.api.windowing.policy;
import org.apache.flink.streaming.api.invokable.util.TimeStamp;
import org.apache.flink.streaming.api.windowing.helper.Timestamp;
/**
* This interface extends the {@link TriggerPolicy} interface with functionality
......@@ -28,7 +28,7 @@ import org.apache.flink.streaming.api.invokable.util.TimeStamp;
* first. It can return zero or more fake data points which will be added
* before the currently arrived real element gets processed. This allows to
* handle empty windows in time based windowing with an user defined
* {@link TimeStamp}. Triggers are not called on fake datapoint. A fake
* {@link Timestamp}. Triggers are not called on fake datapoint. A fake
* datapoint is always considered as triggered.
*
* 2) An active trigger has a factory method for a runnable. This factory method
......@@ -49,7 +49,7 @@ public interface ActiveTriggerPolicy<DATA> extends TriggerPolicy<DATA> {
* first. It can return zero or more fake data points which will be added
* before the currently arrived real element gets processed. This allows
* to handle empty windows in time based windowing with an user defined
* {@link TimeStamp}. Triggers are not called on fake datapoints. A fake
* {@link Timestamp}. Triggers are not called on fake datapoints. A fake
* datapoint is always considered as triggered.
*
* @param datapoint
......
......@@ -19,14 +19,15 @@ package org.apache.flink.streaming.api.windowing.policy;
import java.util.LinkedList;
import org.apache.flink.streaming.api.invokable.util.TimeStamp;
import org.apache.flink.streaming.api.windowing.helper.Timestamp;
import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
/**
* This eviction policy evicts all elements which are older than a specified
* time. The time is measured using a given {@link TimeStamp} implementation. A
* time. The time is measured using a given {@link Timestamp} implementation. A
* point in time is always represented as long. Therefore, the granularity can
* be set as long value as well.
*
*
* @param <DATA>
* The type of the incoming data points which are processed by this
* policy.
......@@ -40,12 +41,12 @@ public class TimeEvictionPolicy<DATA> implements ActiveEvictionPolicy<DATA>,
private static final long serialVersionUID = -1457476766124518220L;
private long granularity;
private TimeStamp<DATA> timestamp;
private TimestampWrapper<DATA> timestampWrapper;
private LinkedList<Long> buffer = new LinkedList<Long>();
/**
* This eviction policy evicts all elements which are older than a specified
* time. The time is measured using a given {@link TimeStamp}
* time. The time is measured using a given {@link Timestamp}
* implementation. A point in time is always represented as long. Therefore,
* the granularity can be set as long value as well. If this value is set to
* 2 the policy will evict all elements which are older as 2.
......@@ -60,12 +61,12 @@ public class TimeEvictionPolicy<DATA> implements ActiveEvictionPolicy<DATA>,
* The granularity of the eviction. If this value is set to 2 the
* policy will evict all elements which are older as 2(if
* (time(X)<current time-granularity) evict X).
* @param timestamp
* The {@link TimeStamp} to measure the time with. This can be
* either user defined of provided by the API.
* @param timestampWrapper
* The {@link TimestampWrapper} to measure the time with. This
* can be either user defined or provided by the API.
*/
public TimeEvictionPolicy(long granularity, TimeStamp<DATA> timestamp) {
this.timestamp = timestamp;
public TimeEvictionPolicy(long granularity, TimestampWrapper<DATA> timestampWrapper) {
this.timestampWrapper = timestampWrapper;
this.granularity = granularity;
}
......@@ -78,7 +79,7 @@ public class TimeEvictionPolicy<DATA> implements ActiveEvictionPolicy<DATA>,
try {
threshold = (Long) datapoint - granularity;
} catch (ClassCastException e) {
threshold = timestamp.getTimestamp((DATA) datapoint) - granularity;
threshold = timestampWrapper.getTimestamp((DATA) datapoint) - granularity;
}
// return result
......@@ -91,9 +92,9 @@ public class TimeEvictionPolicy<DATA> implements ActiveEvictionPolicy<DATA>,
checkForDeleted(bufferSize);
//remember timestamp
long time=timestamp.getTimestamp(datapoint);
// remember timestamp
long time = timestampWrapper.getTimestamp(datapoint);
// delete and count expired tuples
long threshold = time - granularity;
int counter = deleteAndCountExpired(threshold);
......@@ -130,7 +131,7 @@ public class TimeEvictionPolicy<DATA> implements ActiveEvictionPolicy<DATA>,
@Override
public TimeEvictionPolicy<DATA> clone() {
return new TimeEvictionPolicy<DATA>(granularity, timestamp);
return new TimeEvictionPolicy<DATA>(granularity, timestampWrapper);
}
}
......@@ -19,12 +19,12 @@ package org.apache.flink.streaming.api.windowing.policy;
import java.util.LinkedList;
import org.apache.flink.streaming.api.invokable.util.DefaultTimeStamp;
import org.apache.flink.streaming.api.invokable.util.TimeStamp;
import org.apache.flink.streaming.api.windowing.helper.Timestamp;
import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
/**
* This trigger policy triggers with regard to the time. The time is measured using a
* given {@link TimeStamp} implementation. A point in time is always represented
* given {@link Timestamp} implementation. A point in time is always represented
* as long. Therefore, parameters such as granularity and delay can be set as
* long value as well.
*
......@@ -42,12 +42,12 @@ public class TimeTriggerPolicy<DATA> implements ActiveTriggerPolicy<DATA>,
protected long startTime;
protected long granularity;
protected TimeStamp<DATA> timestamp;
protected TimestampWrapper<DATA> timestampWrapper;
protected long delay;
/**
* This trigger policy triggers with regard to the time. The time is measured
* using a given {@link TimeStamp} implementation. A point in time is always
* using a given {@link Timestamp} implementation. A point in time is always
* represented as long. Therefore, parameters such as granularity can be set
* as long value as well. If this value for the granularity is set to 2 for
* example, the policy will trigger at every second point in time.
......@@ -55,22 +55,22 @@ public class TimeTriggerPolicy<DATA> implements ActiveTriggerPolicy<DATA>,
* @param granularity
* The granularity of the trigger. If this value is set to x the
* policy will trigger at every x-th time point
* @param timestamp
* The {@link TimeStamp} to measure the time with. This can be
* either user defined of provided by the API.
* @param timestampWrapper
* The {@link TimestampWrapper} to measure the time with. This
* can be either user defined or provided by the API.
* @param timeWrapper
* This policy creates fake elements to not miss windows in case
* no element arrived within the duration of the window. This
* extractor should wrap a long into such an element of type
* DATA.
*/
public TimeTriggerPolicy(long granularity, TimeStamp<DATA> timestamp) {
this(granularity, timestamp, 0);
public TimeTriggerPolicy(long granularity, TimestampWrapper<DATA> timestampWrapper) {
this(granularity, timestampWrapper, 0);
}
/**
* This is mostly the same as
* {@link TimeTriggerPolicy#TimeTriggerPolicy(long, TimeStamp)}. In addition
* {@link TimeTriggerPolicy#TimeTriggerPolicy(long, TimestampWrapper)}. In addition
* to granularity and timestamp a delay can be specified for the first
* trigger. If the start time given by the timestamp is x, the delay is y,
* and the granularity is z, the first trigger will happen at x+y+z.
......@@ -78,9 +78,9 @@ public class TimeTriggerPolicy<DATA> implements ActiveTriggerPolicy<DATA>,
* @param granularity
* The granularity of the trigger. If this value is set to 2 the
* policy will trigger at every second time point
* @param timestamp
* The {@link TimeStamp} to measure the time with. This can be
* either user defined of provided by the API.
* @param timestampWrapper
* The {@link TimestampWrapper} to measure the time with. This
* can be either user defined or provided by the API.
* @param delay
* A delay for the first trigger. If the start time given by the
* timestamp is x, the delay is y, and the granularity is z, the
......@@ -91,9 +91,9 @@ public class TimeTriggerPolicy<DATA> implements ActiveTriggerPolicy<DATA>,
* extractor should wrap a long into such an element of type
* DATA.
*/
public TimeTriggerPolicy(long granularity, TimeStamp<DATA> timestamp, long delay) {
this.startTime = timestamp.getStartTime() + delay;
this.timestamp = timestamp;
public TimeTriggerPolicy(long granularity, TimestampWrapper<DATA> timestampWrapper, long delay) {
this.startTime = timestampWrapper.getStartTime() + delay;
this.timestampWrapper = timestampWrapper;
this.granularity = granularity;
this.delay = delay;
}
......@@ -107,7 +107,7 @@ public class TimeTriggerPolicy<DATA> implements ActiveTriggerPolicy<DATA>,
LinkedList<Object> fakeElements = new LinkedList<Object>();
// check if there is more than one window border missed
// use > here. In case >= would fit, the regular call will do the job.
while (timestamp.getTimestamp(datapoint) >= startTime + granularity) {
while (timestampWrapper.getTimestamp(datapoint) >= startTime + granularity) {
startTime += granularity;
fakeElements.add(startTime - 1);
}
......@@ -127,7 +127,7 @@ public class TimeTriggerPolicy<DATA> implements ActiveTriggerPolicy<DATA>,
*/
@Override
public Runnable createActiveTriggerRunnable(ActiveTriggerCallback callback) {
if (this.timestamp instanceof DefaultTimeStamp) {
if (this.timestampWrapper.isDefaultTimestamp()) {
return new TimeCheck(callback);
} else {
return null;
......@@ -177,7 +177,7 @@ public class TimeTriggerPolicy<DATA> implements ActiveTriggerPolicy<DATA>,
@Override
public synchronized boolean notifyTrigger(DATA datapoint) {
long recordTime = timestamp.getTimestamp(datapoint);
long recordTime = timestampWrapper.getTimestamp(datapoint);
if (recordTime >= startTime + granularity) {
if (granularity != 0) {
startTime = recordTime - ((recordTime - startTime) % granularity);
......@@ -190,7 +190,7 @@ public class TimeTriggerPolicy<DATA> implements ActiveTriggerPolicy<DATA>,
@Override
public TimeTriggerPolicy<DATA> clone() {
return new TimeTriggerPolicy<DATA>(granularity, timestamp, delay);
return new TimeTriggerPolicy<DATA>(granularity, timestampWrapper, delay);
}
}
......@@ -28,7 +28,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.function.sink.SinkFunction;
import org.apache.flink.streaming.api.invokable.util.TimeStamp;
import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
import org.apache.flink.streaming.util.TestStreamEnvironment;
import org.junit.Test;
......@@ -96,12 +96,16 @@ public class WindowCrossJoinTest implements Serializable {
DataStream<Tuple2<Integer, String>> inStream1 = env.fromCollection(in1);
DataStream<Tuple1<Integer>> inStream2 = env.fromCollection(in2);
inStream1.join(inStream2).onWindow(1000, 1000, new MyTimestamp1(), new MyTimestamp2())
.where(0).equalTo(0).addSink(new JoinResultSink());
inStream1
.join(inStream2)
.onWindow(1000, 1000, new MyTimestamp<Tuple2<Integer, String>>(),
new MyTimestamp<Tuple1<Integer>>()).where(0).equalTo(0)
.addSink(new JoinResultSink());
inStream1
.cross(inStream2)
.onWindow(1000, 1000, new MyTimestamp1(), new MyTimestamp2())
.onWindow(1000, 1000, new MyTimestamp<Tuple2<Integer, String>>(),
new MyTimestamp<Tuple1<Integer>>())
.with(new CrossFunction<Tuple2<Integer, String>, Tuple1<Integer>, Tuple2<Tuple2<Integer, String>, Tuple1<Integer>>>() {
private static final long serialVersionUID = 1L;
......@@ -119,25 +123,15 @@ public class WindowCrossJoinTest implements Serializable {
assertEquals(crossExpectedResults, crossResults);
}
private static class MyTimestamp1 implements TimeStamp<Tuple2<Integer, String>> {
private static final long serialVersionUID = 1L;
@Override
public long getTimestamp(Tuple2<Integer, String> value) {
return 101L;
private static class MyTimestamp<T> extends TimestampWrapper<T> {
public MyTimestamp() {
super(null, 0);
}
@Override
public long getStartTime() {
return 100L;
}
}
private static class MyTimestamp2 implements TimeStamp<Tuple1<Integer>> {
private static final long serialVersionUID = 1L;
@Override
public long getTimestamp(Tuple1<Integer> value) {
public long getTimestamp(T value) {
return 101L;
}
......
......@@ -29,7 +29,7 @@ import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.function.co.CoReduceFunction;
import org.apache.flink.streaming.api.invokable.operator.co.CoGroupedWindowReduceInvokable;
import org.apache.flink.streaming.api.invokable.util.TimeStamp;
import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
import org.apache.flink.streaming.util.MockCoContext;
import org.junit.Test;
......@@ -82,13 +82,14 @@ public class CoGroupedWindowReduceTest {
}
}
public static final class MyTimeStamp<T> implements TimeStamp<T> {
public static final class MyTimeStamp<T> extends TimestampWrapper<T> {
private static final long serialVersionUID = 1L;
private Iterator<Long> timestamps;
private long start;
public MyTimeStamp(List<Long> timestamps) {
super(null, 0);
this.timestamps = timestamps.iterator();
this.start = timestamps.get(0);
}
......
......@@ -27,7 +27,7 @@ import java.util.List;
import org.apache.flink.streaming.api.function.co.CoReduceFunction;
import org.apache.flink.streaming.api.invokable.operator.co.CoWindowReduceInvokable;
import org.apache.flink.streaming.api.invokable.util.TimeStamp;
import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
import org.apache.flink.streaming.util.MockCoContext;
import org.junit.Test;
......@@ -57,13 +57,14 @@ public class CoWindowReduceTest {
}
}
public static final class MyTimeStamp<T> implements TimeStamp<T> {
public static final class MyTimeStamp<T> extends TimestampWrapper<T> {
private static final long serialVersionUID = 1L;
private Iterator<Long> timestamps;
private long start;
public MyTimeStamp(List<Long> timestamps) {
super(null, 0);
this.timestamps = timestamps.iterator();
this.start = timestamps.get(0);
}
......
......@@ -27,7 +27,8 @@ import java.util.Set;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.function.co.CoWindowFunction;
import org.apache.flink.streaming.api.invokable.operator.co.CoWindowInvokable;
import org.apache.flink.streaming.api.invokable.util.TimeStamp;
import org.apache.flink.streaming.api.windowing.helper.Timestamp;
import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
import org.apache.flink.streaming.util.MockCoContext;
import org.apache.flink.util.Collector;
import org.junit.Test;
......@@ -80,7 +81,7 @@ public class CoWindowTest {
}
private static final class MyTS1 implements TimeStamp<Integer> {
private static final class MyTS1 implements Timestamp<Integer> {
private static final long serialVersionUID = 1L;
......@@ -89,14 +90,9 @@ public class CoWindowTest {
return value;
}
@Override
public long getStartTime() {
return 1;
}
}
private static final class MyTS2 implements TimeStamp<Tuple2<Integer, Integer>> {
private static final class MyTS2 implements Timestamp<Tuple2<Integer, Integer>> {
private static final long serialVersionUID = 1L;
......@@ -105,18 +101,14 @@ public class CoWindowTest {
return value.f0;
}
@Override
public long getStartTime() {
return 1;
}
}
@Test
public void coWindowGroupReduceTest2() throws Exception {
CoWindowInvokable<Integer, Integer, Integer> invokable1 = new CoWindowInvokable<Integer, Integer, Integer>(
new MyCoGroup1(), 2, 1, new MyTS1(), new MyTS1());
new MyCoGroup1(), 2, 1, new TimestampWrapper<Integer>(new MyTS1(), 1),
new TimestampWrapper<Integer>(new MyTS1(), 1));
// Windowsize 2, slide 1
// 1,2|2,3|3,4|4,5
......@@ -152,7 +144,8 @@ public class CoWindowTest {
assertEquals(expected1, actual1);
CoWindowInvokable<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Integer> invokable2 = new CoWindowInvokable<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Integer>(
new MyCoGroup2(), 2, 3, new MyTS2(), new MyTS2());
new MyCoGroup2(), 2, 3, new TimestampWrapper<Tuple2<Integer, Integer>>(new MyTS2(),
1), new TimestampWrapper<Tuple2<Integer, Integer>>(new MyTS2(), 1));
// WindowSize 2, slide 3
// 1,2|4,5|7,8|
......
......@@ -28,7 +28,8 @@ import java.util.List;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.invokable.util.TimeStamp;
import org.apache.flink.streaming.api.windowing.helper.Timestamp;
import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
import org.apache.flink.streaming.api.windowing.policy.ActiveCloneableEvictionPolicyWrapper;
import org.apache.flink.streaming.api.windowing.policy.CloneableEvictionPolicy;
import org.apache.flink.streaming.api.windowing.policy.CloneableTriggerPolicy;
......@@ -360,20 +361,18 @@ public class GroupedWindowInvokableTest {
expected.add(new Tuple2<Integer, String>(32, "b"));
expected.add(new Tuple2<Integer, String>(32, "c"));
TimeStamp<Tuple2<Integer, String>> myTimeStamp = new TimeStamp<Tuple2<Integer, String>>() {
Timestamp<Tuple2<Integer, String>> myTimeStamp = new Timestamp<Tuple2<Integer, String>>() {
private static final long serialVersionUID = 1L;
@Override
public long getTimestamp(Tuple2<Integer, String> value) {
return value.f0;
}
@Override
public long getStartTime() {
return 1;
}
};
TimestampWrapper<Tuple2<Integer, String>> myTimeStampWrapper = new TimestampWrapper<Tuple2<Integer, String>>(
myTimeStamp, 1);
ReduceFunction<Tuple2<Integer, String>> myReduceFunction = new ReduceFunction<Tuple2<Integer, String>>() {
private static final long serialVersionUID = 1L;
......@@ -387,11 +386,11 @@ public class GroupedWindowInvokableTest {
LinkedList<TriggerPolicy<Tuple2<Integer, String>>> triggers = new LinkedList<TriggerPolicy<Tuple2<Integer, String>>>();
// Trigger every 2 time units but delay the first trigger by 2 (First
// trigger after 4, then every 2)
triggers.add(new TimeTriggerPolicy<Tuple2<Integer, String>>(2L, myTimeStamp, 2L));
triggers.add(new TimeTriggerPolicy<Tuple2<Integer, String>>(2L, myTimeStampWrapper, 2L));
LinkedList<CloneableEvictionPolicy<Tuple2<Integer, String>>> evictions = new LinkedList<CloneableEvictionPolicy<Tuple2<Integer, String>>>();
// Always delete all elements older than 4
evictions.add(new TimeEvictionPolicy<Tuple2<Integer, String>>(4L, myTimeStamp));
evictions.add(new TimeEvictionPolicy<Tuple2<Integer, String>>(4L, myTimeStampWrapper));
LinkedList<CloneableTriggerPolicy<Tuple2<Integer, String>>> distributedTriggers = new LinkedList<CloneableTriggerPolicy<Tuple2<Integer, String>>>();
......@@ -409,10 +408,10 @@ public class GroupedWindowInvokableTest {
// repeat the test with central eviction. The result should be the same.
triggers.clear();
triggers.add(new TimeTriggerPolicy<Tuple2<Integer, String>>(2L, myTimeStamp, 2L));
triggers.add(new TimeTriggerPolicy<Tuple2<Integer, String>>(2L, myTimeStampWrapper, 2L));
evictions.clear();
LinkedList<EvictionPolicy<Tuple2<Integer, String>>> centralEvictions = new LinkedList<EvictionPolicy<Tuple2<Integer, String>>>();
centralEvictions.add(new TimeEvictionPolicy<Tuple2<Integer, String>>(4L, myTimeStamp));
centralEvictions.add(new TimeEvictionPolicy<Tuple2<Integer, String>>(4L, myTimeStampWrapper));
invokable = new GroupedWindowInvokable<Tuple2<Integer, String>, Tuple2<Integer, String>>(
myReduceFunction, keySelector, distributedTriggers, evictions, triggers,
......
......@@ -24,7 +24,8 @@ import java.util.LinkedList;
import java.util.List;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.invokable.util.TimeStamp;
import org.apache.flink.streaming.api.windowing.helper.Timestamp;
import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
import org.apache.flink.streaming.api.windowing.policy.CountEvictionPolicy;
import org.apache.flink.streaming.api.windowing.policy.CountTriggerPolicy;
import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy;
......@@ -62,18 +63,13 @@ public class WindowInvokableTest {
expected.add(10);
expected.add(32);
TimeStamp<Integer> myTimeStamp = new TimeStamp<Integer>() {
Timestamp<Integer> myTimeStamp = new Timestamp<Integer>() {
private static final long serialVersionUID = 1L;
@Override
public long getTimestamp(Integer value) {
return value;
}
@Override
public long getStartTime() {
return 1;
}
};
ReduceFunction<Integer> myReduceFunction = new ReduceFunction<Integer>() {
......@@ -88,11 +84,13 @@ public class WindowInvokableTest {
LinkedList<TriggerPolicy<Integer>> triggers = new LinkedList<TriggerPolicy<Integer>>();
// Trigger every 2 time units but delay the first trigger by 2 (First
// trigger after 4, then every 2)
triggers.add(new TimeTriggerPolicy<Integer>(2L, myTimeStamp, 2L));
triggers.add(new TimeTriggerPolicy<Integer>(2L, new TimestampWrapper<Integer>(myTimeStamp,
1), 2L));
LinkedList<EvictionPolicy<Integer>> evictions = new LinkedList<EvictionPolicy<Integer>>();
// Always delete all elements older than 4
evictions.add(new TimeEvictionPolicy<Integer>(4L, myTimeStamp));
evictions.add(new TimeEvictionPolicy<Integer>(4L, new TimestampWrapper<Integer>(
myTimeStamp, 1)));
WindowInvokable<Integer, Integer> invokable = new WindowReduceInvokable<Integer>(
myReduceFunction, triggers, evictions);
......
......@@ -19,7 +19,8 @@ package org.apache.flink.streaming.api.windowing.policy;
import java.util.LinkedList;
import org.apache.flink.streaming.api.invokable.util.TimeStamp;
import org.apache.flink.streaming.api.windowing.helper.Timestamp;
import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
import org.junit.Test;
import static org.junit.Assert.*;
......@@ -35,25 +36,20 @@ public class TimeEvictionPolicyTest {
// create a timestamp
@SuppressWarnings("serial")
TimeStamp<Integer> timeStamp = new TimeStamp<Integer>() {
Timestamp<Integer> timeStamp = new Timestamp<Integer>() {
@Override
public long getTimestamp(Integer value) {
return value;
}
@Override
public long getStartTime() {
return 0;
}
};
// test different granularity
for (long granularity = 0; granularity < 40; granularity++) {
// create policy
TimeEvictionPolicy<Integer> policy = new TimeEvictionPolicy<Integer>(granularity,
timeStamp);
new TimestampWrapper<Integer>(timeStamp, 0));
// The trigger status should not affect the policy. Therefore, its
// value is changed after each usage.
......
......@@ -21,7 +21,8 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import org.apache.flink.streaming.api.invokable.util.TimeStamp;
import org.apache.flink.streaming.api.windowing.helper.Timestamp;
import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
import org.junit.Test;
public class TimeTriggerPolicyTest {
......@@ -33,29 +34,24 @@ public class TimeTriggerPolicyTest {
// create a timestamp
@SuppressWarnings("serial")
TimeStamp<Integer> timeStamp = new TimeStamp<Integer>() {
Timestamp<Integer> timeStamp = new Timestamp<Integer>() {
@Override
public long getTimestamp(Integer value) {
return value;
}
@Override
public long getStartTime() {
return 0;
}
};
// test different granularity
for (long granularity = 0; granularity < 31; granularity++) {
// create policy
TriggerPolicy<Integer> policy = new TimeTriggerPolicy<Integer>(granularity, timeStamp);
TriggerPolicy<Integer> policy = new TimeTriggerPolicy<Integer>(granularity,
new TimestampWrapper<Integer>(timeStamp, 0));
// remember window border
// Remark: This might NOT work in case the timeStamp uses
// System.getCurrentTimeMillis to determine the start time.
long currentTime = timeStamp.getStartTime();
long currentTime = 0;
// test by adding values
for (int i = 0; i < times.length; i++) {
......@@ -85,22 +81,18 @@ public class TimeTriggerPolicyTest {
// create a timestamp
@SuppressWarnings("serial")
TimeStamp<Integer> timeStamp = new TimeStamp<Integer>() {
Timestamp<Integer> timeStamp = new Timestamp<Integer>() {
@Override
public long getTimestamp(Integer value) {
return value;
}
@Override
public long getStartTime() {
return 0;
}
};
// create policy
TimeTriggerPolicy<Integer> policy = new TimeTriggerPolicy<Integer>(5, timeStamp);
TimeTriggerPolicy<Integer> policy = new TimeTriggerPolicy<Integer>(5,
new TimestampWrapper<Integer>(timeStamp, 0));
// expected result
Long[][] result = { {}, {}, { 4L, 9L, 14L, 19L }, { 24L } };
......
......@@ -49,10 +49,8 @@ public class DeltaExtractExample {
@SuppressWarnings({ "unchecked", "rawtypes" })
DataStream dstream = env
.addSource(new CountingSource())
.window(Delta.of(new EuclideanDistance(new FieldsFromTuple(0, 1)), new Tuple3(0d,
0d, "foo"), 1.2))
.every(Count.of(2))
.reduce(new ConcatStrings());
.window(Delta.of(1.2, new EuclideanDistance(new FieldsFromTuple(0, 1)), new Tuple3(
0d, 0d, "foo"))).every(Count.of(2)).reduce(new ConcatStrings());
// emit result
if (fileOutput) {
......
......@@ -18,6 +18,7 @@
package org.apache.flink.api.scala.streaming
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.streaming.StreamExecutionEnvironment.clean
import org.apache.flink.streaming.api.datastream.{ DataStream => JavaStream }
import org.apache.flink.api.common.typeinfo.TypeInformation
import scala.reflect.ClassTag
......@@ -53,24 +54,6 @@ import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
class DataStream[T](javaStream: JavaStream[T]) {
/* This code is originally from the Apache Spark project. */
/**
* Clean a closure to make it ready to serialized and send to tasks
* (removes unreferenced variables in $outer's, updates REPL variables)
* If <tt>checkSerializable</tt> is set, <tt>clean</tt> will also proactively
* check to see if <tt>f</tt> is serializable and throw a <tt>SparkException</tt>
* if not.
*
* @param f the closure to clean
* @param checkSerializable whether or not to immediately check <tt>f</tt> for serializability
* @throws <tt>SparkException<tt> if <tt>checkSerializable</tt> is set but <tt>f</tt> is not
* serializable
*/
private[flink] def clean[F <: AnyRef](f: F, checkSerializable: Boolean = true): F = {
ClosureCleaner.clean(f, checkSerializable)
f
}
/**
* Gets the underlying java DataStream object.
*/
......
......@@ -32,7 +32,7 @@ import org.apache.flink.streaming.api.function.co.CrossWindowFunction
import org.apache.flink.api.common.functions.CrossFunction
import org.apache.flink.api.scala.typeutils.CaseClassSerializer
import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
import org.apache.flink.api.scala.streaming.StreamExecutionEnvironment.clean
class StreamCrossOperator[I1, I2](i1: JavaStream[I1], i2: JavaStream[I2]) extends
TemporalOperator[I1, I2, StreamCrossOperator.CrossWindow[I1, I2]](i1, i2) {
......
......@@ -161,9 +161,9 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
*/
def addSource[T: ClassTag: TypeInformation](function: SourceFunction[T]): DataStream[T] = {
Validate.notNull(function, "Function must not be null.")
ClosureCleaner.clean(function, true)
val cleanFun = StreamExecutionEnvironment.clean(function)
val typeInfo = implicitly[TypeInformation[T]]
new DataStream[T](javaEnv.addSource(function, typeInfo))
new DataStream[T](javaEnv.addSource(cleanFun, typeInfo))
}
/**
......@@ -174,8 +174,9 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
def addSource[T: ClassTag: TypeInformation](function: Collector[T] => Unit): DataStream[T] = {
Validate.notNull(function, "Function must not be null.")
val sourceFunction = new SourceFunction[T] {
val cleanFun = StreamExecutionEnvironment.clean(function)
override def invoke(out: Collector[T]) {
function(out)
cleanFun(out)
}
}
addSource(sourceFunction)
......@@ -205,6 +206,11 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
}
object StreamExecutionEnvironment {
private[flink] def clean[F <: AnyRef](f: F, checkSerializable: Boolean = true): F = {
ClosureCleaner.clean(f, checkSerializable)
f
}
/**
* Creates an execution environment that represents the context in which the program is
......
......@@ -33,6 +33,7 @@ import org.apache.commons.lang.Validate
import org.apache.flink.streaming.api.invokable.operator.co.CoWindowInvokable
import org.apache.flink.streaming.util.keys.KeySelectorUtil
import org.apache.flink.api.java.operators.Keys
import org.apache.flink.api.scala.streaming.StreamExecutionEnvironment.clean
class StreamJoinOperator[I1, I2](i1: JavaStream[I1], i2: JavaStream[I2]) extends
TemporalOperator[I1, I2, StreamJoinOperator.JoinWindow[I1, I2]](i1, i2) {
......
......@@ -40,14 +40,10 @@ import org.apache.flink.streaming.api.function.aggregation.AggregationFunction.A
import org.apache.flink.api.java.typeutils.TupleTypeInfoBase
import org.apache.flink.streaming.api.function.aggregation.SumFunction
import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
import org.apache.flink.api.scala.streaming.StreamExecutionEnvironment.clean
class WindowedDataStream[T](javaStream: JavaWStream[T]) {
private[flink] def clean[F <: AnyRef](f: F, checkSerializable: Boolean = true): F = {
ClosureCleaner.clean(f, checkSerializable)
f
}
/**
* Defines the slide size (trigger frequency) for the windowed data stream.
* This controls how often the user defined function will be triggered on
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.api.scala.streaming.windowing
import org.apache.flink.streaming.api.windowing.helper.{ Delta => JavaDelta }
import org.apache.commons.lang.Validate
import org.apache.flink.api.scala.streaming.StreamExecutionEnvironment.clean
import org.apache.flink.streaming.api.windowing.deltafunction.DeltaFunction
object Delta {

  /**
   * Creates a delta helper representing a delta trigger or eviction policy.
   *
   * The policy calculates a delta between the data point which triggered
   * last and the currently arrived data point. It triggers if the delta is
   * higher than the specified threshold.
   *
   * When used for eviction, the policy starts from the first element of the
   * buffer and removes all elements from the buffer which have a higher
   * delta than the threshold. As soon as there is an element with a lower
   * delta, the eviction stops.
   *
   * @param threshold     the threshold the computed delta is compared against
   * @param deltaFunction the function computing the delta between two data
   *                      points; must not be null
   * @param initVal       the initial value handed to the underlying Java helper
   * @return the Java delta helper built from the given arguments
   */
  def of[T](threshold: Double, deltaFunction: (T, T) => Double, initVal: T): JavaDelta[T] = {
    Validate.notNull(deltaFunction, "Delta function must not be null")

    // Adapt the Scala function to the Java DeltaFunction interface. The
    // closure is passed through the imported `clean` helper once, when the
    // adapter is instantiated, and the cleaned function is what gets invoked.
    val adapter: DeltaFunction[T] = new DeltaFunction[T] {
      val cleanedDelta = clean(deltaFunction)

      override def getDelta(first: T, second: T): Double = cleanedDelta(first, second)
    }

    JavaDelta.of(threshold, adapter, initVal)
  }

}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.api.scala.streaming.windowing
import java.util.concurrent.TimeUnit
import org.apache.flink.streaming.api.windowing.helper.{ Time => JavaTime }
import org.apache.flink.api.scala.ClosureCleaner
import org.apache.commons.net.ntp.TimeStamp
import org.apache.flink.streaming.api.windowing.helper.Timestamp
import org.apache.flink.api.scala.streaming.StreamExecutionEnvironment.clean
import org.apache.commons.lang.Validate
object Time {

  /**
   * Creates a helper representing a time trigger which triggers every given
   * length (slide size) or a time eviction which evicts all elements older
   * than length (window size) using System time.
   *
   * @param windowSize the length of the window (or slide), expressed in `timeUnit`
   * @param timeUnit   the unit in which `windowSize` is given
   * @return the Java time helper for the given length
   */
  def of(windowSize: Long, timeUnit: TimeUnit): JavaTime[_] = {
    JavaTime.of(windowSize, timeUnit)
  }

  /**
   * Creates a helper representing a time trigger which triggers every given
   * length (slide size) or a time eviction which evicts all elements older
   * than length (window size) using a user defined timestamp extractor.
   *
   * @param windowSize the length of the window (or slide)
   * @param timestamp  the function extracting a timestamp from an element;
   *                   must not be null
   * @param startTime  the start time passed on to the Java helper, 0 by default
   * @return the Java time helper using the given timestamp extractor
   */
  def of[R](windowSize: Long, timestamp: R => Long, startTime: Long = 0): JavaTime[R] = {
    Validate.notNull(timestamp, "Timestamp must not be null.")

    // Adapt the Scala function to the Java Timestamp interface. The closure
    // is passed through the imported `clean` helper (with the serializability
    // check enabled) once, when the adapter is instantiated.
    val extractor: Timestamp[R] = new Timestamp[R] {
      val cleanedTimestamp = clean(timestamp, checkSerializable = true)

      override def getTimestamp(element: R): Long = cleanedTimestamp(element)
    }

    JavaTime.of(windowSize, extractor, startTime)
  }

}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册