diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractInput.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractInput.java new file mode 100644 index 0000000000000000000000000000000000000000..e9d227c672207da6f503ffbbc1f97e709ceef68c --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractInput.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators; + +import org.apache.flink.annotation.Experimental; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; + +import static org.apache.flink.util.Preconditions.checkArgument; + +/** + * Base abstract implementation of {@link Input} interface intended to be used when extending + * {@link AbstractStreamOperatorV2}. + */ +@Experimental +public abstract class AbstractInput implements Input { + protected final AbstractStreamOperatorV2 owner; + protected final int inputId; + protected final Output> output; + + public AbstractInput(AbstractStreamOperatorV2 owner, int inputId) { + checkArgument(inputId > 0, "Inputs are index from 1"); + this.owner = owner; + this.inputId = inputId; + this.output = owner.output; + } + + @Override + public void processWatermark(Watermark mark) throws Exception { + owner.reportWatermark(mark, inputId); + } +} diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorV2.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorV2.java index a4ad021d2438d4f211a3ba04d099b9d81d3597a0..9f1fa89c318cb9bb48e6278261d46e01a7ba2dc1 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorV2.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorV2.java @@ -465,7 +465,7 @@ public abstract class AbstractStreamOperatorV2 implements StreamOperator { * This method is guaranteed to not be called concurrently with other methods of the operator. */ void processElement(StreamRecord element) throws Exception; + + /** + * Processes a {@link Watermark} that arrived on the first input of this two-input operator. + * This method is guaranteed to not be called concurrently with other methods of the operator. + * + * @see org.apache.flink.streaming.api.watermark.Watermark + */ + void processWatermark(Watermark mark) throws Exception; } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessor.java index 67545499723697a409a4d792da35fab980e5283b..b93e5179351bdb11b0a7cfbb874d5aa38b89d906 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessor.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessor.java @@ -81,13 +81,14 @@ public final class StreamMultipleInputProcessor implements StreamInputProcessor this.inputSelectionHandler = checkNotNull(inputSelectionHandler); List inputs = streamOperator.getInputs(); - int operatorsCount = inputs.size(); + int inputsCount = inputs.size(); - this.inputProcessors = new InputProcessor[operatorsCount]; - this.streamStatuses = new StreamStatus[operatorsCount]; + this.inputProcessors = new InputProcessor[inputsCount]; + this.streamStatuses = new StreamStatus[inputsCount]; this.numRecordsIn = numRecordsIn; - for (int i = 0; i < operatorsCount; i++) { + for (int i = 0; i < inputsCount; i++) { + streamStatuses[i] = StreamStatus.ACTIVE; StreamTaskNetworkOutput dataOutput = new StreamTaskNetworkOutput<>( inputs.get(i), streamStatusMaintainer, @@ -282,7 +283,8 @@ public final class StreamMultipleInputProcessor implements StreamInputProcessor @Override public void emitWatermark(Watermark watermark) throws Exception { - throw new UnsupportedOperationException(); + inputWatermarkGauge.setCurrentWatermark(watermark.getTimestamp()); + input.processWatermark(watermark); } @Override diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTask.java index 9cc361adb07f5c457205713beb93f9823ad6769f..7562be7e0621adfda4e8dd7bba2ec064fcb30cec 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTask.java @@ -63,7 +63,7 @@ public class MultipleInputStreamTask extends StreamTask(); watermarkGauges[i] = new WatermarkGauge(); - headOperator.getMetricGroup().gauge(MetricNames.currentInputWatermarkName(i), watermarkGauges[i]); + headOperator.getMetricGroup().gauge(MetricNames.currentInputWatermarkName(i + 1), watermarkGauges[i]); } MinWatermarkGauge minInputWatermarkGauge = new MinWatermarkGauge(watermarkGauges); diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java index 383721d6b06b98863f2bf4c14451a2ac350d8dbc..75b7e5bcd35fd2b75df25cd7dc2b25e7ee6613b7 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java @@ -21,14 +21,21 @@ package org.apache.flink.streaming.runtime.tasks; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.Gauge; +import org.apache.flink.metrics.Metric; import org.apache.flink.runtime.checkpoint.CheckpointOptions; import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker; import org.apache.flink.runtime.io.network.api.CheckpointBarrier; import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.metrics.MetricNames; import org.apache.flink.runtime.metrics.NoOpMetricRegistry; import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup; +import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup; import org.apache.flink.runtime.metrics.groups.TaskMetricGroup; import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; +import org.apache.flink.runtime.metrics.util.InterceptingOperatorMetricGroup; +import org.apache.flink.runtime.metrics.util.InterceptingTaskMetricGroup; +import org.apache.flink.streaming.api.operators.AbstractInput; import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory; import org.apache.flink.streaming.api.operators.AbstractStreamOperatorV2; import org.apache.flink.streaming.api.operators.Input; @@ -36,17 +43,25 @@ import org.apache.flink.streaming.api.operators.MultipleInputStreamOperator; import org.apache.flink.streaming.api.operators.StreamOperator; import org.apache.flink.streaming.api.operators.StreamOperatorParameters; import org.apache.flink.streaming.api.operators.co.CoStreamMap; +import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.io.InputStatus; import org.apache.flink.streaming.runtime.io.StreamMultipleInputProcessor; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.streamstatus.StreamStatus; +import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTest.WatermarkMetricOperator; import org.apache.flink.streaming.util.TestBoundedMultipleInputOperator; +import org.apache.flink.streaming.util.TestHarnessUtil; +import org.hamcrest.collection.IsEmptyCollection; +import org.hamcrest.collection.IsMapContaining; import org.junit.Assert; import org.junit.Test; import java.util.ArrayDeque; import java.util.Arrays; import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.contains; @@ -57,7 +72,6 @@ import static org.junit.Assert.assertEquals; * {@link StreamMultipleInputProcessor}. */ public class MultipleInputStreamTaskTest { - /** * This test verifies that open() and close() are correctly called. This test also verifies * that timestamps of emitted elements are correct. {@link CoStreamMap} assigns the input @@ -257,10 +271,17 @@ public class MultipleInputStreamTaskTest { @Override public List getInputs() { - return Arrays.asList(new DuplicatingInput(), new DuplicatingInput(), new DuplicatingInput()); + return Arrays.asList( + new DuplicatingInput(this, 1), + new DuplicatingInput(this, 2), + new DuplicatingInput(this, 3)); } - class DuplicatingInput implements Input { + class DuplicatingInput extends AbstractInput { + public DuplicatingInput(AbstractStreamOperatorV2 owner, int inputId) { + super(owner, inputId); + } + @Override public void processElement(StreamRecord element) throws Exception { output.collect(element); @@ -355,8 +376,244 @@ public class MultipleInputStreamTaskTest { } } - // This must only be used in one test, otherwise the static fields will be changed - // by several tests concurrently + @Test + public void testWatermark() throws Exception { + try (StreamTaskMailboxTestHarness testHarness = + new MultipleInputStreamTaskTestHarnessBuilder<>(MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO) + .addInput(BasicTypeInfo.STRING_TYPE_INFO, 2) + .addInput(BasicTypeInfo.INT_TYPE_INFO, 2) + .addInput(BasicTypeInfo.DOUBLE_TYPE_INFO, 2) + .setupOutputForSingletonOperatorChain(new MapToStringMultipleInputOperatorFactory()) + .build()) { + ArrayDeque expectedOutput = new ArrayDeque<>(); + + long initialTime = 0L; + + testHarness.processElement(new Watermark(initialTime), 0, 0); + testHarness.processElement(new Watermark(initialTime), 0, 1); + testHarness.processElement(new Watermark(initialTime), 1, 0); + testHarness.processElement(new Watermark(initialTime), 1, 1); + + testHarness.processElement(new Watermark(initialTime), 2, 0); + + assertThat(testHarness.getOutput(), IsEmptyCollection.empty()); + + testHarness.processElement(new Watermark(initialTime), 2, 1); + + // now the watermark should have propagated, Map simply forward Watermarks + expectedOutput.add(new Watermark(initialTime)); + assertThat(testHarness.getOutput(), contains(expectedOutput.toArray())); + + // contrary to checkpoint barriers these elements are not blocked by watermarks + testHarness.processElement(new StreamRecord<>("Hello", initialTime), 0, 0); + testHarness.processElement(new StreamRecord<>(42, initialTime), 1, 1); + expectedOutput.add(new StreamRecord<>("Hello", initialTime)); + expectedOutput.add(new StreamRecord<>("42", initialTime)); + + assertThat(testHarness.getOutput(), contains(expectedOutput.toArray())); + + testHarness.processElement(new Watermark(initialTime + 4), 0, 0); + testHarness.processElement(new Watermark(initialTime + 3), 0, 1); + testHarness.processElement(new Watermark(initialTime + 3), 1, 0); + testHarness.processElement(new Watermark(initialTime + 4), 1, 1); + testHarness.processElement(new Watermark(initialTime + 3), 2, 0); + testHarness.processElement(new Watermark(initialTime + 2), 2, 1); + + // check whether we get the minimum of all the watermarks, this must also only occur in + // the output after the two StreamRecords + expectedOutput.add(new Watermark(initialTime + 2)); + assertThat(testHarness.getOutput(), contains(expectedOutput.toArray())); + + // advance watermark from one of the inputs, now we should get a new one since the + // minimum increases + testHarness.processElement(new Watermark(initialTime + 4), 2, 1); + expectedOutput.add(new Watermark(initialTime + 3)); + assertThat(testHarness.getOutput(), contains(expectedOutput.toArray())); + + // advance the other two inputs, now we should get a new one since the + // minimum increases again + testHarness.processElement(new Watermark(initialTime + 4), 0, 1); + testHarness.processElement(new Watermark(initialTime + 4), 1, 0); + testHarness.processElement(new Watermark(initialTime + 4), 2, 0); + expectedOutput.add(new Watermark(initialTime + 4)); + assertThat(testHarness.getOutput(), contains(expectedOutput.toArray())); + + List resultElements = TestHarnessUtil.getRawElementsFromOutput(testHarness.getOutput()); + assertEquals(2, resultElements.size()); + } + } + + /** + * This test verifies that watermarks and stream statuses are correctly forwarded. This also checks whether + * watermarks are forwarded only when we have received watermarks from all inputs. The + * forwarded watermark must be the minimum of the watermarks of all active inputs. + */ + @Test + public void testWatermarkAndStreamStatusForwarding() throws Exception { + try (StreamTaskMailboxTestHarness testHarness = + new MultipleInputStreamTaskTestHarnessBuilder<>(MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO) + .addInput(BasicTypeInfo.STRING_TYPE_INFO, 2) + .addInput(BasicTypeInfo.INT_TYPE_INFO, 2) + .addInput(BasicTypeInfo.DOUBLE_TYPE_INFO, 2) + .setupOutputForSingletonOperatorChain(new MapToStringMultipleInputOperatorFactory()) + .build()) { + ArrayDeque expectedOutput = new ArrayDeque<>(); + + long initialTime = 0L; + + // test whether idle input channels are acknowledged correctly when forwarding watermarks + testHarness.processElement(StreamStatus.IDLE, 0, 1); + testHarness.processElement(StreamStatus.IDLE, 1, 1); + testHarness.processElement(StreamStatus.IDLE, 2, 0); + testHarness.processElement(new Watermark(initialTime + 6), 0, 0); + testHarness.processElement(new Watermark(initialTime + 6), 1, 0); + testHarness.processElement(new Watermark(initialTime + 5), 2, 1); // this watermark should be advanced first + testHarness.processElement(StreamStatus.IDLE, 2, 1); // once this is acknowledged, + + expectedOutput.add(new Watermark(initialTime + 5)); + // We don't expect to see Watermark(6) here because the idle status of one + // input doesn't propagate to the other input. That is, if input 1 is at WM 6 and input + // two was at WM 5 before going to IDLE then the output watermark will not jump to WM 6. + assertThat(testHarness.getOutput(), contains(expectedOutput.toArray())); + + // make all input channels idle and check that the operator's idle status is forwarded + testHarness.processElement(StreamStatus.IDLE, 0, 0); + testHarness.processElement(StreamStatus.IDLE, 1, 0); + expectedOutput.add(StreamStatus.IDLE); + assertThat(testHarness.getOutput(), contains(expectedOutput.toArray())); + + // make some input channels active again and check that the operator's active status is forwarded only once + testHarness.processElement(StreamStatus.ACTIVE, 1, 0); + testHarness.processElement(StreamStatus.ACTIVE, 0, 1); + expectedOutput.add(StreamStatus.ACTIVE); + assertThat(testHarness.getOutput(), contains(expectedOutput.toArray())); + } + } + + @Test + @SuppressWarnings("unchecked") + public void testWatermarkMetrics() throws Exception { + OperatorID headOperatorId = new OperatorID(); + OperatorID chainedOperatorId = new OperatorID(); + + InterceptingOperatorMetricGroup headOperatorMetricGroup = new InterceptingOperatorMetricGroup(); + InterceptingOperatorMetricGroup chainedOperatorMetricGroup = new InterceptingOperatorMetricGroup(); + InterceptingTaskMetricGroup taskMetricGroup = new InterceptingTaskMetricGroup() { + @Override + public OperatorMetricGroup getOrAddOperator(OperatorID id, String name) { + if (id.equals(headOperatorId)) { + return headOperatorMetricGroup; + } else if (id.equals(chainedOperatorId)) { + return chainedOperatorMetricGroup; + } else { + return super.getOrAddOperator(id, name); + } + } + }; + + try (StreamTaskMailboxTestHarness testHarness = + new MultipleInputStreamTaskTestHarnessBuilder<>(MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO) + .addInput(BasicTypeInfo.STRING_TYPE_INFO) + .addInput(BasicTypeInfo.INT_TYPE_INFO) + .addInput(BasicTypeInfo.DOUBLE_TYPE_INFO) + .setupOperatorChain(headOperatorId, new MapToStringMultipleInputOperatorFactory()) + .chain( + chainedOperatorId, + new WatermarkMetricOperator(), + BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig())) + .finish() + .setTaskMetricGroup(taskMetricGroup) + .build()) { + Gauge taskInputWatermarkGauge = (Gauge) taskMetricGroup.get(MetricNames.IO_CURRENT_INPUT_WATERMARK); + Gauge headInput1WatermarkGauge = (Gauge) headOperatorMetricGroup.get(MetricNames.currentInputWatermarkName(1)); + Gauge headInput2WatermarkGauge = (Gauge) headOperatorMetricGroup.get(MetricNames.currentInputWatermarkName(2)); + Gauge headInput3WatermarkGauge = (Gauge) headOperatorMetricGroup.get(MetricNames.currentInputWatermarkName(3)); + Gauge headInputWatermarkGauge = (Gauge) headOperatorMetricGroup.get(MetricNames.IO_CURRENT_INPUT_WATERMARK); + Gauge headOutputWatermarkGauge = (Gauge) headOperatorMetricGroup.get(MetricNames.IO_CURRENT_OUTPUT_WATERMARK); + Gauge chainedInputWatermarkGauge = (Gauge) chainedOperatorMetricGroup.get(MetricNames.IO_CURRENT_INPUT_WATERMARK); + Gauge chainedOutputWatermarkGauge = (Gauge) chainedOperatorMetricGroup.get(MetricNames.IO_CURRENT_OUTPUT_WATERMARK); + + assertEquals(Long.MIN_VALUE, taskInputWatermarkGauge.getValue().longValue()); + assertEquals(Long.MIN_VALUE, headInputWatermarkGauge.getValue().longValue()); + assertEquals(Long.MIN_VALUE, headInput1WatermarkGauge.getValue().longValue()); + assertEquals(Long.MIN_VALUE, headInput2WatermarkGauge.getValue().longValue()); + assertEquals(Long.MIN_VALUE, headInput3WatermarkGauge.getValue().longValue()); + assertEquals(Long.MIN_VALUE, headOutputWatermarkGauge.getValue().longValue()); + assertEquals(Long.MIN_VALUE, chainedInputWatermarkGauge.getValue().longValue()); + assertEquals(Long.MIN_VALUE, chainedOutputWatermarkGauge.getValue().longValue()); + + testHarness.processElement(new Watermark(1L), 0); + assertEquals(Long.MIN_VALUE, taskInputWatermarkGauge.getValue().longValue()); + assertEquals(Long.MIN_VALUE, headInputWatermarkGauge.getValue().longValue()); + assertEquals(1L, headInput1WatermarkGauge.getValue().longValue()); + assertEquals(Long.MIN_VALUE, headInput2WatermarkGauge.getValue().longValue()); + assertEquals(Long.MIN_VALUE, headInput3WatermarkGauge.getValue().longValue()); + assertEquals(Long.MIN_VALUE, headOutputWatermarkGauge.getValue().longValue()); + assertEquals(Long.MIN_VALUE, chainedInputWatermarkGauge.getValue().longValue()); + assertEquals(Long.MIN_VALUE, chainedOutputWatermarkGauge.getValue().longValue()); + + testHarness.processElement(new Watermark(2L), 1); + assertEquals(Long.MIN_VALUE, taskInputWatermarkGauge.getValue().longValue()); + assertEquals(Long.MIN_VALUE, headInputWatermarkGauge.getValue().longValue()); + assertEquals(1L, headInput1WatermarkGauge.getValue().longValue()); + assertEquals(2L, headInput2WatermarkGauge.getValue().longValue()); + assertEquals(Long.MIN_VALUE, headInput3WatermarkGauge.getValue().longValue()); + assertEquals(Long.MIN_VALUE, headOutputWatermarkGauge.getValue().longValue()); + assertEquals(Long.MIN_VALUE, chainedInputWatermarkGauge.getValue().longValue()); + assertEquals(Long.MIN_VALUE, chainedOutputWatermarkGauge.getValue().longValue()); + + testHarness.processElement(new Watermark(2L), 2); + assertEquals(1L, taskInputWatermarkGauge.getValue().longValue()); + assertEquals(1L, headInputWatermarkGauge.getValue().longValue()); + assertEquals(1L, headInput1WatermarkGauge.getValue().longValue()); + assertEquals(2L, headInput2WatermarkGauge.getValue().longValue()); + assertEquals(2L, headInput3WatermarkGauge.getValue().longValue()); + assertEquals(1L, headOutputWatermarkGauge.getValue().longValue()); + assertEquals(1L, chainedInputWatermarkGauge.getValue().longValue()); + assertEquals(2L, chainedOutputWatermarkGauge.getValue().longValue()); + + testHarness.processElement(new Watermark(4L), 0); + testHarness.processElement(new Watermark(3L), 1); + assertEquals(2L, taskInputWatermarkGauge.getValue().longValue()); + assertEquals(2L, headInputWatermarkGauge.getValue().longValue()); + assertEquals(4L, headInput1WatermarkGauge.getValue().longValue()); + assertEquals(3L, headInput2WatermarkGauge.getValue().longValue()); + assertEquals(2L, headInput3WatermarkGauge.getValue().longValue()); + assertEquals(2L, headOutputWatermarkGauge.getValue().longValue()); + assertEquals(2L, chainedInputWatermarkGauge.getValue().longValue()); + assertEquals(4L, chainedOutputWatermarkGauge.getValue().longValue()); + + testHarness.endInput(); + testHarness.waitForTaskCompletion(); + } + } + + /** + * Tests the checkpoint related metrics are registered into {@link TaskIOMetricGroup} + * correctly while generating the {@link TwoInputStreamTask}. + */ + @Test + public void testCheckpointBarrierMetrics() throws Exception { + final Map metrics = new ConcurrentHashMap<>(); + final TaskMetricGroup taskMetricGroup = new StreamTaskTestHarness.TestTaskMetricGroup(metrics); + + try (StreamTaskMailboxTestHarness testHarness = + new MultipleInputStreamTaskTestHarnessBuilder<>(MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO) + .addInput(BasicTypeInfo.STRING_TYPE_INFO, 2) + .addInput(BasicTypeInfo.INT_TYPE_INFO, 2) + .addInput(BasicTypeInfo.DOUBLE_TYPE_INFO, 2) + .setupOutputForSingletonOperatorChain(new MapToStringMultipleInputOperatorFactory()) + .setTaskMetricGroup(taskMetricGroup) + .build()) { + + assertThat(metrics, IsMapContaining.hasKey(MetricNames.CHECKPOINT_ALIGNMENT_TIME)); + assertThat(metrics, IsMapContaining.hasKey(MetricNames.CHECKPOINT_START_DELAY_TIME)); + + testHarness.endInput(); + testHarness.waitForTaskCompletion(); + } + } + private static class MapToStringMultipleInputOperator extends AbstractStreamOperatorV2 implements MultipleInputStreamOperator { private static final long serialVersionUID = 1L; @@ -389,16 +646,20 @@ public class MultipleInputStreamTaskTest { @Override public List getInputs() { return Arrays.asList( - new MapToStringInput(), - new MapToStringInput(), - new MapToStringInput()); + new MapToStringInput(this, 1), + new MapToStringInput(this, 2), + new MapToStringInput(this, 3)); } public boolean wasCloseCalled() { return closeCalled; } - public class MapToStringInput implements Input { + public class MapToStringInput extends AbstractInput { + public MapToStringInput(AbstractStreamOperatorV2 owner, int inputId) { + super(owner, inputId); + } + @Override public void processElement(StreamRecord element) throws Exception { if (!openCalled) { diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTestHarnessBuilder.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTestHarnessBuilder.java index 3dee5edae2792cae70d42765ff743f32a8fbab88..559b934175fcc52aa0164869af69b62526d0bd59 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTestHarnessBuilder.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTestHarnessBuilder.java @@ -20,6 +20,7 @@ package org.apache.flink.streaming.runtime.tasks; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.io.network.partition.consumer.StreamTestSingleInputGate; import org.apache.flink.streaming.api.graph.StreamEdge; @@ -29,6 +30,8 @@ import org.apache.flink.streaming.api.operators.StreamOperator; import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner; import org.apache.flink.util.function.FunctionWithException; +import javax.annotation.Nullable; + import java.util.ArrayList; import java.util.LinkedList; import java.util.List; @@ -52,6 +55,14 @@ public class MultipleInputStreamTaskTestHarnessBuilder extends StreamTaskMa } public MultipleInputStreamTaskTestHarnessBuilder addInput(TypeInformation inputType, int inputChannels) { + return addInput(inputType, inputChannels, null); + } + + public MultipleInputStreamTaskTestHarnessBuilder addInput( + TypeInformation inputType, + int inputChannels, + @Nullable KeySelector keySelector) { + streamConfig.setStatePartitioner(inputSerializers.size(), keySelector); inputSerializers.add(inputType.createSerializer(executionConfig)); inputChannelsPerGate.add(inputChannels); return this; diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskMailboxTestHarnessBuilder.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskMailboxTestHarnessBuilder.java index b8dbd7893868145d1a3d0fa15713ed5a2ccc61ee..9b1668688fe9b524c59420420de8c1cf73831fc4 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskMailboxTestHarnessBuilder.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskMailboxTestHarnessBuilder.java @@ -185,5 +185,10 @@ public abstract class StreamTaskMailboxTestHarnessBuilder { this.taskMetricGroup = taskMetricGroup; return this; } + + public StreamTaskMailboxTestHarnessBuilder setKeyType(TypeInformation keyType) { + streamConfig.setStateKeySerializer(keyType.createSerializer(executionConfig)); + return this; + } } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java index 9580156aac14d9c79ee36d9fafd2a26638ddb791..2a868038cceb32f19cea96cd0543b49930d8d520 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java @@ -210,7 +210,7 @@ public class TwoInputStreamTaskTest { testHarness.processElement(new Watermark(initialTime + 6), 0, 0); testHarness.processElement(new Watermark(initialTime + 5), 1, 1); // this watermark should be advanced first testHarness.processElement(StreamStatus.IDLE, 1, 1); // once this is acknowledged, - // watermark (initial + 6) should be forwarded + testHarness.waitForInputProcessing(); expectedOutput.add(new Watermark(initialTime + 5)); // We don't expect to see Watermark(6) here because the idle status of one diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestBoundedMultipleInputOperator.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestBoundedMultipleInputOperator.java index e6ebe84a8be1d9ecb1b85adb8803a260fd5fcfac..6f18b58299c1e1eaa30f7e585905e7198d8f06f3 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestBoundedMultipleInputOperator.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestBoundedMultipleInputOperator.java @@ -18,6 +18,7 @@ package org.apache.flink.streaming.util; +import org.apache.flink.streaming.api.operators.AbstractInput; import org.apache.flink.streaming.api.operators.AbstractStreamOperatorV2; import org.apache.flink.streaming.api.operators.BoundedMultiInput; import org.apache.flink.streaming.api.operators.Input; @@ -46,9 +47,9 @@ public class TestBoundedMultipleInputOperator extends AbstractStreamOperatorV2 getInputs() { return Arrays.asList( - new TestInput(1), - new TestInput(2), - new TestInput(3) + new TestInput(this, 1), + new TestInput(this, 2), + new TestInput(this, 3) ); } @@ -63,16 +64,14 @@ public class TestBoundedMultipleInputOperator extends AbstractStreamOperatorV2 { - private final int inputIndex; - - public TestInput(int inputIndex) { - this.inputIndex = inputIndex; + class TestInput extends AbstractInput { + public TestInput(AbstractStreamOperatorV2 owner, int inputId) { + super(owner, inputId); } @Override public void processElement(StreamRecord element) throws Exception { - output.collect(element.replace("[" + name + "-" + inputIndex + "]: " + element.getValue())); + output.collect(element.replace("[" + name + "-" + inputId + "]: " + element.getValue())); } } } diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/operators/KeyedProcessOperatorWithWatermarkDelayTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/operators/KeyedProcessOperatorWithWatermarkDelayTest.scala index d41945372657076b6d2eb50528594cc7d617a633..8ebdeb87a9fd2898038fcd518e31a7bbd689451b 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/operators/KeyedProcessOperatorWithWatermarkDelayTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/operators/KeyedProcessOperatorWithWatermarkDelayTest.scala @@ -38,7 +38,7 @@ class KeyedProcessOperatorWithWatermarkDelayTest extends TestLogger { val operator = new KeyedProcessOperatorWithWatermarkDelay[Integer, Integer, String]( new EmptyProcessFunction, 100) val testHarness = new KeyedOneInputStreamOperatorTestHarness[Integer, Integer, String]( - operator, new IdentityKeySelector, BasicTypeInfo.INT_TYPE_INFO) + operator, new IdentityKeySelector[Integer], BasicTypeInfo.INT_TYPE_INFO) testHarness.setup() testHarness.open() diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/MultipleInputITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/MultipleInputITCase.java index 32b64249acb4d585d15b38b6eda256e8ce39abfb..02181124e3e50cb42742616a4c3eec998ff10368 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/MultipleInputITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/MultipleInputITCase.java @@ -21,6 +21,7 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.MultipleConnectedStreams; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.operators.AbstractInput; import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory; import org.apache.flink.streaming.api.operators.AbstractStreamOperatorV2; import org.apache.flink.streaming.api.operators.Input; @@ -94,15 +95,19 @@ public class MultipleInputITCase extends AbstractTestBase { @Override public List getInputs() { return Arrays.asList( - new SumInput(), - new SumInput(), - new SumInput()); + new SumInput(this, 1), + new SumInput(this, 2), + new SumInput(this, 3)); } /** * Summing input for {@link SumAllInputOperator}. */ - public class SumInput implements Input { + public class SumInput extends AbstractInput { + public SumInput(AbstractStreamOperatorV2 owner, int inputId) { + super(owner, inputId); + } + @Override public void processElement(StreamRecord element) throws Exception { sum += Long.valueOf(element.getValue().toString());