diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java index 04300db37738026606161d5afdbee72bb4e307f2..190b34e900888f8897da356004026313230faeb6 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java @@ -623,9 +623,10 @@ public class FunctionConfigUtils { if (!existingConfig.getInputSpecs().containsKey(topicName)) { throw new IllegalArgumentException("Input Topics cannot be altered"); } - if (!consumerConfig.equals(existingConfig.getInputSpecs().get(topicName))) { - throw new IllegalArgumentException("Input Specs mismatch"); + if (consumerConfig.isRegexPattern() != existingConfig.getInputSpecs().get(topicName).isRegexPattern()) { + throw new IllegalArgumentException("isRegexPattern for input topic " + topicName + " cannot be altered"); } + mergedConfig.getInputSpecs().put(topicName, consumerConfig); }); } if (!StringUtils.isEmpty(newConfig.getOutput()) && !newConfig.getOutput().equals(existingConfig.getOutput())) { diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java index a972947198a594688bfeaa49be482b83db18e48f..dd57e7422e77e726f85d45d64269206e92b6b0d3 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java @@ -420,9 +420,10 @@ public class SinkConfigUtils { if (!existingConfig.getInputSpecs().containsKey(topicName)) { throw new IllegalArgumentException("Input Topics cannot be altered"); } - if (!consumerConfig.equals(existingConfig.getInputSpecs().get(topicName))) { - throw new IllegalArgumentException("Input Specs mismatch"); + if (consumerConfig.isRegexPattern() != existingConfig.getInputSpecs().get(topicName).isRegexPattern()) { + throw new IllegalArgumentException("isRegexPattern for input topic " + topicName + " cannot be altered"); } + mergedConfig.getInputSpecs().put(topicName, consumerConfig); }); } if (newConfig.getProcessingGuarantees() != null && !newConfig.getProcessingGuarantees().equals(existingConfig.getProcessingGuarantees())) { diff --git a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java index 0f4f40054304fdbe6c9847bc3aab187ba1c0461b..accf6247e63612529098db237b617166591dc177 100644 --- a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java +++ b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java @@ -151,6 +151,25 @@ public class FunctionConfigUtilsTest { FunctionConfigUtils.validateUpdate(functionConfig, newFunctionConfig); } + @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "isRegexPattern for input topic test-input cannot be altered") + public void testMergeDifferentInputSpecWithRegexChange() { + FunctionConfig functionConfig = createFunctionConfig(); + Map inputSpecs = new HashMap<>(); + inputSpecs.put("test-input", ConsumerConfig.builder().isRegexPattern(false).serdeClassName("my-serde").build()); + FunctionConfig newFunctionConfig = createUpdatedFunctionConfig("inputSpecs", inputSpecs); + FunctionConfigUtils.validateUpdate(functionConfig, newFunctionConfig); + } + + @Test + public void testMergeDifferentInputSpec() { + FunctionConfig functionConfig = createFunctionConfig(); + Map inputSpecs = new HashMap<>(); + inputSpecs.put("test-input", ConsumerConfig.builder().isRegexPattern(true).serdeClassName("test-serde").receiverQueueSize(58).build()); + FunctionConfig newFunctionConfig = createUpdatedFunctionConfig("inputSpecs", inputSpecs); + FunctionConfig mergedConfig = FunctionConfigUtils.validateUpdate(functionConfig, newFunctionConfig); + assertEquals(mergedConfig.getInputSpecs().get("test-input"), newFunctionConfig.getInputSpecs().get("test-input")); + } + @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Output topics differ") public void testMergeDifferentOutput() { FunctionConfig functionConfig = createFunctionConfig(); diff --git a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java index dcc415b9356cd696675e35987de50f1488d55446..efde8ab83af1162b74f55f1452838ed8139284fa 100644 --- a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java +++ b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java @@ -120,6 +120,25 @@ public class SinkConfigUtilsTest { SinkConfigUtils.validateUpdate(sinkConfig, newSinkConfig); } + @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "isRegexPattern for input topic test-input cannot be altered") + public void testMergeDifferentInputSpecWithRegexChange() { + SinkConfig sinkConfig = createSinkConfig(); + Map inputSpecs = new HashMap<>(); + inputSpecs.put("test-input", ConsumerConfig.builder().isRegexPattern(false).serdeClassName("my-serde").build()); + SinkConfig newSinkConfig = createUpdatedSinkConfig("inputSpecs", inputSpecs); + SinkConfigUtils.validateUpdate(sinkConfig, newSinkConfig); + } + + @Test + public void testMergeDifferentInputSpec() { + SinkConfig sinkConfig = createSinkConfig(); + Map inputSpecs = new HashMap<>(); + inputSpecs.put("test-input", ConsumerConfig.builder().isRegexPattern(true).serdeClassName("test-serde").receiverQueueSize(58).build()); + SinkConfig newSinkConfig = createUpdatedSinkConfig("inputSpecs", inputSpecs); + SinkConfig mergedConfig = SinkConfigUtils.validateUpdate(sinkConfig, newSinkConfig); + assertEquals(mergedConfig.getInputSpecs().get("test-input"), newSinkConfig.getInputSpecs().get("test-input")); + } + @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Processing Guarantess cannot be alterted") public void testMergeDifferentProcessingGuarantees() { SinkConfig sinkConfig = createSinkConfig();