diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java index cc91d6367db206b720eba5e1e1b8f70bfb8325e8..8ce8a01d5c3966309e5bf2d9df12990219c24af1 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java @@ -24,7 +24,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.runtime.accumulators.AccumulatorRegistry; import org.apache.flink.runtime.event.AbstractEvent; import org.apache.flink.runtime.io.disk.iomanager.IOManager; -import org.apache.flink.runtime.io.network.api.reader.AbstractReader; +import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent; import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer; import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer.DeserializationResult; import org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer; @@ -52,19 +52,21 @@ import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext; * * @param The type of the record that can be read with this record reader. */ -public class StreamInputProcessor extends AbstractReader implements StreamingReader { +public class StreamInputProcessor { private final RecordDeserializer>[] recordDeserializers; private RecordDeserializer> currentRecordDeserializer; + private final CheckpointBarrierHandler barrierHandler; + // We need to keep track of the channel from which a buffer came, so that we can // appropriately map the watermarks to input channels private int currentChannel = -1; private boolean isFinished; - private final CheckpointBarrierHandler barrierHandler; + private final long[] watermarks; private long lastEmittedWatermark; @@ -77,8 +79,8 @@ public class StreamInputProcessor extends AbstractReader implements Streamin CheckpointingMode checkpointMode, IOManager ioManager, boolean enableWatermarkMultiplexing) throws IOException { - - super(InputGateUtil.createInputGate(inputGates)); + + InputGate inputGate = InputGateUtil.createInputGate(inputGates); if (checkpointMode == CheckpointingMode.EXACTLY_ONCE) { this.barrierHandler = new BarrierBuffer(inputGate, ioManager); @@ -173,7 +175,9 @@ public class StreamInputProcessor extends AbstractReader implements Streamin else { // Event received final AbstractEvent event = bufferOrEvent.getEvent(); - handleEvent(event); + if (event.getClass() != EndOfPartitionEvent.class) { + throw new IOException("Unexpected event: " + event); + } } } else { @@ -185,15 +189,13 @@ public class StreamInputProcessor extends AbstractReader implements Streamin } } } - - @Override + public void setReporter(AccumulatorRegistry.Reporter reporter) { for (RecordDeserializer deserializer : recordDeserializers) { deserializer.setReporter(reporter); } } - - @Override + public void cleanup() throws IOException { // clear the buffers first. this part should not ever fail for (RecordDeserializer deserializer : recordDeserializers) { diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java index 7dffa7160a971fae365060c443a6c0332c6fbbed..6322cc85f2509c9f73be51c2e3ff29afd13d6af4 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java @@ -22,8 +22,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.runtime.accumulators.AccumulatorRegistry; import org.apache.flink.runtime.event.AbstractEvent; import org.apache.flink.runtime.io.disk.iomanager.IOManager; -import org.apache.flink.runtime.io.network.api.reader.AbstractReader; -import org.apache.flink.runtime.io.network.api.reader.ReaderBase; +import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent; import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer; import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer.DeserializationResult; import org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer; @@ -41,8 +40,6 @@ import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordS import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer; import org.apache.flink.runtime.io.network.api.CheckpointBarrier; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.Arrays; @@ -58,10 +55,7 @@ import java.util.Collection; * @param The type of the records that arrive on the first input * @param The type of the records that arrive on the second input */ -public class StreamTwoInputProcessor extends AbstractReader implements ReaderBase, StreamingReader { - - @SuppressWarnings("unused") - private static final Logger LOG = LoggerFactory.getLogger(StreamTwoInputProcessor.class); +public class StreamTwoInputProcessor { private final RecordDeserializer>[] recordDeserializers; @@ -97,7 +91,7 @@ public class StreamTwoInputProcessor extends AbstractReader implements IOManager ioManager, boolean enableWatermarkMultiplexing) throws IOException { - super(InputGateUtil.createInputGate(inputGates1, inputGates2)); + final InputGate inputGate = InputGateUtil.createInputGate(inputGates1, inputGates2); if (checkpointMode == CheckpointingMode.EXACTLY_ONCE) { this.barrierHandler = new BarrierBuffer(inputGate, ioManager); @@ -157,8 +151,7 @@ public class StreamTwoInputProcessor extends AbstractReader implements Arrays.fill(watermarks2, Long.MIN_VALUE); lastEmittedWatermark2 = Long.MIN_VALUE; } - - @SuppressWarnings("unchecked") + public boolean processInput(TwoInputStreamOperator streamOperator) throws Exception { if (isFinished) { return false; @@ -216,7 +209,9 @@ public class StreamTwoInputProcessor extends AbstractReader implements } else { // Event received final AbstractEvent event = bufferOrEvent.getEvent(); - handleEvent(event); + if (event.getClass() != EndOfPartitionEvent.class) { + throw new IOException("Unexpected event: " + event); + } } } else { @@ -259,15 +254,13 @@ public class StreamTwoInputProcessor extends AbstractReader implements } } } - - @Override + public void setReporter(AccumulatorRegistry.Reporter reporter) { for (RecordDeserializer deserializer : recordDeserializers) { deserializer.setReporter(reporter); } } - - @Override + public void cleanup() throws IOException { // clear the buffers first. this part should not ever fail for (RecordDeserializer deserializer : recordDeserializers) { diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/PartitionerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/PartitionerTest.java index 987a8fb3fc228b265dcf1abfc2472312c842f1ef..a6c6936ba71cb1baec192f3c387add3282e158f2 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/PartitionerTest.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/PartitionerTest.java @@ -32,13 +32,12 @@ import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.api.java.tuple.Tuple1; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.operators.chaining.ExceptionInChainedStubException; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.util.NoOpIntMap; import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase; import org.apache.flink.streaming.util.TestListResultSink; -import org.apache.flink.streaming.util.TestStreamEnvironment; + import org.junit.Test; /** @@ -147,7 +146,9 @@ public class PartitionerTest extends StreamingMultipleProgramsTestBase { try { env.execute(); - } catch (Exception e) { + } + catch (Exception e) { + e.printStackTrace(); fail(e.getMessage()); }