From 3b2ee23b65544a5f517e77cb55911bacae4117c6 Mon Sep 17 00:00:00 2001
From: Stephan Ewen
Date: Tue, 5 May 2015 16:08:50 +0200
Subject: [PATCH] [streaming] Integrate new checkpointed interface with
StreamTask, StreamOperator, and PersistentKafkaSource
---
.../jobgraph/tasks/BarrierTransceiver.java | 43 -----
.../tasks/CheckpointCommittingOperator.java | 2 +-
.../jobgraph/tasks/CheckpointedOperator.java | 2 +-
.../jobgraph/tasks/OperatorStateCarrier.java | 2 +-
.../flink/runtime/state/LocalStateHandle.java | 14 +-
.../flink/runtime/state/StateUtils.java | 4 +-
.../flink/runtime/taskmanager/Task.java | 19 +--
.../CheckpointStateRestoreTest.java | 7 +-
.../api/simple/PersistentKafkaSource.java | 27 ++--
.../api/checkpoint/Checkpointed.java | 20 ++-
.../api/operators/StreamOperator.java | 53 ++++++-
.../runtime/tasks/OutputHandler.java | 4 +-
.../streaming/runtime/tasks/StreamTask.java | 150 +++++++++---------
.../tasks/StreamingRuntimeContext.java | 60 +------
.../runtime/tasks/StreamingSuperstep.java | 37 ++++-
.../runtime/io/BarrierBufferTest.java | 4 +-
...ProcessFailureStreamingRecoveryITCase.java | 31 ++--
17 files changed, 229 insertions(+), 250 deletions(-)
delete mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/BarrierTransceiver.java
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/BarrierTransceiver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/BarrierTransceiver.java
deleted file mode 100644
index 1dd6a90e6bf..00000000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/BarrierTransceiver.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.jobgraph.tasks;
-
-
-import java.io.IOException;
-
-/**
- * A BarrierTransceiver describes an operator's barrier checkpointing behavior used for
- * fault tolerance. In the most common case [[broadcastBarrier]] is being expected to be called
- * periodically upon receiving a checkpoint barrier. Furthermore, a [[confirmBarrier]] method should
- * be implemented and used for acknowledging a specific checkpoint checkpoint.
- */
-public interface BarrierTransceiver {
-
- /**
- * A callback for notifying an operator of a new checkpoint barrier.
- * @param barrierID
- */
- public void broadcastBarrierFromSource(long barrierID);
-
- /**
- * A callback for confirming that a barrier checkpoint is complete
- * @param barrierID
- */
- public void confirmBarrier(long barrierID) throws IOException;
-
-}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointCommittingOperator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointCommittingOperator.java
index 69cb1f8d937..be203d29a6f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointCommittingOperator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointCommittingOperator.java
@@ -20,5 +20,5 @@ package org.apache.flink.runtime.jobgraph.tasks;
public interface CheckpointCommittingOperator {
- void confirmCheckpoint(long checkpointId, long timestamp);
+ void confirmCheckpoint(long checkpointId, long timestamp) throws Exception;
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointedOperator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointedOperator.java
index d07b07ee4b3..17ba9477bf0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointedOperator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointedOperator.java
@@ -20,5 +20,5 @@ package org.apache.flink.runtime.jobgraph.tasks;
public interface CheckpointedOperator {
- void triggerCheckpoint(long checkpointId, long timestamp);
+ void triggerCheckpoint(long checkpointId, long timestamp) throws Exception;
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/OperatorStateCarrier.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/OperatorStateCarrier.java
index 576edb62e52..fb5e63ffce4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/OperatorStateCarrier.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/OperatorStateCarrier.java
@@ -33,6 +33,6 @@ public interface OperatorStateCarrier> {
*
* @param stateHandle The handle to the state.
*/
- public void setInitialState(T stateHandle);
+ public void setInitialState(T stateHandle) throws Exception;
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalStateHandle.java
index f47b054639f..fa0c515fb99 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalStateHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalStateHandle.java
@@ -18,23 +18,23 @@
package org.apache.flink.runtime.state;
-import java.util.Map;
+import java.io.Serializable;
/**
* A StateHandle that includes a map of operator states directly.
*/
-public class LocalStateHandle implements StateHandle
+ *
+ * @param The type of the operator state.
*/
-public interface Checkpointed {
+public interface Checkpointed {
/**
- * Gets the current operator state as a checkpoint. The state must reflect all operations
- * from all prior operations if this function.
+ * Gets the current state of the function of operator. The state must reflect the result of all
+ * prior invocations to this function.
*
* @param checkpointId The ID of the checkpoint.
* @param checkpointTimestamp The timestamp of the checkpoint, as derived by
@@ -49,5 +51,13 @@ public interface Checkpointed {
* recovery), or to discard this checkpoint attempt and to continue running
* and to try again with the next checkpoint attempt.
*/
- OperatorState> snapshotState(long checkpointId, long checkpointTimestamp) throws Exception;
+ T snapshotState(long checkpointId, long checkpointTimestamp) throws Exception;
+
+ /**
+ * Restores the state of the function or operator to that of a previous checkpoint.
+ * This method is invoked when a function is executed as part of a recovery run.
+ * *
+ * @param state The state to be restored.
+ */
+ void restoreState(T state);
}
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
index a361b7a2b3e..f66e39497fb 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
@@ -26,6 +26,8 @@ import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.functions.util.FunctionUtils;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.checkpoint.CheckpointCommitter;
+import org.apache.flink.streaming.api.checkpoint.Checkpointed;
import org.apache.flink.streaming.runtime.io.IndexedReaderIterator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
@@ -107,16 +109,14 @@ public abstract class StreamOperator implements Serializable {
return nextRecord;
} catch (IOException e) {
if (isRunning) {
- throw new RuntimeException("Could not read next record due to: "
- + StringUtils.stringifyException(e));
+ throw new RuntimeException("Could not read next record", e);
} else {
// Task already cancelled do nothing
return null;
}
} catch (IllegalStateException e) {
if (isRunning) {
- throw new RuntimeException("Could not read next record due to: "
- + StringUtils.stringifyException(e));
+ throw new RuntimeException("Could not read next record", e);
} else {
// Task already cancelled do nothing
return null;
@@ -215,4 +215,49 @@ public abstract class StreamOperator implements Serializable {
public Function getUserFunction() {
return userFunction;
}
+
+ // ------------------------------------------------------------------------
+ // Checkpoints and Checkpoint Confirmations
+ // ------------------------------------------------------------------------
+
+ // NOTE - ALL OF THIS CODE WORKS ONLY FOR THE FIRST OPERATOR IN THE CHAIN
+ // IT NEEDS TO BE EXTENDED TO SUPPORT CHAINS
+
+ public void restoreInitialState(Serializable state) throws Exception {
+ if (userFunction instanceof Checkpointed) {
+ setStateOnFunction(state, userFunction);
+ }
+ else {
+ throw new IllegalStateException("Trying to restore state of a non-checkpointed function");
+ }
+ }
+
+ public Serializable getStateSnapshotFromFunction(long checkpointId, long timestamp) throws Exception {
+ if (userFunction instanceof Checkpointed) {
+ return ((Checkpointed>) userFunction).snapshotState(checkpointId, timestamp);
+ }
+ else {
+ return null;
+ }
+ }
+
+ public void confirmCheckpointCompleted(long checkpointId, long timestamp) throws Exception {
+ if (userFunction instanceof CheckpointCommitter) {
+ try {
+ ((CheckpointCommitter) userFunction).commitCheckpoint(checkpointId);
+ }
+ catch (Exception e) {
+ throw new Exception("Error while confirming checkpoint " + checkpointId + " to the stream function", e);
+ }
+ }
+ }
+
+ private static void setStateOnFunction(Serializable state, Function function) {
+ @SuppressWarnings("unchecked")
+ T typedState = (T) state;
+ @SuppressWarnings("unchecked")
+ Checkpointed typedFunction = (Checkpointed) function;
+
+ typedFunction.restoreState(typedState);
+ }
}
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OutputHandler.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OutputHandler.java
index c579c3aea48..55c05517003 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OutputHandler.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OutputHandler.java
@@ -88,8 +88,8 @@ public class OutputHandler {
this.outerCollector = createChainedCollector(configuration);
}
- public void broadcastBarrier(long id) throws IOException, InterruptedException {
- StreamingSuperstep barrier = new StreamingSuperstep(id);
+ public void broadcastBarrier(long id, long timestamp) throws IOException, InterruptedException {
+ StreamingSuperstep barrier = new StreamingSuperstep(id, timestamp);
for (StreamOutput> streamOutput : outputMap.values()) {
streamOutput.broadcastEvent(barrier);
}
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index b4a11aae7d7..930c9b41eea 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -18,18 +18,15 @@
package org.apache.flink.streaming.runtime.tasks;
import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
+import java.io.Serializable;
import org.apache.flink.runtime.event.task.TaskEvent;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.jobgraph.tasks.BarrierTransceiver;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointCommittingOperator;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointedOperator;
import org.apache.flink.runtime.jobgraph.tasks.OperatorStateCarrier;
import org.apache.flink.runtime.state.LocalStateHandle;
-import org.apache.flink.runtime.state.OperatorState;
import org.apache.flink.runtime.util.event.EventListener;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.ChainableStreamOperator;
@@ -40,16 +37,18 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
import org.apache.flink.util.Collector;
import org.apache.flink.util.MutableObjectIterator;
import org.apache.flink.util.StringUtils;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class StreamTask extends AbstractInvokable implements StreamTaskContext,
- OperatorStateCarrier, CheckpointedOperator, CheckpointCommittingOperator,
- BarrierTransceiver {
+ OperatorStateCarrier, CheckpointedOperator, CheckpointCommittingOperator {
private static final Logger LOG = LoggerFactory.getLogger(StreamTask.class);
+ private final Object checkpointLock = new Object();
+
private static int numTasks;
protected StreamConfig configuration;
@@ -62,7 +61,6 @@ public class StreamTask extends AbstractInvokable implements StreamTask
protected volatile boolean isRunning = false;
private StreamingRuntimeContext context;
- private Map> states;
protected ClassLoader userClassLoader;
@@ -90,32 +88,7 @@ public class StreamTask extends AbstractInvokable implements StreamTask
protected void initialize() {
this.userClassLoader = getUserCodeClassLoader();
this.configuration = new StreamConfig(getTaskConfiguration());
- this.states = new HashMap>();
- this.context = createRuntimeContext(getEnvironment().getTaskName(), this.states);
- }
-
- @Override
- public void broadcastBarrierFromSource(long id) {
- // Only called at input vertices
- if (LOG.isDebugEnabled()) {
- LOG.debug("Received barrier from jobmanager: " + id);
- }
- actOnBarrier(id);
- }
-
- /**
- * This method is called to confirm that a barrier has been fully processed.
- * It sends an acknowledgment to the jobmanager. In the current version if
- * there is user state it also checkpoints the state to the jobmanager.
- */
- @Override
- public void confirmBarrier(long barrierID) throws IOException {
- if (configuration.getStateMonitoring() && !states.isEmpty()) {
- getEnvironment().acknowledgeCheckpoint(barrierID, new LocalStateHandle(states));
- }
- else {
- getEnvironment().acknowledgeCheckpoint(barrierID);
- }
+ this.context = createRuntimeContext(getEnvironment().getTaskName());
}
public void setInputsOutputs() {
@@ -136,11 +109,10 @@ public class StreamTask extends AbstractInvokable implements StreamTask
return instanceID;
}
- public StreamingRuntimeContext createRuntimeContext(String taskName,
- Map> states) {
+ public StreamingRuntimeContext createRuntimeContext(String taskName) {
Environment env = getEnvironment();
return new StreamingRuntimeContext(taskName, env, getUserCodeClassLoader(),
- getExecutionConfig(), states);
+ getExecutionConfig());
}
@Override
@@ -272,62 +244,98 @@ public class StreamTask extends AbstractInvokable implements StreamTask
return this.superstepListener;
}
+ // ------------------------------------------------------------------------
+ // Checkpoint and Restore
+ // ------------------------------------------------------------------------
+
/**
- * Method to be called when a barrier is received from all the input
- * channels. It should broadcast the barrier to the output operators,
- * checkpoint the state and send an ack.
- *
- * @param id
+ * Re-injects the user states into the map. Also set the state on the functions.
*/
- private synchronized void actOnBarrier(long id) {
- if (isRunning) {
- try {
- outputHandler.broadcastBarrier(id);
- confirmBarrier(id);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Superstep " + id + " processed: " + StreamTask.this);
- }
- } catch (Exception e) {
- // Only throw any exception if the vertex is still running
- if (isRunning) {
- throw new RuntimeException(e);
- }
- }
- }
- }
-
@Override
- public String toString() {
- return getEnvironment().getTaskNameWithSubtasks();
+ public void setInitialState(LocalStateHandle stateHandle) throws Exception {
+ // here, we later resolve the state handle into the actual state by
+ // loading the state described by the handle from the backup store
+ Serializable state = stateHandle.getState();
+ streamOperator.restoreInitialState(state);
}
/**
- * Re-injects the user states into the map
+ * This method is either called directly by the checkpoint coordinator, or called
+ * when all incoming channels have reported a barrier
+ *
+ * @param checkpointId
+ * @param timestamp
+ * @throws Exception
*/
@Override
- public void setInitialState(LocalStateHandle stateHandle) {
- this.states.putAll(stateHandle.getState());
+ public void triggerCheckpoint(long checkpointId, long timestamp) throws Exception {
+
+ synchronized (checkpointLock) {
+ if (isRunning) {
+ try {
+ LOG.info("Starting checkpoint " + checkpointId);
+
+ // first draw the state that should go into checkpoint
+ LocalStateHandle state;
+ try {
+ Serializable userState = streamOperator.getStateSnapshotFromFunction(checkpointId, timestamp);
+ state = userState == null ? null : new LocalStateHandle(userState);
+ }
+ catch (Exception e) {
+ throw new Exception("Error while drawing snapshot of the user state.");
+ }
+
+ // now emit the checkpoint barriers
+ outputHandler.broadcastBarrier(checkpointId, timestamp);
+
+ // now confirm the checkpoint
+ if (state == null) {
+ getEnvironment().acknowledgeCheckpoint(checkpointId);
+ } else {
+ getEnvironment().acknowledgeCheckpoint(checkpointId, state);
+ }
+ }
+ catch (Exception e) {
+ if (isRunning) {
+ throw e;
+ }
+ }
+ }
+ }
}
@Override
- public void triggerCheckpoint(long checkpointId, long timestamp) {
- broadcastBarrierFromSource(checkpointId);
+ public void confirmCheckpoint(long checkpointId, long timestamp) throws Exception {
+ // we do nothing here so far. this should call commit on the source function, for example
+ synchronized (checkpointLock) {
+ streamOperator.confirmCheckpointCompleted(checkpointId, timestamp);
+ }
}
+
+ // ------------------------------------------------------------------------
+ // Utilities
+ // ------------------------------------------------------------------------
+
@Override
- public void confirmCheckpoint(long checkpointId, long timestamp) {
- // we do nothing here so far. this should call commit on the source function, for example
+ public String toString() {
+ return getEnvironment().getTaskNameWithSubtasks();
}
-
-
+ // ------------------------------------------------------------------------
private class SuperstepEventListener implements EventListener {
@Override
public void onEvent(TaskEvent event) {
- actOnBarrier(((StreamingSuperstep) event).getId());
+ try {
+ StreamingSuperstep sStep = (StreamingSuperstep) event;
+ triggerCheckpoint(sStep.getId(), sStep.getTimestamp());
+ }
+ catch (Exception e) {
+ throw new RuntimeException(
+ "Error triggering a checkpoint as the result of receiving checkpoint barrier", e);
+ }
}
-
}
}
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamingRuntimeContext.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamingRuntimeContext.java
index 8ae2ced84b6..6112e03c0e5 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamingRuntimeContext.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamingRuntimeContext.java
@@ -18,17 +18,13 @@
package org.apache.flink.streaming.runtime.tasks;
-import java.util.Map;
-
import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.functions.RichFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.functions.util.RuntimeUDFContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
import org.apache.flink.runtime.operators.util.TaskConfig;
-import org.apache.flink.runtime.state.OperatorState;
/**
* Implementation of the {@link RuntimeContext}, created by runtime stream UDF
@@ -37,65 +33,13 @@ import org.apache.flink.runtime.state.OperatorState;
public class StreamingRuntimeContext extends RuntimeUDFContext {
private final Environment env;
- private final Map> operatorStates;
+
public StreamingRuntimeContext(String name, Environment env, ClassLoader userCodeClassLoader,
- ExecutionConfig executionConfig, Map> operatorStates) {
+ ExecutionConfig executionConfig) {
super(name, env.getNumberOfSubtasks(), env.getIndexInSubtaskGroup(), userCodeClassLoader,
executionConfig, env.getDistributedCacheEntries());
this.env = env;
- this.operatorStates = operatorStates;
- }
-
- /**
- * Returns the operator state registered by the given name for the operator.
- *
- * @param name
- * Name of the operator state to be returned.
- * @return The operator state.
- */
- public OperatorState> getState(String name) {
- if (operatorStates == null) {
- throw new RuntimeException("No state has been registered for this operator.");
- } else {
- OperatorState> state = operatorStates.get(name);
- if (state != null) {
- return state;
- } else {
- throw new RuntimeException("No state has been registered for the name: " + name);
- }
- }
- }
-
- /**
- * Returns whether there is a state stored by the given name
- */
- public boolean containsState(String name) {
- return operatorStates.containsKey(name);
- }
-
- /**
- * This is a beta feature Register an operator state for this
- * operator by the given name. This name can be used to retrieve the state
- * during runtime using {@link StreamingRuntimeContext#getState(String)}. To
- * obtain the {@link StreamingRuntimeContext} from the user-defined function
- * use the {@link RichFunction#getRuntimeContext()} method.
- *
- * @param name
- * The name of the operator state.
- * @param state
- * The state to be registered for this name.
- */
- public void registerState(String name, OperatorState> state) {
- if (state == null) {
- throw new RuntimeException("Cannot register null state");
- } else {
- if (operatorStates.containsKey(name)) {
- throw new RuntimeException("State is already registered");
- } else {
- operatorStates.put(name, state);
- }
- }
}
/**
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamingSuperstep.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamingSuperstep.java
index 4b3419b742e..f749773898d 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamingSuperstep.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamingSuperstep.java
@@ -27,34 +27,57 @@ import org.apache.flink.runtime.event.task.TaskEvent;
public class StreamingSuperstep extends TaskEvent {
protected long id;
+ protected long timestamp;
- public StreamingSuperstep() {
+ public StreamingSuperstep() {}
+ public StreamingSuperstep(long id, long timestamp) {
+ this.id = id;
+ this.timestamp = timestamp;
}
- public StreamingSuperstep(long id) {
- this.id = id;
+ public long getId() {
+ return id;
}
+ public long getTimestamp() {
+ return id;
+ }
+
+ // ------------------------------------------------------------------------
+
@Override
public void write(DataOutputView out) throws IOException {
out.writeLong(id);
+ out.writeLong(timestamp);
}
@Override
public void read(DataInputView in) throws IOException {
id = in.readLong();
+ timestamp = in.readLong();
}
+
+ // ------------------------------------------------------------------------
- public long getId() {
- return id;
+ @Override
+ public int hashCode() {
+ return (int) (id ^ (id >>> 32) ^ timestamp ^(timestamp >>> 32));
}
+ @Override
public boolean equals(Object other) {
if (other == null || !(other instanceof StreamingSuperstep)) {
return false;
- } else {
- return ((StreamingSuperstep) other).id == this.id;
}
+ else {
+ StreamingSuperstep that = (StreamingSuperstep) other;
+ return that.id == this.id && that.timestamp == this.timestamp;
+ }
+ }
+
+ @Override
+ public String toString() {
+ return String.format("StreamingSuperstep %d @ %d", id, timestamp);
}
}
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
index b8af4ed354b..0afe8b551c5 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
@@ -33,8 +33,8 @@ import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.util.event.EventListener;
-import org.apache.flink.streaming.runtime.io.BarrierBuffer;
import org.apache.flink.streaming.runtime.tasks.StreamingSuperstep;
+
import org.junit.Test;
public class BarrierBufferTest {
@@ -201,7 +201,7 @@ public class BarrierBufferTest {
}
protected static BufferOrEvent createSuperstep(long id, int channel) {
- return new BufferOrEvent(new StreamingSuperstep(id), channel);
+ return new BufferOrEvent(new StreamingSuperstep(id, System.currentTimeMillis()), channel);
}
protected static BufferOrEvent createBuffer(int channel) {
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureStreamingRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureStreamingRecoveryITCase.java
index aeaad5cb8e0..65e39a2735c 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureStreamingRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureStreamingRecoveryITCase.java
@@ -22,7 +22,7 @@ import org.apache.commons.io.FileUtils;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.state.OperatorState;
+import org.apache.flink.streaming.api.checkpoint.Checkpointed;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
@@ -145,12 +145,15 @@ public class ProcessFailureStreamingRecoveryITCase extends AbstractProcessFailur
}
}
- public static class SleepyDurableGenerateSequence extends RichParallelSourceFunction {
+ public static class SleepyDurableGenerateSequence extends RichParallelSourceFunction
+ implements Checkpointed {
private static final long SLEEP_TIME = 50;
private final File coordinateDir;
private final long end;
+
+ private long collected;
public SleepyDurableGenerateSequence(File coordinateDir, long end) {
this.coordinateDir = coordinateDir;
@@ -162,23 +165,10 @@ public class ProcessFailureStreamingRecoveryITCase extends AbstractProcessFailur
public void run(Collector collector) throws Exception {
StreamingRuntimeContext context = (StreamingRuntimeContext) getRuntimeContext();
- OperatorState collectedState;
- if (context.containsState("collected")) {
- collectedState = (OperatorState) context.getState("collected");
-
-// if (collected == 0) {
-// throw new RuntimeException("The state did not capture a completed checkpoint");
-// }
- }
- else {
- collectedState = new OperatorState(0L);
- context.registerState("collected", collectedState);
- }
final long stepSize = context.getNumberOfParallelSubtasks();
final long congruence = context.getIndexOfThisSubtask();
final long toCollect = (end % stepSize > congruence) ? (end / stepSize + 1) : (end / stepSize);
- long collected = collectedState.getState();
final File proceedFile = new File(coordinateDir, PROCEED_MARKER_FILE);
boolean checkForProceedFile = true;
@@ -196,13 +186,22 @@ public class ProcessFailureStreamingRecoveryITCase extends AbstractProcessFailur
}
collector.collect(collected * stepSize + congruence);
- collectedState.update(collected);
collected++;
}
}
@Override
public void cancel() {}
+
+ @Override
+ public Long snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
+ return collected;
+ }
+
+ @Override
+ public void restoreState(Long state) {
+ collected = state;
+ }
}
--
GitLab