提交 8ebba929 编写于 作者: A Arvid Heise 提交者: Piotr Nowojski

[FLINK-17315][checkpointing] Disable priority event listener

Priority event listener currently does not snapshot any buffers and causes additional synchronization points. We should re-enable/re-evaluate this concept once a proper threading model has been established on input side.
上级 755b576b
......@@ -127,10 +127,7 @@ public class PipelinedSubpartition extends ResultSubpartition {
@Override
public boolean add(BufferConsumer bufferConsumer, boolean isPriorityEvent) throws IOException {
if (isPriorityEvent) {
if (readView != null && readView.notifyPriorityEvent(bufferConsumer)) {
bufferConsumer.close();
return true;
}
// TODO: use readView.notifyPriorityEvent for local channels
return add(bufferConsumer, false, true);
}
return add(bufferConsumer, false, false);
......
......@@ -331,7 +331,7 @@ public class PipelinedSubpartitionWithReadViewTest {
BufferConsumer barrierBuffer = EventSerializer.toBufferConsumer(new CheckpointBarrier(0, 0, options));
subpartition.add(barrierBuffer, true);
assertEquals(2, availablityListener.getNumNotifications());
assertEquals(1, availablityListener.getNumPriorityEvents());
assertEquals(0, availablityListener.getNumPriorityEvents());
List<Buffer> inflight = subpartition.requestInflightBufferSnapshot();
assertEquals(Arrays.asList(1, 2, 3), inflight.stream().map(Buffer::getSize).collect(Collectors.toList()));
......@@ -344,39 +344,6 @@ public class PipelinedSubpartitionWithReadViewTest {
assertNoNextBuffer(readView);
}
@Test
public void testBarrierConsumedByAvailabilityListener() throws Exception {
availablityListener.consumePriorityEvents();
subpartition.add(createFilledFinishedBufferConsumer(1));
assertEquals(0, availablityListener.getNumNotifications());
assertEquals(0, availablityListener.getNumPriorityEvents());
subpartition.add(createFilledFinishedBufferConsumer(2));
assertEquals(1, availablityListener.getNumNotifications());
assertEquals(0, availablityListener.getNumPriorityEvents());
subpartition.add(createFilledFinishedBufferConsumer(3));
assertEquals(1, availablityListener.getNumNotifications());
assertEquals(0, availablityListener.getNumPriorityEvents());
CheckpointOptions options = new CheckpointOptions(
CheckpointType.CHECKPOINT,
new CheckpointStorageLocationReference(new byte[]{0, 1, 2}));
BufferConsumer barrierBuffer = EventSerializer.toBufferConsumer(new CheckpointBarrier(0, 0, options));
subpartition.add(barrierBuffer, true);
assertEquals(1, availablityListener.getNumNotifications());
assertEquals(1, availablityListener.getNumPriorityEvents());
List<Buffer> inflight = subpartition.requestInflightBufferSnapshot();
assertEquals(Arrays.asList(), inflight.stream().map(Buffer::getSize).collect(Collectors.toList()));
assertNextBuffer(readView, 1, true, 1, false, true);
assertNextBuffer(readView, 2, false, 0, false, true);
assertNextBuffer(readView, 3, false, 0, false, true);
assertNoNextBuffer(readView);
}
@Test
public void testBacklogConsistentWithNumberOfConsumableBuffers() throws Exception {
testBacklogConsistentWithNumberOfConsumableBuffers(false, false);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册