未验证 提交 c4acbb83 编写于 作者: D Dian Fu 提交者: Dawid Wysakowicz

[FLINK-8227] Optimize the performance of SharedBufferSerializer

This closes #5142
上级 91f00ec9
......@@ -527,6 +527,7 @@ public class SharedBuffer<K extends Serializable, V> implements Serializable {
private final Set<SharedBufferEdge<K, V>> edges;
private final SharedBufferPage<K, V> page;
private int referenceCounter;
private transient int entryId;
SharedBufferEntry(
final ValueTimeWrapper<V> valueTime,
......@@ -547,6 +548,8 @@ public class SharedBuffer<K extends Serializable, V> implements Serializable {
referenceCounter = 0;
entryId = -1;
this.page = page;
}
......@@ -886,7 +889,6 @@ public class SharedBuffer<K extends Serializable, V> implements Serializable {
@Override
public void serialize(SharedBuffer<K, V> record, DataOutputView target) throws IOException {
Map<K, SharedBufferPage<K, V>> pages = record.pages;
Map<SharedBufferEntry<K, V>, Integer> entryIDs = new HashMap<>();
int totalEdges = 0;
int entryCounter = 0;
......@@ -908,7 +910,7 @@ public class SharedBuffer<K extends Serializable, V> implements Serializable {
// assign id to the sharedBufferEntry for the future
// serialization of the previous relation
entryIDs.put(sharedBuffer, entryCounter++);
sharedBuffer.entryId = entryCounter++;
ValueTimeWrapper<V> valueTimeWrapper = sharedBuffer.getValueTime();
......@@ -932,15 +934,15 @@ public class SharedBuffer<K extends Serializable, V> implements Serializable {
for (Map.Entry<ValueTimeWrapper<V>, SharedBufferEntry<K, V>> sharedBufferEntry: page.entries.entrySet()) {
SharedBufferEntry<K, V> sharedBuffer = sharedBufferEntry.getValue();
Integer id = entryIDs.get(sharedBuffer);
Preconditions.checkState(id != null, "Could not find id for entry: " + sharedBuffer);
int id = sharedBuffer.entryId;
Preconditions.checkState(id != -1, "Could not find id for entry: " + sharedBuffer);
for (SharedBufferEdge<K, V> edge: sharedBuffer.edges) {
// in order to serialize the previous relation we simply serialize the ids
// of the source and target SharedBufferEntry
if (edge.target != null) {
Integer targetId = entryIDs.get(edge.getTarget());
Preconditions.checkState(targetId != null,
int targetId = edge.getTarget().entryId;
Preconditions.checkState(targetId != -1,
"Could not find id for entry: " + edge.getTarget());
target.writeInt(id);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册