未验证 提交 49e18320 编写于 作者: Z Zhijiang 提交者: Till Rohrmann

[FLINK-12458][network] Introduce PartitionConnectionException for unreachable producer

If the consumer cannot establish a connection to the remote task executor while requesting a remote subpartition, this might indicate that the remote task executor is not reachable.
We could wrap this connection exception into the newly proposed PartitionConnectionException, which also extends PartitionException; the job master would then decide whether to
restart the upstream region to re-produce the partition data.

This closes #8509.
上级 5e72b341
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.io.network.partition.consumer;
import org.apache.flink.runtime.io.network.partition.PartitionException;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
/**
 * Signals that a partition request failed because the connection to the
 * producer of the partition could not be established.
 */
public class PartitionConnectionException extends PartitionException {

	private static final long serialVersionUID = 0L;

	/**
	 * Creates the exception for the given partition.
	 *
	 * @param partitionId ID of the partition whose request failed; it is rendered into the
	 *                    message and handed to the {@link PartitionException} constructor
	 * @param throwable the underlying failure, handed to the {@link PartitionException}
	 *                  constructor (presumably recorded as the cause)
	 */
	public PartitionConnectionException(ResultPartitionID partitionId, Throwable throwable) {
		super(unreachableMessage(partitionId), partitionId, throwable);
	}

	/** Renders the exception message for the given partition. */
	private static String unreachableMessage(ResultPartitionID partitionId) {
		return "Connection for partition " + partitionId + " not reachable.";
	}
}
......@@ -161,8 +161,12 @@ public class RemoteInputChannel extends InputChannel implements BufferRecycler,
public void requestSubpartition(int subpartitionIndex) throws IOException, InterruptedException {
if (partitionRequestClient == null) {
// Create a client and request the partition
partitionRequestClient = connectionManager
.createPartitionRequestClient(connectionId);
try {
partitionRequestClient = connectionManager.createPartitionRequestClient(connectionId);
} catch (IOException e) {
// IOExceptions indicate that we could not open a connection to the remote TaskExecutor
throw new PartitionConnectionException(partitionId, e);
}
partitionRequestClient.requestSubpartition(partitionId, subpartitionIndex, this, 0);
}
......
......@@ -18,6 +18,8 @@
package org.apache.flink.runtime.io.network;
import java.io.IOException;
/**
* A dummy implementation of the {@link ConnectionManager} which is mainly used for creating
* {@link PartitionRequestClient} instance in tests.
......@@ -28,7 +30,7 @@ public class TestingConnectionManager implements ConnectionManager {
public void start() {}
@Override
public PartitionRequestClient createPartitionRequestClient(ConnectionID connectionId) {
public PartitionRequestClient createPartitionRequestClient(ConnectionID connectionId) throws IOException {
return new TestingPartitionRequestClient();
}
......
......@@ -1016,6 +1016,23 @@ public class RemoteInputChannelTest {
}
}
/**
 * Tests that an {@link IOException} thrown by {@link ConnectionManager#createPartitionRequestClient(ConnectionID)}
 * is wrapped into a {@link PartitionConnectionException} during
 * {@link RemoteInputChannel#requestSubpartition(int)}, and that the wrapping
 * exception reports the partition ID of the requesting channel.
 */
@Test
public void testPartitionConnectionExceptionWhileRequestingPartition() throws Exception {
	// The connection manager used here fails every attempt to create a client
	final RemoteInputChannel inputChannel = InputChannelTestUtils.createRemoteInputChannel(
		createSingleInputGate(1), 0, new TestingExceptionConnectionManager());

	try {
		inputChannel.requestSubpartition(0);
		fail("Expected PartitionConnectionException.");
	} catch (PartitionConnectionException ex) {
		// The exception is the value under test, so it goes in the "actual" slot;
		// a failure message then reports expected and actual the right way round.
		assertThat(ex.getPartitionId(), is(inputChannel.getPartitionId()));
	}
}
// ---------------------------------------------------------------------------------------------
private RemoteInputChannel createRemoteInputChannel(SingleInputGate inputGate)
......@@ -1179,4 +1196,11 @@ public class RemoteInputChannelTest {
ExceptionUtils.rethrowException(throwable);
}
}
/**
 * A {@link TestingConnectionManager} whose attempt to create a
 * {@link PartitionRequestClient} always fails with an {@link IOException}.
 */
private static final class TestingExceptionConnectionManager extends TestingConnectionManager {

	/**
	 * Always fails instead of creating a client.
	 *
	 * @param connectionId ignored
	 * @return never returns normally
	 * @throws IOException on every call
	 */
	@Override
	public PartitionRequestClient createPartitionRequestClient(ConnectionID connectionId) throws IOException {
		final IOException connectionFailure = new IOException("");
		throw connectionFailure;
	}
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册