提交 227e40fe 编写于 作者: J Jonas Traub (powibol) 提交者: mbalassi

[streaming] Make windowed data stream aware of time based trigger/eviction in...

[streaming] Make windowed data stream aware of time based trigger/eviction in tumbling window situations.

[streaming] Changed TimeEvictionPolicy to keep timestamps in the buffer instead of data-items
上级 6884a0ff
......@@ -36,6 +36,7 @@ import org.apache.flink.streaming.api.invokable.StreamInvokable;
import org.apache.flink.streaming.api.invokable.operator.GroupedWindowInvokable;
import org.apache.flink.streaming.api.invokable.operator.WindowGroupReduceInvokable;
import org.apache.flink.streaming.api.invokable.operator.WindowReduceInvokable;
import org.apache.flink.streaming.api.windowing.helper.Time;
import org.apache.flink.streaming.api.windowing.helper.WindowingHelper;
import org.apache.flink.streaming.api.windowing.policy.CloneableEvictionPolicy;
import org.apache.flink.streaming.api.windowing.policy.CloneableTriggerPolicy;
......@@ -490,7 +491,17 @@ public class WindowedDataStream<OUT> {
}
} else {
if (userEvicters == null) {
evicters.add(new TumblingEvictionPolicy<OUT>());
boolean notOnlyTime=false;
for (WindowingHelper<OUT> helper : triggerHelpers){
if (helper instanceof Time<?>){
evicters.add(helper.toEvict());
} else {
notOnlyTime=true;
}
}
if (notOnlyTime){
evicters.add(new TumblingEvictionPolicy<OUT>());
}
}
}
......
......@@ -41,7 +41,7 @@ public class TimeEvictionPolicy<DATA> implements ActiveEvictionPolicy<DATA>,
private long granularity;
private TimeStamp<DATA> timestamp;
private LinkedList<DATA> buffer = new LinkedList<DATA>();
private LinkedList<Long> buffer = new LinkedList<Long>();
/**
* This eviction policy evicts all elements which are older than a specified
......@@ -91,12 +91,15 @@ public class TimeEvictionPolicy<DATA> implements ActiveEvictionPolicy<DATA>,
checkForDeleted(bufferSize);
//remember timestamp
long time=timestamp.getTimestamp(datapoint);
// delete and count expired tuples
long threshold = timestamp.getTimestamp(datapoint) - granularity;
long threshold = time - granularity;
int counter = deleteAndCountExpired(threshold);
// Add current element to buffer
buffer.add(datapoint);
buffer.add(time);
// return result
return counter;
......@@ -114,7 +117,7 @@ public class TimeEvictionPolicy<DATA> implements ActiveEvictionPolicy<DATA>,
int counter = 0;
while (!buffer.isEmpty()) {
if (timestamp.getTimestamp(buffer.getFirst()) <= threshold) {
if (buffer.getFirst() <= threshold) {
buffer.removeFirst();
counter++;
} else {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册