From 9e87e2030d5a8232b80e6dbb93166fb9ac9ecb53 Mon Sep 17 00:00:00 2001 From: Nico Kruber Date: Mon, 9 Jul 2018 16:49:15 +0200 Subject: [PATCH] [FLINK-9785][network] add remote address information to LocalTransportException instances This closes #6291 --- .../CreditBasedPartitionRequestClientHandler.java | 4 +++- .../io/network/netty/PartitionRequestClient.java | 15 ++++++++++----- .../netty/PartitionRequestClientFactory.java | 6 ++++-- .../netty/PartitionRequestClientHandler.java | 6 +++++- 4 files changed, 22 insertions(+), 9 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler.java index 47fbdb2bea0..9aa3920934e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler.java @@ -167,7 +167,9 @@ class CreditBasedPartitionRequestClientHandler extends ChannelInboundHandlerAdap tex = new RemoteTransportException("Lost connection to task manager '" + remoteAddr + "'. " + "This indicates that the remote task manager was lost.", remoteAddr, cause); } else { - tex = new LocalTransportException(cause.getMessage(), ctx.channel().localAddress(), cause); + final SocketAddress localAddr = ctx.channel().localAddress(); + tex = new LocalTransportException( + String.format("%s (connection to '%s')", cause.getMessage(), remoteAddr), localAddr, cause); } notifyAllChannelsOfErrorAndClose(tex); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClient.java index 91dc2d5fc6f..27d341aca40 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClient.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClient.java @@ -34,6 +34,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; +import java.net.SocketAddress; import java.util.concurrent.TimeUnit; import static org.apache.flink.runtime.io.network.netty.NettyMessage.PartitionRequest; @@ -114,10 +115,11 @@ public class PartitionRequestClient { public void operationComplete(ChannelFuture future) throws Exception { if (!future.isSuccess()) { clientHandler.removeInputChannel(inputChannel); + SocketAddress remoteAddr = future.channel().remoteAddress(); inputChannel.onError( new LocalTransportException( - "Sending the partition request failed.", - future.channel().localAddress(), future.cause() + String.format("Sending the partition request to '%s' failed.", remoteAddr), + future.channel().localAddress(), future.cause() )); } } @@ -158,9 +160,10 @@ public class PartitionRequestClient { @Override public void operationComplete(ChannelFuture future) throws Exception { if (!future.isSuccess()) { + SocketAddress remoteAddr = future.channel().remoteAddress(); inputChannel.onError(new LocalTransportException( - "Sending the task event failed.", - future.channel().localAddress(), future.cause() + String.format("Sending the task event to '%s' failed.", remoteAddr), + future.channel().localAddress(), future.cause() )); } } @@ -193,7 +196,9 @@ public class PartitionRequestClient { private void checkNotClosed() throws IOException { if (closeReferenceCounter.isDisposed()) { - throw new LocalTransportException("Channel closed.", tcpChannel.localAddress()); + final SocketAddress localAddr = tcpChannel.localAddress(); + final SocketAddress remoteAddr = tcpChannel.remoteAddress(); + throw new LocalTransportException(String.format("Channel to '%s' closed.", remoteAddr), localAddr); } } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java index ea9d3aade8b..2e357c45105 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java @@ -220,8 +220,10 @@ class PartitionRequestClientFactory { } else { notifyOfError(new LocalTransportException( - "Connecting to remote task manager + '" + connectionId.getAddress() + - "' has been cancelled.", null)); + String.format( + "Connecting to remote task manager '%s' has been cancelled.", + connectionId.getAddress()), + null)); } } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java index 2279deb5a2b..367c62d5acf 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java @@ -164,7 +164,11 @@ class PartitionRequestClientHandler extends ChannelInboundHandlerAdapter impleme + "that the remote task manager was lost.", remoteAddr, cause); } else { - tex = new LocalTransportException(cause.getMessage(), ctx.channel().localAddress(), cause); + SocketAddress localAddr = ctx.channel().localAddress(); + tex = new LocalTransportException( + String.format("%s (connection to '%s')", cause.getMessage(), remoteAddr), + localAddr, + cause); } notifyAllChannelsOfErrorAndClose(tex); -- GitLab