提交 07a15d0e 编写于 作者: C Chen Qin 提交者: Aljoscha Krettek

[FLINK-4460] Provide late-data output for window operations

We use side outputs to emit dropped late data.
上级 e134d275
......@@ -63,6 +63,8 @@ import org.apache.flink.streaming.runtime.operators.windowing.functions.Internal
import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalWindowFunction;
import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.OutputTag;
import org.apache.flink.util.Preconditions;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
......@@ -106,6 +108,12 @@ public class AllWindowedStream<T, W extends Window> {
/** The user-specified allowed lateness. */
private long allowedLateness = 0L;
/**
* Side output {@code OutputTag} for late data. If no tag is set late data will simply be
* dropped.
*/
private OutputTag<T> lateDataOutputTag;
@PublicEvolving
public AllWindowedStream(DataStream<T> input,
WindowAssigner<? super T, W> windowAssigner) {
......@@ -143,6 +151,23 @@ public class AllWindowedStream<T, W extends Window> {
return this;
}
/**
* Send late arriving data to the side output identified by the given {@link OutputTag}. Data
* is considered late after the watermark has passed the end of the window plus the allowed
* lateness set using {@link #allowedLateness(Time)}.
*
* <p>You can get the stream of late data using
* {@link SingleOutputStreamOperator#getSideOutput(OutputTag)} on the
* {@link SingleOutputStreamOperator} resulting from the windowed operation
* with the same {@link OutputTag}.
*/
@PublicEvolving
public AllWindowedStream<T, W> sideOutputLateData(OutputTag<T> outputTag) {
Preconditions.checkNotNull(outputTag, "Side output tag must not be null.");
this.lateDataOutputTag = input.getExecutionEnvironment().clean(outputTag);
return this;
}
/**
* Sets the {@code Evictor} that should be used to evict elements from a window before emission.
*
......@@ -271,7 +296,8 @@ public class AllWindowedStream<T, W extends Window> {
new InternalIterableAllWindowFunction<>(new ReduceApplyAllWindowFunction<>(reduceFunction, function)),
trigger,
evictor,
allowedLateness);
allowedLateness,
lateDataOutputTag);
} else {
ReducingStateDescriptor<T> stateDesc = new ReducingStateDescriptor<>("window-contents",
......@@ -288,7 +314,8 @@ public class AllWindowedStream<T, W extends Window> {
stateDesc,
new InternalSingleValueAllWindowFunction<>(function),
trigger,
allowedLateness);
allowedLateness,
lateDataOutputTag);
}
return input.transform(opName, resultType, operator).forceNonParallel();
......@@ -367,7 +394,8 @@ public class AllWindowedStream<T, W extends Window> {
new InternalIterableProcessAllWindowFunction<>(new ReduceApplyProcessAllWindowFunction<>(reduceFunction, function)),
trigger,
evictor,
allowedLateness);
allowedLateness,
lateDataOutputTag);
} else {
ReducingStateDescriptor<T> stateDesc = new ReducingStateDescriptor<>("window-contents",
......@@ -384,7 +412,8 @@ public class AllWindowedStream<T, W extends Window> {
stateDesc,
new InternalSingleValueProcessAllWindowFunction<>(function),
trigger,
allowedLateness);
allowedLateness,
lateDataOutputTag);
}
return input.transform(opName, resultType, operator).forceNonParallel();
......@@ -562,7 +591,8 @@ public class AllWindowedStream<T, W extends Window> {
new AggregateApplyAllWindowFunction<>(aggregateFunction, windowFunction)),
trigger,
evictor,
allowedLateness);
allowedLateness,
lateDataOutputTag);
} else {
AggregatingStateDescriptor<T, ACC, V> stateDesc = new AggregatingStateDescriptor<>(
......@@ -580,7 +610,8 @@ public class AllWindowedStream<T, W extends Window> {
stateDesc,
new InternalSingleValueAllWindowFunction<>(windowFunction),
trigger,
allowedLateness);
allowedLateness,
lateDataOutputTag);
}
return input.transform(opName, resultType, operator).forceNonParallel();
......@@ -693,7 +724,8 @@ public class AllWindowedStream<T, W extends Window> {
new InternalAggregateProcessAllWindowFunction<>(aggregateFunction, windowFunction),
trigger,
evictor,
allowedLateness);
allowedLateness,
lateDataOutputTag);
} else {
AggregatingStateDescriptor<T, ACC, V> stateDesc = new AggregatingStateDescriptor<>(
......@@ -711,7 +743,8 @@ public class AllWindowedStream<T, W extends Window> {
stateDesc,
new InternalSingleValueProcessAllWindowFunction<>(windowFunction),
trigger,
allowedLateness);
allowedLateness,
lateDataOutputTag);
}
return input.transform(opName, resultType, operator).forceNonParallel();
......@@ -842,7 +875,8 @@ public class AllWindowedStream<T, W extends Window> {
new InternalIterableAllWindowFunction<>(new FoldApplyAllWindowFunction<>(initialValue, foldFunction, function, foldAccumulatorType)),
trigger,
evictor,
allowedLateness);
allowedLateness,
lateDataOutputTag);
} else {
FoldingStateDescriptor<T, ACC> stateDesc = new FoldingStateDescriptor<>("window-contents",
......@@ -858,7 +892,8 @@ public class AllWindowedStream<T, W extends Window> {
stateDesc,
new InternalSingleValueAllWindowFunction<>(function),
trigger,
allowedLateness);
allowedLateness,
lateDataOutputTag);
}
return input.transform(opName, resultType, operator).forceNonParallel();
......@@ -948,7 +983,8 @@ public class AllWindowedStream<T, W extends Window> {
new InternalIterableProcessAllWindowFunction<>(new FoldApplyProcessAllWindowFunction<>(initialValue, foldFunction, function, foldAccumulatorType)),
trigger,
evictor,
allowedLateness);
allowedLateness,
lateDataOutputTag);
} else {
FoldingStateDescriptor<T, ACC> stateDesc = new FoldingStateDescriptor<>("window-contents",
......@@ -964,7 +1000,8 @@ public class AllWindowedStream<T, W extends Window> {
stateDesc,
new InternalSingleValueProcessAllWindowFunction<>(function),
trigger,
allowedLateness);
allowedLateness,
lateDataOutputTag);
}
return input.transform(opName, resultType, operator).forceNonParallel();
......@@ -1080,7 +1117,8 @@ public class AllWindowedStream<T, W extends Window> {
function,
trigger,
evictor,
allowedLateness);
allowedLateness,
lateDataOutputTag);
} else {
ListStateDescriptor<T> stateDesc = new ListStateDescriptor<>("window-contents",
......@@ -1096,7 +1134,8 @@ public class AllWindowedStream<T, W extends Window> {
stateDesc,
function,
trigger,
allowedLateness);
allowedLateness,
lateDataOutputTag);
}
return input.transform(opName, resultType, operator).forceNonParallel();
......@@ -1177,7 +1216,8 @@ public class AllWindowedStream<T, W extends Window> {
new InternalIterableAllWindowFunction<>(new ReduceApplyAllWindowFunction<>(reduceFunction, function)),
trigger,
evictor,
allowedLateness);
allowedLateness,
lateDataOutputTag);
} else {
ReducingStateDescriptor<T> stateDesc = new ReducingStateDescriptor<>("window-contents",
......@@ -1194,7 +1234,8 @@ public class AllWindowedStream<T, W extends Window> {
stateDesc,
new InternalSingleValueAllWindowFunction<>(function),
trigger,
allowedLateness);
allowedLateness,
lateDataOutputTag);
}
return input.transform(opName, resultType, operator).forceNonParallel();
......@@ -1280,7 +1321,8 @@ public class AllWindowedStream<T, W extends Window> {
new InternalIterableAllWindowFunction<>(new FoldApplyAllWindowFunction<>(initialValue, foldFunction, function, resultType)),
trigger,
evictor,
allowedLateness);
allowedLateness,
lateDataOutputTag);
} else {
FoldingStateDescriptor<T, R> stateDesc = new FoldingStateDescriptor<>("window-contents",
......@@ -1296,7 +1338,8 @@ public class AllWindowedStream<T, W extends Window> {
stateDesc,
new InternalSingleValueAllWindowFunction<>(function),
trigger,
allowedLateness);
allowedLateness,
lateDataOutputTag);
}
return input.transform(opName, resultType, operator).forceNonParallel();
......
......@@ -73,6 +73,8 @@ import org.apache.flink.streaming.runtime.operators.windowing.WindowOperator;
import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalWindowFunction;
import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.OutputTag;
import org.apache.flink.util.Preconditions;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
......@@ -120,6 +122,12 @@ public class WindowedStream<T, K, W extends Window> {
/** The user-specified allowed lateness. */
private long allowedLateness = 0L;
/**
* Side output {@code OutputTag} for late data. If no tag is set late data will simply be
* dropped.
*/
private OutputTag<T> lateDataOutputTag;
@PublicEvolving
public WindowedStream(KeyedStream<T, K> input,
WindowAssigner<? super T, W> windowAssigner) {
......@@ -161,6 +169,23 @@ public class WindowedStream<T, K, W extends Window> {
return this;
}
/**
* Send late arriving data to the side output identified by the given {@link OutputTag}. Data
* is considered late after the watermark has passed the end of the window plus the allowed
* lateness set using {@link #allowedLateness(Time)}.
*
* <p>You can get the stream of late data using
* {@link SingleOutputStreamOperator#getSideOutput(OutputTag)} on the
* {@link SingleOutputStreamOperator} resulting from the windowed operation
* with the same {@link OutputTag}.
*/
@PublicEvolving
public WindowedStream<T, K, W> sideOutputLateData(OutputTag<T> outputTag) {
Preconditions.checkNotNull(outputTag, "Side output tag must not be null.");
this.lateDataOutputTag = input.getExecutionEnvironment().clean(outputTag);
return this;
}
/**
* Sets the {@code Evictor} that should be used to evict elements from a window before emission.
*
......@@ -344,7 +369,8 @@ public class WindowedStream<T, K, W extends Window> {
new InternalIterableWindowFunction<>(new ReduceApplyWindowFunction<>(reduceFunction, function)),
trigger,
evictor,
allowedLateness);
allowedLateness,
lateDataOutputTag);
} else {
ReducingStateDescriptor<T> stateDesc = new ReducingStateDescriptor<>("window-contents",
......@@ -362,6 +388,7 @@ public class WindowedStream<T, K, W extends Window> {
new InternalSingleValueWindowFunction<>(function),
trigger,
allowedLateness,
lateDataOutputTag,
legacyWindowOpType);
}
......@@ -437,7 +464,8 @@ public class WindowedStream<T, K, W extends Window> {
new InternalIterableProcessWindowFunction<>(new ReduceApplyProcessWindowFunction<>(reduceFunction, function)),
trigger,
evictor,
allowedLateness);
allowedLateness,
lateDataOutputTag);
} else {
ReducingStateDescriptor<T> stateDesc = new ReducingStateDescriptor<>("window-contents",
......@@ -454,7 +482,8 @@ public class WindowedStream<T, K, W extends Window> {
stateDesc,
new InternalSingleValueProcessWindowFunction<>(function),
trigger,
allowedLateness);
allowedLateness,
lateDataOutputTag);
}
return input.transform(opName, resultType, operator);
......@@ -589,7 +618,8 @@ public class WindowedStream<T, K, W extends Window> {
new InternalIterableWindowFunction<>(new FoldApplyWindowFunction<>(initialValue, foldFunction, function, foldAccumulatorType)),
trigger,
evictor,
allowedLateness);
allowedLateness,
lateDataOutputTag);
} else {
FoldingStateDescriptor<T, ACC> stateDesc = new FoldingStateDescriptor<>("window-contents",
......@@ -604,7 +634,8 @@ public class WindowedStream<T, K, W extends Window> {
stateDesc,
new InternalSingleValueWindowFunction<>(function),
trigger,
allowedLateness);
allowedLateness,
lateDataOutputTag);
}
return input.transform(opName, resultType, operator);
......@@ -697,7 +728,8 @@ public class WindowedStream<T, K, W extends Window> {
new InternalIterableProcessWindowFunction<>(new FoldApplyProcessWindowFunction<>(initialValue, foldFunction, windowFunction, foldResultType)),
trigger,
evictor,
allowedLateness);
allowedLateness,
lateDataOutputTag);
} else {
FoldingStateDescriptor<T, ACC> stateDesc = new FoldingStateDescriptor<>("window-contents",
......@@ -715,7 +747,8 @@ public class WindowedStream<T, K, W extends Window> {
stateDesc,
new InternalSingleValueProcessWindowFunction<>(windowFunction),
trigger,
allowedLateness);
allowedLateness,
lateDataOutputTag);
}
return input.transform(opName, windowResultType, operator);
......@@ -890,7 +923,8 @@ public class WindowedStream<T, K, W extends Window> {
new InternalIterableWindowFunction<>(new AggregateApplyWindowFunction<>(aggregateFunction, windowFunction)),
trigger,
evictor,
allowedLateness);
allowedLateness,
lateDataOutputTag);
} else {
AggregatingStateDescriptor<T, ACC, V> stateDesc = new AggregatingStateDescriptor<>("window-contents",
......@@ -905,7 +939,8 @@ public class WindowedStream<T, K, W extends Window> {
stateDesc,
new InternalSingleValueWindowFunction<>(windowFunction),
trigger,
allowedLateness);
allowedLateness,
lateDataOutputTag);
}
return input.transform(opName, resultType, operator);
......@@ -1017,7 +1052,8 @@ public class WindowedStream<T, K, W extends Window> {
new InternalAggregateProcessWindowFunction<>(aggregateFunction, windowFunction),
trigger,
evictor,
allowedLateness);
allowedLateness,
lateDataOutputTag);
} else {
AggregatingStateDescriptor<T, ACC, V> stateDesc = new AggregatingStateDescriptor<>("window-contents",
......@@ -1032,7 +1068,8 @@ public class WindowedStream<T, K, W extends Window> {
stateDesc,
new InternalSingleValueProcessWindowFunction<>(windowFunction),
trigger,
allowedLateness);
allowedLateness,
lateDataOutputTag);
}
return input.transform(opName, resultType, operator);
......@@ -1154,7 +1191,8 @@ public class WindowedStream<T, K, W extends Window> {
function,
trigger,
evictor,
allowedLateness);
allowedLateness,
lateDataOutputTag);
} else {
ListStateDescriptor<T> stateDesc = new ListStateDescriptor<>("window-contents",
......@@ -1171,6 +1209,7 @@ public class WindowedStream<T, K, W extends Window> {
function,
trigger,
allowedLateness,
lateDataOutputTag,
legacyWindowOpType);
}
......@@ -1252,7 +1291,8 @@ public class WindowedStream<T, K, W extends Window> {
new InternalIterableWindowFunction<>(new ReduceApplyWindowFunction<>(reduceFunction, function)),
trigger,
evictor,
allowedLateness);
allowedLateness,
lateDataOutputTag);
} else {
ReducingStateDescriptor<T> stateDesc = new ReducingStateDescriptor<>("window-contents",
......@@ -1269,7 +1309,8 @@ public class WindowedStream<T, K, W extends Window> {
stateDesc,
new InternalSingleValueWindowFunction<>(function),
trigger,
allowedLateness);
allowedLateness,
lateDataOutputTag);
}
return input.transform(opName, resultType, operator);
......@@ -1354,7 +1395,8 @@ public class WindowedStream<T, K, W extends Window> {
new InternalIterableWindowFunction<>(new FoldApplyWindowFunction<>(initialValue, foldFunction, function, resultType)),
trigger,
evictor,
allowedLateness);
allowedLateness,
lateDataOutputTag);
} else {
FoldingStateDescriptor<T, R> stateDesc = new FoldingStateDescriptor<>("window-contents",
......@@ -1369,7 +1411,8 @@ public class WindowedStream<T, K, W extends Window> {
stateDesc,
new InternalSingleValueWindowFunction<>(function),
trigger,
allowedLateness);
allowedLateness,
lateDataOutputTag);
}
return input.transform(opName, resultType, operator);
......
......@@ -39,6 +39,7 @@ import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalWindowFunction;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.OutputTag;
import java.util.Collection;
......@@ -86,10 +87,11 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window>
InternalWindowFunction<Iterable<IN>, OUT, K, W> windowFunction,
Trigger<? super IN, ? super W> trigger,
Evictor<? super IN, ? super W> evictor,
long allowedLateness) {
long allowedLateness,
OutputTag<IN> lateDataOutputTag) {
super(windowAssigner, windowSerializer, keySelector,
keySerializer, null, windowFunction, trigger, allowedLateness);
keySerializer, null, windowFunction, trigger, allowedLateness, lateDataOutputTag);
this.evictor = checkNotNull(evictor);
this.evictingWindowStateDescriptor = checkNotNull(windowStateDescriptor);
......@@ -137,7 +139,7 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window>
});
// check if the window is already inactive
if (isLate(actualWindow)) {
if (isWindowLate(actualWindow)) {
mergingWindows.retireWindow(actualWindow);
continue;
}
......@@ -177,7 +179,7 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window>
for (W window : elementWindows) {
// check if the window is already inactive
if (isLate(window)) {
if (isWindowLate(window)) {
continue;
}
......
......@@ -29,6 +29,7 @@ import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.util.OutputTag;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.functions.KeySelector;
......@@ -132,6 +133,13 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
*/
private final long allowedLateness;
/**
* {@link OutputTag} to use for late arriving events. Elements for which
* {@code window.maxTimestamp + allowedLateness} is smaller than the current watermark will
* be emitted to this.
*/
private final OutputTag<IN> lateDataOutputTag;
// ------------------------------------------------------------------------
// State that is not checkpointed
// ------------------------------------------------------------------------
......@@ -200,10 +208,11 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
StateDescriptor<? extends AppendingState<IN, ACC>, ?> windowStateDescriptor,
InternalWindowFunction<ACC, OUT, K, W> windowFunction,
Trigger<? super IN, ? super W> trigger,
long allowedLateness) {
long allowedLateness,
OutputTag<IN> lateDataOutputTag) {
this(windowAssigner, windowSerializer, keySelector, keySerializer,
windowStateDescriptor, windowFunction, trigger, allowedLateness, LegacyWindowOperatorType.NONE);
windowStateDescriptor, windowFunction, trigger, allowedLateness, lateDataOutputTag, LegacyWindowOperatorType.NONE);
}
/**
......@@ -218,6 +227,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
InternalWindowFunction<ACC, OUT, K, W> windowFunction,
Trigger<? super IN, ? super W> trigger,
long allowedLateness,
OutputTag<IN> lateDataOutputTag,
LegacyWindowOperatorType legacyWindowOperatorType) {
super(windowFunction);
......@@ -239,6 +249,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
this.windowStateDescriptor = windowStateDescriptor;
this.trigger = checkNotNull(trigger);
this.allowedLateness = allowedLateness;
this.lateDataOutputTag = lateDataOutputTag;
this.legacyWindowOperatorType = legacyWindowOperatorType;
setChainingStrategy(ChainingStrategy.ALWAYS);
......@@ -323,6 +334,9 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
final Collection<W> elementWindows = windowAssigner.assignWindows(
element.getValue(), element.getTimestamp(), windowAssignerContext);
//if element is handled by none of assigned elementWindows
boolean isSkippedElement = true;
final K key = this.<K>getKeyedStateBackend().getCurrentKey();
if (windowAssigner instanceof MergingWindowAssigner) {
......@@ -355,10 +369,11 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
});
// drop if the window is already late
if (isLate(actualWindow)) {
if (isWindowLate(actualWindow)) {
mergingWindows.retireWindow(actualWindow);
continue;
}
isSkippedElement = false;
W stateWindow = mergingWindows.getStateWindow(actualWindow);
if (stateWindow == null) {
......@@ -393,9 +408,10 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
for (W window: elementWindows) {
// drop if the window is already late
if (isLate(window)) {
if (isWindowLate(window)) {
continue;
}
isSkippedElement = false;
windowState.setCurrentNamespace(window);
windowState.add(element.getValue());
......@@ -419,6 +435,14 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
registerCleanupTimer(window);
}
}
// side output input event if
// element not handled by any window
// late arriving tag has been set
// windowAssigner is event time and current timestamp + allowed lateness no less than element timestamp
if (isSkippedElement && lateDataOutputTag != null && isElementLate(element)) {
sideOutput(element);
}
}
@Override
......@@ -545,6 +569,15 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
userFunction.apply(context.key, context.window, contents, timestampedCollector);
}
/**
* Write skipped late arriving element to SideOutput
*
* @param element skipped late arriving element to side output
*/
private void sideOutput(StreamRecord<IN> element){
output.collect(lateDataOutputTag, element);
}
/**
* Retrieves the {@link MergingWindowSet} for the currently active key.
* The caller must ensure that the correct key is set in the state backend.
......@@ -562,10 +595,21 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
* Returns {@code true} if the watermark is after the end timestamp plus the allowed lateness
* of the given window.
*/
protected boolean isLate(W window) {
protected boolean isWindowLate(W window) {
return (windowAssigner.isEventTime() && (cleanupTime(window) <= internalTimerService.currentWatermark()));
}
/**
* Decide if a record is currently late, based on current watermark and allowed lateness.
*
* @param element The element to check
* @return The element for which should be considered when sideoutputs
*/
protected boolean isElementLate(StreamRecord<IN> element){
return (windowAssigner.isEventTime()) &&
(element.getTimestamp() + allowedLateness <= internalTimerService.currentWatermark());
}
/**
* Registers a timer to cleanup the content of the window.
* @param window
......
......@@ -89,7 +89,8 @@ public class EvictingWindowOperatorTest {
new InternalIterableWindowFunction<>(new RichSumReducer<GlobalWindow>(closeCalled)),
CountTrigger.of(TRIGGER_COUNT),
CountEvictor.of(WINDOW_SIZE,EVICT_AFTER),
0);
0,
null /* late data output tag */);
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
......@@ -169,7 +170,8 @@ public class EvictingWindowOperatorTest {
new InternalIterableWindowFunction<>(new RichSumReducer<GlobalWindow>(closeCalled)),
CountTrigger.of(TRIGGER_COUNT),
TimeEvictor.of(Time.seconds(2), EVICT_AFTER),
0);
0,
null /* late data output tag */);
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
......@@ -243,7 +245,8 @@ public class EvictingWindowOperatorTest {
new InternalIterableWindowFunction<>(new RichSumReducer<TimeWindow>(closeCalled)),
CountTrigger.of(TRIGGER_COUNT),
TimeEvictor.of(Time.seconds(2)),
0);
0,
null /* late data output tag */);
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
......@@ -319,7 +322,8 @@ public class EvictingWindowOperatorTest {
new InternalIterableWindowFunction<>(new RichSumReducer<GlobalWindow>(closeCalled)),
CountTrigger.of(TRIGGER_COUNT),
TimeEvictor.of(Time.seconds(2), EVICT_AFTER),
0);
0,
null /* late data output tag */);
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
......@@ -398,7 +402,8 @@ public class EvictingWindowOperatorTest {
return newDataPoint.f1 - oldDataPoint.f1;
}
}, EVICT_AFTER),
0);
0,
null /* late data output tag */);
......@@ -475,7 +480,8 @@ public class EvictingWindowOperatorTest {
return newDataPoint.f1 - oldDataPoint.f1;
}
}, EVICT_AFTER),
0);
0,
null /* late data output tag */);
......@@ -543,7 +549,8 @@ public class EvictingWindowOperatorTest {
new InternalIterableWindowFunction<>(new ReduceIterableWindowFunction<String, GlobalWindow, Tuple2<String, Integer>>(new SumReducer())),
CountTrigger.of(WINDOW_SLIDE),
CountEvictor.of(WINDOW_SIZE),
0);
0,
null /* late data output tag */);
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
......@@ -615,7 +622,8 @@ public class EvictingWindowOperatorTest {
new InternalIterableWindowFunction<>(new RichSumReducer<GlobalWindow>(closeCalled)),
CountTrigger.of(WINDOW_SLIDE),
CountEvictor.of(WINDOW_SIZE),
0);
0,
null /* late data output tag */);
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
......@@ -686,7 +694,8 @@ public class EvictingWindowOperatorTest {
new InternalIterableWindowFunction<>(new RichSumReducer<TimeWindow>(closeCalled)),
EventTimeTrigger.create(),
CountEvictor.of(WINDOW_SIZE),
0);
0,
null /* late data output tag */);
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
......
......@@ -2322,7 +2322,8 @@ public class WindowOperatorContractTest extends TestLogger {
stateDescriptor,
windowFunction,
trigger,
allowedLatenss);
allowedLatenss,
null /*late data output tag */);
return new KeyedOneInputStreamOperatorTestHarness<>(
operator,
......
......@@ -100,7 +100,8 @@ public class WindowOperatorMigrationTest {
stateDesc,
new InternalIterableWindowFunction<>(new SessionWindowFunction()),
PurgingTrigger.of(CountTrigger.of(4)),
0);
0,
null /* late data output tag */);
ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
......@@ -181,7 +182,8 @@ public class WindowOperatorMigrationTest {
stateDesc,
new InternalIterableWindowFunction<>(new SessionWindowFunction()),
PurgingTrigger.of(CountTrigger.of(4)),
0);
0,
null /* late data output tag */);
ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
......@@ -258,7 +260,8 @@ public class WindowOperatorMigrationTest {
stateDesc,
new InternalSingleValueWindowFunction<>(new PassThroughWindowFunction<String, TimeWindow, Tuple2<String, Integer>>()),
EventTimeTrigger.create(),
0);
0,
null /* late data output tag */);
ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
......@@ -346,7 +349,8 @@ public class WindowOperatorMigrationTest {
stateDesc,
new InternalIterableWindowFunction<>(new RichSumReducer<TimeWindow>()),
EventTimeTrigger.create(),
0);
0,
null /* late data output tag */);
ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
......@@ -435,7 +439,8 @@ public class WindowOperatorMigrationTest {
stateDesc,
new InternalSingleValueWindowFunction<>(new PassThroughWindowFunction<String, TimeWindow, Tuple2<String, Integer>>()),
ProcessingTimeTrigger.create(),
0);
0,
null /* late data output tag */);
ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
......@@ -510,7 +515,8 @@ public class WindowOperatorMigrationTest {
stateDesc,
new InternalIterableWindowFunction<>(new RichSumReducer<TimeWindow>()),
ProcessingTimeTrigger.create(),
0);
0,
null /* late data output tag */);
ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
......@@ -633,6 +639,7 @@ public class WindowOperatorMigrationTest {
new InternalSingleValueWindowFunction<>(new PassThroughWindowFunction<String, TimeWindow, Tuple2<String, Integer>>()),
ProcessingTimeTrigger.create(),
0,
null /* late data output tag */,
LegacyWindowOperatorType.FAST_AGGREGATING);
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
......@@ -735,6 +742,7 @@ public class WindowOperatorMigrationTest {
new InternalSingleValueWindowFunction<>(new PassThroughWindowFunction<String, TimeWindow, Tuple2<String, Integer>>()),
ProcessingTimeTrigger.create(),
0,
null /* late data output tag */,
LegacyWindowOperatorType.FAST_ACCUMULATING);
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
......
......@@ -26,6 +26,7 @@ import org.apache.flink.api.common.state.FoldingStateDescriptor;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.util.OutputTag;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
......@@ -92,6 +93,9 @@ public class WindowOperatorTest extends TestLogger {
// For counting if close() is called the correct number of times on the SumReducer
private static AtomicInteger closeCalled = new AtomicInteger(0);
// late arriving event OutputTag<StreamRecord<IN>>
private static final OutputTag<Tuple2<String, Integer>> lateOutputTag = new OutputTag<Tuple2<String, Integer>>("late-output") {};
private void testSlidingEventTimeWindows(OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness) throws Exception {
ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
......@@ -181,7 +185,8 @@ public class WindowOperatorTest extends TestLogger {
stateDesc,
new InternalSingleValueWindowFunction<>(new PassThroughWindowFunction<String, TimeWindow, Tuple2<String, Integer>>()),
EventTimeTrigger.create(),
0);
0,
null /* late data output tag */);
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
......@@ -215,7 +220,8 @@ public class WindowOperatorTest extends TestLogger {
stateDesc,
new InternalIterableWindowFunction<>(new RichSumReducer<TimeWindow>()),
EventTimeTrigger.create(),
0);
0,
null /* late data output tag */);
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
......@@ -314,7 +320,8 @@ public class WindowOperatorTest extends TestLogger {
stateDesc,
new InternalSingleValueWindowFunction<>(new PassThroughWindowFunction<String, TimeWindow, Tuple2<String, Integer>>()),
EventTimeTrigger.create(),
0);
0,
null /* late data output tag */);
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
......@@ -346,7 +353,8 @@ public class WindowOperatorTest extends TestLogger {
stateDesc,
new InternalIterableWindowFunction<>(new RichSumReducer<TimeWindow>()),
EventTimeTrigger.create(),
0);
0,
null /* late data output tag */);
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
......@@ -381,7 +389,8 @@ public class WindowOperatorTest extends TestLogger {
stateDesc,
new InternalIterableWindowFunction<>(new SessionWindowFunction()),
EventTimeTrigger.create(),
0);
0,
null /* late data output tag */);
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness =
new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
......@@ -453,7 +462,8 @@ public class WindowOperatorTest extends TestLogger {
stateDesc,
new InternalIterableProcessWindowFunction<>(new SessionProcessWindowFunction()),
EventTimeTrigger.create(),
0);
0,
null /* late data output tag */);
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness =
new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
......@@ -525,7 +535,8 @@ public class WindowOperatorTest extends TestLogger {
stateDesc,
new InternalSingleValueWindowFunction<>(new ReducedSessionWindowFunction()),
EventTimeTrigger.create(),
0);
0,
null /* late data output tag */);
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness =
new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
......@@ -595,7 +606,8 @@ public class WindowOperatorTest extends TestLogger {
stateDesc,
new InternalSingleValueProcessWindowFunction<>(new ReducedProcessSessionWindowFunction()),
EventTimeTrigger.create(),
0);
0,
null /* late data output tag */);
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness =
new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
......@@ -669,7 +681,8 @@ public class WindowOperatorTest extends TestLogger {
stateDesc,
new InternalIterableWindowFunction<>(new SessionWindowFunction()),
PurgingTrigger.of(CountTrigger.of(4)),
0);
0,
null /* late data output tag */);
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness =
new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
......@@ -739,7 +752,8 @@ public class WindowOperatorTest extends TestLogger {
stateDesc,
new InternalIterableWindowFunction<>(new SessionWindowFunction()),
ContinuousEventTimeTrigger.of(Time.seconds(2)),
0);
0,
null /* late data output tag */);
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness =
new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
......@@ -843,7 +857,8 @@ public class WindowOperatorTest extends TestLogger {
stateDesc,
new InternalIterableWindowFunction<>(new SessionWindowFunction()),
EventTimeTrigger.create(),
0);
0,
null /* late data output tag */);
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness =
new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
......@@ -901,7 +916,8 @@ public class WindowOperatorTest extends TestLogger {
stateDesc,
new InternalSingleValueWindowFunction<>(new PassThroughWindowFunction<String, GlobalWindow, Tuple2<String, Integer>>()),
ContinuousEventTimeTrigger.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)),
0);
0,
null /* late data output tag */);
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
......@@ -989,7 +1005,8 @@ public class WindowOperatorTest extends TestLogger {
stateDesc,
new InternalSingleValueWindowFunction<>(new PassThroughWindowFunction<String, GlobalWindow, Tuple2<String, Integer>>()),
PurgingTrigger.of(CountTrigger.of(WINDOW_SIZE)),
0);
0,
null /* late data output tag */);
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
......@@ -1030,7 +1047,8 @@ public class WindowOperatorTest extends TestLogger {
stateDesc,
new InternalSingleValueWindowFunction<>(new PassThroughWindowFunction<String, GlobalWindow, Tuple2<String, Integer>>()),
PurgingTrigger.of(CountTrigger.of(WINDOW_SIZE)),
0);
0,
null /* late data output tag */);
testHarness = new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
......@@ -1075,7 +1093,9 @@ public class WindowOperatorTest extends TestLogger {
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
stateDesc,
new InternalSingleValueWindowFunction<>(new PassThroughWindowFunction<String, TimeWindow, Tuple2<String, Integer>>()),
ProcessingTimeTrigger.create(), 0);
ProcessingTimeTrigger.create(),
0,
null /* late data output tag */);
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
......@@ -1132,7 +1152,9 @@ public class WindowOperatorTest extends TestLogger {
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
stateDesc,
new InternalSingleValueWindowFunction<>(new PassThroughWindowFunction<String, TimeWindow, Tuple2<String, Integer>>()),
ProcessingTimeTrigger.create(), 0);
ProcessingTimeTrigger.create(),
0,
null /* late data output tag */);
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
......@@ -1202,7 +1224,9 @@ public class WindowOperatorTest extends TestLogger {
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
stateDesc,
new InternalSingleValueWindowFunction<>(new PassThroughWindowFunction<String, TimeWindow, Tuple2<String, Integer>>()),
ProcessingTimeTrigger.create(), 0);
ProcessingTimeTrigger.create(),
0,
null /* late data output tag */);
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
......@@ -1267,7 +1291,8 @@ public class WindowOperatorTest extends TestLogger {
stateDesc,
new InternalSingleValueWindowFunction<>(new PassThroughWindowFunction<String, TimeWindow, Tuple2<String, Integer>>()),
PurgingTrigger.of(EventTimeTrigger.create()),
LATENESS);
LATENESS,
lateOutputTag);
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
......@@ -1275,6 +1300,7 @@ public class WindowOperatorTest extends TestLogger {
testHarness.open();
ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();
ConcurrentLinkedQueue<Object> lateExpected = new ConcurrentLinkedQueue<>();
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 500));
testHarness.processWatermark(new Watermark(1500));
......@@ -1287,7 +1313,7 @@ public class WindowOperatorTest extends TestLogger {
expected.add(new StreamRecord<>(new Tuple2<>("key2", 2), 1999));
expected.add(new Watermark(2300));
// this will not be dropped because window.maxTimestamp() + allowedLateness > currentWatermark
// this will not be sideoutput because window.maxTimestamp() + allowedLateness > currentWatermark
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1997));
testHarness.processWatermark(new Watermark(6000));
......@@ -1295,13 +1321,21 @@ public class WindowOperatorTest extends TestLogger {
expected.add(new StreamRecord<>(new Tuple2<>("key2", 1), 1999));
expected.add(new Watermark(6000));
// this will be dropped because window.maxTimestamp() + allowedLateness < currentWatermark
// this will be side output because window.maxTimestamp() + allowedLateness < currentWatermark
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1998));
testHarness.processWatermark(new Watermark(7000));
lateExpected.add(new StreamRecord<>(new Tuple2<>("key2", 1), 1998));
expected.add(new Watermark(7000));
TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expected, testHarness.getOutput(), new Tuple2ResultSortComparator());
TestHarnessUtil.assertOutputEqualsSorted(
"SideOutput was not correct.",
lateExpected,
testHarness.getSideOutput(lateOutputTag),
new Tuple2ResultSortComparator());
testHarness.close();
}
......@@ -1327,7 +1361,8 @@ public class WindowOperatorTest extends TestLogger {
stateDesc,
new InternalSingleValueWindowFunction<>(new PassThroughWindowFunction<String, TimeWindow, Tuple2<String, Integer>>()),
EventTimeTrigger.create(),
LATENESS);
LATENESS,
null /* late data output tag */);
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
......@@ -1374,7 +1409,7 @@ public class WindowOperatorTest extends TestLogger {
}
@Test
public void testDropDueToLatenessTumbling() throws Exception {
public void testSideOutputDueToLatenessTumbling() throws Exception {
final int WINDOW_SIZE = 2;
final long LATENESS = 0;
......@@ -1393,7 +1428,8 @@ public class WindowOperatorTest extends TestLogger {
stateDesc,
new InternalSingleValueWindowFunction<>(new PassThroughWindowFunction<String, TimeWindow, Tuple2<String, Integer>>()),
EventTimeTrigger.create(),
LATENESS);
LATENESS,
lateOutputTag);
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
......@@ -1401,6 +1437,7 @@ public class WindowOperatorTest extends TestLogger {
testHarness.open();
ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();
ConcurrentLinkedQueue<Object> sideExpected = new ConcurrentLinkedQueue<>();
// normal element
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1000));
......@@ -1415,8 +1452,9 @@ public class WindowOperatorTest extends TestLogger {
expected.add(new StreamRecord<>(new Tuple2<>("key2", 2), 1999));
expected.add(new Watermark(1999));
// dropped as late
// sideoutput as late, will reuse previous timestamp since only input tuple is sideoutputed
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1998));
sideExpected.add(new StreamRecord<>(new Tuple2<>("key2", 1), 1998));
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 2001));
testHarness.processWatermark(new Watermark(2999));
......@@ -1429,11 +1467,13 @@ public class WindowOperatorTest extends TestLogger {
expected.add(new Watermark(3999));
TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expected, testHarness.getOutput(), new Tuple2ResultSortComparator());
TestHarnessUtil.assertOutputEqualsSorted("SideOutput was not correct.", sideExpected, testHarness.getSideOutput(
lateOutputTag), new Tuple2ResultSortComparator());
testHarness.close();
}
@Test
public void testDropDueToLatenessSliding() throws Exception {
public void testSideOutputDueToLatenessSliding() throws Exception {
final int WINDOW_SIZE = 3;
final int WINDOW_SLIDE = 1;
final long LATENESS = 0;
......@@ -1453,7 +1493,8 @@ public class WindowOperatorTest extends TestLogger {
stateDesc,
new InternalSingleValueWindowFunction<>(new PassThroughWindowFunction<String, TimeWindow, Tuple2<String, Integer>>()),
EventTimeTrigger.create(),
LATENESS);
LATENESS,
lateOutputTag /* late data output tag */);
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
......@@ -1461,6 +1502,7 @@ public class WindowOperatorTest extends TestLogger {
testHarness.open();
ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();
ConcurrentLinkedQueue<Object> sideExpected = new ConcurrentLinkedQueue<>();
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1000));
testHarness.processWatermark(new Watermark(1999));
......@@ -1497,19 +1539,21 @@ public class WindowOperatorTest extends TestLogger {
expected.add(new Watermark(6000));
// dropped due to lateness
// sideoutput element due to lateness
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 3001));
sideExpected.add(new StreamRecord<>(new Tuple2<>("key1", 1), 3001));
testHarness.processWatermark(new Watermark(25000));
expected.add(new Watermark(25000));
TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expected, testHarness.getOutput(), new Tuple2ResultSortComparator());
TestHarnessUtil.assertOutputEqualsSorted("SideOutput was not correct.", sideExpected, testHarness.getSideOutput(lateOutputTag), new Tuple2ResultSortComparator());
testHarness.close();
}
@Test
public void testDropDueToLatenessSessionZeroLatenessPurgingTrigger() throws Exception {
public void testSideOutputDueToLatenessSessionZeroLatenessPurgingTrigger() throws Exception {
final int GAP_SIZE = 3;
final long LATENESS = 0;
......@@ -1528,7 +1572,8 @@ public class WindowOperatorTest extends TestLogger {
stateDesc,
new InternalSingleValueWindowFunction<>(new ReducedSessionWindowFunction()),
PurgingTrigger.of(EventTimeTrigger.create()),
LATENESS);
LATENESS,
lateOutputTag);
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness =
new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
......@@ -1536,6 +1581,7 @@ public class WindowOperatorTest extends TestLogger {
testHarness.open();
ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();
ConcurrentLinkedQueue<Object> sideExpected = new ConcurrentLinkedQueue<>();
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1000));
testHarness.processWatermark(new Watermark(1999));
......@@ -1571,10 +1617,13 @@ public class WindowOperatorTest extends TestLogger {
expected.add(new StreamRecord<>(new Tuple3<>("key2-1", 11600L, 14600L), 14599));
expected.add(new Watermark(14600));
// this is dropped as late
// this is side output as late
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 10000));
// this is also dropped as late (we test that they are not accidentally merged)
sideExpected.add(new StreamRecord<>(new Tuple2<>("key2", 1), 10000));
// this is also side output as late (we test that they are not accidentally merged)
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 10100));
sideExpected.add(new StreamRecord<>(new Tuple2<>("key2", 1), 10100));
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 14500));
testHarness.processWatermark(new Watermark(20000));
......@@ -1587,19 +1636,16 @@ public class WindowOperatorTest extends TestLogger {
expected.add(new Watermark(100000));
ConcurrentLinkedQueue<Object> actual = testHarness.getOutput();
ConcurrentLinkedQueue<Object> sideActual = testHarness.getSideOutput(lateOutputTag);
TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expected, actual, new Tuple2ResultSortComparator());
TestHarnessUtil.assertOutputEqualsSorted("SideOutput was not correct.", sideExpected, sideActual, new Tuple2ResultSortComparator());
testHarness.close();
}
@Test
public void testDropDueToLatenessSessionZeroLateness() throws Exception {
// same as testDropDueToLatenessSessionZeroLateness() but with an accumulating trigger, i.e.
// one that does not return FIRE_AND_PURGE when firing but just FIRE
// this has the same output as testDropDueToLatenessSessionZeroLateness() because
// accumulating/discarding does not make a difference with "allowed lateness" = 0.
public void testSideOutputDueToLatenessSessionZeroLateness() throws Exception {
final int GAP_SIZE = 3;
final long LATENESS = 0;
......@@ -1618,7 +1664,8 @@ public class WindowOperatorTest extends TestLogger {
stateDesc,
new InternalSingleValueWindowFunction<>(new ReducedSessionWindowFunction()),
EventTimeTrigger.create(),
LATENESS);
LATENESS,
lateOutputTag);
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness =
new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
......@@ -1626,6 +1673,7 @@ public class WindowOperatorTest extends TestLogger {
testHarness.open();
ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();
ConcurrentLinkedQueue<Object> sideExpected = new ConcurrentLinkedQueue<>();
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1000));
testHarness.processWatermark(new Watermark(1999));
......@@ -1661,8 +1709,9 @@ public class WindowOperatorTest extends TestLogger {
expected.add(new StreamRecord<>(new Tuple3<>("key2-1", 11600L, 14600L), 14599));
expected.add(new Watermark(14600));
// this is dropped as late
// this is sideoutput as late, reuse last timestamp
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 10000));
sideExpected.add(new StreamRecord<>(new Tuple2<>("key2", 1), 10000));
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 14500));
testHarness.processWatermark(new Watermark(20000));
......@@ -1674,14 +1723,16 @@ public class WindowOperatorTest extends TestLogger {
expected.add(new Watermark(100000));
ConcurrentLinkedQueue<Object> actual = testHarness.getOutput();
ConcurrentLinkedQueue<Object> sideActual = testHarness.getSideOutput(lateOutputTag);
TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expected, actual, new Tuple2ResultSortComparator());
TestHarnessUtil.assertOutputEqualsSorted("SideOutput was not correct.", sideExpected, sideActual, new Tuple2ResultSortComparator());
testHarness.close();
}
@Test
public void testDropDueToLatenessSessionWithLatenessPurgingTrigger() throws Exception {
// this has the same output as testDropDueToLatenessSessionZeroLateness() because
// this has the same output as testSideOutputDueToLatenessSessionZeroLateness() because
// the allowed lateness is too small to make a difference
final int GAP_SIZE = 3;
......@@ -1702,7 +1753,8 @@ public class WindowOperatorTest extends TestLogger {
stateDesc,
new InternalSingleValueWindowFunction<>(new ReducedSessionWindowFunction()),
PurgingTrigger.of(EventTimeTrigger.create()),
LATENESS);
LATENESS,
lateOutputTag);
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness =
new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
......@@ -1710,7 +1762,7 @@ public class WindowOperatorTest extends TestLogger {
testHarness.open();
ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1000));
testHarness.processWatermark(new Watermark(1999));
......@@ -1759,13 +1811,14 @@ public class WindowOperatorTest extends TestLogger {
expected.add(new Watermark(100000));
ConcurrentLinkedQueue<Object> actual = testHarness.getOutput();
TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expected, actual, new Tuple3ResultSortComparator());
testHarness.close();
}
@Test
public void testDropDueToLatenessSessionWithLateness() throws Exception {
// same as testDropDueToLatenessSessionWithLateness() but with an accumulating trigger, i.e.
public void testNotSideOutputDueToLatenessSessionWithLateness() throws Exception {
// same as testSideOutputDueToLatenessSessionWithLateness() but with an accumulating trigger, i.e.
// one that does not return FIRE_AND_PURGE when firing but just FIRE. The expected
// results are therefore slightly different.
......@@ -1787,7 +1840,8 @@ public class WindowOperatorTest extends TestLogger {
stateDesc,
new InternalSingleValueWindowFunction<>(new ReducedSessionWindowFunction()),
EventTimeTrigger.create(),
LATENESS);
LATENESS,
null /* late data output tag */);
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness =
new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
......@@ -1806,7 +1860,7 @@ public class WindowOperatorTest extends TestLogger {
expected.add(new Watermark(4998));
// this will not be dropped because the session we're adding two has maxTimestamp
// this will not be sideoutput because the session we're adding two has maxTimestamp
// after the current watermark
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 4500));
......@@ -1832,7 +1886,7 @@ public class WindowOperatorTest extends TestLogger {
// because of the small allowed lateness and because the trigger is accumulating
// this will be merged into the session (11600-14600) and therefore will not
// be dropped as late
// be sideoutput as late
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 10000));
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 14500));
......@@ -1843,7 +1897,10 @@ public class WindowOperatorTest extends TestLogger {
expected.add(new StreamRecord<>(new Tuple3<>("key2-2", 10000L, 14600L), 14599));
ConcurrentLinkedQueue<Object> actual = testHarness.getOutput();
ConcurrentLinkedQueue<Object> sideActual = testHarness.getSideOutput(lateOutputTag);
TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expected, actual, new Tuple3ResultSortComparator());
assertEquals(null, sideActual);
testHarness.processWatermark(new Watermark(20000));
......@@ -1855,12 +1912,15 @@ public class WindowOperatorTest extends TestLogger {
expected.add(new Watermark(100000));
actual = testHarness.getOutput();
sideActual = testHarness.getSideOutput(lateOutputTag);
TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expected, actual, new Tuple3ResultSortComparator());
assertEquals(null, sideActual);
testHarness.close();
}
@Test
public void testDropDueToLatenessSessionWithHugeLatenessPurgingTrigger() throws Exception {
public void testNotSideOutputDueToLatenessSessionWithHugeLatenessPurgingTrigger() throws Exception {
final int GAP_SIZE = 3;
final long LATENESS = 10000;
......@@ -1880,7 +1940,8 @@ public class WindowOperatorTest extends TestLogger {
stateDesc,
new InternalSingleValueWindowFunction<>(new ReducedSessionWindowFunction()),
PurgingTrigger.of(EventTimeTrigger.create()),
LATENESS);
LATENESS,
null /* late data output tag */);
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness =
new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
......@@ -1899,7 +1960,7 @@ public class WindowOperatorTest extends TestLogger {
expected.add(new Watermark(4998));
// this will not be dropped because the session we're adding two has maxTimestamp
// this will not be sideoutput because the session we're adding two has maxTimestamp
// after the current watermark
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 4500));
......@@ -1928,7 +1989,9 @@ public class WindowOperatorTest extends TestLogger {
expected.add(new StreamRecord<>(new Tuple3<>("key2-1", 1000L, 14600L), 14599));
ConcurrentLinkedQueue<Object> actual = testHarness.getOutput();
ConcurrentLinkedQueue<Object> sideActual = testHarness.getSideOutput(lateOutputTag);
TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expected, actual, new Tuple3ResultSortComparator());
assertEquals(null, sideActual);
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 14500));
testHarness.processWatermark(new Watermark(20000));
......@@ -1941,12 +2004,15 @@ public class WindowOperatorTest extends TestLogger {
expected.add(new Watermark(100000));
actual = testHarness.getOutput();
sideActual = testHarness.getSideOutput(lateOutputTag);
TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expected, actual, new Tuple3ResultSortComparator());
assertEquals(null, sideActual);
testHarness.close();
}
@Test
public void testDropDueToLatenessSessionWithHugeLateness() throws Exception {
public void testNotSideOutputDueToLatenessSessionWithHugeLateness() throws Exception {
final int GAP_SIZE = 3;
final long LATENESS = 10000;
......@@ -1965,7 +2031,8 @@ public class WindowOperatorTest extends TestLogger {
stateDesc,
new InternalSingleValueWindowFunction<>(new ReducedSessionWindowFunction()),
EventTimeTrigger.create(),
LATENESS);
LATENESS,
null /* late data output tag */);
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness =
new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
......@@ -1984,7 +2051,7 @@ public class WindowOperatorTest extends TestLogger {
expected.add(new Watermark(4998));
// this will not be dropped because the session we're adding two has maxTimestamp
// this will not be sideoutput because the session we're adding two has maxTimestamp
// after the current watermark
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 4500));
......@@ -2015,7 +2082,9 @@ public class WindowOperatorTest extends TestLogger {
expected.add(new StreamRecord<>(new Tuple3<>("key2-7", 1000L, 14600L), 14599));
ConcurrentLinkedQueue<Object> actual = testHarness.getOutput();
ConcurrentLinkedQueue<Object> sideActual = testHarness.getSideOutput(lateOutputTag);
TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expected, actual, new Tuple3ResultSortComparator());
assertEquals(null, sideActual);
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 14500));
testHarness.processWatermark(new Watermark(20000));
......@@ -2027,7 +2096,11 @@ public class WindowOperatorTest extends TestLogger {
expected.add(new Watermark(100000));
actual = testHarness.getOutput();
sideActual = testHarness.getSideOutput(lateOutputTag);
TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expected, actual, new Tuple3ResultSortComparator());
assertEquals(null, sideActual);
testHarness.close();
}
......@@ -2050,7 +2123,8 @@ public class WindowOperatorTest extends TestLogger {
windowStateDesc,
new InternalIterableWindowFunction<>(new PassThroughFunction2()),
new EventTimeTriggerAccumGC(LATENESS),
LATENESS);
LATENESS,
null /* late data output tag */);
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, String> testHarness =
new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
......@@ -2104,7 +2178,8 @@ public class WindowOperatorTest extends TestLogger {
windowStateDesc,
new InternalIterableWindowFunction<>(new PassThroughFunction()),
EventTimeTrigger.create(),
LATENESS);
LATENESS,
null /* late data output tag */);
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
......@@ -2150,7 +2225,8 @@ public class WindowOperatorTest extends TestLogger {
stateDesc,
new InternalSingleValueWindowFunction<>(new PassThroughWindowFunction<String, TimeWindow, Tuple2<String, Integer>>()),
EventTimeTrigger.create(),
LATENESS);
LATENESS,
null /* late data output tag */);
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
......@@ -2207,7 +2283,8 @@ public class WindowOperatorTest extends TestLogger {
windowStateDesc,
new InternalSingleValueWindowFunction<>(new PassThroughFunction()),
EventTimeTrigger.create(),
LATENESS);
LATENESS,
null /* late data output tag */);
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
......@@ -2252,7 +2329,8 @@ public class WindowOperatorTest extends TestLogger {
windowStateDesc,
new InternalIterableWindowFunction<>(new PassThroughFunction()),
EventTimeTrigger.create(),
LATENESS);
LATENESS,
null /* late data output tag */);
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
......@@ -2296,7 +2374,8 @@ public class WindowOperatorTest extends TestLogger {
stateDesc,
new InternalSingleValueWindowFunction<>(new ReducedSessionWindowFunction()),
EventTimeTrigger.create(),
LATENESS);
LATENESS,
null /* late data output tag */);
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness =
new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
......@@ -2351,7 +2430,8 @@ public class WindowOperatorTest extends TestLogger {
windowStateDesc,
new InternalSingleValueWindowFunction<>(new PassThroughFunction()),
EventTimeTrigger.create(),
LATENESS);
LATENESS,
null /* late data output tag */);
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
......
......@@ -30,7 +30,7 @@ import org.apache.flink.streaming.api.windowing.evictors.Evictor
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.Trigger
import org.apache.flink.streaming.api.windowing.windows.Window
import org.apache.flink.util.Collector
import org.apache.flink.util.{Collector, OutputTag}
import org.apache.flink.util.Preconditions.checkNotNull
/**
......@@ -71,6 +71,20 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) {
this
}
/**
* Send late arriving data to the side output identified by the given [[OutputTag]]. Data
* is considered late after the watermark has passed the end of the window plus the allowed
* lateness set using [[allowedLateness(Time)]].
*
* You can get the stream of late data using [[DataStream.getSideOutput()]] on the [[DataStream]]
* resulting from the windowed operation with the same [[OutputTag]].
*/
@PublicEvolving
def sideOutputLateData(outputTag: OutputTag[T]): AllWindowedStream[T, W] = {
javaStream.sideOutputLateData(outputTag)
this
}
/**
* Sets the [[Trigger]] that should be used to trigger window emission.
*/
......
......@@ -19,8 +19,6 @@
package org.apache.flink.streaming.api.scala
import org.apache.flink.annotation.{Public, PublicEvolving}
import org.apache.flink.api.common.functions.{FoldFunction, ReduceFunction}
import org.apache.flink.annotation.{PublicEvolving, Public}
import org.apache.flink.api.common.functions.{AggregateFunction, FoldFunction, ReduceFunction}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.datastream.{WindowedStream => JavaWStream}
......@@ -32,7 +30,7 @@ import org.apache.flink.streaming.api.windowing.evictors.Evictor
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.Trigger
import org.apache.flink.streaming.api.windowing.windows.Window
import org.apache.flink.util.Collector
import org.apache.flink.util.{Collector, OutputTag}
/**
* A [[WindowedStream]] represents a data stream where elements are grouped by
......@@ -75,6 +73,20 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) {
this
}
/**
* Send late arriving data to the side output identified by the given [[OutputTag]]. Data
* is considered late after the watermark has passed the end of the window plus the allowed
* lateness set using [[allowedLateness(Time)]].
*
* You can get the stream of late data using [[DataStream.getSideOutput()]] on the [[DataStream]]
* resulting from the windowed operation with the same [[OutputTag]].
*/
@PublicEvolving
def sideOutputLateData(outputTag: OutputTag[T]): WindowedStream[T, K, W] = {
javaStream.sideOutputLateData(outputTag)
this
}
/**
* Sets the [[Trigger]] that should be used to trigger window emission.
*/
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.test.streaming.runtime;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.util.OutputTag;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
import org.apache.flink.test.streaming.runtime.util.TestListResultSink;
import org.apache.flink.util.Collector;
import org.junit.Test;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import static org.junit.Assert.assertEquals;
/**
* Integration test for streaming programs using side outputs.
*/
public class SideOutputITCase extends StreamingMultipleProgramsTestBase {
static List<Integer> elements = new ArrayList<>();
static {
elements.add(1);
elements.add(2);
elements.add(5);
elements.add(3);
elements.add(4);
}
private static class TestWatermarkAssigner implements AssignerWithPunctuatedWatermarks<Integer> {
private static final long serialVersionUID = 1L;
@Nullable
@Override
public Watermark checkAndGetNextWatermark(Integer lastElement, long extractedTimestamp) {
return new Watermark(extractedTimestamp);
}
@Override
public long extractTimestamp(Integer element, long previousElementTimestamp) {
return Long.valueOf(element);
}
}
private static class TestKeySelector implements KeySelector<Integer, Integer> {
private static final long serialVersionUID = 1L;
@Override
public Integer getKey(Integer value) throws Exception {
return value;
}
}
/**
* Test window late arriving events stream
*/
@Test
public void testAllWindowLateArrivingEvents() throws Exception {
TestListResultSink<String> sideOutputResultSink = new TestListResultSink<>();
StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
see.setParallelism(1);
see.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
DataStream<Integer> dataStream = see.fromCollection(elements);
OutputTag<Integer> lateDataTag = new OutputTag<Integer>("late"){};
SingleOutputStreamOperator<Integer> windowOperator = dataStream
.assignTimestampsAndWatermarks(new TestWatermarkAssigner())
.timeWindowAll(Time.milliseconds(1), Time.milliseconds(1))
.sideOutputLateData(lateDataTag)
.apply(new AllWindowFunction<Integer, Integer, TimeWindow>() {
private static final long serialVersionUID = 1L;
@Override
public void apply(TimeWindow window, Iterable<Integer> values, Collector<Integer> out) throws Exception {
for(Integer val : values) {
out.collect(val);
}
}
});
windowOperator
.getSideOutput(lateDataTag)
.flatMap(new FlatMapFunction<Integer, String>() {
private static final long serialVersionUID = 1L;
@Override
public void flatMap(Integer value, Collector<String> out) throws Exception {
out.collect("late-" + String.valueOf(value));
}
})
.addSink(sideOutputResultSink);
see.execute();
assertEquals(sideOutputResultSink.getSortedResult(), Arrays.asList("late-3", "late-4"));
}
@Test
public void testKeyedWindowLateArrivingEvents() throws Exception {
TestListResultSink<String> resultSink = new TestListResultSink<>();
TestListResultSink<Integer> lateResultSink = new TestListResultSink<>();
StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
see.setParallelism(3);
see.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
DataStream<Integer> dataStream = see.fromCollection(elements);
OutputTag<Integer> lateDataTag = new OutputTag<Integer>("late"){};
SingleOutputStreamOperator<String> windowOperator = dataStream
.assignTimestampsAndWatermarks(new TestWatermarkAssigner())
.keyBy(new TestKeySelector())
.timeWindow(Time.milliseconds(1), Time.milliseconds(1))
.allowedLateness(Time.milliseconds(2))
.sideOutputLateData(lateDataTag)
.apply(new WindowFunction<Integer, String, Integer, TimeWindow>() {
private static final long serialVersionUID = 1L;
@Override
public void apply(Integer key, TimeWindow window, Iterable<Integer> input, Collector<String> out) throws Exception {
for(Integer val : input) {
out.collect(String.valueOf(key) + "-" + String.valueOf(val));
}
}
});
windowOperator
.addSink(resultSink);
windowOperator
.getSideOutput(lateDataTag)
.addSink(lateResultSink);
see.execute();
assertEquals(Arrays.asList("1-1", "2-2", "4-4", "5-5"), resultSink.getSortedResult());
assertEquals(Collections.singletonList(3), lateResultSink.getSortedResult());
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册