提交 c8de6568 编写于 作者: G Gyula Fora 提交者: Stephan Ewen

[streaming] StreamTask updated to handle chained states

上级 54e95761
......@@ -49,7 +49,7 @@ public class OutputHandler<OUT> {
private ClassLoader cl;
private Collector<OUT> outerCollector;
public List<ChainableStreamOperator<?, ?>> chainedOperators;
private List<ChainableStreamOperator<?, ?>> chainedOperators;
private Map<StreamEdge, StreamOutput<?>> outputMap;
......@@ -98,6 +98,10 @@ public class OutputHandler<OUT> {
public Collection<StreamOutput<?>> getOutputs() {
return outputMap.values();
}
public List<ChainableStreamOperator<?, ?>> getChainedOperators(){
return chainedOperators;
}
/**
* This method builds up a nested collector which encapsulates all the
......
......@@ -19,7 +19,11 @@ package org.apache.flink.streaming.runtime.tasks;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.functors.NotNullPredicate;
import org.apache.flink.runtime.event.task.TaskEvent;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
......@@ -37,7 +41,6 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
import org.apache.flink.util.Collector;
import org.apache.flink.util.MutableObjectIterator;
import org.apache.flink.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -58,6 +61,7 @@ public class StreamTask<IN, OUT> extends AbstractInvokable implements StreamTask
private InputHandler<IN> inputHandler;
protected OutputHandler<OUT> outputHandler;
private StreamOperator<IN, OUT> streamOperator;
private boolean chained;
protected volatile boolean isRunning = false;
private StreamingRuntimeContext context;
......@@ -94,6 +98,7 @@ public class StreamTask<IN, OUT> extends AbstractInvokable implements StreamTask
public void setInputsOutputs() {
inputHandler = new InputHandler<IN>(this);
outputHandler = new OutputHandler<OUT>(this);
chained = !outputHandler.getChainedOperators().isEmpty();
}
protected void setOperator() {
......@@ -165,7 +170,7 @@ public class StreamTask<IN, OUT> extends AbstractInvokable implements StreamTask
protected void openOperator() throws Exception {
streamOperator.open(getTaskConfiguration());
for (ChainableStreamOperator<?, ?> operator : outputHandler.chainedOperators) {
for (ChainableStreamOperator<?, ?> operator : outputHandler.getChainedOperators()) {
operator.setRuntimeContext(context);
operator.open(getTaskConfiguration());
}
......@@ -174,7 +179,7 @@ public class StreamTask<IN, OUT> extends AbstractInvokable implements StreamTask
protected void closeOperator() throws Exception {
streamOperator.close();
for (ChainableStreamOperator<?, ?> operator : outputHandler.chainedOperators) {
for (ChainableStreamOperator<?, ?> operator : outputHandler.getChainedOperators()) {
operator.close();
}
}
......@@ -256,7 +261,26 @@ public class StreamTask<IN, OUT> extends AbstractInvokable implements StreamTask
// here, we later resolve the state handle into the actual state by
// loading the state described by the handle from the backup store
Serializable state = stateHandle.getState();
streamOperator.restoreInitialState(state);
if (chained) {
@SuppressWarnings("unchecked")
List<Serializable> chainedStates = (List<Serializable>) state;
Serializable headState = chainedStates.get(0);
if (headState != null) {
streamOperator.restoreInitialState(headState);
}
for (int i = 1; i < chainedStates.size(); i++) {
Serializable chainedState = chainedStates.get(i);
if (chainedState != null) {
outputHandler.getChainedOperators().get(i - 1).restoreInitialState(chainedState);
}
}
} else {
streamOperator.restoreInitialState(state);
}
}
/**
......@@ -278,7 +302,25 @@ public class StreamTask<IN, OUT> extends AbstractInvokable implements StreamTask
// first draw the state that should go into checkpoint
LocalStateHandle state;
try {
Serializable userState = streamOperator.getStateSnapshotFromFunction(checkpointId, timestamp);
Serializable userState = streamOperator.getStateSnapshotFromFunction(
checkpointId, timestamp);
if (chained) {
// We construct a list of states for chained tasks
List<Serializable> chainedStates = new ArrayList<Serializable>();
chainedStates.add(userState);
for (StreamOperator<?, ?> chainedOperator : outputHandler.getChainedOperators()) {
chainedStates.add(chainedOperator.getStateSnapshotFromFunction(
checkpointId, timestamp));
}
userState = CollectionUtils.exists(chainedStates,
NotNullPredicate.INSTANCE) ? (Serializable) chainedStates
: null;
}
state = userState == null ? null : new LocalStateHandle(userState);
}
catch (Exception e) {
......@@ -309,6 +351,11 @@ public class StreamTask<IN, OUT> extends AbstractInvokable implements StreamTask
// we do nothing here so far. this should call commit on the source function, for example
synchronized (checkpointLock) {
streamOperator.confirmCheckpointCompleted(checkpointId, timestamp);
if (chained) {
for (StreamOperator<?, ?> op : outputHandler.getChainedOperators()) {
op.confirmCheckpointCompleted(checkpointId, timestamp);
}
}
}
}
......
......@@ -19,6 +19,7 @@
package org.apache.flink.test.recovery;
import org.apache.commons.io.FileUtils;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
......@@ -77,25 +78,15 @@ public class ProcessFailureStreamingRecoveryITCase extends AbstractProcessFailur
env.enableCheckpointing(200);
DataStream<Long> result = env.addSource(new SleepyDurableGenerateSequence(coordinateDir, DATA_COUNT))
// make sure every mapper is involved
// .shuffle()
// populate the coordinate directory so we can proceed to TaskManager failure
.map(new RichMapFunction<Long, Long>() {
private boolean markerCreated = false;
// add a non-chained no-op map to test the chain state restore logic
.distribute().map(new MapFunction<Long, Long>() {
@Override
public Long map(Long value) throws Exception {
if (!markerCreated) {
int taskIndex = getRuntimeContext().getIndexOfThisSubtask();
touchFile(new File(coordinateDir, READY_MARKER_FILE_PREFIX + taskIndex));
markerCreated = true;
}
return value;
}
});
})
// populate the coordinate directory so we can proceed to TaskManager failure
.map(new StatefulMapper(coordinateDir));
//write result to temporary file
result.addSink(new RichSinkFunction<Long>() {
......@@ -161,7 +152,6 @@ public class ProcessFailureStreamingRecoveryITCase extends AbstractProcessFailur
}
@Override
@SuppressWarnings("unchecked")
public void run(Collector<Long> collector) throws Exception {
StreamingRuntimeContext context = (StreamingRuntimeContext) getRuntimeContext();
......@@ -203,6 +193,44 @@ public class ProcessFailureStreamingRecoveryITCase extends AbstractProcessFailur
collected = state;
}
}
public static class StatefulMapper extends RichMapFunction<Long, Long> implements
Checkpointed<Integer> {
private boolean markerCreated = false;
private File coordinateDir;
private boolean restored = false;
public StatefulMapper(File coordinateDir) {
this.coordinateDir = coordinateDir;
}
@Override
public Long map(Long value) throws Exception {
if (!markerCreated) {
int taskIndex = getRuntimeContext().getIndexOfThisSubtask();
touchFile(new File(coordinateDir, READY_MARKER_FILE_PREFIX + taskIndex));
markerCreated = true;
}
return value;
}
@Override
public void close() {
if (!restored) {
fail();
}
}
@Override
public Integer snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
return 1;
}
@Override
public void restoreState(Integer state) {
restored = true;
}
}
private static void fileBatchHasEveryNumberLower(int numFiles, int numbers, File path) throws IOException {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册