提交 2c8ee78f 编写于 作者: Z Zhijiang 提交者: Piotr Nowojski

[FLINK-13100][network] Fix the bug of throwing IOException while FileChannelBoundedData#nextBuffer

The implementation of FileChannelBoundedData#nextBuffer assumes that there is always an available buffer, otherwise an IOException is thrown
and it always assumes that pool of two buffers is enough (before using the 3rd buffer, first one was expected to be recycled already). But in
the case of pending flush operation (when the socket channel is not writable while netty thread is calling writeAndFlush method), the first
fetched buffer from FileChannelBoundedData has not been recycled while fetching the second buffer to trigger next read ahead, which breaks the
above assumption.

In order to fix this problem, we make read ahead is not always available for FileChannelBoundedData. If there are no available buffers to read
the next data, we retrigger the read ahead while recycling buffer via ResultSubpartitionView#notifyDataAvailable.
上级 67bc355d
......@@ -198,9 +198,8 @@ final class BoundedBlockingSubpartition extends ResultSubpartition {
availability.notifyDataAvailable();
final BoundedData.Reader dataReader = data.createReader();
final BoundedBlockingSubpartitionReader reader = new BoundedBlockingSubpartitionReader(
this, dataReader, numDataBuffersWritten);
this, data, numDataBuffersWritten, availability);
readers.add(reader);
return reader;
}
......
......@@ -37,6 +37,9 @@ final class BoundedBlockingSubpartitionReader implements ResultSubpartitionView
/** The result subpartition that we read. */
private final BoundedBlockingSubpartition parent;
/** The listener that is notified when there are available buffers for this subpartition view. */
private final BufferAvailabilityListener availabilityListener;
/** The next buffer (look ahead). Null once the data is depleted or reader is disposed. */
@Nullable
private Buffer nextBuffer;
......@@ -57,16 +60,20 @@ final class BoundedBlockingSubpartitionReader implements ResultSubpartitionView
*/
BoundedBlockingSubpartitionReader(
BoundedBlockingSubpartition parent,
BoundedData.Reader dataReader,
int numDataBuffers) throws IOException {
checkArgument(numDataBuffers >= 0);
BoundedData data,
int numDataBuffers,
BufferAvailabilityListener availabilityListener) throws IOException {
this.parent = checkNotNull(parent);
this.dataReader = checkNotNull(dataReader);
this.dataBufferBacklog = numDataBuffers;
checkNotNull(data);
this.dataReader = data.createReader(this);
this.nextBuffer = dataReader.nextBuffer();
checkArgument(numDataBuffers >= 0);
this.dataBufferBacklog = numDataBuffers;
this.availabilityListener = checkNotNull(availabilityListener);
}
@Nullable
......@@ -89,9 +96,31 @@ final class BoundedBlockingSubpartitionReader implements ResultSubpartitionView
return BufferAndBacklog.fromBufferAndLookahead(current, nextBuffer, dataBufferBacklog);
}
/**
* This method is actually only meaningful for the {@link BoundedBlockingSubpartitionType#FILE}.
*
* <p>For the other types the {@link #nextBuffer} can not be ever set to null, so it is no need
* to notify available via this method. But the implementation is also compatible with other
* types even though called by mistake.
*/
@Override
public void notifyDataAvailable() {
throw new IllegalStateException("No data should become available on a blocking partition during consumption.");
if (nextBuffer == null) {
assert dataReader != null;
try {
nextBuffer = dataReader.nextBuffer();
} catch (IOException ex) {
// this exception wrapper is only for avoiding throwing IOException explicitly
// in relevant interface methods
throw new IllegalStateException("No data available while reading", ex);
}
// next buffer is null indicates the end of partition
if (nextBuffer != null) {
availabilityListener.notifyDataAvailable();
}
}
}
@Override
......
......@@ -34,7 +34,7 @@ import java.io.IOException;
* through the {@link #writeBuffer(Buffer)} method.
* The write phase is ended by calling {@link #finishWrite()}.
* After the write phase is finished, the data can be read multiple times through readers created
* via {@link #createReader()}.
* via {@link #createReader(ResultSubpartitionView)}.
* Finally, the BoundedData is dropped / deleted by calling {@link #close()}.
*
* <h2>Thread Safety and Concurrency</h2>
......@@ -60,7 +60,15 @@ interface BoundedData extends Closeable {
* Gets a reader for the bounded data. Multiple readers may be created.
* This call only succeeds once the write phase was finished via {@link #finishWrite()}.
*/
BoundedData.Reader createReader() throws IOException;
BoundedData.Reader createReader(ResultSubpartitionView subpartitionView) throws IOException;
/**
* Gets a reader for the bounded data. Multiple readers may be created.
* This call only succeeds once the write phase was finished via {@link #finishWrite()}.
*/
default BoundedData.Reader createReader() throws IOException {
return createReader(new NoOpResultSubpartitionView());
}
/**
* Gets the number of bytes of all written data (including the metadata in the buffer headers).
......
......@@ -75,11 +75,11 @@ final class FileChannelBoundedData implements BoundedData {
}
@Override
public Reader createReader() throws IOException {
public Reader createReader(ResultSubpartitionView subpartitionView) throws IOException {
checkState(!fileChannel.isOpen());
final FileChannel fc = FileChannel.open(filePath, StandardOpenOption.READ);
return new FileBufferReader(fc, memorySegmentSize);
return new FileBufferReader(fc, memorySegmentSize, subpartitionView);
}
@Override
......@@ -117,7 +117,12 @@ final class FileChannelBoundedData implements BoundedData {
private final ArrayDeque<MemorySegment> buffers;
FileBufferReader(FileChannel fileChannel, int bufferSize) {
private final ResultSubpartitionView subpartitionView;
/** The tag indicates whether we have read the end of this file. */
private boolean isFinished;
FileBufferReader(FileChannel fileChannel, int bufferSize, ResultSubpartitionView subpartitionView) {
this.fileChannel = checkNotNull(fileChannel);
this.headerBuffer = BufferReaderWriterUtil.allocatedHeaderBuffer();
this.buffers = new ArrayDeque<>(NUM_BUFFERS);
......@@ -125,26 +130,25 @@ final class FileChannelBoundedData implements BoundedData {
for (int i = 0; i < NUM_BUFFERS; i++) {
buffers.addLast(MemorySegmentFactory.allocateUnpooledOffHeapMemory(bufferSize, null));
}
this.subpartitionView = checkNotNull(subpartitionView);
}
@Nullable
@Override
public Buffer nextBuffer() throws IOException {
final MemorySegment memory = buffers.pollFirst();
if (memory == null) {
return null;
}
if (memory != null) {
final Buffer next = BufferReaderWriterUtil.readFromByteChannel(fileChannel, headerBuffer, memory, this);
if (next != null) {
return next;
}
else {
recycle(memory);
return null;
}
final Buffer next = BufferReaderWriterUtil.readFromByteChannel(fileChannel, headerBuffer, memory, this);
if (next == null) {
isFinished = true;
recycle(memory);
}
throw new IOException("Bug in BoundedBlockingSubpartition with FILE data: " +
"Requesting new buffer before previous buffer returned.");
return next;
}
@Override
......@@ -155,6 +159,10 @@ final class FileChannelBoundedData implements BoundedData {
@Override
public void recycle(MemorySegment memorySegment) {
buffers.addLast(memorySegment);
if (!isFinished) {
subpartitionView.notifyDataAvailable();
}
}
}
}
......@@ -123,7 +123,7 @@ final class FileChannelMemoryMappedBoundedData implements BoundedData {
}
@Override
public BoundedData.Reader createReader() {
public BoundedData.Reader createReader(ResultSubpartitionView ignored) {
checkState(!fileChannel.isOpen());
final List<ByteBuffer> buffers = memoryMappedRegions.stream()
......
......@@ -113,7 +113,7 @@ final class MemoryMappedBoundedData implements BoundedData {
}
@Override
public BufferSlicer createReader() {
public BufferSlicer createReader(ResultSubpartitionView ignored) {
assert currentBuffer == null;
final List<ByteBuffer> buffers = fullBuffers.stream()
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.io.network.partition;
import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils;
import org.apache.flink.runtime.io.network.partition.ResultSubpartition.BufferAndBacklog;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
/**
* Tests for the availability handling of the BoundedBlockingSubpartitions with not constant
* availability.
*/
public class BoundedBlockingSubpartitionAvailabilityTest {
@ClassRule
public static final TemporaryFolder TMP_FOLDER = new TemporaryFolder();
private static final int BUFFER_SIZE = 32 * 1024;
@Test
public void testInitiallyAvailable() throws Exception {
final ResultSubpartition subpartition = createPartitionWithData(10);
final CountingAvailabilityListener listener = new CountingAvailabilityListener();
// test
final ResultSubpartitionView subpartitionView = subpartition.createReadView(listener);
// assert
assertEquals(1, listener.numNotifications);
// cleanup
subpartitionView.releaseAllResources();
subpartition.release();
}
@Test
public void testUnavailableWhenBuffersExhausted() throws Exception {
// setup
final BoundedBlockingSubpartition subpartition = createPartitionWithData(100_000);
final CountingAvailabilityListener listener = new CountingAvailabilityListener();
final ResultSubpartitionView reader = subpartition.createReadView(listener);
// test
final List<BufferAndBacklog> data = drainAvailableData(reader);
// assert
assertFalse(reader.isAvailable());
assertFalse(data.get(data.size() - 1).isMoreAvailable());
// cleanup
reader.releaseAllResources();
subpartition.release();
}
@Test
public void testAvailabilityNotificationWhenBuffersReturn() throws Exception {
// setup
final ResultSubpartition subpartition = createPartitionWithData(100_000);
final CountingAvailabilityListener listener = new CountingAvailabilityListener();
final ResultSubpartitionView reader = subpartition.createReadView(listener);
// test
final List<ResultSubpartition.BufferAndBacklog> data = drainAvailableData(reader);
data.get(0).buffer().recycleBuffer();
data.get(1).buffer().recycleBuffer();
// assert
assertTrue(reader.isAvailable());
assertEquals(2, listener.numNotifications); // one initial, one for new availability
// cleanup
reader.releaseAllResources();
subpartition.release();
}
@Test
public void testNotAvailableWhenEmpty() throws Exception {
// setup
final ResultSubpartition subpartition = createPartitionWithData(100_000);
final ResultSubpartitionView reader = subpartition.createReadView(new NoOpBufferAvailablityListener());
// test
drainAllData(reader);
// assert
assertFalse(reader.isAvailable());
// cleanup
reader.releaseAllResources();
subpartition.release();
}
// ------------------------------------------------------------------------
private static BoundedBlockingSubpartition createPartitionWithData(int numberOfBuffers) throws IOException {
ResultPartition parent = PartitionTestUtils.createPartition();
BoundedBlockingSubpartition partition = BoundedBlockingSubpartition.createWithFileChannel(
0, parent, new File(TMP_FOLDER.newFolder(), "data"), BUFFER_SIZE);
writeBuffers(partition, numberOfBuffers);
partition.finish();
return partition;
}
private static void writeBuffers(ResultSubpartition partition, int numberOfBuffers) throws IOException {
for (int i = 0; i < numberOfBuffers; i++) {
partition.add(BufferBuilderTestUtils.createFilledBufferConsumer(BUFFER_SIZE, BUFFER_SIZE));
}
}
private static List<BufferAndBacklog> drainAvailableData(ResultSubpartitionView reader) throws Exception {
final ArrayList<BufferAndBacklog> list = new ArrayList<>();
BufferAndBacklog bab;
while ((bab = reader.getNextBuffer()) != null) {
list.add(bab);
}
return list;
}
private static void drainAllData(ResultSubpartitionView reader) throws Exception {
BufferAndBacklog bab;
while ((bab = reader.getNextBuffer()) != null) {
bab.buffer().recycleBuffer();
}
}
}
......@@ -35,6 +35,7 @@ import java.io.File;
import java.io.IOException;
import static org.apache.flink.runtime.io.network.partition.PartitionTestUtils.createPartition;
import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
......@@ -80,10 +81,11 @@ public class BoundedBlockingSubpartitionTest extends SubpartitionTestBase {
}
@Test
public void testClosingClosesBoundedData() throws Exception {
public void testCloseBoundedData() throws Exception {
final TestingBoundedDataReader reader = new TestingBoundedDataReader();
final TestingBoundedData data = new TestingBoundedData(reader);
final BoundedBlockingSubpartitionReader bbspr = new BoundedBlockingSubpartitionReader(
(BoundedBlockingSubpartition) createSubpartition(), reader, 10);
(BoundedBlockingSubpartition) createSubpartition(), data, 10, new NoOpBufferAvailablityListener());
bbspr.releaseAllResources();
......@@ -124,7 +126,7 @@ public class BoundedBlockingSubpartitionTest extends SubpartitionTestBase {
}
@Override
public Reader createReader() throws IOException {
public Reader createReader(ResultSubpartitionView subpartitionView) throws IOException {
throw new UnsupportedOperationException();
}
......@@ -137,6 +139,36 @@ public class BoundedBlockingSubpartitionTest extends SubpartitionTestBase {
public void close() {}
}
private static class TestingBoundedData implements BoundedData {
private BoundedData.Reader reader;
private TestingBoundedData(BoundedData.Reader reader) {
this.reader = checkNotNull(reader);
}
@Override
public void writeBuffer(Buffer buffer) throws IOException {
}
@Override
public void finishWrite() throws IOException {
}
@Override
public Reader createReader(ResultSubpartitionView ignored) throws IOException {
return reader;
}
@Override
public long getSize() {
throw new UnsupportedOperationException();
}
@Override
public void close() {}
}
private static class TestingBoundedDataReader implements BoundedData.Reader {
boolean closed;
......
......@@ -59,7 +59,7 @@ public abstract class BoundedDataTestBase {
protected abstract BoundedData createBoundedDataWithRegion(Path tempFilePath, int regionSize) throws IOException;
private BoundedData createBoundedData() throws IOException {
protected BoundedData createBoundedData() throws IOException {
return createBoundedData(createTempPath());
}
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.io.network.partition;
/**
* A simple BufferAvailabilityListener that counts the number of notifications.
*/
final class CountingAvailabilityListener implements BufferAvailabilityListener {
int numNotifications;
@Override
public void notifyDataAvailable() {
numNotifications++;
}
}
......@@ -18,14 +18,45 @@
package org.apache.flink.runtime.io.network.partition;
import org.apache.flink.runtime.io.disk.FileChannelManager;
import org.apache.flink.runtime.io.disk.FileChannelManagerImpl;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.partition.ResultSubpartition.BufferAndBacklog;
import org.apache.flink.runtime.util.EnvironmentInformation;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import java.io.IOException;
import java.nio.file.Path;
import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.buildSomeBuffer;
import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createFilledBufferConsumer;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
/**
* Tests that read the BoundedBlockingSubpartition with multiple threads in parallel.
*/
public class FileChannelBoundedDataTest extends BoundedDataTestBase {
private static final String tempDir = EnvironmentInformation.getTemporaryFileDirectory();
private static FileChannelManager fileChannelManager;
@BeforeClass
public static void setUp() {
fileChannelManager = new FileChannelManagerImpl(new String[] {tempDir}, "testing");
}
@AfterClass
public static void shutdown() throws Exception {
fileChannelManager.close();
}
@Override
protected boolean isRegionBased() {
return false;
......@@ -40,4 +71,145 @@ public class FileChannelBoundedDataTest extends BoundedDataTestBase {
protected BoundedData createBoundedDataWithRegion(Path tempFilePath, int regionSize) throws IOException {
throw new UnsupportedOperationException();
}
@Test
public void testReadNextBuffer() throws Exception {
final int numberOfBuffers = 3;
try (final BoundedData data = createBoundedData()) {
writeBuffers(data, numberOfBuffers);
final BoundedData.Reader reader = data.createReader();
final Buffer buffer1 = reader.nextBuffer();
final Buffer buffer2 = reader.nextBuffer();
assertNotNull(buffer1);
assertNotNull(buffer2);
// there are only two available memory segments for reading data
assertNull(reader.nextBuffer());
// cleanup
buffer1.recycleBuffer();
buffer2.recycleBuffer();
}
}
@Test
public void testRecycleBufferForNotifyingSubpartitionView() throws Exception {
final int numberOfBuffers = 2;
try (final BoundedData data = createBoundedData()) {
writeBuffers(data, numberOfBuffers);
final VerifyNotificationResultSubpartitionView subpartitionView = new VerifyNotificationResultSubpartitionView();
final BoundedData.Reader reader = data.createReader(subpartitionView);
final Buffer buffer1 = reader.nextBuffer();
final Buffer buffer2 = reader.nextBuffer();
assertNotNull(buffer1);
assertNotNull(buffer2);
assertFalse(subpartitionView.isAvailable);
buffer1.recycleBuffer();
// the view is notified while recycling buffer if reader has not tagged finished
assertTrue(subpartitionView.isAvailable);
subpartitionView.resetAvailable();
assertFalse(subpartitionView.isAvailable);
// the next buffer is null to make reader tag finished
assertNull(reader.nextBuffer());
buffer2.recycleBuffer();
// the view is not notified while recycling buffer if reader already finished
assertFalse(subpartitionView.isAvailable);
}
}
@Test
public void testRecycleBufferForNotifyingBufferAvailabilityListener() throws Exception {
final ResultSubpartition subpartition = createFileBoundedBlockingSubpartition();
final int numberOfBuffers = 2;
writeBuffers(subpartition, numberOfBuffers);
final VerifyNotificationBufferAvailabilityListener listener = new VerifyNotificationBufferAvailabilityListener();
final ResultSubpartitionView subpartitionView = subpartition.createReadView(listener);
// the notification is triggered while creating view
assertTrue(listener.isAvailable);
listener.resetAvailable();
assertFalse(listener.isAvailable);
final BufferAndBacklog buffer1 = subpartitionView.getNextBuffer();
final BufferAndBacklog buffer2 = subpartitionView.getNextBuffer();
assertNotNull(buffer1);
assertNotNull(buffer2);
// the next buffer is null in view because FileBufferReader has no available buffers for reading ahead
assertFalse(subpartitionView.isAvailable());
// recycle a buffer to trigger notification of data available
buffer1.buffer().recycleBuffer();
assertTrue(listener.isAvailable);
// cleanup
buffer2.buffer().recycleBuffer();
subpartitionView.releaseAllResources();
subpartition.release();
}
private static ResultSubpartition createFileBoundedBlockingSubpartition() {
final ResultPartition resultPartition = new ResultPartitionBuilder()
.setNetworkBufferSize(BUFFER_SIZE)
.setResultPartitionType(ResultPartitionType.BLOCKING)
.setBoundedBlockingSubpartitionType(BoundedBlockingSubpartitionType.FILE)
.setFileChannelManager(fileChannelManager)
.build();
return resultPartition.subpartitions[0];
}
private static void writeBuffers(BoundedData data, int numberOfBuffers) throws IOException {
for (int i = 0; i < numberOfBuffers; i++) {
data.writeBuffer(buildSomeBuffer(BUFFER_SIZE));
}
data.finishWrite();
}
private static void writeBuffers(ResultSubpartition subpartition, int numberOfBuffers) throws IOException {
for (int i = 0; i < numberOfBuffers; i++) {
subpartition.add(createFilledBufferConsumer(BUFFER_SIZE, BUFFER_SIZE));
}
subpartition.finish();
}
/**
* This subpartition view is used for verifying the {@link ResultSubpartitionView#notifyDataAvailable()}
* was ever called before.
*/
private static class VerifyNotificationResultSubpartitionView extends NoOpResultSubpartitionView {
private boolean isAvailable;
@Override
public void notifyDataAvailable() {
isAvailable = true;
}
private void resetAvailable() {
isAvailable = false;
}
}
/**
* This listener is used for verifying the notification logic in {@link ResultSubpartitionView#notifyDataAvailable()}.
*/
private static class VerifyNotificationBufferAvailabilityListener implements BufferAvailabilityListener {
private boolean isAvailable;
@Override
public void notifyDataAvailable() {
isAvailable = true;
}
private void resetAvailable() {
isAvailable = false;
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.test.runtime;
import org.apache.flink.api.common.JobSubmissionResult;
import org.apache.flink.client.program.MiniClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.network.api.reader.RecordReader;
import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
import org.apache.flink.runtime.io.network.api.writer.RecordWriterBuilder;
import org.apache.flink.runtime.io.network.partition.BoundedBlockingSubpartitionType;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.ScheduleMode;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
import org.apache.flink.testutils.serialization.types.ByteArrayType;
import org.apache.flink.util.TestLogger;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelPromise;
import org.junit.BeforeClass;
import org.junit.Test;
import java.util.concurrent.CompletableFuture;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;
/**
* Tests the bug reported in FLINK-131O0.
*
* <p>The implementation of {@link org.apache.flink.runtime.io.network.partition.BoundedData.Reader#nextBuffer()}
* for {@link BoundedBlockingSubpartitionType#FILE} assumes that there is always an available buffer, otherwise
* an IOException is thrown and it always assumes that pool of two buffers is enough (before using the 3rd buffer,
* first one was expected to be recycled already). But in the case of pending flush operation (when the socket channel
* is not writable while netty thread is calling {@link ChannelHandlerContext#writeAndFlush(Object, ChannelPromise)}),
* the first fetched buffer from {@link org.apache.flink.runtime.io.network.partition.FileChannelBoundedData} has not
* been recycled while fetching the second buffer to trigger next read ahead, which breaks the above assumption.
*/
public class FileBufferReaderITCase extends TestLogger {
private static final int parallelism = 8;
private static final int numRecords = 100_000;
private static final byte[] dataSource = new byte[1024];
@BeforeClass
public static void setup() {
for (int i = 0; i < dataSource.length; i++) {
dataSource[i] = 0;
}
}
@Test
public void testSequentialReading() throws Exception {
// setup
final Configuration configuration = new Configuration();
configuration.setString(RestOptions.BIND_PORT, "0");
configuration.setString(NettyShuffleEnvironmentOptions.NETWORK_BOUNDED_BLOCKING_SUBPARTITION_TYPE, "file");
final MiniClusterConfiguration miniClusterConfiguration = new MiniClusterConfiguration.Builder()
.setConfiguration(configuration)
.setNumTaskManagers(parallelism)
.setNumSlotsPerTaskManager(1)
.build();
try (final MiniCluster miniCluster = new MiniCluster(miniClusterConfiguration)) {
miniCluster.start();
final MiniClusterClient client = new MiniClusterClient(configuration, miniCluster);
final JobGraph jobGraph = createJobGraph();
final CompletableFuture<JobSubmissionResult> submitFuture = client.submitJob(jobGraph);
// wait for the submission to succeed
final JobSubmissionResult result = submitFuture.get();
final CompletableFuture<JobResult> resultFuture = client.requestJobResult(result.getJobID());
final JobResult jobResult = resultFuture.get();
assertThat(jobResult.getSerializedThrowable().isPresent(), is(false));
}
}
private static JobGraph createJobGraph() {
final SlotSharingGroup group1 = new SlotSharingGroup();
final SlotSharingGroup group2 = new SlotSharingGroup();
final JobVertex source = new JobVertex("source");
source.setInvokableClass(TestSourceInvokable.class);
source.setParallelism(parallelism);
source.setSlotSharingGroup(group1);
final JobVertex sink = new JobVertex("sink");
sink.setInvokableClass(TestSinkInvokable.class);
sink.setParallelism(parallelism);
sink.setSlotSharingGroup(group2);
sink.connectNewDataSetAsInput(source, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);
final JobGraph jobGraph = new JobGraph(source, sink);
jobGraph.setScheduleMode(ScheduleMode.LAZY_FROM_SOURCES);
return jobGraph;
}
/**
* Basic source {@link AbstractInvokable} which sends the elements to the
* {@link TestSinkInvokable}.
*/
public static final class TestSourceInvokable extends AbstractInvokable {
/**
* Create an Invokable task and set its environment.
*
* @param environment The environment assigned to this invokable.
*/
public TestSourceInvokable(Environment environment) {
super(environment);
}
@Override
public void invoke() throws Exception {
final RecordWriter<ByteArrayType> writer = new RecordWriterBuilder().build(getEnvironment().getWriter(0));
final ByteArrayType bytes = new ByteArrayType(dataSource);
int counter = 0;
while (counter++ < numRecords) {
try {
writer.emit(bytes);
writer.flushAll();
} finally {
writer.clearBuffers();
}
}
}
}
/**
* Basic sink {@link AbstractInvokable} which verifies the sent elements
* from the {@link TestSourceInvokable}.
*/
public static final class TestSinkInvokable extends AbstractInvokable {
private int numReceived = 0;
/**
* Create an Invokable task and set its environment.
*
* @param environment The environment assigned to this invokable.
*/
public TestSinkInvokable(Environment environment) {
super(environment);
}
@Override
public void invoke() throws Exception {
final RecordReader<ByteArrayType> reader = new RecordReader<>(
getEnvironment().getInputGate(0),
ByteArrayType.class,
getEnvironment().getTaskManagerInfo().getTmpDirectories());
while (reader.hasNext()) {
reader.next();
numReceived++;
}
assertThat(numReceived, is(numRecords));
}
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册