提交 9e87e203 编写于 作者: N Nico Kruber

[FLINK-9785][network] add remote address information to LocalTransportException instances

This closes #6291
上级 3f0b9fee
......@@ -167,7 +167,9 @@ class CreditBasedPartitionRequestClientHandler extends ChannelInboundHandlerAdap
tex = new RemoteTransportException("Lost connection to task manager '" + remoteAddr + "'. " +
"This indicates that the remote task manager was lost.", remoteAddr, cause);
} else {
tex = new LocalTransportException(cause.getMessage(), ctx.channel().localAddress(), cause);
final SocketAddress localAddr = ctx.channel().localAddress();
tex = new LocalTransportException(
String.format("%s (connection to '%s')", cause.getMessage(), remoteAddr), localAddr, cause);
}
notifyAllChannelsOfErrorAndClose(tex);
......
......@@ -34,6 +34,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.SocketAddress;
import java.util.concurrent.TimeUnit;
import static org.apache.flink.runtime.io.network.netty.NettyMessage.PartitionRequest;
......@@ -114,10 +115,11 @@ public class PartitionRequestClient {
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
clientHandler.removeInputChannel(inputChannel);
SocketAddress remoteAddr = future.channel().remoteAddress();
inputChannel.onError(
new LocalTransportException(
"Sending the partition request failed.",
future.channel().localAddress(), future.cause()
String.format("Sending the partition request to '%s' failed.", remoteAddr),
future.channel().localAddress(), future.cause()
));
}
}
......@@ -158,9 +160,10 @@ public class PartitionRequestClient {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
SocketAddress remoteAddr = future.channel().remoteAddress();
inputChannel.onError(new LocalTransportException(
"Sending the task event failed.",
future.channel().localAddress(), future.cause()
String.format("Sending the task event to '%s' failed.", remoteAddr),
future.channel().localAddress(), future.cause()
));
}
}
......@@ -193,7 +196,9 @@ public class PartitionRequestClient {
private void checkNotClosed() throws IOException {
if (closeReferenceCounter.isDisposed()) {
throw new LocalTransportException("Channel closed.", tcpChannel.localAddress());
final SocketAddress localAddr = tcpChannel.localAddress();
final SocketAddress remoteAddr = tcpChannel.remoteAddress();
throw new LocalTransportException(String.format("Channel to '%s' closed.", remoteAddr), localAddr);
}
}
}
......@@ -220,8 +220,10 @@ class PartitionRequestClientFactory {
}
else {
notifyOfError(new LocalTransportException(
"Connecting to remote task manager + '" + connectionId.getAddress() +
"' has been cancelled.", null));
String.format(
"Connecting to remote task manager '%s' has been cancelled.",
connectionId.getAddress()),
null));
}
}
}
......
......@@ -164,7 +164,11 @@ class PartitionRequestClientHandler extends ChannelInboundHandlerAdapter impleme
+ "that the remote task manager was lost.", remoteAddr, cause);
}
else {
tex = new LocalTransportException(cause.getMessage(), ctx.channel().localAddress(), cause);
SocketAddress localAddr = ctx.channel().localAddress();
tex = new LocalTransportException(
String.format("%s (connection to '%s')", cause.getMessage(), remoteAddr),
localAddr,
cause);
}
notifyAllChannelsOfErrorAndClose(tex);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册