提交 84dba37e 编写于 作者: B Boyang Jerry Peng 提交者: Matteo Merli

fix: function config cleanupSubscription update bug (#3771)

* fix: function config cleanupSubscription update bug

* add and fix unit tests
上级 f46bed75
......@@ -1210,10 +1210,18 @@ public class PulsarFunctionE2ETest {
functionConfig.setInputs(Collections.singleton(sourceTopic));
functionConfig.setClassName("org.apache.pulsar.functions.api.examples.ExclamationFunction");
functionConfig.setOutput(sinkTopic);
functionConfig.setCleanupSubscription(true);
functionConfig.setCleanupSubscription(false);
functionConfig.setRuntime(FunctionConfig.Runtime.JAVA);
admin.functions().createFunctionWithUrl(functionConfig, jarFilePathUrl);
retryStrategically((test) -> {
try {
return admin.functions().getFunction(tenant, namespacePortion, functionName).getCleanupSubscription();
} catch (PulsarAdminException e) {
return false;
}
}, 5, 150);
assertFalse(admin.functions().getFunction(tenant, namespacePortion, functionName).getCleanupSubscription());
retryStrategically((test) -> {
try {
......@@ -1225,6 +1233,19 @@ public class PulsarFunctionE2ETest {
// validate pulsar source consumer has started on the topic
assertEquals(admin.topics().getStats(sourceTopic).subscriptions.size(), 1);
// test update cleanup subscription
functionConfig.setCleanupSubscription(true);
admin.functions().updateFunctionWithUrl(functionConfig, jarFilePathUrl);
retryStrategically((test) -> {
try {
return admin.functions().getFunction(tenant, namespacePortion, functionName).getCleanupSubscription();
} catch (PulsarAdminException e) {
return false;
}
}, 5, 150);
assertTrue(admin.functions().getFunction(tenant, namespacePortion, functionName).getCleanupSubscription());
int totalMsgs = 10;
for (int i = 0; i < totalMsgs; i++) {
String data = "my-message-" + i;
......@@ -1269,6 +1290,59 @@ public class PulsarFunctionE2ETest {
// make sure subscriptions are cleanup
assertEquals(admin.topics().getStats(sourceTopic).subscriptions.size(), 0);
/** test do not cleanup subscription **/
functionConfig.setCleanupSubscription(false);
admin.functions().createFunctionWithUrl(functionConfig, jarFilePathUrl);
retryStrategically((test) -> {
try {
return admin.topics().getStats(sourceTopic).subscriptions.size() == 1;
} catch (PulsarAdminException e) {
return false;
}
}, 5, 150);
// validate pulsar source consumer has started on the topic
assertEquals(admin.topics().getStats(sourceTopic).subscriptions.size(), 1);
retryStrategically((test) -> {
try {
FunctionConfig result = admin.functions().getFunction(tenant, namespacePortion, functionName);
return result.getParallelism() == 2 && result.getCleanupSubscription() == false;
} catch (PulsarAdminException e) {
return false;
}
}, 5, 150);
assertFalse(admin.functions().getFunction(tenant, namespacePortion, functionName).getCleanupSubscription());
// test update another config and making sure that subscription cleanup remains unchanged
functionConfig.setParallelism(2);
admin.functions().updateFunctionWithUrl(functionConfig, jarFilePathUrl);
retryStrategically((test) -> {
try {
FunctionConfig result = admin.functions().getFunction(tenant, namespacePortion, functionName);
return result.getParallelism() == 2 && result.getCleanupSubscription() == false;
} catch (PulsarAdminException e) {
return false;
}
}, 5, 150);
assertFalse(admin.functions().getFunction(tenant, namespacePortion, functionName).getCleanupSubscription());
// delete functions
admin.functions().deleteFunction(tenant, namespacePortion, functionName);
retryStrategically((test) -> {
try {
return admin.topics().getStats(sourceTopic).subscriptions.size() == 1;
} catch (PulsarAdminException e) {
return false;
}
}, 5, 150);
// make sure subscriptions are cleanup
assertEquals(admin.topics().getStats(sourceTopic).subscriptions.size(), 1);
}
......
......@@ -250,6 +250,7 @@ public class FunctionConfigUtils {
functionConfig.setRetainOrdering(false);
functionConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
}
functionConfig.setCleanupSubscription(functionDetails.getSource().getCleanupSubscription());
functionConfig.setAutoAck(functionDetails.getAutoAck());
if (functionDetails.getSource().getTimeoutMs() != 0) {
functionConfig.setTimeoutMs(functionDetails.getSource().getTimeoutMs());
......@@ -677,6 +678,9 @@ public class FunctionConfigUtils {
if (newConfig.getTimeoutMs() != null) {
mergedConfig.setTimeoutMs(newConfig.getTimeoutMs());
}
if (newConfig.getCleanupSubscription() != null) {
mergedConfig.setCleanupSubscription(newConfig.getCleanupSubscription());
}
return mergedConfig;
}
}
......@@ -35,6 +35,8 @@ import java.util.Map;
import static org.apache.pulsar.common.functions.FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE;
import static org.apache.pulsar.common.functions.FunctionConfig.Runtime.PYTHON;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
/**
* Unit test of {@link Reflections}.
......@@ -65,6 +67,8 @@ public class FunctionConfigUtilsTest {
// add default resources
functionConfig.setResources(Resources.getDefaultResources());
// set default cleanupSubscription config
functionConfig.setCleanupSubscription(true);
assertEquals(
new Gson().toJson(functionConfig),
new Gson().toJson(convertedConfig)
......@@ -96,6 +100,8 @@ public class FunctionConfigUtilsTest {
// add default resources
functionConfig.setResources(Resources.getDefaultResources());
// set default cleanupSubscription config
functionConfig.setCleanupSubscription(true);
assertEquals(
new Gson().toJson(functionConfig),
new Gson().toJson(convertedConfig)
......@@ -199,6 +205,22 @@ public class FunctionConfigUtilsTest {
);
}
@Test
public void testMergeCleanupSubscription() {
FunctionConfig functionConfig = createFunctionConfig();
FunctionConfig newFunctionConfig = createUpdatedFunctionConfig("cleanupSubscription", true);
FunctionConfig mergedConfig = FunctionConfigUtils.validateUpdate(functionConfig, newFunctionConfig);
assertTrue(mergedConfig.getCleanupSubscription());
newFunctionConfig = createUpdatedFunctionConfig("cleanupSubscription", false);
mergedConfig = FunctionConfigUtils.validateUpdate(functionConfig, newFunctionConfig);
assertFalse(mergedConfig.getCleanupSubscription());
newFunctionConfig = createUpdatedFunctionConfig("cleanupSubscription", true);
mergedConfig = FunctionConfigUtils.validateUpdate(functionConfig, newFunctionConfig);
assertTrue(mergedConfig.getCleanupSubscription());
}
@Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Processing Guarantess cannot be alterted")
public void testMergeDifferentProcessingGuarantees() {
FunctionConfig functionConfig = createFunctionConfig();
......@@ -392,6 +414,7 @@ public class FunctionConfigUtilsTest {
functionConfig.setAutoAck(true);
functionConfig.setTimeoutMs(2000l);
functionConfig.setWindowConfig(new WindowConfig().setWindowLengthCount(10));
functionConfig.setCleanupSubscription(true);
return functionConfig;
}
......@@ -425,7 +448,9 @@ public class FunctionConfigUtilsTest {
.setSchemaType(JSONSchema.class.getName()).build());
Function.SourceSpec sourceSpec = Function.SourceSpec.newBuilder()
.putAllInputSpecs(consumerSpecMap)
.setSubscriptionType(Function.SubscriptionType.FAILOVER).build();
.setSubscriptionType(Function.SubscriptionType.FAILOVER)
.setCleanupSubscription(true)
.build();
boolean autoAck = true;
String logTopic = "log-topic1";
Function.Resources resources = Function.Resources.newBuilder().setCpu(1.5).setDisk(1024 * 20).setRam(1024 * 10).build();
......@@ -466,5 +491,6 @@ public class FunctionConfigUtilsTest {
assertEquals(functionConfig.getResources().getRam().longValue(), resources.getRam());
assertEquals(functionConfig.getOutput(), sinkSpec.getTopic());
assertEquals(functionConfig.getInputSpecs().keySet(), sourceSpec.getInputSpecsMap().keySet());
assertEquals(functionConfig.getCleanupSubscription().booleanValue(), sourceSpec.getCleanupSubscription());
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册