提交 783aa4a8 编写于 作者: J Jonas Traub (powibol) 提交者: mbalassi

[streaming] Changed GroupedWindowInvokable to always delete groups in case...

[streaming] Changed GroupedWindowInvokable to always delete groups in case they have an empty element buffer
上级 4e046a9b
......@@ -18,6 +18,7 @@
package org.apache.flink.streaming.api.invokable.operator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.Map;
import java.util.NoSuchElementException;
......@@ -252,6 +253,7 @@ public class GroupedWindowInvokable<IN, OUT> extends StreamInvokable<IN, OUT> {
// process in groups
for (WindowInvokable<IN, OUT> group : windowingGroups.values()) {
group.processFakeElement(in, trigger);
checkForEmptyGroupBuffer(group);
}
}
}
......@@ -267,6 +269,7 @@ public class GroupedWindowInvokable<IN, OUT> extends StreamInvokable<IN, OUT> {
// only add the element to its group
groupInvokable.processRealElement(reuse.getObject());
checkForEmptyGroupBuffer(groupInvokable);
// If central eviction is used, handle it here
if (!centralEvictionPolicies.isEmpty()) {
......@@ -286,6 +289,9 @@ public class GroupedWindowInvokable<IN, OUT> extends StreamInvokable<IN, OUT> {
// policies
group.externalTriggerFakeElement(reuse.getObject(), currentTriggerPolicies);
}
//remove group in case it has an empty buffer
//checkForEmptyGroupBuffer(group);
}
// If central eviction is used, handle it here
......@@ -450,8 +456,13 @@ public class GroupedWindowInvokable<IN, OUT> extends StreamInvokable<IN, OUT> {
* buffer.
*/
private void evictElements(int numToEvict) {
HashSet<WindowInvokable<IN, OUT>> usedGroups=new HashSet<WindowInvokable<IN,OUT>>();
for (; numToEvict > 0; numToEvict--) {
deleteOrderForCentralEviction.getFirst().evictFirst();
WindowInvokable<IN, OUT> currentGroup=deleteOrderForCentralEviction.getFirst();
//Do the eviction
currentGroup.evictFirst();
//Remember groups which possibly have an empty buffer after the eviction
usedGroups.add(currentGroup);
try {
deleteOrderForCentralEviction.removeFirst();
} catch (NoSuchElementException e) {
......@@ -460,6 +471,25 @@ public class GroupedWindowInvokable<IN, OUT> extends StreamInvokable<IN, OUT> {
}
}
//Remove groups with empty buffer
for (WindowInvokable<IN, OUT> group:usedGroups){
checkForEmptyGroupBuffer(group);
}
}
/**
* Checks if the element buffer of a given windowing group is empty. If so,
* the group will be deleted.
*
* @param group
* The windowing group to be checked and and removed in case its
* buffer is empty.
*/
private void checkForEmptyGroupBuffer(WindowInvokable<IN, OUT> group) {
if (group.isBufferEmpty()) {
windowingGroups.remove(group);
}
}
/**
......@@ -486,6 +516,7 @@ public class GroupedWindowInvokable<IN, OUT> extends StreamInvokable<IN, OUT> {
// handle element in groups
for (WindowInvokable<IN, OUT> group : windowingGroups.values()) {
group.processFakeElement(datapoint, policy);
checkForEmptyGroupBuffer(group);
}
}
......
......@@ -356,6 +356,17 @@ public abstract class WindowInvokable<IN, OUT> extends StreamInvokable<IN, OUT>
}
}
/**
* This method returns whether the element buffer is empty or not. It is
* used to figure out if a group can be deleted or not when
* {@link GroupedWindowInvokable} is used.
*
* @return true in case the buffer is empty otherwise false.
*/
protected boolean isBufferEmpty(){
return buffer.isEmpty();
}
/**
* This method does the final reduce at the end of the stream and emits the
* result.
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册