提交 34a8b03d 编写于 作者: K kl0u 提交者: Aljoscha Krettek

[FLINK-3714] Add Support for "Allowed Lateness"

Handle late elements and take care
of cleaning the window state.
上级 f2e9c521
......@@ -42,6 +42,7 @@ import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.evictors.Evictor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.streaming.runtime.operators.windowing.EvictingWindowOperator;
......@@ -87,6 +88,8 @@ public class AllWindowedStream<T, W extends Window> {
/** The evictor that is used for evicting elements before window evaluation. */
private Evictor<? super T, ? super W> evictor;
/** The user-specified allowed lateness. */
private long allowedLateness = Long.MAX_VALUE;
@PublicEvolving
public AllWindowedStream(DataStream<T> input,
......@@ -109,6 +112,26 @@ public class AllWindowedStream<T, W extends Window> {
return this;
}
/**
* Sets the allowed lateness to a user-specified value.
* If not explicitly set, the allowed lateness is {@link Long#MAX_VALUE}.
* Setting the allowed lateness is only valid for event-time windows.
* If a value different than 0 is provided with a processing-time
* {@link WindowAssigner}, then an exception is thrown.
*/
@PublicEvolving
public AllWindowedStream<T, W> allowedLateness(Time lateness) {
long millis = lateness.toMilliseconds();
if (allowedLateness < 0) {
throw new IllegalArgumentException("The allowed lateness cannot be negative.");
} else if (allowedLateness != 0 && !windowAssigner.isEventTime()) {
throw new IllegalArgumentException("Setting the allowed lateness is only valid for event-time windows.");
} else {
this.allowedLateness = millis;
}
return this;
}
/**
* Sets the {@code Evictor} that should be used to evict elements from a window before emission.
*
......@@ -251,14 +274,16 @@ public class AllWindowedStream<T, W extends Window> {
opName = "TriggerWindow(" + windowAssigner + ", " + stateDesc + ", " + trigger + ", " + evictor + ", " + udfName + ")";
operator = new EvictingWindowOperator<>(windowAssigner,
operator =
new EvictingWindowOperator<>(windowAssigner,
windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
keySel,
input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
stateDesc,
new InternalIterableAllWindowFunction<>(function),
trigger,
evictor);
evictor,
allowedLateness);
} else {
ListStateDescriptor<T> stateDesc = new ListStateDescriptor<>("window-contents",
......@@ -266,13 +291,15 @@ public class AllWindowedStream<T, W extends Window> {
opName = "TriggerWindow(" + windowAssigner + ", " + stateDesc + ", " + trigger + ", " + udfName + ")";
operator = new WindowOperator<>(windowAssigner,
operator =
new WindowOperator<>(windowAssigner,
windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
keySel,
input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
stateDesc,
new InternalIterableAllWindowFunction<>(function),
trigger);
trigger,
allowedLateness);
}
return input.transform(opName, resultType, operator).setParallelism(1);
......@@ -335,14 +362,16 @@ public class AllWindowedStream<T, W extends Window> {
opName = "TriggerWindow(" + windowAssigner + ", " + stateDesc + ", " + trigger + ", " + evictor + ", " + udfName + ")";
operator = new EvictingWindowOperator<>(windowAssigner,
operator =
new EvictingWindowOperator<>(windowAssigner,
windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
keySel,
input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
stateDesc,
new InternalIterableAllWindowFunction<>(new ReduceApplyAllWindowFunction<>(reduceFunction, function)),
trigger,
evictor);
evictor,
allowedLateness);
} else {
ReducingStateDescriptor<T> stateDesc = new ReducingStateDescriptor<>("window-contents",
......@@ -351,13 +380,15 @@ public class AllWindowedStream<T, W extends Window> {
opName = "TriggerWindow(" + windowAssigner + ", " + stateDesc + ", " + trigger + ", " + udfName + ")";
operator = new WindowOperator<>(windowAssigner,
operator =
new WindowOperator<>(windowAssigner,
windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
keySel,
input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
stateDesc,
new InternalSingleValueAllWindowFunction<>(function),
trigger);
trigger,
allowedLateness);
}
return input.transform(opName, resultType, operator).setParallelism(1);
......@@ -425,14 +456,16 @@ public class AllWindowedStream<T, W extends Window> {
opName = "TriggerWindow(" + windowAssigner + ", " + stateDesc + ", " + trigger + ", " + evictor + ", " + udfName + ")";
operator = new EvictingWindowOperator<>(windowAssigner,
operator =
new EvictingWindowOperator<>(windowAssigner,
windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
keySel,
input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
stateDesc,
new InternalIterableAllWindowFunction<>(new FoldApplyAllWindowFunction<>(initialValue, foldFunction, function)),
trigger,
evictor);
evictor,
allowedLateness);
} else {
FoldingStateDescriptor<T, R> stateDesc = new FoldingStateDescriptor<>("window-contents",
......@@ -442,13 +475,15 @@ public class AllWindowedStream<T, W extends Window> {
opName = "TriggerWindow(" + windowAssigner + ", " + stateDesc + ", " + trigger + ", " + udfName + ")";
operator = new WindowOperator<>(windowAssigner,
operator =
new WindowOperator<>(windowAssigner,
windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
keySel,
input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
stateDesc,
new InternalSingleValueAllWindowFunction<>(function),
trigger);
trigger,
allowedLateness);
}
return input.transform(opName, resultType, operator).setParallelism(1);
......
......@@ -45,6 +45,7 @@ import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeW
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.evictors.Evictor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
......@@ -98,6 +99,8 @@ public class WindowedStream<T, K, W extends Window> {
/** The evictor that is used for evicting elements before window evaluation. */
private Evictor<? super T, ? super W> evictor;
/** The user-specified allowed lateness. */
private long allowedLateness = Long.MAX_VALUE;
@PublicEvolving
public WindowedStream(KeyedStream<T, K> input,
......@@ -120,6 +123,26 @@ public class WindowedStream<T, K, W extends Window> {
return this;
}
/**
* Sets the allowed lateness to a user-specified value.
* If not explicitly set, the allowed lateness is {@link Long#MAX_VALUE}.
* Setting the allowed lateness is only valid for event-time windows.
* If a value different than 0 is provided with a processing-time
* {@link WindowAssigner}, then an exception is thrown.
*/
@PublicEvolving
public WindowedStream<T, K, W> allowedLateness(Time lateness) {
long millis = lateness.toMilliseconds();
if (allowedLateness < 0) {
throw new IllegalArgumentException("The allowed lateness cannot be negative.");
} else if (allowedLateness != 0 && !windowAssigner.isEventTime()) {
throw new IllegalArgumentException("Setting the allowed lateness is only valid for event-time windows.");
} else {
this.allowedLateness = millis;
}
return this;
}
/**
* Sets the {@code Evictor} that should be used to evict elements from a window before emission.
*
......@@ -272,14 +295,16 @@ public class WindowedStream<T, K, W extends Window> {
opName = "TriggerWindow(" + windowAssigner + ", " + stateDesc + ", " + trigger + ", " + evictor + ", " + udfName + ")";
operator = new EvictingWindowOperator<>(windowAssigner,
operator =
new EvictingWindowOperator<>(windowAssigner,
windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
keySel,
input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
stateDesc,
new InternalIterableWindowFunction<>(function),
trigger,
evictor);
evictor,
allowedLateness);
} else {
ListStateDescriptor<T> stateDesc = new ListStateDescriptor<>("window-contents",
......@@ -287,13 +312,15 @@ public class WindowedStream<T, K, W extends Window> {
opName = "TriggerWindow(" + windowAssigner + ", " + stateDesc + ", " + trigger + ", " + udfName + ")";
operator = new WindowOperator<>(windowAssigner,
operator =
new WindowOperator<>(windowAssigner,
windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
keySel,
input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
stateDesc,
new InternalIterableWindowFunction<>(function),
trigger);
trigger,
allowedLateness);
}
return input.transform(opName, resultType, operator);
......@@ -356,14 +383,16 @@ public class WindowedStream<T, K, W extends Window> {
opName = "TriggerWindow(" + windowAssigner + ", " + stateDesc + ", " + trigger + ", " + evictor + ", " + udfName + ")";
operator = new EvictingWindowOperator<>(windowAssigner,
operator =
new EvictingWindowOperator<>(windowAssigner,
windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
keySel,
input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
stateDesc,
new InternalIterableWindowFunction<>(new ReduceApplyWindowFunction<>(reduceFunction, function)),
trigger,
evictor);
evictor,
allowedLateness);
} else {
ReducingStateDescriptor<T> stateDesc = new ReducingStateDescriptor<>("window-contents",
......@@ -372,13 +401,15 @@ public class WindowedStream<T, K, W extends Window> {
opName = "TriggerWindow(" + windowAssigner + ", " + stateDesc + ", " + trigger + ", " + udfName + ")";
operator = new WindowOperator<>(windowAssigner,
operator =
new WindowOperator<>(windowAssigner,
windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
keySel,
input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
stateDesc,
new InternalSingleValueWindowFunction<>(function),
trigger);
trigger,
allowedLateness);
}
return input.transform(opName, resultType, operator);
......@@ -453,7 +484,8 @@ public class WindowedStream<T, K, W extends Window> {
stateDesc,
new InternalIterableWindowFunction<>(new FoldApplyWindowFunction<>(initialValue, foldFunction, function)),
trigger,
evictor);
evictor,
allowedLateness);
} else {
FoldingStateDescriptor<T, R> stateDesc = new FoldingStateDescriptor<>("window-contents",
......@@ -469,7 +501,8 @@ public class WindowedStream<T, K, W extends Window> {
input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
stateDesc,
new InternalSingleValueWindowFunction<>(function),
trigger);
trigger,
allowedLateness);
}
return input.transform(opName, resultType, operator);
......
......@@ -81,6 +81,11 @@ public class EventTimeSessionWindows extends MergingWindowAssigner<Object, TimeW
return new TimeWindow.Serializer();
}
@Override
public boolean isEventTime() {
return true;
}
/**
* Merge overlapping {@link TimeWindow}s.
*/
......
......@@ -102,4 +102,9 @@ public class GlobalWindows extends WindowAssigner<Object, GlobalWindow> {
public TypeSerializer<GlobalWindow> getWindowSerializer(ExecutionConfig executionConfig) {
return new GlobalWindow.Serializer();
}
@Override
public boolean isEventTime() {
return false;
}
}
......@@ -81,6 +81,11 @@ public class ProcessingTimeSessionWindows extends MergingWindowAssigner<Object,
return new TimeWindow.Serializer();
}
@Override
public boolean isEventTime() {
return false;
}
/**
* Merge overlapping {@link TimeWindow}s.
*/
......
......@@ -109,4 +109,9 @@ public class SlidingEventTimeWindows extends WindowAssigner<Object, TimeWindow>
public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {
return new TimeWindow.Serializer();
}
@Override
public boolean isEventTime() {
return true;
}
}
......@@ -101,4 +101,9 @@ public class SlidingProcessingTimeWindows extends WindowAssigner<Object, TimeWin
public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {
return new TimeWindow.Serializer();
}
@Override
public boolean isEventTime() {
return false;
}
}
......@@ -95,4 +95,9 @@ public class TumblingEventTimeWindows extends WindowAssigner<Object, TimeWindow>
public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {
return new TimeWindow.Serializer();
}
@Override
public boolean isEventTime() {
return true;
}
}
......@@ -86,4 +86,9 @@ public class TumblingProcessingTimeWindows extends WindowAssigner<Object, TimeWi
public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {
return new TimeWindow.Serializer();
}
@Override
public boolean isEventTime() {
return false;
}
}
......@@ -63,4 +63,10 @@ public abstract class WindowAssigner<T, W extends Window> implements Serializabl
* this {@code WindowAssigner}.
*/
public abstract TypeSerializer<W> getWindowSerializer(ExecutionConfig executionConfig);
/**
* Returns {@code true} if elements are assigned to windows based on event time,
* {@code false} otherwise.
*/
public abstract boolean isEventTime();
}
......@@ -22,7 +22,7 @@ import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
/**
* An {@link Evictor} that keeps only a certain amount of elements.
* An {@link Evictor} that keeps up to a certain amount of elements.
*
* @param <W> The type of {@link Window Windows} on which this {@code Evictor} can operate.
*/
......
......@@ -36,12 +36,17 @@ public class EventTimeTrigger extends Trigger<Object, TimeWindow> {
@Override
public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
ctx.registerEventTimeTimer(window.maxTimestamp());
return TriggerResult.CONTINUE;
return (window.maxTimestamp() <= ctx.getCurrentWatermark()) ?
TriggerResult.FIRE_AND_PURGE :
TriggerResult.CONTINUE;
}
@Override
public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
return TriggerResult.FIRE_AND_PURGE;
return time == window.maxTimestamp() ?
TriggerResult.FIRE_AND_PURGE :
TriggerResult.CONTINUE;
}
@Override
......
......@@ -29,6 +29,7 @@ import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.evictors.Evictor;
......@@ -70,8 +71,11 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends Window
StateDescriptor<? extends ListState<StreamRecord<IN>>, ?> windowStateDescriptor,
InternalWindowFunction<Iterable<IN>, OUT, K, W> windowFunction,
Trigger<? super IN, ? super W> trigger,
Evictor<? super IN, ? super W> evictor) {
super(windowAssigner, windowSerializer, keySelector, keySerializer, null, windowFunction, trigger);
Evictor<? super IN, ? super W> evictor,
long allowedLateness) {
super(windowAssigner, windowSerializer, keySelector,
keySerializer, null, windowFunction, trigger, allowedLateness);
this.evictor = requireNonNull(evictor);
this.windowStateDescriptor = windowStateDescriptor;
}
......@@ -79,8 +83,10 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends Window
@Override
@SuppressWarnings("unchecked")
public void processElement(StreamRecord<IN> element) throws Exception {
Collection<W> elementWindows = windowAssigner.assignWindows(element.getValue(),
element.getTimestamp());
Collection<W> elementWindows = windowAssigner.assignWindows(
element.getValue(),
element.getTimestamp());
final K key = (K) getStateBackend().getCurrentKey();
......@@ -112,20 +118,27 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends Window
for (W m : mergedWindows) {
context.window = m;
context.clear();
deleteCleanupTimer(m);
}
// merge the merged state windows into the newly resulting state window
getStateBackend().mergePartitionedStates(stateWindowResult,
mergedStateWindows,
windowSerializer,
(StateDescriptor<? extends MergingState<?, ?>, ?>) windowStateDescriptor);
getStateBackend().mergePartitionedStates(
stateWindowResult,
mergedStateWindows,
windowSerializer,
(StateDescriptor<? extends MergingState<?, ?>, ?>) windowStateDescriptor);
}
});
// check if the window is already inactive
if (isLate(actualWindow)) {
LOG.info("Dropped element " + element + " for window " + actualWindow + " due to lateness.");
continue;
}
W stateWindow = mergingWindows.getStateWindow(actualWindow);
ListState<StreamRecord<IN>> windowState = getPartitionedState(stateWindow,
windowSerializer,
windowStateDescriptor);
ListState<StreamRecord<IN>> windowState = getPartitionedState(
stateWindow, windowSerializer, windowStateDescriptor);
windowState.add(element);
context.key = key;
......@@ -134,77 +147,166 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends Window
// we might have already fired because of a merge but still call onElement
// on the (possibly merged) window
TriggerResult triggerResult = context.onElement(element);
TriggerResult combinedTriggerResult = TriggerResult.merge(triggerResult,
mergeTriggerResult.f0);
processTriggerResult(combinedTriggerResult, actualWindow);
TriggerResult combinedTriggerResult = TriggerResult.merge(triggerResult, mergeTriggerResult.f0);
fireOrContinue(combinedTriggerResult, actualWindow, windowState);
if (combinedTriggerResult.isPurge()) {
cleanup(actualWindow, windowState, mergingWindows);
} else {
registerCleanupTimer(actualWindow);
}
}
} else {
for (W window : elementWindows) {
ListState<StreamRecord<IN>> windowState = getPartitionedState(window,
windowSerializer,
windowStateDescriptor);
// check if the window is already inactive
if (isLate(window)) {
LOG.info("Dropped element " + element + " for window " + window + " due to lateness.");
continue;
}
ListState<StreamRecord<IN>> windowState = getPartitionedState(
window, windowSerializer, windowStateDescriptor);
windowState.add(element);
context.key = key;
context.window = window;
TriggerResult triggerResult = context.onElement(element);
fireOrContinue(triggerResult, window, windowState);
processTriggerResult(triggerResult, window);
if (triggerResult.isPurge()) {
cleanup(window, windowState, null);
} else {
registerCleanupTimer(window);
}
}
}
}
@Override
@SuppressWarnings("unchecked,rawtypes")
protected void processTriggerResult(TriggerResult triggerResult, W window) throws Exception {
if (!triggerResult.isFire() && !triggerResult.isPurge()) {
// do nothing
return;
}
public void processWatermark(Watermark mark) throws Exception {
boolean fire;
do {
Timer<K, W> timer = watermarkTimersQueue.peek();
if (timer != null && timer.timestamp <= mark.getTimestamp()) {
fire = true;
watermarkTimers.remove(timer);
watermarkTimersQueue.remove();
context.key = timer.key;
context.window = timer.window;
setKeyContext(timer.key);
ListState<StreamRecord<IN>> windowState;
MergingWindowSet<W> mergingWindows = null;
if (windowAssigner instanceof MergingWindowAssigner) {
mergingWindows = getMergingWindowSet();
W stateWindow = mergingWindows.getStateWindow(context.window);
windowState = getPartitionedState(stateWindow, windowSerializer, windowStateDescriptor);
} else {
windowState = getPartitionedState(context.window, windowSerializer, windowStateDescriptor);
}
TriggerResult triggerResult = context.onEventTime(timer.timestamp);
fireOrContinue(triggerResult, context.window, windowState);
if (triggerResult.isPurge() || (windowAssigner.isEventTime() && isCleanupTime(timer.window, timer.timestamp))) {
cleanup(timer.window, windowState, mergingWindows);
}
} else {
fire = false;
}
} while (fire);
ListState<StreamRecord<IN>> windowState;
output.emitWatermark(mark);
MergingWindowSet<W> mergingWindows = null;
this.currentWatermark = mark.getTimestamp();
}
if (windowAssigner instanceof MergingWindowAssigner) {
mergingWindows = getMergingWindowSet();
W stateWindow = mergingWindows.getStateWindow(window);
windowState = getPartitionedState(stateWindow, windowSerializer, windowStateDescriptor);
@Override
public void trigger(long time) throws Exception {
boolean fire;
} else {
windowState = getPartitionedState(window, windowSerializer, windowStateDescriptor);
}
//Remove information about the triggering task
processingTimeTimerFutures.remove(time);
processingTimeTimerTimestamps.remove(time, processingTimeTimerTimestamps.count(time));
if (triggerResult.isFire()) {
timestampedCollector.setAbsoluteTimestamp(window.maxTimestamp());
Iterable<StreamRecord<IN>> contents = windowState.get();
// Work around type system restrictions...
int toEvict = evictor.evict((Iterable) contents, Iterables.size(contents), context.window);
FluentIterable<IN> projectedContents = FluentIterable
.from(contents)
.skip(toEvict)
.transform(new Function<StreamRecord<IN>, IN>() {
@Override
public IN apply(StreamRecord<IN> input) {
return input.getValue();
}
});
userFunction.apply(context.key, context.window, projectedContents, timestampedCollector);
}
if (triggerResult.isPurge()) {
windowState.clear();
if (mergingWindows != null) {
mergingWindows.retireWindow(window);
do {
Timer<K, W> timer = processingTimeTimersQueue.peek();
if (timer != null && timer.timestamp <= time) {
fire = true;
processingTimeTimers.remove(timer);
processingTimeTimersQueue.remove();
context.key = timer.key;
context.window = timer.window;
setKeyContext(timer.key);
ListState<StreamRecord<IN>> windowState;
MergingWindowSet<W> mergingWindows = null;
if (windowAssigner instanceof MergingWindowAssigner) {
mergingWindows = getMergingWindowSet();
W stateWindow = mergingWindows.getStateWindow(context.window);
windowState = getPartitionedState(stateWindow, windowSerializer, windowStateDescriptor);
} else {
windowState = getPartitionedState(context.window, windowSerializer, windowStateDescriptor);
}
TriggerResult triggerResult = context.onProcessingTime(timer.timestamp);
fireOrContinue(triggerResult, context.window, windowState);
if (triggerResult.isPurge() || (!windowAssigner.isEventTime() && isCleanupTime(timer.window, timer.timestamp))) {
cleanup(timer.window, windowState, mergingWindows);
}
} else {
fire = false;
}
context.clear();
} while (fire);
}
private void fireOrContinue(TriggerResult triggerResult,
W window,
ListState<StreamRecord<IN>> windowState) throws Exception {
if (!triggerResult.isFire()) {
return;
}
timestampedCollector.setAbsoluteTimestamp(window.maxTimestamp());
Iterable<StreamRecord<IN>> contents = windowState.get();
// Work around type system restrictions...
int toEvict = evictor.evict((Iterable) contents, Iterables.size(contents), context.window);
FluentIterable<IN> projectedContents = FluentIterable
.from(contents)
.skip(toEvict)
.transform(new Function<StreamRecord<IN>, IN>() {
@Override
public IN apply(StreamRecord<IN> input) {
return input.getValue();
}
});
userFunction.apply(context.key, context.window, projectedContents, timestampedCollector);
}
private void cleanup(W window,
ListState<StreamRecord<IN>> windowState,
MergingWindowSet<W> mergingWindows) throws Exception {
windowState.clear();
if (mergingWindows != null) {
mergingWindows.retireWindow(window);
}
context.clear();
deleteCleanupTimer(window);
}
// ------------------------------------------------------------------------
......
......@@ -59,6 +59,7 @@ import org.apache.flink.streaming.runtime.operators.Triggerable;
import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalWindowFunction;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
import org.apache.flink.util.Preconditions;
import java.io.IOException;
import java.io.ObjectInputStream;
......@@ -78,9 +79,9 @@ import static java.util.Objects.requireNonNull;
* {@link Trigger}.
*
* <p>
* When an element arrives it gets assigned a key using a {@link KeySelector} and it get's
* When an element arrives it gets assigned a key using a {@link KeySelector} and it gets
* assigned to zero or more windows using a {@link WindowAssigner}. Based on this the element
* is put into panes. A pane is the bucket of elements that have the same key and same
* is put into panes. A pane is the bucket of elements that have the same key and belong to the same
* {@code Window}. An element can be in multiple panes of it was assigned to multiple windows by the
* {@code WindowAssigner}.
*
......@@ -130,6 +131,16 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
*/
protected final TypeSerializer<W> windowSerializer;
/**
* The allowed lateness for elements. This is used for:
* <ul>
* <li>Deciding if an element should be dropped from a window due to lateness.
* <li>Clearing the state of a window if the system time passes the
* {@code window.maxTimestamp + allowedLateness} landmark.
* </ul>
*/
protected final long allowedLateness;
// ------------------------------------------------------------------------
// State that is not checkpointed
// ------------------------------------------------------------------------
......@@ -177,7 +188,8 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
TypeSerializer<K> keySerializer,
StateDescriptor<? extends AppendingState<IN, ACC>, ?> windowStateDescriptor,
InternalWindowFunction<ACC, OUT, K, W> windowFunction,
Trigger<? super IN, ? super W> trigger) {
Trigger<? super IN, ? super W> trigger,
long allowedLateness) {
super(windowFunction);
......@@ -189,6 +201,9 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
this.windowStateDescriptor = windowStateDescriptor;
this.trigger = requireNonNull(trigger);
Preconditions.checkArgument(allowedLateness >= 0);
this.allowedLateness = allowedLateness;
setChainingStrategy(ChainingStrategy.ALWAYS);
}
......@@ -264,7 +279,10 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
@Override
@SuppressWarnings("unchecked")
public void processElement(StreamRecord<IN> element) throws Exception {
Collection<W> elementWindows = windowAssigner.assignWindows(element.getValue(), element.getTimestamp());
Collection<W> elementWindows = windowAssigner.assignWindows(
element.getValue(),
element.getTimestamp());
final K key = (K) getStateBackend().getCurrentKey();
......@@ -276,7 +294,6 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
// element because we always eagerly merge
final Tuple1<TriggerResult> mergeTriggerResult = new Tuple1<>(TriggerResult.CONTINUE);
// adding the new window might result in a merge, in that case the actualWindow
// is the merged window and we work with that. If we don't merge then
// actualWindow == window
......@@ -294,18 +311,27 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
for (W m: mergedWindows) {
context.window = m;
context.clear();
deleteCleanupTimer(m);
}
// merge the merged state windows into the newly resulting state window
getStateBackend().mergePartitionedStates(stateWindowResult,
mergedStateWindows,
windowSerializer,
(StateDescriptor<? extends MergingState<?,?>, ?>) windowStateDescriptor);
getStateBackend().mergePartitionedStates(
stateWindowResult,
mergedStateWindows,
windowSerializer,
(StateDescriptor<? extends MergingState<?,?>, ?>) windowStateDescriptor);
}
});
// check if the window is already inactive
if (isLate(actualWindow)) {
LOG.info("Dropped element " + element+ " for window " + actualWindow + " due to lateness.");
continue;
}
W stateWindow = mergingWindows.getStateWindow(actualWindow);
AppendingState<IN, ACC> windowState = getPartitionedState(stateWindow, windowSerializer, windowStateDescriptor);
AppendingState<IN, ACC> windowState = getPartitionedState(
stateWindow, windowSerializer, windowStateDescriptor);
windowState.add(element.getValue());
context.key = key;
......@@ -314,104 +340,46 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
// we might have already fired because of a merge but still call onElement
// on the (possibly merged) window
TriggerResult triggerResult = context.onElement(element);
TriggerResult combinedTriggerResult = TriggerResult.merge(triggerResult, mergeTriggerResult.f0);
fireOrContinue(combinedTriggerResult, actualWindow, windowState);
processTriggerResult(combinedTriggerResult, actualWindow);
if (combinedTriggerResult.isPurge()) {
cleanup(actualWindow, windowState, mergingWindows);
} else {
registerCleanupTimer(actualWindow);
}
}
} else {
for (W window: elementWindows) {
AppendingState<IN, ACC> windowState = getPartitionedState(window, windowSerializer,
windowStateDescriptor);
// check if the window is already inactive
if (isLate(window)) {
LOG.info("Dropped element " + element + " for window " + window + " due to lateness.");
continue;
}
AppendingState<IN, ACC> windowState = getPartitionedState(
window, windowSerializer, windowStateDescriptor);
windowState.add(element.getValue());
context.key = key;
context.window = window;
TriggerResult triggerResult = context.onElement(element);
processTriggerResult(triggerResult, window);
}
}
}
/**
* Retrieves the {@link MergingWindowSet} for the currently active key. The caller must
* ensure that the correct key is set in the state backend.
*/
@SuppressWarnings("unchecked")
protected MergingWindowSet<W> getMergingWindowSet() throws Exception {
MergingWindowSet<W> mergingWindows = mergingWindowsByKey.get((K) getStateBackend().getCurrentKey());
if (mergingWindows == null) {
// try to retrieve from state
TupleSerializer<Tuple2<W, W>> tupleSerializer = new TupleSerializer<>((Class) Tuple2.class, new TypeSerializer[] {windowSerializer, windowSerializer} );
ListStateDescriptor<Tuple2<W, W>> mergeStateDescriptor = new ListStateDescriptor<>("merging-window-set", tupleSerializer);
ListState<Tuple2<W, W>> mergeState = getStateBackend().getPartitionedState(null, VoidSerializer.INSTANCE, mergeStateDescriptor);
mergingWindows = new MergingWindowSet<>((MergingWindowAssigner<? super IN, W>) windowAssigner, mergeState);
mergeState.clear();
mergingWindowsByKey.put((K) getStateBackend().getCurrentKey(), mergingWindows);
}
return mergingWindows;
}
/**
* Process {@link TriggerResult} for the currently active key and the given window. The caller
* must ensure that the correct key is set in the state backend and the context object.
*/
@SuppressWarnings("unchecked")
protected void processTriggerResult(TriggerResult triggerResult, W window) throws Exception {
if (!triggerResult.isFire() && !triggerResult.isPurge()) {
// do nothing
return;
}
AppendingState<IN, ACC> windowState;
MergingWindowSet<W> mergingWindows = null;
if (windowAssigner instanceof MergingWindowAssigner) {
mergingWindows = getMergingWindowSet();
W stateWindow = mergingWindows.getStateWindow(window);
windowState = getPartitionedState(stateWindow, windowSerializer, windowStateDescriptor);
} else {
windowState = getPartitionedState(window, windowSerializer, windowStateDescriptor);
}
if (triggerResult.isFire()) {
timestampedCollector.setAbsoluteTimestamp(window.maxTimestamp());
ACC contents = windowState.get();
userFunction.apply(context.key, context.window, contents, timestampedCollector);
TriggerResult triggerResult = context.onElement(element);
fireOrContinue(triggerResult, window, windowState);
}
if (triggerResult.isPurge()) {
windowState.clear();
if (mergingWindows != null) {
mergingWindows.retireWindow(window);
if (triggerResult.isPurge()) {
cleanup(window, windowState, null);
} else {
registerCleanupTimer(window);
}
}
context.clear();
}
}
@Override
public final void processWatermark(Watermark mark) throws Exception {
processTriggersFor(mark);
output.emitWatermark(mark);
this.currentWatermark = mark.getTimestamp();
}
private void processTriggersFor(Watermark mark) throws Exception {
public void processWatermark(Watermark mark) throws Exception {
boolean fire;
do {
Timer<K, W> timer = watermarkTimersQueue.peek();
if (timer != null && timer.timestamp <= mark.getTimestamp()) {
......@@ -423,16 +391,37 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
context.key = timer.key;
context.window = timer.window;
setKeyContext(timer.key);
AppendingState<IN, ACC> windowState;
MergingWindowSet<W> mergingWindows = null;
if (windowAssigner instanceof MergingWindowAssigner) {
mergingWindows = getMergingWindowSet();
W stateWindow = mergingWindows.getStateWindow(context.window);
windowState = getPartitionedState(stateWindow, windowSerializer, windowStateDescriptor);
} else {
windowState = getPartitionedState(context.window, windowSerializer, windowStateDescriptor);
}
TriggerResult triggerResult = context.onEventTime(timer.timestamp);
processTriggerResult(triggerResult, context.window);
fireOrContinue(triggerResult, context.window, windowState);
if (triggerResult.isPurge() || (windowAssigner.isEventTime() && isCleanupTime(timer.window, timer.timestamp))) {
cleanup(timer.window, windowState, mergingWindows);
}
} else {
fire = false;
}
} while (fire);
output.emitWatermark(mark);
this.currentWatermark = mark.getTimestamp();
}
@Override
public final void trigger(long time) throws Exception {
public void trigger(long time) throws Exception {
boolean fire;
//Remove information about the triggering task
......@@ -450,17 +439,154 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
context.key = timer.key;
context.window = timer.window;
setKeyContext(timer.key);
AppendingState<IN, ACC> windowState;
MergingWindowSet<W> mergingWindows = null;
if (windowAssigner instanceof MergingWindowAssigner) {
mergingWindows = getMergingWindowSet();
W stateWindow = mergingWindows.getStateWindow(context.window);
windowState = getPartitionedState(stateWindow, windowSerializer, windowStateDescriptor);
} else {
windowState = getPartitionedState(context.window, windowSerializer, windowStateDescriptor);
}
TriggerResult triggerResult = context.onProcessingTime(timer.timestamp);
processTriggerResult(triggerResult, context.window);
fireOrContinue(triggerResult, context.window, windowState);
if (triggerResult.isPurge() || (!windowAssigner.isEventTime() && isCleanupTime(timer.window, timer.timestamp))) {
cleanup(timer.window, windowState, mergingWindows);
}
} else {
fire = false;
}
} while (fire);
}
/**
* Cleans up the window state if the provided {@link TriggerResult} requires so, or if it
* is time to do so (see {@link #isCleanupTime(Window, long)}). The caller must ensure that the
* correct key is set in the state backend and the context object.
*/
private void cleanup(W window,
AppendingState<IN, ACC> windowState,
MergingWindowSet<W> mergingWindows) throws Exception {
windowState.clear();
if (mergingWindows != null) {
mergingWindows.retireWindow(window);
}
context.clear();
deleteCleanupTimer(window);
}
/**
* Triggers the window computation if the provided {@link TriggerResult} requires so.
* The caller must ensure that the correct key is set in the state backend and the context object.
*/
@SuppressWarnings("unchecked")
private void fireOrContinue(TriggerResult triggerResult,
W window,
AppendingState<IN, ACC> windowState) throws Exception {
if (!triggerResult.isFire()) {
return;
}
timestampedCollector.setAbsoluteTimestamp(window.maxTimestamp());
ACC contents = windowState.get();
userFunction.apply(context.key, context.window, contents, timestampedCollector);
}
/**
* Retrieves the {@link MergingWindowSet} for the currently active key.
* The caller must ensure that the correct key is set in the state backend.
*/
@SuppressWarnings("unchecked")
protected MergingWindowSet<W> getMergingWindowSet() throws Exception {
MergingWindowSet<W> mergingWindows = mergingWindowsByKey.get((K) getStateBackend().getCurrentKey());
if (mergingWindows == null) {
// try to retrieve from state
TupleSerializer<Tuple2<W, W>> tupleSerializer = new TupleSerializer<>((Class) Tuple2.class, new TypeSerializer[] {windowSerializer, windowSerializer} );
ListStateDescriptor<Tuple2<W, W>> mergeStateDescriptor = new ListStateDescriptor<>("merging-window-set", tupleSerializer);
ListState<Tuple2<W, W>> mergeState = getStateBackend().getPartitionedState(null, VoidSerializer.INSTANCE, mergeStateDescriptor);
mergingWindows = new MergingWindowSet<>((MergingWindowAssigner<? super IN, W>) windowAssigner, mergeState);
mergeState.clear();
mergingWindowsByKey.put((K) getStateBackend().getCurrentKey(), mergingWindows);
}
return mergingWindows;
}
// Also check any watermark timers. We might have some in here since
// Context.registerEventTimeTimer sets a trigger if an event-time trigger is registered
// that is already behind the watermark.
processTriggersFor(new Watermark(currentWatermark));
/**
* Decides if a window is currently late or not, based on the current
* watermark, i.e. the current event time, and the allowed lateness.
* @param window
* The collection of windows returned by the {@link WindowAssigner}.
* @return The windows (among the {@code eligibleWindows}) for which the element should still be
* considered when triggering.
*/
protected boolean isLate(W window) {
return (windowAssigner.isEventTime() && (getCleanupTimeForWindow(window) <= currentWatermark));
}
/**
* Registers a timer to cleanup the content of the window.
* @param window
* the window whose state to discard
*/
protected void registerCleanupTimer(W window) {
long cleanupTime = getCleanupTimeForWindow(window);
if (windowAssigner.isEventTime()) {
context.registerEventTimeTimer(cleanupTime);
} else {
context.registerProcessingTimeTimer(cleanupTime);
}
}
/**
* Deletes the cleanup timer set for the contents of the provided window.
* @param window
* the window whose state to discard
*/
protected void deleteCleanupTimer(W window) {
long cleanupTime = getCleanupTimeForWindow(window);
if (windowAssigner.isEventTime()) {
context.deleteEventTimeTimer(cleanupTime);
} else {
context.deleteProcessingTimeTimer(cleanupTime);
}
}
/**
* Returns the cleanup time for a window, which is
* {@code window.maxTimestamp + allowedLateness}. In
* case this leads to a value greated than {@link Long#MAX_VALUE}
* then a cleanup time of {@link Long#MAX_VALUE} is
* returned.
*
* @param window the window whose cleanup time we are computing.
*/
private long getCleanupTimeForWindow(W window) {
long cleanupTime = window.maxTimestamp() + allowedLateness;
return cleanupTime >= window.maxTimestamp() ? cleanupTime : Long.MAX_VALUE;
}
/**
* Decides if it is time to clean up the window state.
* Clean up time for a window is:
* <li> if it is event time, after the watermark passes the end of the window plus the user-specified allowed lateness
* <li> if it is processing time, after the processing time at the node passes the end of the window.
* @param window
* the window to clean
* @param time
* the current time (event or processing depending on the {@link WindowAssigner}
* @return {@code true} if it is time to clean up the window state, {@code false} otherwise.
*/
protected final boolean isCleanupTime(W window, long time) {
long cleanupTime = getCleanupTimeForWindow(window);
return cleanupTime == time;
}
/**
......@@ -562,13 +688,6 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
if (watermarkTimers.add(timer)) {
watermarkTimersQueue.add(timer);
}
if (time <= currentWatermark) {
// immediately schedule a trigger, so that we don't wait for the next
// watermark update to fire the watermark trigger
getRuntimeContext().registerTimer(System.currentTimeMillis(), WindowOperator.this);
//No need to put it in processingTimeTimerFutures as this timer is never removed
}
}
@Override
......@@ -594,7 +713,6 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
if (watermarkTimers.remove(timer)) {
watermarkTimersQueue.remove(timer);
}
}
public TriggerResult onElement(StreamRecord<IN> element) throws Exception {
......@@ -627,7 +745,6 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
}
}
/**
* Internal class for keeping track of in-flight timers.
*/
......
......@@ -30,9 +30,13 @@ import org.apache.flink.streaming.api.functions.windowing.ReduceIterableWindowFu
import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.evictors.CountEvictor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
......@@ -45,6 +49,7 @@ import org.junit.Test;
import java.util.Comparator;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
public class EvictingWindowOperatorTest {
......@@ -73,7 +78,8 @@ public class EvictingWindowOperatorTest {
stateDesc,
new InternalIterableWindowFunction<>(new ReduceIterableWindowFunction<String, GlobalWindow, Tuple2<String, Integer>>(new SumReducer())),
CountTrigger.of(WINDOW_SLIDE),
CountEvictor.of(WINDOW_SIZE));
CountEvictor.of(WINDOW_SIZE),
0);
operator.setInputType(inputType, new ExecutionConfig());
......@@ -144,7 +150,8 @@ public class EvictingWindowOperatorTest {
stateDesc,
new InternalIterableWindowFunction<>(new RichSumReducer<GlobalWindow>(closeCalled)),
CountTrigger.of(WINDOW_SLIDE),
CountEvictor.of(WINDOW_SIZE));
CountEvictor.of(WINDOW_SIZE),
0);
operator.setInputType(inputType, new ExecutionConfig());
......@@ -194,7 +201,69 @@ public class EvictingWindowOperatorTest {
Assert.assertEquals("Close was not called.", 1, closeCalled.get());
}
// ------------------------------------------------------------------------
@Test
@SuppressWarnings("unchecked")
public void testTumblingWindowWithApply() throws Exception {
AtomicInteger closeCalled = new AtomicInteger(0);
final int WINDOW_SIZE = 4;
TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
ListStateDescriptor<StreamRecord<Tuple2<String, Integer>>> stateDesc = new ListStateDescriptor<>("window-contents",
new StreamRecordSerializer<>(inputType.createSerializer(new ExecutionConfig())));
EvictingWindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> operator = new EvictingWindowOperator<>(
TumblingEventTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)),
new TimeWindow.Serializer(),
new TupleKeySelector(),
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
stateDesc,
new InternalIterableWindowFunction<>(new RichSumReducer<TimeWindow>(closeCalled)),
EventTimeTrigger.create(),
CountEvictor.of(WINDOW_SIZE),
0);
operator.setInputType(inputType, new ExecutionConfig());
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
new OneInputStreamOperatorTestHarness<>(operator);
testHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
long initialTime = 0L;
testHarness.open();
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 10));
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 100));
testHarness.processWatermark(new Watermark(1999));
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 1997));
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 1998));
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 2310)); // not late but more than 4
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 2310));
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 2310));
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 2310));
testHarness.processWatermark(new Watermark(3999)); // now is the evictor
ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
expectedOutput.add(new Watermark(1999));
expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 4), 3999));
expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 2), 3999));
expectedOutput.add(new Watermark(3999));
TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(),
new EvictingWindowOperatorTest.ResultSortComparator());
testHarness.close();
}
// ------------------------------------------------------------------------
// UDFs
// ------------------------------------------------------------------------
......
......@@ -27,6 +27,7 @@ import org.apache.flink.streaming.api.functions.aggregation.{ComparableAggregato
import org.apache.flink.streaming.api.scala.function.AllWindowFunction
import org.apache.flink.streaming.api.scala.function.util.{ScalaAllWindowFunction, ScalaAllWindowFunctionWrapper, ScalaReduceFunction, ScalaFoldFunction}
import org.apache.flink.streaming.api.windowing.evictors.Evictor
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.Trigger
import org.apache.flink.streaming.api.windowing.windows.Window
import org.apache.flink.util.Collector
......@@ -54,7 +55,21 @@ import org.apache.flink.util.Collector
*/
@Public
class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) {
/**
* Sets the allowed lateness to a user-specified value.
* If not explicitly set, the allowed lateness is [[Long.MaxValue]].
* Setting the allowed lateness is only valid for event-time windows.
* If a value different than 0 is provided with a processing-time
* [[org.apache.flink.streaming.api.windowing.assigners.WindowAssigner]],
* then an exception is thrown.
*/
@PublicEvolving
def allowedLateness(lateness: Time): AllWindowedStream[T, W] = {
javaStream.allowedLateness(lateness)
this
}
/**
* Sets the [[Trigger]] that should be used to trigger window emission.
*/
......
......@@ -27,6 +27,7 @@ import org.apache.flink.streaming.api.functions.aggregation.{ComparableAggregato
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.scala.function.util.{ScalaFoldFunction, ScalaReduceFunction, ScalaWindowFunction, ScalaWindowFunctionWrapper}
import org.apache.flink.streaming.api.windowing.evictors.Evictor
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.Trigger
import org.apache.flink.streaming.api.windowing.windows.Window
import org.apache.flink.util.Collector
......@@ -58,6 +59,20 @@ import org.apache.flink.util.Collector
@Public
class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) {
/**
* Sets the allowed lateness to a user-specified value.
* If not explicitly set, the allowed lateness is [[Long.MaxValue]].
* Setting the allowed lateness is only valid for event-time windows.
* If a value different than 0 is provided with a processing-time
* [[org.apache.flink.streaming.api.windowing.assigners.WindowAssigner]],
* then an exception is thrown.
*/
@PublicEvolving
def allowedLateness(lateness: Time): WindowedStream[T, K, W] = {
javaStream.allowedLateness(lateness)
this
}
/**
* Sets the [[Trigger]] that should be used to trigger window emission.
*/
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册