From 524a10708aed7f9c67cccba909d489e8d14a633f Mon Sep 17 00:00:00 2001 From: Aljoscha Krettek Date: Thu, 10 Dec 2020 15:07:57 +0100 Subject: [PATCH] [FLINK-20491] Add per-input setting of BATCH execution requirements This doesn't change the actual behavior, we still set the same "sorted" setting on both inputs. We will add tests and actually change the behavior in a follow-up commit. --- .../streaming/api/graph/StreamConfig.java | 59 ++++++++++++----- .../streaming/api/graph/StreamGraph.java | 4 -- .../flink/streaming/api/graph/StreamNode.java | 12 ++-- .../api/graph/StreamingJobGraphGenerator.java | 11 +++- .../StreamMultipleInputProcessorFactory.java | 63 ++++++++++++++----- .../io/StreamTwoInputProcessorFactory.java | 56 ++++++++++++++--- .../runtime/tasks/OneInputStreamTask.java | 6 +- .../translators/BatchExecutionUtils.java | 12 +++- .../LegacySinkTransformationTranslator.java | 4 +- .../MultiInputTransformationTranslator.java | 9 ++- .../OneInputTransformationTranslator.java | 4 +- .../ReduceTransformationTranslator.java | 4 +- .../TwoInputTransformationTranslator.java | 7 ++- ...treamGraphGeneratorBatchExecutionTest.java | 34 +++++++--- 14 files changed, 217 insertions(+), 68 deletions(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java index e0be035707b..67b43cc9583 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java @@ -110,12 +110,6 @@ public class StreamConfig implements Serializable { .withDescription( "If state backend is specified, whether it uses managed memory."); - private static final ConfigOption SORTED_INPUTS = - ConfigOptions.key("sorted-inputs") - .booleanType() - .defaultValue(false) - .withDescription("A flag to enable/disable sorting inputs of keyed operators."); - // ------------------------------------------------------------------------ // Default Values // ------------------------------------------------------------------------ @@ -251,7 +245,7 @@ public class StreamConfig implements Serializable { public void setupNetworkInputs(TypeSerializer... serializers) { InputConfig[] inputs = new InputConfig[serializers.length]; for (int i = 0; i < serializers.length; i++) { - inputs[i] = new NetworkInputConfig(serializers[i], i); + inputs[i] = new NetworkInputConfig(serializers[i], i, InputRequirement.PASS_THROUGH); } setInputs(inputs); } @@ -680,14 +674,6 @@ public class StreamConfig implements Serializable { return builder.toString(); } - public void setShouldSortInputs(boolean sortInputs) { - config.set(SORTED_INPUTS, sortInputs); - } - - public boolean shouldSortInputs() { - return config.get(SORTED_INPUTS); - } - public void setGraphContainingLoops(boolean graphContainingLoops) { config.setBoolean(GRAPH_CONTAINING_LOOPS, graphContainingLoops); } @@ -696,17 +682,50 @@ public class StreamConfig implements Serializable { return config.getBoolean(GRAPH_CONTAINING_LOOPS, false); } + /** + * Requirements of the different inputs of an operator. Each input can have a different + * requirement. For all {@link #SORTED} inputs, records are sorted/grouped by key and all + * records of a given key are passed to the operator consecutively before moving on to the next + * group. + */ + public enum InputRequirement { + /** + * Records from all sorted inputs are grouped (sorted) by key and are then fed to the + * operator one group at a time. This "zig-zags" between different inputs if records for the + * same key arrive on multiple inputs to ensure that the operator sees all records with a + * key as one consecutive group. + */ + SORTED, + + /** + * Records from {@link #PASS_THROUGH} inputs are passed to the operator before passing any + * records from {@link #SORTED} inputs. There are no guarantees on ordering between and + * within the different {@link #PASS_THROUGH} inputs. + */ + PASS_THROUGH; + } + /** Interface representing chained inputs. */ public interface InputConfig extends Serializable {} /** A representation of a Network {@link InputConfig}. */ public static class NetworkInputConfig implements InputConfig { private final TypeSerializer typeSerializer; + private final InputRequirement inputRequirement; + private int inputGateIndex; public NetworkInputConfig(TypeSerializer typeSerializer, int inputGateIndex) { + this(typeSerializer, inputGateIndex, InputRequirement.PASS_THROUGH); + } + + public NetworkInputConfig( + TypeSerializer typeSerializer, + int inputGateIndex, + InputRequirement inputRequirement) { this.typeSerializer = typeSerializer; this.inputGateIndex = inputGateIndex; + this.inputRequirement = inputRequirement; } public TypeSerializer getTypeSerializer() { @@ -716,6 +735,10 @@ public class StreamConfig implements Serializable { public int getInputGateIndex() { return inputGateIndex; } + + public InputRequirement getInputRequirement() { + return inputRequirement; + } } /** A serialized representation of an input. */ @@ -752,4 +775,10 @@ public class StreamConfig implements Serializable { return inputEdge.hashCode(); } } + + public static boolean requiresSorting(StreamConfig.InputConfig inputConfig) { + return inputConfig instanceof StreamConfig.NetworkInputConfig + && ((StreamConfig.NetworkInputConfig) inputConfig).getInputRequirement() + == StreamConfig.InputRequirement.SORTED; + } } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java index fa3a510aaa1..93a2d984aee 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java @@ -734,10 +734,6 @@ public class StreamGraph implements Pipeline { getStreamNode(vertexID).setOutputFormat(outputFormat); } - void setSortedInputs(int vertexId, boolean shouldSort) { - getStreamNode(vertexId).setSortedInputs(shouldSort); - } - public void setTransformationUID(Integer nodeId, String transformationId) { StreamNode node = streamNodes.get(nodeId); if (node != null) { diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java index 00854ea1ff0..151a4b7d6c1 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java @@ -84,7 +84,8 @@ public class StreamNode { private String transformationUID; private String userHash; - private boolean sortedInputs = false; + + private final Map inputRequirements = new HashMap<>(); @VisibleForTesting public StreamNode( @@ -343,12 +344,13 @@ public class StreamNode { this.userHash = userHash; } - public void setSortedInputs(boolean sortedInputs) { - this.sortedInputs = sortedInputs; + public void addInputRequirement( + int inputIndex, StreamConfig.InputRequirement inputRequirement) { + inputRequirements.put(inputIndex, inputRequirement); } - public boolean getSortedInputs() { - return sortedInputs; + public Map getInputRequirements() { + return inputRequirements; } public Optional getCoordinatorProvider( diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java index b06bdb7a66e..d0da83ba90d 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java @@ -649,16 +649,23 @@ public class StreamingJobGraphGenerator { // network input. null if we move to a new input, non-null if this is a further edge // that is union-ed into the same input if (inputConfigs[inputIndex] == null) { + // PASS_THROUGH is a sensible default for streaming jobs. Only for BATCH + // execution can we have sorted inputs + StreamConfig.InputRequirement inputRequirement = + vertex.getInputRequirements() + .getOrDefault( + inputIndex, StreamConfig.InputRequirement.PASS_THROUGH); inputConfigs[inputIndex] = new StreamConfig.NetworkInputConfig( - inputSerializers[inputIndex], inputGateCount++); + inputSerializers[inputIndex], + inputGateCount++, + inputRequirement); } } } config.setInputs(inputConfigs); config.setTypeSerializerOut(vertex.getTypeSerializerOut()); - config.setShouldSortInputs(vertex.getSortedInputs()); // iterate edges, find sideOutput edges create and save serializers for each outputTag type for (StreamEdge edge : chainableOutputs) { diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessorFactory.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessorFactory.java index bd1a6c08524..c60c5c7dba9 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessorFactory.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessorFactory.java @@ -51,6 +51,7 @@ import java.util.Arrays; import java.util.List; import java.util.stream.IntStream; +import static org.apache.flink.streaming.api.graph.StreamConfig.requiresSorting; import static org.apache.flink.util.Preconditions.checkNotNull; import static org.apache.flink.util.Preconditions.checkState; @@ -118,31 +119,48 @@ public class StreamMultipleInputProcessorFactory { InputSelectable inputSelectable = mainOperator instanceof InputSelectable ? (InputSelectable) mainOperator : null; - if (streamConfig.shouldSortInputs()) { + + StreamConfig.InputConfig[] inputConfigs = streamConfig.getInputs(userClassloader); + boolean anyRequiresSorting = + Arrays.stream(inputConfigs).anyMatch(StreamConfig::requiresSorting); + + if (anyRequiresSorting) { if (inputSelectable != null) { throw new IllegalStateException( "The InputSelectable interface is not supported with sorting inputs"); } + StreamTaskInput[] sortingInputs = + IntStream.range(0, inputsCount) + .filter(idx -> requiresSorting(inputConfigs[idx])) + .mapToObj(idx -> inputs[idx]) + .toArray(StreamTaskInput[]::new); + KeySelector[] sortingInputKeySelectors = + IntStream.range(0, inputsCount) + .filter(idx -> requiresSorting(inputConfigs[idx])) + .mapToObj(idx -> streamConfig.getStatePartitioner(idx, userClassloader)) + .toArray(KeySelector[]::new); + TypeSerializer[] sortingInputKeySerializers = + IntStream.range(0, inputsCount) + .filter(idx -> requiresSorting(inputConfigs[idx])) + .mapToObj(idx -> streamConfig.getTypeSerializerIn(idx, userClassloader)) + .toArray(TypeSerializer[]::new); + + StreamTaskInput[] passThroughInputs = + IntStream.range(0, inputsCount) + .filter(idx -> !requiresSorting(inputConfigs[idx])) + .mapToObj(idx -> inputs[idx]) + .toArray(StreamTaskInput[]::new); + SelectableSortingInputs selectableSortingInputs = MultiInputSortingDataInput.wrapInputs( ownerTask, - inputs, - IntStream.range(0, inputsCount) - .mapToObj( - idx -> - streamConfig.getStatePartitioner( - idx, userClassloader)) - .toArray(KeySelector[]::new), - IntStream.range(0, inputsCount) - .mapToObj( - idx -> - streamConfig.getTypeSerializerIn( - idx, userClassloader)) - .toArray(TypeSerializer[]::new), + sortingInputs, + sortingInputKeySelectors, + sortingInputKeySerializers, streamConfig.getStateKeySerializer(userClassloader), - new StreamTaskInput[0], + passThroughInputs, memoryManager, ioManager, executionConfig.isObjectReuseEnabled(), @@ -152,7 +170,20 @@ public class StreamMultipleInputProcessorFactory { userClassloader), jobConfig); - inputs = selectableSortingInputs.getSortedInputs(); + StreamTaskInput[] sortedInputs = selectableSortingInputs.getSortedInputs(); + StreamTaskInput[] passedThroughInputs = + selectableSortingInputs.getPassThroughInputs(); + int sortedIndex = 0; + int passThroughIndex = 0; + for (int i = 0; i < inputs.length; i++) { + if (requiresSorting(inputConfigs[i])) { + inputs[i] = sortedInputs[sortedIndex]; + sortedIndex++; + } else { + inputs[i] = passedThroughInputs[passThroughIndex]; + passThroughIndex++; + } + } inputSelectable = selectableSortingInputs.getInputSelectable(); } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessorFactory.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessorFactory.java index 25f81a9f7e6..efdf8240f12 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessorFactory.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessorFactory.java @@ -43,6 +43,10 @@ import org.apache.flink.streaming.runtime.streamstatus.StreamStatus; import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer; import org.apache.flink.util.function.ThrowingConsumer; +import java.util.ArrayList; +import java.util.List; + +import static org.apache.flink.streaming.api.graph.StreamConfig.requiresSorting; import static org.apache.flink.util.Preconditions.checkNotNull; /** A factory for {@link StreamTwoInputProcessor}. */ @@ -90,25 +94,47 @@ public class StreamTwoInputProcessorFactory { InputSelectable inputSelectable = streamOperator instanceof InputSelectable ? (InputSelectable) streamOperator : null; - if (streamConfig.shouldSortInputs()) { + + // this is a bit verbose because we're manually handling input1 and input2 + // TODO: extract method + StreamConfig.InputConfig[] inputConfigs = streamConfig.getInputs(userClassloader); + boolean input1IsSorted = requiresSorting(inputConfigs[0]); + boolean input2IsSorted = requiresSorting(inputConfigs[1]); + + if (input1IsSorted || input2IsSorted) { + // as soon as one input requires sorting we need to treat all inputs differently, to + // make sure that pass-through inputs have precedence if (inputSelectable != null) { throw new IllegalStateException( "The InputSelectable interface is not supported with sorting inputs"); } + List> sortedTaskInputs = new ArrayList<>(); + List> keySelectors = new ArrayList<>(); + List> passThroughTaskInputs = new ArrayList<>(); + if (input1IsSorted) { + sortedTaskInputs.add(input1); + keySelectors.add(streamConfig.getStatePartitioner(0, userClassloader)); + } else { + passThroughTaskInputs.add(input1); + } + if (input2IsSorted) { + sortedTaskInputs.add(input2); + keySelectors.add(streamConfig.getStatePartitioner(1, userClassloader)); + } else { + passThroughTaskInputs.add(input2); + } + @SuppressWarnings("unchecked") SelectableSortingInputs selectableSortingInputs = MultiInputSortingDataInput.wrapInputs( ownerTask, - new StreamTaskInput[] {input1, input2}, - new KeySelector[] { - streamConfig.getStatePartitioner(0, userClassloader), - streamConfig.getStatePartitioner(1, userClassloader) - }, + sortedTaskInputs.toArray(new StreamTaskInput[0]), + keySelectors.toArray(new KeySelector[0]), new TypeSerializer[] {typeSerializer1, typeSerializer2}, streamConfig.getStateKeySerializer(userClassloader), - new StreamTaskInput[0], + passThroughTaskInputs.toArray(new StreamTaskInput[0]), memoryManager, ioManager, executionConfig.isObjectReuseEnabled(), @@ -118,8 +144,18 @@ public class StreamTwoInputProcessorFactory { userClassloader), jobConfig); inputSelectable = selectableSortingInputs.getInputSelectable(); - input1 = getSortedInput(selectableSortingInputs.getSortedInputs()[0]); - input2 = getSortedInput(selectableSortingInputs.getSortedInputs()[1]); + StreamTaskInput[] sortedInputs = selectableSortingInputs.getSortedInputs(); + StreamTaskInput[] passThroughInputs = selectableSortingInputs.getPassThroughInputs(); + if (input1IsSorted) { + input1 = toTypedInput(sortedInputs[0]); + } else { + input1 = toTypedInput(passThroughInputs[0]); + } + if (input2IsSorted) { + input2 = toTypedInput(sortedInputs[sortedInputs.length - 1]); + } else { + input2 = toTypedInput(passThroughInputs[passThroughInputs.length - 1]); + } } StreamTaskNetworkOutput output1 = @@ -151,7 +187,7 @@ public class StreamTwoInputProcessorFactory { } @SuppressWarnings("unchecked") - private static StreamTaskInput getSortedInput(StreamTaskInput multiInput) { + private static StreamTaskInput toTypedInput(StreamTaskInput multiInput) { return (StreamTaskInput) multiInput; } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java index 8b0a6b73bec..00435366aeb 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java @@ -45,6 +45,7 @@ import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer; import javax.annotation.Nullable; +import static org.apache.flink.streaming.api.graph.StreamConfig.requiresSorting; import static org.apache.flink.util.Preconditions.checkNotNull; import static org.apache.flink.util.Preconditions.checkState; @@ -90,7 +91,10 @@ public class OneInputStreamTask extends StreamTask output = createDataOutput(numRecordsIn); StreamTaskInput input = createTaskInput(inputGate); - if (configuration.shouldSortInputs()) { + StreamConfig.InputConfig[] inputConfigs = + configuration.getInputs(getUserCodeClassLoader()); + StreamConfig.InputConfig inputConfig = inputConfigs[0]; + if (requiresSorting(inputConfig)) { checkState( !configuration.isCheckpointingEnabled(), "Checkpointing is not allowed with sorted inputs."); diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/BatchExecutionUtils.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/BatchExecutionUtils.java index bd851f87e7f..7e72bd56001 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/BatchExecutionUtils.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/BatchExecutionUtils.java @@ -20,6 +20,7 @@ package org.apache.flink.streaming.runtime.translators; import org.apache.flink.configuration.ExecutionOptions; import org.apache.flink.core.memory.ManagedMemoryUseCase; +import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.graph.StreamNode; import org.apache.flink.streaming.api.graph.TransformationTranslator; import org.apache.flink.streaming.api.operators.ChainingStrategy; @@ -40,7 +41,10 @@ import static org.apache.flink.util.Preconditions.checkState; class BatchExecutionUtils { private static final Logger LOG = LoggerFactory.getLogger(BatchExecutionUtils.class); - static void applySortingInputs(int transformationId, TransformationTranslator.Context context) { + static void applyBatchExecutionSettings( + int transformationId, + TransformationTranslator.Context context, + StreamConfig.InputRequirement... inputRequirements) { StreamNode node = context.getStreamGraph().getStreamNode(transformationId); boolean sortInputs = context.getGraphGeneratorConfig().get(ExecutionOptions.SORT_INPUTS); boolean isInputSelectable = isInputSelectable(node); @@ -52,8 +56,10 @@ class BatchExecutionUtils { "Batch state backend and sorting inputs are not supported in graphs with an InputSelectable operator."); if (sortInputs) { - LOG.debug("Enabling sorting inputs for an operator {}.", node); - node.setSortedInputs(true); + LOG.debug("Applying sorting/pass-through input requirements for operator {}.", node); + for (int i = 0; i < inputRequirements.length; i++) { + node.addInputRequirement(i, inputRequirements[i]); + } Map operatorScopeUseCaseWeights = new HashMap<>(); operatorScopeUseCaseWeights.put(ManagedMemoryUseCase.BATCH_OP, 1); node.setManagedMemoryUseCaseWeights( diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/LegacySinkTransformationTranslator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/LegacySinkTransformationTranslator.java index 3786350ba36..fa2b1d3608c 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/LegacySinkTransformationTranslator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/LegacySinkTransformationTranslator.java @@ -23,6 +23,7 @@ import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.dag.Transformation; import org.apache.flink.streaming.api.graph.SimpleTransformationTranslator; +import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.graph.StreamGraph; import org.apache.flink.streaming.api.graph.TransformationTranslator; import org.apache.flink.streaming.api.transformations.LegacySinkTransformation; @@ -49,7 +50,8 @@ public class LegacySinkTransformationTranslator final Collection ids = translateInternal(transformation, context); boolean isKeyed = transformation.getStateKeySelector() != null; if (isKeyed) { - BatchExecutionUtils.applySortingInputs(transformation.getId(), context); + BatchExecutionUtils.applyBatchExecutionSettings( + transformation.getId(), context, StreamConfig.InputRequirement.SORTED); } return ids; } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/MultiInputTransformationTranslator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/MultiInputTransformationTranslator.java index aed2ee632f0..8d87c7674af 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/MultiInputTransformationTranslator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/MultiInputTransformationTranslator.java @@ -23,6 +23,7 @@ import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.dag.Transformation; import org.apache.flink.streaming.api.graph.SimpleTransformationTranslator; +import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.graph.StreamGraph; import org.apache.flink.streaming.api.graph.TransformationTranslator; import org.apache.flink.streaming.api.transformations.AbstractMultipleInputTransformation; @@ -53,7 +54,13 @@ public class MultiInputTransformationTranslator Collection ids = translateInternal(transformation, context); boolean isKeyed = transformation instanceof KeyedMultipleInputTransformation; if (isKeyed) { - BatchExecutionUtils.applySortingInputs(transformation.getId(), context); + List> inputs = transformation.getInputs(); + StreamConfig.InputRequirement[] inputRequirements = + inputs.stream() + .map((input) -> StreamConfig.InputRequirement.SORTED) + .toArray(StreamConfig.InputRequirement[]::new); + BatchExecutionUtils.applyBatchExecutionSettings( + transformation.getId(), context, inputRequirements); } return ids; } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/OneInputTransformationTranslator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/OneInputTransformationTranslator.java index e992ad1071a..a80b7c06922 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/OneInputTransformationTranslator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/OneInputTransformationTranslator.java @@ -20,6 +20,7 @@ package org.apache.flink.streaming.runtime.translators; import org.apache.flink.annotation.Internal; import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.graph.TransformationTranslator; import org.apache.flink.streaming.api.transformations.OneInputTransformation; @@ -51,7 +52,8 @@ public final class OneInputTransformationTranslator context); boolean isKeyed = keySelector != null; if (isKeyed) { - BatchExecutionUtils.applySortingInputs(transformation.getId(), context); + BatchExecutionUtils.applyBatchExecutionSettings( + transformation.getId(), context, StreamConfig.InputRequirement.SORTED); } return ids; diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/ReduceTransformationTranslator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/ReduceTransformationTranslator.java index a545979d7cd..63bf563e550 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/ReduceTransformationTranslator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/ReduceTransformationTranslator.java @@ -18,6 +18,7 @@ package org.apache.flink.streaming.runtime.translators; +import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.graph.TransformationTranslator; import org.apache.flink.streaming.api.operators.BatchGroupedReduceOperator; import org.apache.flink.streaming.api.operators.SimpleOperatorFactory; @@ -53,7 +54,8 @@ public class ReduceTransformationTranslator transformation.getKeySelector(), transformation.getKeyTypeInfo(), context); - BatchExecutionUtils.applySortingInputs(transformation.getId(), context); + BatchExecutionUtils.applyBatchExecutionSettings( + transformation.getId(), context, StreamConfig.InputRequirement.SORTED); return ids; } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/TwoInputTransformationTranslator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/TwoInputTransformationTranslator.java index 87474de1f51..3c261efee4a 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/TwoInputTransformationTranslator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/TwoInputTransformationTranslator.java @@ -19,6 +19,7 @@ package org.apache.flink.streaming.runtime.translators; import org.apache.flink.annotation.Internal; +import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.graph.TransformationTranslator; import org.apache.flink.streaming.api.transformations.TwoInputTransformation; @@ -46,7 +47,11 @@ public class TwoInputTransformationTranslator transformation.getStateKeySelector1() != null && transformation.getStateKeySelector2() != null; if (isKeyed) { - BatchExecutionUtils.applySortingInputs(transformation.getId(), context); + BatchExecutionUtils.applyBatchExecutionSettings( + transformation.getId(), + context, + StreamConfig.InputRequirement.SORTED, + StreamConfig.InputRequirement.SORTED); } return ids; } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorBatchExecutionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorBatchExecutionTest.java index b6d4fe4ab13..1089283442f 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorBatchExecutionTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorBatchExecutionTest.java @@ -96,7 +96,9 @@ public class StreamGraphGeneratorBatchExecutionTest extends TestLogger { StreamGraph graph = graphGenerator.generate(); StreamNode processNode = graph.getStreamNode(process.getId()); - assertThat(processNode.getSortedInputs(), equalTo(true)); + assertThat( + processNode.getInputRequirements().get(0), + equalTo(StreamConfig.InputRequirement.SORTED)); assertThat( processNode.getOperatorFactory().getChainingStrategy(), equalTo(ChainingStrategy.HEAD)); @@ -124,7 +126,9 @@ public class StreamGraphGeneratorBatchExecutionTest extends TestLogger { StreamGraph graph = graphGenerator.generate(); StreamNode processNode = graph.getStreamNode(process.getId()); - assertThat(processNode.getSortedInputs(), equalTo(true)); + assertThat( + processNode.getInputRequirements().get(0), + equalTo(StreamConfig.InputRequirement.SORTED)); assertThat( processNode.getOperatorFactory().getChainingStrategy(), equalTo(ChainingStrategy.HEAD)); @@ -152,7 +156,7 @@ public class StreamGraphGeneratorBatchExecutionTest extends TestLogger { StreamGraph graph = graphGenerator.generate(); StreamNode processNode = graph.getStreamNode(process.getId()); - assertThat(processNode.getSortedInputs(), equalTo(false)); + assertThat(processNode.getInputRequirements().get(0), nullValue()); assertThat(graph.getStateBackend(), nullValue()); assertThat(graph.getTimerServiceProvider(), nullValue()); } @@ -201,7 +205,12 @@ public class StreamGraphGeneratorBatchExecutionTest extends TestLogger { StreamGraph graph = graphGenerator.generate(); StreamNode processNode = graph.getStreamNode(process.getId()); - assertThat(processNode.getSortedInputs(), equalTo(true)); + assertThat( + processNode.getInputRequirements().get(0), + equalTo(StreamConfig.InputRequirement.SORTED)); + assertThat( + processNode.getInputRequirements().get(1), + equalTo(StreamConfig.InputRequirement.SORTED)); assertThat( processNode.getOperatorFactory().getChainingStrategy(), equalTo(ChainingStrategy.HEAD)); @@ -234,7 +243,12 @@ public class StreamGraphGeneratorBatchExecutionTest extends TestLogger { StreamGraph graph = graphGenerator.generate(); StreamNode processNode = graph.getStreamNode(process.getId()); - assertThat(processNode.getSortedInputs(), equalTo(true)); + assertThat( + processNode.getInputRequirements().get(0), + equalTo(StreamConfig.InputRequirement.SORTED)); + assertThat( + processNode.getInputRequirements().get(1), + equalTo(StreamConfig.InputRequirement.SORTED)); assertThat( processNode.getOperatorFactory().getChainingStrategy(), equalTo(ChainingStrategy.HEAD)); @@ -267,7 +281,8 @@ public class StreamGraphGeneratorBatchExecutionTest extends TestLogger { StreamGraph graph = graphGenerator.generate(); StreamNode processNode = graph.getStreamNode(process.getId()); - assertThat(processNode.getSortedInputs(), equalTo(false)); + assertThat(processNode.getInputRequirements().get(0), nullValue()); + assertThat(processNode.getInputRequirements().get(1), nullValue()); assertThat(graph.getStateBackend(), nullValue()); assertThat(graph.getTimerServiceProvider(), nullValue()); } @@ -368,7 +383,12 @@ public class StreamGraphGeneratorBatchExecutionTest extends TestLogger { StreamGraph graph = graphGenerator.generate(); StreamNode operatorNode = graph.getStreamNode(multipleInputTransformation.getId()); - assertThat(operatorNode.getSortedInputs(), equalTo(true)); + assertThat( + operatorNode.getInputRequirements().get(0), + equalTo(StreamConfig.InputRequirement.SORTED)); + assertThat( + operatorNode.getInputRequirements().get(1), + equalTo(StreamConfig.InputRequirement.SORTED)); assertThat( operatorNode.getOperatorFactory().getChainingStrategy(), equalTo(ChainingStrategy.HEAD)); -- GitLab