From 52db989534c5e330861c8e412bf5ea993cf19588 Mon Sep 17 00:00:00 2001 From: Sanjeev Kulkarni Date: Thu, 7 Mar 2019 09:25:39 -0800 Subject: [PATCH] Allow users to update everything in inputspecs except for isregexpattern (#3770) * Allow users to update everything in inputspecs except for isregexpattern * Added more tests and fixed a bug caught by them --- .../functions/utils/FunctionConfigUtils.java | 5 +++-- .../functions/utils/SinkConfigUtils.java | 5 +++-- .../utils/FunctionConfigUtilsTest.java | 19 +++++++++++++++++++ .../functions/utils/SinkConfigUtilsTest.java | 19 +++++++++++++++++++ 4 files changed, 44 insertions(+), 4 deletions(-) diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java index 04300db3773..190b34e9008 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java @@ -623,9 +623,10 @@ public class FunctionConfigUtils { if (!existingConfig.getInputSpecs().containsKey(topicName)) { throw new IllegalArgumentException("Input Topics cannot be altered"); } - if (!consumerConfig.equals(existingConfig.getInputSpecs().get(topicName))) { - throw new IllegalArgumentException("Input Specs mismatch"); + if (consumerConfig.isRegexPattern() != existingConfig.getInputSpecs().get(topicName).isRegexPattern()) { + throw new IllegalArgumentException("isRegexPattern for input topic " + topicName + " cannot be altered"); } + mergedConfig.getInputSpecs().put(topicName, consumerConfig); }); } if (!StringUtils.isEmpty(newConfig.getOutput()) && !newConfig.getOutput().equals(existingConfig.getOutput())) { diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java index a972947198a..dd57e7422e7 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java @@ -420,9 +420,10 @@ public class SinkConfigUtils { if (!existingConfig.getInputSpecs().containsKey(topicName)) { throw new IllegalArgumentException("Input Topics cannot be altered"); } - if (!consumerConfig.equals(existingConfig.getInputSpecs().get(topicName))) { - throw new IllegalArgumentException("Input Specs mismatch"); + if (consumerConfig.isRegexPattern() != existingConfig.getInputSpecs().get(topicName).isRegexPattern()) { + throw new IllegalArgumentException("isRegexPattern for input topic " + topicName + " cannot be altered"); } + mergedConfig.getInputSpecs().put(topicName, consumerConfig); }); } if (newConfig.getProcessingGuarantees() != null && !newConfig.getProcessingGuarantees().equals(existingConfig.getProcessingGuarantees())) { diff --git a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java index 0f4f4005430..accf6247e63 100644 --- a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java +++ b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java @@ -151,6 +151,25 @@ public class FunctionConfigUtilsTest { FunctionConfigUtils.validateUpdate(functionConfig, newFunctionConfig); } + @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "isRegexPattern for input topic test-input cannot be altered") + public void testMergeDifferentInputSpecWithRegexChange() { + FunctionConfig functionConfig = createFunctionConfig(); + Map inputSpecs = new HashMap<>(); + inputSpecs.put("test-input", ConsumerConfig.builder().isRegexPattern(false).serdeClassName("my-serde").build()); + FunctionConfig newFunctionConfig = createUpdatedFunctionConfig("inputSpecs", inputSpecs); + FunctionConfigUtils.validateUpdate(functionConfig, newFunctionConfig); + } + + @Test + public void testMergeDifferentInputSpec() { + FunctionConfig functionConfig = createFunctionConfig(); + Map inputSpecs = new HashMap<>(); + inputSpecs.put("test-input", ConsumerConfig.builder().isRegexPattern(true).serdeClassName("test-serde").receiverQueueSize(58).build()); + FunctionConfig newFunctionConfig = createUpdatedFunctionConfig("inputSpecs", inputSpecs); + FunctionConfig mergedConfig = FunctionConfigUtils.validateUpdate(functionConfig, newFunctionConfig); + assertEquals(mergedConfig.getInputSpecs().get("test-input"), newFunctionConfig.getInputSpecs().get("test-input")); + } + @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Output topics differ") public void testMergeDifferentOutput() { FunctionConfig functionConfig = createFunctionConfig(); diff --git a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java index dcc415b9356..efde8ab83af 100644 --- a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java +++ b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java @@ -120,6 +120,25 @@ public class SinkConfigUtilsTest { SinkConfigUtils.validateUpdate(sinkConfig, newSinkConfig); } + @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "isRegexPattern for input topic test-input cannot be altered") + public void testMergeDifferentInputSpecWithRegexChange() { + SinkConfig sinkConfig = createSinkConfig(); + Map inputSpecs = new HashMap<>(); + inputSpecs.put("test-input", ConsumerConfig.builder().isRegexPattern(false).serdeClassName("my-serde").build()); + SinkConfig newSinkConfig = createUpdatedSinkConfig("inputSpecs", inputSpecs); + SinkConfigUtils.validateUpdate(sinkConfig, newSinkConfig); + } + + @Test + public void testMergeDifferentInputSpec() { + SinkConfig sinkConfig = createSinkConfig(); + Map inputSpecs = new HashMap<>(); + inputSpecs.put("test-input", ConsumerConfig.builder().isRegexPattern(true).serdeClassName("test-serde").receiverQueueSize(58).build()); + SinkConfig newSinkConfig = createUpdatedSinkConfig("inputSpecs", inputSpecs); + SinkConfig mergedConfig = SinkConfigUtils.validateUpdate(sinkConfig, newSinkConfig); + assertEquals(mergedConfig.getInputSpecs().get("test-input"), newSinkConfig.getInputSpecs().get("test-input")); + } + @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Processing Guarantess cannot be alterted") public void testMergeDifferentProcessingGuarantees() { SinkConfig sinkConfig = createSinkConfig(); -- GitLab