提交 0b4c04d7 编写于 作者: A Aljoscha Krettek 提交者: Ufuk Celebi

[FLINK-4239] Set Default Allowed Lateness to Zero and Make Triggers Non-Purging

This closes #2278.
上级 f0ac261a
......@@ -515,6 +515,9 @@ and are considered when computing window results. If elements arrive after the a
will be dropped. Flink will also make sure that any state held by the windowing operation is garbage
collected once the watermark passes the end of a window plus the allowed lateness.
<span class="label label-info">Default</span> By default, the allowed lateness is set to
`0`. That is, elements that arrive behind the watermark will be dropped.
You can specify an allowed lateness like this:
<div class="codetabs" markdown="1">
......
......@@ -89,7 +89,7 @@ public class AllWindowedStream<T, W extends Window> {
private Evictor<? super T, ? super W> evictor;
/** The user-specified allowed lateness. */
private long allowedLateness = Long.MAX_VALUE;
private long allowedLateness = 0L;
@PublicEvolving
public AllWindowedStream(DataStream<T> input,
......@@ -113,11 +113,11 @@ public class AllWindowedStream<T, W extends Window> {
}
/**
* Sets the time by which elements are allowed to be late. Elements that
* arrive behind the watermark by more than the specified time will be dropped.
* By default, the allowed lateness is {@code 0L}.
*
* <p>Setting an allowed lateness is only valid for event-time windows.
* If a value other than 0 is provided with a processing-time
* {@link WindowAssigner}, then an exception is thrown.
*/
@PublicEvolving
public AllWindowedStream<T, W> allowedLateness(Time lateness) {
......
......@@ -100,7 +100,7 @@ public class WindowedStream<T, K, W extends Window> {
private Evictor<? super T, ? super W> evictor;
/** The user-specified allowed lateness. */
private long allowedLateness = Long.MAX_VALUE;
private long allowedLateness = 0L;
@PublicEvolving
public WindowedStream(KeyedStream<T, K> input,
......@@ -124,11 +124,11 @@ public class WindowedStream<T, K, W extends Window> {
}
/**
* Sets the time by which elements are allowed to be late. Elements that
* arrive behind the watermark by more than the specified time will be dropped.
* By default, the allowed lateness is {@code 0L}.
*
* <p>Setting an allowed lateness is only valid for event-time windows.
* If a value other than 0 is provided with a processing-time
* {@link WindowAssigner}, then an exception is thrown.
*/
@PublicEvolving
public WindowedStream<T, K, W> allowedLateness(Time lateness) {
......
......@@ -37,7 +37,7 @@ public class EventTimeTrigger extends Trigger<Object, TimeWindow> {
public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
// if the watermark is already past the window fire immediately
return TriggerResult.FIRE_AND_PURGE;
return TriggerResult.FIRE;
} else {
ctx.registerEventTimeTimer(window.maxTimestamp());
return TriggerResult.CONTINUE;
......@@ -47,7 +47,7 @@ public class EventTimeTrigger extends Trigger<Object, TimeWindow> {
@Override
public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
return time == window.maxTimestamp() ?
TriggerResult.FIRE_AND_PURGE :
TriggerResult.FIRE :
TriggerResult.CONTINUE;
}
......
......@@ -44,7 +44,7 @@ public class ProcessingTimeTrigger extends Trigger<Object, TimeWindow> {
/**
 * Called when a processing-time timer for the window goes off.
 *
 * <p>Always fires the window. The result is {@code FIRE} rather than
 * {@code FIRE_AND_PURGE}, so this trigger does not itself request a purge.
 *
 * @param time the timestamp of the timer that fired (unused)
 * @param window the window the timer belongs to (unused)
 * @param ctx the trigger context (unused)
 * @return always {@code FIRE}
 */
@Override
public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
    return TriggerResult.FIRE;
}
@Override
......
......@@ -44,40 +44,19 @@ public class PurgingTrigger<T, W extends Window> extends Trigger<T, W> {
/**
 * Forwards the element to the nested trigger and turns any firing result into
 * {@code FIRE_AND_PURGE}.
 *
 * <p>Non-firing results are handed back unchanged, so a {@code PURGE} coming from
 * the nested trigger is preserved instead of being collapsed into {@code CONTINUE}.
 *
 * @return {@code FIRE_AND_PURGE} if the nested trigger fired, otherwise the nested result
 * @throws Exception whatever the nested trigger throws
 */
@Override
public TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) throws Exception {
    TriggerResult triggerResult = nestedTrigger.onElement(element, timestamp, window, ctx);
    return triggerResult.isFire() ? TriggerResult.FIRE_AND_PURGE : triggerResult;
}
/**
 * Forwards the event-time timer to the nested trigger and turns any firing
 * result into {@code FIRE_AND_PURGE}.
 *
 * <p>Non-firing results are handed back unchanged, so a {@code PURGE} coming from
 * the nested trigger is preserved instead of being collapsed into {@code CONTINUE}.
 *
 * @return {@code FIRE_AND_PURGE} if the nested trigger fired, otherwise the nested result
 * @throws Exception whatever the nested trigger throws
 */
@Override
public TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception {
    TriggerResult triggerResult = nestedTrigger.onEventTime(time, window, ctx);
    return triggerResult.isFire() ? TriggerResult.FIRE_AND_PURGE : triggerResult;
}
/**
 * Forwards the processing-time timer to the nested trigger and turns any firing
 * result into {@code FIRE_AND_PURGE}.
 *
 * <p>Non-firing results are handed back unchanged, so a {@code PURGE} coming from
 * the nested trigger is preserved instead of being collapsed into {@code CONTINUE}.
 *
 * @return {@code FIRE_AND_PURGE} if the nested trigger fired, otherwise the nested result
 * @throws Exception whatever the nested trigger throws
 */
@Override
public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {
    TriggerResult triggerResult = nestedTrigger.onProcessingTime(time, window, ctx);
    return triggerResult.isFire() ? TriggerResult.FIRE_AND_PURGE : triggerResult;
}
@Override
......@@ -93,14 +72,7 @@ public class PurgingTrigger<T, W extends Window> extends Trigger<T, W> {
/**
 * Forwards the merge notification to the nested trigger and turns any firing
 * result into {@code FIRE_AND_PURGE}.
 *
 * <p>The check is on {@code isFire()}, matching the other callbacks of this class:
 * a nested {@code FIRE} must become {@code FIRE_AND_PURGE}, while a bare
 * {@code PURGE} must not be turned into a firing. Non-firing results are
 * handed back unchanged.
 *
 * @return {@code FIRE_AND_PURGE} if the nested trigger fired, otherwise the nested result
 * @throws Exception whatever the nested trigger throws
 */
@Override
public TriggerResult onMerge(W window, OnMergeContext ctx) throws Exception {
    TriggerResult triggerResult = nestedTrigger.onMerge(window, ctx);
    return triggerResult.isFire() ? TriggerResult.FIRE_AND_PURGE : triggerResult;
}
@Override
......
......@@ -51,8 +51,6 @@ import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.api.windowing.windows.Window;
......@@ -1154,7 +1152,7 @@ public class WindowOperatorTest {
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
stateDesc,
new InternalSingleValueWindowFunction<>(new PassThroughWindowFunction<String, TimeWindow, Tuple2<String, Integer>>()),
EventTimeTrigger.create(),
PurgingTrigger.of(EventTimeTrigger.create()),
LATENESS);
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
......@@ -1409,7 +1407,7 @@ public class WindowOperatorTest {
}
@Test
public void testDropDueToLatenessSessionZeroLateness() throws Exception {
public void testDropDueToLatenessSessionZeroLatenessPurgingTrigger() throws Exception {
final int GAP_SIZE = 3;
final long LATENESS = 0;
......@@ -1427,7 +1425,7 @@ public class WindowOperatorTest {
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
stateDesc,
new InternalSingleValueWindowFunction<>(new ReducedSessionWindowFunction()),
EventTimeTrigger.create(),
PurgingTrigger.of(EventTimeTrigger.create()),
LATENESS);
operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig());
......@@ -1497,7 +1495,7 @@ public class WindowOperatorTest {
}
@Test
public void testDropDueToLatenessSessionZeroLatenessAccum() throws Exception {
public void testDropDueToLatenessSessionZeroLateness() throws Exception {
// same as testDropDueToLatenessSessionZeroLatenessPurgingTrigger() but with an accumulating trigger, i.e.
// one that does not return FIRE_AND_PURGE when firing but just FIRE
......@@ -1521,7 +1519,7 @@ public class WindowOperatorTest {
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
stateDesc,
new InternalSingleValueWindowFunction<>(new ReducedSessionWindowFunction()),
new EventTimeTriggerAccum(),
EventTimeTrigger.create(),
LATENESS);
operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig());
......@@ -1587,7 +1585,7 @@ public class WindowOperatorTest {
}
@Test
public void testDropDueToLatenessSessionWithLateness() throws Exception {
public void testDropDueToLatenessSessionWithLatenessPurgingTrigger() throws Exception {
// this has the same output as testDropDueToLatenessSessionZeroLatenessPurgingTrigger() because
// the allowed lateness is too small to make a difference
......@@ -1609,7 +1607,7 @@ public class WindowOperatorTest {
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
stateDesc,
new InternalSingleValueWindowFunction<>(new ReducedSessionWindowFunction()),
EventTimeTrigger.create(),
PurgingTrigger.of(EventTimeTrigger.create()),
LATENESS);
operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig());
......@@ -1675,7 +1673,7 @@ public class WindowOperatorTest {
}
@Test
public void testDropDueToLatenessSessionWithLatenessAccum() throws Exception {
public void testDropDueToLatenessSessionWithLateness() throws Exception {
// same as testDropDueToLatenessSessionWithLatenessPurgingTrigger() but with an accumulating trigger, i.e.
// one that does not return FIRE_AND_PURGE when firing but just FIRE. The expected
// results are therefore slightly different.
......@@ -1697,7 +1695,7 @@ public class WindowOperatorTest {
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
stateDesc,
new InternalSingleValueWindowFunction<>(new ReducedSessionWindowFunction()),
new EventTimeTriggerAccum(),
EventTimeTrigger.create(),
LATENESS);
operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig());
......@@ -1775,7 +1773,7 @@ public class WindowOperatorTest {
}
@Test
public void testDropDueToLatenessSessionWithHugeLateness() throws Exception {
public void testDropDueToLatenessSessionWithHugeLatenessPurgingTrigger() throws Exception {
final int GAP_SIZE = 3;
final long LATENESS = 10000;
......@@ -1794,7 +1792,7 @@ public class WindowOperatorTest {
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
stateDesc,
new InternalSingleValueWindowFunction<>(new ReducedSessionWindowFunction()),
EventTimeTrigger.create(),
PurgingTrigger.of(EventTimeTrigger.create()),
LATENESS);
operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig());
......@@ -1865,7 +1863,7 @@ public class WindowOperatorTest {
}
@Test
public void testDropDueToLatenessSessionWithHugeLatenessAccum() throws Exception {
public void testDropDueToLatenessSessionWithHugeLateness() throws Exception {
final int GAP_SIZE = 3;
final long LATENESS = 10000;
......@@ -1883,7 +1881,7 @@ public class WindowOperatorTest {
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
stateDesc,
new InternalSingleValueWindowFunction<>(new ReducedSessionWindowFunction()),
new EventTimeTriggerAccum(),
EventTimeTrigger.create(),
LATENESS);
operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig());
......@@ -2114,60 +2112,4 @@ public class WindowOperatorTest {
return Collections.singletonList(new TimeWindow(timestamp, timestamp + sessionTimeout));
}
}
/**
 * Event-time trigger used by the tests: it fires when the end of the window
 * is reached but never asks for the fired window's state to be purged.
 * This is to test the state garbage collection mechanism.
 */
public class EventTimeTriggerAccum extends Trigger<Object, TimeWindow> {
    private static final long serialVersionUID = 1L;

    private EventTimeTriggerAccum() {}

    /**
     * Fires right away if the watermark has already reached the end of the window;
     * otherwise registers an event-time timer for the end of the window and waits.
     */
    @Override
    public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
        if (window.maxTimestamp() > ctx.getCurrentWatermark()) {
            // window is still open: wake up again once its end is reached
            ctx.registerEventTimeTimer(window.maxTimestamp());
            return TriggerResult.CONTINUE;
        }
        // the watermark is already past the window, so fire immediately
        return TriggerResult.FIRE;
    }

    /** Fires only for the timer that was set for the end of the window. */
    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
        if (time != window.maxTimestamp()) {
            return TriggerResult.CONTINUE;
        }
        return TriggerResult.FIRE;
    }

    /** Processing-time timers never cause a firing. */
    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE;
    }

    /** Deletes the event-time timer set for the end of the window. */
    @Override
    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        ctx.deleteEventTimeTimer(window.maxTimestamp());
    }

    @Override
    public boolean canMerge() {
        return true;
    }

    /** Registers a timer for the end of the merged window; merging never fires. */
    @Override
    public TriggerResult onMerge(TimeWindow window,
            OnMergeContext ctx) {
        ctx.registerEventTimeTimer(window.maxTimestamp());
        return TriggerResult.CONTINUE;
    }

    @Override
    public String toString() {
        return "EventTimeTrigger()";
    }
}
}
......@@ -32,6 +32,7 @@ import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
......@@ -114,8 +115,8 @@ public class SessionWindowITCase extends StreamingMultipleProgramsTestBase {
windowedStream = windowedStream.allowedLateness(Time.milliseconds(ALLOWED_LATENESS_MS));
}
if (!PURGE_WINDOW_ON_FIRE) {
windowedStream = windowedStream.trigger(new NonPurgingEventTimeTriggerWrapper());
if (PURGE_WINDOW_ON_FIRE) {
windowedStream = windowedStream.trigger(PurgingTrigger.of(EventTimeTrigger.create()));
}
windowedStream.apply(windowFunction).print();
......@@ -284,60 +285,4 @@ public class SessionWindowITCase extends StreamingMultipleProgramsTestBase {
isRunning = false;
}
}
/**
 * Wraps an {@link EventTimeTrigger} and strips the purge part from every
 * result it produces, turning a purging trigger into a non-purging one.
 */
private static final class NonPurgingEventTimeTriggerWrapper
        extends Trigger<SessionEvent<Integer, TestEventPayload>, TimeWindow> {

    static final long serialVersionUID = 34763482396L;

    EventTimeTrigger delegate = EventTimeTrigger.create();

    @Override
    public TriggerResult onElement(
            SessionEvent<Integer, TestEventPayload> element,
            long timestamp,
            TimeWindow window,
            TriggerContext ctx) throws Exception {
        TriggerResult delegateResult = delegate.onElement(element, timestamp, window, ctx);
        return removePurging(delegateResult);
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
        TriggerResult delegateResult = delegate.onProcessingTime(time, window, ctx);
        return removePurging(delegateResult);
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
        TriggerResult delegateResult = delegate.onEventTime(time, window, ctx);
        return removePurging(delegateResult);
    }

    @Override
    public boolean canMerge() {
        return delegate.canMerge();
    }

    @Override
    public TriggerResult onMerge(TimeWindow window, OnMergeContext ctx) throws Exception {
        TriggerResult delegateResult = delegate.onMerge(window, ctx);
        return removePurging(delegateResult);
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        delegate.clear(window, ctx);
    }

    /**
     * Maps {@code FIRE_AND_PURGE} to {@code FIRE} and {@code PURGE} to
     * {@code CONTINUE}; every other result is returned as is.
     */
    private TriggerResult removePurging(TriggerResult result) {
        if (result == TriggerResult.FIRE_AND_PURGE) {
            return TriggerResult.FIRE;
        }
        return result == TriggerResult.PURGE ? TriggerResult.CONTINUE : result;
    }
}
}
\ No newline at end of file
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册