未验证 提交 9e3bac39 编写于 作者: D Dian Fu 提交者: Dawid Wysakowicz

[FLINK-8226] [cep] Dangling reference generated after NFA clean up timed out SharedBufferEntry

This closes #5141
上级 3fdee00e
...@@ -191,14 +191,12 @@ public class SharedBuffer<K extends Serializable, V> implements Serializable { ...@@ -191,14 +191,12 @@ public class SharedBuffer<K extends Serializable, V> implements Serializable {
*/ */
public boolean prune(long pruningTimestamp) { public boolean prune(long pruningTimestamp) {
Iterator<Map.Entry<K, SharedBufferPage<K, V>>> iter = pages.entrySet().iterator(); Iterator<Map.Entry<K, SharedBufferPage<K, V>>> iter = pages.entrySet().iterator();
boolean pruned = false; List<SharedBufferEntry<K, V>> prunedEntries = new ArrayList<>();
while (iter.hasNext()) { while (iter.hasNext()) {
SharedBufferPage<K, V> page = iter.next().getValue(); SharedBufferPage<K, V> page = iter.next().getValue();
if (page.prune(pruningTimestamp)) { page.prune(pruningTimestamp, prunedEntries);
pruned = true;
}
if (page.isEmpty()) { if (page.isEmpty()) {
// delete page if it is empty // delete page if it is empty
...@@ -206,7 +204,14 @@ public class SharedBuffer<K extends Serializable, V> implements Serializable { ...@@ -206,7 +204,14 @@ public class SharedBuffer<K extends Serializable, V> implements Serializable {
} }
} }
return pruned; if (!prunedEntries.isEmpty()) {
for (Map.Entry<K, SharedBufferPage<K, V>> entry : pages.entrySet()) {
entry.getValue().removeEdges(prunedEntries);
}
return true;
} else {
return false;
}
} }
/** /**
...@@ -451,25 +456,21 @@ public class SharedBuffer<K extends Serializable, V> implements Serializable { ...@@ -451,25 +456,21 @@ public class SharedBuffer<K extends Serializable, V> implements Serializable {
* Removes all entries from the map whose timestamp is smaller than the pruning timestamp. * Removes all entries from the map whose timestamp is smaller than the pruning timestamp.
* *
* @param pruningTimestamp Timestamp for the pruning * @param pruningTimestamp Timestamp for the pruning
* @return {@code true} if pruning happened
*/ */
public boolean prune(long pruningTimestamp) { public void prune(long pruningTimestamp, List<SharedBufferEntry<K, V>> prunedEntries) {
Iterator<Map.Entry<ValueTimeWrapper<V>, SharedBufferEntry<K, V>>> iterator = entries.entrySet().iterator(); Iterator<Map.Entry<ValueTimeWrapper<V>, SharedBufferEntry<K, V>>> iterator = entries.entrySet().iterator();
boolean continuePruning = true; boolean continuePruning = true;
boolean pruned = false;
while (iterator.hasNext() && continuePruning) { while (iterator.hasNext() && continuePruning) {
SharedBufferEntry<K, V> entry = iterator.next().getValue(); SharedBufferEntry<K, V> entry = iterator.next().getValue();
if (entry.getValueTime().getTimestamp() <= pruningTimestamp) { if (entry.getValueTime().getTimestamp() <= pruningTimestamp) {
prunedEntries.add(entry);
iterator.remove(); iterator.remove();
pruned = true;
} else { } else {
continuePruning = false; continuePruning = false;
} }
} }
return pruned;
} }
public boolean isEmpty() { public boolean isEmpty() {
...@@ -480,6 +481,15 @@ public class SharedBuffer<K extends Serializable, V> implements Serializable { ...@@ -480,6 +481,15 @@ public class SharedBuffer<K extends Serializable, V> implements Serializable {
return entries.remove(valueTime); return entries.remove(valueTime);
} }
/**
* Remove edges with the specified targets for the entries.
*/
private void removeEdges(final List<SharedBufferEntry<K, V>> prunedEntries) {
for (Map.Entry<ValueTimeWrapper<V>, SharedBufferEntry<K, V>> entry : entries.entrySet()) {
entry.getValue().removeEdges(prunedEntries);
}
}
@Override @Override
public String toString() { public String toString() {
StringBuilder builder = new StringBuilder(); StringBuilder builder = new StringBuilder();
...@@ -569,6 +579,22 @@ public class SharedBuffer<K extends Serializable, V> implements Serializable { ...@@ -569,6 +579,22 @@ public class SharedBuffer<K extends Serializable, V> implements Serializable {
edges.add(edge); edges.add(edge);
} }
/**
* Remove edges with the specified targets.
*/
private void removeEdges(final List<SharedBufferEntry<K, V>> prunedEntries) {
Iterator<SharedBufferEdge<K, V>> itor = edges.iterator();
while (itor.hasNext()) {
SharedBufferEdge<K, V> edge = itor.next();
for (SharedBufferEntry<K, V> prunedEntry : prunedEntries) {
if (prunedEntry == edge.getTarget()) {
itor.remove();
break;
}
}
}
}
public boolean remove() { public boolean remove() {
if (page != null) { if (page != null) {
page.remove(valueTime); page.remove(valueTime);
......
...@@ -26,6 +26,7 @@ import org.apache.flink.cep.pattern.conditions.IterativeCondition; ...@@ -26,6 +26,7 @@ import org.apache.flink.cep.pattern.conditions.IterativeCondition;
import org.apache.flink.cep.pattern.conditions.SimpleCondition; import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.core.memory.DataInputViewStreamWrapper; import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper; import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.TestLogger; import org.apache.flink.util.TestLogger;
...@@ -176,6 +177,26 @@ public class NFATest extends TestLogger { ...@@ -176,6 +177,26 @@ public class NFATest extends TestLogger {
assertEquals(expectedPatterns, actualPatterns); assertEquals(expectedPatterns, actualPatterns);
} }
@Test
public void testTimeoutWindowPruning2() throws IOException {
NFA<Event> nfa = createLoopingNFA(2);
List<StreamRecord<Event>> streamEvents = new ArrayList<>();
streamEvents.add(new StreamRecord<>(new Event(1, "loop", 1.0), 101L));
streamEvents.add(new StreamRecord<>(new Event(2, "loop", 2.0), 102L));
streamEvents.add(new StreamRecord<>(new Event(3, "loop", 3.0), 103L));
streamEvents.add(new StreamRecord<>(new Event(4, "loop", 4.0), 104L));
streamEvents.add(new StreamRecord<>(new Event(5, "loop", 5.0), 105L));
runNFA(nfa, streamEvents);
NFA.NFASerializer<Event> serializer = new NFA.NFASerializer<>(Event.createTypeSerializer());
//serialize
ByteArrayOutputStream baos = new ByteArrayOutputStream();
serializer.serialize(nfa, new DataOutputViewStreamWrapper(baos));
baos.close();
}
public <T> Collection<Map<String, List<T>>> runNFA(NFA<T> nfa, List<StreamRecord<T>> inputs) { public <T> Collection<Map<String, List<T>>> runNFA(NFA<T> nfa, List<StreamRecord<T>> inputs) {
Set<Map<String, List<T>>> actualPatterns = new HashSet<>(); Set<Map<String, List<T>>> actualPatterns = new HashSet<>();
...@@ -358,4 +379,17 @@ public class NFATest extends TestLogger { ...@@ -358,4 +379,17 @@ public class NFATest extends TestLogger {
return nfa; return nfa;
} }
private NFA<Event> createLoopingNFA(long windowLength) {
Pattern<Event, ?> pattern = Pattern.<Event>begin("loop").where(new SimpleCondition<Event>() {
private static final long serialVersionUID = 5726188262756267490L;
@Override
public boolean filter(Event value) throws Exception {
return value.getName().equals("loop");
}
}).timesOrMore(3).within(Time.milliseconds(windowLength));
return NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
}
} }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册