提交 1e20d614 编写于 作者: A Arvid Heise 提交者: Piotr Nowojski

[FLINK-17315][checkpointing] Only adding buffers to inflight data of PipelinedSubpartition.

上级 0d08d230
......@@ -174,7 +174,9 @@ public class PipelinedSubpartition extends ResultSubpartition {
// Meanwhile prepare the collection of in-flight buffers which would be fetched in the next step later.
for (BufferConsumer buffer : buffers) {
try (BufferConsumer bc = buffer.copy()) {
inflightBufferSnapshot.add(bc.build());
if (bc.isBuffer()) {
inflightBufferSnapshot.add(bc.build());
}
}
}
......
......@@ -24,6 +24,7 @@ import org.apache.flink.runtime.checkpoint.CheckpointType;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.io.disk.NoOpFileChannelManager;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.api.EndOfSuperstepEvent;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
......@@ -319,7 +320,12 @@ public class PipelinedSubpartitionWithReadViewTest {
assertEquals(1, availablityListener.getNumNotifications());
assertEquals(0, availablityListener.getNumPriorityEvents());
subpartition.add(createFilledFinishedBufferConsumer(3));
BufferConsumer eventBuffer = EventSerializer.toBufferConsumer(EndOfSuperstepEvent.INSTANCE);
subpartition.add(eventBuffer);
assertEquals(1, availablityListener.getNumNotifications());
assertEquals(0, availablityListener.getNumPriorityEvents());
subpartition.add(createFilledFinishedBufferConsumer(4));
assertEquals(1, availablityListener.getNumNotifications());
assertEquals(0, availablityListener.getNumPriorityEvents());
......@@ -334,13 +340,14 @@ public class PipelinedSubpartitionWithReadViewTest {
assertEquals(0, availablityListener.getNumPriorityEvents());
List<Buffer> inflight = subpartition.requestInflightBufferSnapshot();
assertEquals(Arrays.asList(1, 2, 3), inflight.stream().map(Buffer::getSize).collect(Collectors.toList()));
assertEquals(Arrays.asList(1, 2, 4), inflight.stream().map(Buffer::getSize).collect(Collectors.toList()));
inflight.forEach(Buffer::recycleBuffer);
assertNextEvent(readView, barrierBuffer.getWrittenBytes(), CheckpointBarrier.class, true, 2, false, true);
assertNextBuffer(readView, 1, true, 1, false, true);
assertNextBuffer(readView, 2, false, 0, false, true);
assertNextBuffer(readView, 3, false, 0, false, true);
assertNextBuffer(readView, 2, true, 0, true, true);
assertNextEvent(readView, eventBuffer.getWrittenBytes(), EndOfSuperstepEvent.class, false, 0, false, true);
assertNextBuffer(readView, 4, false, 0, false, true);
assertNoNextBuffer(readView);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册