diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java index faba252287f9e79a9bbc82325322c30ef12708ea..d4907840f5af83c8140f1a182e308fa284e3dd7d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java @@ -354,7 +354,9 @@ public class NetworkEnvironment { public boolean hasReleasedAllResources() { String msg = String.format("Network buffer pool: %d missing memory segments. %d registered buffer pools. Connection manager: %d active connections. Task event dispatcher: %d registered writers.", - networkBufferPool.getTotalNumberOfMemorySegments() - networkBufferPool.getNumberOfAvailableMemorySegments(), networkBufferPool.getNumberOfRegisteredBufferPools(), connectionManager.getNumberOfActiveConnections(), taskEventDispatcher.getNumberOfRegisteredWriters()); + networkBufferPool.getTotalNumberOfMemorySegments() - networkBufferPool.getNumberOfAvailableMemorySegments(), + networkBufferPool.getNumberOfRegisteredBufferPools(), connectionManager.getNumberOfActiveConnections(), + taskEventDispatcher.getNumberOfRegisteredWriters()); boolean success = networkBufferPool.getTotalNumberOfMemorySegments() == networkBufferPool.getNumberOfAvailableMemorySegments() && networkBufferPool.getNumberOfRegisteredBufferPools() == 0 && diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManager.java index c0970987d79099e232b87ef641762d09bb86a5ba..0b66b2c8150c5fe31e8c974e987d3efd76bae7d9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManager.java @@ -42,15 +42,18 @@ public class NettyConnectionManager implements ConnectionManager { } @Override - public void start(ResultPartitionProvider partitionProvider, TaskEventDispatcher taskEventDispatcher, NetworkBufferPool networkbufferPool) throws IOException { - PartitionRequestProtocol partitionRequestProtocol = new PartitionRequestProtocol(partitionProvider, taskEventDispatcher, networkbufferPool); + public void start(ResultPartitionProvider partitionProvider, TaskEventDispatcher taskEventDispatcher, NetworkBufferPool networkbufferPool) + throws IOException { + PartitionRequestProtocol partitionRequestProtocol = + new PartitionRequestProtocol(partitionProvider, taskEventDispatcher, networkbufferPool); client.init(partitionRequestProtocol); server.init(partitionRequestProtocol); } @Override - public PartitionRequestClient createPartitionRequestClient(ConnectionID connectionId) throws IOException, InterruptedException { + public PartitionRequestClient createPartitionRequestClient(ConnectionID connectionId) + throws IOException, InterruptedException { return partitionRequestClientFactory.createPartitionRequestClient(connectionId); } @@ -70,4 +73,3 @@ public class NettyConnectionManager implements ConnectionManager { server.shutdown(); } } -