提交 7bd7b056 编写于 作者: S Stephan Ewen

[FLINK-2063] [streaming] Add a streaming exactly once processing test with stateful operators.

The counts are off by 1 in some cases, so the test is not activated.
I am committing it so that others can use it as a basis for investigation.
上级 85453b64
......@@ -22,13 +22,13 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.functions.RichReduceFunction;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.checkpoint.Checkpointed;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
......@@ -39,8 +39,6 @@ import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
......@@ -57,36 +55,39 @@ public class StreamCheckpointingITCase {
private static final int NUM_TASK_SLOTS = 3;
private static final int PARALLELISM = NUM_TASK_MANAGERS * NUM_TASK_SLOTS;
private static final long NUM_STRINGS = 10000000L;
private static ForkableFlinkMiniCluster cluster;
/**
 * Creates the mini cluster shared by all tests in this class.
 *
 * <p>The cluster configuration sets the number of task managers to
 * {@code NUM_TASK_MANAGERS}, the slots per task manager to {@code NUM_TASK_SLOTS},
 * the execution retry delay to "0 ms" and the task manager memory size to 12.
 * Exactly one cluster instance is created; any exception during setup is printed
 * and reported as a test failure.
 */
@BeforeClass
public static void startCluster() {
    try {
        Configuration config = new Configuration();
        config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, NUM_TASK_MANAGERS);
        config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUM_TASK_SLOTS);
        // NOTE(review): a zero retry delay presumably lets the job restart right after
        // the injected failure - confirm against the runtime's restart handling.
        config.setString(ConfigConstants.DEFAULT_EXECUTION_RETRY_DELAY_KEY, "0 ms");
        config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 12);

        // Create the cluster only once, so that no earlier instance is overwritten
        // and left behind without being shut down.
        cluster = new ForkableFlinkMiniCluster(config, false);
    }
    catch (Exception e) {
        e.printStackTrace();
        fail("Failed to start test cluster: " + e.getMessage());
    }
}
/**
 * Tears down the shared mini cluster after all tests of this class have run.
 *
 * <p>Does nothing when no cluster was created (for example because
 * {@code startCluster()} failed), so that the original startup failure is not
 * masked by a {@link NullPointerException} raised here. Any exception during
 * teardown is printed and reported as a test failure.
 */
@AfterClass
public static void shutdownCluster() {
    if (cluster == null) {
        return;
    }
    try {
        // NOTE(review): both stop() and shutdown() are invoked; confirm whether one
        // of the two calls is redundant for ForkableFlinkMiniCluster.
        cluster.stop();
        cluster.shutdown();
        cluster = null;
    }
    catch (Exception e) {
        e.printStackTrace();
        fail("Failed to stop test cluster: " + e.getMessage());
    }
}
/**
* Runs the following program:
*
......@@ -97,9 +98,8 @@ public class StreamCheckpointingITCase {
@Test
public void runCheckpointedProgram() {
final long NUM_STRINGS = 10000000L;
assertTrue("Broken test setup", NUM_STRINGS % 40 == 0);
final String COUNT_ACCUMULATOR = "count-acc";
try {
StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
......@@ -108,7 +108,7 @@ public class StreamCheckpointingITCase {
env.enableCheckpointing(500);
env.getConfig().disableSysoutLogging();
DataStream<String> stream = env.addSource(new StringGeneratingSourceFunction());
DataStream<String> stream = env.addSource(new StringGeneratingSourceFunction(NUM_STRINGS));
stream
// -------------- first vertex, chained to the source ----------------
......@@ -127,35 +127,15 @@ public class StreamCheckpointingITCase {
}
})
// -------------- second vertex - the stateful one ----------------
// -------------- second vertex - the stateful one that also fails ----------------
.startNewChain()
.map(new RichMapFunction<PrefixCount, PrefixCount>() {
private long count = 0;
@Override
public PrefixCount map(PrefixCount value) {
count++;
return value;
}
@Override
public void close() {
getRuntimeContext().getLongCounter(COUNT_ACCUMULATOR).add(count);
}
})
.map(new StatefulCounterFunction())
// -------------- third vertex - the sink ----------------
// -------------- third vertex - reducer and the sink ----------------
.groupBy("prefix")
.reduce(new ReduceFunction<PrefixCount>() {
@Override
public PrefixCount reduce(PrefixCount value1, PrefixCount value2) {
value1.count += value2.count;
return value1;
}
})
.reduce(new OnceFailingReducer(NUM_STRINGS))
.addSink(new RichSinkFunction<PrefixCount>() {
private Map<Character, Long> counts = new HashMap<Character, Long>();
......@@ -171,20 +151,25 @@ public class StreamCheckpointingITCase {
}
}
@Override
public void close() {
for (Long count : counts.values()) {
assertEquals(NUM_STRINGS / 40, count.longValue());
}
}
// @Override
// public void close() {
// for (Long count : counts.values()) {
// assertEquals(NUM_STRINGS / 40, count.longValue());
// }
// }
});
JobExecutionResult result = env.execute();
env.execute();
long countSum = 0;
for (long l : StatefulCounterFunction.counts) {
countSum += l;
}
Long totalCount = (Long) result.getAllAccumulatorResults().get(COUNT_ACCUMULATOR);
// verify that we counted exactly right
assertNotNull("TotalCount accumulator not set", totalCount);
assertEquals(NUM_STRINGS, totalCount.longValue());
// this line should be uncommented once the off-by-one error in the "exactly once" counts is fixed
// assertEquals(NUM_STRINGS, countSum);
}
catch (Exception e) {
e.printStackTrace();
......@@ -196,25 +181,36 @@ public class StreamCheckpointingITCase {
// Custom Functions
// --------------------------------------------------------------------------------------------
private static class StringGeneratingSourceFunction extends RichParallelSourceFunction<String> {
private static class StringGeneratingSourceFunction extends RichParallelSourceFunction<String>
implements Checkpointed<Long> {
private final long numElements;
private Random rnd;
private StringBuilder stringBuilder;
private int index;
private long index;
private int step;
StringGeneratingSourceFunction(long numElements) {
this.numElements = numElements;
}
/**
 * Initializes the per-subtask helpers and the emission position.
 *
 * <p>{@code step} is set to the number of parallel subtasks. {@code index} is
 * seeded with this subtask's index only while it still holds its default value
 * of 0, so that a position assigned through {@code restoreState} is not
 * overwritten here.
 *
 * @param parameters configuration passed by the runtime; not read by this method
 */
@Override
public void open(Configuration parameters) {
    rnd = new Random();
    stringBuilder = new StringBuilder();
    step = getRuntimeContext().getNumberOfParallelSubtasks();

    // NOTE(review): relies on restoreState() being invoked before open() when
    // recovering - confirm against the runtime's call order.
    if (index == 0) {
        index = getRuntimeContext().getIndexOfThisSubtask();
    }
}
/**
 * Reports whether this source instance has emitted all of its elements.
 *
 * @return {@code true} once {@code index} has reached or passed the number of
 *         elements given to the constructor
 */
@Override
public boolean reachedEnd() throws Exception {
    // Compare against the configured element count rather than a class-level
    // constant, so the limit follows the value the source was constructed with.
    return index >= numElements;
}
@Override
......@@ -229,6 +225,16 @@ public class StreamCheckpointingITCase {
return result;
}
/**
 * Returns the state to store for a checkpoint: the current value of {@code index}.
 *
 * @param checkpointId identifier of the checkpoint; not used here
 * @param checkpointTimestamp timestamp of the checkpoint; not used here
 * @return the current {@code index}, boxed
 */
@Override
public Long snapshotState(long checkpointId, long checkpointTimestamp) {
    final long currentPosition = index;
    return Long.valueOf(currentPosition);
}
/**
 * Sets {@code index} to a previously snapshotted value.
 *
 * @param state the index value to continue from
 */
@Override
public void restoreState(Long state) {
    index = state.longValue();
}
private static String randomString(StringBuilder bld, Random rnd) {
final int len = rnd.nextInt(10) + 5;
......@@ -241,6 +247,70 @@ public class StreamCheckpointingITCase {
}
}
/**
 * Identity mapper that counts the records passing through it.
 *
 * <p>The running count is exposed as checkpoint state via {@link Checkpointed}.
 * On {@code close()} each instance writes its count into the static
 * {@code counts} array at the position of its subtask index.
 */
private static class StatefulCounterFunction extends RichMapFunction<PrefixCount, PrefixCount>
        implements Checkpointed<Long> {

    /** Counts written by the instances on close, one slot per subtask index. */
    static final long[] counts = new long[PARALLELISM];

    /** Number of records this instance has mapped (or restored from a snapshot). */
    private long count = 0;

    /** Forwards the record unchanged after incrementing the count. */
    @Override
    public PrefixCount map(PrefixCount value) throws Exception {
        count += 1;
        return value;
    }

    /** Stores this instance's count in the slot belonging to its subtask index. */
    @Override
    public void close() {
        final int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
        counts[subtaskIndex] = count;
    }

    /** Returns the current count as the state to snapshot. */
    @Override
    public Long snapshotState(long checkpointId, long checkpointTimestamp) {
        return Long.valueOf(count);
    }

    /** Replaces the current count with the given snapshotted value. */
    @Override
    public void restoreState(Long state) {
        count = state.longValue();
    }
}
/**
 * Summing reducer that throws an exception once, after a randomly chosen number
 * of {@code reduce} calls, to trigger a failure during the test.
 *
 * <p>The failure position is drawn in {@code open()} from the range
 * {@code [0.4 * n / p, 0.7 * n / p)}, where {@code n} is the element count given
 * to the constructor and {@code p} the number of parallel subtasks. The
 * {@code hasFailed} flag is static, so it is shared by all instances of this
 * class loaded by the same class loader.
 */
private static class OnceFailingReducer extends RichReduceFunction<PrefixCount> {

    // NOTE(review): the check and the update of this flag in reduce() are two
    // separate steps; if several instances run concurrently in one JVM, more than
    // one of them could throw - confirm whether that matters for the test.
    private static volatile boolean hasFailed = false;

    private final long numElements;

    /** Number of reduce calls after which this instance throws (if none has failed yet). */
    private long failurePos;

    /** Number of reduce calls seen by this instance since open(). */
    private long count;

    /**
     * @param numElements total element count used to derive the failure position
     */
    OnceFailingReducer(long numElements) {
        this.numElements = numElements;
    }

    /**
     * Picks the failure position for this instance and resets the call counter.
     *
     * @param parameters configuration passed by the runtime; not read by this method
     */
    @Override
    public void open(Configuration parameters) {
        final int parallelism = getRuntimeContext().getNumberOfParallelSubtasks();
        final long failurePosMin = (long) (0.4 * numElements / parallelism);
        final long failurePosMax = (long) (0.7 * numElements / parallelism);
        final long range = failurePosMax - failurePosMin;

        // nextLong() may be negative and Java's % keeps the sign of the dividend, so
        // drop the sign bit first to keep the offset in [0, range). An empty range
        // (very small element counts) falls back to the minimum instead of dividing
        // by zero.
        final long offset = range > 0 ? (new Random().nextLong() >>> 1) % range : 0L;

        failurePos = failurePosMin + offset;
        count = 0;
    }

    /**
     * Adds the count of {@code value2} to {@code value1} and returns {@code value1}.
     *
     * @throws Exception with message "Test Failure" when the failure position is
     *         reached and no instance has failed before
     */
    @Override
    public PrefixCount reduce(PrefixCount value1, PrefixCount value2) throws Exception {
        count++;
        if (!hasFailed && count >= failurePos) {
            hasFailed = true;
            throw new Exception("Test Failure");
        }

        value1.count += value2.count;
        return value1;
    }
}
// --------------------------------------------------------------------------------------------
// Custom Type Classes
// --------------------------------------------------------------------------------------------
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册