Commit f4f45103 authored by azagrebin, committed by Chesnay Schepler

[FLINK-12570][network] Work against ResultPartitionWriter interface

This part of Shuffle API refactoring: make task not depend on the
concrete implementation of ResultPartitionWriter (ResultPartition).
Parent 66d9270b
......@@ -22,6 +22,8 @@ import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
import org.apache.flink.runtime.io.network.buffer.BufferProvider;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import javax.annotation.Nullable;
import java.io.IOException;
/**
......@@ -65,4 +67,21 @@ public interface ResultPartitionWriter extends AutoCloseable {
* Manually trigger consumption from enqueued {@link BufferConsumer BufferConsumers} in one specified subpartition.
*/
void flush(int subpartitionIndex);
/**
 * Fails the production of the partition.
 *
 * <p>A non-{@code null} failure cause is propagated to consumers on a best-effort
 * basis. This call does not release the partition; {@code close()} must still be
 * called afterwards.
 *
 * @param throwable failure cause, or {@code null} if none is known
 */
void fail(@Nullable Throwable throwable);
/**
 * Successfully finishes the production of the partition.
 *
 * <p>This call does not release the partition; {@code close()} must still be
 * called afterwards.
 *
 * @throws IOException if the partition cannot be finished successfully
 */
void finish() throws IOException;
}
......@@ -256,6 +256,7 @@ public class ResultPartition implements ResultPartitionWriter, BufferPoolOwner {
*
* <p>For BLOCKING results, this will trigger the deployment of consuming tasks.
*/
@Override
public void finish() throws IOException {
boolean success = false;
......@@ -313,6 +314,7 @@ public class ResultPartition implements ResultPartitionWriter, BufferPoolOwner {
}
}
@Override
public void fail(@Nullable Throwable throwable) {
// Releasing through the partition manager forwards the (possibly null) failure
// cause so that consumers of this partition can observe it.
partitionManager.releasePartition(partitionId, throwable);
}
......
......@@ -52,7 +52,6 @@ import org.apache.flink.runtime.io.network.NetworkEnvironment;
import org.apache.flink.runtime.io.network.TaskEventDispatcher;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
import org.apache.flink.runtime.io.network.partition.ResultPartition;
import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
......@@ -190,7 +189,7 @@ public class Task implements Runnable, TaskActions, CheckpointListener {
/** Serialized version of the job specific execution configuration (see {@link ExecutionConfig}). */
private final SerializedValue<ExecutionConfig> serializedExecutionConfig;
private final ResultPartition[] producedPartitions;
private final ResultPartitionWriter[] producedPartitions;
private final SingleInputGate[] inputGates;
......@@ -598,7 +597,7 @@ public class Task implements Runnable, TaskActions, CheckpointListener {
setupPartionsAndGates(producedPartitions, inputGates);
for (ResultPartition partition : producedPartitions) {
for (ResultPartitionWriter partition : producedPartitions) {
taskEventDispatcher.registerPartition(partition.getPartitionId());
}
......@@ -688,7 +687,7 @@ public class Task implements Runnable, TaskActions, CheckpointListener {
// ----------------------------------------------------------------
// finish the produced partitions. if this fails, we consider the execution failed.
for (ResultPartition partition : producedPartitions) {
for (ResultPartitionWriter partition : producedPartitions) {
if (partition != null) {
partition.finish();
}
......@@ -845,7 +844,7 @@ public class Task implements Runnable, TaskActions, CheckpointListener {
private void releaseNetworkResources() {
LOG.debug("Release task {} network resources (state: {}).", taskNameWithSubtask, getExecutionState());
for (ResultPartition partition : producedPartitions) {
for (ResultPartitionWriter partition : producedPartitions) {
taskEventDispatcher.unregisterPartition(partition.getPartitionId());
if (isCanceledOrFailed()) {
partition.fail(getFailureCause());
......@@ -860,7 +859,7 @@ public class Task implements Runnable, TaskActions, CheckpointListener {
* release partitions and gates. Another is from task thread during task exiting.
*/
private void closeNetworkResources() {
for (ResultPartition partition : producedPartitions) {
for (ResultPartitionWriter partition : producedPartitions) {
try {
partition.close();
} catch (Throwable t) {
......
......@@ -23,6 +23,7 @@ import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
import org.apache.flink.runtime.io.network.buffer.BufferProvider;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
import java.io.IOException;
......@@ -109,5 +110,15 @@ public abstract class AbstractCollectingResultPartitionWriter implements ResultP
// No-op: this collecting writer holds no resources that need releasing.
public void close() {
}
// Failing the partition is not meaningful for this collecting writer.
@Override
public void fail(@Nullable Throwable throwable) {
throw new UnsupportedOperationException();
}
// Finishing the partition is not meaningful for this collecting writer.
@Override
public void finish() {
throw new UnsupportedOperationException();
}
protected abstract void deserializeBuffer(Buffer buffer) throws IOException;
}
......@@ -55,6 +55,8 @@ import org.junit.rules.TemporaryFolder;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import javax.annotation.Nullable;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
......@@ -510,6 +512,16 @@ public class RecordWriterTest {
// No-op: flushing is irrelevant for this test writer.
public void flush(int subpartitionIndex) {
}
// Not exercised by this test; fail fast if it is ever called.
@Override
public void fail(@Nullable Throwable throwable) {
throw new UnsupportedOperationException();
}
// Not exercised by this test; fail fast if it is ever called.
@Override
public void finish() {
throw new UnsupportedOperationException();
}
// No-op: this test writer holds no resources that need releasing.
@Override
public void close() {
}
......@@ -575,6 +587,16 @@ public class RecordWriterTest {
// No-op: flushing is irrelevant for this test writer.
public void flush(int subpartitionIndex) {
}
// Not exercised by this test; fail fast if it is ever called.
@Override
public void fail(@Nullable Throwable throwable) {
throw new UnsupportedOperationException();
}
// Not exercised by this test; fail fast if it is ever called.
@Override
public void finish() {
throw new UnsupportedOperationException();
}
// No-op: this test writer holds no resources that need releasing.
@Override
public void close() {
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
To comment, please register.