diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
index a431a76e1f0897de1bb24f97df5bfb7b1ecb24c2..b2573f7e01e5f9fbc0e5f31311c085319c91cae9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
@@ -28,6 +28,7 @@ import org.apache.flink.runtime.state.OperatorState;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
@@ -78,7 +79,7 @@ public final class TaskDeploymentDescriptor implements Serializable {
 	/** The list of JAR files required to run this task. */
 	private final List<BlobKey> requiredJarFiles;
 
-	private OperatorState<?> operatorState;
+	private Map<String, OperatorState<?>> operatorStates;
 
 	/**
 	 * Constructs a task deployment descriptor.
@@ -128,13 +129,13 @@ public final class TaskDeploymentDescriptor implements Serializable {
 			Configuration taskConfiguration, String invokableClassName,
 			List<PartitionDeploymentDescriptor> producedPartitions,
 			List<PartitionConsumerDeploymentDescriptor> consumedPartitions,
-			List<BlobKey> requiredJarFiles, int targetSlotNumber, OperatorState<?> operatorState) {
+			List<BlobKey> requiredJarFiles, int targetSlotNumber, Map<String, OperatorState<?>> operatorStates) {
 
 		this(jobID, vertexID, executionId, taskName, indexInSubtaskGroup, numberOfSubtasks,
 				jobConfiguration, taskConfiguration, invokableClassName, producedPartitions,
 				consumedPartitions, requiredJarFiles, targetSlotNumber);
 
-		setOperatorState(operatorState);
+		setOperatorStates(operatorStates);
 	}
 
 	/**
@@ -243,11 +244,11 @@ public final class TaskDeploymentDescriptor implements Serializable {
 				strProducedPartitions, strConsumedPartitions);
 	}
 
-	public void setOperatorState(OperatorState<?> operatorState) {
-		this.operatorState = operatorState;
+	public void setOperatorStates(Map<String, OperatorState<?>> operatorStates) {
+		this.operatorStates = operatorStates;
 	}
 
-	public OperatorState<?> getOperatorState() {
-		return operatorState;
+	public Map<String, OperatorState<?>> getOperatorStates() {
+		return operatorStates;
 	}
 }
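The descriptor now ships a whole dictionary of named operator states rather than a single OperatorState. A minimal Java sketch of how a receiving side might drain it (the restore() helper is hypothetical; the real hand-off happens in the TaskManager.scala hunk further down):

    import java.util.Map;
    import org.apache.flink.runtime.state.OperatorState;

    // Iterate the named snapshots shipped with the deployment descriptor.
    Map<String, OperatorState<?>> states = tdd.getOperatorStates();
    if (states != null) {
        for (Map.Entry<String, OperatorState<?>> entry : states.entrySet()) {
            // Each state is addressed by the name it was registered under.
            restore(entry.getKey(), entry.getValue());
        }
    }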
"(unassigned)" : assignedResource.toString()), state); } - public void setOperatorState(OperatorState operatorState) { - this.operatorState = operatorState; + public void setOperatorStates(Map> operatorStates) { + this.operatorStates = operatorStates; } - public OperatorState getOperatorState() { - return operatorState; + public Map> getOperatorStates() { + return operatorStates; } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java index bf34e33cab8a2f107d0f2b28b263ad95042455ce..0c6c3a78b6c9e39426007e142bba4231b23853ca 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java @@ -18,8 +18,8 @@ package org.apache.flink.runtime.executiongraph; +import akka.actor.ActorContext; import akka.actor.ActorRef; - import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.JobException; import org.apache.flink.runtime.akka.AkkaUtils; @@ -31,6 +31,7 @@ import org.apache.flink.runtime.jobgraph.JobID; import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.ScheduleMode; +import org.apache.flink.runtime.jobmanager.StreamCheckpointCoordinator; import org.apache.flink.runtime.jobmanager.scheduler.Scheduler; import org.apache.flink.runtime.messages.ExecutionGraphMessages; import org.apache.flink.runtime.state.OperatorState; @@ -38,8 +39,8 @@ import org.apache.flink.runtime.taskmanager.TaskExecutionState; import org.apache.flink.util.ExceptionUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import scala.Tuple3; +import scala.concurrent.duration.Duration; import scala.concurrent.duration.FiniteDuration; import java.io.Serializable; @@ -52,6 +53,7 @@ import java.util.NoSuchElementException; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import static akka.dispatch.Futures.future; @@ -118,6 +120,14 @@ public class ExecutionGraph implements Serializable { private boolean allowQueuedScheduling = true; + private ActorContext parentContext; + + private ActorRef stateMonitorActor; + + private boolean monitoringEnabled; + + private long monitoringInterval = 10000; + private ScheduleMode scheduleMode = ScheduleMode.FROM_SOURCES; @@ -159,6 +169,18 @@ public class ExecutionGraph implements Serializable { } // -------------------------------------------------------------------------------------------- + + public void setStateMonitorActor(ActorRef stateMonitorActor) { + this.stateMonitorActor = stateMonitorActor; + } + + public ActorRef getStateMonitorActor() { + return stateMonitorActor; + } + + public void setParentContext(ActorContext parentContext) { + this.parentContext = parentContext; + } public void setNumberOfRetriesLeft(int numberOfRetriesLeft) { if (numberOfRetriesLeft < -1) { @@ -214,6 +236,14 @@ public class ExecutionGraph implements Serializable { } } + public void setMonitoringEnabled(boolean monitoringEnabled) { + this.monitoringEnabled = monitoringEnabled; + } + + public void setMonitoringInterval(long monitoringInterval) { + this.monitoringInterval = monitoringInterval; + } + /** * Returns a list of BLOB keys 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index bf34e33cab8a2f107d0f2b28b263ad95042455ce..0c6c3a78b6c9e39426007e142bba4231b23853ca 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -18,8 +18,8 @@
 
 package org.apache.flink.runtime.executiongraph;
 
+import akka.actor.ActorContext;
 import akka.actor.ActorRef;
-
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.akka.AkkaUtils;
@@ -31,6 +31,7 @@ import org.apache.flink.runtime.jobgraph.JobID;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.ScheduleMode;
+import org.apache.flink.runtime.jobmanager.StreamCheckpointCoordinator;
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.messages.ExecutionGraphMessages;
 import org.apache.flink.runtime.state.OperatorState;
@@ -38,8 +39,8 @@ import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.util.ExceptionUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import scala.Tuple3;
+import scala.concurrent.duration.Duration;
 import scala.concurrent.duration.FiniteDuration;
 
 import java.io.Serializable;
@@ -52,6 +53,7 @@ import java.util.NoSuchElementException;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 
 import static akka.dispatch.Futures.future;
@@ -118,6 +120,14 @@ public class ExecutionGraph implements Serializable {
 
 	private boolean allowQueuedScheduling = true;
 
+	private ActorContext parentContext;
+
+	private ActorRef stateMonitorActor;
+
+	private boolean monitoringEnabled;
+
+	private long monitoringInterval = 10000;
+
 	private ScheduleMode scheduleMode = ScheduleMode.FROM_SOURCES;
 
 
@@ -159,6 +169,18 @@ public class ExecutionGraph implements Serializable {
 	}
 
 	// --------------------------------------------------------------------------------------------
+
+	public void setStateMonitorActor(ActorRef stateMonitorActor) {
+		this.stateMonitorActor = stateMonitorActor;
+	}
+
+	public ActorRef getStateMonitorActor() {
+		return stateMonitorActor;
+	}
+
+	public void setParentContext(ActorContext parentContext) {
+		this.parentContext = parentContext;
+	}
 
 	public void setNumberOfRetriesLeft(int numberOfRetriesLeft) {
 		if (numberOfRetriesLeft < -1) {
@@ -214,6 +236,14 @@ public class ExecutionGraph implements Serializable {
 		}
 	}
 
+	public void setMonitoringEnabled(boolean monitoringEnabled) {
+		this.monitoringEnabled = monitoringEnabled;
+	}
+
+	public void setMonitoringInterval(long monitoringInterval) {
+		this.monitoringInterval = monitoringInterval;
+	}
+
 	/**
 	 * Returns a list of BLOB keys referring to the JAR files required to run this job
 	 * @return list of BLOB keys referring to the JAR files required to run this job
@@ -361,12 +391,17 @@ public class ExecutionGraph implements Serializable {
 					for (ExecutionJobVertex ejv : getVerticesTopologically()) {
 						ejv.scheduleAll(scheduler, allowQueuedScheduling);
 					}
-
 					break;
 
 				case BACKTRACKING:
 					throw new JobException("BACKTRACKING is currently not supported as schedule mode.");
 			}
+
+			if(monitoringEnabled)
+			{
+				stateMonitorActor = StreamCheckpointCoordinator.spawn(parentContext, this,
+						Duration.create(monitoringInterval, TimeUnit.MILLISECONDS));
+			}
 		}
 		else {
 			throw new IllegalStateException("Job may only be scheduled from state " + JobStatus.CREATED);
@@ -532,9 +567,9 @@ public class ExecutionGraph implements Serializable {
 		}
 	}
 
-	public void loadOperatorStates(Map<Tuple3<JobVertexID, Integer, Long>, OperatorState<?>> states)
+	public synchronized void loadOperatorStates(Map<Tuple3<JobVertexID, Integer, Long>, Map<String, OperatorState<?>>> states)
 	{
-		for(Map.Entry<Tuple3<JobVertexID, Integer, Long>, OperatorState<?>> state : states.entrySet())
+		for(Map.Entry<Tuple3<JobVertexID, Integer, Long>, Map<String, OperatorState<?>>> state : states.entrySet())
 		{
 			tasks.get(state.getKey()._1()).getTaskVertices()[state.getKey()._2()].setOperatorState(state.getValue());
 		}
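loadOperatorStates keys each snapshot by a scala.Tuple3 of (job vertex id, subtask index, checkpoint id) and pushes the inner name-to-state map down to the matching vertex. Condensed from the hunk above, with the lookup spelled out (assuming the usual ExecutionJobVertex lookup in the tasks map):

    // _1() selects the job vertex, _2() the subtask slot within it.
    Map<String, OperatorState<?>> snapshot = state.getValue();
    ExecutionJobVertex jobVertex = tasks.get(state.getKey()._1());
    jobVertex.getTaskVertices()[state.getKey()._2()].setOperatorState(snapshot);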
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index b7f962a79ea4651046e5870607c876d70838b07d..41d34d526545e8d235cea108cc7b0a926fa298e7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -48,6 +48,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.CopyOnWriteArrayList;
 
 import static com.google.common.base.Preconditions.checkElementIndex;
@@ -90,7 +91,7 @@ public class ExecutionVertex implements Serializable {
 
 	private volatile boolean scheduleLocalOnly;
 
-	private OperatorState<?> operatorState;
+	private Map<String, OperatorState<?>> operatorState;
 
 	// --------------------------------------------------------------------------------------------
 
@@ -198,11 +199,11 @@ public class ExecutionVertex implements Serializable {
 		return currentExecution.getAssignedResourceLocation();
 	}
 
-	public void setOperatorState(OperatorState<?> operatorState) {
+	public void setOperatorState(Map<String, OperatorState<?>> operatorState) {
 		this.operatorState = operatorState;
 	}
 
-	public OperatorState<?> getOperatorState() {
+	public Map<String, OperatorState<?>> getOperatorState() {
 		return operatorState;
 	}
 
@@ -393,7 +394,7 @@ public class ExecutionVertex implements Serializable {
 
 		if(operatorState!=null)
 		{
-			execution.setOperatorState(operatorState);
+			execution.setOperatorStates(operatorState);
 		}
 	}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
index 0cf2f5e34f649fea802a0be7e0c05c9205ba45c9..4b398e552801bc0171808e4f8607d849230ea5d2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
@@ -75,6 +75,14 @@ public class JobGraph implements Serializable {
 
 	private ScheduleMode scheduleMode = ScheduleMode.FROM_SOURCES;
 
+	public enum JobType {STREAMING, BATCH}
+
+	private JobType jobType = JobType.BATCH;
+
+	private boolean monitoringEnabled = false;
+
+	private long monitorInterval = 10000;
+
 	// --------------------------------------------------------------------------------------------
 
 	/**
@@ -253,6 +261,31 @@ public class JobGraph implements Serializable {
 		return this.taskVertices.size();
 	}
 
+	public void setJobType(JobType jobType) {
+		this.jobType = jobType;
+	}
+
+	public JobType getJobType() {
+		return jobType;
+	}
+
+	public void setMonitoringEnabled(boolean monitoringEnabled) {
+		this.monitoringEnabled = monitoringEnabled;
+	}
+
+	public boolean isMonitoringEnabled() {
+		return monitoringEnabled;
+	}
+
+	public void setMonitorInterval(long monitorInterval) {
+		this.monitorInterval = monitorInterval;
+	}
+
+	public long getMonitorInterval() {
+		return monitorInterval;
+	}
+
 	/**
 	 * Searches for a vertex with a matching ID and returns it.
 	 *
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/OperatorStateCarrier.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/OperatorStateCarrier.java
index 6ea4f27fd669f465ded4bed5b0c1937a01359331..e8b6d6b23760c907c6b5788c6fa79e3514e578aa 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/OperatorStateCarrier.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/OperatorStateCarrier.java
@@ -20,8 +20,10 @@ package org.apache.flink.runtime.jobgraph.tasks;
 
 import org.apache.flink.runtime.state.OperatorState;
 
+import java.util.Map;
+
 public interface OperatorStateCarrier {
 
-	public void injectState(OperatorState<?> state);
+	public void injectStates(Map<String, OperatorState<?>> state);
 
 }
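Any invokable that wants its state restored implements this interface; StreamVertex (further down in this patch) is the one real implementor. A minimal sketch of the contract, assuming the task keeps its own name-to-state map (class name hypothetical):

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.flink.runtime.jobgraph.tasks.OperatorStateCarrier;
    import org.apache.flink.runtime.state.OperatorState;

    public class StatefulTask implements OperatorStateCarrier {

        private final Map<String, OperatorState<?>> states =
                new HashMap<String, OperatorState<?>>();

        @Override
        public void injectStates(Map<String, OperatorState<?>> state) {
            // Merge the recovered snapshots over the task's initial states.
            states.putAll(state);
        }
    }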
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 97a60990820aacb75fb7e815ce19ebc3fae1dd2c..c3506802f5476a56a3f0db1122c96bb7663e1c53 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -104,7 +104,6 @@ class JobManager(val configuration: Configuration,
 
   /** List of current jobs running jobs */
   val currentJobs = scala.collection.mutable.HashMap[JobID, (ExecutionGraph, JobInfo)]()
-  val barrierMonitors = scala.collection.mutable.HashMap[JobID, ActorRef]()
 
   /**
    * Run when the job manager is started. Simply logs an informational message.
@@ -284,78 +283,43 @@ class JobManager(val configuration: Configuration,
         if(newJobStatus.isTerminalState) {
           jobInfo.end = timeStamp
 
-          // is the client waiting for the job result?
+          // is the client waiting for the job result?
           newJobStatus match {
             case JobStatus.FINISHED =>
               val accumulatorResults = accumulatorManager.getJobAccumulatorResults(jobID)
-              jobInfo.client ! JobResultSuccess(jobID, jobInfo.duration, accumulatorResults)
+              jobInfo.client ! JobResultSuccess(jobID,jobInfo.duration,accumulatorResults)
             case JobStatus.CANCELED =>
               jobInfo.client ! Failure(new JobCancellationException(jobID,
-                "Job was cancelled.", error))
+                "Job was cancelled.",error))
             case JobStatus.FAILED =>
               jobInfo.client ! Failure(new JobExecutionException(jobID,
-                "Job execution failed.", error))
+                "Job execution failed.",error))
             case x =>
-              val exception = new JobExecutionException(jobID, s"$x is not a " +
-                "terminal state.")
+              val exception = new JobExecutionException(jobID,s"$x is not a " +
+                "terminal state.")
               jobInfo.client ! Failure(exception)
               throw exception
           }
+
+          removeJob(jobID)
 
-          barrierMonitors.get(jobID) match {
-            case Some(monitor) =>
-              newJobStatus match{
-                case JobStatus.FINISHED | JobStatus.CANCELED =>
-                  monitor ! PoisonPill
-                  barrierMonitors.remove(jobID)
-                case JobStatus.FAILING =>
-                  monitor ! JobStateRequest
-              }
-            case None =>
-              removeJob(jobID)
-          }
-        }
-        else {
-          newJobStatus match {
-            case JobStatus.RUNNING => currentJobs.get(jobID) match {
-              case Some((executionGraph, _)) =>
-                //FIXME this is just a fast n dirty check for determining streaming jobs
-                if (executionGraph.getScheduleMode == ScheduleMode.ALL) {
-                  barrierMonitors.get(jobID) match {
-                    case None =>
-                      barrierMonitors += jobID -> StreamStateMonitor.props(context, executionGraph)
-                  }
-                }
-              case None =>
-                log.error("Cannot create state monitor for job ID {}.", jobID)
-                new IllegalStateException("Cannot find execution graph for job ID " + jobID)
-            }
-          }
         }
 
       case None =>
         removeJob(jobID)
-      }
+    }
 
     case msg: BarrierAck =>
-      barrierMonitors.get(msg.jobID) match {
-        case Some(monitor) => monitor ! msg
+      currentJobs.get(msg.jobID) match {
+        case Some(jobExecution) =>
+          jobExecution._1.getStateMonitorActor forward msg
         case None =>
       }
 
     case msg: StateBarrierAck =>
-      barrierMonitors.get(msg.jobID) match {
-        case Some(monitor) => monitor ! msg
-        case None =>
-      }
-
-    case msg: JobStateResponse =>
-      //inject initial states and restart the job
       currentJobs.get(msg.jobID) match {
         case Some(jobExecution) =>
-          import scala.collection.JavaConverters._
-          jobExecution._1.loadOperatorStates(msg.opStates.asJava)
-          jobExecution._1.restart()
+          jobExecution._1.getStateMonitorActor forward msg
         case None =>
-      }
+      }
 
     case ScheduleOrUpdateConsumers(jobId, executionId, partitionIndex) =>
       currentJobs.get(jobId) match {
@@ -522,6 +486,9 @@ class JobManager(val configuration: Configuration,
         executionGraph.setDelayBeforeRetrying(delayBetweenRetries)
         executionGraph.setScheduleMode(jobGraph.getScheduleMode)
         executionGraph.setQueuedSchedulingAllowed(jobGraph.getAllowQueuedScheduling)
+
+        executionGraph.setMonitoringEnabled(jobGraph.isMonitoringEnabled)
+        executionGraph.setMonitoringInterval(jobGraph.getMonitorInterval)
 
         // initialize the vertices that have a master initialization hook
         // file output formats create directories here, input formats create splits
@@ -564,6 +531,9 @@ class JobManager(val configuration: Configuration,
           log.debug(s"Successfully created execution graph from job graph ${jobId} (${jobName}).")
         }
 
+        // give an actorContext
+        executionGraph.setParentContext(context);
+
         // get notified about job status changes
         executionGraph.registerJobStatusListener(self)
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/StreamStateMonitor.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/StreamCheckpointCoordinator.scala
similarity index 66%
rename from flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/StreamStateMonitor.scala
rename to flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/StreamCheckpointCoordinator.scala
index 65840f95668d7049a5ad66f4ae757dbbee21b733..7ab6a6f4f6f41179f53544a33efaa01610b29fcf 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/StreamStateMonitor.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/StreamCheckpointCoordinator.scala
@@ -18,28 +18,30 @@
 
 package org.apache.flink.runtime.jobmanager
 
+import java.lang.Long
+
 import akka.actor._
 import org.apache.flink.runtime.ActorLogMessages
-import org.apache.flink.runtime.executiongraph.{ExecutionAttemptID,ExecutionGraph,ExecutionVertex}
-import org.apache.flink.runtime.jobgraph.{JobID,JobVertexID}
+import org.apache.flink.runtime.execution.ExecutionState.RUNNING
+import org.apache.flink.runtime.executiongraph.{ExecutionAttemptID, ExecutionGraph, ExecutionVertex}
+import org.apache.flink.runtime.jobgraph.JobStatus._
+import org.apache.flink.runtime.jobgraph.{JobID, JobVertexID}
 import org.apache.flink.runtime.state.OperatorState
-import java.lang.Long
 
 import scala.collection.JavaConversions._
 import scala.collection.immutable.TreeMap
 import scala.concurrent.ExecutionContext.Implicits.global
-import scala.concurrent.duration.{FiniteDuration,_}
-
+import scala.concurrent.duration.{FiniteDuration, _}
 
-object StreamStateMonitor {
+object StreamCheckpointCoordinator {
 
-  def props(context: ActorContext,executionGraph: ExecutionGraph,
+  def spawn(context: ActorContext,executionGraph: ExecutionGraph,
             interval: FiniteDuration = 5 seconds): ActorRef = {
 
     val vertices: Iterable[ExecutionVertex] = getExecutionVertices(executionGraph)
-    val monitor = context.system.actorOf(Props(new StreamStateMonitor(executionGraph,
+    val monitor = context.system.actorOf(Props(new StreamCheckpointCoordinator(executionGraph,
       vertices,vertices.map(x => ((x.getJobVertex.getJobVertexId,x.getParallelSubtaskIndex),
-      List.empty[Long])).toMap,Map(),interval,0L,-1L)))
+      List.empty[Long])).toMap, Map() ,interval,0L,-1L)))
     monitor ! InitBarrierScheduler
     monitor
   }
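spawn replaces the old props factory: it creates one coordinator actor per job, seeded with an empty ack list per (vertex, subtask) pair, and immediately kicks off the barrier scheduler via InitBarrierScheduler. The only caller is the ExecutionGraph hunk earlier in this patch, whose Java side boils down to:

    // Spawned once per streaming job, right after scheduling (interval in ms).
    if (monitoringEnabled) {
        stateMonitorActor = StreamCheckpointCoordinator.spawn(parentContext, this,
                Duration.create(monitoringInterval, TimeUnit.MILLISECONDS));
    }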
@@ -51,41 +53,51 @@
   }
 }
 
-class StreamStateMonitor(val executionGraph: ExecutionGraph,
+class StreamCheckpointCoordinator(val executionGraph: ExecutionGraph,
                 val vertices: Iterable[ExecutionVertex],
                 var acks: Map[(JobVertexID,Int),List[Long]],
-                var states: Map[(JobVertexID, Integer, Long), OperatorState[_]],
+                var states: Map[(JobVertexID, Integer, Long),
+                  java.util.Map[String,OperatorState[_]]],
                 val interval: FiniteDuration,var curId: Long,var ackId: Long)
         extends Actor with ActorLogMessages with ActorLogging {
-
+
   override def receiveWithLogMessages: Receive = {
 
     case InitBarrierScheduler =>
       context.system.scheduler.schedule(interval,interval,self,BarrierTimeout)
-      context.system.scheduler.schedule(2 * interval,2 * interval,self,TriggerBarrierCompaction)
+      context.system.scheduler.schedule(2 * interval,2 * interval,self,CompactAndUpdate)
       log.debug("[FT-MONITOR] Started Stream State Monitor for job {}{}",
        executionGraph.getJobID,executionGraph.getJobName)
 
     case BarrierTimeout =>
-      curId += 1
-      log.debug("[FT-MONITOR] Sending Barrier to vertices of Job " + executionGraph.getJobName)
-      vertices.filter(v => v.getJobVertex.getJobVertex.isInputVertex).foreach(vertex
-        => vertex.getCurrentAssignedResource.getInstance.getTaskManager
-              ! BarrierReq(vertex.getCurrentExecutionAttempt.getAttemptId,curId))
+      executionGraph.getState match {
+        case FAILED | CANCELED | FINISHED =>
+          log.debug("[FT-MONITOR] Stopping monitor for terminated job {}", executionGraph.getJobID)
+          self ! PoisonPill
+        case _ =>
+          curId += 1
+          log.debug("[FT-MONITOR] Sending Barrier to vertices of Job " + executionGraph.getJobName)
+          vertices.filter(v => v.getJobVertex.getJobVertex.isInputVertex &&
+                  v.getExecutionState == RUNNING).foreach(vertex
+            => vertex.getCurrentAssignedResource.getInstance.getTaskManager
+                  ! BarrierReq(vertex.getCurrentExecutionAttempt.getAttemptId,curId))
+      }
 
     case StateBarrierAck(jobID, jobVertexID, instanceID, checkpointID, opState) =>
-      states += (jobVertexID, instanceID, checkpointID) -> opState
+      states += (jobVertexID, instanceID, checkpointID) -> opState
       self ! BarrierAck(jobID, jobVertexID, instanceID, checkpointID)
 
     case BarrierAck(jobID, jobVertexID,instanceID,checkpointID) =>
-      acks.get(jobVertexID,instanceID) match {
-        case Some(acklist) =>
-          acks += (jobVertexID,instanceID) -> (checkpointID :: acklist)
-        case None =>
-      }
-      log.debug(acks.toString)
+      acks.get(jobVertexID,instanceID) match {
+        case Some(acklist) =>
+          acks += (jobVertexID,instanceID) -> (checkpointID :: acklist)
+        case None =>
+      }
+      log.debug(acks.toString)
 
-    case TriggerBarrierCompaction =>
+
+
+    case CompactAndUpdate =>
       val barrierCount = acks.values.foldLeft(TreeMap[Long,Int]().withDefaultValue(0))((dict,myList)
        => myList.foldLeft(dict)((dict2,elem) => dict2.updated(elem,dict2(elem) + 1)))
      val keysToKeep = barrierCount.filter(_._2 == acks.size).keys
@@ -93,29 +105,24 @@ class StreamStateMonitor(val executionGraph: ExecutionGraph,
       acks.keys.foreach(x => acks = acks.updated(x,acks(x).filter(_ >= ackId)))
       states = states.filterKeys(_._3 >= ackId)
       log.debug("[FT-MONITOR] Last global barrier is " + ackId)
-
-    case JobStateRequest =>
-      sender ! JobStateResponse(executionGraph.getJobID, ackId, states)
+      executionGraph.loadOperatorStates(states)
+
   }
+
 }
 
 case class BarrierTimeout()
 
 case class InitBarrierScheduler()
 
-case class TriggerBarrierCompaction()
-
-case class JobStateRequest()
-
-case class JobStateResponse(jobID: JobID, barrierID: Long, opStates: Map[(JobVertexID, Integer,
-  Long), OperatorState[_]])
+case class CompactAndUpdate()
 
 case class BarrierReq(attemptID: ExecutionAttemptID,checkpointID: Long)
 
 case class BarrierAck(jobID: JobID,jobVertexID: JobVertexID,instanceID: Int,checkpointID: Long)
 
 case class StateBarrierAck(jobID: JobID, jobVertexID: JobVertexID, instanceID: Integer,
-                           checkpointID: Long, state: OperatorState[_])
+                           checkpointID: Long, states: java.util.Map[String,OperatorState[_]])
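CompactAndUpdate looks for the highest checkpoint id that every (vertex, subtask) pair has acknowledged, prunes everything older, and pushes the surviving states back into the ExecutionGraph. A standalone Java rendering of that fold, for readability (toy input, same logic):

    import java.util.*;

    // acked checkpoint ids per (vertex, subtask) pair, as in the coordinator's acks map
    List<List<Long>> acks = Arrays.asList(
            Arrays.asList(3L, 2L, 1L),
            Arrays.asList(2L, 1L));

    // count how many pairs acknowledged each checkpoint id
    SortedMap<Long, Integer> barrierCount = new TreeMap<Long, Integer>();
    for (List<Long> ackList : acks) {
        for (long id : ackList) {
            Integer c = barrierCount.get(id);
            barrierCount.put(id, c == null ? 1 : c + 1);
        }
    }

    // the last global barrier is the largest id acked by everyone
    long ackId = -1L;
    for (Map.Entry<Long, Integer> e : barrierCount.entrySet()) {
        if (e.getValue() == acks.size()) {
            ackId = Math.max(ackId, e.getKey());
        }
    }
    // here: ackId == 2, so checkpoint 1 can be pruned everywhere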
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 497e784a6a30964553827093fe2fe36885e43862..3de917b638a19b007c1fd20c9bd7c44ea5d44826 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -352,7 +352,8 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo,
     }
 
     case BarrierReq(attemptID, checkpointID) =>
-      log.debug("[FT-TaskManager] Barrier request received for attempt {}", attemptID)
+      log.debug("[FT-TaskManager] Barrier {} request received for attempt {}",
+        checkpointID, attemptID)
       runningTasks.get(attemptID) match {
         case Some(i) =>
           if (i.getExecutionState == ExecutionState.RUNNING) {
@@ -416,15 +417,6 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo,
 
         task = new Task(jobID, vertexID, taskIndex, numSubtasks, executionID,
           tdd.getTaskName, self)
-
-        //inject operator state
-        if(tdd.getOperatorState != null)
-        {
-          val vertex = task.getEnvironment.getInvokable match {
-            case opStateCarrier: OperatorStateCarrier =>
-              opStateCarrier.injectState(tdd.getOperatorState)
-          }
-        }
 
         runningTasks.put(executionID, task) match {
           case Some(_) => throw new RuntimeException(
@@ -446,6 +438,15 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo,
 
         task.setEnvironment(env)
 
+        //inject operator state
+        if(tdd.getOperatorStates != null)
+        {
+          val vertex = task.getEnvironment.getInvokable match {
+            case opStateCarrier: OperatorStateCarrier =>
+              opStateCarrier.injectStates(tdd.getOperatorStates)
+          }
+        }
+
         // register the task with the network stack and profiles
         networkEnvironment match {
           case Some(ne) =>
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerExample.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerExample.java
index d9bb7d36440a29eec1e80ed29afb55b032368e08..dd1221d5e5272da88352d235c9609586eef3f9cf 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerExample.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerExample.java
@@ -17,14 +17,9 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
-import org.apache.flink.runtime.state.OperatorState;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.connectors.kafka.api.KafkaSource;
-<<<<<<< HEAD
-import org.apache.flink.streaming.connectors.kafka.api.simple.PersistentKafkaSource;
-=======
->>>>>>> a62796a... s
 import org.apache.flink.streaming.connectors.util.JavaDefaultStringSchema;
 
 public class KafkaConsumerExample {
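Note that the state-injection block moved from task-creation time to just after task.setEnvironment(env), presumably because the invokable is only reachable through the environment once it has been attached. In Java terms the guard amounts to (a sketch, not the Scala code above):

    // Only tasks that implement OperatorStateCarrier receive recovered state.
    AbstractInvokable invokable = task.getEnvironment().getInvokable();
    if (tdd.getOperatorStates() != null && invokable instanceof OperatorStateCarrier) {
        ((OperatorStateCarrier) invokable).injectStates(tdd.getOperatorStates());
    }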
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaConsumerIterator.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaConsumerIterator.java
index 92d351ae3d5d3cc522163448076ca557ca14d67a..370b3f07175f808078a4b0cfc569aa6dcef0b6dc 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaConsumerIterator.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaConsumerIterator.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.streaming.connectors.kafka.api.simple;
 
+import java.io.Serializable;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -36,13 +37,11 @@ import kafka.javaapi.TopicMetadata;
 import kafka.javaapi.TopicMetadataRequest;
 import kafka.javaapi.consumer.SimpleConsumer;
 import kafka.message.MessageAndOffset;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * Iterates the records received from a partition of a Kafka topic as byte arrays.
 */
-public class KafkaConsumerIterator {
+public class KafkaConsumerIterator implements Serializable {
 
 	private static final long serialVersionUID = 1L;
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaDeserializingConsumerIterator.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaDeserializingConsumerIterator.java
index 6ca4c81903bf0b00b49e5be59d02ab0115bb828f..f2af6ca703342cbca551ba89d09c859de418831a 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaDeserializingConsumerIterator.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaDeserializingConsumerIterator.java
@@ -21,6 +21,7 @@ import org.apache.flink.streaming.connectors.util.DeserializationSchema;
 
 public class KafkaDeserializingConsumerIterator<T> extends KafkaConsumerIterator {
 
+	private static final long serialVersionUID = 1L;
 	private DeserializationSchema<T> deserializationSchema;
 
 	public KafkaDeserializingConsumerIterator(String host, int port, String topic, int partition, long waitOnEmptyFetch,
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
index 7c90629e7f91cec289051ec3239d45ec2b3c319b..d813a30263049cba61f9af7fa0e0f4abeb6befae 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
@@ -67,6 +67,7 @@ public class StreamConfig implements Serializable {
 
 	// DEFAULT VALUES
 	private static final long DEFAULT_TIMEOUT = 100;
+	public static final String STATE_MONITORING = "STATE_MONITORING";
 
 	// CONFIG METHODS
 
@@ -300,6 +301,18 @@ public class StreamConfig implements Serializable {
 		config.setBytes(EDGES_IN_ORDER, SerializationUtils.serialize((Serializable) outEdgeList));
 	}
 
+	public void setStateMonitoring(boolean stateMonitoring) {
+
+		config.setBoolean(STATE_MONITORING, stateMonitoring);
+
+	}
+
+	public boolean getStateMonitoring()
+	{
+		return config.getBoolean(STATE_MONITORING, false);
+	}
+
 	@SuppressWarnings("unchecked")
 	public List<Tuple2<Integer, Integer>> getOutEdgesInOrder(ClassLoader cl) {
 		try {
@@ -399,6 +412,7 @@ public class StreamConfig implements Serializable {
 			builder.append("\nInvokable: Missing");
 		}
 		builder.append("\nBuffer timeout: " + getBufferTimeout());
+		builder.append("\nState Monitoring: " + getStateMonitoring());
 		if (isChainStart() && getChainedOutputs(cl).size() > 0) {
 			builder.append("\n\n\n---------------------\nChained task configs\n---------------------\n");
 			builder.append(getTransitiveChainedTaskConfigs(cl)).toString();
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java
index 641708e8f83b9a9b22bce8dc13c476afb2e6258b..8334aa1625ecfa6ab0d68e5ebbaca2fa1d91705c 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java
@@ -92,6 +92,10 @@ public class StreamGraph extends StreamingPlan {
 	private Set<Integer> sources;
 
 	private ExecutionConfig executionConfig;
+
+	private boolean monitoringEnabled;
+
+	private long monitoringInterval = 10000;
 
 	public StreamGraph(ExecutionConfig executionConfig) {
 
@@ -606,6 +610,22 @@ public class StreamGraph extends StreamingPlan {
 		return operatorNames.get(vertexID);
 	}
 
+	public void setMonitoringEnabled(boolean monitoringEnabled) {
+		this.monitoringEnabled = monitoringEnabled;
+	}
+
+	public boolean isMonitoringEnabled() {
+		return monitoringEnabled;
+	}
+
+	public void setMonitoringInterval(long monitoringInterval) {
+		this.monitoringInterval = monitoringInterval;
+	}
+
+	public long getMonitoringInterval() {
+		return monitoringInterval;
+	}
+
 	@Override
 	public String getStreamingPlanAsJSON() {
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java
index c9698e325ed8bc6811fb95f40c3c391ecbc24dd4..b50ac259109a656ea299162afadbf6472f922832 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java
@@ -74,7 +74,13 @@ public class StreamingJobGraphGenerator {
 
 		// Turn lazy scheduling off
 		jobGraph.setScheduleMode(ScheduleMode.ALL);
-
+		jobGraph.setJobType(JobGraph.JobType.STREAMING);
+		jobGraph.setMonitoringEnabled(streamGraph.isMonitoringEnabled());
+		jobGraph.setMonitorInterval(streamGraph.getMonitoringInterval());
+		if(jobGraph.isMonitoringEnabled())
+		{
+			jobGraph.setNumberOfExecutionRetries(Integer.MAX_VALUE);
+		}
 
 		init();
 		setChaining();
@@ -211,6 +217,7 @@ public class StreamingJobGraphGenerator {
 		config.setNumberOfOutputs(nonChainableOutputs.size());
 		config.setOutputs(nonChainableOutputs);
 		config.setChainedOutputs(chainableOutputs);
+		config.setStateMonitoring(streamGraph.isMonitoringEnabled());
 
 		Class<? extends AbstractInvokable> vertexClass = streamGraph.getJobVertexClass(vertexID);
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 835ce4e08b7e208d7f50616a06ee9d29172a19f7..9cc61313cf08a1e8f53704e17f215a43fd1f71f6 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -151,6 +151,19 @@ public abstract class StreamExecutionEnvironment {
 		this.bufferTimeout = timeoutMillis;
 		return this;
 	}
+
+	public StreamExecutionEnvironment enableMonitoring(long interval)
+	{
+		streamGraph.setMonitoringEnabled(true);
+		streamGraph.setMonitoringInterval(interval);
+		return this;
+	}
+
+	public StreamExecutionEnvironment enableMonitoring()
+	{
+		streamGraph.setMonitoringEnabled(true);
+		return this;
+	}
 
 	/**
 	 * Sets the maximum time frequency (milliseconds) for the flushing of the
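With these hooks in place, enabling checkpoint monitoring from a user program is one call on the environment; the generator then marks the JobGraph as STREAMING, propagates the interval, and turns on effectively infinite execution retries. A minimal usage sketch (the topology is illustrative only):

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // snapshot every 5 seconds; the no-arg variant keeps the 10000 ms default
    env.enableMonitoring(5000);

    env.socketTextStream("localhost", 9999)
       .print();

    env.execute();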
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SocketTextStreamFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SocketTextStreamFunction.java
index 67bc128768e5f93089eec23c784e7fc452df9743..d6a5b2b7aae35d684eabfecbc960e90cefd21229 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SocketTextStreamFunction.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SocketTextStreamFunction.java
@@ -59,7 +59,6 @@ public class SocketTextStreamFunction extends RichSourceFunction<String> {
 	public void open(Configuration parameters) throws Exception {
 		super.open(parameters);
 		socket = new Socket();
-		socket.connect(new InetSocketAddress(hostname, port), CONNECTION_TIMEOUT_TIME);
 	}
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/InputHandler.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/InputHandler.java
index a95965c3027f9df20a111981b5ac2be221375021..d766705f7c5c17c532ca296a7e06fce3d69d440f 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/InputHandler.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/InputHandler.java
@@ -18,7 +18,6 @@
 package org.apache.flink.streaming.api.streamvertex;
 
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.runtime.event.task.StreamingSuperstep;
 import org.apache.flink.runtime.io.network.api.reader.MutableReader;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
 import org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate;
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java
index 82f1329832cafee26d95b3a35b89418c6c3e1f56..fd375f63d3e45ae1b9608a1679e3fe3958ddbff9 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java
@@ -25,7 +25,6 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.runtime.event.task.StreamingSuperstep;
 import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
 import org.apache.flink.runtime.plugable.SerializationDelegate;
 import org.apache.flink.streaming.api.StreamConfig;
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
index 24a90d0955564fa0ab800e7bcbada1b297cb4974..5ff47d6c0521ee7916a74a5de94e04230026ee75 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
@@ -20,7 +20,6 @@ package org.apache.flink.streaming.api.streamvertex;
 import java.io.IOException;
 import java.util.Map;
 
-import org.apache.flink.runtime.event.task.StreamingSuperstep;
 import org.apache.flink.runtime.event.task.TaskEvent;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
@@ -65,6 +64,8 @@ public class StreamVertex<IN, OUT> extends AbstractInvokable implements StreamTa
 	protected ClassLoader userClassLoader;
 
 	private EventListener<TaskEvent> superstepListener;
+
+	private boolean onRecovery;
 
 	public StreamVertex() {
 		userInvokable = null;
@@ -88,7 +89,10 @@ public class StreamVertex<IN, OUT> extends AbstractInvokable implements StreamTa
 	protected void initialize() {
 		this.userClassLoader = getUserCodeClassLoader();
 		this.configuration = new StreamConfig(getTaskConfiguration());
-		this.states = configuration.getOperatorStates(userClassLoader);
+		if(!onRecovery)
+		{
+			this.states = configuration.getOperatorStates(userClassLoader);
+		}
 		this.context = createRuntimeContext(getEnvironment().getTaskName(), this.states);
 	}
@@ -122,12 +126,12 @@ public class StreamVertex<IN, OUT> extends AbstractInvokable implements StreamTa
 
 	@Override
 	public void confirmBarrier(long barrierID) {
-		if(states != null && states.containsKey("kafka"))
+		if(configuration.getStateMonitoring() && states != null)
 		{
 			getEnvironment().getJobManager().tell(
 					new StateBarrierAck(getEnvironment().getJobID(),
 							getEnvironment().getJobVertexId(), context.getIndexOfThisSubtask(),
-							barrierID, states.get("kafka")), ActorRef.noSender());
+							barrierID, states), ActorRef.noSender());
 		}
 		else
 		{
@@ -290,8 +294,9 @@ public class StreamVertex<IN, OUT> extends AbstractInvokable implements StreamTa
 	}
 
 	@Override
-	public void injectState(OperatorState<?> state) {
-		states.put("kafka", state);
+	public void injectStates(Map<String, OperatorState<?>> states) {
+		onRecovery = true;
+		this.states.putAll(states);
 	}
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamingRuntimeContext.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamingRuntimeContext.java
index 60dfe7a73fc13d53e442f808722421d77fce3189..492d2a06aea3f3dd7aee3b38c218c605a47c8330 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamingRuntimeContext.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamingRuntimeContext.java
@@ -71,7 +71,14 @@ public class StreamingRuntimeContext extends RuntimeUDFContext {
 		if (state == null) {
 			throw new RuntimeException("Cannot register null state");
 		} else {
-			operatorStates.put(name, state);
+			if(operatorStates.containsKey(name))
+			{
+				throw new RuntimeException("State is already registered");
+			}
+			else
+			{
+				operatorStates.put(name, state);
+			}
 		}
 	}
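Two details interact here. First, injectStates flips onRecovery before initialize() runs (the TaskManager hunk above injects state right after setEnvironment, i.e. ahead of task start), so a restarted vertex keeps the recovered snapshots instead of overwriting them with the empty defaults from its config. Second, registerState now rejects duplicate names, since states are keyed by name end to end. Condensed from the hunks above, with the ordering spelled out in comments:

    private boolean onRecovery;

    public void injectStates(Map<String, OperatorState<?>> states) {
        onRecovery = true;              // set BEFORE initialize() runs
        this.states.putAll(states);     // recovered, name-keyed snapshots
    }

    protected void initialize() {
        if (!onRecovery) {
            // fresh start: read the initial states from the task config
            this.states = configuration.getOperatorStates(userClassLoader);
        }
        // on recovery, the injected snapshots are kept instead
    }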
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/event/task/StreamingSuperstep.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamingSuperstep.java
similarity index 92%
rename from flink-runtime/src/main/java/org/apache/flink/runtime/event/task/StreamingSuperstep.java
rename to flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamingSuperstep.java
index e35eb28a0d9563645cac6e75c8a963870fcd37e1..557c6363156fe3ec9fb1c6961d06988fea00a2b4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/event/task/StreamingSuperstep.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamingSuperstep.java
@@ -16,12 +16,13 @@
  * limitations under the License.
  */
 
-package org.apache.flink.runtime.event.task;
+package org.apache.flink.streaming.api.streamvertex;
 
 import java.io.IOException;
 
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.runtime.event.task.TaskEvent;
 
 public class StreamingSuperstep extends TaskEvent {
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/BarrierBuffer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/BarrierBuffer.java
index 3ff718adb6ed8efafb2a57312ab297d0346beca0..7dfccb0604b6fcf6d0480812aef95c0b7a573f04 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/BarrierBuffer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/BarrierBuffer.java
@@ -23,10 +23,10 @@ import java.util.LinkedList;
 import java.util.Queue;
 import java.util.Set;
 
-import org.apache.flink.runtime.event.task.StreamingSuperstep;
 import org.apache.flink.runtime.io.network.api.reader.AbstractReader;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.streaming.api.streamvertex.StreamingSuperstep;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -53,6 +53,12 @@ public class BarrierBuffer {
 		this.reader = reader;
 	}
 
+	/**
+	 * Starts the next superstep
+	 * 
+	 * @param superstep
+	 *            The next superstep
+	 */
 	protected void startSuperstep(StreamingSuperstep superstep) {
 		this.currentSuperstep = superstep;
 		this.superstepStarted = true;
@@ -61,30 +67,53 @@ public class BarrierBuffer {
 		}
 	}
 
+	/**
+	 * Buffers a bufferOrEvent received from a blocked channel
+	 * 
+	 * @param bufferOrEvent
+	 *            bufferOrEvent to buffer
+	 */
 	protected void store(BufferOrEvent bufferOrEvent) {
 		nonprocessed.add(bufferOrEvent);
 	}
 
+	/**
+	 * Gets the next non-blocked, non-processed BufferOrEvent. Returns null if
+	 * not available.
+	 */
 	protected BufferOrEvent getNonProcessed() {
-		BufferOrEvent nextNonprocessed = null;
-		while (nextNonprocessed == null && !nonprocessed.isEmpty()) {
-			nextNonprocessed = nonprocessed.poll();
+		BufferOrEvent nextNonprocessed;
+		while ((nextNonprocessed = nonprocessed.poll()) != null) {
 			if (isBlocked(nextNonprocessed.getChannelIndex())) {
 				blockedNonprocessed.add(nextNonprocessed);
-				nextNonprocessed = null;
+			} else {
+				return nextNonprocessed;
 			}
 		}
-		return nextNonprocessed;
+		return null;
 	}
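The buffer keeps two queues: nonprocessed (events readable now) and blockedNonprocessed (events that arrived on channels already blocked for the current superstep). The rewritten getNonProcessed() drains the first queue and migrates anything that has become blocked in the meantime. A sketch of the invariant, outside the class:

    Queue<BufferOrEvent> nonprocessed = new LinkedList<BufferOrEvent>();
    Queue<BufferOrEvent> blockedNonprocessed = new LinkedList<BufferOrEvent>();

    BufferOrEvent next;
    while ((next = nonprocessed.poll()) != null) {
        if (isBlocked(next.getChannelIndex())) {
            blockedNonprocessed.add(next);   // park until the barrier completes
        } else {
            return next;                     // safe to hand to the operator
        }
    }
    return null;                             // nothing readable right now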
+	/**
+	 * Checks whether a given channel index is blocked for this input gate
+	 * 
+	 * @param channelIndex
+	 *            The channel index to check
+	 */
 	protected boolean isBlocked(int channelIndex) {
 		return blockedChannels.contains(channelIndex);
 	}
 
+	/**
+	 * Checks whether all channels are blocked, meaning that barriers have been
+	 * received from all channels
+	 */
 	protected boolean isAllBlocked() {
 		return blockedChannels.size() == totalNumberOfInputChannels;
 	}
 
+	/**
+	 * Returns the next non-blocked BufferOrEvent. This is a blocking operation.
+	 */
 	public BufferOrEvent getNextNonBlocked() throws IOException, InterruptedException {
 		// If there are non-processed buffers from the previously blocked ones,
 		// we get the next
@@ -99,7 +128,7 @@ public class BarrierBuffer {
 				bufferOrEvent = inputGate.getNextBufferOrEvent();
 				if (isBlocked(bufferOrEvent.getChannelIndex())) {
 					// If channel blocked we just store it
-					store(bufferOrEvent);
+					blockedNonprocessed.add(bufferOrEvent);
 				} else {
 					return bufferOrEvent;
 				}
 			}
 		}
 	}
 
+	/**
+	 * Blocks the given channel index, from which a barrier has been received.
+	 * 
+	 * @param channelIndex
+	 *            The channel index to block.
+	 */
 	protected void blockChannel(int channelIndex) {
 		if (!blockedChannels.contains(channelIndex)) {
 			blockedChannels.add(channelIndex);
@@ -122,16 +157,27 @@ public class BarrierBuffer {
 		}
 	}
 
+	/**
+	 * Releases the blocks on all channels.
+	 */
 	protected void releaseBlocks() {
-		nonprocessed.addAll(blockedNonprocessed);
+		if (!nonprocessed.isEmpty()) {
+			// sanity check
+			throw new RuntimeException("Error in barrier buffer logic");
+		}
+		nonprocessed = blockedNonprocessed;
+		blockedNonprocessed = new LinkedList<BufferOrEvent>();
 		blockedChannels.clear();
-		blockedNonprocessed.clear();
 		superstepStarted = false;
 		if (LOG.isDebugEnabled()) {
 			LOG.debug("All barriers received, blocks released");
 		}
 	}
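To see the alignment end to end, here is a runnable toy simulation of the blocking logic above for one gate with two channels (deliberately simplified; the real class works on BufferOrEvent objects):

    import java.util.*;

    public class AlignmentTrace {
        public static void main(String[] args) {
            Set<Integer> blocked = new HashSet<Integer>();
            Queue<String> parked = new LinkedList<String>();
            int totalChannels = 2;
            // interleaved arrivals as (channel, payload); "B" is the barrier
            String[][] arrivals = {{"0","B"},{"0","x1"},{"1","y1"},{"1","B"}};
            for (String[] a : arrivals) {
                int ch = Integer.parseInt(a[0]);
                if (a[1].equals("B")) {
                    blocked.add(ch);                          // blockChannel(ch)
                    if (blocked.size() == totalChannels) {    // isAllBlocked()
                        System.out.println("checkpoint complete, releasing " + parked);
                        blocked.clear();                      // releaseBlocks()
                    }
                } else if (blocked.contains(ch)) {
                    parked.add(a[1]);                         // store(bufferOrEvent)
                } else {
                    System.out.println("deliver " + a[1]);    // pass through
                }
            }
        }
    }

Note also the design change in releaseBlocks(): instead of copying blockedNonprocessed into nonprocessed, it now fails fast if nonprocessed is non-empty and swaps the queues, which both avoids the copy and encodes the invariant that a superstep only completes once every readable event has been drained.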
+	/**
+	 * Method that is executed once the barrier has been received from all
+	 * channels.
+	 */
 	protected void actOnAllBlocked() {
 		if (LOG.isDebugEnabled()) {
 			LOG.debug("Publishing barrier to the vertex");
 		}
@@ -140,10 +186,12 @@ public class BarrierBuffer {
 		releaseBlocks();
 	}
 
-	public String toString() {
-		return blockedChannels.toString();
-	}
-
+	/**
+	 * Processes a streaming superstep event
+	 * 
+	 * @param bufferOrEvent
+	 *            The BufferOrEvent containing the event
+	 */
 	public void processSuperstep(BufferOrEvent bufferOrEvent) {
 		StreamingSuperstep superstep = (StreamingSuperstep) bufferOrEvent.getEvent();
 		if (!superstepStarted) {
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/CoRecordReader.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/CoRecordReader.java
index 6a1f624f6c38d131363d22b4afe589c12990ab2d..6c91f4d542a53daf3ddbe8c1cf7f8ac218c9e7de 100755
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/CoRecordReader.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/CoRecordReader.java
@@ -19,11 +19,9 @@ package org.apache.flink.streaming.io;
 
 import java.io.IOException;
 import java.util.LinkedList;
-import java.util.Queue;
 import java.util.concurrent.LinkedBlockingDeque;
 
 import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.runtime.event.task.StreamingSuperstep;
 import org.apache.flink.runtime.io.network.api.reader.AbstractReader;
 import org.apache.flink.runtime.io.network.api.reader.MutableRecordReader;
 import org.apache.flink.runtime.io.network.api.serialization.AdaptiveSpanningRecordDeserializer;
@@ -33,6 +31,7 @@ import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
 import org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate;
 import org.apache.flink.runtime.util.event.EventListener;
+import org.apache.flink.streaming.api.streamvertex.StreamingSuperstep;
 
 /**
  * A CoRecordReader wraps {@link MutableRecordReader}s of two different input
@@ -66,8 +65,6 @@ public class CoRecordReader<T1 extends IOReadableWritable, T2 ex
-	private final Queue<Integer> unprocessedIndices = new LinkedList<Integer>();
-
 	public CoRecordReader(InputGate inputgate1, InputGate inputgate2) {
 
 		super(new UnionInputGate(inputgate1, inputgate2));
@@ -109,14 +106,14 @@ public class CoRecordReader<T1 extends IOReadableWritable, T2 ex
 extends AbstractReader implements ReaderBase {
 
+	@SuppressWarnings("unused")
 	private static final Logger LOG = LoggerFactory.getLogger(StreamingAbstractRecordReader.class);
 
 	private final RecordDeserializer[] recordDeserializers;
@@ -56,6 +57,7 @@ public abstract class StreamingAbstractRecordReader
 0) {
 				out.collect(new Tuple2<String, Integer>(token, 1));
 			}
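Putting the pieces together, the recovery loop this patch stitches up is, in order (all names below are from this diff; the trace is a summary, not new code):

    // 1. coordinator tick:  BarrierTimeout            -> BarrierReq(attempt, curId) to RUNNING input vertices
    // 2. task side:         StreamVertex.confirmBarrier(id)
    //                                                 -> StateBarrierAck(job, vertex, subtask, id, states)
    // 3. JobManager:        forwards both ack types   -> executionGraph.getStateMonitorActor
    // 4. coordinator tick:  CompactAndUpdate          -> executionGraph.loadOperatorStates(states)
    // 5. on redeployment:   TaskDeploymentDescriptor carries getOperatorStates()
    // 6. TaskManager:       OperatorStateCarrier.injectStates(states) restores the task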