Commit 37390d63 authored by Paris Carbone, committed by Stephan Ewen

[FLINK-1638] [streaming] At-least-once monitoring semantics added and bug fixes

The fault tolerance monitor now stops itself (PoisonPill) when the ExecutionGraph reaches a terminal state

The JobManager now forwards barrier acknowledgement messages to the StreamCheckpointCoordinator
Parent ed5ba95d
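For context, a minimal sketch of how a user program switches on the monitoring added by this commit, using the enableMonitoring methods introduced in StreamExecutionEnvironment further down in this diff (the surrounding program is hypothetical):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class MonitoredJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Inject checkpoint barriers every 5000 ms; the no-argument overload
        // keeps the 10000 ms default seen in StreamGraph below.
        env.enableMonitoring(5000);
        env.socketTextStream("localhost", 9999).print();
        env.execute();
    }
}

When the JobManager schedules such a job, the ExecutionGraph spawns a StreamCheckpointCoordinator actor at the configured interval, as shown in the ExecutionGraph changes below.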
......@@ -28,6 +28,7 @@ import org.apache.flink.runtime.state.OperatorState;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
......@@ -78,7 +79,7 @@ public final class TaskDeploymentDescriptor implements Serializable {
/** The list of JAR files required to run this task. */
private final List<BlobKey> requiredJarFiles;
private OperatorState operatorState;
private Map<String, OperatorState<?>> operatorStates;
/**
* Constructs a task deployment descriptor.
......@@ -128,13 +129,13 @@ public final class TaskDeploymentDescriptor implements Serializable {
Configuration taskConfiguration, String invokableClassName,
List<PartitionDeploymentDescriptor> producedPartitions,
List<PartitionConsumerDeploymentDescriptor> consumedPartitions,
List<BlobKey> requiredJarFiles, int targetSlotNumber, OperatorState operatorState) {
List<BlobKey> requiredJarFiles, int targetSlotNumber, Map<String,OperatorState<?>> operatorStates) {
this(jobID, vertexID, executionId, taskName, indexInSubtaskGroup, numberOfSubtasks,
jobConfiguration, taskConfiguration, invokableClassName, producedPartitions,
consumedPartitions, requiredJarFiles, targetSlotNumber);
setOperatorState(operatorState);
setOperatorStates(operatorStates);
}
/**
......@@ -243,11 +244,11 @@ public final class TaskDeploymentDescriptor implements Serializable {
strProducedPartitions, strConsumedPartitions);
}
public void setOperatorState(OperatorState operatorState) {
this.operatorState = operatorState;
public void setOperatorStates(Map<String,OperatorState<?>> operatorStates) {
this.operatorStates = operatorStates;
}
public OperatorState getOperatorState() {
return operatorState;
public Map<String, OperatorState<?>> getOperatorStates() {
return operatorStates;
}
}
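The descriptor now carries a map of named operator states instead of a single OperatorState. A hedged sketch of populating it (the state name "myCounter" and the variables tdd and counterState are assumptions for illustration):

import java.util.HashMap;
import java.util.Map;
import org.apache.flink.runtime.state.OperatorState;

// 'tdd' is an existing TaskDeploymentDescriptor; 'counterState' is a
// previously checkpointed OperatorState<?> (both hypothetical).
Map<String, OperatorState<?>> operatorStates = new HashMap<String, OperatorState<?>>();
operatorStates.put("myCounter", counterState);
tdd.setOperatorStates(operatorStates);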
......@@ -56,6 +56,7 @@ import scala.concurrent.duration.FiniteDuration;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeoutException;
......@@ -123,7 +124,7 @@ public class Execution implements Serializable {
private volatile InstanceConnectionInfo assignedResourceLocation; // for the archived execution
private OperatorState operatorState;
private Map<String,OperatorState<?>> operatorStates;
// --------------------------------------------------------------------------------------------
......@@ -857,11 +858,11 @@ public class Execution implements Serializable {
(assignedResource == null ? "(unassigned)" : assignedResource.toString()), state);
}
public void setOperatorState(OperatorState operatorState) {
this.operatorState = operatorState;
public void setOperatorStates(Map<String,OperatorState<?>> operatorStates) {
this.operatorStates = operatorStates;
}
public OperatorState getOperatorState() {
return operatorState;
public Map<String,OperatorState<?>> getOperatorStates() {
return operatorStates;
}
}
......@@ -18,8 +18,8 @@
package org.apache.flink.runtime.executiongraph;
import akka.actor.ActorContext;
import akka.actor.ActorRef;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.akka.AkkaUtils;
......@@ -31,6 +31,7 @@ import org.apache.flink.runtime.jobgraph.JobID;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.ScheduleMode;
import org.apache.flink.runtime.jobmanager.StreamCheckpointCoordinator;
import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
import org.apache.flink.runtime.messages.ExecutionGraphMessages;
import org.apache.flink.runtime.state.OperatorState;
......@@ -38,8 +39,8 @@ import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.util.ExceptionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple3;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;
import java.io.Serializable;
......@@ -52,6 +53,7 @@ import java.util.NoSuchElementException;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import static akka.dispatch.Futures.future;
......@@ -118,6 +120,14 @@ public class ExecutionGraph implements Serializable {
private boolean allowQueuedScheduling = true;
private ActorContext parentContext;
private ActorRef stateMonitorActor;
private boolean monitoringEnabled;
private long monitoringInterval = 10000;
private ScheduleMode scheduleMode = ScheduleMode.FROM_SOURCES;
......@@ -159,6 +169,18 @@ public class ExecutionGraph implements Serializable {
}
// --------------------------------------------------------------------------------------------
public void setStateMonitorActor(ActorRef stateMonitorActor) {
this.stateMonitorActor = stateMonitorActor;
}
public ActorRef getStateMonitorActor() {
return stateMonitorActor;
}
public void setParentContext(ActorContext parentContext) {
this.parentContext = parentContext;
}
public void setNumberOfRetriesLeft(int numberOfRetriesLeft) {
if (numberOfRetriesLeft < -1) {
......@@ -214,6 +236,14 @@ public class ExecutionGraph implements Serializable {
}
}
public void setMonitoringEnabled(boolean monitoringEnabled) {
this.monitoringEnabled = monitoringEnabled;
}
public void setMonitoringInterval(long monitoringInterval) {
this.monitoringInterval = monitoringInterval;
}
/**
* Returns a list of BLOB keys referring to the JAR files required to run this job
* @return list of BLOB keys referring to the JAR files required to run this job
......@@ -361,12 +391,17 @@ public class ExecutionGraph implements Serializable {
for (ExecutionJobVertex ejv : getVerticesTopologically()) {
ejv.scheduleAll(scheduler, allowQueuedScheduling);
}
break;
case BACKTRACKING:
throw new JobException("BACKTRACKING is currently not supported as schedule mode.");
}
if(monitoringEnabled)
{
stateMonitorActor = StreamCheckpointCoordinator.spawn(parentContext, this,
Duration.create(monitoringInterval, TimeUnit.MILLISECONDS));
}
}
else {
throw new IllegalStateException("Job may only be scheduled from state " + JobStatus.CREATED);
......@@ -532,9 +567,9 @@ public class ExecutionGraph implements Serializable {
}
}
public void loadOperatorStates(Map<Tuple3<JobVertexID, Integer, Long> ,OperatorState<?>> states)
public synchronized void loadOperatorStates(Map<Tuple3<JobVertexID, Integer, Long> , Map<String,OperatorState<?>>> states)
{
for(Map.Entry<Tuple3<JobVertexID, Integer, Long> ,OperatorState<?>> state : states.entrySet())
for(Map.Entry<Tuple3<JobVertexID, Integer, Long> , Map<String,OperatorState<?>>> state : states.entrySet())
{
tasks.get(state.getKey()._1()).getTaskVertices()[state.getKey()._2()].setOperatorState(state.getValue());
}
......
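loadOperatorStates is now synchronized and keyed by (vertex id, subtask index, checkpoint id), carrying the whole name-to-state map per subtask. A hedged sketch of the map's shape (vertexId, counterState, and executionGraph are hypothetical placeholders):

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.state.OperatorState;
import scala.Tuple3;

// Restore the named states of subtask 0 of 'vertexId' as of checkpoint 42.
Map<Tuple3<JobVertexID, Integer, Long>, Map<String, OperatorState<?>>> snapshot =
        new HashMap<Tuple3<JobVertexID, Integer, Long>, Map<String, OperatorState<?>>>();
snapshot.put(new Tuple3<JobVertexID, Integer, Long>(vertexId, 0, 42L),
        Collections.<String, OperatorState<?>>singletonMap("myCounter", counterState));
executionGraph.loadOperatorStates(snapshot);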
......@@ -48,6 +48,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import static com.google.common.base.Preconditions.checkElementIndex;
......@@ -90,7 +91,7 @@ public class ExecutionVertex implements Serializable {
private volatile boolean scheduleLocalOnly;
private OperatorState operatorState;
private Map<String,OperatorState<?>> operatorState;
// --------------------------------------------------------------------------------------------
......@@ -198,11 +199,11 @@ public class ExecutionVertex implements Serializable {
return currentExecution.getAssignedResourceLocation();
}
public void setOperatorState(OperatorState operatorState) {
public void setOperatorState(Map<String,OperatorState<?>> operatorState) {
this.operatorState = operatorState;
}
public OperatorState getOperatorState() {
public Map<String,OperatorState<?>> getOperatorState() {
return operatorState;
}
......@@ -393,7 +394,7 @@ public class ExecutionVertex implements Serializable {
if(operatorState!=null)
{
execution.setOperatorState(operatorState);
execution.setOperatorStates(operatorState);
}
}
......
......@@ -75,6 +75,14 @@ public class JobGraph implements Serializable {
private ScheduleMode scheduleMode = ScheduleMode.FROM_SOURCES;
public enum JobType {STREAMING, BATCH}
private JobType jobType = JobType.BATCH;
private boolean monitoringEnabled = false;
private long monitorInterval = 10000;
// --------------------------------------------------------------------------------------------
/**
......@@ -253,6 +261,31 @@ public class JobGraph implements Serializable {
return this.taskVertices.size();
}
public void setJobType(JobType jobType) {
this.jobType = jobType;
}
public JobType getJobType() {
return jobType;
}
public void setMonitoringEnabled(boolean monitoringEnabled) {
this.monitoringEnabled = monitoringEnabled;
}
public boolean isMonitoringEnabled() {
return monitoringEnabled;
}
public void setMonitorInterval(long monitorInterval) {
this.monitorInterval = monitorInterval;
}
public long getMonitorInterval() {
return monitorInterval;
}
/**
* Searches for a vertex with a matching ID and returns it.
*
......
......@@ -20,8 +20,10 @@ package org.apache.flink.runtime.jobgraph.tasks;
import org.apache.flink.runtime.state.OperatorState;
import java.util.Map;
public interface OperatorStateCarrier {
public void injectState(OperatorState state);
public void injectStates(Map<String, OperatorState<?>> state);
}
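A minimal sketch of a task implementing the widened interface, mirroring how StreamVertex consumes it later in this diff (the class itself is illustrative):

import java.util.HashMap;
import java.util.Map;
import org.apache.flink.runtime.jobgraph.tasks.OperatorStateCarrier;
import org.apache.flink.runtime.state.OperatorState;

public class StatefulTask implements OperatorStateCarrier {
    private final Map<String, OperatorState<?>> states = new HashMap<String, OperatorState<?>>();

    @Override
    public void injectStates(Map<String, OperatorState<?>> states) {
        // Merge all recovered named states into the task's local map.
        this.states.putAll(states);
    }
}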
......@@ -104,7 +104,6 @@ class JobManager(val configuration: Configuration,
/** List of currently running jobs */
val currentJobs = scala.collection.mutable.HashMap[JobID, (ExecutionGraph, JobInfo)]()
val barrierMonitors = scala.collection.mutable.HashMap[JobID, ActorRef]()
/**
* Run when the job manager is started. Simply logs an informational message.
......@@ -284,78 +283,43 @@ class JobManager(val configuration: Configuration,
if(newJobStatus.isTerminalState) {
jobInfo.end = timeStamp
// is the client waiting for the job result?
newJobStatus match {
case JobStatus.FINISHED =>
val accumulatorResults = accumulatorManager.getJobAccumulatorResults(jobID)
jobInfo.client ! JobResultSuccess(jobID, jobInfo.duration, accumulatorResults)
jobInfo.client ! JobResultSuccess(jobID,jobInfo.duration,accumulatorResults)
case JobStatus.CANCELED =>
jobInfo.client ! Failure(new JobCancellationException(jobID,
"Job was cancelled.", error))
"Job was cancelled.",error))
case JobStatus.FAILED =>
jobInfo.client ! Failure(new JobExecutionException(jobID,
"Job execution failed.", error))
"Job execution failed.",error))
case x =>
val exception = new JobExecutionException(jobID, s"$x is not a " +
"terminal state.")
val exception = new JobExecutionException(jobID,s"$x is not a " +
"terminal state.")
jobInfo.client ! Failure(exception)
throw exception
}
removeJob(jobID)
barrierMonitors.get(jobID) match {
case Some(monitor) =>
newJobStatus match{
case JobStatus.FINISHED | JobStatus.CANCELED =>
monitor ! PoisonPill
barrierMonitors.remove(jobID)
case JobStatus.FAILING =>
monitor ! JobStateRequest
}
case None =>
removeJob(jobID)
}
}
else {
newJobStatus match {
case JobStatus.RUNNING => currentJobs.get(jobID) match {
case Some((executionGraph, _)) =>
//FIXME this is just a fast n dirty check for determining streaming jobs
if (executionGraph.getScheduleMode == ScheduleMode.ALL) {
barrierMonitors.get(jobID) match {
case None =>
barrierMonitors += jobID -> StreamStateMonitor.props(context, executionGraph)
}
}
case None =>
log.error("Cannot create state monitor for job ID {}.", jobID)
new IllegalStateException("Cannot find execution graph for job ID " + jobID)
}
}
}
case None =>
removeJob(jobID)
}
}
case msg: BarrierAck =>
barrierMonitors.get(msg.jobID) match {
case Some(monitor) => monitor ! msg
currentJobs.get(msg.jobID) match {
case Some(jobExecution) =>
jobExecution._1.getStateMonitorActor forward msg
case None =>
}
case msg: StateBarrierAck =>
barrierMonitors.get(msg.jobID) match {
case Some(monitor) => monitor ! msg
case None =>
}
case msg: JobStateResponse =>
//inject initial states and restart the job
currentJobs.get(msg.jobID) match {
case Some(jobExecution) =>
import scala.collection.JavaConverters._
jobExecution._1.loadOperatorStates(msg.opStates.asJava)
jobExecution._1.restart()
jobExecution._1.getStateMonitorActor forward msg
case None =>
}
}
case ScheduleOrUpdateConsumers(jobId, executionId, partitionIndex) =>
currentJobs.get(jobId) match {
......@@ -522,6 +486,9 @@ class JobManager(val configuration: Configuration,
executionGraph.setDelayBeforeRetrying(delayBetweenRetries)
executionGraph.setScheduleMode(jobGraph.getScheduleMode)
executionGraph.setQueuedSchedulingAllowed(jobGraph.getAllowQueuedScheduling)
executionGraph.setMonitoringEnabled(jobGraph.isMonitoringEnabled)
executionGraph.setMonitoringInterval(jobGraph.getMonitorInterval)
// initialize the vertices that have a master initialization hook
// file output formats create directories here, input formats create splits
......@@ -564,6 +531,9 @@ class JobManager(val configuration: Configuration,
log.debug(s"Successfully created execution graph from job graph ${jobId} (${jobName}).")
}
// give an actorContext
executionGraph.setParentContext(context);
// get notified about job status changes
executionGraph.registerJobStatusListener(self)
......
......@@ -18,28 +18,30 @@
package org.apache.flink.runtime.jobmanager
import java.lang.Long
import akka.actor._
import org.apache.flink.runtime.ActorLogMessages
import org.apache.flink.runtime.executiongraph.{ExecutionAttemptID,ExecutionGraph,ExecutionVertex}
import org.apache.flink.runtime.jobgraph.{JobID,JobVertexID}
import org.apache.flink.runtime.execution.ExecutionState.RUNNING
import org.apache.flink.runtime.executiongraph.{ExecutionAttemptID, ExecutionGraph, ExecutionVertex}
import org.apache.flink.runtime.jobgraph.JobStatus._
import org.apache.flink.runtime.jobgraph.{JobID, JobVertexID}
import org.apache.flink.runtime.state.OperatorState
import java.lang.Long
import scala.collection.JavaConversions._
import scala.collection.immutable.TreeMap
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.{FiniteDuration,_}
import scala.concurrent.duration.{FiniteDuration, _}
object StreamStateMonitor {
object StreamCheckpointCoordinator {
def props(context: ActorContext,executionGraph: ExecutionGraph,
def spawn(context: ActorContext,executionGraph: ExecutionGraph,
interval: FiniteDuration = 5 seconds): ActorRef = {
val vertices: Iterable[ExecutionVertex] = getExecutionVertices(executionGraph)
val monitor = context.system.actorOf(Props(new StreamStateMonitor(executionGraph,
val monitor = context.system.actorOf(Props(new StreamCheckpointCoordinator(executionGraph,
vertices,vertices.map(x => ((x.getJobVertex.getJobVertexId,x.getParallelSubtaskIndex),
List.empty[Long])).toMap,Map(),interval,0L,-1L)))
List.empty[Long])).toMap, Map() ,interval,0L,-1L)))
monitor ! InitBarrierScheduler
monitor
}
......@@ -51,41 +53,51 @@ object StreamStateMonitor {
}
}
class StreamStateMonitor(val executionGraph: ExecutionGraph,
class StreamCheckpointCoordinator(val executionGraph: ExecutionGraph,
val vertices: Iterable[ExecutionVertex],
var acks: Map[(JobVertexID,Int),List[Long]],
var states: Map[(JobVertexID, Integer, Long), OperatorState[_]],
var states: Map[(JobVertexID, Integer, Long),
java.util.Map[String,OperatorState[_]]],
val interval: FiniteDuration,var curId: Long,var ackId: Long)
extends Actor with ActorLogMessages with ActorLogging {
override def receiveWithLogMessages: Receive = {
case InitBarrierScheduler =>
context.system.scheduler.schedule(interval,interval,self,BarrierTimeout)
context.system.scheduler.schedule(2 * interval,2 * interval,self,TriggerBarrierCompaction)
context.system.scheduler.schedule(2 * interval,2 * interval,self,CompactAndUpdate)
log.debug("[FT-MONITOR] Started Stream State Monitor for job {}{}",
executionGraph.getJobID,executionGraph.getJobName)
case BarrierTimeout =>
curId += 1
log.debug("[FT-MONITOR] Sending Barrier to vertices of Job " + executionGraph.getJobName)
vertices.filter(v => v.getJobVertex.getJobVertex.isInputVertex).foreach(vertex
=> vertex.getCurrentAssignedResource.getInstance.getTaskManager
! BarrierReq(vertex.getCurrentExecutionAttempt.getAttemptId,curId))
executionGraph.getState match {
case FAILED | CANCELED | FINISHED =>
log.debug("[FT-MONITOR] Stopping monitor for terminated job {}", executionGraph.getJobID)
self ! PoisonPill
case _ =>
curId += 1
log.debug("[FT-MONITOR] Sending Barrier to vertices of Job " + executionGraph.getJobName)
vertices.filter(v => v.getJobVertex.getJobVertex.isInputVertex &&
v.getExecutionState == RUNNING).foreach(vertex
=> vertex.getCurrentAssignedResource.getInstance.getTaskManager
! BarrierReq(vertex.getCurrentExecutionAttempt.getAttemptId,curId))
}
case StateBarrierAck(jobID, jobVertexID, instanceID, checkpointID, opState) =>
states += (jobVertexID, instanceID, checkpointID) -> opState
self ! BarrierAck(jobID, jobVertexID, instanceID, checkpointID)
case BarrierAck(jobID, jobVertexID,instanceID,checkpointID) =>
acks.get(jobVertexID,instanceID) match {
case Some(acklist) =>
acks += (jobVertexID,instanceID) -> (checkpointID :: acklist)
case None =>
}
log.debug(acks.toString)
case TriggerBarrierCompaction =>
case CompactAndUpdate =>
val barrierCount = acks.values.foldLeft(TreeMap[Long,Int]().withDefaultValue(0))((dict,myList)
=> myList.foldLeft(dict)((dict2,elem) => dict2.updated(elem,dict2(elem) + 1)))
val keysToKeep = barrierCount.filter(_._2 == acks.size).keys
......@@ -93,29 +105,24 @@ class StreamStateMonitor(val executionGraph: ExecutionGraph,
acks.keys.foreach(x => acks = acks.updated(x,acks(x).filter(_ >= ackId)))
states = states.filterKeys(_._3 >= ackId)
log.debug("[FT-MONITOR] Last global barrier is " + ackId)
case JobStateRequest =>
sender ! JobStateResponse(executionGraph.getJobID, ackId, states)
executionGraph.loadOperatorStates(states)
}
}
case class BarrierTimeout()
case class InitBarrierScheduler()
case class TriggerBarrierCompaction()
case class JobStateRequest()
case class JobStateResponse(jobID: JobID, barrierID: Long, opStates: Map[(JobVertexID, Integer,
Long), OperatorState[_]])
case class CompactAndUpdate()
case class BarrierReq(attemptID: ExecutionAttemptID,checkpointID: Long)
case class BarrierAck(jobID: JobID,jobVertexID: JobVertexID,instanceID: Int,checkpointID: Long)
case class StateBarrierAck(jobID: JobID, jobVertexID: JobVertexID, instanceID: Integer,
checkpointID: Long, state: OperatorState[_])
checkpointID: Long, states: java.util.Map[String,OperatorState[_]])
......
......@@ -352,7 +352,8 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo,
}
case BarrierReq(attemptID, checkpointID) =>
log.debug("[FT-TaskManager] Barrier request received for attempt {}", attemptID)
log.debug("[FT-TaskManager] Barrier {} request received for attempt {}",
checkpointID, attemptID)
runningTasks.get(attemptID) match {
case Some(i) =>
if (i.getExecutionState == ExecutionState.RUNNING) {
......@@ -416,15 +417,6 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo,
task = new Task(jobID, vertexID, taskIndex, numSubtasks, executionID,
tdd.getTaskName, self)
//inject operator state
if(tdd.getOperatorState != null)
{
val vertex = task.getEnvironment.getInvokable match {
case opStateCarrier: OperatorStateCarrier =>
opStateCarrier.injectState(tdd.getOperatorState)
}
}
runningTasks.put(executionID, task) match {
case Some(_) => throw new RuntimeException(
......@@ -446,6 +438,15 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo,
task.setEnvironment(env)
//inject operator state
if(tdd.getOperatorStates != null)
{
val vertex = task.getEnvironment.getInvokable match {
case opStateCarrier: OperatorStateCarrier =>
opStateCarrier.injectStates(tdd.getOperatorStates)
}
}
// register the task with the network stack and profiles
networkEnvironment match {
case Some(ne) =>
......
......@@ -17,14 +17,9 @@
package org.apache.flink.streaming.connectors.kafka;
import org.apache.flink.runtime.state.OperatorState;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.api.KafkaSource;
import org.apache.flink.streaming.connectors.kafka.api.simple.PersistentKafkaSource;
import org.apache.flink.streaming.connectors.util.JavaDefaultStringSchema;
public class KafkaConsumerExample {
......
......@@ -17,6 +17,7 @@
package org.apache.flink.streaming.connectors.kafka.api.simple;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
......@@ -36,13 +37,11 @@ import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.message.MessageAndOffset;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Iterates the records received from a partition of a Kafka topic as byte arrays.
*/
public class KafkaConsumerIterator {
public class KafkaConsumerIterator implements Serializable {
private static final long serialVersionUID = 1L;
......
......@@ -21,6 +21,7 @@ import org.apache.flink.streaming.connectors.util.DeserializationSchema;
public class KafkaDeserializingConsumerIterator<IN> extends KafkaConsumerIterator {
private static final long serialVersionUID = 1L;
private DeserializationSchema<IN> deserializationSchema;
public KafkaDeserializingConsumerIterator(String host, int port, String topic, int partition, long waitOnEmptyFetch,
......
......@@ -67,6 +67,7 @@ public class StreamConfig implements Serializable {
// DEFAULT VALUES
private static final long DEFAULT_TIMEOUT = 100;
public static final String STATE_MONITORING = "STATE_MONITORING";
// CONFIG METHODS
......@@ -300,6 +301,18 @@ public class StreamConfig implements Serializable {
config.setBytes(EDGES_IN_ORDER, SerializationUtils.serialize((Serializable) outEdgeList));
}
public void setStateMonitoring(boolean stateMonitoring) {
config.setBoolean(STATE_MONITORING, stateMonitoring);
}
public boolean getStateMonitoring()
{
return config.getBoolean(STATE_MONITORING, false);
}
@SuppressWarnings("unchecked")
public List<Tuple2<Integer, Integer>> getOutEdgesInOrder(ClassLoader cl) {
try {
......@@ -399,6 +412,7 @@ public class StreamConfig implements Serializable {
builder.append("\nInvokable: Missing");
}
builder.append("\nBuffer timeout: " + getBufferTimeout());
builder.append("\nState Monitoring: " + getStateMonitoring());
if (isChainStart() && getChainedOutputs(cl).size() > 0) {
builder.append("\n\n\n---------------------\nChained task configs\n---------------------\n");
builder.append(getTransitiveChainedTaskConfigs(cl)).toString();
......
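A hedged sketch of the new flag's round trip through a task configuration (assuming StreamConfig wraps a plain Configuration, as in the StreamVertex constructor call below):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.StreamConfig;

StreamConfig config = new StreamConfig(new Configuration());
config.setStateMonitoring(true);
boolean monitored = config.getStateMonitoring(); // true; defaults to false when unset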
......@@ -92,6 +92,10 @@ public class StreamGraph extends StreamingPlan {
private Set<Integer> sources;
private ExecutionConfig executionConfig;
private boolean monitoringEnabled;
private long monitoringInterval = 10000;
public StreamGraph(ExecutionConfig executionConfig) {
......@@ -606,6 +610,22 @@ public class StreamGraph extends StreamingPlan {
return operatorNames.get(vertexID);
}
public void setMonitoringEnabled(boolean monitoringEnabled) {
this.monitoringEnabled = monitoringEnabled;
}
public boolean isMonitoringEnabled() {
return monitoringEnabled;
}
public void setMonitoringInterval(long monitoringInterval) {
this.monitoringInterval = monitoringInterval;
}
public long getMonitoringInterval() {
return monitoringInterval;
}
@Override
public String getStreamingPlanAsJSON() {
......
......@@ -74,7 +74,13 @@ public class StreamingJobGraphGenerator {
// Turn lazy scheduling off
jobGraph.setScheduleMode(ScheduleMode.ALL);
jobGraph.setJobType(JobGraph.JobType.STREAMING);
jobGraph.setMonitoringEnabled(streamGraph.isMonitoringEnabled());
jobGraph.setMonitorInterval(streamGraph.getMonitoringInterval());
if(jobGraph.isMonitoringEnabled())
{
jobGraph.setNumberOfExecutionRetries(Integer.MAX_VALUE);
}
init();
setChaining();
......@@ -211,6 +217,7 @@ public class StreamingJobGraphGenerator {
config.setNumberOfOutputs(nonChainableOutputs.size());
config.setOutputs(nonChainableOutputs);
config.setChainedOutputs(chainableOutputs);
config.setStateMonitoring(streamGraph.isMonitoringEnabled());
Class<? extends AbstractInvokable> vertexClass = streamGraph.getJobVertexClass(vertexID);
......
......@@ -151,6 +151,19 @@ public abstract class StreamExecutionEnvironment {
this.bufferTimeout = timeoutMillis;
return this;
}
public StreamExecutionEnvironment enableMonitoring(long interval)
{
streamGraph.setMonitoringEnabled(true);
streamGraph.setMonitoringInterval(interval);
return this;
}
public StreamExecutionEnvironment enableMonitoring()
{
streamGraph.setMonitoringEnabled(true);
return this;
}
/**
* Sets the maximum time frequency (milliseconds) for the flushing of the
......
......@@ -59,7 +59,6 @@ public class SocketTextStreamFunction extends RichSourceFunction<String> {
public void open(Configuration parameters) throws Exception {
super.open(parameters);
socket = new Socket();
socket.connect(new InetSocketAddress(hostname, port), CONNECTION_TIMEOUT_TIME);
}
......
......@@ -18,7 +18,6 @@
package org.apache.flink.streaming.api.streamvertex;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.event.task.StreamingSuperstep;
import org.apache.flink.runtime.io.network.api.reader.MutableReader;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate;
......
......@@ -25,7 +25,6 @@ import java.util.List;
import java.util.Map;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.event.task.StreamingSuperstep;
import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
import org.apache.flink.runtime.plugable.SerializationDelegate;
import org.apache.flink.streaming.api.StreamConfig;
......
......@@ -20,7 +20,6 @@ package org.apache.flink.streaming.api.streamvertex;
import java.io.IOException;
import java.util.Map;
import org.apache.flink.runtime.event.task.StreamingSuperstep;
import org.apache.flink.runtime.event.task.TaskEvent;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
......@@ -65,6 +64,8 @@ public class StreamVertex<IN, OUT> extends AbstractInvokable implements StreamTa
protected ClassLoader userClassLoader;
private EventListener<TaskEvent> superstepListener;
private boolean onRecovery;
public StreamVertex() {
userInvokable = null;
......@@ -88,7 +89,10 @@ public class StreamVertex<IN, OUT> extends AbstractInvokable implements StreamTa
protected void initialize() {
this.userClassLoader = getUserCodeClassLoader();
this.configuration = new StreamConfig(getTaskConfiguration());
this.states = configuration.getOperatorStates(userClassLoader);
if(!onRecovery)
{
this.states = configuration.getOperatorStates(userClassLoader);
}
this.context = createRuntimeContext(getEnvironment().getTaskName(), this.states);
}
......@@ -122,12 +126,12 @@ public class StreamVertex<IN, OUT> extends AbstractInvokable implements StreamTa
@Override
public void confirmBarrier(long barrierID) {
if(states != null && states.containsKey("kafka"))
if(configuration.getStateMonitoring() && states != null)
{
getEnvironment().getJobManager().tell(
new StateBarrierAck(getEnvironment().getJobID(),
getEnvironment().getJobVertexId(), context.getIndexOfThisSubtask(),
barrierID, states.get("kafka")), ActorRef.noSender());
barrierID, states), ActorRef.noSender());
}
else
{
......@@ -290,8 +294,9 @@ public class StreamVertex<IN, OUT> extends AbstractInvokable implements StreamTa
}
@Override
public void injectState(OperatorState state) {
states.put("kafka", state);
public void injectStates(Map<String,OperatorState<?>> states) {
onRecovery = true;
this.states.putAll(states);
}
......
......@@ -71,7 +71,14 @@ public class StreamingRuntimeContext extends RuntimeUDFContext {
if (state == null) {
throw new RuntimeException("Cannot register null state");
} else {
operatorStates.put(name, state);
if(operatorStates.containsKey(name))
{
throw new RuntimeException("State is already registered");
}
else
{
operatorStates.put(name, state);
}
}
}
......
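Registering two states under the same name now fails fast instead of silently replacing the first one. Illustrative only; the OperatorState constructor taking an initial value is an assumption:

// 'context' is the operator's StreamingRuntimeContext (assumed available).
context.registerState("wordCounts", new OperatorState<Long>(0L));
// A second call with the same name now throws
// RuntimeException("State is already registered"):
context.registerState("wordCounts", new OperatorState<Long>(0L));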
......@@ -16,12 +16,13 @@
* limitations under the License.
*/
package org.apache.flink.runtime.event.task;
package org.apache.flink.streaming.api.streamvertex;
import java.io.IOException;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.runtime.event.task.TaskEvent;
public class StreamingSuperstep extends TaskEvent {
......
......@@ -23,10 +23,10 @@ import java.util.LinkedList;
import java.util.Queue;
import java.util.Set;
import org.apache.flink.runtime.event.task.StreamingSuperstep;
import org.apache.flink.runtime.io.network.api.reader.AbstractReader;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.streaming.api.streamvertex.StreamingSuperstep;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -53,6 +53,12 @@ public class BarrierBuffer {
this.reader = reader;
}
/**
* Starts the next superstep
*
* @param superstep
* The next superstep
*/
protected void startSuperstep(StreamingSuperstep superstep) {
this.currentSuperstep = superstep;
this.superstepStarted = true;
......@@ -61,30 +67,53 @@ public class BarrierBuffer {
}
}
/**
* Buffers a BufferOrEvent received from a blocked channel.
*
* @param bufferOrEvent
* The BufferOrEvent to buffer
*/
protected void store(BufferOrEvent bufferOrEvent) {
nonprocessed.add(bufferOrEvent);
}
/**
* Get the next non-blocked, non-processed BufferOrEvent. Returns null if
* none is available.
*/
protected BufferOrEvent getNonProcessed() {
BufferOrEvent nextNonprocessed = null;
while (nextNonprocessed == null && !nonprocessed.isEmpty()) {
nextNonprocessed = nonprocessed.poll();
BufferOrEvent nextNonprocessed;
while ((nextNonprocessed = nonprocessed.poll()) != null) {
if (isBlocked(nextNonprocessed.getChannelIndex())) {
blockedNonprocessed.add(nextNonprocessed);
nextNonprocessed = null;
} else {
return nextNonprocessed;
}
}
return nextNonprocessed;
return null;
}
/**
* Checks whether the given channel index is blocked for this input gate.
*
* @param channelIndex
* The channel index to check
*/
protected boolean isBlocked(int channelIndex) {
return blockedChannels.contains(channelIndex);
}
/**
* Checks whether all channels are blocked, meaning that barriers have
* been received from all channels.
*/
protected boolean isAllBlocked() {
return blockedChannels.size() == totalNumberOfInputChannels;
}
/**
* Returns the next non-blocked BufferOrEvent. This is a blocking call.
*/
public BufferOrEvent getNextNonBlocked() throws IOException, InterruptedException {
// If there are non-processed buffers from the previously blocked ones,
// we get the next
......@@ -99,7 +128,7 @@ public class BarrierBuffer {
bufferOrEvent = inputGate.getNextBufferOrEvent();
if (isBlocked(bufferOrEvent.getChannelIndex())) {
// If channel blocked we just store it
store(bufferOrEvent);
blockedNonprocessed.add(bufferOrEvent);
} else {
return bufferOrEvent;
}
......@@ -107,6 +136,12 @@ public class BarrierBuffer {
}
}
/**
* Blocks the given channel index, from which a barrier has been received.
*
* @param channelIndex
* The channel index to block.
*/
protected void blockChannel(int channelIndex) {
if (!blockedChannels.contains(channelIndex)) {
blockedChannels.add(channelIndex);
......@@ -122,16 +157,27 @@ public class BarrierBuffer {
}
}
/**
* Releases the blocks on all channels.
*/
protected void releaseBlocks() {
nonprocessed.addAll(blockedNonprocessed);
if (!nonprocessed.isEmpty()) {
// sanity check
throw new RuntimeException("Error in barrier buffer logic");
}
nonprocessed = blockedNonprocessed;
blockedNonprocessed = new LinkedList<BufferOrEvent>();
blockedChannels.clear();
blockedNonprocessed.clear();
superstepStarted = false;
if (LOG.isDebugEnabled()) {
LOG.debug("All barriers received, blocks released");
}
}
/**
* Method that is executed once the barrier has been received from all
* channels.
*/
protected void actOnAllBlocked() {
if (LOG.isDebugEnabled()) {
LOG.debug("Publishing barrier to the vertex");
......@@ -140,10 +186,12 @@ public class BarrierBuffer {
releaseBlocks();
}
public String toString() {
return blockedChannels.toString();
}
/**
* Processes a streaming superstep event
*
* @param bufferOrEvent
* The BufferOrEvent containing the event
*/
public void processSuperstep(BufferOrEvent bufferOrEvent) {
StreamingSuperstep superstep = (StreamingSuperstep) bufferOrEvent.getEvent();
if (!superstepStarted) {
......
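To make the blocking logic concrete, a self-contained toy model of the alignment algorithm the BarrierBuffer implements (illustrative only, not Flink API): a channel is blocked once its barrier arrives, records from blocked channels are parked, and everything is replayed once barriers have arrived on all channels.

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Queue;
import java.util.Set;

// Toy model of barrier alignment (illustrative, not the Flink classes above).
class ToyBarrierAligner {
    private final int numChannels;
    private final Set<Integer> blocked = new HashSet<Integer>();
    private final Queue<String> parked = new ArrayDeque<String>();

    ToyBarrierAligner(int numChannels) {
        this.numChannels = numChannels;
    }

    /** A barrier arrived on a channel; returns true when all channels are aligned. */
    boolean onBarrier(int channel) {
        blocked.add(channel);
        return blocked.size() == numChannels;
    }

    /** Records from blocked channels are parked until alignment completes. */
    String onRecord(int channel, String record) {
        if (blocked.contains(channel)) {
            parked.add(record);
            return null;
        }
        return record;
    }

    /** Alignment done: unblock all channels and replay the parked records first. */
    List<String> releaseBlocks() {
        blocked.clear();
        List<String> replay = new ArrayList<String>(parked);
        parked.clear();
        return replay;
    }
}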
......@@ -19,11 +19,9 @@ package org.apache.flink.streaming.io;
import java.io.IOException;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingDeque;
import org.apache.flink.core.io.IOReadableWritable;
import org.apache.flink.runtime.event.task.StreamingSuperstep;
import org.apache.flink.runtime.io.network.api.reader.AbstractReader;
import org.apache.flink.runtime.io.network.api.reader.MutableRecordReader;
import org.apache.flink.runtime.io.network.api.serialization.AdaptiveSpanningRecordDeserializer;
......@@ -33,6 +31,7 @@ import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate;
import org.apache.flink.runtime.util.event.EventListener;
import org.apache.flink.streaming.api.streamvertex.StreamingSuperstep;
/**
* A CoRecordReader wraps {@link MutableRecordReader}s of two different input
......@@ -66,8 +65,6 @@ public class CoRecordReader<T1 extends IOReadableWritable, T2 extends IOReadable
private CoBarrierBuffer barrierBuffer1;
private CoBarrierBuffer barrierBuffer2;
private Queue<Integer> unprocessedIndices = new LinkedList<Integer>();
public CoRecordReader(InputGate inputgate1, InputGate inputgate2) {
super(new UnionInputGate(inputgate1, inputgate2));
......@@ -109,14 +106,14 @@ public class CoRecordReader<T1 extends IOReadableWritable, T2 extends IOReadable
@SuppressWarnings("unchecked")
protected int getNextRecord(T1 target1, T2 target2) throws IOException, InterruptedException {
requestPartitionsOnce();
while (true) {
if (currentReaderIndex == 0) {
if ((bufferReader1.isFinished() && bufferReader2.isFinished())) {
return 0;
}
currentReaderIndex = getNextReaderIndexBlocking();
}
......@@ -234,10 +231,8 @@ public class CoRecordReader<T1 extends IOReadableWritable, T2 extends IOReadable
@Override
public void onEvent(InputGate bufferReader) {
if (bufferReader == bufferReader1) {
System.out.println("Added 1");
availableRecordReaders.add(1);
} else if (bufferReader == bufferReader2) {
System.out.println("Added 2");
availableRecordReaders.add(2);
}
}
......
......@@ -22,7 +22,6 @@ import java.io.IOException;
import org.apache.flink.core.io.IOReadableWritable;
import org.apache.flink.runtime.event.task.AbstractEvent;
import org.apache.flink.runtime.event.task.StreamingSuperstep;
import org.apache.flink.runtime.io.network.api.reader.AbstractReader;
import org.apache.flink.runtime.io.network.api.reader.ReaderBase;
import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
......@@ -31,6 +30,7 @@ import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.io.network.serialization.SpillingAdaptiveSpanningRecordDeserializer;
import org.apache.flink.streaming.api.streamvertex.StreamingSuperstep;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -46,6 +46,7 @@ import org.slf4j.LoggerFactory;
public abstract class StreamingAbstractRecordReader<T extends IOReadableWritable> extends AbstractReader implements
ReaderBase {
@SuppressWarnings("unused")
private static final Logger LOG = LoggerFactory.getLogger(StreamingAbstractRecordReader.class);
private final RecordDeserializer<T>[] recordDeserializers;
......@@ -56,6 +57,7 @@ public abstract class StreamingAbstractRecordReader<T extends IOReadableWritable
private final BarrierBuffer barrierBuffer;
@SuppressWarnings("unchecked")
protected StreamingAbstractRecordReader(InputGate inputGate) {
super(inputGate);
barrierBuffer = new BarrierBuffer(inputGate, this);
......
......@@ -25,7 +25,6 @@ import java.util.List;
import java.util.Queue;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.event.task.StreamingSuperstep;
import org.apache.flink.runtime.event.task.TaskEvent;
import org.apache.flink.runtime.io.network.api.reader.AbstractReader;
import org.apache.flink.runtime.io.network.buffer.Buffer;
......@@ -33,6 +32,7 @@ import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.util.event.EventListener;
import org.apache.flink.streaming.api.streamvertex.StreamingSuperstep;
import org.junit.Test;
public class BarrierBufferTest {
......
......@@ -101,11 +101,6 @@ public class WordCount {
// emit the pairs
for (String token : tokens) {
//FIXME to be removed. added this for test purposes
if("killme".equals(token))
{
throw new Exception("byee");
}
if (token.length() > 0) {
out.collect(new Tuple2<String, Integer>(token, 1));
}
......