提交 acd15d25 编写于 作者: Z Zhijiang 提交者: Stephan Ewen

[FLINK-13245][network] Make subpartition consumption notification independant

上级 653ba1a1
......@@ -22,7 +22,6 @@ import org.apache.flink.runtime.io.network.buffer.BufferPoolOwner;
import org.apache.flink.util.function.FunctionWithException;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;
import static org.apache.flink.util.Preconditions.checkState;
......@@ -31,11 +30,18 @@ import static org.apache.flink.util.Preconditions.checkState;
*/
public class ReleaseOnConsumptionResultPartition extends ResultPartition {
private static final Object lock = new Object();
/**
* A flag for each subpartition indicating whether it was already consumed or not.
*/
private final boolean[] consumedSubpartitions;
/**
* The total number of references to subpartitions of this result. The result partition can be
* safely released, iff the reference count is zero.
*/
private final AtomicInteger pendingReferences = new AtomicInteger();
private int numUnconsumedSubpartitions;
ReleaseOnConsumptionResultPartition(
String owningTaskName,
......@@ -47,12 +53,13 @@ public class ReleaseOnConsumptionResultPartition extends ResultPartition {
FunctionWithException<BufferPoolOwner, BufferPool, IOException> bufferPoolFactory) {
super(owningTaskName, partitionId, partitionType, subpartitions, numTargetKeyGroups, partitionManager, bufferPoolFactory);
pendingReferences.set(subpartitions.length);
this.consumedSubpartitions = new boolean[subpartitions.length];
this.numUnconsumedSubpartitions = subpartitions.length;
}
@Override
public ResultSubpartitionView createSubpartitionView(int index, BufferAvailabilityListener availabilityListener) throws IOException {
checkState(pendingReferences.get() > 0, "Partition not pinned.");
checkState(numUnconsumedSubpartitions > 0, "Partition not pinned.");
return super.createSubpartitionView(index, availabilityListener);
}
......@@ -63,22 +70,33 @@ public class ReleaseOnConsumptionResultPartition extends ResultPartition {
return;
}
int refCnt = pendingReferences.decrementAndGet();
final int remainingUnconsumed;
if (refCnt == 0) {
partitionManager.onConsumedPartition(this);
} else if (refCnt < 0) {
throw new IllegalStateException("All references released.");
// we synchronize only the bookkeeping section, to avoid holding the lock during any
// calls into other components
synchronized (lock) {
if (consumedSubpartitions[subpartitionIndex]) {
// repeated call - ignore
return;
}
consumedSubpartitions[subpartitionIndex] = true;
remainingUnconsumed = (--numUnconsumedSubpartitions);
}
LOG.debug("{}: Received release notification for subpartition {}.",
this, subpartitionIndex);
LOG.debug("{}: Received consumed notification for subpartition {}.", this, subpartitionIndex);
if (remainingUnconsumed == 0) {
partitionManager.onConsumedPartition(this);
} else if (remainingUnconsumed < 0) {
throw new IllegalStateException("Received consume notification even though all subpartitions are already consumed.");
}
}
@Override
public String toString() {
return "ReleaseOnConsumptionResultPartition " + partitionId.toString() + " [" + partitionType + ", "
+ subpartitions.length + " subpartitions, "
+ pendingReferences + " pending references]";
+ numUnconsumedSubpartitions + " pending consumptions]";
}
}
......@@ -21,8 +21,8 @@ import org.apache.flink.util.TestLogger;
import org.junit.Test;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
/**
* Tests for the {@link ReleaseOnConsumptionResultPartitionTest}.
......@@ -41,9 +41,42 @@ public class ReleaseOnConsumptionResultPartitionTest extends TestLogger {
manager.registerResultPartition(partition);
partition.onConsumedSubpartition(0);
assertThat(partition.isReleased(), is(false));
assertFalse(partition.isReleased());
partition.onConsumedSubpartition(1);
assertThat(partition.isReleased(), is(true));
assertTrue(partition.isReleased());
}
@Test
public void testMultipleReleaseCallsAreIdempotent() {
final ResultPartitionManager manager = new ResultPartitionManager();
final ResultPartition partition = new ResultPartitionBuilder()
.setNumberOfSubpartitions(2)
.isReleasedOnConsumption(true)
.setResultPartitionManager(manager)
.build();
manager.registerResultPartition(partition);
partition.onConsumedSubpartition(0);
partition.onConsumedSubpartition(0);
assertFalse(partition.isReleased());
}
@Test
public void testReleaseAfterIdempotentCalls() {
final ResultPartitionManager manager = new ResultPartitionManager();
final ResultPartition partition = new ResultPartitionBuilder()
.setNumberOfSubpartitions(2)
.isReleasedOnConsumption(true)
.setResultPartitionManager(manager)
.build();
manager.registerResultPartition(partition);
partition.onConsumedSubpartition(0);
partition.onConsumedSubpartition(0);
partition.onConsumedSubpartition(1);
assertTrue(partition.isReleased());
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册