提交 52db9895 编写于 作者: S Sanjeev Kulkarni 提交者: Matteo Merli

Allow users to update everything in inputspecs except for isregexpattern (#3770)

* Allow users to update everything in inputspecs except for isregexpattern

* Added more tests and fixed a bug caught by them
上级 49baf5dc
......@@ -623,9 +623,10 @@ public class FunctionConfigUtils {
if (!existingConfig.getInputSpecs().containsKey(topicName)) {
throw new IllegalArgumentException("Input Topics cannot be altered");
}
if (!consumerConfig.equals(existingConfig.getInputSpecs().get(topicName))) {
throw new IllegalArgumentException("Input Specs mismatch");
if (consumerConfig.isRegexPattern() != existingConfig.getInputSpecs().get(topicName).isRegexPattern()) {
throw new IllegalArgumentException("isRegexPattern for input topic " + topicName + " cannot be altered");
}
mergedConfig.getInputSpecs().put(topicName, consumerConfig);
});
}
if (!StringUtils.isEmpty(newConfig.getOutput()) && !newConfig.getOutput().equals(existingConfig.getOutput())) {
......
......@@ -420,9 +420,10 @@ public class SinkConfigUtils {
if (!existingConfig.getInputSpecs().containsKey(topicName)) {
throw new IllegalArgumentException("Input Topics cannot be altered");
}
if (!consumerConfig.equals(existingConfig.getInputSpecs().get(topicName))) {
throw new IllegalArgumentException("Input Specs mismatch");
if (consumerConfig.isRegexPattern() != existingConfig.getInputSpecs().get(topicName).isRegexPattern()) {
throw new IllegalArgumentException("isRegexPattern for input topic " + topicName + " cannot be altered");
}
mergedConfig.getInputSpecs().put(topicName, consumerConfig);
});
}
if (newConfig.getProcessingGuarantees() != null && !newConfig.getProcessingGuarantees().equals(existingConfig.getProcessingGuarantees())) {
......
......@@ -151,6 +151,25 @@ public class FunctionConfigUtilsTest {
FunctionConfigUtils.validateUpdate(functionConfig, newFunctionConfig);
}
@Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "isRegexPattern for input topic test-input cannot be altered")
public void testMergeDifferentInputSpecWithRegexChange() {
FunctionConfig functionConfig = createFunctionConfig();
Map<String, ConsumerConfig> inputSpecs = new HashMap<>();
inputSpecs.put("test-input", ConsumerConfig.builder().isRegexPattern(false).serdeClassName("my-serde").build());
FunctionConfig newFunctionConfig = createUpdatedFunctionConfig("inputSpecs", inputSpecs);
FunctionConfigUtils.validateUpdate(functionConfig, newFunctionConfig);
}
@Test
public void testMergeDifferentInputSpec() {
FunctionConfig functionConfig = createFunctionConfig();
Map<String, ConsumerConfig> inputSpecs = new HashMap<>();
inputSpecs.put("test-input", ConsumerConfig.builder().isRegexPattern(true).serdeClassName("test-serde").receiverQueueSize(58).build());
FunctionConfig newFunctionConfig = createUpdatedFunctionConfig("inputSpecs", inputSpecs);
FunctionConfig mergedConfig = FunctionConfigUtils.validateUpdate(functionConfig, newFunctionConfig);
assertEquals(mergedConfig.getInputSpecs().get("test-input"), newFunctionConfig.getInputSpecs().get("test-input"));
}
@Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Output topics differ")
public void testMergeDifferentOutput() {
FunctionConfig functionConfig = createFunctionConfig();
......
......@@ -120,6 +120,25 @@ public class SinkConfigUtilsTest {
SinkConfigUtils.validateUpdate(sinkConfig, newSinkConfig);
}
@Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "isRegexPattern for input topic test-input cannot be altered")
public void testMergeDifferentInputSpecWithRegexChange() {
SinkConfig sinkConfig = createSinkConfig();
Map<String, ConsumerConfig> inputSpecs = new HashMap<>();
inputSpecs.put("test-input", ConsumerConfig.builder().isRegexPattern(false).serdeClassName("my-serde").build());
SinkConfig newSinkConfig = createUpdatedSinkConfig("inputSpecs", inputSpecs);
SinkConfigUtils.validateUpdate(sinkConfig, newSinkConfig);
}
@Test
public void testMergeDifferentInputSpec() {
SinkConfig sinkConfig = createSinkConfig();
Map<String, ConsumerConfig> inputSpecs = new HashMap<>();
inputSpecs.put("test-input", ConsumerConfig.builder().isRegexPattern(true).serdeClassName("test-serde").receiverQueueSize(58).build());
SinkConfig newSinkConfig = createUpdatedSinkConfig("inputSpecs", inputSpecs);
SinkConfig mergedConfig = SinkConfigUtils.validateUpdate(sinkConfig, newSinkConfig);
assertEquals(mergedConfig.getInputSpecs().get("test-input"), newSinkConfig.getInputSpecs().get("test-input"));
}
@Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Processing Guarantess cannot be alterted")
public void testMergeDifferentProcessingGuarantees() {
SinkConfig sinkConfig = createSinkConfig();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册