提交 71e85d8b 编写于 作者: A Arvid Heise 提交者: Piotr Nowojski

[FLINK-17315][tests] Fix and reenable UnalignedCheckpointITCase.

上级 1e20d614
...@@ -21,6 +21,8 @@ package org.apache.flink.test.checkpointing; ...@@ -21,6 +21,8 @@ package org.apache.flink.test.checkpointing;
import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.accumulators.LongCounter; import org.apache.flink.api.common.accumulators.LongCounter;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.CheckpointingOptions; import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.Configuration;
...@@ -34,7 +36,6 @@ import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunctio ...@@ -34,7 +36,6 @@ import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunctio
import org.apache.flink.util.TestLogger; import org.apache.flink.util.TestLogger;
import org.hamcrest.Matchers; import org.hamcrest.Matchers;
import org.junit.Ignore;
import org.junit.Rule; import org.junit.Rule;
import org.junit.Test; import org.junit.Test;
import org.junit.rules.TemporaryFolder; import org.junit.rules.TemporaryFolder;
...@@ -50,7 +51,6 @@ import static org.junit.Assert.assertThat; ...@@ -50,7 +51,6 @@ import static org.junit.Assert.assertThat;
/** /**
* Integration test for performing the unaligned checkpoint. * Integration test for performing the unaligned checkpoint.
*/ */
@Ignore("Unstable")
public class UnalignedCheckpointITCase extends TestLogger { public class UnalignedCheckpointITCase extends TestLogger {
public static final String NUM_COMPLETED_CHECKPOINTS = "numCompletedCheckpoints"; public static final String NUM_COMPLETED_CHECKPOINTS = "numCompletedCheckpoints";
...@@ -106,6 +106,7 @@ public class UnalignedCheckpointITCase extends TestLogger { ...@@ -106,6 +106,7 @@ public class UnalignedCheckpointITCase extends TestLogger {
final LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(parallelism, conf); final LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(parallelism, conf);
env.enableCheckpointing(100); env.enableCheckpointing(100);
env.getCheckpointConfig().enableUnalignedCheckpoints(); env.getCheckpointConfig().enableUnalignedCheckpoints();
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, Time.milliseconds(100)));
return env; return env;
} }
...@@ -140,10 +141,12 @@ public class UnalignedCheckpointITCase extends TestLogger { ...@@ -140,10 +141,12 @@ public class UnalignedCheckpointITCase extends TestLogger {
public void run(SourceContext<Integer> ctx) throws Exception { public void run(SourceContext<Integer> ctx) throws Exception {
int counter = 0; int counter = 0;
while (running) { while (running) {
ctx.collect(counter++); synchronized (ctx.getCheckpointLock()) {
ctx.collect(counter++);
if (numCompletedCheckpoints.getLocalValue() >= minCheckpoints) { if (numCompletedCheckpoints.getLocalValue() >= minCheckpoints) {
cancel(); cancel();
}
} }
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册