提交 c8c0ec2f 编写于 作者: S Stephan Ewen

[FLINK-2628] [tests] CoStreamCheckpointingITCase prints a warning when test is inconclusive

The test is inconclusive when the test failure happens before the first checkpoint.
上级 8a849372
...@@ -31,7 +31,6 @@ import org.apache.flink.streaming.api.functions.sink.SinkFunction; ...@@ -31,7 +31,6 @@ import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction; import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction; import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.util.Collector; import org.apache.flink.util.Collector;
import org.junit.Assert;
import java.io.IOException; import java.io.IOException;
import java.util.Random; import java.util.Random;
...@@ -80,21 +79,21 @@ public class CoStreamCheckpointingITCase extends StreamFaultToleranceTestBase { ...@@ -80,21 +79,21 @@ public class CoStreamCheckpointingITCase extends StreamFaultToleranceTestBase {
// -------------- first vertex, chained to the source ---------------- // -------------- first vertex, chained to the source ----------------
.filter(new StringRichFilterFunction()) .filter(new StringRichFilterFunction())
// -------------- second vertex - the stateful one that also fails ---------------- // -------------- second vertex - stateful ----------------
.connect(stream).flatMap(new LeftIdentityCoRichFlatMapFunction()) .connect(stream).flatMap(new LeftIdentityCoRichFlatMapFunction())
// -------------- third vertex - the stateful one that also fails ---------------- // -------------- third vertex - stateful ----------------
.map(new StringPrefixCountRichMapFunction()) .map(new StringPrefixCountRichMapFunction())
.startNewChain() .startNewChain()
.map(new StatefulCounterFunction()) .map(new StatefulCounterFunction())
// -------------- fourth vertex - reducer and the sink ---------------- // -------------- fourth vertex - reducer (failing) and the sink ----------------
.groupBy("prefix") .groupBy("prefix")
.reduce(new OnceFailingReducer(NUM_STRINGS)) .reduce(new OnceFailingReducer(NUM_STRINGS))
.addSink(new SinkFunction<PrefixCount>() { .addSink(new SinkFunction<PrefixCount>() {
@Override @Override
public void invoke(PrefixCount value) throws Exception { public void invoke(PrefixCount value) {
// Do nothing here // Do nothing here
} }
}); });
...@@ -123,15 +122,14 @@ public class CoStreamCheckpointingITCase extends StreamFaultToleranceTestBase { ...@@ -123,15 +122,14 @@ public class CoStreamCheckpointingITCase extends StreamFaultToleranceTestBase {
} }
if (!StringPrefixCountRichMapFunction.restoreCalledAtLeastOnce) { if (!StringPrefixCountRichMapFunction.restoreCalledAtLeastOnce) {
Assert.fail("Restore was never called on counting Map function."); System.err.println("Test inconclusive: Restore was never called on counting Map function.");
} }
if (!LeftIdentityCoRichFlatMapFunction.restoreCalledAtLeastOnce) { if (!LeftIdentityCoRichFlatMapFunction.restoreCalledAtLeastOnce) {
Assert.fail("Restore was never called on counting CoMap function."); System.err.println("Test inconclusive: Restore was never called on counting CoMap function.");
} }
// verify that we counted exactly right // verify that we counted exactly right
assertEquals(NUM_STRINGS, filterSum); assertEquals(NUM_STRINGS, filterSum);
assertEquals(NUM_STRINGS, coMapSum); assertEquals(NUM_STRINGS, coMapSum);
assertEquals(NUM_STRINGS, mapSum); assertEquals(NUM_STRINGS, mapSum);
...@@ -301,19 +299,20 @@ public class CoStreamCheckpointingITCase extends StreamFaultToleranceTestBase { ...@@ -301,19 +299,20 @@ public class CoStreamCheckpointingITCase extends StreamFaultToleranceTestBase {
} }
private static class StringPrefixCountRichMapFunction extends RichMapFunction<String, PrefixCount> implements Checkpointed<Long> { private static class StringPrefixCountRichMapFunction extends RichMapFunction<String, PrefixCount> implements Checkpointed<Long> {
private long count = 0;
static final long[] counts = new long[PARALLELISM]; static final long[] counts = new long[PARALLELISM];
static volatile boolean restoreCalledAtLeastOnce = false; static volatile boolean restoreCalledAtLeastOnce = false;
private long count;
@Override @Override
public PrefixCount map(String value) throws IOException { public PrefixCount map(String value) {
count += 1; count += 1;
return new PrefixCount(value.substring(0, 1), value, 1L); return new PrefixCount(value.substring(0, 1), value, 1L);
} }
@Override @Override
public Long snapshotState(long checkpointId, long checkpointTimestamp) throws Exception { public Long snapshotState(long checkpointId, long checkpointTimestamp) {
return count; return count;
} }
...@@ -334,25 +333,25 @@ public class CoStreamCheckpointingITCase extends StreamFaultToleranceTestBase { ...@@ -334,25 +333,25 @@ public class CoStreamCheckpointingITCase extends StreamFaultToleranceTestBase {
private static class LeftIdentityCoRichFlatMapFunction extends RichCoFlatMapFunction<String, String, String> implements Checkpointed<Long> { private static class LeftIdentityCoRichFlatMapFunction extends RichCoFlatMapFunction<String, String, String> implements Checkpointed<Long> {
long count = 0;
static final long[] counts = new long[PARALLELISM]; static final long[] counts = new long[PARALLELISM];
static volatile boolean restoreCalledAtLeastOnce = false; static volatile boolean restoreCalledAtLeastOnce = false;
private long count;
@Override @Override
public void flatMap1(String value, Collector<String> out) throws IOException { public void flatMap1(String value, Collector<String> out) {
count += 1; count += 1;
out.collect(value); out.collect(value);
} }
@Override @Override
public void flatMap2(String value, Collector<String> out) throws IOException { public void flatMap2(String value, Collector<String> out) {
// we ignore the values from the second input // we ignore the values from the second input
} }
@Override @Override
public Long snapshotState(long checkpointId, long checkpointTimestamp) throws Exception { public Long snapshotState(long checkpointId, long checkpointTimestamp) {
return count; return count;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册