未验证 提交 687cc919 编写于 作者: A Andrey Zagrebin 提交者: Till Rohrmann

[hotfix][network] Rename taskExecutorLocation to taskExecutorResourceId and small fixes

上级 7d8869d7
......@@ -87,7 +87,7 @@ public class NettyShuffleEnvironment implements ShuffleEnvironment<ResultPartiti
private final Object lock = new Object();
private final ResourceID taskExecutorLocation;
private final ResourceID taskExecutorResourceId;
private final NettyShuffleEnvironmentConfiguration config;
......@@ -106,19 +106,19 @@ public class NettyShuffleEnvironment implements ShuffleEnvironment<ResultPartiti
private boolean isClosed;
private NettyShuffleEnvironment(
ResourceID taskExecutorLocation,
ResourceID taskExecutorResourceId,
NettyShuffleEnvironmentConfiguration config,
NetworkBufferPool networkBufferPool,
ConnectionManager connectionManager,
ResultPartitionManager resultPartitionManager,
ResultPartitionFactory resultPartitionFactory,
SingleInputGateFactory singleInputGateFactory) {
this.taskExecutorLocation = taskExecutorLocation;
this.taskExecutorResourceId = taskExecutorResourceId;
this.config = config;
this.networkBufferPool = networkBufferPool;
this.connectionManager = connectionManager;
this.resultPartitionManager = resultPartitionManager;
this.inputGatesById = new ConcurrentHashMap<>();
this.inputGatesById = new ConcurrentHashMap<>(10);
this.resultPartitionFactory = resultPartitionFactory;
this.singleInputGateFactory = singleInputGateFactory;
this.isClosed = false;
......@@ -126,11 +126,11 @@ public class NettyShuffleEnvironment implements ShuffleEnvironment<ResultPartiti
public static NettyShuffleEnvironment create(
NettyShuffleEnvironmentConfiguration config,
ResourceID taskExecutorLocation,
ResourceID taskExecutorResourceId,
TaskEventPublisher taskEventPublisher,
MetricGroup metricGroup,
IOManager ioManager) {
checkNotNull(taskExecutorLocation);
checkNotNull(taskExecutorResourceId);
checkNotNull(ioManager);
checkNotNull(taskEventPublisher);
checkNotNull(config);
......@@ -159,7 +159,7 @@ public class NettyShuffleEnvironment implements ShuffleEnvironment<ResultPartiti
config.isForcePartitionReleaseOnConsumption());
SingleInputGateFactory singleInputGateFactory = new SingleInputGateFactory(
taskExecutorLocation,
taskExecutorResourceId,
config,
connectionManager,
resultPartitionManager,
......@@ -167,7 +167,7 @@ public class NettyShuffleEnvironment implements ShuffleEnvironment<ResultPartiti
networkBufferPool);
return new NettyShuffleEnvironment(
taskExecutorLocation,
taskExecutorResourceId,
config,
networkBufferPool,
connectionManager,
......@@ -320,7 +320,7 @@ public class NettyShuffleEnvironment implements ShuffleEnvironment<ResultPartiti
checkArgument(shuffleDescriptor instanceof NettyShuffleDescriptor,
"Tried to update unknown channel with unknown ShuffleDescriptor %s.",
shuffleDescriptor.getClass().getName());
inputGate.updateInputChannel(taskExecutorLocation, (NettyShuffleDescriptor) shuffleDescriptor);
inputGate.updateInputChannel(taskExecutorResourceId, (NettyShuffleDescriptor) shuffleDescriptor);
return true;
}
......@@ -358,6 +358,7 @@ public class NettyShuffleEnvironment implements ShuffleEnvironment<ResultPartiti
LOG.info("Shutting down the network environment and its components.");
// terminate all network connections
//noinspection OverlyBroadCatchBlock
try {
LOG.debug("Shutting down network connection manager");
connectionManager.shutdown();
......
......@@ -42,7 +42,7 @@ import java.util.Optional;
* Factory for {@link ResultPartition} to use in {@link NettyShuffleEnvironment}.
*/
public class ResultPartitionFactory {
private static final Logger LOG = LoggerFactory.getLogger(ResultPartition.class);
private static final Logger LOG = LoggerFactory.getLogger(ResultPartitionFactory.class);
@Nonnull
private final ResultPartitionManager partitionManager;
......
......@@ -50,10 +50,10 @@ import static org.apache.flink.runtime.shuffle.ShuffleUtils.applyWithShuffleType
* Factory for {@link SingleInputGate} to use in {@link NettyShuffleEnvironment}.
*/
public class SingleInputGateFactory {
private static final Logger LOG = LoggerFactory.getLogger(SingleInputGate.class);
private static final Logger LOG = LoggerFactory.getLogger(SingleInputGateFactory.class);
@Nonnull
private final ResourceID taskExecutorLocation;
private final ResourceID taskExecutorResourceId;
private final boolean isCreditBased;
......@@ -78,13 +78,13 @@ public class SingleInputGateFactory {
private final int floatingNetworkBuffersPerGate;
public SingleInputGateFactory(
@Nonnull ResourceID taskExecutorLocation,
@Nonnull ResourceID taskExecutorResourceId,
@Nonnull NettyShuffleEnvironmentConfiguration networkConfig,
@Nonnull ConnectionManager connectionManager,
@Nonnull ResultPartitionManager partitionManager,
@Nonnull TaskEventPublisher taskEventPublisher,
@Nonnull NetworkBufferPool networkBufferPool) {
this.taskExecutorLocation = taskExecutorLocation;
this.taskExecutorResourceId = taskExecutorResourceId;
this.isCreditBased = networkConfig.isCreditBased();
this.partitionRequestInitialBackoff = networkConfig.partitionRequestInitialBackoff();
this.partitionRequestMaxBackoff = networkConfig.partitionRequestMaxBackoff();
......@@ -194,7 +194,7 @@ public class SingleInputGateFactory {
ChannelStatistics channelStatistics,
InputChannelMetrics metrics) {
ResultPartitionID partitionId = inputChannelDescriptor.getResultPartitionID();
if (inputChannelDescriptor.isLocalTo(taskExecutorLocation)) {
if (inputChannelDescriptor.isLocalTo(taskExecutorResourceId)) {
// Consuming task is deployed to the same TaskManager as the partition => local
channelStatistics.numLocalChannels++;
return new LocalInputChannel(
......@@ -241,9 +241,9 @@ public class SingleInputGateFactory {
}
private static class ChannelStatistics {
int numLocalChannels = 0;
int numRemoteChannels = 0;
int numUnknownChannels = 0;
int numLocalChannels;
int numRemoteChannels;
int numUnknownChannels;
@Override
public String toString() {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册