未验证 提交 22f87ee9 编写于 作者: Z Zhijiang 提交者: Till Rohrmann

[hotfix][network,tests] Add new unit test for LocalInputChannel#requestSubpartition

It is necessary for flip1 to make sure the PartitionNotFoundException would be thrown by LocalInputChannel#requestSubpartition if the partition
was not registered in ResultPartitionManager before. So a new unit test is added to cover this case.
上级 f78e494c
......@@ -410,6 +410,11 @@ public class SingleInputGate extends InputGate {
}
}
@VisibleForTesting
Timer getRetriggerLocalRequestTimer() {
return retriggerLocalRequestTimer;
}
@Override
public void close() throws IOException {
boolean released = false;
......
......@@ -39,6 +39,7 @@ import org.apache.flink.util.function.CheckedSupplier;
import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
import org.hamcrest.Matchers;
import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
......@@ -60,6 +61,8 @@ import static org.apache.flink.runtime.io.network.partition.InputChannelTestUtil
import static org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createSingleInputGate;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyInt;
......@@ -246,6 +249,75 @@ public class LocalInputChannelTest {
ch.getNextBuffer();
}
/**
* Tests that {@link LocalInputChannel#requestSubpartition(int)} throws {@link PartitionNotFoundException}
* if the result partition was not registered in {@link ResultPartitionManager} and no backoff.
*/
@Test
public void testPartitionNotFoundExceptionWhileRequestingPartition() throws Exception {
final SingleInputGate inputGate = createSingleInputGate(1);
final LocalInputChannel localChannel = createLocalInputChannel(inputGate, new ResultPartitionManager());
try {
localChannel.requestSubpartition(0);
fail("Should throw a PartitionNotFoundException.");
} catch (PartitionNotFoundException notFound) {
assertThat(localChannel.getPartitionId(), Matchers.is(notFound.getPartitionId()));
}
}
/**
* Tests that {@link SingleInputGate#retriggerPartitionRequest(IntermediateResultPartitionID)} is triggered
* after {@link LocalInputChannel#requestSubpartition(int)} throws {@link PartitionNotFoundException}
* within backoff.
*/
@Test
public void testRetriggerPartitionRequestWhilePartitionNotFound() throws Exception {
final SingleInputGate inputGate = createSingleInputGate(1);
final LocalInputChannel localChannel = createLocalInputChannel(
inputGate, new ResultPartitionManager(), 1, 1);
inputGate.setInputChannel(localChannel.getPartitionId().getPartitionId(), localChannel);
localChannel.requestSubpartition(0);
// The timer should be initialized at the first time of retriggering partition request.
assertNotNull(inputGate.getRetriggerLocalRequestTimer());
}
/**
* Tests that {@link LocalInputChannel#retriggerSubpartitionRequest(Timer, int)} would throw
* {@link PartitionNotFoundException} which is set onto the input channel then.
*/
@Test
public void testChannelErrorWhileRetriggeringRequest() {
final SingleInputGate inputGate = createSingleInputGate(1);
final LocalInputChannel localChannel = createLocalInputChannel(inputGate, new ResultPartitionManager());
final Timer timer = new Timer(true) {
@Override
public void schedule(TimerTask task, long delay) {
task.run();
try {
localChannel.checkError();
fail("Should throw a PartitionNotFoundException.");
} catch (PartitionNotFoundException notFound) {
assertThat(localChannel.partitionId, Matchers.is(notFound.getPartitionId()));
} catch (IOException ex) {
fail("Should throw a PartitionNotFoundException.");
}
}
};
try {
localChannel.retriggerSubpartitionRequest(timer, 0);
} finally {
timer.cancel();
}
}
/**
* Verifies that concurrent release via the SingleInputGate and re-triggering
* of a partition request works smoothly.
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册