提交 71a53d49 编写于 作者: Z Zhijiang 提交者: Stephan Ewen

[FLINK-13245][network] Remove redundant bookkeeping for already canceled input channel IDs

上级 acd15d25
......@@ -26,7 +26,6 @@ import org.apache.flink.runtime.io.network.partition.ProducerFailedException;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannel.BufferAndAvailability;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID;
import org.apache.flink.shaded.guava18.com.google.common.collect.Sets;
import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFutureListener;
......@@ -40,7 +39,6 @@ import javax.annotation.Nullable;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
......@@ -62,8 +60,6 @@ class PartitionRequestQueue extends ChannelInboundHandlerAdapter {
/** All the readers created for the consumers' partition requests. */
private final ConcurrentMap<InputChannelID, NetworkSequenceViewReader> allReaders = new ConcurrentHashMap<>();
private final Set<InputChannelID> released = Sets.newHashSet();
private boolean fatalError;
private ChannelHandlerContext ctx;
......@@ -175,9 +171,6 @@ class PartitionRequestQueue extends ChannelInboundHandlerAdapter {
} else if (msg.getClass() == InputChannelID.class) {
// Release partition view that get a cancel request.
InputChannelID toCancel = (InputChannelID) msg;
if (released.contains(toCancel)) {
return;
}
// remove reader from queue of available readers
availableReaders.removeIf(reader -> reader.getReceiverId().equals(toCancel));
......@@ -222,7 +215,6 @@ class PartitionRequestQueue extends ChannelInboundHandlerAdapter {
if (!reader.isReleased()) {
continue;
}
markAsReleased(reader.getReceiverId());
Throwable cause = reader.getFailureCause();
if (cause != null) {
......@@ -312,14 +304,6 @@ class PartitionRequestQueue extends ChannelInboundHandlerAdapter {
reader.notifySubpartitionConsumed();
reader.setRegisteredAsAvailable(false);
reader.releaseAllResources();
markAsReleased(reader.getReceiverId());
}
/**
* Marks a receiver as released.
*/
private void markAsReleased(InputChannelID receiverId) {
released.add(receiverId);
}
// This listener is called after an element of the current nonEmptyReader has been
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册