diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartition.java index 7a74872d945b8a6e1b2b5c8ea56e342dfb4955dc..785e44cbd926d4f53e85dc524967bd4c02b15d1a 100755 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartition.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartition.java @@ -198,9 +198,8 @@ final class BoundedBlockingSubpartition extends ResultSubpartition { availability.notifyDataAvailable(); - final BoundedData.Reader dataReader = data.createReader(); final BoundedBlockingSubpartitionReader reader = new BoundedBlockingSubpartitionReader( - this, dataReader, numDataBuffersWritten); + this, data, numDataBuffersWritten, availability); readers.add(reader); return reader; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionReader.java index f7536b988db17cfd0c21ffaa16d6ccac20da77d1..63e5e226b88fe3830dce09e9af5d650d608b1827 100755 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionReader.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionReader.java @@ -37,6 +37,9 @@ final class BoundedBlockingSubpartitionReader implements ResultSubpartitionView /** The result subpartition that we read. */ private final BoundedBlockingSubpartition parent; + /** The listener that is notified when there are available buffers for this subpartition view. */ + private final BufferAvailabilityListener availabilityListener; + /** The next buffer (look ahead). Null once the data is depleted or reader is disposed. */ @Nullable private Buffer nextBuffer; @@ -57,16 +60,20 @@ final class BoundedBlockingSubpartitionReader implements ResultSubpartitionView */ BoundedBlockingSubpartitionReader( BoundedBlockingSubpartition parent, - BoundedData.Reader dataReader, - int numDataBuffers) throws IOException { - - checkArgument(numDataBuffers >= 0); + BoundedData data, + int numDataBuffers, + BufferAvailabilityListener availabilityListener) throws IOException { this.parent = checkNotNull(parent); - this.dataReader = checkNotNull(dataReader); - this.dataBufferBacklog = numDataBuffers; + checkNotNull(data); + this.dataReader = data.createReader(this); this.nextBuffer = dataReader.nextBuffer(); + + checkArgument(numDataBuffers >= 0); + this.dataBufferBacklog = numDataBuffers; + + this.availabilityListener = checkNotNull(availabilityListener); } @Nullable @@ -89,9 +96,31 @@ final class BoundedBlockingSubpartitionReader implements ResultSubpartitionView return BufferAndBacklog.fromBufferAndLookahead(current, nextBuffer, dataBufferBacklog); } + /** + * This method is actually only meaningful for the {@link BoundedBlockingSubpartitionType#FILE}. + * + *

For the other types the {@link #nextBuffer} can not be ever set to null, so it is no need + * to notify available via this method. But the implementation is also compatible with other + * types even though called by mistake. + */ @Override public void notifyDataAvailable() { - throw new IllegalStateException("No data should become available on a blocking partition during consumption."); + if (nextBuffer == null) { + assert dataReader != null; + + try { + nextBuffer = dataReader.nextBuffer(); + } catch (IOException ex) { + // this exception wrapper is only for avoiding throwing IOException explicitly + // in relevant interface methods + throw new IllegalStateException("No data available while reading", ex); + } + + // next buffer is null indicates the end of partition + if (nextBuffer != null) { + availabilityListener.notifyDataAvailable(); + } + } } @Override diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedData.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedData.java index 4d58cf86cb3864317e7c8ead8d35e6391a0e9075..a8681cc3a150870c80b65a29fe349f560bdeaaa8 100755 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedData.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedData.java @@ -34,7 +34,7 @@ import java.io.IOException; * through the {@link #writeBuffer(Buffer)} method. * The write phase is ended by calling {@link #finishWrite()}. * After the write phase is finished, the data can be read multiple times through readers created - * via {@link #createReader()}. + * via {@link #createReader(ResultSubpartitionView)}. * Finally, the BoundedData is dropped / deleted by calling {@link #close()}. * *

Thread Safety and Concurrency

@@ -60,7 +60,15 @@ interface BoundedData extends Closeable { * Gets a reader for the bounded data. Multiple readers may be created. * This call only succeeds once the write phase was finished via {@link #finishWrite()}. */ - BoundedData.Reader createReader() throws IOException; + BoundedData.Reader createReader(ResultSubpartitionView subpartitionView) throws IOException; + + /** + * Gets a reader for the bounded data. Multiple readers may be created. + * This call only succeeds once the write phase was finished via {@link #finishWrite()}. + */ + default BoundedData.Reader createReader() throws IOException { + return createReader(new NoOpResultSubpartitionView()); + } /** * Gets the number of bytes of all written data (including the metadata in the buffer headers). diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/FileChannelBoundedData.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/FileChannelBoundedData.java index 50dca6083edebfa2d8ba2219b6457c5b36b3d05f..690ad7d4482c8506657219e6a2987ba8b2e9fb29 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/FileChannelBoundedData.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/FileChannelBoundedData.java @@ -75,11 +75,11 @@ final class FileChannelBoundedData implements BoundedData { } @Override - public Reader createReader() throws IOException { + public Reader createReader(ResultSubpartitionView subpartitionView) throws IOException { checkState(!fileChannel.isOpen()); final FileChannel fc = FileChannel.open(filePath, StandardOpenOption.READ); - return new FileBufferReader(fc, memorySegmentSize); + return new FileBufferReader(fc, memorySegmentSize, subpartitionView); } @Override @@ -117,7 +117,12 @@ final class FileChannelBoundedData implements BoundedData { private final ArrayDeque buffers; - FileBufferReader(FileChannel fileChannel, int bufferSize) { + private final ResultSubpartitionView subpartitionView; + + /** The tag indicates whether we have read the end of this file. */ + private boolean isFinished; + + FileBufferReader(FileChannel fileChannel, int bufferSize, ResultSubpartitionView subpartitionView) { this.fileChannel = checkNotNull(fileChannel); this.headerBuffer = BufferReaderWriterUtil.allocatedHeaderBuffer(); this.buffers = new ArrayDeque<>(NUM_BUFFERS); @@ -125,26 +130,25 @@ final class FileChannelBoundedData implements BoundedData { for (int i = 0; i < NUM_BUFFERS; i++) { buffers.addLast(MemorySegmentFactory.allocateUnpooledOffHeapMemory(bufferSize, null)); } + + this.subpartitionView = checkNotNull(subpartitionView); } @Nullable @Override public Buffer nextBuffer() throws IOException { final MemorySegment memory = buffers.pollFirst(); + if (memory == null) { + return null; + } - if (memory != null) { - final Buffer next = BufferReaderWriterUtil.readFromByteChannel(fileChannel, headerBuffer, memory, this); - if (next != null) { - return next; - } - else { - recycle(memory); - return null; - } + final Buffer next = BufferReaderWriterUtil.readFromByteChannel(fileChannel, headerBuffer, memory, this); + if (next == null) { + isFinished = true; + recycle(memory); } - throw new IOException("Bug in BoundedBlockingSubpartition with FILE data: " + - "Requesting new buffer before previous buffer returned."); + return next; } @Override @@ -155,6 +159,10 @@ final class FileChannelBoundedData implements BoundedData { @Override public void recycle(MemorySegment memorySegment) { buffers.addLast(memorySegment); + + if (!isFinished) { + subpartitionView.notifyDataAvailable(); + } } } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/FileChannelMemoryMappedBoundedData.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/FileChannelMemoryMappedBoundedData.java index 4a71fcdd1ac3ca7ddbc7aef990db80c4c64b08be..f22efd0fdd7464e62c22f8d52110c4d724bd8772 100755 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/FileChannelMemoryMappedBoundedData.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/FileChannelMemoryMappedBoundedData.java @@ -123,7 +123,7 @@ final class FileChannelMemoryMappedBoundedData implements BoundedData { } @Override - public BoundedData.Reader createReader() { + public BoundedData.Reader createReader(ResultSubpartitionView ignored) { checkState(!fileChannel.isOpen()); final List buffers = memoryMappedRegions.stream() diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/MemoryMappedBoundedData.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/MemoryMappedBoundedData.java index 502c64c47c1b11234844bf2432cfa80dd20d72ea..e8718f526139e5bcadf64daa1f6d9241feaf65f4 100755 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/MemoryMappedBoundedData.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/MemoryMappedBoundedData.java @@ -113,7 +113,7 @@ final class MemoryMappedBoundedData implements BoundedData { } @Override - public BufferSlicer createReader() { + public BufferSlicer createReader(ResultSubpartitionView ignored) { assert currentBuffer == null; final List buffers = fullBuffers.stream() diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionAvailabilityTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionAvailabilityTest.java new file mode 100644 index 0000000000000000000000000000000000000000..915cf43faed9246dda04ab336bbeef4ba76ba4a0 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionAvailabilityTest.java @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition; + +import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils; +import org.apache.flink.runtime.io.network.partition.ResultSubpartition.BufferAndBacklog; + +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +/** + * Tests for the availability handling of the BoundedBlockingSubpartitions with not constant + * availability. + */ +public class BoundedBlockingSubpartitionAvailabilityTest { + + @ClassRule + public static final TemporaryFolder TMP_FOLDER = new TemporaryFolder(); + + private static final int BUFFER_SIZE = 32 * 1024; + + @Test + public void testInitiallyAvailable() throws Exception { + final ResultSubpartition subpartition = createPartitionWithData(10); + final CountingAvailabilityListener listener = new CountingAvailabilityListener(); + + // test + final ResultSubpartitionView subpartitionView = subpartition.createReadView(listener); + + // assert + assertEquals(1, listener.numNotifications); + + // cleanup + subpartitionView.releaseAllResources(); + subpartition.release(); + } + + @Test + public void testUnavailableWhenBuffersExhausted() throws Exception { + // setup + final BoundedBlockingSubpartition subpartition = createPartitionWithData(100_000); + final CountingAvailabilityListener listener = new CountingAvailabilityListener(); + final ResultSubpartitionView reader = subpartition.createReadView(listener); + + // test + final List data = drainAvailableData(reader); + + // assert + assertFalse(reader.isAvailable()); + assertFalse(data.get(data.size() - 1).isMoreAvailable()); + + // cleanup + reader.releaseAllResources(); + subpartition.release(); + } + + @Test + public void testAvailabilityNotificationWhenBuffersReturn() throws Exception { + // setup + final ResultSubpartition subpartition = createPartitionWithData(100_000); + final CountingAvailabilityListener listener = new CountingAvailabilityListener(); + final ResultSubpartitionView reader = subpartition.createReadView(listener); + + // test + final List data = drainAvailableData(reader); + data.get(0).buffer().recycleBuffer(); + data.get(1).buffer().recycleBuffer(); + + // assert + assertTrue(reader.isAvailable()); + assertEquals(2, listener.numNotifications); // one initial, one for new availability + + // cleanup + reader.releaseAllResources(); + subpartition.release(); + } + + @Test + public void testNotAvailableWhenEmpty() throws Exception { + // setup + final ResultSubpartition subpartition = createPartitionWithData(100_000); + final ResultSubpartitionView reader = subpartition.createReadView(new NoOpBufferAvailablityListener()); + + // test + drainAllData(reader); + + // assert + assertFalse(reader.isAvailable()); + + // cleanup + reader.releaseAllResources(); + subpartition.release(); + } + + // ------------------------------------------------------------------------ + + private static BoundedBlockingSubpartition createPartitionWithData(int numberOfBuffers) throws IOException { + ResultPartition parent = PartitionTestUtils.createPartition(); + + BoundedBlockingSubpartition partition = BoundedBlockingSubpartition.createWithFileChannel( + 0, parent, new File(TMP_FOLDER.newFolder(), "data"), BUFFER_SIZE); + + writeBuffers(partition, numberOfBuffers); + partition.finish(); + + return partition; + } + + private static void writeBuffers(ResultSubpartition partition, int numberOfBuffers) throws IOException { + for (int i = 0; i < numberOfBuffers; i++) { + partition.add(BufferBuilderTestUtils.createFilledBufferConsumer(BUFFER_SIZE, BUFFER_SIZE)); + } + } + + private static List drainAvailableData(ResultSubpartitionView reader) throws Exception { + final ArrayList list = new ArrayList<>(); + + BufferAndBacklog bab; + while ((bab = reader.getNextBuffer()) != null) { + list.add(bab); + } + + return list; + } + + private static void drainAllData(ResultSubpartitionView reader) throws Exception { + BufferAndBacklog bab; + while ((bab = reader.getNextBuffer()) != null) { + bab.buffer().recycleBuffer(); + } + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionTest.java index 9bd0c4b447172deab63213a20a2a3afc9f8cb4aa..ce4083f5cdb873d753e62d677e7fe8a4e40890a5 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionTest.java @@ -35,6 +35,7 @@ import java.io.File; import java.io.IOException; import static org.apache.flink.runtime.io.network.partition.PartitionTestUtils.createPartition; +import static org.apache.flink.util.Preconditions.checkNotNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -80,10 +81,11 @@ public class BoundedBlockingSubpartitionTest extends SubpartitionTestBase { } @Test - public void testClosingClosesBoundedData() throws Exception { + public void testCloseBoundedData() throws Exception { final TestingBoundedDataReader reader = new TestingBoundedDataReader(); + final TestingBoundedData data = new TestingBoundedData(reader); final BoundedBlockingSubpartitionReader bbspr = new BoundedBlockingSubpartitionReader( - (BoundedBlockingSubpartition) createSubpartition(), reader, 10); + (BoundedBlockingSubpartition) createSubpartition(), data, 10, new NoOpBufferAvailablityListener()); bbspr.releaseAllResources(); @@ -124,7 +126,7 @@ public class BoundedBlockingSubpartitionTest extends SubpartitionTestBase { } @Override - public Reader createReader() throws IOException { + public Reader createReader(ResultSubpartitionView subpartitionView) throws IOException { throw new UnsupportedOperationException(); } @@ -137,6 +139,36 @@ public class BoundedBlockingSubpartitionTest extends SubpartitionTestBase { public void close() {} } + private static class TestingBoundedData implements BoundedData { + + private BoundedData.Reader reader; + + private TestingBoundedData(BoundedData.Reader reader) { + this.reader = checkNotNull(reader); + } + + @Override + public void writeBuffer(Buffer buffer) throws IOException { + } + + @Override + public void finishWrite() throws IOException { + } + + @Override + public Reader createReader(ResultSubpartitionView ignored) throws IOException { + return reader; + } + + @Override + public long getSize() { + throw new UnsupportedOperationException(); + } + + @Override + public void close() {} + } + private static class TestingBoundedDataReader implements BoundedData.Reader { boolean closed; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BoundedDataTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BoundedDataTestBase.java index c71b9dfdebb630a8862d83bbb108920f30b650a7..365e93e913e25b731f870518f5d77fb93a4fb1da 100755 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BoundedDataTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BoundedDataTestBase.java @@ -59,7 +59,7 @@ public abstract class BoundedDataTestBase { protected abstract BoundedData createBoundedDataWithRegion(Path tempFilePath, int regionSize) throws IOException; - private BoundedData createBoundedData() throws IOException { + protected BoundedData createBoundedData() throws IOException { return createBoundedData(createTempPath()); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/CountingAvailabilityListener.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/CountingAvailabilityListener.java new file mode 100644 index 0000000000000000000000000000000000000000..4e27ee0bcc7e6dd6e8d0391f31ec36c27f8ff3c8 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/CountingAvailabilityListener.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition; + +/** + * A simple BufferAvailabilityListener that counts the number of notifications. + */ +final class CountingAvailabilityListener implements BufferAvailabilityListener { + + int numNotifications; + + @Override + public void notifyDataAvailable() { + numNotifications++; + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/FileChannelBoundedDataTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/FileChannelBoundedDataTest.java index cc499f4fae19970f912ad4381f9e922026ad06f1..1ca2bc8eeb05679fa9ec5a6412035391f046989a 100755 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/FileChannelBoundedDataTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/FileChannelBoundedDataTest.java @@ -18,14 +18,45 @@ package org.apache.flink.runtime.io.network.partition; +import org.apache.flink.runtime.io.disk.FileChannelManager; +import org.apache.flink.runtime.io.disk.FileChannelManagerImpl; +import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.partition.ResultSubpartition.BufferAndBacklog; +import org.apache.flink.runtime.util.EnvironmentInformation; + +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + import java.io.IOException; import java.nio.file.Path; +import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.buildSomeBuffer; +import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createFilledBufferConsumer; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + /** * Tests that read the BoundedBlockingSubpartition with multiple threads in parallel. */ public class FileChannelBoundedDataTest extends BoundedDataTestBase { + private static final String tempDir = EnvironmentInformation.getTemporaryFileDirectory(); + + private static FileChannelManager fileChannelManager; + + @BeforeClass + public static void setUp() { + fileChannelManager = new FileChannelManagerImpl(new String[] {tempDir}, "testing"); + } + + @AfterClass + public static void shutdown() throws Exception { + fileChannelManager.close(); + } + @Override protected boolean isRegionBased() { return false; @@ -40,4 +71,145 @@ public class FileChannelBoundedDataTest extends BoundedDataTestBase { protected BoundedData createBoundedDataWithRegion(Path tempFilePath, int regionSize) throws IOException { throw new UnsupportedOperationException(); } + + @Test + public void testReadNextBuffer() throws Exception { + final int numberOfBuffers = 3; + try (final BoundedData data = createBoundedData()) { + writeBuffers(data, numberOfBuffers); + + final BoundedData.Reader reader = data.createReader(); + final Buffer buffer1 = reader.nextBuffer(); + final Buffer buffer2 = reader.nextBuffer(); + + assertNotNull(buffer1); + assertNotNull(buffer2); + // there are only two available memory segments for reading data + assertNull(reader.nextBuffer()); + + // cleanup + buffer1.recycleBuffer(); + buffer2.recycleBuffer(); + } + } + + @Test + public void testRecycleBufferForNotifyingSubpartitionView() throws Exception { + final int numberOfBuffers = 2; + try (final BoundedData data = createBoundedData()) { + writeBuffers(data, numberOfBuffers); + + final VerifyNotificationResultSubpartitionView subpartitionView = new VerifyNotificationResultSubpartitionView(); + final BoundedData.Reader reader = data.createReader(subpartitionView); + final Buffer buffer1 = reader.nextBuffer(); + final Buffer buffer2 = reader.nextBuffer(); + assertNotNull(buffer1); + assertNotNull(buffer2); + + assertFalse(subpartitionView.isAvailable); + buffer1.recycleBuffer(); + // the view is notified while recycling buffer if reader has not tagged finished + assertTrue(subpartitionView.isAvailable); + + subpartitionView.resetAvailable(); + assertFalse(subpartitionView.isAvailable); + + // the next buffer is null to make reader tag finished + assertNull(reader.nextBuffer()); + + buffer2.recycleBuffer(); + // the view is not notified while recycling buffer if reader already finished + assertFalse(subpartitionView.isAvailable); + } + } + + @Test + public void testRecycleBufferForNotifyingBufferAvailabilityListener() throws Exception { + final ResultSubpartition subpartition = createFileBoundedBlockingSubpartition(); + final int numberOfBuffers = 2; + writeBuffers(subpartition, numberOfBuffers); + + final VerifyNotificationBufferAvailabilityListener listener = new VerifyNotificationBufferAvailabilityListener(); + final ResultSubpartitionView subpartitionView = subpartition.createReadView(listener); + // the notification is triggered while creating view + assertTrue(listener.isAvailable); + + listener.resetAvailable(); + assertFalse(listener.isAvailable); + + final BufferAndBacklog buffer1 = subpartitionView.getNextBuffer(); + final BufferAndBacklog buffer2 = subpartitionView.getNextBuffer(); + assertNotNull(buffer1); + assertNotNull(buffer2); + + // the next buffer is null in view because FileBufferReader has no available buffers for reading ahead + assertFalse(subpartitionView.isAvailable()); + // recycle a buffer to trigger notification of data available + buffer1.buffer().recycleBuffer(); + assertTrue(listener.isAvailable); + + // cleanup + buffer2.buffer().recycleBuffer(); + subpartitionView.releaseAllResources(); + subpartition.release(); + } + + private static ResultSubpartition createFileBoundedBlockingSubpartition() { + final ResultPartition resultPartition = new ResultPartitionBuilder() + .setNetworkBufferSize(BUFFER_SIZE) + .setResultPartitionType(ResultPartitionType.BLOCKING) + .setBoundedBlockingSubpartitionType(BoundedBlockingSubpartitionType.FILE) + .setFileChannelManager(fileChannelManager) + .build(); + return resultPartition.subpartitions[0]; + } + + private static void writeBuffers(BoundedData data, int numberOfBuffers) throws IOException { + for (int i = 0; i < numberOfBuffers; i++) { + data.writeBuffer(buildSomeBuffer(BUFFER_SIZE)); + } + data.finishWrite(); + } + + private static void writeBuffers(ResultSubpartition subpartition, int numberOfBuffers) throws IOException { + for (int i = 0; i < numberOfBuffers; i++) { + subpartition.add(createFilledBufferConsumer(BUFFER_SIZE, BUFFER_SIZE)); + } + subpartition.finish(); + } + + /** + * This subpartition view is used for verifying the {@link ResultSubpartitionView#notifyDataAvailable()} + * was ever called before. + */ + private static class VerifyNotificationResultSubpartitionView extends NoOpResultSubpartitionView { + + private boolean isAvailable; + + @Override + public void notifyDataAvailable() { + isAvailable = true; + } + + private void resetAvailable() { + isAvailable = false; + } + } + + /** + * This listener is used for verifying the notification logic in {@link ResultSubpartitionView#notifyDataAvailable()}. + */ + private static class VerifyNotificationBufferAvailabilityListener implements BufferAvailabilityListener { + + private boolean isAvailable; + + @Override + public void notifyDataAvailable() { + isAvailable = true; + } + + private void resetAvailable() { + isAvailable = false; + } + } } diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/FileBufferReaderITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/FileBufferReaderITCase.java new file mode 100644 index 0000000000000000000000000000000000000000..49d9ef562b786ae35f1175bb724855fa514e38f2 --- /dev/null +++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/FileBufferReaderITCase.java @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.runtime; + +import org.apache.flink.api.common.JobSubmissionResult; +import org.apache.flink.client.program.MiniClusterClient; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.NettyShuffleEnvironmentOptions; +import org.apache.flink.configuration.RestOptions; +import org.apache.flink.runtime.execution.Environment; +import org.apache.flink.runtime.io.network.api.reader.RecordReader; +import org.apache.flink.runtime.io.network.api.writer.RecordWriter; +import org.apache.flink.runtime.io.network.api.writer.RecordWriterBuilder; +import org.apache.flink.runtime.io.network.partition.BoundedBlockingSubpartitionType; +import org.apache.flink.runtime.io.network.partition.ResultPartitionType; +import org.apache.flink.runtime.jobgraph.DistributionPattern; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobgraph.ScheduleMode; +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; +import org.apache.flink.runtime.jobmaster.JobResult; +import org.apache.flink.runtime.minicluster.MiniCluster; +import org.apache.flink.runtime.minicluster.MiniClusterConfiguration; +import org.apache.flink.testutils.serialization.types.ByteArrayType; +import org.apache.flink.util.TestLogger; + +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelPromise; + +import org.junit.BeforeClass; +import org.junit.Test; + +import java.util.concurrent.CompletableFuture; + +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertThat; + +/** + * Tests the bug reported in FLINK-131O0. + * + *

The implementation of {@link org.apache.flink.runtime.io.network.partition.BoundedData.Reader#nextBuffer()} + * for {@link BoundedBlockingSubpartitionType#FILE} assumes that there is always an available buffer, otherwise + * an IOException is thrown and it always assumes that pool of two buffers is enough (before using the 3rd buffer, + * first one was expected to be recycled already). But in the case of pending flush operation (when the socket channel + * is not writable while netty thread is calling {@link ChannelHandlerContext#writeAndFlush(Object, ChannelPromise)}), + * the first fetched buffer from {@link org.apache.flink.runtime.io.network.partition.FileChannelBoundedData} has not + * been recycled while fetching the second buffer to trigger next read ahead, which breaks the above assumption. + */ +public class FileBufferReaderITCase extends TestLogger { + + private static final int parallelism = 8; + + private static final int numRecords = 100_000; + + private static final byte[] dataSource = new byte[1024]; + + @BeforeClass + public static void setup() { + for (int i = 0; i < dataSource.length; i++) { + dataSource[i] = 0; + } + } + + @Test + public void testSequentialReading() throws Exception { + // setup + final Configuration configuration = new Configuration(); + configuration.setString(RestOptions.BIND_PORT, "0"); + configuration.setString(NettyShuffleEnvironmentOptions.NETWORK_BOUNDED_BLOCKING_SUBPARTITION_TYPE, "file"); + + final MiniClusterConfiguration miniClusterConfiguration = new MiniClusterConfiguration.Builder() + .setConfiguration(configuration) + .setNumTaskManagers(parallelism) + .setNumSlotsPerTaskManager(1) + .build(); + + try (final MiniCluster miniCluster = new MiniCluster(miniClusterConfiguration)) { + miniCluster.start(); + + final MiniClusterClient client = new MiniClusterClient(configuration, miniCluster); + final JobGraph jobGraph = createJobGraph(); + final CompletableFuture submitFuture = client.submitJob(jobGraph); + // wait for the submission to succeed + final JobSubmissionResult result = submitFuture.get(); + + final CompletableFuture resultFuture = client.requestJobResult(result.getJobID()); + final JobResult jobResult = resultFuture.get(); + + assertThat(jobResult.getSerializedThrowable().isPresent(), is(false)); + } + } + + private static JobGraph createJobGraph() { + final SlotSharingGroup group1 = new SlotSharingGroup(); + final SlotSharingGroup group2 = new SlotSharingGroup(); + + final JobVertex source = new JobVertex("source"); + source.setInvokableClass(TestSourceInvokable.class); + source.setParallelism(parallelism); + source.setSlotSharingGroup(group1); + + final JobVertex sink = new JobVertex("sink"); + sink.setInvokableClass(TestSinkInvokable.class); + sink.setParallelism(parallelism); + sink.setSlotSharingGroup(group2); + + sink.connectNewDataSetAsInput(source, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING); + + final JobGraph jobGraph = new JobGraph(source, sink); + jobGraph.setScheduleMode(ScheduleMode.LAZY_FROM_SOURCES); + + return jobGraph; + } + + /** + * Basic source {@link AbstractInvokable} which sends the elements to the + * {@link TestSinkInvokable}. + */ + public static final class TestSourceInvokable extends AbstractInvokable { + + /** + * Create an Invokable task and set its environment. + * + * @param environment The environment assigned to this invokable. + */ + public TestSourceInvokable(Environment environment) { + super(environment); + } + + @Override + public void invoke() throws Exception { + final RecordWriter writer = new RecordWriterBuilder().build(getEnvironment().getWriter(0)); + + final ByteArrayType bytes = new ByteArrayType(dataSource); + int counter = 0; + while (counter++ < numRecords) { + try { + writer.emit(bytes); + writer.flushAll(); + } finally { + writer.clearBuffers(); + } + } + } + } + + /** + * Basic sink {@link AbstractInvokable} which verifies the sent elements + * from the {@link TestSourceInvokable}. + */ + public static final class TestSinkInvokable extends AbstractInvokable { + + private int numReceived = 0; + + /** + * Create an Invokable task and set its environment. + * + * @param environment The environment assigned to this invokable. + */ + public TestSinkInvokable(Environment environment) { + super(environment); + } + + @Override + public void invoke() throws Exception { + final RecordReader reader = new RecordReader<>( + getEnvironment().getInputGate(0), + ByteArrayType.class, + getEnvironment().getTaskManagerInfo().getTmpDirectories()); + + while (reader.hasNext()) { + reader.next(); + numReceived++; + } + + assertThat(numReceived, is(numRecords)); + } + } +}