提交 b2f8e307 编写于 作者: U Ufuk Celebi

[FLINK-2089] [runtime] Fix illegal state in RecordWriter after partition write failure

- Address PR comments

This closes #1050.
上级 6f07c5f3
...@@ -86,8 +86,7 @@ public class RecordWriter<T extends IOReadableWritable> { ...@@ -86,8 +86,7 @@ public class RecordWriter<T extends IOReadableWritable> {
Buffer buffer = serializer.getCurrentBuffer(); Buffer buffer = serializer.getCurrentBuffer();
if (buffer != null) { if (buffer != null) {
writer.writeBuffer(buffer, targetChannel); writeBuffer(buffer, targetChannel, serializer);
serializer.clearCurrentBuffer();
} }
buffer = writer.getBufferProvider().requestBufferBlocking(); buffer = writer.getBufferProvider().requestBufferBlocking();
...@@ -112,8 +111,7 @@ public class RecordWriter<T extends IOReadableWritable> { ...@@ -112,8 +111,7 @@ public class RecordWriter<T extends IOReadableWritable> {
Buffer buffer = serializer.getCurrentBuffer(); Buffer buffer = serializer.getCurrentBuffer();
if (buffer != null) { if (buffer != null) {
writer.writeBuffer(buffer, targetChannel); writeBuffer(buffer, targetChannel, serializer);
serializer.clearCurrentBuffer();
} }
buffer = writer.getBufferProvider().requestBufferBlocking(); buffer = writer.getBufferProvider().requestBufferBlocking();
...@@ -135,8 +133,7 @@ public class RecordWriter<T extends IOReadableWritable> { ...@@ -135,8 +133,7 @@ public class RecordWriter<T extends IOReadableWritable> {
throw new IllegalStateException("Serializer has data but no buffer."); throw new IllegalStateException("Serializer has data but no buffer.");
} }
writer.writeBuffer(buffer, targetChannel); writeBuffer(buffer, targetChannel, serializer);
serializer.clearCurrentBuffer();
writer.writeEvent(event, targetChannel); writer.writeEvent(event, targetChannel);
...@@ -157,8 +154,7 @@ public class RecordWriter<T extends IOReadableWritable> { ...@@ -157,8 +154,7 @@ public class RecordWriter<T extends IOReadableWritable> {
synchronized (serializer) { synchronized (serializer) {
Buffer buffer = serializer.getCurrentBuffer(); Buffer buffer = serializer.getCurrentBuffer();
if (buffer != null) { if (buffer != null) {
writer.writeBuffer(buffer, targetChannel); writeBuffer(buffer, targetChannel, serializer);
serializer.clearCurrentBuffer();
buffer = writer.getBufferProvider().requestBufferBlocking(); buffer = writer.getBufferProvider().requestBufferBlocking();
serializer.setNextBuffer(buffer); serializer.setNextBuffer(buffer);
...@@ -174,26 +170,31 @@ public class RecordWriter<T extends IOReadableWritable> { ...@@ -174,26 +170,31 @@ public class RecordWriter<T extends IOReadableWritable> {
RecordSerializer<T> serializer = serializers[targetChannel]; RecordSerializer<T> serializer = serializers[targetChannel];
synchronized (serializer) { synchronized (serializer) {
Buffer buffer = serializer.getCurrentBuffer(); try {
Buffer buffer = serializer.getCurrentBuffer();
if (buffer != null) { if (buffer != null) {
// Only clear the serializer after the buffer was written out. writeBuffer(buffer, targetChannel, serializer);
writer.writeBuffer(buffer, targetChannel); }
} finally {
serializer.clear();
} }
serializer.clear();
} }
} }
} }
public void clearBuffers() { public void clearBuffers() {
for (RecordSerializer<?> s : serializers) { for (RecordSerializer<?> serializer : serializers) {
synchronized (s) { synchronized (serializer) {
Buffer b = s.getCurrentBuffer(); try {
s.clear(); Buffer buffer = serializer.getCurrentBuffer();
if (b != null) { if (buffer != null) {
b.recycle(); buffer.recycle();
}
}
finally {
serializer.clear();
} }
} }
} }
...@@ -208,4 +209,22 @@ public class RecordWriter<T extends IOReadableWritable> { ...@@ -208,4 +209,22 @@ public class RecordWriter<T extends IOReadableWritable> {
} }
} }
/**
* Writes the buffer to the {@link ResultPartitionWriter}.
*
* <p> The buffer is cleared from the serializer state after a call to this method.
*/
private void writeBuffer(
Buffer buffer,
int targetChannel,
RecordSerializer<T> serializer) throws IOException {
try {
writer.writeBuffer(buffer, targetChannel);
}
finally {
serializer.clearCurrentBuffer();
}
}
} }
...@@ -19,9 +19,13 @@ ...@@ -19,9 +19,13 @@
package org.apache.flink.runtime.io.network.api.writer; package org.apache.flink.runtime.io.network.api.writer;
import org.apache.flink.runtime.io.network.buffer.Buffer; import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.BufferProvider; import org.apache.flink.runtime.io.network.buffer.BufferProvider;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.io.network.util.TestBufferFactory; import org.apache.flink.runtime.io.network.util.TestBufferFactory;
import org.apache.flink.runtime.io.network.util.TestTaskEvent;
import org.apache.flink.types.IntValue; import org.apache.flink.types.IntValue;
import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
import org.mockito.invocation.InvocationOnMock; import org.mockito.invocation.InvocationOnMock;
...@@ -37,6 +41,7 @@ import java.util.concurrent.Executors; ...@@ -37,6 +41,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkNotNull;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
import static org.mockito.Matchers.any; import static org.mockito.Matchers.any;
...@@ -59,100 +64,6 @@ public class RecordWriterTest { ...@@ -59,100 +64,6 @@ public class RecordWriterTest {
// Resource release tests // Resource release tests
// --------------------------------------------------------------------------------------------- // ---------------------------------------------------------------------------------------------
@Test
public void testClearBuffersAfterEmit() throws Exception {
final Buffer buffer = TestBufferFactory.createBuffer(32);
BufferProvider bufferProvider = createBufferProvider(buffer);
ResultPartitionWriter partitionWriter = createResultPartitionWriter(bufferProvider);
RecordWriter<IntValue> recordWriter = new RecordWriter<IntValue>(partitionWriter);
// Emit single record, the buffer will not be written out.
// Therefore, it needs to be cleared explicitly.
recordWriter.emit(new IntValue(0));
// Verify that a buffer is requested, but not written out.
verify(bufferProvider, times(1)).requestBufferBlocking();
verify(partitionWriter, never()).writeBuffer(any(Buffer.class), anyInt());
recordWriter.clearBuffers();
assertTrue("Buffer not recycled.", buffer.isRecycled());
}
@Test
public void testClearBuffersAfterExceptionInFlushWritePartition() throws Exception {
// Size of buffer ensures that a single record will fill the buffer.
final Buffer buffer = TestBufferFactory.createBuffer(4);
BufferProvider bufferProvider = createBufferProvider(buffer);
ResultPartitionWriter partitionWriter = createResultPartitionWriter(bufferProvider);
doThrow(new IOException("Expected test exception"))
.when(partitionWriter).writeBuffer(eq(buffer), eq(0));
RecordWriter<IntValue> recordWriter = new RecordWriter<IntValue>(partitionWriter);
try {
// Emit single record, the buffer will not be written out,
// because of the Exception. Therefore, it needs to be cleared
// explicitly.
recordWriter.emit(new IntValue(0));
fail("Did not throw expected Exception. This means that the record "
+ "writer did not request a buffer as expected.");
}
catch (IOException expected) {
}
// Verify that a buffer is requested, but not written out due to the Exception.
verify(bufferProvider, times(1)).requestBufferBlocking();
verify(partitionWriter, times(1)).writeBuffer(any(Buffer.class), anyInt());
recordWriter.clearBuffers();
assertTrue("Buffer not recycled.", buffer.isRecycled());
}
@Test
public void testClearBuffersAfterExceptionInEmitWritePartition() throws Exception {
// Size of buffer ensures that a single record will NOT fill the buffer.
final Buffer buffer = TestBufferFactory.createBuffer(32);
BufferProvider bufferProvider = createBufferProvider(buffer);
ResultPartitionWriter partitionWriter = createResultPartitionWriter(bufferProvider);
doThrow(new IOException("Expected test exception"))
.when(partitionWriter).writeBuffer(eq(buffer), eq(0));
RecordWriter<IntValue> recordWriter = new RecordWriter<IntValue>(partitionWriter);
try {
recordWriter.emit(new IntValue(0));
// Verify that a buffer is requested, but not written out.
verify(bufferProvider, times(1)).requestBufferBlocking();
verify(partitionWriter, never()).writeBuffer(any(Buffer.class), anyInt());
// Now flush the record.
recordWriter.flush();
fail("Did not throw expected Exception. This means that the record "
+ "writer did not request a buffer as expected.");
}
catch (IOException expected) {
}
// Flushing the buffer tried to write out the buffer.
verify(partitionWriter, times(1)).writeBuffer(any(Buffer.class), anyInt());
recordWriter.clearBuffers();
assertTrue("Buffer not recycled.", buffer.isRecycled());
}
/** /**
* Tests a fix for FLINK-2089. * Tests a fix for FLINK-2089.
* *
...@@ -241,6 +152,124 @@ public class RecordWriterTest { ...@@ -241,6 +152,124 @@ public class RecordWriterTest {
} }
} }
@Test
public void testClearBuffersAfterExceptionInPartitionWriter() throws Exception {
NetworkBufferPool buffers = null;
BufferPool bufferPool = null;
try {
buffers = new NetworkBufferPool(1, 1024);
bufferPool = spy(buffers.createBufferPool(1, true));
ResultPartitionWriter partitionWriter = mock(ResultPartitionWriter.class);
when(partitionWriter.getBufferProvider()).thenReturn(checkNotNull(bufferPool));
when(partitionWriter.getNumberOfOutputChannels()).thenReturn(1);
// Recycle buffer and throw Exception
doAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
Buffer buffer = (Buffer) invocation.getArguments()[0];
buffer.recycle();
throw new RuntimeException("Expected test Exception");
}
}).when(partitionWriter).writeBuffer(any(Buffer.class), anyInt());
RecordWriter<IntValue> recordWriter = new RecordWriter<>(partitionWriter);
try {
// Verify that emit correctly clears the buffer. The infinite loop looks
// dangerous indeed, but the buffer will only be flushed after its full. Adding a
// manual flush here doesn't test this case (see next).
for (;;) {
recordWriter.emit(new IntValue(0));
}
}
catch (Exception e) {
// Verify that the buffer is not part of the record writer state after a failure
// to flush it out. If the buffer is still part of the record writer state, this
// will fail, because the buffer has already been recycled. NOTE: The mock
// partition writer needs to recycle the buffer to correctly test this.
recordWriter.clearBuffers();
}
// Verify expected methods have been called
verify(partitionWriter, times(1)).writeBuffer(any(Buffer.class), anyInt());
verify(bufferPool, times(1)).requestBufferBlocking();
try {
// Verify that manual flushing correctly clears the buffer.
recordWriter.emit(new IntValue(0));
recordWriter.flush();
Assert.fail("Did not throw expected test Exception");
}
catch (Exception e) {
recordWriter.clearBuffers();
}
// Verify expected methods have been called
verify(partitionWriter, times(2)).writeBuffer(any(Buffer.class), anyInt());
verify(bufferPool, times(2)).requestBufferBlocking();
try {
// Verify that broadcast emit correctly clears the buffer.
for (;;) {
recordWriter.broadcastEmit(new IntValue(0));
}
}
catch (Exception e) {
recordWriter.clearBuffers();
}
// Verify expected methods have been called
verify(partitionWriter, times(3)).writeBuffer(any(Buffer.class), anyInt());
verify(bufferPool, times(3)).requestBufferBlocking();
try {
// Verify that end of super step correctly clears the buffer.
recordWriter.emit(new IntValue(0));
recordWriter.sendEndOfSuperstep();
Assert.fail("Did not throw expected test Exception");
}
catch (Exception e) {
recordWriter.clearBuffers();
}
// Verify expected methods have been called
verify(partitionWriter, times(4)).writeBuffer(any(Buffer.class), anyInt());
verify(bufferPool, times(4)).requestBufferBlocking();
try {
// Verify that broadcasting and event correctly clears the buffer.
recordWriter.emit(new IntValue(0));
recordWriter.broadcastEvent(new TestTaskEvent());
Assert.fail("Did not throw expected test Exception");
}
catch (Exception e) {
recordWriter.clearBuffers();
}
// Verify expected methods have been called
verify(partitionWriter, times(5)).writeBuffer(any(Buffer.class), anyInt());
verify(bufferPool, times(5)).requestBufferBlocking();
}
finally {
if (bufferPool != null) {
assertEquals(1, bufferPool.getNumberOfAvailableMemorySegments());
bufferPool.lazyDestroy();
}
if (buffers != null) {
assertEquals(1, buffers.getNumberOfAvailableMemorySegments());
buffers.destroy();
}
}
}
@Test @Test
public void testSerializerClearedAfterClearBuffers() throws Exception { public void testSerializerClearedAfterClearBuffers() throws Exception {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册