未验证 提交 a1dfbf59 编写于 作者: D Dawid Wysakowicz 提交者: Aljoscha Krettek

[FLINK-20517] Add test for mixed-inpput operations in BATCH execution mode

上级 9bec3359
......@@ -23,6 +23,8 @@ import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
......@@ -35,12 +37,22 @@ import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.MultipleConnectedStreams;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.streaming.api.operators.AbstractInput;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.AbstractStreamOperatorV2;
import org.apache.flink.streaming.api.operators.Input;
import org.apache.flink.streaming.api.operators.MultipleInputStreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.api.transformations.KeyedMultipleInputTransformation;
import org.apache.flink.streaming.api.transformations.TwoInputTransformation;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.util.CloseableIterator;
......@@ -415,6 +427,126 @@ public class DataStreamBatchExecutionITCase {
}
}
@Test
public void batchMixedKeyedAndNonKeyedTwoInputOperator() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
DataStream<Tuple2<String, Integer>> bcInput =
env.fromElements(Tuple2.of("bc3", 3), Tuple2.of("bc2", 2), Tuple2.of("bc1", 1))
.assignTimestampsAndWatermarks(
WatermarkStrategy.<Tuple2<String, Integer>>forMonotonousTimestamps()
.withTimestampAssigner((in, ts) -> in.f1))
.broadcast();
DataStream<Tuple2<String, Integer>> regularInput =
env.fromElements(
Tuple2.of("regular1", 1),
Tuple2.of("regular1", 2),
Tuple2.of("regular1", 3),
Tuple2.of("regular1", 4),
Tuple2.of("regular2", 3),
Tuple2.of("regular2", 5),
Tuple2.of("regular1", 3))
.assignTimestampsAndWatermarks(
WatermarkStrategy.<Tuple2<String, Integer>>forMonotonousTimestamps()
.withTimestampAssigner((in, ts) -> in.f1))
.keyBy(input -> input.f0);
TwoInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>, String>
twoInputTransformation =
new TwoInputTransformation<>(
regularInput.getTransformation(),
bcInput.getTransformation(),
"operator",
new TestMixedTwoInputOperator(),
BasicTypeInfo.STRING_TYPE_INFO,
1);
twoInputTransformation.setStateKeyType(BasicTypeInfo.STRING_TYPE_INFO);
twoInputTransformation.setStateKeySelectors(input -> input.f0, null);
DataStream<String> result = new DataStream<>(env, twoInputTransformation);
try (CloseableIterator<String> resultIterator = result.executeAndCollect()) {
List<String> results = CollectionUtil.iteratorToList(resultIterator);
assertThat(
results,
equalTo(
Arrays.asList(
"(regular1,1): [bc3, bc2, bc1]",
"(regular1,2): [bc3, bc2, bc1]",
"(regular1,3): [bc3, bc2, bc1]",
"(regular1,3): [bc3, bc2, bc1]",
"(regular1,4): [bc3, bc2, bc1]",
"(regular2,3): [bc3, bc2, bc1]",
"(regular2,5): [bc3, bc2, bc1]")));
}
}
@Test
public void batchMixedKeyedAndNonKeyedMultiInputOperator() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
DataStream<Tuple2<String, Integer>> bc1Input =
env.fromElements(Tuple2.of("bc3", 3), Tuple2.of("bc2", 2))
.assignTimestampsAndWatermarks(
WatermarkStrategy.<Tuple2<String, Integer>>forMonotonousTimestamps()
.withTimestampAssigner((in, ts) -> in.f1))
.broadcast();
DataStream<Tuple2<String, Integer>> bc2Input =
env.fromElements(Tuple2.of("bc1", 1))
.assignTimestampsAndWatermarks(
WatermarkStrategy.<Tuple2<String, Integer>>forMonotonousTimestamps()
.withTimestampAssigner((in, ts) -> in.f1))
.broadcast();
DataStream<Tuple2<String, Integer>> regularInput =
env.fromElements(
Tuple2.of("regular1", 1),
Tuple2.of("regular1", 2),
Tuple2.of("regular1", 3),
Tuple2.of("regular1", 4),
Tuple2.of("regular2", 3),
Tuple2.of("regular2", 5),
Tuple2.of("regular1", 3))
.assignTimestampsAndWatermarks(
WatermarkStrategy.<Tuple2<String, Integer>>forMonotonousTimestamps()
.withTimestampAssigner((in, ts) -> in.f1))
.keyBy(input -> input.f0);
KeyedMultipleInputTransformation<String> multipleInputTransformation =
new KeyedMultipleInputTransformation<>(
"operator",
mixedInputsOperatorFactory,
BasicTypeInfo.STRING_TYPE_INFO,
1,
BasicTypeInfo.STRING_TYPE_INFO);
multipleInputTransformation.addInput(
regularInput.getTransformation(), input -> ((Tuple2<String, Integer>) input).f0);
multipleInputTransformation.addInput(bc1Input.getTransformation(), null);
multipleInputTransformation.addInput(bc2Input.getTransformation(), null);
DataStream<String> result =
new MultipleConnectedStreams(env).transform(multipleInputTransformation);
try (CloseableIterator<String> resultIterator = result.executeAndCollect()) {
List<String> results = CollectionUtil.iteratorToList(resultIterator);
assertThat(
results,
equalTo(
Arrays.asList(
"(regular1,1): [bc3, bc2, bc1]",
"(regular1,2): [bc3, bc2, bc1]",
"(regular1,3): [bc3, bc2, bc1]",
"(regular1,3): [bc3, bc2, bc1]",
"(regular1,4): [bc3, bc2, bc1]",
"(regular2,3): [bc3, bc2, bc1]",
"(regular2,5): [bc3, bc2, bc1]")));
}
}
private StreamExecutionEnvironment getExecutionEnvironment() {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
......@@ -470,6 +602,9 @@ public class DataStreamBatchExecutionITCase {
static final ValueStateDescriptor<String> KEYED_STATE_DESCRIPTOR =
new ValueStateDescriptor<>("keyed-state", StringSerializer.INSTANCE);
static final ListStateDescriptor<String> LIST_STATE_DESCRIPTOR =
new ListStateDescriptor<>("bc-list-input", StringSerializer.INSTANCE);
private static class TestKeyedBroadcastFunction
extends KeyedBroadcastProcessFunction<
String, Tuple2<String, Integer>, Tuple2<String, Integer>, String> {
......@@ -537,4 +672,88 @@ public class DataStreamBatchExecutionITCase {
new StreamRecord<>(element.getValue().toString(), element.getTimestamp()));
}
}
private static final class TestMixedTwoInputOperator extends AbstractStreamOperator<String>
implements TwoInputStreamOperator<
Tuple2<String, Integer>, Tuple2<String, Integer>, String> {
@Override
public void processElement1(StreamRecord<Tuple2<String, Integer>> element)
throws Exception {
ListState<String> operatorState =
getOperatorStateBackend().getListState(LIST_STATE_DESCRIPTOR);
output.collect(
new StreamRecord<>(element.getValue() + ": " + operatorState.get().toString()));
}
@Override
public void processElement2(StreamRecord<Tuple2<String, Integer>> element)
throws Exception {
ListState<String> operatorState =
getOperatorStateBackend().getListState(LIST_STATE_DESCRIPTOR);
operatorState.add(element.getValue().f0);
}
}
@SuppressWarnings({"unchecked", "rawtypes"})
private static final AbstractStreamOperatorFactory<String> mixedInputsOperatorFactory =
new AbstractStreamOperatorFactory<String>() {
@Override
public <T extends StreamOperator<String>> T createStreamOperator(
StreamOperatorParameters<String> parameters) {
return (T) new TestMixedMultipleInputOperator(parameters);
}
@Override
public Class<? extends StreamOperator> getStreamOperatorClass(
ClassLoader classLoader) {
return TestMixedMultipleInputOperator.class;
}
};
private static class TestMixedMultipleInputOperator extends AbstractStreamOperatorV2<String>
implements MultipleInputStreamOperator<String> {
public TestMixedMultipleInputOperator(StreamOperatorParameters<String> parameters) {
super(parameters, 3);
}
@Override
@SuppressWarnings({"rawtypes"})
public List<Input> getInputs() {
return Arrays.asList(
new AbstractInput<Tuple2<String, Integer>, String>(this, 1) {
@Override
public void processElement(StreamRecord<Tuple2<String, Integer>> element)
throws Exception {
ListState<String> operatorState =
getOperatorStateBackend().getListState(LIST_STATE_DESCRIPTOR);
output.collect(
new StreamRecord<>(
element.getValue()
+ ": "
+ operatorState.get().toString()));
}
},
new AbstractInput<Tuple2<String, Integer>, String>(this, 2) {
@Override
public void processElement(StreamRecord<Tuple2<String, Integer>> element)
throws Exception {
ListState<String> operatorState =
getOperatorStateBackend().getListState(LIST_STATE_DESCRIPTOR);
operatorState.add(element.getValue().f0);
}
},
new AbstractInput<Tuple2<String, Integer>, String>(this, 3) {
@Override
public void processElement(StreamRecord<Tuple2<String, Integer>> element)
throws Exception {
ListState<String> operatorState =
getOperatorStateBackend().getListState(LIST_STATE_DESCRIPTOR);
operatorState.add(element.getValue().f0);
}
});
}
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册