提交 a17d4e82 编写于 作者: U Ufuk Celebi

[FLINK-2460] [runtime] Check parent state in isReleased() check of partition view

- Address PR comments

This closes #1051.
上级 5a9daca4
......@@ -41,7 +41,7 @@ class PipelinedSubpartition extends ResultSubpartition {
private boolean isFinished;
/** Flag indicating whether the subpartition has been released. */
private boolean isReleased;
private volatile boolean isReleased;
/**
* A data availability listener. Registered, when the consuming task is faster than the
......@@ -166,6 +166,11 @@ class PipelinedSubpartition extends ResultSubpartition {
return 0;
}
@Override
public boolean isReleased() {
return isReleased;
}
@Override
public PipelinedSubpartitionView createReadView(BufferProvider bufferProvider) {
synchronized (buffers) {
......
......@@ -81,4 +81,6 @@ public abstract class ResultSubpartition {
abstract int releaseMemory() throws IOException;
abstract public boolean isReleased();
}
......@@ -59,7 +59,7 @@ class SpillableSubpartition extends ResultSubpartition {
private boolean isFinished;
/** Flag indicating whether the subpartition has been released. */
boolean isReleased;
private volatile boolean isReleased;
/** The read view to consume this subpartition. */
private ResultSubpartitionView readView;
......@@ -167,6 +167,11 @@ class SpillableSubpartition extends ResultSubpartition {
return 0;
}
@Override
public boolean isReleased() {
return isReleased;
}
@Override
public ResultSubpartitionView createReadView(BufferProvider bufferProvider) throws IOException {
synchronized (buffers) {
......
......@@ -73,7 +73,7 @@ class SpillableSubpartitionView implements ResultSubpartitionView {
// 1) In-memory
synchronized (parent.buffers) {
if (parent.isReleased) {
if (parent.isReleased()) {
return null;
}
......@@ -162,7 +162,7 @@ class SpillableSubpartitionView implements ResultSubpartitionView {
@Override
public boolean isReleased() {
return isReleased.get();
return parent.isReleased() || isReleased.get();
}
@Override
......
......@@ -187,7 +187,7 @@ class SpilledSubpartitionViewAsyncIO implements ResultSubpartitionView {
@Override
public boolean isReleased() {
return isReleased;
return parent.isReleased() || isReleased;
}
@Override
......
......@@ -108,7 +108,7 @@ class SpilledSubpartitionViewSyncIO implements ResultSubpartitionView {
@Override
public boolean isReleased() {
return isReleased.get();
return parent.isReleased() || isReleased.get();
}
@Override
......
......@@ -18,11 +18,14 @@
package org.apache.flink.runtime.io.network.partition;
import org.apache.flink.api.common.JobID;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferProvider;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.io.network.util.TestBufferFactory;
import org.apache.flink.runtime.io.network.util.TestConsumerCallback;
import org.apache.flink.runtime.io.network.util.TestNotificationListener;
import org.apache.flink.runtime.io.network.util.TestPooledBufferProvider;
......@@ -36,6 +39,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import static org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode.ASYNC;
import static org.apache.flink.runtime.io.network.util.TestBufferFactory.createBuffer;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
......
......@@ -19,10 +19,15 @@
package org.apache.flink.runtime.io.network.partition;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.util.TestBufferFactory;
import org.apache.flink.runtime.io.network.util.TestInfiniteBufferProvider;
import org.apache.flink.util.TestLogger;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
/**
......@@ -68,4 +73,41 @@ public abstract class SubpartitionTestBase extends TestLogger {
}
}
}
@Test
public void testReleaseParent() throws Exception {
final ResultSubpartition partition = createSubpartition();
verifyViewReleasedAfterParentRelease(partition);
}
@Test
public void testReleaseParentAfterSpilled() throws Exception {
final ResultSubpartition partition = createSubpartition();
partition.releaseMemory();
verifyViewReleasedAfterParentRelease(partition);
}
private void verifyViewReleasedAfterParentRelease(ResultSubpartition partition) throws Exception {
// Add a buffer
Buffer buffer = TestBufferFactory.createBuffer();
partition.add(buffer);
partition.finish();
TestInfiniteBufferProvider buffers = new TestInfiniteBufferProvider();
// Create the view
ResultSubpartitionView view = partition.createReadView(buffers);
// The added buffer and end-of-partition event
assertNotNull(view.getNextBuffer());
assertNotNull(view.getNextBuffer());
// Release the parent
assertFalse(view.isReleased());
partition.release();
// Verify that parent release is reflected at partition view
assertTrue(view.isReleased());
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册