提交 b18e410b 编写于 作者: S Stephan Ewen

[FLINK-2635] [streaming] Make input processors independent of batch reader interface.

上级 c09d14a9
......@@ -24,7 +24,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.api.reader.AbstractReader;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer.DeserializationResult;
import org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer;
......@@ -52,19 +52,21 @@ import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
*
* @param <IN> The type of the record that can be read with this record reader.
*/
public class StreamInputProcessor<IN> extends AbstractReader implements StreamingReader {
public class StreamInputProcessor<IN> {
private final RecordDeserializer<DeserializationDelegate<StreamElement>>[] recordDeserializers;
private RecordDeserializer<DeserializationDelegate<StreamElement>> currentRecordDeserializer;
private final CheckpointBarrierHandler barrierHandler;
// We need to keep track of the channel from which a buffer came, so that we can
// appropriately map the watermarks to input channels
private int currentChannel = -1;
private boolean isFinished;
private final CheckpointBarrierHandler barrierHandler;
private final long[] watermarks;
private long lastEmittedWatermark;
......@@ -77,8 +79,8 @@ public class StreamInputProcessor<IN> extends AbstractReader implements Streamin
CheckpointingMode checkpointMode,
IOManager ioManager,
boolean enableWatermarkMultiplexing) throws IOException {
super(InputGateUtil.createInputGate(inputGates));
InputGate inputGate = InputGateUtil.createInputGate(inputGates);
if (checkpointMode == CheckpointingMode.EXACTLY_ONCE) {
this.barrierHandler = new BarrierBuffer(inputGate, ioManager);
......@@ -173,7 +175,9 @@ public class StreamInputProcessor<IN> extends AbstractReader implements Streamin
else {
// Event received
final AbstractEvent event = bufferOrEvent.getEvent();
handleEvent(event);
if (event.getClass() != EndOfPartitionEvent.class) {
throw new IOException("Unexpected event: " + event);
}
}
}
else {
......@@ -185,15 +189,13 @@ public class StreamInputProcessor<IN> extends AbstractReader implements Streamin
}
}
}
@Override
public void setReporter(AccumulatorRegistry.Reporter reporter) {
for (RecordDeserializer<?> deserializer : recordDeserializers) {
deserializer.setReporter(reporter);
}
}
@Override
public void cleanup() throws IOException {
// clear the buffers first. this part should not ever fail
for (RecordDeserializer<?> deserializer : recordDeserializers) {
......
......@@ -22,8 +22,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.api.reader.AbstractReader;
import org.apache.flink.runtime.io.network.api.reader.ReaderBase;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer.DeserializationResult;
import org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer;
......@@ -41,8 +40,6 @@ import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordS
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Arrays;
......@@ -58,10 +55,7 @@ import java.util.Collection;
* @param <IN1> The type of the records that arrive on the first input
* @param <IN2> The type of the records that arrive on the second input
*/
public class StreamTwoInputProcessor<IN1, IN2> extends AbstractReader implements ReaderBase, StreamingReader {
@SuppressWarnings("unused")
private static final Logger LOG = LoggerFactory.getLogger(StreamTwoInputProcessor.class);
public class StreamTwoInputProcessor<IN1, IN2> {
private final RecordDeserializer<DeserializationDelegate<StreamElement>>[] recordDeserializers;
......@@ -97,7 +91,7 @@ public class StreamTwoInputProcessor<IN1, IN2> extends AbstractReader implements
IOManager ioManager,
boolean enableWatermarkMultiplexing) throws IOException {
super(InputGateUtil.createInputGate(inputGates1, inputGates2));
final InputGate inputGate = InputGateUtil.createInputGate(inputGates1, inputGates2);
if (checkpointMode == CheckpointingMode.EXACTLY_ONCE) {
this.barrierHandler = new BarrierBuffer(inputGate, ioManager);
......@@ -157,8 +151,7 @@ public class StreamTwoInputProcessor<IN1, IN2> extends AbstractReader implements
Arrays.fill(watermarks2, Long.MIN_VALUE);
lastEmittedWatermark2 = Long.MIN_VALUE;
}
@SuppressWarnings("unchecked")
public boolean processInput(TwoInputStreamOperator<IN1, IN2, ?> streamOperator) throws Exception {
if (isFinished) {
return false;
......@@ -216,7 +209,9 @@ public class StreamTwoInputProcessor<IN1, IN2> extends AbstractReader implements
} else {
// Event received
final AbstractEvent event = bufferOrEvent.getEvent();
handleEvent(event);
if (event.getClass() != EndOfPartitionEvent.class) {
throw new IOException("Unexpected event: " + event);
}
}
}
else {
......@@ -259,15 +254,13 @@ public class StreamTwoInputProcessor<IN1, IN2> extends AbstractReader implements
}
}
}
@Override
public void setReporter(AccumulatorRegistry.Reporter reporter) {
for (RecordDeserializer<?> deserializer : recordDeserializers) {
deserializer.setReporter(reporter);
}
}
@Override
public void cleanup() throws IOException {
// clear the buffers first. this part should not ever fail
for (RecordDeserializer<?> deserializer : recordDeserializers) {
......
......@@ -32,13 +32,12 @@ import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.operators.chaining.ExceptionInChainedStubException;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.util.NoOpIntMap;
import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
import org.apache.flink.streaming.util.TestListResultSink;
import org.apache.flink.streaming.util.TestStreamEnvironment;
import org.junit.Test;
/**
......@@ -147,7 +146,9 @@ public class PartitionerTest extends StreamingMultipleProgramsTestBase {
try {
env.execute();
} catch (Exception e) {
}
catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册