提交 158e4478 编写于 作者: P Piotr Nowojski 提交者: Piotr Nowojski

[FLINK-16317][operators] Provide support for watermarks in MultipleInputStreamOperator

上级 5b3d4eb3
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.api.operators;
import org.apache.flink.annotation.Experimental;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import static org.apache.flink.util.Preconditions.checkArgument;
/**
* Base abstract implementation of {@link Input} interface intended to be used when extending
* {@link AbstractStreamOperatorV2}.
*/
@Experimental
public abstract class AbstractInput<IN, OUT> implements Input<IN> {
protected final AbstractStreamOperatorV2<OUT> owner;
protected final int inputId;
protected final Output<StreamRecord<OUT>> output;
public AbstractInput(AbstractStreamOperatorV2<OUT> owner, int inputId) {
checkArgument(inputId > 0, "Inputs are index from 1");
this.owner = owner;
this.inputId = inputId;
this.output = owner.output;
}
@Override
public void processWatermark(Watermark mark) throws Exception {
owner.reportWatermark(mark, inputId);
}
}
......@@ -465,7 +465,7 @@ public abstract class AbstractStreamOperatorV2<OUT> implements StreamOperator<OU
}
protected void reportWatermark(Watermark mark, int inputId) throws Exception {
inputWatermarks[inputId] = mark.getTimestamp();
inputWatermarks[inputId - 1] = mark.getTimestamp();
long newMin = mark.getTimestamp();
for (long inputWatermark : inputWatermarks) {
newMin = Math.min(inputWatermark, newMin);
......
......@@ -19,6 +19,7 @@
package org.apache.flink.streaming.api.operators;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
/**
......@@ -31,4 +32,12 @@ public interface Input<IN> {
* This method is guaranteed to not be called concurrently with other methods of the operator.
*/
void processElement(StreamRecord<IN> element) throws Exception;
/**
* Processes a {@link Watermark} that arrived on the first input of this two-input operator.
* This method is guaranteed to not be called concurrently with other methods of the operator.
*
* @see org.apache.flink.streaming.api.watermark.Watermark
*/
void processWatermark(Watermark mark) throws Exception;
}
......@@ -81,13 +81,14 @@ public final class StreamMultipleInputProcessor implements StreamInputProcessor
this.inputSelectionHandler = checkNotNull(inputSelectionHandler);
List<Input> inputs = streamOperator.getInputs();
int operatorsCount = inputs.size();
int inputsCount = inputs.size();
this.inputProcessors = new InputProcessor[operatorsCount];
this.streamStatuses = new StreamStatus[operatorsCount];
this.inputProcessors = new InputProcessor[inputsCount];
this.streamStatuses = new StreamStatus[inputsCount];
this.numRecordsIn = numRecordsIn;
for (int i = 0; i < operatorsCount; i++) {
for (int i = 0; i < inputsCount; i++) {
streamStatuses[i] = StreamStatus.ACTIVE;
StreamTaskNetworkOutput dataOutput = new StreamTaskNetworkOutput<>(
inputs.get(i),
streamStatusMaintainer,
......@@ -282,7 +283,8 @@ public final class StreamMultipleInputProcessor implements StreamInputProcessor
@Override
public void emitWatermark(Watermark watermark) throws Exception {
throw new UnsupportedOperationException();
inputWatermarkGauge.setCurrentWatermark(watermark.getTimestamp());
input.processWatermark(watermark);
}
@Override
......
......@@ -63,7 +63,7 @@ public class MultipleInputStreamTask<OUT> extends StreamTask<OUT, MultipleInputS
for (int i = 0; i < inputDeserializers.length; i++) {
inputLists[i] = new ArrayList<>();
watermarkGauges[i] = new WatermarkGauge();
headOperator.getMetricGroup().gauge(MetricNames.currentInputWatermarkName(i), watermarkGauges[i]);
headOperator.getMetricGroup().gauge(MetricNames.currentInputWatermarkName(i + 1), watermarkGauges[i]);
}
MinWatermarkGauge minInputWatermarkGauge = new MinWatermarkGauge(watermarkGauges);
......
......@@ -21,14 +21,21 @@ package org.apache.flink.streaming.runtime.tasks;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.Metric;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.metrics.MetricNames;
import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.metrics.util.InterceptingOperatorMetricGroup;
import org.apache.flink.runtime.metrics.util.InterceptingTaskMetricGroup;
import org.apache.flink.streaming.api.operators.AbstractInput;
import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.AbstractStreamOperatorV2;
import org.apache.flink.streaming.api.operators.Input;
......@@ -36,17 +43,25 @@ import org.apache.flink.streaming.api.operators.MultipleInputStreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
import org.apache.flink.streaming.api.operators.co.CoStreamMap;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.io.InputStatus;
import org.apache.flink.streaming.runtime.io.StreamMultipleInputProcessor;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTest.WatermarkMetricOperator;
import org.apache.flink.streaming.util.TestBoundedMultipleInputOperator;
import org.apache.flink.streaming.util.TestHarnessUtil;
import org.hamcrest.collection.IsEmptyCollection;
import org.hamcrest.collection.IsMapContaining;
import org.junit.Assert;
import org.junit.Test;
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;
......@@ -57,7 +72,6 @@ import static org.junit.Assert.assertEquals;
* {@link StreamMultipleInputProcessor}.
*/
public class MultipleInputStreamTaskTest {
/**
* This test verifies that open() and close() are correctly called. This test also verifies
* that timestamps of emitted elements are correct. {@link CoStreamMap} assigns the input
......@@ -257,10 +271,17 @@ public class MultipleInputStreamTaskTest {
@Override
public List<Input> getInputs() {
return Arrays.asList(new DuplicatingInput(), new DuplicatingInput(), new DuplicatingInput());
return Arrays.asList(
new DuplicatingInput(this, 1),
new DuplicatingInput(this, 2),
new DuplicatingInput(this, 3));
}
class DuplicatingInput extends AbstractInput<String, String> {
public DuplicatingInput(AbstractStreamOperatorV2<String> owner, int inputId) {
super(owner, inputId);
}
class DuplicatingInput implements Input<String> {
@Override
public void processElement(StreamRecord<String> element) throws Exception {
output.collect(element);
......@@ -355,8 +376,244 @@ public class MultipleInputStreamTaskTest {
}
}
// This must only be used in one test, otherwise the static fields will be changed
// by several tests concurrently
@Test
public void testWatermark() throws Exception {
try (StreamTaskMailboxTestHarness<String> testHarness =
new MultipleInputStreamTaskTestHarnessBuilder<>(MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO)
.addInput(BasicTypeInfo.STRING_TYPE_INFO, 2)
.addInput(BasicTypeInfo.INT_TYPE_INFO, 2)
.addInput(BasicTypeInfo.DOUBLE_TYPE_INFO, 2)
.setupOutputForSingletonOperatorChain(new MapToStringMultipleInputOperatorFactory())
.build()) {
ArrayDeque<Object> expectedOutput = new ArrayDeque<>();
long initialTime = 0L;
testHarness.processElement(new Watermark(initialTime), 0, 0);
testHarness.processElement(new Watermark(initialTime), 0, 1);
testHarness.processElement(new Watermark(initialTime), 1, 0);
testHarness.processElement(new Watermark(initialTime), 1, 1);
testHarness.processElement(new Watermark(initialTime), 2, 0);
assertThat(testHarness.getOutput(), IsEmptyCollection.empty());
testHarness.processElement(new Watermark(initialTime), 2, 1);
// now the watermark should have propagated, Map simply forward Watermarks
expectedOutput.add(new Watermark(initialTime));
assertThat(testHarness.getOutput(), contains(expectedOutput.toArray()));
// contrary to checkpoint barriers these elements are not blocked by watermarks
testHarness.processElement(new StreamRecord<>("Hello", initialTime), 0, 0);
testHarness.processElement(new StreamRecord<>(42, initialTime), 1, 1);
expectedOutput.add(new StreamRecord<>("Hello", initialTime));
expectedOutput.add(new StreamRecord<>("42", initialTime));
assertThat(testHarness.getOutput(), contains(expectedOutput.toArray()));
testHarness.processElement(new Watermark(initialTime + 4), 0, 0);
testHarness.processElement(new Watermark(initialTime + 3), 0, 1);
testHarness.processElement(new Watermark(initialTime + 3), 1, 0);
testHarness.processElement(new Watermark(initialTime + 4), 1, 1);
testHarness.processElement(new Watermark(initialTime + 3), 2, 0);
testHarness.processElement(new Watermark(initialTime + 2), 2, 1);
// check whether we get the minimum of all the watermarks, this must also only occur in
// the output after the two StreamRecords
expectedOutput.add(new Watermark(initialTime + 2));
assertThat(testHarness.getOutput(), contains(expectedOutput.toArray()));
// advance watermark from one of the inputs, now we should get a new one since the
// minimum increases
testHarness.processElement(new Watermark(initialTime + 4), 2, 1);
expectedOutput.add(new Watermark(initialTime + 3));
assertThat(testHarness.getOutput(), contains(expectedOutput.toArray()));
// advance the other two inputs, now we should get a new one since the
// minimum increases again
testHarness.processElement(new Watermark(initialTime + 4), 0, 1);
testHarness.processElement(new Watermark(initialTime + 4), 1, 0);
testHarness.processElement(new Watermark(initialTime + 4), 2, 0);
expectedOutput.add(new Watermark(initialTime + 4));
assertThat(testHarness.getOutput(), contains(expectedOutput.toArray()));
List<String> resultElements = TestHarnessUtil.getRawElementsFromOutput(testHarness.getOutput());
assertEquals(2, resultElements.size());
}
}
/**
* This test verifies that watermarks and stream statuses are correctly forwarded. This also checks whether
* watermarks are forwarded only when we have received watermarks from all inputs. The
* forwarded watermark must be the minimum of the watermarks of all active inputs.
*/
@Test
public void testWatermarkAndStreamStatusForwarding() throws Exception {
try (StreamTaskMailboxTestHarness<String> testHarness =
new MultipleInputStreamTaskTestHarnessBuilder<>(MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO)
.addInput(BasicTypeInfo.STRING_TYPE_INFO, 2)
.addInput(BasicTypeInfo.INT_TYPE_INFO, 2)
.addInput(BasicTypeInfo.DOUBLE_TYPE_INFO, 2)
.setupOutputForSingletonOperatorChain(new MapToStringMultipleInputOperatorFactory())
.build()) {
ArrayDeque<Object> expectedOutput = new ArrayDeque<>();
long initialTime = 0L;
// test whether idle input channels are acknowledged correctly when forwarding watermarks
testHarness.processElement(StreamStatus.IDLE, 0, 1);
testHarness.processElement(StreamStatus.IDLE, 1, 1);
testHarness.processElement(StreamStatus.IDLE, 2, 0);
testHarness.processElement(new Watermark(initialTime + 6), 0, 0);
testHarness.processElement(new Watermark(initialTime + 6), 1, 0);
testHarness.processElement(new Watermark(initialTime + 5), 2, 1); // this watermark should be advanced first
testHarness.processElement(StreamStatus.IDLE, 2, 1); // once this is acknowledged,
expectedOutput.add(new Watermark(initialTime + 5));
// We don't expect to see Watermark(6) here because the idle status of one
// input doesn't propagate to the other input. That is, if input 1 is at WM 6 and input
// two was at WM 5 before going to IDLE then the output watermark will not jump to WM 6.
assertThat(testHarness.getOutput(), contains(expectedOutput.toArray()));
// make all input channels idle and check that the operator's idle status is forwarded
testHarness.processElement(StreamStatus.IDLE, 0, 0);
testHarness.processElement(StreamStatus.IDLE, 1, 0);
expectedOutput.add(StreamStatus.IDLE);
assertThat(testHarness.getOutput(), contains(expectedOutput.toArray()));
// make some input channels active again and check that the operator's active status is forwarded only once
testHarness.processElement(StreamStatus.ACTIVE, 1, 0);
testHarness.processElement(StreamStatus.ACTIVE, 0, 1);
expectedOutput.add(StreamStatus.ACTIVE);
assertThat(testHarness.getOutput(), contains(expectedOutput.toArray()));
}
}
@Test
@SuppressWarnings("unchecked")
public void testWatermarkMetrics() throws Exception {
OperatorID headOperatorId = new OperatorID();
OperatorID chainedOperatorId = new OperatorID();
InterceptingOperatorMetricGroup headOperatorMetricGroup = new InterceptingOperatorMetricGroup();
InterceptingOperatorMetricGroup chainedOperatorMetricGroup = new InterceptingOperatorMetricGroup();
InterceptingTaskMetricGroup taskMetricGroup = new InterceptingTaskMetricGroup() {
@Override
public OperatorMetricGroup getOrAddOperator(OperatorID id, String name) {
if (id.equals(headOperatorId)) {
return headOperatorMetricGroup;
} else if (id.equals(chainedOperatorId)) {
return chainedOperatorMetricGroup;
} else {
return super.getOrAddOperator(id, name);
}
}
};
try (StreamTaskMailboxTestHarness<String> testHarness =
new MultipleInputStreamTaskTestHarnessBuilder<>(MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO)
.addInput(BasicTypeInfo.STRING_TYPE_INFO)
.addInput(BasicTypeInfo.INT_TYPE_INFO)
.addInput(BasicTypeInfo.DOUBLE_TYPE_INFO)
.setupOperatorChain(headOperatorId, new MapToStringMultipleInputOperatorFactory())
.chain(
chainedOperatorId,
new WatermarkMetricOperator(),
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()))
.finish()
.setTaskMetricGroup(taskMetricGroup)
.build()) {
Gauge<Long> taskInputWatermarkGauge = (Gauge<Long>) taskMetricGroup.get(MetricNames.IO_CURRENT_INPUT_WATERMARK);
Gauge<Long> headInput1WatermarkGauge = (Gauge<Long>) headOperatorMetricGroup.get(MetricNames.currentInputWatermarkName(1));
Gauge<Long> headInput2WatermarkGauge = (Gauge<Long>) headOperatorMetricGroup.get(MetricNames.currentInputWatermarkName(2));
Gauge<Long> headInput3WatermarkGauge = (Gauge<Long>) headOperatorMetricGroup.get(MetricNames.currentInputWatermarkName(3));
Gauge<Long> headInputWatermarkGauge = (Gauge<Long>) headOperatorMetricGroup.get(MetricNames.IO_CURRENT_INPUT_WATERMARK);
Gauge<Long> headOutputWatermarkGauge = (Gauge<Long>) headOperatorMetricGroup.get(MetricNames.IO_CURRENT_OUTPUT_WATERMARK);
Gauge<Long> chainedInputWatermarkGauge = (Gauge<Long>) chainedOperatorMetricGroup.get(MetricNames.IO_CURRENT_INPUT_WATERMARK);
Gauge<Long> chainedOutputWatermarkGauge = (Gauge<Long>) chainedOperatorMetricGroup.get(MetricNames.IO_CURRENT_OUTPUT_WATERMARK);
assertEquals(Long.MIN_VALUE, taskInputWatermarkGauge.getValue().longValue());
assertEquals(Long.MIN_VALUE, headInputWatermarkGauge.getValue().longValue());
assertEquals(Long.MIN_VALUE, headInput1WatermarkGauge.getValue().longValue());
assertEquals(Long.MIN_VALUE, headInput2WatermarkGauge.getValue().longValue());
assertEquals(Long.MIN_VALUE, headInput3WatermarkGauge.getValue().longValue());
assertEquals(Long.MIN_VALUE, headOutputWatermarkGauge.getValue().longValue());
assertEquals(Long.MIN_VALUE, chainedInputWatermarkGauge.getValue().longValue());
assertEquals(Long.MIN_VALUE, chainedOutputWatermarkGauge.getValue().longValue());
testHarness.processElement(new Watermark(1L), 0);
assertEquals(Long.MIN_VALUE, taskInputWatermarkGauge.getValue().longValue());
assertEquals(Long.MIN_VALUE, headInputWatermarkGauge.getValue().longValue());
assertEquals(1L, headInput1WatermarkGauge.getValue().longValue());
assertEquals(Long.MIN_VALUE, headInput2WatermarkGauge.getValue().longValue());
assertEquals(Long.MIN_VALUE, headInput3WatermarkGauge.getValue().longValue());
assertEquals(Long.MIN_VALUE, headOutputWatermarkGauge.getValue().longValue());
assertEquals(Long.MIN_VALUE, chainedInputWatermarkGauge.getValue().longValue());
assertEquals(Long.MIN_VALUE, chainedOutputWatermarkGauge.getValue().longValue());
testHarness.processElement(new Watermark(2L), 1);
assertEquals(Long.MIN_VALUE, taskInputWatermarkGauge.getValue().longValue());
assertEquals(Long.MIN_VALUE, headInputWatermarkGauge.getValue().longValue());
assertEquals(1L, headInput1WatermarkGauge.getValue().longValue());
assertEquals(2L, headInput2WatermarkGauge.getValue().longValue());
assertEquals(Long.MIN_VALUE, headInput3WatermarkGauge.getValue().longValue());
assertEquals(Long.MIN_VALUE, headOutputWatermarkGauge.getValue().longValue());
assertEquals(Long.MIN_VALUE, chainedInputWatermarkGauge.getValue().longValue());
assertEquals(Long.MIN_VALUE, chainedOutputWatermarkGauge.getValue().longValue());
testHarness.processElement(new Watermark(2L), 2);
assertEquals(1L, taskInputWatermarkGauge.getValue().longValue());
assertEquals(1L, headInputWatermarkGauge.getValue().longValue());
assertEquals(1L, headInput1WatermarkGauge.getValue().longValue());
assertEquals(2L, headInput2WatermarkGauge.getValue().longValue());
assertEquals(2L, headInput3WatermarkGauge.getValue().longValue());
assertEquals(1L, headOutputWatermarkGauge.getValue().longValue());
assertEquals(1L, chainedInputWatermarkGauge.getValue().longValue());
assertEquals(2L, chainedOutputWatermarkGauge.getValue().longValue());
testHarness.processElement(new Watermark(4L), 0);
testHarness.processElement(new Watermark(3L), 1);
assertEquals(2L, taskInputWatermarkGauge.getValue().longValue());
assertEquals(2L, headInputWatermarkGauge.getValue().longValue());
assertEquals(4L, headInput1WatermarkGauge.getValue().longValue());
assertEquals(3L, headInput2WatermarkGauge.getValue().longValue());
assertEquals(2L, headInput3WatermarkGauge.getValue().longValue());
assertEquals(2L, headOutputWatermarkGauge.getValue().longValue());
assertEquals(2L, chainedInputWatermarkGauge.getValue().longValue());
assertEquals(4L, chainedOutputWatermarkGauge.getValue().longValue());
testHarness.endInput();
testHarness.waitForTaskCompletion();
}
}
/**
* Tests the checkpoint related metrics are registered into {@link TaskIOMetricGroup}
* correctly while generating the {@link TwoInputStreamTask}.
*/
@Test
public void testCheckpointBarrierMetrics() throws Exception {
final Map<String, Metric> metrics = new ConcurrentHashMap<>();
final TaskMetricGroup taskMetricGroup = new StreamTaskTestHarness.TestTaskMetricGroup(metrics);
try (StreamTaskMailboxTestHarness<String> testHarness =
new MultipleInputStreamTaskTestHarnessBuilder<>(MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO)
.addInput(BasicTypeInfo.STRING_TYPE_INFO, 2)
.addInput(BasicTypeInfo.INT_TYPE_INFO, 2)
.addInput(BasicTypeInfo.DOUBLE_TYPE_INFO, 2)
.setupOutputForSingletonOperatorChain(new MapToStringMultipleInputOperatorFactory())
.setTaskMetricGroup(taskMetricGroup)
.build()) {
assertThat(metrics, IsMapContaining.hasKey(MetricNames.CHECKPOINT_ALIGNMENT_TIME));
assertThat(metrics, IsMapContaining.hasKey(MetricNames.CHECKPOINT_START_DELAY_TIME));
testHarness.endInput();
testHarness.waitForTaskCompletion();
}
}
private static class MapToStringMultipleInputOperator
extends AbstractStreamOperatorV2<String> implements MultipleInputStreamOperator<String> {
private static final long serialVersionUID = 1L;
......@@ -389,16 +646,20 @@ public class MultipleInputStreamTaskTest {
@Override
public List<Input> getInputs() {
return Arrays.asList(
new MapToStringInput<String>(),
new MapToStringInput<Integer>(),
new MapToStringInput<Double>());
new MapToStringInput<String>(this, 1),
new MapToStringInput<Integer>(this, 2),
new MapToStringInput<Double>(this, 3));
}
public boolean wasCloseCalled() {
return closeCalled;
}
public class MapToStringInput<T> implements Input<T> {
public class MapToStringInput<T> extends AbstractInput<T, String> {
public MapToStringInput(AbstractStreamOperatorV2<String> owner, int inputId) {
super(owner, inputId);
}
@Override
public void processElement(StreamRecord<T> element) throws Exception {
if (!openCalled) {
......
......@@ -20,6 +20,7 @@ package org.apache.flink.streaming.runtime.tasks;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.network.partition.consumer.StreamTestSingleInputGate;
import org.apache.flink.streaming.api.graph.StreamEdge;
......@@ -29,6 +30,8 @@ import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
import org.apache.flink.util.function.FunctionWithException;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
......@@ -52,6 +55,14 @@ public class MultipleInputStreamTaskTestHarnessBuilder<OUT> extends StreamTaskMa
}
public MultipleInputStreamTaskTestHarnessBuilder<OUT> addInput(TypeInformation<?> inputType, int inputChannels) {
return addInput(inputType, inputChannels, null);
}
public MultipleInputStreamTaskTestHarnessBuilder<OUT> addInput(
TypeInformation<?> inputType,
int inputChannels,
@Nullable KeySelector<?, ?> keySelector) {
streamConfig.setStatePartitioner(inputSerializers.size(), keySelector);
inputSerializers.add(inputType.createSerializer(executionConfig));
inputChannelsPerGate.add(inputChannels);
return this;
......
......@@ -185,5 +185,10 @@ public abstract class StreamTaskMailboxTestHarnessBuilder<OUT> {
this.taskMetricGroup = taskMetricGroup;
return this;
}
public StreamTaskMailboxTestHarnessBuilder<OUT> setKeyType(TypeInformation<?> keyType) {
streamConfig.setStateKeySerializer(keyType.createSerializer(executionConfig));
return this;
}
}
......@@ -210,7 +210,7 @@ public class TwoInputStreamTaskTest {
testHarness.processElement(new Watermark(initialTime + 6), 0, 0);
testHarness.processElement(new Watermark(initialTime + 5), 1, 1); // this watermark should be advanced first
testHarness.processElement(StreamStatus.IDLE, 1, 1); // once this is acknowledged,
// watermark (initial + 6) should be forwarded
testHarness.waitForInputProcessing();
expectedOutput.add(new Watermark(initialTime + 5));
// We don't expect to see Watermark(6) here because the idle status of one
......
......@@ -18,6 +18,7 @@
package org.apache.flink.streaming.util;
import org.apache.flink.streaming.api.operators.AbstractInput;
import org.apache.flink.streaming.api.operators.AbstractStreamOperatorV2;
import org.apache.flink.streaming.api.operators.BoundedMultiInput;
import org.apache.flink.streaming.api.operators.Input;
......@@ -46,9 +47,9 @@ public class TestBoundedMultipleInputOperator extends AbstractStreamOperatorV2<S
@Override
public List<Input> getInputs() {
return Arrays.asList(
new TestInput(1),
new TestInput(2),
new TestInput(3)
new TestInput(this, 1),
new TestInput(this, 2),
new TestInput(this, 3)
);
}
......@@ -63,16 +64,14 @@ public class TestBoundedMultipleInputOperator extends AbstractStreamOperatorV2<S
super.close();
}
class TestInput implements Input<String> {
private final int inputIndex;
public TestInput(int inputIndex) {
this.inputIndex = inputIndex;
class TestInput extends AbstractInput<String, String> {
public TestInput(AbstractStreamOperatorV2<String> owner, int inputId) {
super(owner, inputId);
}
@Override
public void processElement(StreamRecord<String> element) throws Exception {
output.collect(element.replace("[" + name + "-" + inputIndex + "]: " + element.getValue()));
output.collect(element.replace("[" + name + "-" + inputId + "]: " + element.getValue()));
}
}
}
......@@ -38,7 +38,7 @@ class KeyedProcessOperatorWithWatermarkDelayTest extends TestLogger {
val operator = new KeyedProcessOperatorWithWatermarkDelay[Integer, Integer, String](
new EmptyProcessFunction, 100)
val testHarness = new KeyedOneInputStreamOperatorTestHarness[Integer, Integer, String](
operator, new IdentityKeySelector, BasicTypeInfo.INT_TYPE_INFO)
operator, new IdentityKeySelector[Integer], BasicTypeInfo.INT_TYPE_INFO)
testHarness.setup()
testHarness.open()
......
......@@ -21,6 +21,7 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.MultipleConnectedStreams;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.operators.AbstractInput;
import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.AbstractStreamOperatorV2;
import org.apache.flink.streaming.api.operators.Input;
......@@ -94,15 +95,19 @@ public class MultipleInputITCase extends AbstractTestBase {
@Override
public List<Input> getInputs() {
return Arrays.asList(
new SumInput<Integer>(),
new SumInput<Long>(),
new SumInput<String>());
new SumInput<Integer>(this, 1),
new SumInput<Long>(this, 2),
new SumInput<String>(this, 3));
}
/**
* Summing input for {@link SumAllInputOperator}.
*/
public class SumInput<T> implements Input<T> {
public class SumInput<T> extends AbstractInput<T, Long> {
public SumInput(AbstractStreamOperatorV2<Long> owner, int inputId) {
super(owner, inputId);
}
@Override
public void processElement(StreamRecord<T> element) throws Exception {
sum += Long.valueOf(element.getValue().toString());
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册