Commit 452c39a9 authored by Paris Carbone, committed by Stephan Ewen

[FLINK-1638] [streaming] Operator state checkpointing and injection prototype

Parent a34869c0
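At a high level, this prototype closes the checkpoint/restore loop: a StreamStateMonitor actor periodically sends BarrierReq messages to the input vertices of a streaming job; each task acts on the barrier and replies with a BarrierAck, or a StateBarrierAck when it carries operator state. The monitor records acknowledged states keyed by (vertex ID, subtask index, checkpoint ID) and periodically compacts them down to the last globally acknowledged barrier. When a job enters FAILING, the JobManager queries the monitor (JobStateRequest/JobStateResponse), loads the collected states back into the ExecutionGraph, and restarts the job; on redeployment each TaskDeploymentDescriptor carries its vertex's OperatorState, which the TaskManager injects into any invokable implementing the new OperatorStateCarrier interface.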
......@@ -23,6 +23,7 @@ import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.jobgraph.JobID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.state.OperatorState;
import java.io.Serializable;
import java.util.ArrayList;
......@@ -76,6 +77,8 @@ public final class TaskDeploymentDescriptor implements Serializable {
/** The list of JAR files required to run this task. */
private final List<BlobKey> requiredJarFiles;
private OperatorState operatorState;
/**
* Constructs a task deployment descriptor.
......@@ -119,6 +122,21 @@ public final class TaskDeploymentDescriptor implements Serializable {
this.requiredJarFiles = new ArrayList<BlobKey>();
}
public TaskDeploymentDescriptor(
JobID jobID, JobVertexID vertexID, ExecutionAttemptID executionId, String taskName,
int indexInSubtaskGroup, int numberOfSubtasks, Configuration jobConfiguration,
Configuration taskConfiguration, String invokableClassName,
List<PartitionDeploymentDescriptor> producedPartitions,
List<PartitionConsumerDeploymentDescriptor> consumedPartitions,
List<BlobKey> requiredJarFiles, int targetSlotNumber, OperatorState operatorState) {
this(jobID, vertexID, executionId, taskName, indexInSubtaskGroup, numberOfSubtasks,
jobConfiguration, taskConfiguration, invokableClassName, producedPartitions,
consumedPartitions, requiredJarFiles, targetSlotNumber);
setOperatorState(operatorState);
}
/**
* Returns the ID of the job the task belongs to.
*/
......@@ -224,4 +242,12 @@ public final class TaskDeploymentDescriptor implements Serializable {
taskName, indexInSubtaskGroup, numberOfSubtasks, invokableClassName,
strProducedPartitions, strConsumedPartitions);
}
public void setOperatorState(OperatorState operatorState) {
this.operatorState = operatorState;
}
public OperatorState getOperatorState() {
return operatorState;
}
}
......@@ -46,6 +46,7 @@ import org.apache.flink.runtime.jobmanager.scheduler.SlotAllocationFutureAction;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.messages.TaskManagerMessages;
import org.apache.flink.runtime.messages.TaskManagerMessages.TaskOperationResult;
import org.apache.flink.runtime.state.OperatorState;
import org.apache.flink.util.ExceptionUtils;
import org.slf4j.Logger;
......@@ -122,6 +123,8 @@ public class Execution implements Serializable {
private volatile InstanceConnectionInfo assignedResourceLocation; // for the archived execution
private OperatorState operatorState;
// --------------------------------------------------------------------------------------------
public Execution(ExecutionVertex vertex, int attemptNumber, long startTimestamp, FiniteDuration timeout) {
......@@ -853,4 +856,12 @@ public class Execution implements Serializable {
return String.format("Attempt #%d (%s) @ %s - [%s]", attemptNumber, vertex.getSimpleName(),
(assignedResource == null ? "(unassigned)" : assignedResource.toString()), state);
}
public void setOperatorState(OperatorState operatorState) {
this.operatorState = operatorState;
}
public OperatorState getOperatorState() {
return operatorState;
}
}
......@@ -33,11 +33,13 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.ScheduleMode;
import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
import org.apache.flink.runtime.messages.ExecutionGraphMessages;
import org.apache.flink.runtime.state.OperatorState;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.util.ExceptionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple3;
import scala.concurrent.duration.FiniteDuration;
import java.io.Serializable;
......@@ -529,6 +531,14 @@ public class ExecutionGraph implements Serializable {
return false;
}
}
public void loadOperatorStates(Map<Tuple3<JobVertexID, Integer, Long>, OperatorState<?>> states) {
for (Map.Entry<Tuple3<JobVertexID, Integer, Long>, OperatorState<?>> state : states.entrySet()) {
tasks.get(state.getKey()._1()).getTaskVertices()[state.getKey()._2()].setOperatorState(state.getValue());
}
}
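For illustration, this is how a caller (such as the JobManager's JobStateResponse handler shown later in this diff) would re-inject collected states before restarting a failed job. A minimal sketch; the helper method and its parameters are hypothetical, not part of this commit:

import java.util.HashMap;
import java.util.Map;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.state.OperatorState;
import scala.Tuple3;

// Hypothetical helper: build the (vertex ID, subtask index, checkpoint ID)
// keyed map that loadOperatorStates() expects, then restart the job.
static void restoreAndRestart(ExecutionGraph graph, JobVertexID vertexId,
        int subtaskIndex, long checkpointId, OperatorState<?> state) throws Exception {
    Map<Tuple3<JobVertexID, Integer, Long>, OperatorState<?>> states =
            new HashMap<Tuple3<JobVertexID, Integer, Long>, OperatorState<?>>();
    states.put(new Tuple3<JobVertexID, Integer, Long>(vertexId, subtaskIndex, checkpointId), state);
    graph.loadOperatorStates(states); // forwards each state to its ExecutionVertex
    graph.restart();                  // vertices now redeploy with the injected state
}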
public void scheduleOrUpdateConsumers(ExecutionAttemptID executionId, int partitionIndex) {
Execution execution = currentExecutions.get(executionId);
......
......@@ -38,6 +38,7 @@ import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
import org.apache.flink.runtime.state.OperatorState;
import org.slf4j.Logger;
import scala.concurrent.duration.FiniteDuration;
......@@ -89,6 +90,8 @@ public class ExecutionVertex implements Serializable {
private volatile boolean scheduleLocalOnly;
private OperatorState operatorState;
// --------------------------------------------------------------------------------------------
public ExecutionVertex(ExecutionJobVertex jobVertex, int subTaskIndex,
......@@ -194,6 +197,14 @@ public class ExecutionVertex implements Serializable {
public InstanceConnectionInfo getCurrentAssignedResourceLocation() {
return currentExecution.getAssignedResourceLocation();
}
public void setOperatorState(OperatorState operatorState) {
this.operatorState = operatorState;
}
public OperatorState getOperatorState() {
return operatorState;
}
public ExecutionGraph getExecutionGraph() {
return this.jobVertex.getGraph();
......@@ -379,6 +390,12 @@ public class ExecutionVertex implements Serializable {
if (grp != null) {
this.locationConstraint = grp.getLocationConstraint(subTaskIndex);
}
if (operatorState != null) {
execution.setOperatorState(operatorState);
}
}
else {
throw new IllegalStateException("Cannot reset a vertex that is in state " + state);
......@@ -506,7 +523,7 @@ public class ExecutionVertex implements Serializable {
return new TaskDeploymentDescriptor(getJobId(), getJobvertexId(), executionId, getTaskName(),
subTaskIndex, getTotalNumberOfParallelSubtasks(), getExecutionGraph().getJobConfiguration(),
jobVertex.getJobVertex().getConfiguration(), jobVertex.getJobVertex().getInvokableClassName(),
producedPartitions, consumedPartitions, jarFiles, slot.getSlotNumber(), operatorState);
}
// --------------------------------------------------------------------------------------------
......
......@@ -19,10 +19,7 @@
package org.apache.flink.runtime.io.network.api.reader;
import java.io.IOException;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.Queue;
import java.util.Set;
import org.apache.flink.core.io.IOReadableWritable;
import org.apache.flink.runtime.event.task.AbstractEvent;
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.jobgraph.tasks;
import org.apache.flink.runtime.state.OperatorState;
public interface OperatorStateCarrier {
public void injectState(OperatorState state);
}
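An invokable opts into state injection by implementing this interface; the TaskManager calls injectState() before the task starts whenever its deployment descriptor carries a non-null state (see the TaskManager change later in this diff). A minimal hypothetical implementor, not part of this commit:

import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.state.OperatorState;

// Hypothetical task that resumes from re-injected state after a restart.
public class StatefulTask extends AbstractInvokable implements OperatorStateCarrier {

    private OperatorState state;

    @Override
    public void injectState(OperatorState state) {
        this.state = state; // delivered before invoke() runs
    }

    @Override
    public void registerInputOutput() {
        // set up readers and writers as usual
    }

    @Override
    public void invoke() throws Exception {
        // resume processing from this.state, if one was injected
    }
}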
......@@ -15,12 +15,10 @@
* limitations under the License.
*/
package org.apache.flink.runtime.state;
import java.io.Serializable;
/**
* Abstract class for representing operator states in Flink programs. By
* implementing the methods declared in this abstraction the state of the
......@@ -33,7 +31,7 @@ public abstract class OperatorState<T> implements Serializable {
private static final long serialVersionUID = 1L;
public T state;
/**
* Constructor used for initializing the state. In case of failure, the
......
......@@ -15,12 +15,10 @@
* limitations under the License.
*/
package org.apache.flink.runtime.state;
import java.io.Serializable;
/**
* Base class for creating checkpoints for {@link OperatorState}. These
* checkpoints will be used to back up states in stateful Flink operators and
......@@ -34,7 +32,7 @@ public class StateCheckpoint<T> implements Serializable {
private static final long serialVersionUID = 1L;
public T checkpointedState;
/**
* Creates a state checkpoint from the given {@link OperatorState}
......
......@@ -301,13 +301,19 @@ class JobManager(val configuration: Configuration,
jobInfo.client ! Failure(exception)
throw exception
}
barrierMonitors.get(jobID) match {
case Some(monitor) =>
newJobStatus match {
case JobStatus.FINISHED | JobStatus.CANCELED =>
monitor ! PoisonPill
barrierMonitors.remove(jobID)
case JobStatus.FAILING =>
monitor ! JobStateRequest
case _ =>
}
case None =>
removeJob(jobID)
}
}
else {
newJobStatus match {
......@@ -315,7 +321,10 @@ class JobManager(val configuration: Configuration,
case Some((executionGraph, _)) =>
// FIXME this is just a quick-and-dirty check for determining streaming jobs
if (executionGraph.getScheduleMode == ScheduleMode.ALL) {
barrierMonitors.get(jobID) match {
case None =>
barrierMonitors += jobID -> StreamStateMonitor.props(context, executionGraph)
case Some(_) => // a monitor is already running for this job
}
}
case None =>
log.error("Cannot create state monitor for job ID {}.", jobID)
......@@ -327,12 +336,27 @@ class JobManager(val configuration: Configuration,
removeJob(jobID)
}
case msg: BarrierAck =>
barrierMonitors.get(msg.jobID) match {
case Some(monitor) => monitor ! msg
case None =>
}
case msg: StateBarrierAck =>
barrierMonitors.get(msg.jobID) match {
case Some(monitor) => monitor ! msg
case None =>
}
case msg: JobStateResponse =>
// inject initial states and restart the job
currentJobs.get(msg.jobID) match {
case Some(jobExecution) =>
import scala.collection.JavaConverters._
jobExecution._1.loadOperatorStates(msg.opStates.asJava)
jobExecution._1.restart()
case None =>
}
case ScheduleOrUpdateConsumers(jobId, executionId, partitionIndex) =>
currentJobs.get(jobId) match {
case Some((executionGraph, _)) =>
......
......@@ -20,65 +20,82 @@ package org.apache.flink.runtime.jobmanager
import akka.actor._
import org.apache.flink.runtime.ActorLogMessages
import org.apache.flink.runtime.executiongraph.{ExecutionAttemptID, ExecutionGraph, ExecutionVertex}
import org.apache.flink.runtime.jobgraph.{JobID, JobVertexID}
import org.apache.flink.runtime.state.OperatorState
import java.lang.Long
import scala.collection.JavaConversions._
import scala.collection.immutable.TreeMap
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.{FiniteDuration, _}
object StreamStateMonitor {
def props(context: ActorContext, executionGraph: ExecutionGraph,
interval: FiniteDuration = 5 seconds): ActorRef = {
val vertices: Iterable[ExecutionVertex] = getExecutionVertices(executionGraph)
val monitor = context.system.actorOf(Props(new StreamStateMonitor(executionGraph,
vertices, vertices.map(x => ((x.getJobVertex.getJobVertexId, x.getParallelSubtaskIndex),
List.empty[Long])).toMap, Map(), interval, 0L, -1L)))
monitor ! InitBarrierScheduler
monitor
}
private def getExecutionVertices(executionGraph: ExecutionGraph): Iterable[ExecutionVertex] = {
for ((_, execJobVertex) <- executionGraph.getAllVertices;
execVertex: ExecutionVertex <- execJobVertex.getTaskVertices)
yield execVertex
}
}
class StreamStateMonitor(val executionGraph: ExecutionGraph,
val vertices: Iterable[ExecutionVertex],
var acks: Map[(JobVertexID, Int), List[Long]],
var states: Map[(JobVertexID, Integer, Long), OperatorState[_]],
val interval: FiniteDuration, var curId: Long, var ackId: Long)
extends Actor with ActorLogMessages with ActorLogging {
override def receiveWithLogMessages: Receive = {
case InitBarrierScheduler =>
context.system.scheduler.schedule(interval, interval, self, BarrierTimeout)
context.system.scheduler.schedule(2 * interval, 2 * interval, self, TriggerBarrierCompaction)
log.debug("[FT-MONITOR] Started Stream State Monitor for job {}{}",
executionGraph.getJobID, executionGraph.getJobName)
case BarrierTimeout =>
curId += 1
log.debug("[FT-MONITOR] Sending Barrier to vertices of Job " + executionGraph.getJobName)
vertices.filter(v => v.getJobVertex.getJobVertex.isInputVertex).foreach(vertex
=> vertex.getCurrentAssignedResource.getInstance.getTaskManager
! BarrierReq(vertex.getCurrentExecutionAttempt.getAttemptId, curId))
case StateBarrierAck(jobID, jobVertexID, instanceID, checkpointID, opState) =>
states += (jobVertexID, instanceID, checkpointID) -> opState
self ! BarrierAck(jobID, jobVertexID, instanceID, checkpointID)
case BarrierAck(jobID, jobVertexID, instanceID, checkpointID) =>
acks.get(jobVertexID, instanceID) match {
case Some(acklist) =>
acks += (jobVertexID, instanceID) -> (checkpointID :: acklist)
case None =>
}
log.debug(acks.toString)
case TriggerBarrierCompaction =>
val barrierCount = acks.values.foldLeft(TreeMap[Long, Int]().withDefaultValue(0))((dict, myList)
=> myList.foldLeft(dict)((dict2, elem) => dict2.updated(elem, dict2(elem) + 1)))
val keysToKeep = barrierCount.filter(_._2 == acks.size).keys
ackId = if (!keysToKeep.isEmpty) keysToKeep.max else ackId
acks.keys.foreach(x => acks = acks.updated(x, acks(x).filter(_ >= ackId)))
states = states.filterKeys(_._3 >= ackId)
log.debug("[FT-MONITOR] Last global barrier is " + ackId)
case JobStateRequest =>
sender ! JobStateResponse(executionGraph.getJobID, ackId, states)
}
}
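The TriggerBarrierCompaction case boils down to: count, per checkpoint ID, how many subtasks have acknowledged it, then advance the global barrier to the highest ID acknowledged by all of them; acks and states older than that barrier can be pruned, as the actor does above. A rough Java rendering of that fold, with hypothetical names, for readers less used to Scala:

import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Returns the highest checkpoint ID present in every subtask's ack list,
// falling back to the previous global barrier if none is complete yet.
static long lastGlobalBarrier(Map<?, List<Long>> acks, long previousAckId) {
    TreeMap<Long, Integer> ackCounts = new TreeMap<Long, Integer>();
    for (List<Long> ackList : acks.values()) {
        for (Long checkpointId : ackList) {
            Integer count = ackCounts.get(checkpointId);
            ackCounts.put(checkpointId, count == null ? 1 : count + 1);
        }
    }
    long globalBarrier = previousAckId;
    for (Map.Entry<Long, Integer> entry : ackCounts.entrySet()) {
        if (entry.getValue() == acks.size() && entry.getKey() > globalBarrier) {
            globalBarrier = entry.getKey();
        }
    }
    return globalBarrier;
}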
......@@ -86,11 +103,20 @@ case class BarrierTimeout()
case class InitBarrierScheduler()
case class TriggerBarrierCompaction()
case class JobStateRequest()
case class JobStateResponse(jobID: JobID, barrierID: Long, opStates: Map[(JobVertexID, Integer, Long), OperatorState[_]])
case class BarrierReq(attemptID: ExecutionAttemptID, checkpointID: Long)
case class BarrierAck(jobID: JobID, jobVertexID: JobVertexID, instanceID: Int, checkpointID: Long)
case class StateBarrierAck(jobID: JobID, jobVertexID: JobVertexID, instanceID: Integer,
checkpointID: Long, state: OperatorState[_])
......@@ -43,7 +43,7 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync
import org.apache.flink.runtime.io.network.NetworkEnvironment
import org.apache.flink.runtime.io.network.netty.NettyConfig
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID
import org.apache.flink.runtime.jobgraph.tasks.{OperatorStateCarrier, BarrierTransceiver}
import org.apache.flink.runtime.jobmanager.{BarrierReq,JobManager}
import org.apache.flink.runtime.memorymanager.DefaultMemoryManager
import org.apache.flink.runtime.messages.JobManagerMessages.UpdateTaskExecutionState
......@@ -358,7 +358,9 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo,
if (i.getExecutionState == ExecutionState.RUNNING) {
i.getEnvironment.getInvokable match {
case barrierTransceiver: BarrierTransceiver =>
new Thread(new Runnable {
override def run(): Unit = barrierTransceiver.broadcastBarrier(checkpointID)
}).start()
case _ => log.error("[FT-TaskManager] Received a barrier for the wrong vertex")
}
}
......@@ -415,6 +417,15 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo,
task = new Task(jobID, vertexID, taskIndex, numSubtasks, executionID,
tdd.getTaskName, self)
// inject operator state, if the deployment descriptor carries any
if (tdd.getOperatorState != null) {
task.getEnvironment.getInvokable match {
case opStateCarrier: OperatorStateCarrier =>
opStateCarrier.injectState(tdd.getOperatorState)
case _ =>
log.error("Cannot inject state: the invokable does not implement OperatorStateCarrier")
}
}
runningTasks.put(executionID, task) match {
case Some(_) => throw new RuntimeException(
s"TaskManager contains already a task with executionID $executionID.")
......
......@@ -32,7 +32,7 @@ import org.apache.flink.streaming.api.invokable.StreamInvokable;
import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
import org.apache.flink.streaming.api.streamvertex.StreamVertexException;
import org.apache.flink.streaming.partitioner.StreamPartitioner;
import org.apache.flink.runtime.state.OperatorState;
import org.apache.flink.util.InstantiationUtil;
public class StreamConfig implements Serializable {
......
......@@ -46,7 +46,7 @@ import org.apache.flink.streaming.api.streamvertex.StreamIterationHead;
import org.apache.flink.streaming.api.streamvertex.StreamIterationTail;
import org.apache.flink.streaming.api.streamvertex.StreamVertex;
import org.apache.flink.streaming.partitioner.StreamPartitioner;
import org.apache.flink.runtime.state.OperatorState;
import org.apache.sling.commons.json.JSONArray;
import org.apache.sling.commons.json.JSONException;
import org.apache.sling.commons.json.JSONObject;
......
......@@ -26,7 +26,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.invokable.StreamInvokable;
import org.apache.flink.streaming.api.invokable.StreamInvokable.ChainingStrategy;
import org.apache.flink.streaming.api.streamvertex.StreamingRuntimeContext;
import org.apache.flink.runtime.state.OperatorState;
/**
* The SingleOutputStreamOperator represents a user defined transformation
......
......@@ -25,15 +25,17 @@ import org.apache.flink.runtime.event.task.TaskEvent;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobgraph.tasks.BarrierTransceiver;
import org.apache.flink.runtime.jobgraph.tasks.OperatorStateCarrier;
import org.apache.flink.runtime.jobmanager.BarrierAck;
import org.apache.flink.runtime.jobmanager.StateBarrierAck;
import org.apache.flink.runtime.util.event.EventListener;
import org.apache.flink.runtime.state.OperatorState;
import org.apache.flink.streaming.api.StreamConfig;
import org.apache.flink.streaming.api.invokable.ChainableInvokable;
import org.apache.flink.streaming.api.invokable.StreamInvokable;
import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
import org.apache.flink.streaming.io.CoReaderIterator;
import org.apache.flink.streaming.io.IndexedReaderIterator;
import org.apache.flink.util.Collector;
import org.apache.flink.util.MutableObjectIterator;
import org.apache.flink.util.StringUtils;
......@@ -43,7 +45,7 @@ import org.slf4j.LoggerFactory;
import akka.actor.ActorRef;
public class StreamVertex<IN, OUT> extends AbstractInvokable implements StreamTaskContext<OUT>,
BarrierTransceiver, OperatorStateCarrier {
private static final Logger LOG = LoggerFactory.getLogger(StreamVertex.class);
......@@ -90,9 +92,27 @@ public class StreamVertex<IN, OUT> extends AbstractInvokable implements StreamTa
this.context = createRuntimeContext(getEnvironment().getTaskName(), this.states);
}
protected <T> void invokeUserFunction(StreamInvokable<?, T> userInvokable) throws Exception {
userInvokable.setRuntimeContext(context);
userInvokable.open(getTaskConfiguration());
for (ChainableInvokable<?, ?> invokable : outputHandler.chainedInvokables) {
invokable.setRuntimeContext(context);
invokable.open(getTaskConfiguration());
}
userInvokable.invoke();
userInvokable.close();
for (ChainableInvokable<?, ?> invokable : outputHandler.chainedInvokables) {
invokable.close();
}
}
@Override
public void broadcastBarrier(long id) {
// Only called at input vertices
if (LOG.isDebugEnabled()) {
LOG.debug("Received barrier from jobmanager: " + id);
}
......@@ -101,9 +121,21 @@ public class StreamVertex<IN, OUT> extends AbstractInvokable implements StreamTa
@Override
public void confirmBarrier(long barrierID) {
if (states != null && states.containsKey("kafka")) {
getEnvironment().getJobManager().tell(
new StateBarrierAck(getEnvironment().getJobID(),
getEnvironment().getJobVertexId(), context.getIndexOfThisSubtask(),
barrierID, states.get("kafka")), ActorRef.noSender());
} else {
getEnvironment().getJobManager().tell(
new BarrierAck(getEnvironment().getJobID(), getEnvironment().getJobVertexId(),
context.getIndexOfThisSubtask(), barrierID), ActorRef.noSender());
}
}
public void setInputsOutputs() {
......@@ -240,7 +272,8 @@ public class StreamVertex<IN, OUT> extends AbstractInvokable implements StreamTa
private void actOnBarrier(long id) {
try {
outputHandler.broadcastBarrier(id);
System.out.println("Superstep " + id + " processed: " + StreamVertex.this);
//TODO checkpoint state here
confirmBarrier(id);
if (LOG.isDebugEnabled()) {
LOG.debug("Superstep " + id + " processed: " + StreamVertex.this);
}
......@@ -256,6 +289,12 @@ public class StreamVertex<IN, OUT> extends AbstractInvokable implements StreamTa
return configuration.getOperatorName() + " (" + context.getIndexOfThisSubtask() + ")";
}
@Override
public void injectState(OperatorState state) {
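// prototype shortcut: the injected state is registered under the same
// hardcoded "kafka" key that confirmBarrier() checks above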
states.put("kafka", state);
}
private class SuperstepEventListener implements EventListener<TaskEvent> {
@Override
......
......@@ -27,7 +27,7 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
import org.apache.flink.runtime.operators.util.TaskConfig;
import org.apache.flink.runtime.state.OperatorState;
/**
* Implementation of the {@link RuntimeContext}, created by runtime stream UDF
......
......@@ -23,8 +23,10 @@ import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.flink.runtime.state.OperatorState;
import org.apache.flink.streaming.state.checkpoint.MapCheckpoint;
import org.apache.flink.runtime.state.StateCheckpoint;
/**
* A Map that can be used as a partitionable operator state, for both fault
......
......@@ -17,6 +17,8 @@
package org.apache.flink.streaming.state;
import org.apache.flink.runtime.state.OperatorState;
/**
* Base class for representing operator states that can be repartitioned for
* state sharing and load balancing.
......
......@@ -17,7 +17,8 @@
package org.apache.flink.streaming.state;
import org.apache.flink.runtime.state.OperatorState;
import org.apache.flink.runtime.state.StateCheckpoint;
/**
* Basic {@link OperatorState} for storing and updating simple objects. By default the
......
......@@ -21,8 +21,9 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import org.apache.flink.runtime.state.OperatorState;
import org.apache.flink.runtime.state.StateCheckpoint;
import org.apache.flink.streaming.state.MapState;
public class MapCheckpoint<K, V> extends StateCheckpoint<Map<K, V>> {
......
......@@ -25,7 +25,7 @@ import java.util.HashMap;
import java.util.Map;
import org.apache.flink.streaming.state.checkpoint.MapCheckpoint;
import org.apache.flink.runtime.state.StateCheckpoint;
import org.junit.Test;
public class MapStateTest {
......
......@@ -20,7 +20,8 @@ package org.apache.flink.streaming.state;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import org.apache.flink.runtime.state.OperatorState;
import org.apache.flink.runtime.state.StateCheckpoint;
import org.junit.Test;
public class OperatorStateTest {
......
......@@ -101,6 +101,11 @@ public class WordCount {
// emit the pairs
for (String token : tokens) {
// FIXME to be removed; added for test purposes only
if ("killme".equals(token)) {
throw new Exception("byee");
}
if (token.length() > 0) {
out.collect(new Tuple2<String, Integer>(token, 1));
}
......