未验证 提交 5e72b341 编写于 作者: Z Zhijiang 提交者: Till Rohrmann

[FLINK-6227][network] Introduce PartitionException to indicate restarting producer on JM side

The newly proposed PartitionException covers all cases of partition consumption failure that cause the consumer to fail; the JM then decides whether to restart the producer based on this exception.

[FLINK-6227][network] (part 2) Make current PartitionNotFoundException extend PartitionException

This closes #8242.
上级 6d0b6b9a
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.io.network.partition;
import java.io.IOException;
/**
 * Base exception for every failure that occurs while consuming a result partition.
 *
 * <p>Such a failure causes the consumer task to fail. Based on this exception the job
 * master then decides whether the producer of the partition has to be restarted.
 */
public abstract class PartitionException extends IOException {

    private static final long serialVersionUID = 0L;

    /** Identifier of the partition this exception refers to. */
    private final ResultPartitionID partitionId;

    /**
     * Creates the exception with a message, the affected partition and an underlying cause.
     *
     * @param message description of the failure
     * @param partitionId identifier of the partition this exception refers to
     * @param throwable underlying cause of the failure, may be {@code null}
     */
    public PartitionException(String message, ResultPartitionID partitionId, Throwable throwable) {
        super(message, throwable);
        this.partitionId = partitionId;
    }

    /**
     * Creates the exception with a message and the affected partition, without a cause.
     *
     * @param message description of the failure
     * @param partitionId identifier of the partition this exception refers to
     */
    public PartitionException(String message, ResultPartitionID partitionId) {
        // Delegates with an explicit null cause, exactly like the three-argument form.
        this(message, partitionId, null);
    }

    /**
     * Returns the identifier of the partition this exception refers to.
     *
     * @return the partition identifier handed to the constructor
     */
    public ResultPartitionID getPartitionId() {
        return this.partitionId;
    }
}
......@@ -18,27 +18,14 @@
package org.apache.flink.runtime.io.network.partition;
import java.io.IOException;
/**
* Exception for failed partition requests due to non-existing partitions.
*/
public class PartitionNotFoundException extends IOException {
public class PartitionNotFoundException extends PartitionException {
private static final long serialVersionUID = 0L;
private final ResultPartitionID partitionId;
public PartitionNotFoundException(ResultPartitionID partitionId) {
this.partitionId = partitionId;
}
public ResultPartitionID getPartitionId() {
return partitionId;
}
@Override
public String getMessage() {
return "Partition " + partitionId + " not found.";
super("Partition " + partitionId + " not found.", partitionId);
}
}
......@@ -22,6 +22,7 @@ import org.apache.flink.metrics.Counter;
import org.apache.flink.runtime.event.TaskEvent;
import org.apache.flink.runtime.execution.CancelTaskException;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.partition.PartitionException;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
......@@ -176,6 +177,9 @@ public abstract class InputChannel {
/**
* Checks for an error and rethrows it if one was reported.
*
* <p>Note: Any {@link PartitionException} instance must not be transformed, so that
* it always remains visible in the task failure cause.
*/
protected void checkError() throws IOException {
final Throwable t = cause.get();
......
......@@ -35,6 +35,7 @@ import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
import org.apache.flink.runtime.io.network.partition.InputChannelTestUtils;
import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
......@@ -52,6 +53,8 @@ import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
import static org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createLocalInputChannel;
import static org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createSingleInputGate;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
......@@ -59,6 +62,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Mockito.mock;
......@@ -520,6 +524,28 @@ public class SingleInputGateTest extends InputGateTestBase {
}
}
/**
 * Tests that a {@link PartitionNotFoundException} set on an {@link InputChannel} is thrown
 * unchanged by {@link SingleInputGate#getNextBufferOrEvent()}, i.e. the {@link SingleInputGate}
 * neither swallows nor transforms the original exception.
 *
 * <p>The test registers a single local channel on the gate, reports the exception on that
 * channel and then expects the very same exception type, carrying the channel's partition id,
 * when the next buffer or event is requested from the gate.
 */
@Test
public void testPartitionNotFoundExceptionWhileGetNextBuffer() throws Exception {
    final SingleInputGate inputGate = createSingleInputGate(1);
    final LocalInputChannel localChannel = createLocalInputChannel(inputGate, new ResultPartitionManager());
    final ResultPartitionID partitionId = localChannel.getPartitionId();

    inputGate.setInputChannel(partitionId.getPartitionId(), localChannel);
    localChannel.setError(new PartitionNotFoundException(partitionId));

    try {
        inputGate.getNextBufferOrEvent();
        fail("Should throw a PartitionNotFoundException.");
    } catch (PartitionNotFoundException notFound) {
        // Hamcrest convention: actual value first, expected value inside the matcher,
        // so that a failure report labels the two values correctly.
        assertThat(notFound.getPartitionId(), is(partitionId));
    }
}
// ---------------------------------------------------------------------------------------------
private void addUnknownInputChannel(
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册