[FLINK-20491] Add broadcast operators for BATCH execution mode

This adds an ITCase because we need to check that all the components
work together and that the wiring works correctly.

This uses the previously added funtionality to specify that given inputs
should be processed before other inputs and should not be sorted.
上级 524a1070
......@@ -237,6 +237,35 @@ next key.
See [FLIP-140](https://cwiki.apache.org/confluence/x/kDh4CQ) for background
information on this.
### Order of Processing
The order in which records are processed in operators or user-defined functions (UDFs) can differ between `BATCH` and `STREAMING` execution.
In `STREAMING` mode, user-defined functions should not make any assumptions about incoming records' order.
Data is processed as soon as it arrives.
In `BATCH` execution mode, there are some operations where Flink guarantees order.
The ordering can be a side effect of the particular task scheduling,
network shuffle, and state backend (see above), or a conscious choice by the system.
There are three general types of input that we can differentiate:
- _broadcast input_: input from a broadcast stream (see also [Broadcast
State]({% link dev/stream/state/broadcast_state.md %}))
- _regular input_: input that is neither broadcast nor keyed
- _keyed input_: input from a `KeyedStream`
Functions, or Operators, that consume multiple input types will process them in the following order:
- broadcast inputs are processed first
- regular inputs are processed second
- keyed inputs are processed last
For functions that consume from multiple regular or broadcast inputs — such as a `CoProcessFunction` — Flink has the right to process data from any input of that type in any order.
For functions that consume from multiple keyed inputs — such as a `KeyedCoProcessFunction` — Flink processes all records for a single key from all keyed inputs before moving on to the next.
### Event Time / Watermarks
When it comes to supporting [event time]({% link dev/event_time.md %}), Flink’s
......@@ -329,7 +358,6 @@ Unsupported in BATCH mode:
* [Checkpointing]({% link concepts/stateful-stream-processing.md
%}#stateful-stream-processing) and any operations that depend on
checkpointing do not work.
* [Broadcast State]({% link dev/stream/state/broadcast_state.md %})
* [Iterations]({% link dev/stream/operators/index.md %}#iterate)
Custom operators should be implemented with care, otherwise they might behave
......@@ -354,26 +382,6 @@ You can still use all the [state primitives]({% link dev/stream/state/state.md
%}#working-with-state), it's just that the mechanism used for failure recovery
will be different.
### Broadcast State
This feature was introduced to allow users to implement use-cases where a
“control” stream needs to be broadcast to all downstream tasks, and the
broadcast elements, e.g. rules, need to be applied to all incoming elements
from another stream.
In this pattern, Flink provides no guarantees about the order in which the
inputs are read. Use-cases like the one above make sense in the streaming
world where jobs are expected to run for a long period with input data that are
not known in advance. In these settings, requirements may change over time
depending on the incoming data.
In the batch world though, we believe that such use-cases do not make much
sense, as the input (both the elements and the control stream) are static and
known in advance.
We plan to support a variation of that pattern for `BATCH` processing where the
broadcast side is processed first entirely in the future.
### Writing Custom Operators
{% capture custom_operator_note %}
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.api.operators.co;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.streaming.api.operators.BoundedMultiInput;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import java.util.List;
import static org.apache.flink.util.Preconditions.checkState;
/**
* A {@link TwoInputStreamOperator} for executing {@link KeyedBroadcastProcessFunction
* KeyedBroadcastProcessFunctions} in {@link org.apache.flink.api.common.RuntimeExecutionMode#BATCH}
* execution mode.
*
* <p>Compared to {@link CoBroadcastWithKeyedOperator} this does an additional sanity check on the
* input processing order requirement.
*
* @param <KS> The key type of the input keyed stream.
* @param <IN1> The input type of the keyed (non-broadcast) side.
* @param <IN2> The input type of the broadcast side.
* @param <OUT> The output type of the operator.
*/
@Internal
public class BatchCoBroadcastWithKeyedOperator<KS, IN1, IN2, OUT>
extends CoBroadcastWithKeyedOperator<KS, IN1, IN2, OUT> implements BoundedMultiInput {
private static final long serialVersionUID = 5926499536290284870L;
private transient volatile boolean isBroadcastSideDone = false;
public BatchCoBroadcastWithKeyedOperator(
final KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT> function,
final List<MapStateDescriptor<?, ?>> broadcastStateDescriptors) {
super(function, broadcastStateDescriptors);
}
@Override
public void endInput(int inputId) throws Exception {
if (inputId == 2) {
// finished with the broadcast side
isBroadcastSideDone = true;
}
}
@Override
public void processElement1(StreamRecord<IN1> element) throws Exception {
checkState(
isBroadcastSideDone,
"Should not process regular input before broadcast side is done.");
super.processElement1(element);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.api.operators.co;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.streaming.api.operators.BoundedMultiInput;
import org.apache.flink.streaming.api.operators.InputSelectable;
import org.apache.flink.streaming.api.operators.InputSelection;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import java.util.List;
import static org.apache.flink.util.Preconditions.checkState;
/**
* A {@link TwoInputStreamOperator} for executing {@link BroadcastProcessFunction
* BroadcastProcessFunctions} in {@link org.apache.flink.api.common.RuntimeExecutionMode#BATCH}
* execution mode.
*
* <p>Compared to {@link CoBroadcastWithNonKeyedOperator} this uses {@link BoundedMultiInput} and
* {@link InputSelectable} to enforce the requirement that the broadcast side is processed before
* the regular input.
*
* @param <IN1> The input type of the regular (non-broadcast) side.
* @param <IN2> The input type of the broadcast side.
* @param <OUT> The output type of the operator.
*/
@Internal
public class BatchCoBroadcastWithNonKeyedOperator<IN1, IN2, OUT>
extends CoBroadcastWithNonKeyedOperator<IN1, IN2, OUT>
implements BoundedMultiInput, InputSelectable {
private static final long serialVersionUID = -1869740381935471752L;
private transient volatile boolean isBroadcastSideDone = false;
public BatchCoBroadcastWithNonKeyedOperator(
final BroadcastProcessFunction<IN1, IN2, OUT> function,
final List<MapStateDescriptor<?, ?>> broadcastStateDescriptors) {
super(function, broadcastStateDescriptors);
}
@Override
public void endInput(int inputId) throws Exception {
if (inputId == 2) {
// finished with the broadcast side
isBroadcastSideDone = true;
}
}
@Override
public InputSelection nextSelection() {
if (!isBroadcastSideDone) {
return InputSelection.SECOND;
} else {
return InputSelection.FIRST;
}
}
@Override
public void processElement1(StreamRecord<IN1> element) throws Exception {
checkState(
isBroadcastSideDone,
"Should not process regular input before broadcast side is done.");
super.processElement1(element);
}
}
......@@ -21,6 +21,7 @@ package org.apache.flink.streaming.runtime.translators;
import org.apache.flink.annotation.Internal;
import org.apache.flink.streaming.api.graph.TransformationTranslator;
import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
import org.apache.flink.streaming.api.operators.co.BatchCoBroadcastWithNonKeyedOperator;
import org.apache.flink.streaming.api.operators.co.CoBroadcastWithNonKeyedOperator;
import org.apache.flink.streaming.api.transformations.BroadcastStateTransformation;
......@@ -46,8 +47,23 @@ public class BroadcastStateTransformationTranslator<IN1, IN2, OUT>
protected Collection<Integer> translateForBatchInternal(
final BroadcastStateTransformation<IN1, IN2, OUT> transformation,
final Context context) {
throw new UnsupportedOperationException(
"The Broadcast State Pattern is not support in BATCH execution mode.");
checkNotNull(transformation);
checkNotNull(context);
BatchCoBroadcastWithNonKeyedOperator<IN1, IN2, OUT> operator =
new BatchCoBroadcastWithNonKeyedOperator<>(
transformation.getUserFunction(),
transformation.getBroadcastStateDescriptors());
return translateInternal(
transformation,
transformation.getRegularInput(),
transformation.getBroadcastInput(),
SimpleOperatorFactory.of(operator),
null /* no key type*/,
null /* no first key selector */,
null /* no second */,
context);
}
@Override
......
......@@ -19,8 +19,10 @@
package org.apache.flink.streaming.runtime.translators;
import org.apache.flink.annotation.Internal;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.graph.TransformationTranslator;
import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
import org.apache.flink.streaming.api.operators.co.BatchCoBroadcastWithKeyedOperator;
import org.apache.flink.streaming.api.operators.co.CoBroadcastWithKeyedOperator;
import org.apache.flink.streaming.api.transformations.KeyedBroadcastStateTransformation;
......@@ -47,8 +49,32 @@ public class KeyedBroadcastStateTransformationTranslator<KEY, IN1, IN2, OUT>
protected Collection<Integer> translateForBatchInternal(
final KeyedBroadcastStateTransformation<KEY, IN1, IN2, OUT> transformation,
final Context context) {
throw new UnsupportedOperationException(
"The Broadcast State Pattern is not support in BATCH execution mode.");
checkNotNull(transformation);
checkNotNull(context);
BatchCoBroadcastWithKeyedOperator<KEY, IN1, IN2, OUT> operator =
new BatchCoBroadcastWithKeyedOperator<>(
transformation.getUserFunction(),
transformation.getBroadcastStateDescriptors());
Collection<Integer> result =
translateInternal(
transformation,
transformation.getRegularInput(),
transformation.getBroadcastInput(),
SimpleOperatorFactory.of(operator),
transformation.getStateKeyType(),
transformation.getKeySelector(),
null /* no key selector on broadcast input */,
context);
BatchExecutionUtils.applyBatchExecutionSettings(
transformation.getId(),
context,
StreamConfig.InputRequirement.SORTED,
StreamConfig.InputRequirement.PASS_THROUGH);
return result;
}
@Override
......
......@@ -19,18 +19,29 @@
package org.apache.flink.api.datastream;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.CollectionUtil;
import org.apache.flink.util.Collector;
import org.junit.ClassRule;
import org.junit.Test;
......@@ -180,6 +191,113 @@ public class DataStreamBatchExecutionITCase {
}
}
/** Verifies that all broadcast input is processed before keyed input. */
@Test
public void batchKeyedBroadcastExecution() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
DataStream<Tuple2<String, Integer>> bcInput =
env.fromElements(Tuple2.of("bc1", 1), Tuple2.of("bc2", 2), Tuple2.of("bc3", 3))
.assignTimestampsAndWatermarks(
WatermarkStrategy.<Tuple2<String, Integer>>forMonotonousTimestamps()
.withTimestampAssigner((in, ts) -> in.f1));
DataStream<Tuple2<String, Integer>> regularInput =
env.fromElements(
Tuple2.of("regular1", 1),
Tuple2.of("regular1", 2),
Tuple2.of("regular2", 2),
Tuple2.of("regular1", 3),
Tuple2.of("regular1", 4),
Tuple2.of("regular1", 3),
Tuple2.of("regular2", 5),
Tuple2.of("regular1", 5),
Tuple2.of("regular2", 3),
Tuple2.of("regular1", 3))
.assignTimestampsAndWatermarks(
WatermarkStrategy.<Tuple2<String, Integer>>forMonotonousTimestamps()
.withTimestampAssigner((in, ts) -> in.f1));
BroadcastStream<Tuple2<String, Integer>> broadcastStream =
bcInput.broadcast(STATE_DESCRIPTOR);
DataStream<String> result =
regularInput
.keyBy((input) -> input.f0)
.connect(broadcastStream)
.process(new TestKeyedBroadcastFunction());
try (CloseableIterator<String> resultIterator = result.executeAndCollect()) {
List<String> results = CollectionUtil.iteratorToList(resultIterator);
assertThat(
results,
equalTo(
Arrays.asList(
"(regular1,1): [bc2=bc2, bc1=bc1, bc3=bc3]",
"(regular1,2): [bc2=bc2, bc1=bc1, bc3=bc3]",
"(regular1,3): [bc2=bc2, bc1=bc1, bc3=bc3]",
"(regular1,3): [bc2=bc2, bc1=bc1, bc3=bc3]",
"(regular1,3): [bc2=bc2, bc1=bc1, bc3=bc3]",
"(regular1,4): [bc2=bc2, bc1=bc1, bc3=bc3]",
"(regular1,5): [bc2=bc2, bc1=bc1, bc3=bc3]",
"(regular2,2): [bc2=bc2, bc1=bc1, bc3=bc3]",
"(regular2,3): [bc2=bc2, bc1=bc1, bc3=bc3]",
"(regular2,5): [bc2=bc2, bc1=bc1, bc3=bc3]")));
}
}
/** Verifies that all broadcast input is processed before regular input. */
@Test
public void batchBroadcastExecution() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
DataStream<Tuple2<String, Integer>> bcInput =
env.fromElements(Tuple2.of("bc1", 1), Tuple2.of("bc2", 2), Tuple2.of("bc3", 3))
.assignTimestampsAndWatermarks(
WatermarkStrategy.<Tuple2<String, Integer>>forMonotonousTimestamps()
.withTimestampAssigner((in, ts) -> in.f1));
DataStream<Tuple2<String, Integer>> regularInput =
env.fromElements(
Tuple2.of("regular1", 1),
Tuple2.of("regular1", 2),
Tuple2.of("regular1", 3),
Tuple2.of("regular1", 4),
Tuple2.of("regular1", 3),
Tuple2.of("regular1", 5),
Tuple2.of("regular1", 3))
.assignTimestampsAndWatermarks(
WatermarkStrategy.<Tuple2<String, Integer>>forMonotonousTimestamps()
.withTimestampAssigner((in, ts) -> in.f1));
BroadcastStream<Tuple2<String, Integer>> broadcastStream =
bcInput.broadcast(STATE_DESCRIPTOR);
DataStream<String> result =
regularInput.connect(broadcastStream).process(new TestBroadcastFunction());
try (CloseableIterator<String> resultIterator = result.executeAndCollect()) {
List<String> results = CollectionUtil.iteratorToList(resultIterator);
// regular, that is non-keyed input is not sorted by timestamp. For keyed inputs
// this is a by-product of the grouping/sorting we use to get the keyed groups.
assertThat(
results,
equalTo(
Arrays.asList(
"(regular1,1): [bc2=bc2, bc1=bc1, bc3=bc3]",
"(regular1,2): [bc2=bc2, bc1=bc1, bc3=bc3]",
"(regular1,3): [bc2=bc2, bc1=bc1, bc3=bc3]",
"(regular1,4): [bc2=bc2, bc1=bc1, bc3=bc3]",
"(regular1,3): [bc2=bc2, bc1=bc1, bc3=bc3]",
"(regular1,5): [bc2=bc2, bc1=bc1, bc3=bc3]",
"(regular1,3): [bc2=bc2, bc1=bc1, bc3=bc3]")));
}
}
private StreamExecutionEnvironment getExecutionEnvironment() {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
......@@ -227,4 +345,61 @@ public class DataStreamBatchExecutionITCase {
return value + "-" + suffix + getRuntimeContext().getAttemptNumber();
}
}
static final MapStateDescriptor<String, String> STATE_DESCRIPTOR =
new MapStateDescriptor<>(
"bc-input", StringSerializer.INSTANCE, StringSerializer.INSTANCE);
static final ValueStateDescriptor<String> KEYED_STATE_DESCRIPTOR =
new ValueStateDescriptor<>("keyed-state", StringSerializer.INSTANCE);
private static class TestKeyedBroadcastFunction
extends KeyedBroadcastProcessFunction<
String, Tuple2<String, Integer>, Tuple2<String, Integer>, String> {
@Override
public void processElement(
Tuple2<String, Integer> value, ReadOnlyContext ctx, Collector<String> out)
throws Exception {
ReadOnlyBroadcastState<String, String> state = ctx.getBroadcastState(STATE_DESCRIPTOR);
out.collect(value + ": " + state.immutableEntries().toString());
}
@Override
public void processBroadcastElement(
Tuple2<String, Integer> value, Context ctx, Collector<String> out)
throws Exception {
BroadcastState<String, String> state = ctx.getBroadcastState(STATE_DESCRIPTOR);
state.put(value.f0, value.f0);
// iterating over keys is a no-op in BATCH execution mode
ctx.applyToKeyedState(
KEYED_STATE_DESCRIPTOR,
(key, state1) -> {
throw new RuntimeException("Shouldn't happen");
});
}
}
private static class TestBroadcastFunction
extends BroadcastProcessFunction<
Tuple2<String, Integer>, Tuple2<String, Integer>, String> {
@Override
public void processElement(
Tuple2<String, Integer> value, ReadOnlyContext ctx, Collector<String> out)
throws Exception {
ReadOnlyBroadcastState<String, String> state = ctx.getBroadcastState(STATE_DESCRIPTOR);
out.collect(value + ": " + state.immutableEntries().toString());
}
@Override
public void processBroadcastElement(
Tuple2<String, Integer> value, Context ctx, Collector<String> out)
throws Exception {
BroadcastState<String, String> state = ctx.getBroadcastState(STATE_DESCRIPTOR);
state.put(value.f0, value.f0);
}
}
}
......@@ -18,7 +18,6 @@
package org.apache.flink.test.streaming.runtime;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.functions.KeySelector;
......@@ -40,9 +39,7 @@ import org.junit.rules.ExpectedException;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.junit.Assert.assertEquals;
......@@ -172,44 +169,6 @@ public class BroadcastStateITCase extends AbstractTestBase {
env.execute();
}
@Test
public void testBroadcastBatchTranslationThrowsException() throws Exception {
final MapStateDescriptor<Long, Long> utterDescriptor =
new MapStateDescriptor<>(
"broadcast-state",
BasicTypeInfo.LONG_TYPE_INFO,
BasicTypeInfo.LONG_TYPE_INFO);
final List<Long> input = new ArrayList<>();
input.add(1L);
input.add(2L);
input.add(3L);
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
final DataStream<Long> srcOne = env.fromCollection(input);
final DataStream<Long> srcTwo = env.fromCollection(input);
final BroadcastStream<Long> broadcast = srcTwo.broadcast(utterDescriptor);
srcOne.connect(broadcast)
.process(
new BroadcastProcessFunction<Long, Long, Long>() {
@Override
public void processElement(
Long value, ReadOnlyContext ctx, Collector<Long> out) {}
@Override
public void processBroadcastElement(
Long value, Context ctx, Collector<Long> out) {}
});
thrown.expect(UnsupportedOperationException.class);
thrown.expectMessage("The Broadcast State Pattern is not support in BATCH execution mode.");
env.execute();
}
private static class TestSink extends RichSinkFunction<String> {
private static final long serialVersionUID = 7252508825104554749L;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册