diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java index 2de99159b08a15facc9f136d966c642ecec7a9d9..e0be035707b9c6802aadd8fbd7a7d7bbf96bf5e8 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java @@ -248,7 +248,7 @@ public class StreamConfig implements Serializable { } } - public void setTypeSerializersIn(TypeSerializer... serializers) { + public void setupNetworkInputs(TypeSerializer... serializers) { InputConfig[] inputs = new InputConfig[serializers.length]; for (int i = 0; i < serializers.length; i++) { inputs[i] = new NetworkInputConfig(serializers[i], i); diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java index 3d5bc4a307ba4d869467c50288ff5f6f7cc31c1f..26d473d81fa3b173d77ad06f735eb7f048d5dfc9 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java @@ -997,7 +997,7 @@ public class OneInputStreamTaskTest extends TestLogger { for (int chainedIndex = 1; chainedIndex < numberChainedTasks; chainedIndex++) { TestingStreamOperator chainedOperator = new TestingStreamOperator<>(); StreamConfig chainedConfig = new StreamConfig(new Configuration()); - chainedConfig.setTypeSerializersIn(StringSerializer.INSTANCE); + chainedConfig.setupNetworkInputs(StringSerializer.INSTANCE); chainedConfig.setStreamOperator(chainedOperator); chainedConfig.setOperatorID(new OperatorID(0L, chainedIndex)); chainedTaskConfigs.put(chainedIndex, chainedConfig); diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTestHarness.java index 984ab3fbb1d2ad6f0da720ed181d5104c1056e07..04c1772da6bd60ff4a5bfa5a460fe2e9b8e2c3f4 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTestHarness.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTestHarness.java @@ -133,7 +133,7 @@ public class OneInputStreamTaskTestHarness extends StreamTaskTestHarnes } streamConfig.setNumberOfNetworkInputs(1); - streamConfig.setTypeSerializersIn(inputSerializer); + streamConfig.setupNetworkInputs(inputSerializer); } public void configureForKeyedStream( diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamConfigChainer.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamConfigChainer.java index aa1e30bafccc62266e124d7fca27bde656634e40..aa019d7f7baf341d0cbcf90394d21e90e824ac70 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamConfigChainer.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamConfigChainer.java @@ -144,7 +144,7 @@ public class StreamConfigChainer { tailConfig = new StreamConfig(new Configuration()); tailConfig.setStreamOperatorFactory(checkNotNull(operatorFactory)); tailConfig.setOperatorID(checkNotNull(operatorID)); - tailConfig.setTypeSerializersIn(inputSerializer); + tailConfig.setupNetworkInputs(inputSerializer); tailConfig.setTypeSerializerOut(outputSerializer); if (createKeyedStateBackend) { // used to test multiple stateful operators chained in a single task. diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTestHarness.java index c05ff93929b6f029e72259550e7d86851ebdf5b5..7c336427c402cf7cdc5624f98205d327a5d07864 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTestHarness.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTestHarness.java @@ -179,7 +179,7 @@ public class TwoInputStreamTaskTestHarness extends StreamTaskTest streamConfig.setInPhysicalEdges(inPhysicalEdges); streamConfig.setNumberOfNetworkInputs(numInputGates); - streamConfig.setTypeSerializersIn(inputSerializer1, inputSerializer2); + streamConfig.setupNetworkInputs(inputSerializer1, inputSerializer2); } @Override diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java index 5c9613ad752a16562964aad0ec5cce19f123588e..962f3e5f44b90b69eb9e9e7271736f4dd3463150 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java @@ -58,7 +58,7 @@ public class OneInputStreamOperatorTestHarness throws Exception { this(operator, 1, 1, 0); - config.setTypeSerializersIn(Preconditions.checkNotNull(typeSerializerIn)); + config.setupNetworkInputs(Preconditions.checkNotNull(typeSerializerIn)); } public OneInputStreamOperatorTestHarness( @@ -75,7 +75,7 @@ public class OneInputStreamOperatorTestHarness parallelism, subtaskIndex, operatorID); - config.setTypeSerializersIn(Preconditions.checkNotNull(typeSerializerIn)); + config.setupNetworkInputs(Preconditions.checkNotNull(typeSerializerIn)); } public OneInputStreamOperatorTestHarness( @@ -85,7 +85,7 @@ public class OneInputStreamOperatorTestHarness throws Exception { this(operator, environment); - config.setTypeSerializersIn(Preconditions.checkNotNull(typeSerializerIn)); + config.setupNetworkInputs(Preconditions.checkNotNull(typeSerializerIn)); } public OneInputStreamOperatorTestHarness(OneInputStreamOperator operator) @@ -139,7 +139,7 @@ public class OneInputStreamOperatorTestHarness throws Exception { this(factory, environment); - config.setTypeSerializersIn(Preconditions.checkNotNull(typeSerializerIn)); + config.setupNetworkInputs(Preconditions.checkNotNull(typeSerializerIn)); } public OneInputStreamOperatorTestHarness( @@ -153,7 +153,7 @@ public class OneInputStreamOperatorTestHarness throws Exception { this(factory, 1, 1, 0); - config.setTypeSerializersIn(Preconditions.checkNotNull(typeSerializerIn)); + config.setupNetworkInputs(Preconditions.checkNotNull(typeSerializerIn)); } public OneInputStreamOperatorTestHarness( diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/multipleinput/MultipleInputStreamOperatorBase.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/multipleinput/MultipleInputStreamOperatorBase.java index e6db716fba1cac85d948bffdb2eda4852ae33cfa..6a613be8f5c1359508b9d10d5e0f17e6f3b4d0be 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/multipleinput/MultipleInputStreamOperatorBase.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/multipleinput/MultipleInputStreamOperatorBase.java @@ -285,7 +285,7 @@ public abstract class MultipleInputStreamOperatorBase extends AbstractStreamOper streamConfig.setOperatorName(wrapper.getOperatorName()); streamConfig.setNumberOfNetworkInputs(wrapper.getAllInputTypes().size()); streamConfig.setNumberOfOutputs(wrapper.getOutputEdges().size()); - streamConfig.setTypeSerializersIn( + streamConfig.setupNetworkInputs( wrapper.getAllInputTypes().stream() .map(t -> t.createSerializer(executionConfig)) .toArray(TypeSerializer[]::new));