提交 456d0aba 编写于 作者: A Aljoscha Krettek 提交者: Stephan Ewen

[FLINK-3200] Fix Triggers by introducing clear() method to clean up state/triggers

上级 e4d05f72
...@@ -45,7 +45,7 @@ public class SessionWindowing { ...@@ -45,7 +45,7 @@ public class SessionWindowing {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.setParallelism(2); env.setParallelism(1);
final List<Tuple3<String, Long, Integer>> input = new ArrayList<>(); final List<Tuple3<String, Long, Integer>> input = new ArrayList<>();
...@@ -103,7 +103,7 @@ public class SessionWindowing { ...@@ -103,7 +103,7 @@ public class SessionWindowing {
private final Long sessionTimeout; private final Long sessionTimeout;
private final ValueStateDescriptor<Long> stateDesc = new ValueStateDescriptor<>("last-seen", 1L, private final ValueStateDescriptor<Long> stateDesc = new ValueStateDescriptor<>("last-seen", -1L,
BasicTypeInfo.LONG_TYPE_INFO.createSerializer(new ExecutionConfig())); BasicTypeInfo.LONG_TYPE_INFO.createSerializer(new ExecutionConfig()));
...@@ -120,12 +120,15 @@ public class SessionWindowing { ...@@ -120,12 +120,15 @@ public class SessionWindowing {
Long timeSinceLastEvent = timestamp - lastSeen; Long timeSinceLastEvent = timestamp - lastSeen;
ctx.deleteEventTimeTimer(lastSeen + sessionTimeout);
// Update the last seen event time // Update the last seen event time
lastSeenState.update(timestamp); lastSeenState.update(timestamp);
ctx.registerEventTimeTimer(timestamp + sessionTimeout); ctx.registerEventTimeTimer(timestamp + sessionTimeout);
if (timeSinceLastEvent > sessionTimeout) { if (lastSeen != -1 && timeSinceLastEvent > sessionTimeout) {
System.out.println("FIRING ON ELEMENT: " + element + " ts: " + timestamp + " last " + lastSeen);
return TriggerResult.FIRE_AND_PURGE; return TriggerResult.FIRE_AND_PURGE;
} else { } else {
return TriggerResult.CONTINUE; return TriggerResult.CONTINUE;
...@@ -138,6 +141,7 @@ public class SessionWindowing { ...@@ -138,6 +141,7 @@ public class SessionWindowing {
Long lastSeen = lastSeenState.value(); Long lastSeen = lastSeenState.value();
if (time - lastSeen >= sessionTimeout) { if (time - lastSeen >= sessionTimeout) {
System.out.println("CTX: " + ctx + " Firing Time " + time + " last seen " + lastSeen);
return TriggerResult.FIRE_AND_PURGE; return TriggerResult.FIRE_AND_PURGE;
} }
return TriggerResult.CONTINUE; return TriggerResult.CONTINUE;
...@@ -147,6 +151,15 @@ public class SessionWindowing { ...@@ -147,6 +151,15 @@ public class SessionWindowing {
public TriggerResult onProcessingTime(long time, GlobalWindow window, TriggerContext ctx) throws Exception { public TriggerResult onProcessingTime(long time, GlobalWindow window, TriggerContext ctx) throws Exception {
return TriggerResult.CONTINUE; return TriggerResult.CONTINUE;
} }
@Override
public void clear(GlobalWindow window, TriggerContext ctx) throws Exception {
ValueState<Long> lastSeenState = ctx.getPartitionedState(stateDesc);
if (lastSeenState.value() != -1) {
ctx.deleteEventTimeTimer(lastSeenState.value() + sessionTimeout);
}
lastSeenState.clear();
}
} }
// ************************************************************************* // *************************************************************************
......
...@@ -87,6 +87,9 @@ public class GlobalWindows extends WindowAssigner<Object, GlobalWindow> { ...@@ -87,6 +87,9 @@ public class GlobalWindows extends WindowAssigner<Object, GlobalWindow> {
public TriggerResult onProcessingTime(long time, GlobalWindow window, TriggerContext ctx) { public TriggerResult onProcessingTime(long time, GlobalWindow window, TriggerContext ctx) {
return TriggerResult.CONTINUE; return TriggerResult.CONTINUE;
} }
@Override
public void clear(GlobalWindow window, TriggerContext ctx) throws Exception {}
} }
@Override @Override
......
...@@ -73,6 +73,9 @@ public class ContinuousEventTimeTrigger<W extends Window> implements Trigger<Obj ...@@ -73,6 +73,9 @@ public class ContinuousEventTimeTrigger<W extends Window> implements Trigger<Obj
return TriggerResult.CONTINUE; return TriggerResult.CONTINUE;
} }
@Override
public void clear(W window, TriggerContext ctx) throws Exception {}
@Override @Override
public String toString() { public String toString() {
return "ContinuousProcessingTimeTrigger(" + interval + ")"; return "ContinuousProcessingTimeTrigger(" + interval + ")";
......
...@@ -90,6 +90,9 @@ public class ContinuousProcessingTimeTrigger<W extends Window> implements Trigge ...@@ -90,6 +90,9 @@ public class ContinuousProcessingTimeTrigger<W extends Window> implements Trigge
return TriggerResult.CONTINUE; return TriggerResult.CONTINUE;
} }
@Override
public void clear(W window, TriggerContext ctx) throws Exception {}
@VisibleForTesting @VisibleForTesting
public long getInterval() { public long getInterval() {
return interval; return interval;
......
...@@ -65,6 +65,11 @@ public class CountTrigger<W extends Window> implements Trigger<Object, W> { ...@@ -65,6 +65,11 @@ public class CountTrigger<W extends Window> implements Trigger<Object, W> {
return TriggerResult.CONTINUE; return TriggerResult.CONTINUE;
} }
@Override
public void clear(W window, TriggerContext ctx) throws Exception {
ctx.getPartitionedState(stateDesc).clear();
}
@Override @Override
public String toString() { public String toString() {
return "CountTrigger(" + maxCount + ")"; return "CountTrigger(" + maxCount + ")";
......
...@@ -71,6 +71,11 @@ public class DeltaTrigger<T, W extends Window> implements Trigger<T, W> { ...@@ -71,6 +71,11 @@ public class DeltaTrigger<T, W extends Window> implements Trigger<T, W> {
return TriggerResult.CONTINUE; return TriggerResult.CONTINUE;
} }
@Override
public void clear(W window, TriggerContext ctx) throws Exception {
ctx.getPartitionedState(stateDesc).clear();
}
@Override @Override
public String toString() { public String toString() {
return "DeltaTrigger(" + deltaFunction + ", " + threshold + ")"; return "DeltaTrigger(" + deltaFunction + ", " + threshold + ")";
......
...@@ -46,6 +46,11 @@ public class EventTimeTrigger implements Trigger<Object, TimeWindow> { ...@@ -46,6 +46,11 @@ public class EventTimeTrigger implements Trigger<Object, TimeWindow> {
return TriggerResult.CONTINUE; return TriggerResult.CONTINUE;
} }
@Override
public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
ctx.deleteEventTimeTimer(window.maxTimestamp());
}
@Override @Override
public String toString() { public String toString() {
return "EventTimeTrigger()"; return "EventTimeTrigger()";
......
...@@ -44,6 +44,11 @@ public class ProcessingTimeTrigger implements Trigger<Object, TimeWindow> { ...@@ -44,6 +44,11 @@ public class ProcessingTimeTrigger implements Trigger<Object, TimeWindow> {
return TriggerResult.FIRE_AND_PURGE; return TriggerResult.FIRE_AND_PURGE;
} }
@Override
public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
ctx.deleteProcessingTimeTimer(window.maxTimestamp());
}
@Override @Override
public String toString() { public String toString() {
return "ProcessingTimeTrigger()"; return "ProcessingTimeTrigger()";
......
...@@ -78,6 +78,11 @@ public class PurgingTrigger<T, W extends Window> implements Trigger<T, W> { ...@@ -78,6 +78,11 @@ public class PurgingTrigger<T, W extends Window> implements Trigger<T, W> {
} }
} }
@Override
public void clear(W window, TriggerContext ctx) throws Exception {
nestedTrigger.clear(window, ctx);
}
@Override @Override
public String toString() { public String toString() {
return "PurgingTrigger(" + nestedTrigger.toString() + ")"; return "PurgingTrigger(" + nestedTrigger.toString() + ")";
......
...@@ -73,6 +73,13 @@ public interface Trigger<T, W extends Window> extends Serializable { ...@@ -73,6 +73,13 @@ public interface Trigger<T, W extends Window> extends Serializable {
*/ */
TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception; TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception;
/**
* Clears any state that the trigger might still hold for the given window. This is called
* when a window is purged. Timers set using {@link TriggerContext#registerEventTimeTimer(long)}
* and {@link TriggerContext#registerProcessingTimeTimer(long)} should be deleted here as
* well as state acquired using {@link TriggerContext#getPartitionedState(StateDescriptor)}.
*/
void clear(W window, TriggerContext ctx) throws Exception;
/** /**
* Result type for trigger methods. This determines what happens with the window. * Result type for trigger methods. This determines what happens with the window.
...@@ -150,6 +157,16 @@ public interface Trigger<T, W extends Window> extends Serializable { ...@@ -150,6 +157,16 @@ public interface Trigger<T, W extends Window> extends Serializable {
*/ */
void registerEventTimeTimer(long time); void registerEventTimeTimer(long time);
/**
* Delete the processing time trigger for the given time.
*/
void deleteProcessingTimeTimer(long time);
/**
* Delete the event-time trigger for the given time.
*/
void deleteEventTimeTimer(long time);
/** /**
* Retrieves an {@link State} object that can be used to interact with * Retrieves an {@link State} object that can be used to interact with
* fault-tolerant state that is scoped to the window and key of the current * fault-tolerant state that is scoped to the window and key of the current
......
...@@ -284,6 +284,10 @@ public class NonKeyedWindowOperator<IN, OUT, W extends Window> ...@@ -284,6 +284,10 @@ public class NonKeyedWindowOperator<IN, OUT, W extends Window>
if (triggerResult.isFire()) { if (triggerResult.isFire()) {
emitWindow(context); emitWindow(context);
} }
if (triggerResult.isPurge()) {
context.clear();
}
} }
@Override @Override
...@@ -516,6 +520,23 @@ public class NonKeyedWindowOperator<IN, OUT, W extends Window> ...@@ -516,6 +520,23 @@ public class NonKeyedWindowOperator<IN, OUT, W extends Window>
triggers.add(this); triggers.add(this);
} }
@Override
public void deleteProcessingTimeTimer(long time) {
Set<Context> triggers = processingTimeTimers.get(time);
if (triggers != null) {
triggers.remove(this);
}
}
@Override
public void deleteEventTimeTimer(long time) {
Set<Context> triggers = watermarkTimers.get(time);
if (triggers != null) {
triggers.remove(this);
}
}
public Trigger.TriggerResult onElement(StreamRecord<IN> element) throws Exception { public Trigger.TriggerResult onElement(StreamRecord<IN> element) throws Exception {
Trigger.TriggerResult onElementResult = trigger.onElement(element.getValue(), element.getTimestamp(), window, this); Trigger.TriggerResult onElementResult = trigger.onElement(element.getValue(), element.getTimestamp(), window, this);
if (watermarkTimer > 0 && watermarkTimer <= currentWatermark) { if (watermarkTimer > 0 && watermarkTimer <= currentWatermark) {
...@@ -553,6 +574,10 @@ public class NonKeyedWindowOperator<IN, OUT, W extends Window> ...@@ -553,6 +574,10 @@ public class NonKeyedWindowOperator<IN, OUT, W extends Window>
return Trigger.TriggerResult.CONTINUE; return Trigger.TriggerResult.CONTINUE;
} }
} }
public void clear() throws Exception {
trigger.clear(window, this);
}
} }
/** /**
......
/** /**
* Licensed to the Apache Software Foundation (ASF) under one * Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file * or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information * distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file * regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the * to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance * "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at * with the License. You may obtain a copy of the License at
* *
* http://www.apache.org/licenses/LICENSE-2.0 * http://www.apache.org/licenses/LICENSE-2.0
* *
* Unless required by applicable law or agreed to in writing, software * Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, * distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.flink.streaming.runtime.operators.windowing; package org.apache.flink.streaming.runtime.operators.windowing;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
...@@ -85,8 +85,8 @@ import static java.util.Objects.requireNonNull; ...@@ -85,8 +85,8 @@ import static java.util.Objects.requireNonNull;
* @param <W> The type of {@code Window} that the {@code WindowAssigner} assigns. * @param <W> The type of {@code Window} that the {@code WindowAssigner} assigns.
*/ */
public class WindowOperator<K, IN, ACC, OUT, W extends Window> public class WindowOperator<K, IN, ACC, OUT, W extends Window>
extends AbstractUdfStreamOperator<OUT, WindowFunction<ACC, OUT, K, W>> extends AbstractUdfStreamOperator<OUT, WindowFunction<ACC, OUT, K, W>>
implements OneInputStreamOperator<IN, OUT>, Triggerable, InputTypeConfigurable { implements OneInputStreamOperator<IN, OUT>, Triggerable, InputTypeConfigurable {
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
...@@ -164,12 +164,12 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window> ...@@ -164,12 +164,12 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
* Creates a new {@code WindowOperator} based on the given policies and user functions. * Creates a new {@code WindowOperator} based on the given policies and user functions.
*/ */
public WindowOperator(WindowAssigner<? super IN, W> windowAssigner, public WindowOperator(WindowAssigner<? super IN, W> windowAssigner,
TypeSerializer<W> windowSerializer, TypeSerializer<W> windowSerializer,
KeySelector<IN, K> keySelector, KeySelector<IN, K> keySelector,
TypeSerializer<K> keySerializer, TypeSerializer<K> keySerializer,
StateDescriptor<? extends MergingState<IN, ACC>> windowStateDescriptor, StateDescriptor<? extends MergingState<IN, ACC>> windowStateDescriptor,
WindowFunction<ACC, OUT, K, W> windowFunction, WindowFunction<ACC, OUT, K, W> windowFunction,
Trigger<? super IN, ? super W> trigger) { Trigger<? super IN, ? super W> trigger) {
super(windowFunction); super(windowFunction);
...@@ -258,8 +258,6 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window> ...@@ -258,8 +258,6 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
if (triggerResult.isFire()) { if (triggerResult.isFire()) {
timestampedCollector.setTimestamp(window.maxTimestamp()); timestampedCollector.setTimestamp(window.maxTimestamp());
setKeyContext(key);
MergingState<IN, ACC> windowState = getPartitionedState(window, windowSerializer, MergingState<IN, ACC> windowState = getPartitionedState(window, windowSerializer,
windowStateDescriptor); windowStateDescriptor);
...@@ -269,12 +267,13 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window> ...@@ -269,12 +267,13 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
if (triggerResult.isPurge()) { if (triggerResult.isPurge()) {
windowState.clear(); windowState.clear();
context.clear();
} }
} else if (triggerResult.isPurge()) { } else if (triggerResult.isPurge()) {
setKeyContext(key);
MergingState<IN, ACC> windowState = getPartitionedState(window, windowSerializer, MergingState<IN, ACC> windowState = getPartitionedState(window, windowSerializer,
windowStateDescriptor); windowStateDescriptor);
windowState.clear(); windowState.clear();
context.clear();
} }
} }
...@@ -293,7 +292,8 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window> ...@@ -293,7 +292,8 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
context.key = timer.key; context.key = timer.key;
context.window = timer.window; context.window = timer.window;
Trigger.TriggerResult triggerResult = context.onEventTime(mark.getTimestamp()); setKeyContext(timer.key);
Trigger.TriggerResult triggerResult = context.onEventTime(timer.timestamp);
processTriggerResult(triggerResult, context.key, context.window); processTriggerResult(triggerResult, context.key, context.window);
} else { } else {
fire = false; fire = false;
...@@ -319,7 +319,8 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window> ...@@ -319,7 +319,8 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
context.key = timer.key; context.key = timer.key;
context.window = timer.window; context.window = timer.window;
Trigger.TriggerResult triggerResult = context.onProcessingTime(time); setKeyContext(timer.key);
Trigger.TriggerResult triggerResult = context.onProcessingTime(timer.timestamp);
processTriggerResult(triggerResult, context.key, context.window); processTriggerResult(triggerResult, context.key, context.window);
} else { } else {
fire = false; fire = false;
...@@ -410,6 +411,23 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window> ...@@ -410,6 +411,23 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
} }
} }
@Override
public void deleteProcessingTimeTimer(long time) {
Timer<K, W> timer = new Timer<>(time, key, window);
if (processingTimeTimers.remove(timer)) {
processingTimeTimersQueue.remove(timer);
}
}
@Override
public void deleteEventTimeTimer(long time) {
Timer<K, W> timer = new Timer<>(time, key, window);
if (watermarkTimers.remove(timer)) {
watermarkTimersQueue.remove(timer);
}
}
public Trigger.TriggerResult onElement(StreamRecord<IN> element) throws Exception { public Trigger.TriggerResult onElement(StreamRecord<IN> element) throws Exception {
return trigger.onElement(element.getValue(), element.getTimestamp(), window, this); return trigger.onElement(element.getValue(), element.getTimestamp(), window, this);
} }
...@@ -421,6 +439,18 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window> ...@@ -421,6 +439,18 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
public Trigger.TriggerResult onEventTime(long time) throws Exception { public Trigger.TriggerResult onEventTime(long time) throws Exception {
return trigger.onEventTime(time, window, this); return trigger.onEventTime(time, window, this);
} }
public void clear() throws Exception {
trigger.clear(window, this);
}
@Override
public String toString() {
return "Context{" +
"key=" + key +
", window=" + window +
'}';
}
} }
/** /**
...@@ -454,8 +484,8 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window> ...@@ -454,8 +484,8 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
Timer<?, ?> timer = (Timer<?, ?>) o; Timer<?, ?> timer = (Timer<?, ?>) o;
return timestamp == timer.timestamp return timestamp == timer.timestamp
&& key.equals(timer.key) && key.equals(timer.key)
&& window.equals(timer.window); && window.equals(timer.window);
} }
...@@ -470,10 +500,10 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window> ...@@ -470,10 +500,10 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
@Override @Override
public String toString() { public String toString() {
return "Timer{" + return "Timer{" +
"timestamp=" + timestamp + "timestamp=" + timestamp +
", key=" + key + ", key=" + key +
", window=" + window + ", window=" + window +
'}'; '}';
} }
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册