提交 cf3ae88b 编写于 作者: U Ufuk Celebi

[FLINK-3369] [runtime] Make RemoteTransportException instance of CancelTaskException

Problem: RemoteTransportException (RTE) is thrown on data transfer failures
when the remote data producer fails. Because RTE is an instance of IOException,
the RTE may end up being reported as the root cause of the job failure.

Solution: Make RTE an instance of CancelTaskException, so that it leads to
cancellation of the task rather than failure.

Squashes the following commit:

[pr-comments] Add remote address to RemoteTransportException

This closes #1621.
上级 fd324ea7
......@@ -34,6 +34,10 @@ public class CancelTaskException extends RuntimeException {
super(msg);
}
/**
 * Creates the exception with a detail message and an underlying cause; both
 * are handed to the superclass constructor unchanged.
 *
 * @param msg the detail message
 * @param cause the throwable that led to this exception
 */
public CancelTaskException(String msg, Throwable cause) {
super(msg, cause);
}
/**
 * Creates the exception without a detail message or cause.
 */
public CancelTaskException() {
super();
}
......
......@@ -114,7 +114,7 @@ public class PartitionRequestClient {
inputChannel.onError(
new LocalTransportException(
"Sending the partition request failed.",
future.channel().localAddress(), future.cause()
future.cause()
));
}
}
......@@ -158,7 +158,7 @@ public class PartitionRequestClient {
if (!future.isSuccess()) {
inputChannel.onError(new LocalTransportException(
"Sending the task event failed.",
future.channel().localAddress(), future.cause()
future.cause()
));
}
}
......@@ -185,7 +185,7 @@ public class PartitionRequestClient {
/**
 * Verifies that this client has not been disposed yet.
 *
 * @throws IOException a {@code LocalTransportException} naming the local
 *         address of the TCP channel if {@code closeReferenceCounter}
 *         reports that the client is disposed
 */
private void checkNotClosed() throws IOException {
	if (closeReferenceCounter.isDisposed()) {
		// Leading space in " closed." keeps the address and the word apart;
		// without it the message reads e.g. "Channel /10.0.0.1:6121closed.".
		throw new LocalTransportException("Channel " + tcpChannel.localAddress() + " closed.");
	}
}
}
......@@ -216,12 +216,12 @@ class PartitionRequestClientFactory {
"Connecting to remote task manager + '" + connectionId.getAddress() +
"' has failed. This might indicate that the remote task " +
"manager has been lost.",
connectionId.getAddress(), future.cause()));
future.cause(), connectionId.getAddress()));
}
else {
notifyOfError(new LocalTransportException(
"Connecting to remote task manager + '" + connectionId.getAddress() +
"' has been cancelled.", null));
"' has been cancelled."));
}
}
}
......
......@@ -29,7 +29,6 @@ import org.apache.flink.runtime.io.network.buffer.BufferProvider;
import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
import org.apache.flink.runtime.io.network.netty.exception.LocalTransportException;
import org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException;
import org.apache.flink.runtime.io.network.netty.exception.TransportException;
import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID;
import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
......@@ -133,27 +132,23 @@ class PartitionRequestClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
if (cause instanceof TransportException) {
if (cause instanceof LocalTransportException || cause instanceof RemoteTransportException) {
notifyAllChannelsOfErrorAndClose(cause);
}
else {
final SocketAddress remoteAddr = ctx.channel().remoteAddress();
final TransportException tex;
// Improve on the connection reset by peer error message
if (cause instanceof IOException
&& cause.getMessage().equals("Connection reset by peer")) {
tex = new RemoteTransportException(
notifyAllChannelsOfErrorAndClose(new RemoteTransportException(
"Lost connection to task manager '" + remoteAddr + "'. This indicates "
+ "that the remote task manager was lost.", remoteAddr, cause);
+ "that the remote task manager was lost.", cause, remoteAddr));
}
else {
tex = new LocalTransportException(cause.getMessage(), ctx.channel().localAddress(), cause);
notifyAllChannelsOfErrorAndClose(new LocalTransportException(cause.getMessage(), cause));
}
notifyAllChannelsOfErrorAndClose(tex);
}
}
......@@ -228,7 +223,7 @@ class PartitionRequestClientHandler extends ChannelInboundHandlerAdapter {
if (error.isFatalError()) {
notifyAllChannelsOfErrorAndClose(new RemoteTransportException(
"Fatal error at remote task manager '" + remoteAddr + "'.",
remoteAddr, error.cause));
error.cause, remoteAddr));
}
else {
RemoteInputChannel inputChannel = inputChannels.get(error.receiverId);
......@@ -240,7 +235,7 @@ class PartitionRequestClientHandler extends ChannelInboundHandlerAdapter {
else {
inputChannel.onError(new RemoteTransportException(
"Error at remote task manager '" + remoteAddr + "'.",
remoteAddr, error.cause));
error.cause, remoteAddr));
}
}
}
......
......@@ -18,17 +18,27 @@
package org.apache.flink.runtime.io.network.netty.exception;
import java.net.SocketAddress;
import java.io.IOException;
public class LocalTransportException extends TransportException {
/**
* Exception thrown on local transport failures.
*
* <p>If you get this type of exception at task manager T, it means that
* something went wrong in the local network stack of task manager T.
*/
public class LocalTransportException extends IOException {
private static final long serialVersionUID = 2366708881288640674L;
public LocalTransportException(String message, SocketAddress address) {
super(message, address);
public LocalTransportException() {
super();
}
public LocalTransportException(String message) {
super(message);
}
public LocalTransportException(String message, SocketAddress address, Throwable cause) {
super(message, address, cause);
public LocalTransportException(String message, Throwable cause) {
super(message, cause);
}
}
......@@ -18,17 +18,43 @@
package org.apache.flink.runtime.io.network.netty.exception;
import org.apache.flink.runtime.execution.CancelTaskException;
import java.net.SocketAddress;
public class RemoteTransportException extends TransportException {
/**
* Exception thrown on remote transport failures.
*
* <p>If you get this type of exception at task manager T, it means that
* something went wrong in the network stack of another task manager (not T).
* It is not an issue with the task that throws the exception.
*/
public class RemoteTransportException extends CancelTaskException {
private static final long serialVersionUID = 4373615529545893089L;
public RemoteTransportException(String message, SocketAddress address) {
super(message, address);
/** Address of the remote task manager that caused this Exception. */
private final SocketAddress remoteAddress;
public RemoteTransportException() {
this(null, null, null);
}
public RemoteTransportException(String msg, SocketAddress remoteAddress) {
this(msg, null, remoteAddress);
}
public RemoteTransportException(String msg, Throwable cause, SocketAddress remoteAddress) {
super(msg, cause);
this.remoteAddress = remoteAddress;
}
public RemoteTransportException(String message, SocketAddress address, Throwable cause) {
super(message, address, cause);
/**
* Returns the address of the task manager causing this Exception.
*
* @return Address of the remote task manager causing this Exception
*/
public SocketAddress getRemoteAddress() {
return remoteAddress;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.io.network.netty.exception;
import java.io.IOException;
import java.net.SocketAddress;
/**
 * Abstract {@link IOException} that additionally records a socket address.
 *
 * <p>The address is stored exactly as passed to the constructor; no
 * validation is performed on it.
 */
public abstract class TransportException extends IOException {

	private static final long serialVersionUID = 3637820720589866570L;

	/** Socket address supplied at construction time. */
	private final SocketAddress address;

	/**
	 * Creates the exception with a message, an address and a cause.
	 *
	 * @param message the detail message
	 * @param address the socket address to record
	 * @param cause the underlying cause, passed on to {@link IOException}
	 */
	public TransportException(String message, SocketAddress address, Throwable cause) {
		super(message, cause);
		this.address = address;
	}

	/**
	 * Creates the exception with a message and an address; delegates to the
	 * three-argument constructor with a {@code null} cause.
	 *
	 * @param message the detail message
	 * @param address the socket address to record
	 */
	public TransportException(String message, SocketAddress address) {
		this(message, address, null);
	}

	/**
	 * Returns the socket address recorded at construction time.
	 *
	 * @return the recorded address
	 */
	public SocketAddress getAddress() {
		return this.address;
	}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册