diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java index 0cf47ca1fcf692025257d75aa67f62f1534ba98f..29e8dc2be3ff99b267107702adc69fc627511a53 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java @@ -527,6 +527,7 @@ public class SharedBuffer implements Serializable { private final Set> edges; private final SharedBufferPage page; private int referenceCounter; + private transient int entryId; SharedBufferEntry( final ValueTimeWrapper valueTime, @@ -547,6 +548,8 @@ public class SharedBuffer implements Serializable { referenceCounter = 0; + entryId = -1; + this.page = page; } @@ -886,7 +889,6 @@ public class SharedBuffer implements Serializable { @Override public void serialize(SharedBuffer record, DataOutputView target) throws IOException { Map> pages = record.pages; - Map, Integer> entryIDs = new HashMap<>(); int totalEdges = 0; int entryCounter = 0; @@ -908,7 +910,7 @@ public class SharedBuffer implements Serializable { // assign id to the sharedBufferEntry for the future // serialization of the previous relation - entryIDs.put(sharedBuffer, entryCounter++); + sharedBuffer.entryId = entryCounter++; ValueTimeWrapper valueTimeWrapper = sharedBuffer.getValueTime(); @@ -932,15 +934,15 @@ public class SharedBuffer implements Serializable { for (Map.Entry, SharedBufferEntry> sharedBufferEntry: page.entries.entrySet()) { SharedBufferEntry sharedBuffer = sharedBufferEntry.getValue(); - Integer id = entryIDs.get(sharedBuffer); - Preconditions.checkState(id != null, "Could not find id for entry: " + sharedBuffer); + int id = sharedBuffer.entryId; + Preconditions.checkState(id != -1, "Could not find id for entry: " + sharedBuffer); for (SharedBufferEdge edge: sharedBuffer.edges) { // in order to serialize the previous relation we simply serialize the ids // of the source and target SharedBufferEntry if (edge.target != null) { - Integer targetId = entryIDs.get(edge.getTarget()); - Preconditions.checkState(targetId != null, + int targetId = edge.getTarget().entryId; + Preconditions.checkState(targetId != -1, "Could not find id for entry: " + edge.getTarget()); target.writeInt(id);