[refactor] Rename StreamConfig.setTypeSerializersIn() to setupNetworkInputs()

Because that's what it does.
上级 28975540
......@@ -248,7 +248,7 @@ public class StreamConfig implements Serializable {
}
}
public void setTypeSerializersIn(TypeSerializer<?>... serializers) {
public void setupNetworkInputs(TypeSerializer<?>... serializers) {
InputConfig[] inputs = new InputConfig[serializers.length];
for (int i = 0; i < serializers.length; i++) {
inputs[i] = new NetworkInputConfig(serializers[i], i);
......
......@@ -997,7 +997,7 @@ public class OneInputStreamTaskTest extends TestLogger {
for (int chainedIndex = 1; chainedIndex < numberChainedTasks; chainedIndex++) {
TestingStreamOperator<Integer, Integer> chainedOperator = new TestingStreamOperator<>();
StreamConfig chainedConfig = new StreamConfig(new Configuration());
chainedConfig.setTypeSerializersIn(StringSerializer.INSTANCE);
chainedConfig.setupNetworkInputs(StringSerializer.INSTANCE);
chainedConfig.setStreamOperator(chainedOperator);
chainedConfig.setOperatorID(new OperatorID(0L, chainedIndex));
chainedTaskConfigs.put(chainedIndex, chainedConfig);
......
......@@ -133,7 +133,7 @@ public class OneInputStreamTaskTestHarness<IN, OUT> extends StreamTaskTestHarnes
}
streamConfig.setNumberOfNetworkInputs(1);
streamConfig.setTypeSerializersIn(inputSerializer);
streamConfig.setupNetworkInputs(inputSerializer);
}
public <K> void configureForKeyedStream(
......
......@@ -144,7 +144,7 @@ public class StreamConfigChainer<OWNER> {
tailConfig = new StreamConfig(new Configuration());
tailConfig.setStreamOperatorFactory(checkNotNull(operatorFactory));
tailConfig.setOperatorID(checkNotNull(operatorID));
tailConfig.setTypeSerializersIn(inputSerializer);
tailConfig.setupNetworkInputs(inputSerializer);
tailConfig.setTypeSerializerOut(outputSerializer);
if (createKeyedStateBackend) {
// used to test multiple stateful operators chained in a single task.
......
......@@ -179,7 +179,7 @@ public class TwoInputStreamTaskTestHarness<IN1, IN2, OUT> extends StreamTaskTest
streamConfig.setInPhysicalEdges(inPhysicalEdges);
streamConfig.setNumberOfNetworkInputs(numInputGates);
streamConfig.setTypeSerializersIn(inputSerializer1, inputSerializer2);
streamConfig.setupNetworkInputs(inputSerializer1, inputSerializer2);
}
@Override
......
......@@ -58,7 +58,7 @@ public class OneInputStreamOperatorTestHarness<IN, OUT>
throws Exception {
this(operator, 1, 1, 0);
config.setTypeSerializersIn(Preconditions.checkNotNull(typeSerializerIn));
config.setupNetworkInputs(Preconditions.checkNotNull(typeSerializerIn));
}
public OneInputStreamOperatorTestHarness(
......@@ -75,7 +75,7 @@ public class OneInputStreamOperatorTestHarness<IN, OUT>
parallelism,
subtaskIndex,
operatorID);
config.setTypeSerializersIn(Preconditions.checkNotNull(typeSerializerIn));
config.setupNetworkInputs(Preconditions.checkNotNull(typeSerializerIn));
}
public OneInputStreamOperatorTestHarness(
......@@ -85,7 +85,7 @@ public class OneInputStreamOperatorTestHarness<IN, OUT>
throws Exception {
this(operator, environment);
config.setTypeSerializersIn(Preconditions.checkNotNull(typeSerializerIn));
config.setupNetworkInputs(Preconditions.checkNotNull(typeSerializerIn));
}
public OneInputStreamOperatorTestHarness(OneInputStreamOperator<IN, OUT> operator)
......@@ -139,7 +139,7 @@ public class OneInputStreamOperatorTestHarness<IN, OUT>
throws Exception {
this(factory, environment);
config.setTypeSerializersIn(Preconditions.checkNotNull(typeSerializerIn));
config.setupNetworkInputs(Preconditions.checkNotNull(typeSerializerIn));
}
public OneInputStreamOperatorTestHarness(
......@@ -153,7 +153,7 @@ public class OneInputStreamOperatorTestHarness<IN, OUT>
throws Exception {
this(factory, 1, 1, 0);
config.setTypeSerializersIn(Preconditions.checkNotNull(typeSerializerIn));
config.setupNetworkInputs(Preconditions.checkNotNull(typeSerializerIn));
}
public OneInputStreamOperatorTestHarness(
......
......@@ -285,7 +285,7 @@ public abstract class MultipleInputStreamOperatorBase extends AbstractStreamOper
streamConfig.setOperatorName(wrapper.getOperatorName());
streamConfig.setNumberOfNetworkInputs(wrapper.getAllInputTypes().size());
streamConfig.setNumberOfOutputs(wrapper.getOutputEdges().size());
streamConfig.setTypeSerializersIn(
streamConfig.setupNetworkInputs(
wrapper.getAllInputTypes().stream()
.map(t -> t.createSerializer(executionConfig))
.toArray(TypeSerializer[]::new));
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册