diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java index d64548d923ce7766a5bd42f19b804e22f0d87f2c..d7e6efd9e884fe49342cc0713f71cdd7f4db1363 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java @@ -52,20 +52,22 @@ class PartitionRequestClientFactory { Object entry; PartitionRequestClient client = null; - while(client == null) { + while (client == null) { entry = clients.get(remoteAddress); if (entry != null) { // Existing channel or connecting channel if (entry instanceof PartitionRequestClient) { client = (PartitionRequestClient) entry; - } else { + } + else { ConnectingChannel future = (ConnectingChannel) entry; client = future.waitForChannel(); clients.replace(remoteAddress, future, client); } - } else { + } + else { // No channel yet. Create one, but watch out for a race. // We create a "connecting future" and atomically add it to the map. // Only the thread that really added it establishes the channel. @@ -79,18 +81,20 @@ class PartitionRequestClientFactory { client = connectingChannel.waitForChannel(); clients.replace(remoteAddress, connectingChannel, client); - } else if (old instanceof ConnectingChannel) { + } + else if (old instanceof ConnectingChannel) { client = ((ConnectingChannel) old).waitForChannel(); clients.replace(remoteAddress, old, client); - } else { + } + else { client = (PartitionRequestClient) old; } } // Make sure to increment the reference count before handing a client // out to ensure correct bookkeeping for channel closing. - if(!client.incrementReferenceCounter()){ + if (!client.incrementReferenceCounter()) { destroyPartitionRequestClient(remoteAddress, client); client = null; } @@ -102,7 +106,7 @@ class PartitionRequestClientFactory { public void closeOpenChannelConnections(RemoteAddress remoteAddress) { Object entry = clients.get(remoteAddress); - if(entry instanceof ConnectingChannel) { + if (entry instanceof ConnectingChannel) { ConnectingChannel channel = (ConnectingChannel) entry; if (channel.dispose()) { @@ -141,8 +145,9 @@ class PartitionRequestClientFactory { boolean result; synchronized (connectLock) { if (partitionRequestClient != null) { - result = partitionRequestClient.disposeIfNotUsed(); - } else { + result = partitionRequestClient.disposeIfNotUsed(); + } + else { disposeRequestClient = true; result = true; } @@ -155,16 +160,21 @@ class PartitionRequestClientFactory { private void handInChannel(Channel channel) { synchronized (connectLock) { - PartitionRequestClientHandler requestHandler = - (PartitionRequestClientHandler) channel.pipeline().get(PartitionRequestProtocol.CLIENT_REQUEST_HANDLER_NAME); + try { + PartitionRequestClientHandler requestHandler = + (PartitionRequestClientHandler) channel.pipeline().get(PartitionRequestProtocol.CLIENT_REQUEST_HANDLER_NAME); - partitionRequestClient = new PartitionRequestClient(channel, requestHandler, remoteAddress, clientFactory); + partitionRequestClient = new PartitionRequestClient(channel, requestHandler, remoteAddress, clientFactory); - if (disposeRequestClient) { - partitionRequestClient.disposeIfNotUsed(); - } + if (disposeRequestClient) { + partitionRequestClient.disposeIfNotUsed(); + } - connectLock.notifyAll(); + connectLock.notifyAll(); + } + catch (Throwable t) { + notifyOfError(t); + } } }