[FLINK-20491] Add per-input setting of BATCH execution requirements

This doesn't change the actual behavior; we still set the same "sorted"
setting on both inputs. We will add tests and actually change the
behavior in a follow-up commit.
Parent: eb506e7d
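
For illustration only: this commit still passes SORTED for all inputs of keyed transformations, but the new varargs entry point (see BatchExecutionUtils below) is what will let the follow-up request a different requirement per input. A hypothetical mixed call could look like:

BatchExecutionUtils.applyBatchExecutionSettings(
        transformation.getId(),
        context,
        StreamConfig.InputRequirement.SORTED,
        StreamConfig.InputRequirement.PASS_THROUGH);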
@@ -110,12 +110,6 @@ public class StreamConfig implements Serializable {
.withDescription(
"If state backend is specified, whether it uses managed memory.");
private static final ConfigOption<Boolean> SORTED_INPUTS =
ConfigOptions.key("sorted-inputs")
.booleanType()
.defaultValue(false)
.withDescription("A flag to enable/disable sorting inputs of keyed operators.");
// ------------------------------------------------------------------------
// Default Values
// ------------------------------------------------------------------------
@@ -251,7 +245,7 @@ public class StreamConfig implements Serializable {
public void setupNetworkInputs(TypeSerializer<?>... serializers) {
InputConfig[] inputs = new InputConfig[serializers.length];
for (int i = 0; i < serializers.length; i++) {
inputs[i] = new NetworkInputConfig(serializers[i], i);
inputs[i] = new NetworkInputConfig(serializers[i], i, InputRequirement.PASS_THROUGH);
}
setInputs(inputs);
}
@@ -680,14 +674,6 @@ public class StreamConfig implements Serializable {
return builder.toString();
}
public void setShouldSortInputs(boolean sortInputs) {
config.set(SORTED_INPUTS, sortInputs);
}
public boolean shouldSortInputs() {
return config.get(SORTED_INPUTS);
}
public void setGraphContainingLoops(boolean graphContainingLoops) {
config.setBoolean(GRAPH_CONTAINING_LOOPS, graphContainingLoops);
}
@@ -696,17 +682,50 @@ public class StreamConfig implements Serializable {
return config.getBoolean(GRAPH_CONTAINING_LOOPS, false);
}
/**
* Requirements of the different inputs of an operator. Each input can have a different
* requirement. For all {@link #SORTED} inputs, records are sorted/grouped by key and all
* records of a given key are passed to the operator consecutively before moving on to the next
* group.
*/
public enum InputRequirement {
/**
* Records from all sorted inputs are grouped (sorted) by key and are then fed to the
* operator one group at a time. This "zig-zags" between different inputs if records for the
* same key arrive on multiple inputs to ensure that the operator sees all records with a
* key as one consecutive group.
*/
SORTED,
/**
* Records from {@link #PASS_THROUGH} inputs are passed to the operator before passing any
* records from {@link #SORTED} inputs. There are no guarantees on ordering between and
* within the different {@link #PASS_THROUGH} inputs.
*/
PASS_THROUGH;
}
/** Interface representing chained inputs. */
public interface InputConfig extends Serializable {}
/** A representation of a Network {@link InputConfig}. */
public static class NetworkInputConfig implements InputConfig {
private final TypeSerializer<?> typeSerializer;
private final InputRequirement inputRequirement;
private int inputGateIndex;
public NetworkInputConfig(TypeSerializer<?> typeSerializer, int inputGateIndex) {
this(typeSerializer, inputGateIndex, InputRequirement.PASS_THROUGH);
}
public NetworkInputConfig(
TypeSerializer<?> typeSerializer,
int inputGateIndex,
InputRequirement inputRequirement) {
this.typeSerializer = typeSerializer;
this.inputGateIndex = inputGateIndex;
this.inputRequirement = inputRequirement;
}
public TypeSerializer<?> getTypeSerializer() {
@@ -716,6 +735,10 @@ public class StreamConfig implements Serializable {
public int getInputGateIndex() {
return inputGateIndex;
}
public InputRequirement getInputRequirement() {
return inputRequirement;
}
}
/** A serialized representation of an input. */
@@ -752,4 +775,10 @@ public class StreamConfig implements Serializable {
return inputEdge.hashCode();
}
}
public static boolean requiresSorting(StreamConfig.InputConfig inputConfig) {
return inputConfig instanceof StreamConfig.NetworkInputConfig
&& ((StreamConfig.NetworkInputConfig) inputConfig).getInputRequirement()
== StreamConfig.InputRequirement.SORTED;
}
}
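
As a reading aid (not part of the diff), a minimal sketch of how the new StreamConfig pieces compose; StringSerializer stands in for whatever record serializer the task actually uses:

import org.apache.flink.api.common.typeutils.base.StringSerializer;

// One sorted and one pass-through network input; the two-argument
// constructor defaults to PASS_THROUGH.
StreamConfig.NetworkInputConfig sorted =
        new StreamConfig.NetworkInputConfig(
                StringSerializer.INSTANCE, 0, StreamConfig.InputRequirement.SORTED);
StreamConfig.NetworkInputConfig passThrough =
        new StreamConfig.NetworkInputConfig(StringSerializer.INSTANCE, 1);

// requiresSorting() is the helper the processor factories below rely on.
assert StreamConfig.requiresSorting(sorted);
assert !StreamConfig.requiresSorting(passThrough);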
@@ -734,10 +734,6 @@ public class StreamGraph implements Pipeline {
getStreamNode(vertexID).setOutputFormat(outputFormat);
}
void setSortedInputs(int vertexId, boolean shouldSort) {
getStreamNode(vertexId).setSortedInputs(shouldSort);
}
public void setTransformationUID(Integer nodeId, String transformationId) {
StreamNode node = streamNodes.get(nodeId);
if (node != null) {
......
@@ -84,7 +84,8 @@ public class StreamNode {
private String transformationUID;
private String userHash;
private boolean sortedInputs = false;
private final Map<Integer, StreamConfig.InputRequirement> inputRequirements = new HashMap<>();
@VisibleForTesting
public StreamNode(
@@ -343,12 +344,13 @@ public class StreamNode {
this.userHash = userHash;
}
public void setSortedInputs(boolean sortedInputs) {
this.sortedInputs = sortedInputs;
public void addInputRequirement(
int inputIndex, StreamConfig.InputRequirement inputRequirement) {
inputRequirements.put(inputIndex, inputRequirement);
}
public boolean getSortedInputs() {
return sortedInputs;
public Map<Integer, StreamConfig.InputRequirement> getInputRequirements() {
return inputRequirements;
}
public Optional<OperatorCoordinator.Provider> getCoordinatorProvider(
......
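
The per-index map replaces the single boolean flag; a short sketch of the intended producer/consumer contract (hypothetical index values), mirroring the generator change below:

// Translator side: record a requirement for input 0 only.
node.addInputRequirement(0, StreamConfig.InputRequirement.SORTED);

// Generator side: inputs without an explicit entry fall back to PASS_THROUGH.
StreamConfig.InputRequirement requirement =
        node.getInputRequirements()
                .getOrDefault(1, StreamConfig.InputRequirement.PASS_THROUGH);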
@@ -649,16 +649,23 @@ public class StreamingJobGraphGenerator {
// network input. null if we move to a new input, non-null if this is a further edge
// that is union-ed into the same input
if (inputConfigs[inputIndex] == null) {
// PASS_THROUGH is a sensible default for streaming jobs. Only for BATCH
// execution can we have sorted inputs
StreamConfig.InputRequirement inputRequirement =
vertex.getInputRequirements()
.getOrDefault(
inputIndex, StreamConfig.InputRequirement.PASS_THROUGH);
inputConfigs[inputIndex] =
new StreamConfig.NetworkInputConfig(
inputSerializers[inputIndex], inputGateCount++);
inputSerializers[inputIndex],
inputGateCount++,
inputRequirement);
}
}
}
config.setInputs(inputConfigs);
config.setTypeSerializerOut(vertex.getTypeSerializerOut());
config.setShouldSortInputs(vertex.getSortedInputs());
// iterate edges, find sideOutput edges create and save serializers for each outputTag type
for (StreamEdge edge : chainableOutputs) {
......
@@ -51,6 +51,7 @@ import java.util.Arrays;
import java.util.List;
import java.util.stream.IntStream;
import static org.apache.flink.streaming.api.graph.StreamConfig.requiresSorting;
import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.Preconditions.checkState;
@@ -118,31 +119,48 @@ public class StreamMultipleInputProcessorFactory {
InputSelectable inputSelectable =
mainOperator instanceof InputSelectable ? (InputSelectable) mainOperator : null;
if (streamConfig.shouldSortInputs()) {
StreamConfig.InputConfig[] inputConfigs = streamConfig.getInputs(userClassloader);
boolean anyRequiresSorting =
Arrays.stream(inputConfigs).anyMatch(StreamConfig::requiresSorting);
if (anyRequiresSorting) {
if (inputSelectable != null) {
throw new IllegalStateException(
"The InputSelectable interface is not supported with sorting inputs");
}
StreamTaskInput[] sortingInputs =
IntStream.range(0, inputsCount)
.filter(idx -> requiresSorting(inputConfigs[idx]))
.mapToObj(idx -> inputs[idx])
.toArray(StreamTaskInput[]::new);
KeySelector[] sortingInputKeySelectors =
IntStream.range(0, inputsCount)
.filter(idx -> requiresSorting(inputConfigs[idx]))
.mapToObj(idx -> streamConfig.getStatePartitioner(idx, userClassloader))
.toArray(KeySelector[]::new);
TypeSerializer[] sortingInputKeySerializers =
IntStream.range(0, inputsCount)
.filter(idx -> requiresSorting(inputConfigs[idx]))
.mapToObj(idx -> streamConfig.getTypeSerializerIn(idx, userClassloader))
.toArray(TypeSerializer[]::new);
StreamTaskInput[] passThroughInputs =
IntStream.range(0, inputsCount)
.filter(idx -> !requiresSorting(inputConfigs[idx]))
.mapToObj(idx -> inputs[idx])
.toArray(StreamTaskInput[]::new);
SelectableSortingInputs selectableSortingInputs =
MultiInputSortingDataInput.wrapInputs(
ownerTask,
inputs,
IntStream.range(0, inputsCount)
.mapToObj(
idx ->
streamConfig.getStatePartitioner(
idx, userClassloader))
.toArray(KeySelector[]::new),
IntStream.range(0, inputsCount)
.mapToObj(
idx ->
streamConfig.getTypeSerializerIn(
idx, userClassloader))
.toArray(TypeSerializer[]::new),
sortingInputs,
sortingInputKeySelectors,
sortingInputKeySerializers,
streamConfig.getStateKeySerializer(userClassloader),
new StreamTaskInput[0],
passThroughInputs,
memoryManager,
ioManager,
executionConfig.isObjectReuseEnabled(),
@@ -152,7 +170,20 @@ public class StreamMultipleInputProcessorFactory {
userClassloader),
jobConfig);
inputs = selectableSortingInputs.getSortedInputs();
StreamTaskInput<?>[] sortedInputs = selectableSortingInputs.getSortedInputs();
StreamTaskInput<?>[] passedThroughInputs =
selectableSortingInputs.getPassThroughInputs();
int sortedIndex = 0;
int passThroughIndex = 0;
for (int i = 0; i < inputs.length; i++) {
if (requiresSorting(inputConfigs[i])) {
inputs[i] = sortedInputs[sortedIndex];
sortedIndex++;
} else {
inputs[i] = passedThroughInputs[passThroughIndex];
passThroughIndex++;
}
}
inputSelectable = selectableSortingInputs.getInputSelectable();
}
......
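
The loop at the end of this hunk re-interleaves the wrapped inputs back into their original positions, walking both partitioned arrays with independent cursors. The same pattern in isolation (hypothetical helper, not part of the diff):

// Merge two partitioned arrays back into positional order, driven by the
// same per-position predicate that split them.
static <T> T[] mergeByMask(boolean[] sortedMask, T[] sorted, T[] passThrough, T[] out) {
    int s = 0;
    int p = 0;
    for (int i = 0; i < out.length; i++) {
        out[i] = sortedMask[i] ? sorted[s++] : passThrough[p++];
    }
    return out;
}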
@@ -43,6 +43,10 @@ import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
import org.apache.flink.util.function.ThrowingConsumer;
import java.util.ArrayList;
import java.util.List;
import static org.apache.flink.streaming.api.graph.StreamConfig.requiresSorting;
import static org.apache.flink.util.Preconditions.checkNotNull;
/** A factory for {@link StreamTwoInputProcessor}. */
@@ -90,25 +94,47 @@ public class StreamTwoInputProcessorFactory {
InputSelectable inputSelectable =
streamOperator instanceof InputSelectable ? (InputSelectable) streamOperator : null;
if (streamConfig.shouldSortInputs()) {
// this is a bit verbose because we're manually handling input1 and input2
// TODO: extract method
StreamConfig.InputConfig[] inputConfigs = streamConfig.getInputs(userClassloader);
boolean input1IsSorted = requiresSorting(inputConfigs[0]);
boolean input2IsSorted = requiresSorting(inputConfigs[1]);
if (input1IsSorted || input2IsSorted) {
// as soon as one input requires sorting we need to treat all inputs differently, to
// make sure that pass-through inputs have precedence
if (inputSelectable != null) {
throw new IllegalStateException(
"The InputSelectable interface is not supported with sorting inputs");
}
List<StreamTaskInput<?>> sortedTaskInputs = new ArrayList<>();
List<KeySelector<?, ?>> keySelectors = new ArrayList<>();
List<StreamTaskInput<?>> passThroughTaskInputs = new ArrayList<>();
if (input1IsSorted) {
sortedTaskInputs.add(input1);
keySelectors.add(streamConfig.getStatePartitioner(0, userClassloader));
} else {
passThroughTaskInputs.add(input1);
}
if (input2IsSorted) {
sortedTaskInputs.add(input2);
keySelectors.add(streamConfig.getStatePartitioner(1, userClassloader));
} else {
passThroughTaskInputs.add(input2);
}
@SuppressWarnings("unchecked")
SelectableSortingInputs selectableSortingInputs =
MultiInputSortingDataInput.wrapInputs(
ownerTask,
new StreamTaskInput[] {input1, input2},
new KeySelector[] {
streamConfig.getStatePartitioner(0, userClassloader),
streamConfig.getStatePartitioner(1, userClassloader)
},
sortedTaskInputs.toArray(new StreamTaskInput[0]),
keySelectors.toArray(new KeySelector[0]),
new TypeSerializer[] {typeSerializer1, typeSerializer2},
streamConfig.getStateKeySerializer(userClassloader),
new StreamTaskInput[0],
passThroughTaskInputs.toArray(new StreamTaskInput[0]),
memoryManager,
ioManager,
executionConfig.isObjectReuseEnabled(),
@@ -118,8 +144,18 @@ public class StreamTwoInputProcessorFactory {
userClassloader),
jobConfig);
inputSelectable = selectableSortingInputs.getInputSelectable();
input1 = getSortedInput(selectableSortingInputs.getSortedInputs()[0]);
input2 = getSortedInput(selectableSortingInputs.getSortedInputs()[1]);
StreamTaskInput<?>[] sortedInputs = selectableSortingInputs.getSortedInputs();
StreamTaskInput<?>[] passThroughInputs = selectableSortingInputs.getPassThroughInputs();
if (input1IsSorted) {
input1 = toTypedInput(sortedInputs[0]);
} else {
input1 = toTypedInput(passThroughInputs[0]);
}
if (input2IsSorted) {
input2 = toTypedInput(sortedInputs[sortedInputs.length - 1]);
} else {
input2 = toTypedInput(passThroughInputs[passThroughInputs.length - 1]);
}
}
StreamTaskNetworkOutput<IN1> output1 =
......@@ -151,7 +187,7 @@ public class StreamTwoInputProcessorFactory {
}
@SuppressWarnings("unchecked")
private static <IN1> StreamTaskInput<IN1> getSortedInput(StreamTaskInput<?> multiInput) {
private static <IN1> StreamTaskInput<IN1> toTypedInput(StreamTaskInput<?> multiInput) {
return (StreamTaskInput<IN1>) multiInput;
}
......
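
A note on the indexing above, assuming wrapInputs preserves the order in which inputs were handed to it (which this code relies on): input1, when present in an array, is always element 0, and input2 is always the last element of whichever array it landed in, so indexing with length - 1 is safe in every reachable case:

// input1 SORTED,       input2 SORTED       -> sortedInputs = [in1, in2]; input2 = sortedInputs[1]
// input1 SORTED,       input2 PASS_THROUGH -> sortedInputs = [in1]; passThroughInputs = [in2]
// input1 PASS_THROUGH, input2 SORTED       -> passThroughInputs = [in1]; sortedInputs = [in2]
// both PASS_THROUGH                        -> branch not entered (input1IsSorted || input2IsSorted is false)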
@@ -45,6 +45,7 @@ import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
import javax.annotation.Nullable;
import static org.apache.flink.streaming.api.graph.StreamConfig.requiresSorting;
import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.Preconditions.checkState;
@@ -90,7 +91,10 @@ public class OneInputStreamTask<IN, OUT> extends StreamTask<OUT, OneInputStreamO
DataOutput<IN> output = createDataOutput(numRecordsIn);
StreamTaskInput<IN> input = createTaskInput(inputGate);
if (configuration.shouldSortInputs()) {
StreamConfig.InputConfig[] inputConfigs =
configuration.getInputs(getUserCodeClassLoader());
StreamConfig.InputConfig inputConfig = inputConfigs[0];
if (requiresSorting(inputConfig)) {
checkState(
!configuration.isCheckpointingEnabled(),
"Checkpointing is not allowed with sorted inputs.");
......
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.runtime.translators;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.core.memory.ManagedMemoryUseCase;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.graph.StreamNode;
import org.apache.flink.streaming.api.graph.TransformationTranslator;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
@@ -40,7 +41,10 @@ import static org.apache.flink.util.Preconditions.checkState;
class BatchExecutionUtils {
private static final Logger LOG = LoggerFactory.getLogger(BatchExecutionUtils.class);
static void applySortingInputs(int transformationId, TransformationTranslator.Context context) {
static void applyBatchExecutionSettings(
int transformationId,
TransformationTranslator.Context context,
StreamConfig.InputRequirement... inputRequirements) {
StreamNode node = context.getStreamGraph().getStreamNode(transformationId);
boolean sortInputs = context.getGraphGeneratorConfig().get(ExecutionOptions.SORT_INPUTS);
boolean isInputSelectable = isInputSelectable(node);
@@ -52,8 +56,10 @@
"Batch state backend and sorting inputs are not supported in graphs with an InputSelectable operator.");
if (sortInputs) {
LOG.debug("Enabling sorting inputs for an operator {}.", node);
node.setSortedInputs(true);
LOG.debug("Applying sorting/pass-through input requirements for operator {}.", node);
for (int i = 0; i < inputRequirements.length; i++) {
node.addInputRequirement(i, inputRequirements[i]);
}
Map<ManagedMemoryUseCase, Integer> operatorScopeUseCaseWeights = new HashMap<>();
operatorScopeUseCaseWeights.put(ManagedMemoryUseCase.BATCH_OP, 1);
node.setManagedMemoryUseCaseWeights(
......
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.streaming.api.graph.SimpleTransformationTranslator;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.streaming.api.graph.TransformationTranslator;
import org.apache.flink.streaming.api.transformations.LegacySinkTransformation;
@@ -49,7 +50,8 @@ public class LegacySinkTransformationTranslator<IN>
final Collection<Integer> ids = translateInternal(transformation, context);
boolean isKeyed = transformation.getStateKeySelector() != null;
if (isKeyed) {
BatchExecutionUtils.applySortingInputs(transformation.getId(), context);
BatchExecutionUtils.applyBatchExecutionSettings(
transformation.getId(), context, StreamConfig.InputRequirement.SORTED);
}
return ids;
}
......
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.streaming.api.graph.SimpleTransformationTranslator;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.streaming.api.graph.TransformationTranslator;
import org.apache.flink.streaming.api.transformations.AbstractMultipleInputTransformation;
@@ -53,7 +54,13 @@ public class MultiInputTransformationTranslator<OUT>
Collection<Integer> ids = translateInternal(transformation, context);
boolean isKeyed = transformation instanceof KeyedMultipleInputTransformation;
if (isKeyed) {
BatchExecutionUtils.applySortingInputs(transformation.getId(), context);
List<Transformation<?>> inputs = transformation.getInputs();
StreamConfig.InputRequirement[] inputRequirements =
inputs.stream()
.map((input) -> StreamConfig.InputRequirement.SORTED)
.toArray(StreamConfig.InputRequirement[]::new);
BatchExecutionUtils.applyBatchExecutionSettings(
transformation.getId(), context, inputRequirements);
}
return ids;
}
......
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.runtime.translators;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.graph.TransformationTranslator;
import org.apache.flink.streaming.api.transformations.OneInputTransformation;
@@ -51,7 +52,8 @@ public final class OneInputTransformationTranslator<IN, OUT>
context);
boolean isKeyed = keySelector != null;
if (isKeyed) {
BatchExecutionUtils.applySortingInputs(transformation.getId(), context);
BatchExecutionUtils.applyBatchExecutionSettings(
transformation.getId(), context, StreamConfig.InputRequirement.SORTED);
}
return ids;
......
@@ -18,6 +18,7 @@
package org.apache.flink.streaming.runtime.translators;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.graph.TransformationTranslator;
import org.apache.flink.streaming.api.operators.BatchGroupedReduceOperator;
import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
@@ -53,7 +54,8 @@ public class ReduceTransformationTranslator<IN, KEY>
transformation.getKeySelector(),
transformation.getKeyTypeInfo(),
context);
BatchExecutionUtils.applySortingInputs(transformation.getId(), context);
BatchExecutionUtils.applyBatchExecutionSettings(
transformation.getId(), context, StreamConfig.InputRequirement.SORTED);
return ids;
}
......
@@ -19,6 +19,7 @@
package org.apache.flink.streaming.runtime.translators;
import org.apache.flink.annotation.Internal;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.graph.TransformationTranslator;
import org.apache.flink.streaming.api.transformations.TwoInputTransformation;
@@ -46,7 +47,11 @@ public class TwoInputTransformationTranslator<IN1, IN2, OUT>
transformation.getStateKeySelector1() != null
&& transformation.getStateKeySelector2() != null;
if (isKeyed) {
BatchExecutionUtils.applySortingInputs(transformation.getId(), context);
BatchExecutionUtils.applyBatchExecutionSettings(
transformation.getId(),
context,
StreamConfig.InputRequirement.SORTED,
StreamConfig.InputRequirement.SORTED);
}
return ids;
}
......
@@ -96,7 +96,9 @@ public class StreamGraphGeneratorBatchExecutionTest extends TestLogger {
StreamGraph graph = graphGenerator.generate();
StreamNode processNode = graph.getStreamNode(process.getId());
assertThat(processNode.getSortedInputs(), equalTo(true));
assertThat(
processNode.getInputRequirements().get(0),
equalTo(StreamConfig.InputRequirement.SORTED));
assertThat(
processNode.getOperatorFactory().getChainingStrategy(),
equalTo(ChainingStrategy.HEAD));
@@ -124,7 +126,9 @@ public class StreamGraphGeneratorBatchExecutionTest extends TestLogger {
StreamGraph graph = graphGenerator.generate();
StreamNode processNode = graph.getStreamNode(process.getId());
assertThat(processNode.getSortedInputs(), equalTo(true));
assertThat(
processNode.getInputRequirements().get(0),
equalTo(StreamConfig.InputRequirement.SORTED));
assertThat(
processNode.getOperatorFactory().getChainingStrategy(),
equalTo(ChainingStrategy.HEAD));
@@ -152,7 +156,7 @@ public class StreamGraphGeneratorBatchExecutionTest extends TestLogger {
StreamGraph graph = graphGenerator.generate();
StreamNode processNode = graph.getStreamNode(process.getId());
assertThat(processNode.getSortedInputs(), equalTo(false));
assertThat(processNode.getInputRequirements().get(0), nullValue());
assertThat(graph.getStateBackend(), nullValue());
assertThat(graph.getTimerServiceProvider(), nullValue());
}
@@ -201,7 +205,12 @@ public class StreamGraphGeneratorBatchExecutionTest extends TestLogger {
StreamGraph graph = graphGenerator.generate();
StreamNode processNode = graph.getStreamNode(process.getId());
assertThat(processNode.getSortedInputs(), equalTo(true));
assertThat(
processNode.getInputRequirements().get(0),
equalTo(StreamConfig.InputRequirement.SORTED));
assertThat(
processNode.getInputRequirements().get(1),
equalTo(StreamConfig.InputRequirement.SORTED));
assertThat(
processNode.getOperatorFactory().getChainingStrategy(),
equalTo(ChainingStrategy.HEAD));
@@ -234,7 +243,12 @@ public class StreamGraphGeneratorBatchExecutionTest extends TestLogger {
StreamGraph graph = graphGenerator.generate();
StreamNode processNode = graph.getStreamNode(process.getId());
assertThat(processNode.getSortedInputs(), equalTo(true));
assertThat(
processNode.getInputRequirements().get(0),
equalTo(StreamConfig.InputRequirement.SORTED));
assertThat(
processNode.getInputRequirements().get(1),
equalTo(StreamConfig.InputRequirement.SORTED));
assertThat(
processNode.getOperatorFactory().getChainingStrategy(),
equalTo(ChainingStrategy.HEAD));
@@ -267,7 +281,8 @@ public class StreamGraphGeneratorBatchExecutionTest extends TestLogger {
StreamGraph graph = graphGenerator.generate();
StreamNode processNode = graph.getStreamNode(process.getId());
assertThat(processNode.getSortedInputs(), equalTo(false));
assertThat(processNode.getInputRequirements().get(0), nullValue());
assertThat(processNode.getInputRequirements().get(1), nullValue());
assertThat(graph.getStateBackend(), nullValue());
assertThat(graph.getTimerServiceProvider(), nullValue());
}
@@ -368,7 +383,12 @@ public class StreamGraphGeneratorBatchExecutionTest extends TestLogger {
StreamGraph graph = graphGenerator.generate();
StreamNode operatorNode = graph.getStreamNode(multipleInputTransformation.getId());
assertThat(operatorNode.getSortedInputs(), equalTo(true));
assertThat(
operatorNode.getInputRequirements().get(0),
equalTo(StreamConfig.InputRequirement.SORTED));
assertThat(
operatorNode.getInputRequirements().get(1),
equalTo(StreamConfig.InputRequirement.SORTED));
assertThat(
operatorNode.getOperatorFactory().getChainingStrategy(),
equalTo(ChainingStrategy.HEAD));
......