From 5061edb80df092a2f8719054b0d2bce8c670265c Mon Sep 17 00:00:00 2001 From: Gyula Fora Date: Fri, 20 Feb 2015 22:24:27 +0100 Subject: [PATCH] [FLINK-1638] [streaming] Vertex level fault tolerance and state monitor --- .../event/task/StreamingSuperstep.java | 51 +++++++ .../io/network/api/reader/AbstractReader.java | 4 + .../api/reader/AbstractRecordReader.java | 58 ++++--- .../io/network/api/reader/BarrierBuffer.java | 143 ++++++++++++++++++ .../jobgraph/tasks/BarrierTransceiver.java | 27 ++++ .../jobmanager/StreamStateMonitor.scala | 96 ++++++++++++ .../flink/streaming/api/StreamConfig.java | 9 ++ .../api/StreamingJobGraphGenerator.java | 4 +- .../streaming/api/collector/StreamOutput.java | 4 + .../SingleOutputStreamOperator.java | 4 +- .../api/invokable/StreamInvokable.java | 2 +- .../api/streamvertex/InputHandler.java | 24 ++- .../api/streamvertex/OutputHandler.java | 8 + .../api/streamvertex/StreamVertex.java | 61 +++++++- .../flink/streaming/io/CoRecordReader.java | 8 +- .../streaming/io/IndexedMutableReader.java | 2 +- .../flink/streaming/util/MockContext.java | 2 +- 17 files changed, 464 insertions(+), 43 deletions(-) create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/event/task/StreamingSuperstep.java create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BarrierBuffer.java create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/BarrierTransceiver.java create mode 100644 flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/StreamStateMonitor.scala diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/event/task/StreamingSuperstep.java b/flink-runtime/src/main/java/org/apache/flink/runtime/event/task/StreamingSuperstep.java new file mode 100644 index 00000000000..e35eb28a0d9 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/event/task/StreamingSuperstep.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software 
Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.event.task; + +import java.io.IOException; + +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; + +public class StreamingSuperstep extends TaskEvent { + + protected long id; + + public StreamingSuperstep() { + + } + + public StreamingSuperstep(long id) { + this.id = id; + } + + @Override + public void write(DataOutputView out) throws IOException { + out.writeLong(id); + } + + @Override + public void read(DataInputView in) throws IOException { + id = in.readLong(); + } + + public long getId() { + return id; + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractReader.java index 1bfca840611..96b6f9927df 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractReader.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractReader.java @@ -110,6 +110,10 @@ public abstract class AbstractReader implements ReaderBase { throw new IOException("Error while handling event of type " + eventType + ": " + 
t.getMessage(), t); } } + + public void publish(TaskEvent event){ + taskEventHandler.publish(event); + } // ------------------------------------------------------------------------ // Iterations diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java index e70b6eeeaec..cc36438c19a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java @@ -18,24 +18,37 @@ package org.apache.flink.runtime.io.network.api.reader; +import java.io.IOException; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.Queue; +import java.util.Set; + import org.apache.flink.core.io.IOReadableWritable; +import org.apache.flink.runtime.event.task.AbstractEvent; +import org.apache.flink.runtime.event.task.StreamingSuperstep; import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer; import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer.DeserializationResult; import org.apache.flink.runtime.io.network.buffer.Buffer; import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent; import org.apache.flink.runtime.io.network.partition.consumer.InputGate; import org.apache.flink.runtime.io.network.serialization.SpillingAdaptiveSpanningRecordDeserializer; - -import java.io.IOException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * A record-oriented reader. *

- * This abstract base class is used by both the mutable and immutable record readers. - * - * @param The type of the record that can be read with this record reader. + * This abstract base class is used by both the mutable and immutable record + * readers. + * + * @param + * The type of the record that can be read with this record reader. */ -abstract class AbstractRecordReader extends AbstractReader implements ReaderBase { +abstract class AbstractRecordReader extends AbstractReader implements + ReaderBase { + + private static final Logger LOG = LoggerFactory.getLogger(AbstractRecordReader.class); private final RecordDeserializer[] recordDeserializers; @@ -43,11 +56,15 @@ abstract class AbstractRecordReader extends Abstra private boolean isFinished; + private final BarrierBuffer barrierBuffer; + protected AbstractRecordReader(InputGate inputGate) { super(inputGate); + barrierBuffer = new BarrierBuffer(inputGate, this); // Initialize one deserializer per input channel - this.recordDeserializers = new SpillingAdaptiveSpanningRecordDeserializer[inputGate.getNumberOfInputChannels()]; + this.recordDeserializers = new SpillingAdaptiveSpanningRecordDeserializer[inputGate + .getNumberOfInputChannels()]; for (int i = 0; i < recordDeserializers.length; i++) { recordDeserializers[i] = new SpillingAdaptiveSpanningRecordDeserializer(); } @@ -72,22 +89,27 @@ abstract class AbstractRecordReader extends Abstra } } - final BufferOrEvent bufferOrEvent = inputGate.getNextBufferOrEvent(); + final BufferOrEvent bufferOrEvent = barrierBuffer.getNextNonBlocked(); if (bufferOrEvent.isBuffer()) { currentRecordDeserializer = recordDeserializers[bufferOrEvent.getChannelIndex()]; currentRecordDeserializer.setNextBuffer(bufferOrEvent.getBuffer()); - } - else if (handleEvent(bufferOrEvent.getEvent())) { - if (inputGate.isFinished()) { - isFinished = true; - - return false; + } else { + // Event received + final AbstractEvent event = bufferOrEvent.getEvent(); + + if (event instanceof 
StreamingSuperstep) { + barrierBuffer.processSuperstep(bufferOrEvent); + } else { + if (handleEvent(event)) { + if (inputGate.isFinished()) { + isFinished = true; + return false; + } else if (hasReachedEndOfSuperstep()) { + return false; + } // else: More data is coming... + } } - else if (hasReachedEndOfSuperstep()) { - - return false; - } // else: More data is coming... } } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BarrierBuffer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BarrierBuffer.java new file mode 100644 index 00000000000..ee317cdc9a0 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BarrierBuffer.java @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.flink.runtime.io.network.api.reader;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.Queue;
+import java.util.Set;
+
+import org.apache.flink.runtime.event.task.StreamingSuperstep;
+import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Aligns {@link StreamingSuperstep} barriers over the channels of an
+ * {@link InputGate}.
+ * <p>
+ * A channel is blocked once its barrier has been handed to
+ * {@link #processSuperstep(BufferOrEvent)}; everything read from a blocked
+ * channel is stored. When every channel is blocked, the superstep is published
+ * through the reader and the stored elements are queued for replay.
+ */
+public class BarrierBuffer {
+
+	private static final Logger LOG = LoggerFactory.getLogger(BarrierBuffer.class);
+
+	/** Elements taken from a blocked channel, held until the blocks are released. */
+	private final Queue<BufferOrEvent> bufferOrEvents = new LinkedList<BufferOrEvent>();
+	/** Released elements, replayed before the input gate is read again. */
+	private final Queue<BufferOrEvent> unprocessed = new LinkedList<BufferOrEvent>();
+
+	/** Indices of the channels whose barrier for the current superstep was processed. */
+	private final Set<Integer> blockedChannels = new HashSet<Integer>();
+	private final int totalNumberOfInputChannels;
+
+	private StreamingSuperstep currentSuperstep;
+	private boolean receivedSuperstep;
+
+	private final AbstractReader reader;
+
+	private final InputGate inputGate;
+
+	/**
+	 * @param inputGate
+	 *            gate the buffers and events are read from
+	 * @param reader
+	 *            reader through which a completed superstep is published
+	 */
+	public BarrierBuffer(InputGate inputGate, AbstractReader reader) {
+		this.inputGate = inputGate;
+		this.totalNumberOfInputChannels = inputGate.getNumberOfInputChannels();
+		this.reader = reader;
+	}
+
+	private void startSuperstep(StreamingSuperstep superstep) {
+		this.currentSuperstep = superstep;
+		this.receivedSuperstep = true;
+		LOG.debug("Superstep started with id: {}", superstep.getId());
+	}
+
+	private void store(BufferOrEvent bufferOrEvent) {
+		bufferOrEvents.add(bufferOrEvent);
+	}
+
+	private BufferOrEvent getNonProcessed() {
+		return unprocessed.poll();
+	}
+
+	private boolean isBlocked(int channelIndex) {
+		return blockedChannels.contains(channelIndex);
+	}
+
+	private boolean containsNonprocessed() {
+		return !unprocessed.isEmpty();
+	}
+
+	private boolean receivedSuperstep() {
+		return receivedSuperstep;
+	}
+
+	/**
+	 * Returns the next buffer or event that belongs to a channel which is not
+	 * blocked. Replayed elements are served before the input gate is read.
+	 * <p>
+	 * Replayed elements are checked against the blocked channels as well: a
+	 * replayed barrier blocks its channel again, and the elements queued behind
+	 * it must not be handed out before the next release.
+	 *
+	 * @return the next element of a non-blocked channel
+	 */
+	public BufferOrEvent getNextNonBlocked() throws IOException,
+			InterruptedException {
+		while (true) {
+			BufferOrEvent next = containsNonprocessed() ? getNonProcessed() : inputGate
+					.getNextBufferOrEvent();
+			if (isBlocked(next.getChannelIndex())) {
+				store(next);
+			} else {
+				return next;
+			}
+		}
+	}
+
+	private void blockChannel(int channelIndex) {
+		// Set.add returns false when the index is already contained
+		if (!blockedChannels.add(channelIndex)) {
+			throw new RuntimeException("Tried to block an already blocked channel");
+		}
+		LOG.debug("Channel blocked with index: {}", channelIndex);
+
+		if (blockedChannels.size() == totalNumberOfInputChannels) {
+			reader.publish(currentSuperstep);
+			releaseBlocks();
+			LOG.debug("All barriers received, blocks released");
+		}
+	}
+
+	/**
+	 * Queues the stored elements for replay and unblocks all channels.
+	 * <p>
+	 * The gate is only read while the replay queue is empty, so everything
+	 * stored since the last release was taken before whatever still waits in
+	 * the replay queue. The stored elements therefore go in front of it to keep
+	 * the order of each channel.
+	 */
+	private void releaseBlocks() {
+		bufferOrEvents.addAll(unprocessed);
+		unprocessed.clear();
+		unprocessed.addAll(bufferOrEvents);
+		bufferOrEvents.clear();
+		blockedChannels.clear();
+		receivedSuperstep = false;
+	}
+
+	@Override
+	public String toString() {
+		return blockedChannels.toString();
+	}
+
+	/**
+	 * Handles a {@link StreamingSuperstep} event: stores it if its channel is
+	 * blocked, otherwise starts the superstep (if none is running) and blocks
+	 * the channel.
+	 *
+	 * @param bufferOrEvent
+	 *            element whose event is a {@link StreamingSuperstep}
+	 */
+	public void processSuperstep(BufferOrEvent bufferOrEvent) {
+		int channelIndex = bufferOrEvent.getChannelIndex();
+		if (isBlocked(channelIndex)) {
+			store(bufferOrEvent);
+		} else {
+			StreamingSuperstep superstep = (StreamingSuperstep) bufferOrEvent.getEvent();
+			if (!receivedSuperstep()) {
+				startSuperstep(superstep);
+			}
+			blockChannel(channelIndex);
+		}
+	}
+
+}
\ No newline at end of file
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/BarrierTransceiver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/BarrierTransceiver.java
new file mode 100644
index 00000000000..c56da62c344
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/BarrierTransceiver.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobgraph.tasks; + + +public interface BarrierTransceiver { + + public void broadcastBarrier(long barrierID); + + public void confirmBarrier(long barrierID); + +} diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/StreamStateMonitor.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/StreamStateMonitor.scala new file mode 100644 index 00000000000..a37ddb509ee --- /dev/null +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/StreamStateMonitor.scala @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager
+
+import akka.actor._
+import org.apache.flink.runtime.ActorLogMessages
+import org.apache.flink.runtime.executiongraph.{ExecutionAttemptID, ExecutionGraph, ExecutionVertex}
+import org.apache.flink.runtime.jobgraph.{JobID, JobVertexID}
+
+import scala.collection.JavaConversions.mapAsScalaMap
+import scala.collection.immutable.TreeMap
+import scala.concurrent.ExecutionContext.Implicits.global
+import scala.concurrent.duration.{FiniteDuration, _}
+
+
+object StreamStateMonitor {
+
+  /**
+   * Creates a monitor actor for the given execution graph and triggers its scheduling.
+   *
+   * @param context actor context whose system hosts the new actor
+   * @param executionGraph graph whose execution vertices are monitored
+   * @param interval period of the barrier timer
+   * @return reference to the started monitor
+   */
+  def props(context: ActorContext, executionGraph: ExecutionGraph,
+            interval: FiniteDuration = 5 seconds): ActorRef = {
+
+    val vertices: Iterable[ExecutionVertex] = getExecutionVertices(executionGraph)
+    // every (job vertex id, subtask index) pair starts with no acknowledged barrier
+    val initialAcks: Map[(JobVertexID, Int), List[Long]] = vertices.map(x =>
+      ((x.getJobVertex.getJobVertexId, x.getParallelSubtaskIndex), List.empty[Long])).toMap
+    val monitor = context.system.actorOf(Props(new StreamStateMonitor(executionGraph,
+      vertices, initialAcks, interval, 0L, -1L)))
+    monitor ! InitBarrierScheduler
+    monitor
+  }
+
+  private def getExecutionVertices(executionGraph: ExecutionGraph): Iterable[ExecutionVertex] = {
+    for ((_, execJobVertex) <- executionGraph.getAllVertices;
+         execVertex: ExecutionVertex <- execJobVertex.getTaskVertices)
+    yield execVertex
+  }
+}
+
+/**
+ * Actor that periodically sends a [[BarrierReq]] to the task manager of every input vertex
+ * and records the [[BarrierAck]]s it receives per (job vertex id, subtask index).
+ */
+class StreamStateMonitor(val executionGraph: ExecutionGraph,
+                         val vertices: Iterable[ExecutionVertex],
+                         var acks: Map[(JobVertexID, Int), List[Long]],
+                         val interval: FiniteDuration, var curId: Long, var ackId: Long)
+        extends Actor with ActorLogMessages with ActorLogging {
+
+  // handles of the periodic timers, kept so that they stop firing once the actor is stopped
+  private var timers: List[Cancellable] = Nil
+
+  private def cancelTimers(): Unit = {
+    timers.foreach(_.cancel())
+    timers = Nil
+  }
+
+  override def postStop(): Unit = {
+    cancelTimers()
+    super.postStop()
+  }
+
+  override def receiveWithLogMessages: Receive = {
+    case InitBarrierScheduler =>
+      // a repeated init must not leave the previous timers running
+      cancelTimers()
+      timers = List(
+        context.system.scheduler.schedule(interval, interval, self, BarrierTimeout),
+        context.system.scheduler.schedule(2 * interval, 2 * interval, self, UpdateCurrentBarrier))
+      log.debug("[FT-MONITOR] Started Stream State Monitor for job {} ({})",
+        executionGraph.getJobID, executionGraph.getJobName)
+    case BarrierTimeout =>
+      curId += 1
+      log.debug("[FT-MONITOR] Sending Barrier to vertices of Job " + executionGraph.getJobName)
+      vertices.filter(v => v.getJobVertex.getJobVertex.isInputVertex).foreach(vertex
+      => vertex.getCurrentAssignedResource.getInstance.getTaskManager
+                ! BarrierReq(vertex.getCurrentExecutionAttempt.getAttemptId, curId))
+    case BarrierAck(_, jobVertexID, instanceID, checkpointID) =>
+      // acks from subtasks that are not part of the initial map are ignored
+      acks.get((jobVertexID, instanceID)) match {
+        case Some(acklist) =>
+          acks += (jobVertexID, instanceID) -> (checkpointID :: acklist)
+        case None =>
+      }
+      log.info(acks.toString)
+    case UpdateCurrentBarrier =>
+      // number of subtasks that acknowledged each checkpoint id
+      val barrierCount = acks.values.foldLeft(TreeMap[Long, Int]().withDefaultValue(0))((dict, myList)
+      => myList.foldLeft(dict)((dict2, elem) => dict2.updated(elem, dict2(elem) + 1)))
+      // ids acknowledged by every tracked subtask
+      val keysToKeep = barrierCount.filter(_._2 == acks.size).keys
+      ackId = if (keysToKeep.nonEmpty) keysToKeep.max else ackId
+      // acknowledgements older than the last global barrier are dropped
+      acks.keys.foreach(x => acks = acks.updated(x, acks(x).filter(_ >= ackId)))
+      log.debug("[FT-MONITOR] Last global barrier is " + ackId)
+  }
+}
+
+case class BarrierTimeout()
+
+case class InitBarrierScheduler()
+
+case class UpdateCurrentBarrier()
+
+case class BarrierReq(attemptID: ExecutionAttemptID, checkpointID: Long)
+
+case class BarrierAck(jobID: JobID, jobVertexID: JobVertexID, instanceID: Int, checkpointID: Long)
+
+
+
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
index e1362c42065..d464ef107a6 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
@@ -48,6 +48,7 @@ public class StreamConfig implements Serializable {
 	private static final String OUTPUT_NAME = "outputName_";
 	private static final String PARTITIONER_OBJECT = "partitionerObject_";
 	private static final String VERTEX_NAME = "vertexID";
+	private static final String OPERATOR_NAME = "operatorName";
 	private static final String ITERATION_ID = "iteration-id";
 	private static final String
OUTPUT_SELECTOR = "outputSelector"; private static final String DIRECTED_EMIT = "directedEmit"; @@ -87,6 +88,14 @@ public class StreamConfig implements Serializable { return config.getInteger(VERTEX_NAME, -1); } + public void setOperatorName(String name) { + config.setString(OPERATOR_NAME, name); + } + + public String getOperatorName() { + return config.getString(OPERATOR_NAME, "Missing"); + } + public void setTypeSerializerIn1(StreamRecordSerializer serializer) { setTypeSerializer(TYPE_SERIALIZER_IN_1, serializer); } diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java index b999c27e0b9..c9698e325ed 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java @@ -188,7 +188,9 @@ public class StreamingJobGraphGenerator { builtVertices.add(vertexID); jobGraph.addVertex(vertex); - return new StreamConfig(vertex.getConfiguration()); + StreamConfig retConfig = new StreamConfig(vertex.getConfiguration()); + retConfig.setOperatorName(chainedNames.get(vertexID)); + return retConfig; } private void setVertexConfig(Integer vertexID, StreamConfig config, diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamOutput.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamOutput.java index a497119cbd2..c3f694eca19 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamOutput.java +++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamOutput.java @@ -19,6 +19,7 @@ package org.apache.flink.streaming.api.collector; import java.io.IOException; +import org.apache.flink.runtime.event.task.TaskEvent; import org.apache.flink.runtime.io.network.api.writer.RecordWriter; import org.apache.flink.runtime.plugable.SerializationDelegate; import org.apache.flink.streaming.api.streamrecord.StreamRecord; @@ -87,4 +88,7 @@ public class StreamOutput implements Collector { output.clearBuffers(); } + public void broadcastEvent(TaskEvent barrier) throws IOException, InterruptedException { + output.broadcastEvent(barrier); + } } diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java index dcfd6fef8d1..cdf43ee07a6 100755 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java @@ -112,7 +112,7 @@ public class SingleOutputStreamOperator registerState(String name, OperatorState state) { + public SingleOutputStreamOperator registerState(String name, OperatorState state) { streamGraph.addOperatorState(getId(), name, state); return this; } @@ -128,7 +128,7 @@ public class SingleOutputStreamOperator registerState(Map> states) { + public SingleOutputStreamOperator registerState(Map> states) { for (Entry> entry : states.entrySet()) { streamGraph.addOperatorState(getId(), entry.getKey(), entry.getValue()); } diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java index abe31d4245d..6281de3555a 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java @@ -96,7 +96,7 @@ public abstract class StreamInvokable implements Serializable { * Reads the next record from the reader iterator and stores it in the * nextRecord variable */ - protected StreamRecord readNext() { + protected StreamRecord readNext() throws IOException { this.nextRecord = inSerializer.createInstance(); try { nextRecord = recordIterator.next(nextRecord); diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/InputHandler.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/InputHandler.java index e8a2ce17239..a95965c3027 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/InputHandler.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/InputHandler.java @@ -18,7 +18,9 @@ package org.apache.flink.streaming.api.streamvertex; import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.runtime.event.task.StreamingSuperstep; import org.apache.flink.runtime.io.network.api.reader.MutableReader; +import org.apache.flink.runtime.io.network.partition.consumer.InputGate; import org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate; import org.apache.flink.runtime.plugable.DeserializationDelegate; import org.apache.flink.streaming.api.StreamConfig; @@ -51,25 +53,19 @@ public class InputHandler { inputSerializer = 
configuration.getTypeSerializerIn1(streamVertex.userClassLoader); int numberOfInputs = configuration.getNumberOfInputs(); - if (numberOfInputs > 0) { - if (numberOfInputs < 2) { - inputs = new IndexedMutableReader>>( - streamVertex.getEnvironment().getInputGate(0)); + if (numberOfInputs > 0) { + InputGate inputGate = numberOfInputs < 2 ? streamVertex.getEnvironment() + .getInputGate(0) : new UnionInputGate(streamVertex.getEnvironment() + .getAllInputGates()); - } else { - inputs = new IndexedMutableReader>>( - new UnionInputGate(streamVertex.getEnvironment().getAllInputGates())); - } + inputs = new IndexedMutableReader>>(inputGate); + inputs.registerTaskEventListener(streamVertex.getSuperstepListener(), + StreamingSuperstep.class); - inputIter = createInputIterator(); + inputIter = new IndexedReaderIterator>(inputs, inputSerializer); } - } - private IndexedReaderIterator> createInputIterator() { - final IndexedReaderIterator> iter = new IndexedReaderIterator>( - inputs, inputSerializer); - return iter; } protected static IndexedReaderIterator> staticCreateInputIterator( diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java index 359675db228..82f1329832c 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java @@ -25,6 +25,7 @@ import java.util.List; import java.util.Map; import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.runtime.event.task.StreamingSuperstep; import org.apache.flink.runtime.io.network.api.writer.RecordWriter; import org.apache.flink.runtime.plugable.SerializationDelegate; import 
org.apache.flink.streaming.api.StreamConfig; @@ -84,6 +85,13 @@ public class OutputHandler { } + public void broadcastBarrier(long id) throws IOException, InterruptedException { + StreamingSuperstep barrier = new StreamingSuperstep(id); + for (StreamOutput streamOutput : outputMap.values()) { + streamOutput.broadcastEvent(barrier); + } + } + public Collection> getOutputs() { return outputMap.values(); } diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java index 99ca0985aa5..e2cdc342932 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java @@ -17,10 +17,16 @@ package org.apache.flink.streaming.api.streamvertex; +import java.io.IOException; import java.util.Map; +import org.apache.flink.runtime.event.task.StreamingSuperstep; +import org.apache.flink.runtime.event.task.TaskEvent; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.runtime.jobgraph.tasks.BarrierTransceiver; +import org.apache.flink.runtime.jobmanager.BarrierAck; +import org.apache.flink.runtime.util.event.EventListener; import org.apache.flink.streaming.api.StreamConfig; import org.apache.flink.streaming.api.invokable.ChainableInvokable; import org.apache.flink.streaming.api.invokable.StreamInvokable; @@ -34,7 +40,10 @@ import org.apache.flink.util.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class StreamVertex extends AbstractInvokable implements StreamTaskContext { +import akka.actor.ActorRef; + +public class StreamVertex extends AbstractInvokable 
implements StreamTaskContext,
+		BarrierTransceiver {
 
 	private static final Logger LOG = LoggerFactory.getLogger(StreamVertex.class);
 
@@ -53,10 +62,13 @@ public class StreamVertex extends AbstractInvokable implements StreamTa
 
 	protected ClassLoader userClassLoader;
 
+	private EventListener<TaskEvent> superstepListener;
+
 	public StreamVertex() {
 		userInvokable = null;
 		numTasks = newVertex();
 		instanceID = numTasks;
+		superstepListener = new SuperstepEventListener();
 	}
 
 	protected static int newVertex() {
@@ -78,6 +90,22 @@ public class StreamVertex extends AbstractInvokable implements StreamTa
 		this.context = createRuntimeContext(getEnvironment().getTaskName(), this.states);
 	}
 
+	@Override
+	public void broadcastBarrier(long id) {
+		// Only called at input vertices
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("Received barrier from jobmanager: " + id);
+		}
+		actOnBarrier(id);
+	}
+
+	@Override
+	public void confirmBarrier(long barrierID) {
+		getEnvironment().getJobManager().tell(
+				new BarrierAck(getEnvironment().getJobID(), getEnvironment().getJobVertexId(),
+						context.getIndexOfThisSubtask(), barrierID), ActorRef.noSender());
+	}
+
 	public void setInputsOutputs() {
 		inputHandler = new InputHandler(this);
 		outputHandler = new OutputHandler(this);
@@ -205,4 +233,46 @@ public class StreamVertex extends AbstractInvokable implements StreamTa
 		throw new IllegalArgumentException("CoReader not available");
 	}
 
+	public EventListener<TaskEvent> getSuperstepListener() {
+		return this.superstepListener;
+	}
+
+	/**
+	 * Forwards the barrier with the given id to all outputs of this vertex.
+	 * <p>
+	 * Failures are logged rather than rethrown, because neither entry point
+	 * ({@link #broadcastBarrier(long)} and the superstep listener) declares
+	 * a checked exception.
+	 *
+	 * @param id
+	 *            id of the barrier (superstep) to forward
+	 */
+	private void actOnBarrier(long id) {
+		try {
+			outputHandler.broadcastBarrier(id);
+			if (LOG.isDebugEnabled()) {
+				LOG.debug("Superstep " + id + " processed: " + StreamVertex.this);
+			}
+		} catch (IOException e) {
+			LOG.error("Broadcasting superstep " + id + " failed", e);
+		} catch (InterruptedException e) {
+			// Restore the interrupt flag so the interrupted thread can still observe it
+			Thread.currentThread().interrupt();
+			LOG.warn("Interrupted while broadcasting superstep " + id, e);
+		}
+	}
+
+	@Override
+	public String toString() {
+		return configuration.getOperatorName() + " (" + context.getIndexOfThisSubtask() + ")";
+	}
+
+	private class SuperstepEventListener implements EventListener<TaskEvent> {
+
+		@Override
+		public void onEvent(TaskEvent event) {
+			actOnBarrier(((StreamingSuperstep) event).getId());
+		}
+
+	}
 }
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/CoRecordReader.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/CoRecordReader.java
index bb20ecbaa8a..79f09c48663 100755
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/CoRecordReader.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/CoRecordReader.java
@@ -17,6 +17,10 @@
 
 package org.apache.flink.streaming.io;
 
+import java.io.IOException;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
 import org.apache.flink.core.io.IOReadableWritable;
 import org.apache.flink.runtime.io.network.api.reader.AbstractReader;
 import org.apache.flink.runtime.io.network.api.reader.MutableRecordReader;
@@ -28,10 +32,6 @@ import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
 import org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate;
 import org.apache.flink.runtime.util.event.EventListener;
 
-import java.io.IOException;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-
 /**
  * A CoRecordReader wraps {@link MutableRecordReader}s of two different input
  * types to read records effectively.
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/IndexedMutableReader.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/IndexedMutableReader.java index 175dba2627e..025393db1ce 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/IndexedMutableReader.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/IndexedMutableReader.java @@ -34,4 +34,4 @@ public class IndexedMutableReader extends MutableR public int getNumberOfInputChannels() { return reader.getNumberOfInputChannels(); } -} \ No newline at end of file +} diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java index 4b13165d686..af836e273ae 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java @@ -79,7 +79,7 @@ public class MockContext implements StreamTaskContext { @Override public StreamRecord next() throws IOException { if (listIterator.hasNext()) { - StreamRecord result = new StreamRecord(); + StreamRecord result = inDeserializer.createInstance(); result.setObject(listIterator.next()); return result; } else { -- GitLab