From 452c39a965d93aab84d2fea84345badd2cc45975 Mon Sep 17 00:00:00 2001 From: Paris Carbone Date: Wed, 4 Mar 2015 18:30:09 +0100 Subject: [PATCH] [FLINK-1638] [streaming] Operator state checkpointing and injection prototype --- .../deployment/TaskDeploymentDescriptor.java | 26 +++++++ .../runtime/executiongraph/Execution.java | 11 +++ .../executiongraph/ExecutionGraph.java | 10 +++ .../executiongraph/ExecutionVertex.java | 19 ++++- .../api/reader/AbstractRecordReader.java | 5 +- .../jobgraph/tasks/OperatorStateCarrier.java | 27 +++++++ .../flink/runtime}/state/OperatorState.java | 6 +- .../flink/runtime/state}/StateCheckpoint.java | 6 +- .../flink/runtime/jobmanager/JobManager.scala | 38 +++++++-- .../jobmanager/StreamStateMonitor.scala | 78 ++++++++++++------- .../runtime/taskmanager/TaskManager.scala | 15 +++- .../flink/streaming/api/StreamConfig.java | 2 +- .../flink/streaming/api/StreamGraph.java | 2 +- .../SingleOutputStreamOperator.java | 2 +- .../api/streamvertex/StreamVertex.java | 53 +++++++++++-- .../streamvertex/StreamingRuntimeContext.java | 2 +- .../flink/streaming/state/MapState.java | 4 +- .../streaming/state/PartitionableState.java | 2 + .../flink/streaming/state/SimpleState.java | 3 +- .../state/checkpoint/MapCheckpoint.java | 3 +- .../flink/streaming/state/MapStateTest.java | 2 +- .../streaming/state/OperatorStateTest.java | 3 +- .../examples/wordcount/WordCount.java | 5 ++ 23 files changed, 260 insertions(+), 64 deletions(-) create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/OperatorStateCarrier.java rename {flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming => flink-runtime/src/main/java/org/apache/flink/runtime}/state/OperatorState.java (95%) rename {flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/checkpoint => flink-runtime/src/main/java/org/apache/flink/runtime/state}/StateCheckpoint.java (94%) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java index 20204d5a585..a431a76e1f0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java @@ -23,6 +23,7 @@ import org.apache.flink.runtime.blob.BlobKey; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.jobgraph.JobID; import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.state.OperatorState; import java.io.Serializable; import java.util.ArrayList; @@ -76,6 +77,8 @@ public final class TaskDeploymentDescriptor implements Serializable { /** The list of JAR files required to run this task. */ private final List requiredJarFiles; + + private OperatorState operatorState; /** * Constructs a task deployment descriptor. @@ -119,6 +122,21 @@ public final class TaskDeploymentDescriptor implements Serializable { this.requiredJarFiles = new ArrayList(); } + public TaskDeploymentDescriptor( + JobID jobID, JobVertexID vertexID, ExecutionAttemptID executionId, String taskName, + int indexInSubtaskGroup, int numberOfSubtasks, Configuration jobConfiguration, + Configuration taskConfiguration, String invokableClassName, + List producedPartitions, + List consumedPartitions, + List requiredJarFiles, int targetSlotNumber, OperatorState operatorState) { + + this(jobID, vertexID, executionId, taskName, indexInSubtaskGroup, numberOfSubtasks, + jobConfiguration, taskConfiguration, invokableClassName, producedPartitions, + consumedPartitions, requiredJarFiles, targetSlotNumber); + + setOperatorState(operatorState); + } + /** * Returns the ID of the job the tasks belongs to. */ @@ -224,4 +242,12 @@ public final class TaskDeploymentDescriptor implements Serializable { taskName, indexInSubtaskGroup, numberOfSubtasks, invokableClassName, strProducedPartitions, strConsumedPartitions); } + + public void setOperatorState(OperatorState operatorState) { + this.operatorState = operatorState; + } + + public OperatorState getOperatorState() { + return operatorState; + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java index 57ed4c0e495..89f518389eb 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java @@ -46,6 +46,7 @@ import org.apache.flink.runtime.jobmanager.scheduler.SlotAllocationFutureAction; import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; import org.apache.flink.runtime.messages.TaskManagerMessages; import org.apache.flink.runtime.messages.TaskManagerMessages.TaskOperationResult; +import org.apache.flink.runtime.state.OperatorState; import org.apache.flink.util.ExceptionUtils; import org.slf4j.Logger; @@ -122,6 +123,8 @@ public class Execution implements Serializable { private volatile InstanceConnectionInfo assignedResourceLocation; // for the archived execution + private OperatorState operatorState; + // -------------------------------------------------------------------------------------------- public Execution(ExecutionVertex vertex, int attemptNumber, long startTimestamp, FiniteDuration timeout) { @@ -853,4 +856,12 @@ public class Execution implements Serializable { return String.format("Attempt #%d (%s) @ %s - [%s]", attemptNumber, vertex.getSimpleName(), (assignedResource == null ? "(unassigned)" : assignedResource.toString()), state); } + + public void setOperatorState(OperatorState operatorState) { + this.operatorState = operatorState; + } + + public OperatorState getOperatorState() { + return operatorState; + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java index e6d9c85d88f..bf34e33cab8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java @@ -33,11 +33,13 @@ import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.ScheduleMode; import org.apache.flink.runtime.jobmanager.scheduler.Scheduler; import org.apache.flink.runtime.messages.ExecutionGraphMessages; +import org.apache.flink.runtime.state.OperatorState; import org.apache.flink.runtime.taskmanager.TaskExecutionState; import org.apache.flink.util.ExceptionUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import scala.Tuple3; import scala.concurrent.duration.FiniteDuration; import java.io.Serializable; @@ -529,6 +531,14 @@ public class ExecutionGraph implements Serializable { return false; } } + + public void loadOperatorStates(Map ,OperatorState> states) + { + for(Map.Entry ,OperatorState> state : states.entrySet()) + { + tasks.get(state.getKey()._1()).getTaskVertices()[state.getKey()._2()].setOperatorState(state.getValue()); + } + } public void scheduleOrUpdateConsumers(ExecutionAttemptID executionId, int partitionIndex) { Execution execution = currentExecutions.get(executionId); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java index 0158fbfe401..b7f962a79ea 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java @@ -38,6 +38,7 @@ import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint; import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup; import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException; import org.apache.flink.runtime.jobmanager.scheduler.Scheduler; +import org.apache.flink.runtime.state.OperatorState; import org.slf4j.Logger; import scala.concurrent.duration.FiniteDuration; @@ -89,6 +90,8 @@ public class ExecutionVertex implements Serializable { private volatile boolean scheduleLocalOnly; + private OperatorState operatorState; + // -------------------------------------------------------------------------------------------- public ExecutionVertex(ExecutionJobVertex jobVertex, int subTaskIndex, @@ -194,6 +197,14 @@ public class ExecutionVertex implements Serializable { public InstanceConnectionInfo getCurrentAssignedResourceLocation() { return currentExecution.getAssignedResourceLocation(); } + + public void setOperatorState(OperatorState operatorState) { + this.operatorState = operatorState; + } + + public OperatorState getOperatorState() { + return operatorState; + } public ExecutionGraph getExecutionGraph() { return this.jobVertex.getGraph(); @@ -379,6 +390,12 @@ public class ExecutionVertex implements Serializable { if (grp != null) { this.locationConstraint = grp.getLocationConstraint(subTaskIndex); } + + if(operatorState!=null) + { + execution.setOperatorState(operatorState); + } + } else { throw new IllegalStateException("Cannot reset a vertex that is in state " + state); @@ -506,7 +523,7 @@ public class ExecutionVertex implements Serializable { return new TaskDeploymentDescriptor(getJobId(), getJobvertexId(), executionId, getTaskName(), subTaskIndex, getTotalNumberOfParallelSubtasks(), getExecutionGraph().getJobConfiguration(), jobVertex.getJobVertex().getConfiguration(), jobVertex.getJobVertex().getInvokableClassName(), - producedPartitions, consumedPartitions, jarFiles, slot.getSlotNumber()); + producedPartitions, consumedPartitions, jarFiles, slot.getSlotNumber(), operatorState); } // -------------------------------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java index cc36438c19a..920792c3627 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java @@ -19,10 +19,7 @@ package org.apache.flink.runtime.io.network.api.reader; import java.io.IOException; -import java.util.HashSet; -import java.util.LinkedList; -import java.util.Queue; -import java.util.Set; + import org.apache.flink.core.io.IOReadableWritable; import org.apache.flink.runtime.event.task.AbstractEvent; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/OperatorStateCarrier.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/OperatorStateCarrier.java new file mode 100644 index 00000000000..6ea4f27fd66 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/OperatorStateCarrier.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobgraph.tasks; + +import org.apache.flink.runtime.state.OperatorState; + +public interface OperatorStateCarrier { + + public void injectState(OperatorState state); + +} diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/OperatorState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorState.java similarity index 95% rename from flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/OperatorState.java rename to flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorState.java index a0cedba67e9..74ea1a77f42 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/OperatorState.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorState.java @@ -15,12 +15,10 @@ * limitations under the License. */ -package org.apache.flink.streaming.state; +package org.apache.flink.runtime.state; import java.io.Serializable; -import org.apache.flink.streaming.state.checkpoint.StateCheckpoint; - /** * Abstract class for representing operator states in Flink programs. By * implementing the methods declared in this abstraction the state of the @@ -33,7 +31,7 @@ public abstract class OperatorState implements Serializable { private static final long serialVersionUID = 1L; - protected T state; + public T state; /** * Constructor used for initializing the state. In case of failure, the diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/checkpoint/StateCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateCheckpoint.java similarity index 94% rename from flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/checkpoint/StateCheckpoint.java rename to flink-runtime/src/main/java/org/apache/flink/runtime/state/StateCheckpoint.java index 8b76245738d..4e4906e9b43 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/checkpoint/StateCheckpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateCheckpoint.java @@ -15,12 +15,10 @@ * limitations under the License. */ -package org.apache.flink.streaming.state.checkpoint; +package org.apache.flink.runtime.state; import java.io.Serializable; -import org.apache.flink.streaming.state.OperatorState; - /** * Base class for creating checkpoints for {@link OperatorState}. This * checkpoints will be used to backup states in stateful Flink operators and @@ -34,7 +32,7 @@ public class StateCheckpoint implements Serializable { private static final long serialVersionUID = 1L; - T checkpointedState; + public T checkpointedState; /** * Creates a state checkpoint from the given {@link OperatorState} diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala index 4f58ba755a5..97a60990820 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala @@ -301,13 +301,19 @@ class JobManager(val configuration: Configuration, jobInfo.client ! Failure(exception) throw exception } + barrierMonitors.get(jobID) match { case Some(monitor) => - monitor ! PoisonPill - barrierMonitors.remove(jobID) + newJobStatus match{ + case JobStatus.FINISHED | JobStatus.CANCELED => + monitor ! PoisonPill + barrierMonitors.remove(jobID) + case JobStatus.FAILING => + monitor ! JobStateRequest + } case None => + removeJob(jobID) } - removeJob(jobID) } else { newJobStatus match { @@ -315,7 +321,10 @@ class JobManager(val configuration: Configuration, case Some((executionGraph, _)) => //FIXME this is just a fast n dirty check for determining streaming jobs if (executionGraph.getScheduleMode == ScheduleMode.ALL) { - barrierMonitors += jobID -> StreamStateMonitor.props(context, executionGraph) + barrierMonitors.get(jobID) match { + case None => + barrierMonitors += jobID -> StreamStateMonitor.props(context, executionGraph) + } } case None => log.error("Cannot create state monitor for job ID {}.", jobID) @@ -327,12 +336,27 @@ class JobManager(val configuration: Configuration, removeJob(jobID) } - case BarrierAck(jobID, jobVertex, instanceID, checkpoint) => - barrierMonitors.get(jobID) match { - case Some(monitor) => monitor ! BarrierAck(jobID, jobVertex, instanceID, checkpoint) + case msg: BarrierAck => + barrierMonitors.get(msg.jobID) match { + case Some(monitor) => monitor ! msg + case None => + } + case msg: StateBarrierAck => + barrierMonitors.get(msg.jobID) match { + case Some(monitor) => monitor ! msg case None => } + case msg: JobStateResponse => + //inject initial states and restart the job + currentJobs.get(msg.jobID) match { + case Some(jobExecution) => + import scala.collection.JavaConverters._ + jobExecution._1.loadOperatorStates(msg.opStates.asJava) + jobExecution._1.restart() + case None => + } + case ScheduleOrUpdateConsumers(jobId, executionId, partitionIndex) => currentJobs.get(jobId) match { case Some((executionGraph, _)) => diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/StreamStateMonitor.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/StreamStateMonitor.scala index a37ddb509ee..65840f95668 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/StreamStateMonitor.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/StreamStateMonitor.scala @@ -20,65 +20,82 @@ package org.apache.flink.runtime.jobmanager import akka.actor._ import org.apache.flink.runtime.ActorLogMessages -import org.apache.flink.runtime.executiongraph.{ExecutionAttemptID, ExecutionGraph, ExecutionVertex} -import org.apache.flink.runtime.jobgraph.{JobID, JobVertexID} +import org.apache.flink.runtime.executiongraph.{ExecutionAttemptID,ExecutionGraph,ExecutionVertex} +import org.apache.flink.runtime.jobgraph.{JobID,JobVertexID} +import org.apache.flink.runtime.state.OperatorState -import scala.collection.JavaConversions.mapAsScalaMap +import java.lang.Long +import scala.collection.JavaConversions._ import scala.collection.immutable.TreeMap import scala.concurrent.ExecutionContext.Implicits.global -import scala.concurrent.duration.{FiniteDuration, _} +import scala.concurrent.duration.{FiniteDuration,_} object StreamStateMonitor { - def props(context: ActorContext, executionGraph: ExecutionGraph, + def props(context: ActorContext,executionGraph: ExecutionGraph, interval: FiniteDuration = 5 seconds): ActorRef = { val vertices: Iterable[ExecutionVertex] = getExecutionVertices(executionGraph) val monitor = context.system.actorOf(Props(new StreamStateMonitor(executionGraph, - vertices, vertices.map(x => ((x.getJobVertex.getJobVertexId, x.getParallelSubtaskIndex), List.empty[Long])).toMap, interval, 0L, -1L))) + vertices,vertices.map(x => ((x.getJobVertex.getJobVertexId,x.getParallelSubtaskIndex), + List.empty[Long])).toMap,Map(),interval,0L,-1L))) monitor ! InitBarrierScheduler monitor } private def getExecutionVertices(executionGraph: ExecutionGraph): Iterable[ExecutionVertex] = { - for ((_, execJobVertex) <- executionGraph.getAllVertices; - execVertex: ExecutionVertex <- execJobVertex.getTaskVertices) + for((_,execJobVertex) <- executionGraph.getAllVertices; + execVertex: ExecutionVertex <- execJobVertex.getTaskVertices) yield execVertex } } class StreamStateMonitor(val executionGraph: ExecutionGraph, - val vertices: Iterable[ExecutionVertex], var acks: Map[(JobVertexID, Int), List[Long]], - val interval: FiniteDuration, var curId: Long, var ackId: Long) + val vertices: Iterable[ExecutionVertex], + var acks: Map[(JobVertexID,Int),List[Long]], + var states: Map[(JobVertexID, Integer, Long), OperatorState[_]], + val interval: FiniteDuration,var curId: Long,var ackId: Long) extends Actor with ActorLogMessages with ActorLogging { override def receiveWithLogMessages: Receive = { + case InitBarrierScheduler => - context.system.scheduler.schedule(interval, interval, self, BarrierTimeout) - context.system.scheduler.schedule(2 * interval, 2 * interval, self, UpdateCurrentBarrier) + context.system.scheduler.schedule(interval,interval,self,BarrierTimeout) + context.system.scheduler.schedule(2 * interval,2 * interval,self,TriggerBarrierCompaction) log.debug("[FT-MONITOR] Started Stream State Monitor for job {}{}", - executionGraph.getJobID, executionGraph.getJobName) + executionGraph.getJobID,executionGraph.getJobName) + case BarrierTimeout => curId += 1 log.debug("[FT-MONITOR] Sending Barrier to vertices of Job " + executionGraph.getJobName) vertices.filter(v => v.getJobVertex.getJobVertex.isInputVertex).foreach(vertex => vertex.getCurrentAssignedResource.getInstance.getTaskManager - ! BarrierReq(vertex.getCurrentExecutionAttempt.getAttemptId, curId)) - case BarrierAck(_, jobVertexID, instanceID, checkpointID) => - acks.get(jobVertexID, instanceID) match { + ! BarrierReq(vertex.getCurrentExecutionAttempt.getAttemptId,curId)) + + case StateBarrierAck(jobID, jobVertexID, instanceID, checkpointID, opState) => + states += (jobVertexID, instanceID, checkpointID) -> opState + self ! BarrierAck(jobID, jobVertexID, instanceID, checkpointID) + + case BarrierAck(jobID, jobVertexID,instanceID,checkpointID) => + acks.get(jobVertexID,instanceID) match { case Some(acklist) => - acks += (jobVertexID, instanceID) -> (checkpointID :: acklist) + acks += (jobVertexID,instanceID) -> (checkpointID :: acklist) case None => } - log.info(acks.toString) - case UpdateCurrentBarrier => - val barrierCount = acks.values.foldLeft(TreeMap[Long, Int]().withDefaultValue(0))((dict, myList) - => myList.foldLeft(dict)((dict2, elem) => dict2.updated(elem, dict2(elem) + 1))) + log.debug(acks.toString) + + case TriggerBarrierCompaction => + val barrierCount = acks.values.foldLeft(TreeMap[Long,Int]().withDefaultValue(0))((dict,myList) + => myList.foldLeft(dict)((dict2,elem) => dict2.updated(elem,dict2(elem) + 1))) val keysToKeep = barrierCount.filter(_._2 == acks.size).keys - ackId = if (!keysToKeep.isEmpty) keysToKeep.max else ackId - acks.keys.foreach(x => acks = acks.updated(x, acks(x).filter(_ >= ackId))) + ackId = if(!keysToKeep.isEmpty) keysToKeep.max else ackId + acks.keys.foreach(x => acks = acks.updated(x,acks(x).filter(_ >= ackId))) + states = states.filterKeys(_._3 >= ackId) log.debug("[FT-MONITOR] Last global barrier is " + ackId) + + case JobStateRequest => + sender ! JobStateResponse(executionGraph.getJobID, ackId, states) } } @@ -86,11 +103,20 @@ case class BarrierTimeout() case class InitBarrierScheduler() -case class UpdateCurrentBarrier() +case class TriggerBarrierCompaction() + +case class JobStateRequest() + +case class JobStateResponse(jobID: JobID, barrierID: Long, opStates: Map[(JobVertexID, Integer, + Long), OperatorState[_]]) + +case class BarrierReq(attemptID: ExecutionAttemptID,checkpointID: Long) -case class BarrierReq(attemptID: ExecutionAttemptID, checkpointID: Long) +case class BarrierAck(jobID: JobID,jobVertexID: JobVertexID,instanceID: Int,checkpointID: Long) -case class BarrierAck(jobID: JobID, jobVertexID: JobVertexID, instanceID: Int, checkpointID: Long) +case class StateBarrierAck(jobID: JobID, jobVertexID: JobVertexID, instanceID: Integer, + checkpointID: Long, state: OperatorState[_]) + diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala index 427114132c4..497e784a6a3 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala @@ -43,7 +43,7 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync import org.apache.flink.runtime.io.network.NetworkEnvironment import org.apache.flink.runtime.io.network.netty.NettyConfig import org.apache.flink.runtime.jobgraph.IntermediateDataSetID -import org.apache.flink.runtime.jobgraph.tasks.BarrierTransceiver +import org.apache.flink.runtime.jobgraph.tasks.{OperatorStateCarrier,BarrierTransceiver} import org.apache.flink.runtime.jobmanager.{BarrierReq,JobManager} import org.apache.flink.runtime.memorymanager.DefaultMemoryManager import org.apache.flink.runtime.messages.JobManagerMessages.UpdateTaskExecutionState @@ -358,7 +358,9 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo, if (i.getExecutionState == ExecutionState.RUNNING) { i.getEnvironment.getInvokable match { case barrierTransceiver: BarrierTransceiver => - barrierTransceiver.broadcastBarrier(checkpointID) + new Thread(new Runnable { + override def run(): Unit = barrierTransceiver.broadcastBarrier(checkpointID); + }).start() case _ => log.error("[FT-TaskManager] Received a barrier for the wrong vertex") } } @@ -415,6 +417,15 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo, task = new Task(jobID, vertexID, taskIndex, numSubtasks, executionID, tdd.getTaskName, self) + //inject operator state + if(tdd.getOperatorState != null) + { + val vertex = task.getEnvironment.getInvokable match { + case opStateCarrier: OperatorStateCarrier => + opStateCarrier.injectState(tdd.getOperatorState) + } + } + runningTasks.put(executionID, task) match { case Some(_) => throw new RuntimeException( s"TaskManager contains already a task with executionID $executionID.") diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java index d464ef107a6..7c90629e7f9 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java @@ -32,7 +32,7 @@ import org.apache.flink.streaming.api.invokable.StreamInvokable; import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer; import org.apache.flink.streaming.api.streamvertex.StreamVertexException; import org.apache.flink.streaming.partitioner.StreamPartitioner; -import org.apache.flink.streaming.state.OperatorState; +import org.apache.flink.runtime.state.OperatorState; import org.apache.flink.util.InstantiationUtil; public class StreamConfig implements Serializable { diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java index f69605bea1b..640416d62e7 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java @@ -46,7 +46,7 @@ import org.apache.flink.streaming.api.streamvertex.StreamIterationHead; import org.apache.flink.streaming.api.streamvertex.StreamIterationTail; import org.apache.flink.streaming.api.streamvertex.StreamVertex; import org.apache.flink.streaming.partitioner.StreamPartitioner; -import org.apache.flink.streaming.state.OperatorState; +import org.apache.flink.runtime.state.OperatorState; import org.apache.sling.commons.json.JSONArray; import org.apache.sling.commons.json.JSONException; import org.apache.sling.commons.json.JSONObject; diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java index cdf43ee07a6..b0fc3644cd0 100755 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java @@ -26,7 +26,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.invokable.StreamInvokable; import org.apache.flink.streaming.api.invokable.StreamInvokable.ChainingStrategy; import org.apache.flink.streaming.api.streamvertex.StreamingRuntimeContext; -import org.apache.flink.streaming.state.OperatorState; +import org.apache.flink.runtime.state.OperatorState; /** * The SingleOutputStreamOperator represents a user defined transformation diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java index e2cdc342932..24a90d09555 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java @@ -25,15 +25,17 @@ import org.apache.flink.runtime.event.task.TaskEvent; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.runtime.jobgraph.tasks.BarrierTransceiver; +import org.apache.flink.runtime.jobgraph.tasks.OperatorStateCarrier; import org.apache.flink.runtime.jobmanager.BarrierAck; +import org.apache.flink.runtime.jobmanager.StateBarrierAck; import org.apache.flink.runtime.util.event.EventListener; +import org.apache.flink.runtime.state.OperatorState; import org.apache.flink.streaming.api.StreamConfig; import org.apache.flink.streaming.api.invokable.ChainableInvokable; import org.apache.flink.streaming.api.invokable.StreamInvokable; import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer; import org.apache.flink.streaming.io.CoReaderIterator; import org.apache.flink.streaming.io.IndexedReaderIterator; -import org.apache.flink.streaming.state.OperatorState; import org.apache.flink.util.Collector; import org.apache.flink.util.MutableObjectIterator; import org.apache.flink.util.StringUtils; @@ -43,7 +45,7 @@ import org.slf4j.LoggerFactory; import akka.actor.ActorRef; public class StreamVertex extends AbstractInvokable implements StreamTaskContext, - BarrierTransceiver { + BarrierTransceiver, OperatorStateCarrier { private static final Logger LOG = LoggerFactory.getLogger(StreamVertex.class); @@ -90,9 +92,27 @@ public class StreamVertex extends AbstractInvokable implements StreamTa this.context = createRuntimeContext(getEnvironment().getTaskName(), this.states); } + protected void invokeUserFunction(StreamInvokable userInvokable) throws Exception { + userInvokable.setRuntimeContext(context); + userInvokable.open(getTaskConfiguration()); + + for (ChainableInvokable invokable : outputHandler.chainedInvokables) { + invokable.setRuntimeContext(context); + invokable.open(getTaskConfiguration()); + } + + userInvokable.invoke(); + userInvokable.close(); + + for (ChainableInvokable invokable : outputHandler.chainedInvokables) { + invokable.close(); + } + + } + @Override public void broadcastBarrier(long id) { - // Only called at input vertices + //Only called at input vertices if (LOG.isDebugEnabled()) { LOG.debug("Received barrier from jobmanager: " + id); } @@ -101,9 +121,21 @@ public class StreamVertex extends AbstractInvokable implements StreamTa @Override public void confirmBarrier(long barrierID) { - getEnvironment().getJobManager().tell( - new BarrierAck(getEnvironment().getJobID(), getEnvironment().getJobVertexId(), - context.getIndexOfThisSubtask(), barrierID), ActorRef.noSender()); + + if(states != null && states.containsKey("kafka")) + { + getEnvironment().getJobManager().tell( + new StateBarrierAck(getEnvironment().getJobID(), + getEnvironment().getJobVertexId(), context.getIndexOfThisSubtask(), + barrierID, states.get("kafka")), ActorRef.noSender()); + } + else + { + getEnvironment().getJobManager().tell( + new BarrierAck(getEnvironment().getJobID(), getEnvironment().getJobVertexId(), + context.getIndexOfThisSubtask(), barrierID), ActorRef.noSender()); + } + } public void setInputsOutputs() { @@ -240,7 +272,8 @@ public class StreamVertex extends AbstractInvokable implements StreamTa private void actOnBarrier(long id) { try { outputHandler.broadcastBarrier(id); - System.out.println("Superstep " + id + " processed: " + StreamVertex.this); + //TODO checkpoint state here + confirmBarrier(id); if (LOG.isDebugEnabled()) { LOG.debug("Superstep " + id + " processed: " + StreamVertex.this); } @@ -256,6 +289,12 @@ public class StreamVertex extends AbstractInvokable implements StreamTa return configuration.getOperatorName() + " (" + context.getIndexOfThisSubtask() + ")"; } + @Override + public void injectState(OperatorState state) { + states.put("kafka", state); + } + + private class SuperstepEventListener implements EventListener { @Override diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamingRuntimeContext.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamingRuntimeContext.java index 0daf3c2bc91..a47100bff17 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamingRuntimeContext.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamingRuntimeContext.java @@ -27,7 +27,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider; import org.apache.flink.runtime.operators.util.TaskConfig; -import org.apache.flink.streaming.state.OperatorState; +import org.apache.flink.runtime.state.OperatorState; /** * Implementation of the {@link RuntimeContext}, created by runtime stream UDF diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/MapState.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/MapState.java index 85aec52b5b0..1b861f56664 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/MapState.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/MapState.java @@ -23,8 +23,10 @@ import java.util.HashSet; import java.util.Map; import java.util.Set; +import org.apache.flink.runtime.state.OperatorState; import org.apache.flink.streaming.state.checkpoint.MapCheckpoint; -import org.apache.flink.streaming.state.checkpoint.StateCheckpoint; +import org.apache.flink.runtime.state.StateCheckpoint; + /** * A Map that can be used as a partitionable operator state, for both fault diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/PartitionableState.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/PartitionableState.java index c58c545affc..ddedcd906a9 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/PartitionableState.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/PartitionableState.java @@ -17,6 +17,8 @@ package org.apache.flink.streaming.state; +import org.apache.flink.runtime.state.OperatorState; + /** * Base class for representing operator states that can be repartitioned for * state state and load balancing. diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/SimpleState.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/SimpleState.java index 7ae1f812665..b76f5acc486 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/SimpleState.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/SimpleState.java @@ -17,7 +17,8 @@ package org.apache.flink.streaming.state; -import org.apache.flink.streaming.state.checkpoint.StateCheckpoint; +import org.apache.flink.runtime.state.OperatorState; +import org.apache.flink.runtime.state.StateCheckpoint; /** * Basic {@link OperatorState} for storing and updating simple objects. By default the diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/checkpoint/MapCheckpoint.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/checkpoint/MapCheckpoint.java index 15d1fd52c21..ee27d4f545d 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/checkpoint/MapCheckpoint.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/checkpoint/MapCheckpoint.java @@ -21,8 +21,9 @@ import java.util.HashMap; import java.util.Map; import java.util.Set; +import org.apache.flink.runtime.state.OperatorState; +import org.apache.flink.runtime.state.StateCheckpoint; import org.apache.flink.streaming.state.MapState; -import org.apache.flink.streaming.state.OperatorState; public class MapCheckpoint extends StateCheckpoint> { diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/state/MapStateTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/state/MapStateTest.java index 194403cf011..98bafe43172 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/state/MapStateTest.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/state/MapStateTest.java @@ -25,7 +25,7 @@ import java.util.HashMap; import java.util.Map; import org.apache.flink.streaming.state.checkpoint.MapCheckpoint; -import org.apache.flink.streaming.state.checkpoint.StateCheckpoint; +import org.apache.flink.runtime.state.StateCheckpoint; import org.junit.Test; public class MapStateTest { diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/state/OperatorStateTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/state/OperatorStateTest.java index 6cb8f51b406..4e07a3ffe3d 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/state/OperatorStateTest.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/state/OperatorStateTest.java @@ -20,7 +20,8 @@ package org.apache.flink.streaming.state; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; -import org.apache.flink.streaming.state.checkpoint.StateCheckpoint; +import org.apache.flink.runtime.state.OperatorState; +import org.apache.flink.runtime.state.StateCheckpoint; import org.junit.Test; public class OperatorStateTest { diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java index c207d606d2d..b7a1ba3d49d 100644 --- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java +++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java @@ -101,6 +101,11 @@ public class WordCount { // emit the pairs for (String token : tokens) { + //FIXME to be removed. added this for test purposes + if("killme".equals(token)) + { + throw new Exception("byee"); + } if (token.length() > 0) { out.collect(new Tuple2(token, 1)); } -- GitLab