Commit 5061edb8 authored by Gyula Fora, committed by Stephan Ewen

[FLINK-1638] [streaming] Vertex level fault tolerance and state monitor

Parent: b4e8350f
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.event.task;
import java.io.IOException;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
public class StreamingSuperstep extends TaskEvent {
protected long id;
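// Nullary constructor, required so the event can be instantiated
// reflectively before read(DataInputView) restores its state.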
public StreamingSuperstep() {
}
public StreamingSuperstep(long id) {
this.id = id;
}
@Override
public void write(DataOutputView out) throws IOException {
out.writeLong(id);
}
@Override
public void read(DataInputView in) throws IOException {
id = in.readLong();
}
public long getId() {
return id;
}
}
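For context: StreamingSuperstep is the barrier event that travels through the regular data channels, and its wire format is nothing but the 8-byte barrier id written above. A minimal sketch of the write/read round trip, using plain java.io streams in place of Flink's DataOutputView/DataInputView (which extend the standard DataOutput/DataInput contracts):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class SuperstepRoundTrip {
    public static void main(String[] args) throws IOException {
        // Sender side: the event writes only its id.
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        new DataOutputStream(bytes).writeLong(42L); // what write(out) emits

        // Receiver side: read(in) recovers the same id.
        DataInputStream in = new DataInputStream(
                new ByteArrayInputStream(bytes.toByteArray()));
        System.out.println("barrier id = " + in.readLong()); // prints 42
    }
}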
......@@ -111,6 +111,10 @@ public abstract class AbstractReader implements ReaderBase {
}
}
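// Re-publishes a task event (e.g. a superstep barrier) to the listeners
// registered with this reader's task event handler.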
public void publish(TaskEvent event) {
taskEventHandler.publish(event);
}
// ------------------------------------------------------------------------
// Iterations
// ------------------------------------------------------------------------
......
......@@ -18,24 +18,37 @@
package org.apache.flink.runtime.io.network.api.reader;
import java.io.IOException;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.Queue;
import java.util.Set;
import org.apache.flink.core.io.IOReadableWritable;
import org.apache.flink.runtime.event.task.AbstractEvent;
import org.apache.flink.runtime.event.task.StreamingSuperstep;
import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer.DeserializationResult;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.io.network.serialization.SpillingAdaptiveSpanningRecordDeserializer;
import java.io.IOException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A record-oriented reader.
* <p>
* This abstract base class is used by both the mutable and immutable record readers.
* This abstract base class is used by both the mutable and immutable record
* readers.
*
* @param <T> The type of the record that can be read with this record reader.
* @param <T>
* The type of the record that can be read with this record reader.
*/
abstract class AbstractRecordReader<T extends IOReadableWritable> extends AbstractReader implements ReaderBase {
abstract class AbstractRecordReader<T extends IOReadableWritable> extends AbstractReader implements
ReaderBase {
private static final Logger LOG = LoggerFactory.getLogger(AbstractRecordReader.class);
private final RecordDeserializer<T>[] recordDeserializers;
......@@ -43,11 +56,15 @@ abstract class AbstractRecordReader<T extends IOReadableWritable> extends Abstra
private boolean isFinished;
private final BarrierBuffer barrierBuffer;
protected AbstractRecordReader(InputGate inputGate) {
super(inputGate);
barrierBuffer = new BarrierBuffer(inputGate, this);
// Initialize one deserializer per input channel
this.recordDeserializers = new SpillingAdaptiveSpanningRecordDeserializer[inputGate.getNumberOfInputChannels()];
this.recordDeserializers = new SpillingAdaptiveSpanningRecordDeserializer[inputGate
.getNumberOfInputChannels()];
for (int i = 0; i < recordDeserializers.length; i++) {
recordDeserializers[i] = new SpillingAdaptiveSpanningRecordDeserializer<T>();
}
......@@ -72,25 +89,30 @@ abstract class AbstractRecordReader<T extends IOReadableWritable> extends Abstra
}
}
final BufferOrEvent bufferOrEvent = inputGate.getNextBufferOrEvent();
final BufferOrEvent bufferOrEvent = barrierBuffer.getNextNonBlocked();
if (bufferOrEvent.isBuffer()) {
currentRecordDeserializer = recordDeserializers[bufferOrEvent.getChannelIndex()];
currentRecordDeserializer.setNextBuffer(bufferOrEvent.getBuffer());
}
else if (handleEvent(bufferOrEvent.getEvent())) {
} else {
// Event received
final AbstractEvent event = bufferOrEvent.getEvent();
if (event instanceof StreamingSuperstep) {
barrierBuffer.processSuperstep(bufferOrEvent);
} else {
if (handleEvent(event)) {
if (inputGate.isFinished()) {
isFinished = true;
return false;
}
else if (hasReachedEndOfSuperstep()) {
} else if (hasReachedEndOfSuperstep()) {
return false;
} // else: More data is coming...
}
}
}
}
}
public void clearBuffers() {
for (RecordDeserializer<?> deserializer : recordDeserializers) {
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.io.network.api.reader;
import java.io.IOException;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.Queue;
import java.util.Set;
import org.apache.flink.runtime.event.task.StreamingSuperstep;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
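/**
 * Buffers the data arriving on already-blocked input channels while a
 * superstep barrier is being aligned across all channels of the input gate.
 */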
public class BarrierBuffer {
private static final Logger LOG = LoggerFactory.getLogger(BarrierBuffer.class);
private Queue<BufferOrEvent> bufferOrEvents = new LinkedList<BufferOrEvent>();
private Queue<BufferOrEvent> unprocessed = new LinkedList<BufferOrEvent>();
private Set<Integer> blockedChannels = new HashSet<Integer>();
private int totalNumberOfInputChannels;
private StreamingSuperstep currentSuperstep;
private boolean receivedSuperstep;
private boolean blockAll = false;
private AbstractReader reader;
private InputGate inputGate;
public BarrierBuffer(InputGate inputGate, AbstractReader reader) {
this.inputGate = inputGate;
totalNumberOfInputChannels = inputGate.getNumberOfInputChannels();
this.reader = reader;
}
private void startSuperstep(StreamingSuperstep superstep) {
this.currentSuperstep = superstep;
this.receivedSuperstep = true;
if (LOG.isDebugEnabled()) {
LOG.debug("Superstep started with id: " + superstep.getId());
}
}
private void store(BufferOrEvent bufferOrEvent) {
bufferOrEvents.add(bufferOrEvent);
}
private BufferOrEvent getNonProcessed() {
return unprocessed.poll();
}
private boolean isBlocked(int channelIndex) {
return blockAll || blockedChannels.contains(channelIndex);
}
private boolean containsNonprocessed() {
return !unprocessed.isEmpty();
}
private boolean receivedSuperstep() {
return receivedSuperstep;
}
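/**
 * Returns the next buffer or event from a non-blocked channel, serving
 * inputs that were buffered during a previous alignment first.
 */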
public BufferOrEvent getNextNonBlocked() throws IOException,
InterruptedException {
BufferOrEvent bufferOrEvent = null;
if (containsNonprocessed()) {
bufferOrEvent = getNonProcessed();
} else {
while (bufferOrEvent == null) {
BufferOrEvent nextBufferOrEvent = inputGate.getNextBufferOrEvent();
if (isBlocked(nextBufferOrEvent.getChannelIndex())) {
store(nextBufferOrEvent);
} else {
bufferOrEvent = nextBufferOrEvent;
}
}
}
return bufferOrEvent;
}
private void blockChannel(int channelIndex) {
if (!blockedChannels.contains(channelIndex)) {
blockedChannels.add(channelIndex);
if (LOG.isDebugEnabled()) {
LOG.debug("Channel blocked with index: " + channelIndex);
}
if (blockedChannels.size() == totalNumberOfInputChannels) {
reader.publish(currentSuperstep);
unprocessed.addAll(bufferOrEvents);
bufferOrEvents.clear();
blockedChannels.clear();
receivedSuperstep = false;
if (LOG.isDebugEnabled()) {
LOG.debug("All barriers received, blocks released");
}
}
} else {
throw new RuntimeException("Tried to block an already blocked channel");
}
}
@Override
public String toString() {
return blockedChannels.toString();
}
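/**
 * Handles one incoming superstep barrier: the first barrier starts the
 * superstep, and every barrier blocks its channel until all channels
 * have delivered one.
 */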
public void processSuperstep(BufferOrEvent bufferOrEvent) {
int channelIndex = bufferOrEvent.getChannelIndex();
if (isBlocked(channelIndex)) {
store(bufferOrEvent);
} else {
StreamingSuperstep superstep = (StreamingSuperstep) bufferOrEvent.getEvent();
if (!receivedSuperstep()) {
startSuperstep(superstep);
}
blockChannel(channelIndex);
}
}
}
\ No newline at end of file
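The class above is the heart of the vertex-level fault tolerance: the first barrier of a superstep blocks its channel and buffers that channel's subsequent data; once barriers have arrived on every input channel, the superstep is published to the reader and the buffered inputs are replayed. A self-contained sketch of the same alignment algorithm, on simplified stand-in types (SimpleEvent and the String payloads are hypothetical; the real BufferOrEvent carries network buffers):

import java.util.ArrayDeque;
import java.util.HashSet;
import java.util.Queue;
import java.util.Set;

public class BarrierAlignmentSketch {

    // Stand-in for BufferOrEvent: either a data record or a barrier.
    static final class SimpleEvent {
        final int channel;
        final boolean isBarrier;
        final String payload;

        SimpleEvent(int channel, boolean isBarrier, String payload) {
            this.channel = channel;
            this.isBarrier = isBarrier;
            this.payload = payload;
        }
    }

    private final int numChannels;
    private final Set<Integer> blocked = new HashSet<Integer>();
    private final Queue<SimpleEvent> buffered = new ArrayDeque<SimpleEvent>();

    BarrierAlignmentSketch(int numChannels) {
        this.numChannels = numChannels;
    }

    // Returns a payload to process now, or null if the input was held back.
    String offer(SimpleEvent e) {
        if (blocked.contains(e.channel)) {
            buffered.add(e); // channel already delivered its barrier: hold back
            return null;
        }
        if (e.isBarrier) {
            blocked.add(e.channel); // first barrier blocks this channel
            if (blocked.size() == numChannels) {
                // All barriers aligned: the superstep is complete and the
                // buffered inputs would now be replayed before new ones.
                System.out.println("aligned; " + buffered.size() + " inputs buffered");
                blocked.clear();
            }
            return null;
        }
        return e.payload; // unblocked data passes straight through
    }

    public static void main(String[] args) {
        BarrierAlignmentSketch b = new BarrierAlignmentSketch(2);
        System.out.println(b.offer(new SimpleEvent(0, false, "a0"))); // a0
        b.offer(new SimpleEvent(0, true, null));                      // blocks channel 0
        System.out.println(b.offer(new SimpleEvent(0, false, "a1"))); // null, buffered
        System.out.println(b.offer(new SimpleEvent(1, false, "b0"))); // b0
        b.offer(new SimpleEvent(1, true, null));                      // aligned, released
    }
}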
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.jobgraph.tasks;
public interface BarrierTransceiver {
public void broadcastBarrier(long barrierID);
public void confirmBarrier(long barrierID);
}
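The two methods split the checkpoint protocol by direction: broadcastBarrier is invoked on input vertices when the monitor's BarrierReq arrives at the TaskManager, and each task calls confirmBarrier to report the barrier back once it has passed through. A minimal, purely illustrative implementation (the class is hypothetical; StreamVertex below is the real implementor):

package org.apache.flink.runtime.jobgraph.tasks;

public class LoggingBarrierTransceiver implements BarrierTransceiver {

    @Override
    public void broadcastBarrier(long barrierID) {
        // A real source task injects the barrier into all of its outputs here.
        System.out.println("broadcasting barrier " + barrierID);
    }

    @Override
    public void confirmBarrier(long barrierID) {
        // A real task acknowledges the barrier to the JobManager here.
        System.out.println("confirming barrier " + barrierID);
    }
}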
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.jobmanager
import akka.actor._
import org.apache.flink.runtime.ActorLogMessages
import org.apache.flink.runtime.executiongraph.{ExecutionAttemptID, ExecutionGraph, ExecutionVertex}
import org.apache.flink.runtime.jobgraph.{JobID, JobVertexID}
import scala.collection.JavaConversions.mapAsScalaMap
import scala.collection.immutable.TreeMap
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.{FiniteDuration, _}
object StreamStateMonitor {
def props(context: ActorContext, executionGraph: ExecutionGraph,
interval: FiniteDuration = 5 seconds): ActorRef = {
val vertices: Iterable[ExecutionVertex] = getExecutionVertices(executionGraph)
val monitor = context.system.actorOf(Props(new StreamStateMonitor(executionGraph,
vertices, vertices.map(x => ((x.getJobVertex.getJobVertexId, x.getParallelSubtaskIndex), List.empty[Long])).toMap, interval, 0L, -1L)))
monitor ! InitBarrierScheduler
monitor
}
private def getExecutionVertices(executionGraph: ExecutionGraph): Iterable[ExecutionVertex] = {
for ((_, execJobVertex) <- executionGraph.getAllVertices;
execVertex: ExecutionVertex <- execJobVertex.getTaskVertices)
yield execVertex
}
}
class StreamStateMonitor(val executionGraph: ExecutionGraph,
val vertices: Iterable[ExecutionVertex], var acks: Map[(JobVertexID, Int), List[Long]],
val interval: FiniteDuration, var curId: Long, var ackId: Long)
extends Actor with ActorLogMessages with ActorLogging {
override def receiveWithLogMessages: Receive = {
case InitBarrierScheduler =>
context.system.scheduler.schedule(interval, interval, self, BarrierTimeout)
context.system.scheduler.schedule(2 * interval, 2 * interval, self, UpdateCurrentBarrier)
log.debug("[FT-MONITOR] Started Stream State Monitor for job {}{}",
executionGraph.getJobID, executionGraph.getJobName)
case BarrierTimeout =>
curId += 1
log.debug("[FT-MONITOR] Sending Barrier to vertices of Job " + executionGraph.getJobName)
vertices.filter(v => v.getJobVertex.getJobVertex.isInputVertex).foreach(vertex
=> vertex.getCurrentAssignedResource.getInstance.getTaskManager
! BarrierReq(vertex.getCurrentExecutionAttempt.getAttemptId, curId))
case BarrierAck(_, jobVertexID, instanceID, checkpointID) =>
acks.get(jobVertexID, instanceID) match {
case Some(acklist) =>
acks += (jobVertexID, instanceID) -> (checkpointID :: acklist)
case None =>
}
log.info(acks.toString)
case UpdateCurrentBarrier =>
val barrierCount = acks.values.foldLeft(TreeMap[Long, Int]().withDefaultValue(0))((dict, myList)
=> myList.foldLeft(dict)((dict2, elem) => dict2.updated(elem, dict2(elem) + 1)))
val keysToKeep = barrierCount.filter(_._2 == acks.size).keys
ackId = if (!keysToKeep.isEmpty) keysToKeep.max else ackId
acks.keys.foreach(x => acks = acks.updated(x, acks(x).filter(_ >= ackId)))
log.debug("[FT-MONITOR] Last global barrier is " + ackId)
}
}
case class BarrierTimeout()
case class InitBarrierScheduler()
case class UpdateCurrentBarrier()
case class BarrierReq(attemptID: ExecutionAttemptID, checkpointID: Long)
case class BarrierAck(jobID: JobID, jobVertexID: JobVertexID, instanceID: Int, checkpointID: Long)
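The densest step above is the UpdateCurrentBarrier fold: it counts, per checkpoint id, how many (vertex, subtask) keys have acknowledged that id, promotes the highest id acknowledged by all keys to the new global barrier, and prunes older acks. The same computation as a self-contained Java sketch (the String keys and the literal ack lists are hypothetical stand-ins for the (JobVertexID, subtask index) pairs):

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class GlobalBarrierSketch {
    public static void main(String[] args) {
        // Checkpoint ids acknowledged so far, per (vertex, subtask) key.
        Map<String, List<Long>> acks = new HashMap<String, List<Long>>();
        acks.put("source-0", Arrays.asList(1L, 2L, 3L));
        acks.put("map-0", Arrays.asList(1L, 2L));
        acks.put("sink-0", Arrays.asList(1L, 2L));

        // barrierCount: how many keys have acknowledged each checkpoint id.
        TreeMap<Long, Integer> barrierCount = new TreeMap<Long, Integer>();
        for (List<Long> ackList : acks.values()) {
            for (Long id : ackList) {
                Integer count = barrierCount.get(id);
                barrierCount.put(id, count == null ? 1 : count + 1);
            }
        }

        // The last global barrier is the highest id acknowledged by every key.
        long ackId = -1L;
        for (Map.Entry<Long, Integer> entry : barrierCount.entrySet()) {
            if (entry.getValue() == acks.size()) {
                ackId = Math.max(ackId, entry.getKey());
            }
        }
        System.out.println("last global barrier: " + ackId); // prints 2
    }
}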
......@@ -48,6 +48,7 @@ public class StreamConfig implements Serializable {
private static final String OUTPUT_NAME = "outputName_";
private static final String PARTITIONER_OBJECT = "partitionerObject_";
private static final String VERTEX_NAME = "vertexID";
private static final String OPERATOR_NAME = "operatorName";
private static final String ITERATION_ID = "iteration-id";
private static final String OUTPUT_SELECTOR = "outputSelector";
private static final String DIRECTED_EMIT = "directedEmit";
......@@ -87,6 +88,14 @@ public class StreamConfig implements Serializable {
return config.getInteger(VERTEX_NAME, -1);
}
public void setOperatorName(String name) {
config.setString(OPERATOR_NAME, name);
}
public String getOperatorName() {
return config.getString(OPERATOR_NAME, "Missing");
}
public void setTypeSerializerIn1(StreamRecordSerializer<?> serializer) {
setTypeSerializer(TYPE_SERIALIZER_IN_1, serializer);
}
......
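The new entry follows the pattern of the other StreamConfig keys: a value stored in the wrapped Configuration, with a fallback returned when the key was never set. A quick sketch of the round trip, assuming only the constructor and accessors visible in this diff:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.StreamConfig;

public class OperatorNameExample {
    public static void main(String[] args) {
        StreamConfig config = new StreamConfig(new Configuration());
        System.out.println(config.getOperatorName()); // "Missing" (the default)

        config.setOperatorName("Flat Map");
        System.out.println(config.getOperatorName()); // "Flat Map"
    }
}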
......@@ -188,7 +188,9 @@ public class StreamingJobGraphGenerator {
builtVertices.add(vertexID);
jobGraph.addVertex(vertex);
return new StreamConfig(vertex.getConfiguration());
StreamConfig retConfig = new StreamConfig(vertex.getConfiguration());
retConfig.setOperatorName(chainedNames.get(vertexID));
return retConfig;
}
private void setVertexConfig(Integer vertexID, StreamConfig config,
......
......@@ -19,6 +19,7 @@ package org.apache.flink.streaming.api.collector;
import java.io.IOException;
import org.apache.flink.runtime.event.task.TaskEvent;
import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
import org.apache.flink.runtime.plugable.SerializationDelegate;
import org.apache.flink.streaming.api.streamrecord.StreamRecord;
......@@ -87,4 +88,7 @@ public class StreamOutput<OUT> implements Collector<OUT> {
output.clearBuffers();
}
public void broadcastEvent(TaskEvent barrier) throws IOException, InterruptedException {
output.broadcastEvent(barrier);
}
}
......@@ -112,7 +112,7 @@ public class SingleOutputStreamOperator<OUT, O extends SingleOutputStreamOperato
* The state to be registered for this name.
* @return The data stream with state registered.
*/
protected SingleOutputStreamOperator<OUT, O> registerState(String name, OperatorState<?> state) {
public SingleOutputStreamOperator<OUT, O> registerState(String name, OperatorState<?> state) {
streamGraph.addOperatorState(getId(), name, state);
return this;
}
......@@ -128,7 +128,7 @@ public class SingleOutputStreamOperator<OUT, O extends SingleOutputStreamOperato
* The map containing the states that will be registered.
* @return The data stream with states registered.
*/
protected SingleOutputStreamOperator<OUT, O> registerState(Map<String, OperatorState<?>> states) {
public SingleOutputStreamOperator<OUT, O> registerState(Map<String, OperatorState<?>> states) {
for (Entry<String, OperatorState<?>> entry : states.entrySet()) {
streamGraph.addOperatorState(getId(), entry.getKey(), entry.getValue());
}
......
......@@ -96,7 +96,7 @@ public abstract class StreamInvokable<IN, OUT> implements Serializable {
* Reads the next record from the reader iterator and stores it in the
* nextRecord variable
*/
protected StreamRecord<IN> readNext() {
protected StreamRecord<IN> readNext() throws IOException {
this.nextRecord = inSerializer.createInstance();
try {
nextRecord = recordIterator.next(nextRecord);
......
......@@ -18,7 +18,9 @@
package org.apache.flink.streaming.api.streamvertex;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.event.task.StreamingSuperstep;
import org.apache.flink.runtime.io.network.api.reader.MutableReader;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate;
import org.apache.flink.runtime.plugable.DeserializationDelegate;
import org.apache.flink.streaming.api.StreamConfig;
......@@ -51,25 +53,19 @@ public class InputHandler<IN> {
inputSerializer = configuration.getTypeSerializerIn1(streamVertex.userClassLoader);
int numberOfInputs = configuration.getNumberOfInputs();
if (numberOfInputs > 0) {
InputGate inputGate = numberOfInputs < 2 ? streamVertex.getEnvironment()
.getInputGate(0) : new UnionInputGate(streamVertex.getEnvironment()
.getAllInputGates());
if (numberOfInputs < 2) {
inputs = new IndexedMutableReader<DeserializationDelegate<StreamRecord<IN>>>(
streamVertex.getEnvironment().getInputGate(0));
inputs = new IndexedMutableReader<DeserializationDelegate<StreamRecord<IN>>>(inputGate);
inputs.registerTaskEventListener(streamVertex.getSuperstepListener(),
StreamingSuperstep.class);
} else {
inputs = new IndexedMutableReader<DeserializationDelegate<StreamRecord<IN>>>(
new UnionInputGate(streamVertex.getEnvironment().getAllInputGates()));
inputIter = new IndexedReaderIterator<StreamRecord<IN>>(inputs, inputSerializer);
}
inputIter = createInputIterator();
}
}
private IndexedReaderIterator<StreamRecord<IN>> createInputIterator() {
final IndexedReaderIterator<StreamRecord<IN>> iter = new IndexedReaderIterator<StreamRecord<IN>>(
inputs, inputSerializer);
return iter;
}
protected static <T> IndexedReaderIterator<StreamRecord<T>> staticCreateInputIterator(
......
......@@ -25,6 +25,7 @@ import java.util.List;
import java.util.Map;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.event.task.StreamingSuperstep;
import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
import org.apache.flink.runtime.plugable.SerializationDelegate;
import org.apache.flink.streaming.api.StreamConfig;
......@@ -84,6 +85,13 @@ public class OutputHandler<OUT> {
}
public void broadcastBarrier(long id) throws IOException, InterruptedException {
StreamingSuperstep barrier = new StreamingSuperstep(id);
for (StreamOutput<?> streamOutput : outputMap.values()) {
streamOutput.broadcastEvent(barrier);
}
}
public Collection<StreamOutput<?>> getOutputs() {
return outputMap.values();
}
......
......@@ -17,10 +17,16 @@
package org.apache.flink.streaming.api.streamvertex;
import java.io.IOException;
import java.util.Map;
import org.apache.flink.runtime.event.task.StreamingSuperstep;
import org.apache.flink.runtime.event.task.TaskEvent;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobgraph.tasks.BarrierTransceiver;
import org.apache.flink.runtime.jobmanager.BarrierAck;
import org.apache.flink.runtime.util.event.EventListener;
import org.apache.flink.streaming.api.StreamConfig;
import org.apache.flink.streaming.api.invokable.ChainableInvokable;
import org.apache.flink.streaming.api.invokable.StreamInvokable;
......@@ -34,7 +40,10 @@ import org.apache.flink.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class StreamVertex<IN, OUT> extends AbstractInvokable implements StreamTaskContext<OUT> {
import akka.actor.ActorRef;
public class StreamVertex<IN, OUT> extends AbstractInvokable implements StreamTaskContext<OUT>,
BarrierTransceiver {
private static final Logger LOG = LoggerFactory.getLogger(StreamVertex.class);
......@@ -53,10 +62,13 @@ public class StreamVertex<IN, OUT> extends AbstractInvokable implements StreamTa
protected ClassLoader userClassLoader;
private EventListener<TaskEvent> superstepListener;
public StreamVertex() {
userInvokable = null;
numTasks = newVertex();
instanceID = numTasks;
superstepListener = new SuperstepEventListener();
}
protected static int newVertex() {
......@@ -78,6 +90,22 @@ public class StreamVertex<IN, OUT> extends AbstractInvokable implements StreamTa
this.context = createRuntimeContext(getEnvironment().getTaskName(), this.states);
}
@Override
public void broadcastBarrier(long id) {
// Only called at input vertices
if (LOG.isDebugEnabled()) {
LOG.debug("Received barrier from jobmanager: " + id);
}
actOnBarrier(id);
}
@Override
public void confirmBarrier(long barrierID) {
getEnvironment().getJobManager().tell(
new BarrierAck(getEnvironment().getJobID(), getEnvironment().getJobVertexId(),
context.getIndexOfThisSubtask(), barrierID), ActorRef.noSender());
}
public void setInputsOutputs() {
inputHandler = new InputHandler<IN>(this);
outputHandler = new OutputHandler<OUT>(this);
......@@ -205,4 +233,35 @@ public class StreamVertex<IN, OUT> extends AbstractInvokable implements StreamTa
throw new IllegalArgumentException("CoReader not available");
}
public EventListener<TaskEvent> getSuperstepListener() {
return this.superstepListener;
}
private void actOnBarrier(long id) {
try {
outputHandler.broadcastBarrier(id);
if (LOG.isDebugEnabled()) {
LOG.debug("Superstep " + id + " processed: " + StreamVertex.this);
}
} catch (IOException e) {
LOG.error("Could not broadcast barrier " + id, e);
} catch (InterruptedException e) {
LOG.error("Interrupted while broadcasting barrier " + id, e);
}
}
@Override
public String toString() {
return configuration.getOperatorName() + " (" + context.getIndexOfThisSubtask() + ")";
}
private class SuperstepEventListener implements EventListener<TaskEvent> {
@Override
public void onEvent(TaskEvent event) {
actOnBarrier(((StreamingSuperstep) event).getId());
}
}
}
......@@ -17,6 +17,10 @@
package org.apache.flink.streaming.io;
import java.io.IOException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.flink.core.io.IOReadableWritable;
import org.apache.flink.runtime.io.network.api.reader.AbstractReader;
import org.apache.flink.runtime.io.network.api.reader.MutableRecordReader;
......@@ -28,10 +32,6 @@ import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate;
import org.apache.flink.runtime.util.event.EventListener;
import java.io.IOException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
/**
* A CoRecordReader wraps {@link MutableRecordReader}s of two different input
* types to read records effectively.
......
......@@ -79,7 +79,7 @@ public class MockContext<IN, OUT> implements StreamTaskContext<OUT> {
@Override
public StreamRecord<IN> next() throws IOException {
if (listIterator.hasNext()) {
StreamRecord<IN> result = new StreamRecord<IN>();
StreamRecord<IN> result = inDeserializer.createInstance();
result.setObject(listIterator.next());
return result;
} else {
......