提交 4959f51f 编写于 作者: S Sanjeev Kulkarni 提交者: Matteo Merli

Classloader choice for validating Source/Sink (#3865)

* Try both regular classloader as well as nar class loader for validating source/sinks

* Fixed test

* Fix unittest

* Added more comments to the code

* rename variables

* Wait for the create to succeed before updating. Otherwise there might be some reamnant producers
上级 d1c17cd1
......@@ -598,6 +598,14 @@ public class PulsarFunctionE2ETest {
SourceConfig sourceConfig = createSourceConfig(tenant, namespacePortion, functionName, sinkTopic);
admin.source().createSourceWithUrl(sourceConfig, jarFilePathUrl);
retryStrategically((test) -> {
try {
return (admin.topics().getStats(sinkTopic).publishers.size() == 1);
} catch (PulsarAdminException e) {
return false;
}
}, 10, 150);
admin.source().updateSourceWithUrl(sourceConfig, jarFilePathUrl);
retryStrategically((test) -> {
......
......@@ -24,6 +24,7 @@ import com.google.gson.reflect.TypeToken;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.apache.pulsar.common.functions.ConsumerConfig;
import org.apache.pulsar.common.functions.FunctionConfig;
......@@ -47,6 +48,7 @@ import static org.apache.commons.lang3.StringUtils.isNotBlank;
import static org.apache.pulsar.functions.utils.Utils.convertProcessingGuarantee;
import static org.apache.pulsar.functions.utils.Utils.getSinkType;
@Slf4j
public class SinkConfigUtils {
@Getter
......@@ -297,14 +299,37 @@ public class SinkConfigUtils {
}
String sinkClassName;
ClassLoader classLoader;
final Class<?> typeArg;
final ClassLoader classLoader;
if (!isEmpty(sinkConfig.getClassName())) {
sinkClassName = sinkConfig.getClassName();
// We really don't know if we should use nar class loader or regular classloader
ClassLoader jarClassLoader = null;
ClassLoader narClassLoader = null;
try {
classLoader = Utils.extractClassLoader(archivePath, functionPkgUrl, uploadedInputStreamAsFile);
jarClassLoader = Utils.extractClassLoader(archivePath, functionPkgUrl, uploadedInputStreamAsFile);
} catch (Exception e) {
throw new IllegalArgumentException("Invalid Sink Jar");
}
try {
narClassLoader = Utils.extractNarClassLoader(archivePath, functionPkgUrl, uploadedInputStreamAsFile);
} catch (Exception e) {
}
if (jarClassLoader == null && narClassLoader == null) {
throw new IllegalArgumentException("Invalid Sink Package");
}
// We use typeArg and classLoader as arguments for lambda functions that require them to be final
// Thus we use these tmp vars
Class<?> tmptypeArg;
ClassLoader tmpclassLoader;
try {
tmptypeArg = getSinkType(sinkClassName, narClassLoader);
tmpclassLoader = narClassLoader;
} catch (Exception e) {
tmptypeArg = getSinkType(sinkClassName, jarClassLoader);
tmpclassLoader = jarClassLoader;
}
typeArg = tmptypeArg;
classLoader = tmpclassLoader;
} else if (!org.apache.commons.lang3.StringUtils.isEmpty(sinkConfig.getArchive()) && sinkConfig.getArchive().startsWith(org.apache.pulsar.common.functions.Utils.FILE)) {
throw new IllegalArgumentException("Class-name must be present for archive with file-url");
} else {
......@@ -317,10 +342,9 @@ public class SinkConfigUtils {
} catch (IOException e1) {
throw new IllegalArgumentException("Failed to extract sink class from archive", e1);
}
typeArg = getSinkType(sinkClassName, classLoader);
}
Class<?> typeArg = getSinkType(sinkClassName, classLoader);
if (sinkConfig.getTopicToSerdeClassName() != null) {
sinkConfig.getTopicToSerdeClassName().forEach((topicName, serdeClassName) -> {
ValidatorUtils.validateSerde(serdeClassName, typeArg, classLoader, true);
......
......@@ -209,14 +209,37 @@ public class SourceConfigUtils {
}
String sourceClassName;
ClassLoader classLoader;
final Class<?> typeArg;
final ClassLoader classLoader;
if (!isEmpty(sourceConfig.getClassName())) {
sourceClassName = sourceConfig.getClassName();
// We really don't know if we should use nar class loader or regular classloader
ClassLoader jarClassLoader = null;
ClassLoader narClassLoader = null;
try {
classLoader = Utils.extractClassLoader(archivePath, functionPkgUrl, uploadedInputStreamAsFile);
jarClassLoader = Utils.extractClassLoader(archivePath, functionPkgUrl, uploadedInputStreamAsFile);
} catch (Exception e) {
throw new IllegalArgumentException("Invalid Source Jar");
}
try {
narClassLoader = Utils.extractNarClassLoader(archivePath, functionPkgUrl, uploadedInputStreamAsFile);
} catch (Exception e) {
}
if (jarClassLoader == null && narClassLoader == null) {
throw new IllegalArgumentException("Invalid Source Package");
}
// We use typeArg and classLoader as arguments for lambda functions that require them to be final
// Thus we use these tmp vars
Class<?> tmptypeArg;
ClassLoader tmpclassLoader;
try {
tmptypeArg = getSourceType(sourceClassName, narClassLoader);
tmpclassLoader = narClassLoader;
} catch (Exception e) {
tmptypeArg = getSourceType(sourceClassName, jarClassLoader);
tmpclassLoader = jarClassLoader;
}
typeArg = tmptypeArg;
classLoader = tmpclassLoader;
} else if (!StringUtils.isEmpty(sourceConfig.getArchive()) && sourceConfig.getArchive().startsWith(org.apache.pulsar.common.functions.Utils.FILE)) {
throw new IllegalArgumentException("Class-name must be present for archive with file-url");
} else {
......@@ -229,10 +252,9 @@ public class SourceConfigUtils {
} catch (IOException e1) {
throw new IllegalArgumentException("Failed to extract source class from archive", e1);
}
typeArg = getSourceType(sourceClassName, classLoader);
}
Class<?> typeArg = getSourceType(sourceClassName, classLoader);
// Only one of serdeClassName or schemaType should be set
if (!StringUtils.isEmpty(sourceConfig.getSerdeClassName()) && !StringUtils.isEmpty(sourceConfig.getSchemaType())) {
throw new IllegalArgumentException("Only one of serdeClassName or schemaType should be set");
......
......@@ -559,6 +559,9 @@ public abstract class ComponentImpl {
} else if (uploadedInputStream != null) {
functionDetails = validateUpdateRequestParams(tenant, namespace, componentName, uploadedInputStreamAsFile,
fileDetail, functionDetailsJson, mergedComponentConfigJson, componentType);
} else if (existingComponent.getPackageLocation().getPackagePath().startsWith("builtin://")) {
functionDetails = validateUpdateRequestParams(tenant, namespace, componentName, null,
null, functionDetailsJson, mergedComponentConfigJson, componentType);
} else {
functionDetails = validateUpdateRequestParamsWithExistingMetadata(
tenant, namespace, componentName, existingComponent.getPackageLocation(), mergedComponentConfigJson, componentType);
......
......@@ -361,7 +361,7 @@ public class SinkApiV3ResourceTest {
}
}
@Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Invalid Sink Jar")
@Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Invalid Sink Package")
public void testRegisterSinkHttpUrl() {
try {
testRegisterSinkMissingArguments(
......
......@@ -325,7 +325,7 @@ public class SourceApiV3ResourceTest {
}
}
@Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Invalid Source Jar")
@Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Invalid Source Package")
public void testRegisterSourceHttpUrl() {
try {
testRegisterSourceMissingArguments(
......
......@@ -172,6 +172,9 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
// validate the sink result
tester.validateSinkResult(kvs);
// update the sink
updateSinkConnector(tester, tenant, namespace, sinkName, inputTopicName);
// delete the sink
deleteSink(tenant, namespace, sinkName);
......@@ -220,6 +223,45 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
result.getStdout());
}
protected void updateSinkConnector(SinkTester tester,
String tenant,
String namespace,
String sinkName,
String inputTopicName) throws Exception {
String[] commands;
if (tester.getSinkType() != SinkTester.SinkType.UNDEFINED) {
commands = new String[] {
PulsarCluster.ADMIN_SCRIPT,
"sink", "update",
"--tenant", tenant,
"--namespace", namespace,
"--name", sinkName,
"--sink-type", tester.sinkType().name().toLowerCase(),
"--sinkConfig", new Gson().toJson(tester.sinkConfig()),
"--inputs", inputTopicName,
"--parallelism", "2"
};
} else {
commands = new String[] {
PulsarCluster.ADMIN_SCRIPT,
"sink", "create",
"--tenant", tenant,
"--namespace", namespace,
"--name", sinkName,
"--archive", tester.getSinkArchive(),
"--classname", tester.getSinkClassName(),
"--sinkConfig", new Gson().toJson(tester.sinkConfig()),
"--inputs", inputTopicName,
"--parallelism", "2"
};
}
log.info("Run command : {}", StringUtils.join(commands, ' '));
ContainerExecResult result = pulsarCluster.getAnyWorker().execCmd(commands);
assertTrue(
result.getStdout().contains("\"Updated successfully\""),
result.getStdout());
}
protected void getSinkInfoSuccess(SinkTester tester,
String tenant,
String namespace,
......@@ -422,6 +464,9 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
// validate the source result
validateSourceResult(consumer, kvs);
// update the source connector
updateSourceConnector(tester, tenant, namespace, sourceName, outputTopicName);
// delete the source
deleteSource(tenant, namespace, sourceName);
......@@ -455,6 +500,29 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
result.getStdout());
}
protected void updateSourceConnector(SourceTester tester,
String tenant,
String namespace,
String sourceName,
String outputTopicName) throws Exception {
String[] commands = {
PulsarCluster.ADMIN_SCRIPT,
"source", "update",
"--tenant", tenant,
"--namespace", namespace,
"--name", sourceName,
"--source-type", tester.sourceType(),
"--sourceConfig", new Gson().toJson(tester.sourceConfig()),
"--destinationTopicName", outputTopicName,
"--parallelism", "2"
};
log.info("Run command : {}", StringUtils.join(commands, ' '));
ContainerExecResult result = pulsarCluster.getAnyWorker().execCmd(commands);
assertTrue(
result.getStdout().contains("\"Updated successfully\""),
result.getStdout());
}
protected void getSourceInfoSuccess(SourceTester tester,
String tenant,
String namespace,
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册