提交 4fe2e18b 编写于 作者: G Gyula Fora

[streaming] Allow force-enabling checkpoints for iterative jobs

上级 a8e08507
......@@ -1188,7 +1188,15 @@ Rich functions provide, in addition to the user-defined function (`map()`, `redu
Stateful computation
------------
Flink supports the checkpointing and persistence of user defined state, so in case of a failure this state can be restored to the latest checkpoint and the processing can continue from there. This gives exactly once semantics for anything that is sotred in the state. For example when implementing a rolling count over the stream Flink gives you the possibility to safely store the counter. Another common usacase is when reading from a Kafka source to save the latest committed offset to catch up from. To mark a source for checkpointing it has to implement the `flink.streaming.api.checkpoint.Checkpointed` interface or preferably its special case where the checkpointing can be done asynchronously, `CheckpointedAsynchronously`. For example let us write a reduce function that besides summing the data it also counts have many elements it has seen.
Flink supports the checkpointing and persistence of user defined state, so in case of a failure this state can be restored to the latest checkpoint and the processing can continue from there. This gives exactly once semantics for anything that is stored in the state when the sources are stateful as well and checkpoint their current offset. The `PersistentKafkaSource` provides this stateful functionality for example.
For example when implementing a rolling count over the stream Flink gives you the possibility to safely store the counter. Another common usecase is when reading from a Kafka source to save the latest committed offset to catch up from. To mark a function for checkpointing it has to implement the `flink.streaming.api.checkpoint.Checkpointed` interface or preferably its special case where the checkpointing can be done asynchronously, `CheckpointedAsynchronously`.
Checkpointing can be enabled from the `StreamExecutionEnvironment` using the `enableCheckpointing(…)` where additional parameters can be passed to modify the default 5 second checkpoint interval.
By default state checkpoints will be stored in-memory at the JobManager. Flink also supports storing the checkpoints on any flink-supported file system (such as HDFS or Tachyon) which can be set in the flink-conf.yaml.
For example let us write a reduce function that besides summing the data it also counts have many elements it has seen.
{% highlight java %}
public class CounterSum implements ReduceFunction<Long>, CheckpointedAsynchronously<Long> {
......@@ -1257,7 +1265,9 @@ public static class CounterSource implements SourceFunction<Long>, CheckpointedA
}
{% endhighlight %}
Some operators might need the information when a checkpoint is fully acknowledged by Flink to communicate that with the outside world. In this case see the `flink.streaming.api.checkpoint.CheckpointComitter` interface.
Some operators might need the information when a checkpoint is fully acknowledged by Flink to communicate that with the outside world. In this case see the `flink.streaming.api.checkpoint.CheckpointComitter` interface.
Fink currently only provides processing guarantees for jobs without iterations. Enabling checkpointing on an iterative job causes an exception. In order to force checkpointing on an iterative program the user needs to set a special flag when enabling checkpointing: `env.enableCheckpointing(interval, force = true)`.
[Back to top](#top)
......
......@@ -241,6 +241,37 @@ public abstract class StreamExecutionEnvironment {
streamGraph.setCheckpointingInterval(interval);
return this;
}
/**
* Method for force-enabling fault-tolerance. Activates monitoring and
* backup of streaming operator states even for jobs containing iterations.
*
* Please note that the checkpoint/restore guarantees for iterative jobs are
* only best-effort at the moment. Records inside the loops may be lost
* during failure.
* <p/>
* <p/>
* Setting this option assumes that the job is used in production and thus
* if not stated explicitly otherwise with calling with the
* {@link #setNumberOfExecutionRetries(int numberOfExecutionRetries)} method
* in case of failure the job will be resubmitted to the cluster
* indefinitely.
*
* @param interval
* Time interval between state checkpoints in millis
* @param force
* If true checkpointing will be enabled for iterative jobs as
* well
*/
@Deprecated
public StreamExecutionEnvironment enableCheckpointing(long interval, boolean force) {
streamGraph.setCheckpointingEnabled(true);
streamGraph.setCheckpointingInterval(interval);
if (force) {
streamGraph.forceCheckpoint();
}
return this;
}
/**
* Method for enabling fault-tolerance. Activates monitoring and backup of
......
......@@ -79,6 +79,7 @@ public class StreamGraph extends StreamingPlan {
private Map<Integer, StreamLoop> streamLoops;
protected Map<Integer, StreamLoop> vertexIDtoLoop;
private StateHandleProvider<?> stateHandleProvider;
private boolean forceCheckpoint = false;
public StreamGraph(StreamExecutionEnvironment environment) {
......@@ -118,6 +119,10 @@ public class StreamGraph extends StreamingPlan {
public void setCheckpointingInterval(long checkpointingInterval) {
this.checkpointingInterval = checkpointingInterval;
}
public void forceCheckpoint() {
this.forceCheckpoint = true;
}
public void setStateHandleProvider(StateHandleProvider<?> provider) {
this.stateHandleProvider = provider;
......@@ -408,9 +413,11 @@ public class StreamGraph extends StreamingPlan {
public JobGraph getJobGraph(String jobGraphName) {
// temporarily forbid checkpointing for iterative jobs
if (isIterative() && isCheckpointingEnabled()) {
if (isIterative() && isCheckpointingEnabled() && !forceCheckpoint) {
throw new UnsupportedOperationException(
"Checkpointing is currently not supported for iterative jobs!");
"Checkpointing is currently not supported by default for iterative jobs, as we cannot guarantee exactly once semantics. "
+ "State checkpoints happen normally, but records in-transit during the snapshot will be lost upon failure. "
+ "\nThe user can force enable state checkpoints with the reduced guarantees by calling: env.enableCheckpointing(interval,true)");
}
setJobName(jobGraphName);
......
......@@ -398,7 +398,6 @@ public class StreamingJobGraphGenerator {
JobSnapshottingSettings settings = new JobSnapshottingSettings(
triggerVertices, ackVertices, commitVertices, interval);
jobGraph.setSnapshotSettings(settings);
int executionRetries = streamGraph.getExecutionConfig().getNumberOfExecutionRetries();
......
......@@ -65,6 +65,7 @@ public class StreamIterationHead<OUT> extends OneInputStreamTask<OUT, OUT> {
@SuppressWarnings("unchecked")
@Override
public void invoke() throws Exception {
isRunning = true;
if (LOG.isDebugEnabled()) {
LOG.debug("Iteration source {} invoked", getName());
}
......@@ -96,6 +97,7 @@ public class StreamIterationHead<OUT> extends OneInputStreamTask<OUT, OUT> {
}
finally {
// Cleanup
isRunning = false;
outputHandler.flushOutputs();
clearBuffers();
}
......
......@@ -57,6 +57,8 @@ public class StreamIterationTail<IN> extends OneInputStreamTask<IN, IN> {
@Override
public void invoke() throws Exception {
isRunning = true;
if (LOG.isDebugEnabled()) {
LOG.debug("Iteration sink {} invoked", getName());
}
......@@ -74,6 +76,7 @@ public class StreamIterationTail<IN> extends OneInputStreamTask<IN, IN> {
}
finally {
// Cleanup
isRunning = false;
clearBuffers();
}
}
......
......@@ -229,7 +229,7 @@ public abstract class StreamTask<OUT, O extends StreamOperator<OUT>> extends Abs
@Override
public void triggerCheckpoint(long checkpointId, long timestamp) throws Exception {
synchronized (checkpointLock) {
if (isRunning) {
try {
......
......@@ -207,6 +207,22 @@ public class IterateTest {
} catch (UnsupportedOperationException e) {
// expected behaviour
}
// Test force checkpointing
try {
env.enableCheckpointing(1, false);
env.execute();
// this statement should never be reached
fail();
} catch (UnsupportedOperationException e) {
// expected behaviour
}
env.enableCheckpointing(1, true);
env.getStreamGraph().getJobGraph();
}
......
......@@ -118,12 +118,31 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
/**
* Method for enabling fault-tolerance. Activates monitoring and backup of streaming
* operator states. Time interval between state checkpoints is specified in in millis.
*
* If the force flag is set to true, checkpointing will be enabled for iterative jobs as
* well.Please note that the checkpoint/restore guarantees for iterative jobs are
* only best-effort at the moment. Records inside the loops may be lost during failure.
*
* Setting this option assumes that the job is used in production and thus if not stated
* explicitly otherwise with calling with the
* {@link #setNumberOfExecutionRetries(int numberOfExecutionRetries)} method in case of
* failure the job will be resubmitted to the cluster indefinitely.
*/
@deprecated
def enableCheckpointing(interval : Long, force: Boolean) : StreamExecutionEnvironment = {
javaEnv.enableCheckpointing(interval, force)
this
}
/**
* Method for enabling fault-tolerance. Activates monitoring and backup of streaming
* operator states. Time interval between state checkpoints is specified in in millis.
*
* Setting this option assumes that the job is used in production and thus if not stated
* explicitly otherwise with calling with the
* {@link #setNumberOfExecutionRetries(int numberOfExecutionRetries)} method in case of
* failure the job will be resubmitted to the cluster indefinitely.
*/
def enableCheckpointing(interval : Long) : StreamExecutionEnvironment = {
javaEnv.enableCheckpointing(interval)
this
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册