diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/BarrierTransceiver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/BarrierTransceiver.java
deleted file mode 100644
index 1dd6a90e6bf5dbd536f03884e1efaa8ec6396c54..0000000000000000000000000000000000000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/BarrierTransceiver.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.jobgraph.tasks;
-
-
-import java.io.IOException;
-
-/**
- * A BarrierTransceiver describes an operator's barrier checkpointing behavior used for
- * fault tolerance. In the most common case [[broadcastBarrier]] is being expected to be called
- * periodically upon receiving a checkpoint barrier. Furthermore, a [[confirmBarrier]] method should
- * be implemented and used for acknowledging a specific checkpoint checkpoint.
- */
-public interface BarrierTransceiver {
-
- /**
- * A callback for notifying an operator of a new checkpoint barrier.
- * @param barrierID
- */
- public void broadcastBarrierFromSource(long barrierID);
-
- /**
- * A callback for confirming that a barrier checkpoint is complete
- * @param barrierID
- */
- public void confirmBarrier(long barrierID) throws IOException;
-
-}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointCommittingOperator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointCommittingOperator.java
index 69cb1f8d937f6f12730e5574a40c7ea1accbfbcf..be203d29a6f933994754594db942b51124f5b87a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointCommittingOperator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointCommittingOperator.java
@@ -20,5 +20,5 @@ package org.apache.flink.runtime.jobgraph.tasks;
public interface CheckpointCommittingOperator {
- void confirmCheckpoint(long checkpointId, long timestamp);
+ void confirmCheckpoint(long checkpointId, long timestamp) throws Exception;
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointedOperator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointedOperator.java
index d07b07ee4b31d12d95d2c6d7bf485bd354757124..17ba9477bf0492ccd28dfb69f63210e9e3b7efc3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointedOperator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointedOperator.java
@@ -20,5 +20,5 @@ package org.apache.flink.runtime.jobgraph.tasks;
public interface CheckpointedOperator {
- void triggerCheckpoint(long checkpointId, long timestamp);
+ void triggerCheckpoint(long checkpointId, long timestamp) throws Exception;
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/OperatorStateCarrier.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/OperatorStateCarrier.java
index 576edb62e52a330b8bb0896e3c49dd4f7ff6b5bc..fb5e63ffce4565dcf4cade2eb10b06fc87c1fd8a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/OperatorStateCarrier.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/OperatorStateCarrier.java
@@ -33,6 +33,6 @@ public interface OperatorStateCarrier> {
*
* @param stateHandle The handle to the state.
*/
- public void setInitialState(T stateHandle);
+ public void setInitialState(T stateHandle) throws Exception;
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalStateHandle.java
index f47b054639f25d4c8aca9f1b07ba6e7fa2d3348a..fa0c515fb997b8bb151e0e82ab500d82230922e9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalStateHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalStateHandle.java
@@ -18,23 +18,23 @@
package org.apache.flink.runtime.state;
-import java.util.Map;
+import java.io.Serializable;
/**
* A StateHandle that includes a map of operator states directly.
*/
-public class LocalStateHandle implements StateHandle
+ *
+ * @param The type of the operator state.
*/
-public interface Checkpointed {
+public interface Checkpointed {
/**
- * Gets the current operator state as a checkpoint. The state must reflect all operations
- * from all prior operations if this function.
+ * Gets the current state of the function or operator. The state must reflect the result of all
+ * prior invocations to this function.
*
* @param checkpointId The ID of the checkpoint.
* @param checkpointTimestamp The timestamp of the checkpoint, as derived by
@@ -49,5 +51,13 @@ public interface Checkpointed {
* recovery), or to discard this checkpoint attempt and to continue running
* and to try again with the next checkpoint attempt.
*/
- OperatorState> snapshotState(long checkpointId, long checkpointTimestamp) throws Exception;
+ T snapshotState(long checkpointId, long checkpointTimestamp) throws Exception;
+
+ /**
+ * Restores the state of the function or operator to that of a previous checkpoint.
+ * This method is invoked when a function is executed as part of a recovery run.
+ *
+ * @param state The state to be restored.
+ */
+ void restoreState(T state);
}
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
index a361b7a2b3ef538196c0cf2f688380d00b87e6df..f66e39497fb31d4215e5065c30b9396e5da10b3b 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
@@ -26,6 +26,8 @@ import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.functions.util.FunctionUtils;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.checkpoint.CheckpointCommitter;
+import org.apache.flink.streaming.api.checkpoint.Checkpointed;
import org.apache.flink.streaming.runtime.io.IndexedReaderIterator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
@@ -107,16 +109,14 @@ public abstract class StreamOperator implements Serializable {
return nextRecord;
} catch (IOException e) {
if (isRunning) {
- throw new RuntimeException("Could not read next record due to: "
- + StringUtils.stringifyException(e));
+ throw new RuntimeException("Could not read next record", e);
} else {
// Task already cancelled do nothing
return null;
}
} catch (IllegalStateException e) {
if (isRunning) {
- throw new RuntimeException("Could not read next record due to: "
- + StringUtils.stringifyException(e));
+ throw new RuntimeException("Could not read next record", e);
} else {
// Task already cancelled do nothing
return null;
@@ -215,4 +215,49 @@ public abstract class StreamOperator implements Serializable {
public Function getUserFunction() {
return userFunction;
}
+
+ // ------------------------------------------------------------------------
+ // Checkpoints and Checkpoint Confirmations
+ // ------------------------------------------------------------------------
+
+ // NOTE - ALL OF THIS CODE WORKS ONLY FOR THE FIRST OPERATOR IN THE CHAIN
+ // IT NEEDS TO BE EXTENDED TO SUPPORT CHAINS
+
+ public void restoreInitialState(Serializable state) throws Exception {
+ if (userFunction instanceof Checkpointed) {
+ setStateOnFunction(state, userFunction);
+ }
+ else {
+ throw new IllegalStateException("Trying to restore state of a non-checkpointed function");
+ }
+ }
+
+ public Serializable getStateSnapshotFromFunction(long checkpointId, long timestamp) throws Exception {
+ if (userFunction instanceof Checkpointed) {
+ return ((Checkpointed>) userFunction).snapshotState(checkpointId, timestamp);
+ }
+ else {
+ return null;
+ }
+ }
+
+ public void confirmCheckpointCompleted(long checkpointId, long timestamp) throws Exception {
+ if (userFunction instanceof CheckpointCommitter) {
+ try {
+ ((CheckpointCommitter) userFunction).commitCheckpoint(checkpointId);
+ }
+ catch (Exception e) {
+ throw new Exception("Error while confirming checkpoint " + checkpointId + " to the stream function", e);
+ }
+ }
+ }
+
+ private static void setStateOnFunction(Serializable state, Function function) {
+ @SuppressWarnings("unchecked")
+ T typedState = (T) state;
+ @SuppressWarnings("unchecked")
+ Checkpointed typedFunction = (Checkpointed) function;
+
+ typedFunction.restoreState(typedState);
+ }
}
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OutputHandler.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OutputHandler.java
index c579c3aea48fb13b5ceff50d18d905c379d505ff..55c05517003e91ce31defb66e144bb29475cdca7 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OutputHandler.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OutputHandler.java
@@ -88,8 +88,8 @@ public class OutputHandler {
this.outerCollector = createChainedCollector(configuration);
}
- public void broadcastBarrier(long id) throws IOException, InterruptedException {
- StreamingSuperstep barrier = new StreamingSuperstep(id);
+ public void broadcastBarrier(long id, long timestamp) throws IOException, InterruptedException {
+ StreamingSuperstep barrier = new StreamingSuperstep(id, timestamp);
for (StreamOutput> streamOutput : outputMap.values()) {
streamOutput.broadcastEvent(barrier);
}
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index b4a11aae7d76ef3e3803b71f8c77b91d58c7c811..930c9b41eeacc7bafc8fad0cc34f1826a01b33c0 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -18,18 +18,15 @@
package org.apache.flink.streaming.runtime.tasks;
import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
+import java.io.Serializable;
import org.apache.flink.runtime.event.task.TaskEvent;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.jobgraph.tasks.BarrierTransceiver;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointCommittingOperator;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointedOperator;
import org.apache.flink.runtime.jobgraph.tasks.OperatorStateCarrier;
import org.apache.flink.runtime.state.LocalStateHandle;
-import org.apache.flink.runtime.state.OperatorState;
import org.apache.flink.runtime.util.event.EventListener;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.ChainableStreamOperator;
@@ -40,16 +37,18 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
import org.apache.flink.util.Collector;
import org.apache.flink.util.MutableObjectIterator;
import org.apache.flink.util.StringUtils;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class StreamTask extends AbstractInvokable implements StreamTaskContext,
- OperatorStateCarrier, CheckpointedOperator, CheckpointCommittingOperator,
- BarrierTransceiver {
+ OperatorStateCarrier, CheckpointedOperator, CheckpointCommittingOperator {
private static final Logger LOG = LoggerFactory.getLogger(StreamTask.class);
+ private final Object checkpointLock = new Object();
+
private static int numTasks;
protected StreamConfig configuration;
@@ -62,7 +61,6 @@ public class StreamTask extends AbstractInvokable implements StreamTask
protected volatile boolean isRunning = false;
private StreamingRuntimeContext context;
- private Map> states;
protected ClassLoader userClassLoader;
@@ -90,32 +88,7 @@ public class StreamTask extends AbstractInvokable implements StreamTask
protected void initialize() {
this.userClassLoader = getUserCodeClassLoader();
this.configuration = new StreamConfig(getTaskConfiguration());
- this.states = new HashMap>();
- this.context = createRuntimeContext(getEnvironment().getTaskName(), this.states);
- }
-
- @Override
- public void broadcastBarrierFromSource(long id) {
- // Only called at input vertices
- if (LOG.isDebugEnabled()) {
- LOG.debug("Received barrier from jobmanager: " + id);
- }
- actOnBarrier(id);
- }
-
- /**
- * This method is called to confirm that a barrier has been fully processed.
- * It sends an acknowledgment to the jobmanager. In the current version if
- * there is user state it also checkpoints the state to the jobmanager.
- */
- @Override
- public void confirmBarrier(long barrierID) throws IOException {
- if (configuration.getStateMonitoring() && !states.isEmpty()) {
- getEnvironment().acknowledgeCheckpoint(barrierID, new LocalStateHandle(states));
- }
- else {
- getEnvironment().acknowledgeCheckpoint(barrierID);
- }
+ this.context = createRuntimeContext(getEnvironment().getTaskName());
}
public void setInputsOutputs() {
@@ -136,11 +109,10 @@ public class StreamTask extends AbstractInvokable implements StreamTask
return instanceID;
}
- public StreamingRuntimeContext createRuntimeContext(String taskName,
- Map> states) {
+ public StreamingRuntimeContext createRuntimeContext(String taskName) {
Environment env = getEnvironment();
return new StreamingRuntimeContext(taskName, env, getUserCodeClassLoader(),
- getExecutionConfig(), states);
+ getExecutionConfig());
}
@Override
@@ -272,62 +244,98 @@ public class StreamTask extends AbstractInvokable implements StreamTask
return this.superstepListener;
}
+ // ------------------------------------------------------------------------
+ // Checkpoint and Restore
+ // ------------------------------------------------------------------------
+
/**
- * Method to be called when a barrier is received from all the input
- * channels. It should broadcast the barrier to the output operators,
- * checkpoint the state and send an ack.
- *
- * @param id
+ * Re-injects the user states into the map. Also set the state on the functions.
*/
- private synchronized void actOnBarrier(long id) {
- if (isRunning) {
- try {
- outputHandler.broadcastBarrier(id);
- confirmBarrier(id);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Superstep " + id + " processed: " + StreamTask.this);
- }
- } catch (Exception e) {
- // Only throw any exception if the vertex is still running
- if (isRunning) {
- throw new RuntimeException(e);
- }
- }
- }
- }
-
@Override
- public String toString() {
- return getEnvironment().getTaskNameWithSubtasks();
+ public void setInitialState(LocalStateHandle stateHandle) throws Exception {
+ // here, we later resolve the state handle into the actual state by
+ // loading the state described by the handle from the backup store
+ Serializable state = stateHandle.getState();
+ streamOperator.restoreInitialState(state);
}
/**
- * Re-injects the user states into the map
+ * This method is either called directly by the checkpoint coordinator, or called
+ * when all incoming channels have reported a barrier
+ *
+ * @param checkpointId
+ * @param timestamp
+ * @throws Exception
*/
@Override
- public void setInitialState(LocalStateHandle stateHandle) {
- this.states.putAll(stateHandle.getState());
+ public void triggerCheckpoint(long checkpointId, long timestamp) throws Exception {
+
+ synchronized (checkpointLock) {
+ if (isRunning) {
+ try {
+ LOG.info("Starting checkpoint " + checkpointId);
+
+ // first draw the state that should go into checkpoint
+ LocalStateHandle state;
+ try {
+ Serializable userState = streamOperator.getStateSnapshotFromFunction(checkpointId, timestamp);
+ state = userState == null ? null : new LocalStateHandle(userState);
+ }
+ catch (Exception e) {
+ throw new Exception("Error while drawing snapshot of the user state.", e);
+ }
+
+ // now emit the checkpoint barriers
+ outputHandler.broadcastBarrier(checkpointId, timestamp);
+
+ // now confirm the checkpoint
+ if (state == null) {
+ getEnvironment().acknowledgeCheckpoint(checkpointId);
+ } else {
+ getEnvironment().acknowledgeCheckpoint(checkpointId, state);
+ }
+ }
+ catch (Exception e) {
+ if (isRunning) {
+ throw e;
+ }
+ }
+ }
+ }
}
@Override
- public void triggerCheckpoint(long checkpointId, long timestamp) {
- broadcastBarrierFromSource(checkpointId);
+ public void confirmCheckpoint(long checkpointId, long timestamp) throws Exception {
+ // forward the confirmation to the operator, which commits it on the user function (e.g. a source)
+ synchronized (checkpointLock) {
+ streamOperator.confirmCheckpointCompleted(checkpointId, timestamp);
+ }
}
+
+ // ------------------------------------------------------------------------
+ // Utilities
+ // ------------------------------------------------------------------------
+
@Override
- public void confirmCheckpoint(long checkpointId, long timestamp) {
- // we do nothing here so far. this should call commit on the source function, for example
+ public String toString() {
+ return getEnvironment().getTaskNameWithSubtasks();
}
-
-
+ // ------------------------------------------------------------------------
private class SuperstepEventListener implements EventListener {
@Override
public void onEvent(TaskEvent event) {
- actOnBarrier(((StreamingSuperstep) event).getId());
+ try {
+ StreamingSuperstep sStep = (StreamingSuperstep) event;
+ triggerCheckpoint(sStep.getId(), sStep.getTimestamp());
+ }
+ catch (Exception e) {
+ throw new RuntimeException(
+ "Error triggering a checkpoint as the result of receiving checkpoint barrier", e);
+ }
}
-
}
}
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamingRuntimeContext.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamingRuntimeContext.java
index 8ae2ced84b67e0cf6a023ffce0064545a44a4f73..6112e03c0e5b2f36c270cdcf02a8a1ae32eb2154 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamingRuntimeContext.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamingRuntimeContext.java
@@ -18,17 +18,13 @@
package org.apache.flink.streaming.runtime.tasks;
-import java.util.Map;
-
import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.functions.RichFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.functions.util.RuntimeUDFContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
import org.apache.flink.runtime.operators.util.TaskConfig;
-import org.apache.flink.runtime.state.OperatorState;
/**
* Implementation of the {@link RuntimeContext}, created by runtime stream UDF
@@ -37,65 +33,13 @@ import org.apache.flink.runtime.state.OperatorState;
public class StreamingRuntimeContext extends RuntimeUDFContext {
private final Environment env;
- private final Map> operatorStates;
+
public StreamingRuntimeContext(String name, Environment env, ClassLoader userCodeClassLoader,
- ExecutionConfig executionConfig, Map> operatorStates) {
+ ExecutionConfig executionConfig) {
super(name, env.getNumberOfSubtasks(), env.getIndexInSubtaskGroup(), userCodeClassLoader,
executionConfig, env.getDistributedCacheEntries());
this.env = env;
- this.operatorStates = operatorStates;
- }
-
- /**
- * Returns the operator state registered by the given name for the operator.
- *
- * @param name
- * Name of the operator state to be returned.
- * @return The operator state.
- */
- public OperatorState> getState(String name) {
- if (operatorStates == null) {
- throw new RuntimeException("No state has been registered for this operator.");
- } else {
- OperatorState> state = operatorStates.get(name);
- if (state != null) {
- return state;
- } else {
- throw new RuntimeException("No state has been registered for the name: " + name);
- }
- }
- }
-
- /**
- * Returns whether there is a state stored by the given name
- */
- public boolean containsState(String name) {
- return operatorStates.containsKey(name);
- }
-
- /**
- * This is a beta feature Register an operator state for this
- * operator by the given name. This name can be used to retrieve the state
- * during runtime using {@link StreamingRuntimeContext#getState(String)}. To
- * obtain the {@link StreamingRuntimeContext} from the user-defined function
- * use the {@link RichFunction#getRuntimeContext()} method.
- *
- * @param name
- * The name of the operator state.
- * @param state
- * The state to be registered for this name.
- */
- public void registerState(String name, OperatorState> state) {
- if (state == null) {
- throw new RuntimeException("Cannot register null state");
- } else {
- if (operatorStates.containsKey(name)) {
- throw new RuntimeException("State is already registered");
- } else {
- operatorStates.put(name, state);
- }
- }
}
/**
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamingSuperstep.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamingSuperstep.java
index 4b3419b742e2e1d3d1a43ea8abf7f3de29597a28..f749773898df179455deb54f5e8e59016cd27675 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamingSuperstep.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamingSuperstep.java
@@ -27,34 +27,57 @@ import org.apache.flink.runtime.event.task.TaskEvent;
public class StreamingSuperstep extends TaskEvent {
protected long id;
+ protected long timestamp;
- public StreamingSuperstep() {
+ public StreamingSuperstep() {}
+ public StreamingSuperstep(long id, long timestamp) {
+ this.id = id;
+ this.timestamp = timestamp;
}
- public StreamingSuperstep(long id) {
- this.id = id;
+ public long getId() {
+ return id;
}
+ public long getTimestamp() {
+ return timestamp;
+ }
+
+ // ------------------------------------------------------------------------
+
@Override
public void write(DataOutputView out) throws IOException {
out.writeLong(id);
+ out.writeLong(timestamp);
}
@Override
public void read(DataInputView in) throws IOException {
id = in.readLong();
+ timestamp = in.readLong();
}
+
+ // ------------------------------------------------------------------------
- public long getId() {
- return id;
+ @Override
+ public int hashCode() {
+ return (int) (id ^ (id >>> 32) ^ timestamp ^ (timestamp >>> 32));
}
+ @Override
public boolean equals(Object other) {
if (other == null || !(other instanceof StreamingSuperstep)) {
return false;
- } else {
- return ((StreamingSuperstep) other).id == this.id;
}
+ else {
+ StreamingSuperstep that = (StreamingSuperstep) other;
+ return that.id == this.id && that.timestamp == this.timestamp;
+ }
+ }
+
+ @Override
+ public String toString() {
+ return String.format("StreamingSuperstep %d @ %d", id, timestamp);
}
}
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
index b8af4ed354b18b748b4dc3cb0d2683790d57ef0e..0afe8b551c57b7b8de12118d93763246bea12616 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
@@ -33,8 +33,8 @@ import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.util.event.EventListener;
-import org.apache.flink.streaming.runtime.io.BarrierBuffer;
import org.apache.flink.streaming.runtime.tasks.StreamingSuperstep;
+
import org.junit.Test;
public class BarrierBufferTest {
@@ -201,7 +201,7 @@ public class BarrierBufferTest {
}
protected static BufferOrEvent createSuperstep(long id, int channel) {
- return new BufferOrEvent(new StreamingSuperstep(id), channel);
+ return new BufferOrEvent(new StreamingSuperstep(id, System.currentTimeMillis()), channel);
}
protected static BufferOrEvent createBuffer(int channel) {
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureStreamingRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureStreamingRecoveryITCase.java
index aeaad5cb8e01899ccf9f3697cf6f55bedad9a9dc..65e39a2735c83f36009e0cb6a9b8ce0cfcc1279c 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureStreamingRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureStreamingRecoveryITCase.java
@@ -22,7 +22,7 @@ import org.apache.commons.io.FileUtils;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.state.OperatorState;
+import org.apache.flink.streaming.api.checkpoint.Checkpointed;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
@@ -145,12 +145,15 @@ public class ProcessFailureStreamingRecoveryITCase extends AbstractProcessFailur
}
}
- public static class SleepyDurableGenerateSequence extends RichParallelSourceFunction {
+ public static class SleepyDurableGenerateSequence extends RichParallelSourceFunction
+ implements Checkpointed {
private static final long SLEEP_TIME = 50;
private final File coordinateDir;
private final long end;
+
+ private long collected;
public SleepyDurableGenerateSequence(File coordinateDir, long end) {
this.coordinateDir = coordinateDir;
@@ -162,23 +165,10 @@ public class ProcessFailureStreamingRecoveryITCase extends AbstractProcessFailur
public void run(Collector collector) throws Exception {
StreamingRuntimeContext context = (StreamingRuntimeContext) getRuntimeContext();
- OperatorState collectedState;
- if (context.containsState("collected")) {
- collectedState = (OperatorState) context.getState("collected");
-
-// if (collected == 0) {
-// throw new RuntimeException("The state did not capture a completed checkpoint");
-// }
- }
- else {
- collectedState = new OperatorState(0L);
- context.registerState("collected", collectedState);
- }
final long stepSize = context.getNumberOfParallelSubtasks();
final long congruence = context.getIndexOfThisSubtask();
final long toCollect = (end % stepSize > congruence) ? (end / stepSize + 1) : (end / stepSize);
- long collected = collectedState.getState();
final File proceedFile = new File(coordinateDir, PROCEED_MARKER_FILE);
boolean checkForProceedFile = true;
@@ -196,13 +186,22 @@ public class ProcessFailureStreamingRecoveryITCase extends AbstractProcessFailur
}
collector.collect(collected * stepSize + congruence);
- collectedState.update(collected);
collected++;
}
}
@Override
public void cancel() {}
+
+ @Override
+ public Long snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
+ return collected;
+ }
+
+ @Override
+ public void restoreState(Long state) {
+ collected = state;
+ }
}