diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java index 0f002d10869ad6e77f49cfc510092e87a44541ea..9ed123ef4a4bd856a05e6026d51bbc537d020a09 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java @@ -127,10 +127,7 @@ public class PipelinedSubpartition extends ResultSubpartition { @Override public boolean add(BufferConsumer bufferConsumer, boolean isPriorityEvent) throws IOException { if (isPriorityEvent) { - if (readView != null && readView.notifyPriorityEvent(bufferConsumer)) { - bufferConsumer.close(); - return true; - } + // TODO: use readView.notifyPriorityEvent for local channels return add(bufferConsumer, false, true); } return add(bufferConsumer, false, false); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionWithReadViewTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionWithReadViewTest.java index b96f58a0959b213d2f454fb992df03e00d8ec797..8ddd531cd0e2d331f0057c68440cf6880f2684f8 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionWithReadViewTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionWithReadViewTest.java @@ -331,7 +331,7 @@ public class PipelinedSubpartitionWithReadViewTest { BufferConsumer barrierBuffer = EventSerializer.toBufferConsumer(new CheckpointBarrier(0, 0, options)); subpartition.add(barrierBuffer, true); assertEquals(2, availablityListener.getNumNotifications()); - assertEquals(1, availablityListener.getNumPriorityEvents()); + assertEquals(0, availablityListener.getNumPriorityEvents()); List inflight = subpartition.requestInflightBufferSnapshot(); assertEquals(Arrays.asList(1, 2, 3), inflight.stream().map(Buffer::getSize).collect(Collectors.toList())); @@ -344,39 +344,6 @@ public class PipelinedSubpartitionWithReadViewTest { assertNoNextBuffer(readView); } - @Test - public void testBarrierConsumedByAvailabilityListener() throws Exception { - availablityListener.consumePriorityEvents(); - - subpartition.add(createFilledFinishedBufferConsumer(1)); - assertEquals(0, availablityListener.getNumNotifications()); - assertEquals(0, availablityListener.getNumPriorityEvents()); - - subpartition.add(createFilledFinishedBufferConsumer(2)); - assertEquals(1, availablityListener.getNumNotifications()); - assertEquals(0, availablityListener.getNumPriorityEvents()); - - subpartition.add(createFilledFinishedBufferConsumer(3)); - assertEquals(1, availablityListener.getNumNotifications()); - assertEquals(0, availablityListener.getNumPriorityEvents()); - - CheckpointOptions options = new CheckpointOptions( - CheckpointType.CHECKPOINT, - new CheckpointStorageLocationReference(new byte[]{0, 1, 2})); - BufferConsumer barrierBuffer = EventSerializer.toBufferConsumer(new CheckpointBarrier(0, 0, options)); - subpartition.add(barrierBuffer, true); - assertEquals(1, availablityListener.getNumNotifications()); - assertEquals(1, availablityListener.getNumPriorityEvents()); - - List inflight = subpartition.requestInflightBufferSnapshot(); - assertEquals(Arrays.asList(), inflight.stream().map(Buffer::getSize).collect(Collectors.toList())); - - assertNextBuffer(readView, 1, true, 1, false, true); - assertNextBuffer(readView, 2, false, 0, false, true); - assertNextBuffer(readView, 3, false, 0, false, true); - assertNoNextBuffer(readView); - } - @Test public void testBacklogConsistentWithNumberOfConsumableBuffers() throws Exception { testBacklogConsistentWithNumberOfConsumableBuffers(false, false);