提交 e8025d50 编写于 作者: B Boyang Jerry Peng 提交者: Matteo Merli

fix issue when submitting NAR via file url (#4577)

* fix issue when submitting NAR via file url

* fix unit tests

* add more specific errors

* fix test
上级 2f686157
......@@ -599,12 +599,11 @@ public class PulsarFunctionLocalRunTest {
testPulsarSourceLocalRun(null);
}
// TODO bug to fix involving submitting a NAR via URI file:///tmp/pulsar-io-twitter-0.0.1.nar
// @Test(timeOut = 20000)
// public void testPulsarSourceLocalRunWithFile() throws Exception {
// String jarFilePathUrl = Utils.FILE + ":" + getClass().getClassLoader().getResource("pulsar-io-data-generator.nar").getFile();
// testPulsarSourceStats(jarFilePathUrl);
// }
@Test(timeOut = 20000)
public void testPulsarSourceLocalRunWithFile() throws Exception {
String jarFilePathUrl = Utils.FILE + ":" + getClass().getClassLoader().getResource("pulsar-io-data-generator.nar").getFile();
testPulsarSourceLocalRun(jarFilePathUrl);
}
@Test(timeOut = 40000)
public void testPulsarSourceLocalRunWithUrl() throws Exception {
......@@ -705,11 +704,11 @@ public class PulsarFunctionLocalRunTest {
testPulsarSinkStats(null);
}
// @Test(timeOut = 20000)
// public void testPulsarSinkStatsWithFile() throws Exception {
// String jarFilePathUrl = Utils.FILE + ":" + getClass().getClassLoader().getResource("pulsar-io-data-generator.nar").getFile();
// testPulsarSinkStats(jarFilePathUrl);
// }
@Test(timeOut = 20000)
public void testPulsarSinkStatsWithFile() throws Exception {
String jarFilePathUrl = Utils.FILE + ":" + getClass().getClassLoader().getResource("pulsar-io-data-generator.nar").getFile();
testPulsarSinkStats(jarFilePathUrl);
}
@Test(timeOut = 40000)
public void testPulsarSinkStatsWithUrl() throws Exception {
......
......@@ -248,7 +248,6 @@ public class LocalRunner {
.loadClass(LocalRunner.class.getName())
.getProtectionDomain().getCodeSource().getLocation().getFile();
}
log.info("userCodeFile: {}", userCodeFile);
String builtInSource = isBuiltInSource(userCodeFile);
if (builtInSource != null) {
......
......@@ -42,7 +42,11 @@ import java.io.File;
import java.io.IOException;
import java.lang.reflect.Type;
import java.nio.file.Path;
import java.util.*;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import static org.apache.commons.lang3.StringUtils.isEmpty;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
......@@ -283,7 +287,7 @@ public class SinkConfigUtils {
}
public static ExtractedSinkDetails validate(SinkConfig sinkConfig, Path archivePath,
File uploadedInputStreamAsFile) {
File sinkPackageFile) {
if (isEmpty(sinkConfig.getTenant())) {
throw new IllegalArgumentException("Sink tenant cannot be null");
}
......@@ -318,79 +322,112 @@ public class SinkConfigUtils {
throw new IllegalArgumentException("Sink timeout must be a positive number");
}
String sinkClassName;
final Class<?> typeArg;
final ClassLoader classLoader;
if (!isEmpty(sinkConfig.getClassName())) {
sinkClassName = sinkConfig.getClassName();
// We really don't know if we should use nar class loader or regular classloader
ClassLoader jarClassLoader = null;
ClassLoader narClassLoader = null;
try {
jarClassLoader = FunctionCommon.extractClassLoader(archivePath, uploadedInputStreamAsFile);
} catch (Exception e) {
if (archivePath == null && sinkPackageFile == null) {
throw new IllegalArgumentException("Sink package is not provided");
}
Class<?> typeArg;
ClassLoader classLoader;
String sinkClassName = sinkConfig.getClassName();
ClassLoader jarClassLoader = null;
ClassLoader narClassLoader = null;
Exception jarClassLoaderException = null;
Exception narClassLoaderException = null;
try {
jarClassLoader = FunctionCommon.extractClassLoader(archivePath, sinkPackageFile);
} catch (Exception e) {
jarClassLoaderException = e;
}
try {
narClassLoader = FunctionCommon.extractNarClassLoader(archivePath, sinkPackageFile);
} catch (Exception e) {
narClassLoaderException = e;
}
// if sink class name is not provided, we can only try to load archive as a NAR
if (isEmpty(sinkClassName)) {
if (narClassLoader == null) {
throw new IllegalArgumentException("Sink package does not have the correct format. " +
"Pulsar cannot determine if the package is a NAR package or JAR package." +
"Sink classname is not provided and attempts to load it as a NAR package produced error: "
+ narClassLoaderException.getMessage());
}
try {
narClassLoader = FunctionCommon.extractNarClassLoader(archivePath, uploadedInputStreamAsFile);
} catch (Exception e) {
}
if (jarClassLoader == null && narClassLoader == null) {
throw new IllegalArgumentException("Invalid Sink Package");
sinkClassName = ConnectorUtils.getIOSinkClass(narClassLoader);
} catch (IOException e) {
throw new IllegalArgumentException("Failed to extract Sink class from archive", e);
}
// We use typeArg and classLoader as arguments for lambda functions that require them to be final
// Thus we use these tmp vars
Class<?> tmptypeArg;
ClassLoader tmpclassLoader;
try {
tmptypeArg = getSinkType(sinkClassName, narClassLoader);
tmpclassLoader = narClassLoader;
} catch (Exception e) {
typeArg = getSinkType(sinkClassName, narClassLoader);
classLoader = narClassLoader;
} catch (ClassNotFoundException e) {
throw new IllegalArgumentException(
String.format("Sink class %s must be in class path", sinkClassName), e);
}
} else {
// if sink class name is provided, we need to try to load it as a JAR and as a NAR.
if (jarClassLoader != null) {
try {
typeArg = getSinkType(sinkClassName, jarClassLoader);
classLoader = jarClassLoader;
} catch (ClassNotFoundException e) {
// class not found in JAR try loading as a NAR and searching for the class
if (narClassLoader != null) {
try {
typeArg = getSinkType(sinkClassName, narClassLoader);
classLoader = narClassLoader;
} catch (ClassNotFoundException e1) {
throw new IllegalArgumentException(
String.format("Sink class %s must be in class path", sinkClassName), e1);
}
} else {
throw new IllegalArgumentException(
String.format("Sink class %s must be in class path", sinkClassName), e);
}
}
} else if (narClassLoader != null) {
try {
tmptypeArg = getSinkType(sinkClassName, jarClassLoader);
typeArg = getSinkType(sinkClassName, narClassLoader);
classLoader = narClassLoader;
} catch (ClassNotFoundException e1) {
throw new IllegalArgumentException(
String.format("Sink class %s must be in class path", sinkClassName), e1);
}
tmpclassLoader = jarClassLoader;
}
typeArg = tmptypeArg;
classLoader = tmpclassLoader;
} else if (!org.apache.commons.lang3.StringUtils.isEmpty(sinkConfig.getArchive()) && sinkConfig.getArchive().startsWith(org.apache.pulsar.common.functions.Utils.FILE)) {
throw new IllegalArgumentException("Class-name must be present for archive with file-url");
} else {
classLoader = FunctionCommon.extractNarClassLoader(archivePath, uploadedInputStreamAsFile);
if (classLoader == null) {
throw new IllegalArgumentException("Sink Package is not provided");
}
try {
sinkClassName = ConnectorUtils.getIOSinkClass(classLoader);
} catch (IOException e1) {
throw new IllegalArgumentException("Failed to extract sink class from archive", e1);
}
try {
typeArg = getSinkType(sinkClassName, classLoader);
} catch (ClassNotFoundException e) {
throw new IllegalArgumentException(
String.format("Sink class %s must be in class path", sinkClassName), e);
} else {
StringBuilder errorMsg = new StringBuilder("Sink package does not have the correct format." +
" Pulsar cannot determine if the package is a NAR package or JAR package.");
if (jarClassLoaderException != null) {
errorMsg.append("Attempts to load it as a JAR package produced error: " + jarClassLoaderException.getMessage());
}
if (narClassLoaderException != null) {
errorMsg.append("Attempts to load it as a NAR package produced error: " + narClassLoaderException.getMessage());
}
throw new IllegalArgumentException(errorMsg.toString());
}
}
if (sinkConfig.getTopicToSerdeClassName() != null) {
sinkConfig.getTopicToSerdeClassName().forEach((topicName, serdeClassName) -> {
ValidatorUtils.validateSerde(serdeClassName, typeArg, classLoader, true);
});
for (String serdeClassName : sinkConfig.getTopicToSerdeClassName().values()) {
ValidatorUtils.validateSerde(serdeClassName, typeArg, classLoader, true);
}
}
if (sinkConfig.getTopicToSchemaType() != null) {
sinkConfig.getTopicToSchemaType().forEach((topicName, schemaType) -> {
for (String schemaType : sinkConfig.getTopicToSchemaType().values()) {
ValidatorUtils.validateSchema(schemaType, typeArg, classLoader, true);
});
}
}
// topicsPattern does not need checks
if (sinkConfig.getInputSpecs() != null) {
sinkConfig.getInputSpecs().forEach((topicName, consumerSpec) -> {
for (ConsumerConfig consumerSpec : sinkConfig.getInputSpecs().values()) {
// Only one is set
if (!isEmpty(consumerSpec.getSerdeClassName()) && !isEmpty(consumerSpec.getSchemaType())) {
throw new IllegalArgumentException("Only one of serdeClassName or schemaType should be set");
......@@ -401,7 +438,7 @@ public class SinkConfigUtils {
if (!isEmpty(consumerSpec.getSchemaType())) {
ValidatorUtils.validateSchema(consumerSpec.getSchemaType(), typeArg, classLoader, true);
}
});
}
}
return new ExtractedSinkDetails(sinkClassName, typeArg.getName());
}
......
......@@ -222,61 +222,93 @@ public class SourceConfigUtils {
if (sourceConfig.getResources() != null) {
ResourceConfigUtils.validate(sourceConfig.getResources());
}
if (archivePath == null && sourcePackageFile == null) {
throw new IllegalArgumentException("Source package is not provided");
}
String sourceClassName;
final Class<?> typeArg;
final ClassLoader classLoader;
if (!isEmpty(sourceConfig.getClassName())) {
sourceClassName = sourceConfig.getClassName();
// We really don't know if we should use nar class loader or regular classloader
ClassLoader jarClassLoader = null;
ClassLoader narClassLoader = null;
try {
jarClassLoader = FunctionCommon.extractClassLoader(archivePath, sourcePackageFile);
} catch (Exception e) {
Class<?> typeArg;
ClassLoader classLoader;
String sourceClassName = sourceConfig.getClassName();
ClassLoader jarClassLoader = null;
ClassLoader narClassLoader = null;
Exception jarClassLoaderException = null;
Exception narClassLoaderException = null;
try {
jarClassLoader = FunctionCommon.extractClassLoader(archivePath, sourcePackageFile);
} catch (Exception e) {
jarClassLoaderException = e;
}
try {
narClassLoader = FunctionCommon.extractNarClassLoader(archivePath, sourcePackageFile);
} catch (Exception e) {
narClassLoaderException = e;
}
// if source class name is not provided, we can only try to load archive as a NAR
if (isEmpty(sourceClassName)) {
if (narClassLoader == null) {
throw new IllegalArgumentException("Source package does not have the correct format. " +
"Pulsar cannot determine if the package is a NAR package or JAR package." +
"Source classname is not provided and attempts to load it as a NAR package produced the following error.",
narClassLoaderException);
}
try {
narClassLoader = FunctionCommon.extractNarClassLoader(archivePath, sourcePackageFile);
} catch (Exception e) {
}
if (jarClassLoader == null && narClassLoader == null) {
throw new IllegalArgumentException("Invalid Source Package");
sourceClassName = ConnectorUtils.getIOSourceClass((NarClassLoader) narClassLoader);
} catch (IOException e) {
throw new IllegalArgumentException("Failed to extract source class from archive", e);
}
// We use typeArg and classLoader as arguments for lambda functions that require them to be final
// Thus we use these tmp vars
Class<?> tmptypeArg;
ClassLoader tmpclassLoader;
try {
tmptypeArg = getSourceType(sourceClassName, narClassLoader);
tmpclassLoader = narClassLoader;
} catch (Exception e) {
typeArg = getSourceType(sourceClassName, narClassLoader);
classLoader = narClassLoader;
} catch (ClassNotFoundException e) {
throw new IllegalArgumentException(
String.format("Source class %s must be in class path", sourceClassName), e);
}
} else {
// if source class name is provided, we need to try to load it as a JAR and as a NAR.
if (jarClassLoader != null) {
try {
tmptypeArg = getSourceType(sourceClassName, jarClassLoader);
typeArg = getSourceType(sourceClassName, jarClassLoader);
classLoader = jarClassLoader;
} catch (ClassNotFoundException e) {
// class not found in JAR try loading as a NAR and searching for the class
if (narClassLoader != null) {
try {
typeArg = getSourceType(sourceClassName, narClassLoader);
classLoader = narClassLoader;
} catch (ClassNotFoundException e1) {
throw new IllegalArgumentException(
String.format("Source class %s must be in class path", sourceClassName), e1);
}
} else {
throw new IllegalArgumentException(
String.format("Source class %s must be in class path", sourceClassName), e);
}
}
} else if (narClassLoader != null) {
try {
typeArg = getSourceType(sourceClassName, narClassLoader);
classLoader = narClassLoader;
} catch (ClassNotFoundException e1) {
throw new IllegalArgumentException(
String.format("Source class %s must be in class path", sourceClassName), e1);
}
tmpclassLoader = jarClassLoader;
}
typeArg = tmptypeArg;
classLoader = tmpclassLoader;
} else if (!StringUtils.isEmpty(sourceConfig.getArchive()) && sourceConfig.getArchive().startsWith(org.apache.pulsar.common.functions.Utils.FILE)) {
throw new IllegalArgumentException("Class-name must be present for archive with file-url");
} else {
classLoader = FunctionCommon.extractNarClassLoader(archivePath, sourcePackageFile);
if (classLoader == null) {
throw new IllegalArgumentException("Source Package is not provided");
}
try {
sourceClassName = ConnectorUtils.getIOSourceClass((NarClassLoader) classLoader);
} catch (IOException e1) {
throw new IllegalArgumentException("Failed to extract source class from archive", e1);
}
try {
typeArg = getSourceType(sourceClassName, classLoader);
} catch (ClassNotFoundException e) {
throw new IllegalArgumentException(
String.format("Source class %s must be in class path", sourceClassName), e);
} else {
StringBuilder errorMsg = new StringBuilder("Source package does not have the correct format." +
" Pulsar cannot determine if the package is a NAR package or JAR package.");
if (jarClassLoaderException != null) {
errorMsg.append("Attempts to load it as a JAR package produced error: " + jarClassLoaderException.getMessage());
}
if (narClassLoaderException != null) {
errorMsg.append("Attempts to load it as a NAR package produced error: " + narClassLoaderException.getMessage());
}
throw new IllegalArgumentException(errorMsg.toString());
}
}
......
......@@ -339,7 +339,6 @@ public class FunctionsImpl extends ComponentImpl {
componentPackageFile = FunctionCommon.createPkgTempFile();
componentPackageFile.deleteOnExit();
log.info("componentPackageFile: {}", componentPackageFile);
WorkerUtils.downloadFromBookkeeper(worker().getDlogNamespace(), componentPackageFile, existingComponent.getPackageLocation().getPackagePath());
functionDetails = validateUpdateRequestParams(tenant, namespace, functionName,
......
......@@ -344,7 +344,6 @@ public class SinksImpl extends ComponentImpl {
componentPackageFile = FunctionCommon.createPkgTempFile();
componentPackageFile.deleteOnExit();
log.info("componentPackageFile: {}", componentPackageFile);
WorkerUtils.downloadFromBookkeeper(worker().getDlogNamespace(), componentPackageFile, existingComponent.getPackageLocation().getPackagePath());
functionDetails = validateUpdateRequestParams(tenant, namespace, sinkName,
......
......@@ -341,7 +341,6 @@ public class SourcesImpl extends ComponentImpl {
componentPackageFile = FunctionCommon.createPkgTempFile();
componentPackageFile.deleteOnExit();
log.info("componentPackageFile: {}", componentPackageFile);
WorkerUtils.downloadFromBookkeeper(worker().getDlogNamespace(), componentPackageFile, existingComponent.getPackageLocation().getPackagePath());
functionDetails = validateUpdateRequestParams(tenant, namespace, sourceName,
......
......@@ -19,7 +19,6 @@
package org.apache.pulsar.functions.worker.rest.api.v3;
import com.google.common.collect.Lists;
import com.google.gson.Gson;
import lombok.extern.slf4j.Slf4j;
import org.apache.distributedlog.api.namespace.Namespace;
import org.apache.logging.log4j.Level;
......@@ -243,7 +242,7 @@ public class SinkApiV3ResourceTest {
}
}
@Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Sink Package is not provided")
@Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Sink package is not provided")
public void testRegisterSinkMissingPackage() {
try {
testRegisterSinkMissingArguments(
......@@ -283,7 +282,10 @@ public class SinkApiV3ResourceTest {
}
}
@Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "zip file is empty")
@Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Sink package does not have the" +
" correct format. Pulsar cannot determine if the package is a NAR package" +
" or JAR package.Sink classname is not provided and attempts to load it as a NAR package produced error: " +
"zip file is empty")
public void testRegisterSinkMissingPackageDetails() {
try {
testRegisterSinkMissingArguments(
......@@ -303,7 +305,7 @@ public class SinkApiV3ResourceTest {
}
}
@Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Failed to extract sink class from archive")
@Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Failed to extract Sink class from archive")
public void testRegisterSinkInvalidJarNoSink() throws IOException {
try {
FileInputStream inputStream = new FileInputStream(INVALID_JAR_FILE_PATH);
......@@ -948,6 +950,7 @@ public class SinkApiV3ResourceTest {
anyString(),
any(File.class),
any(Namespace.class));
PowerMockito.when(WorkerUtils.class, "dumpToTmpFile", any()).thenCallRealMethod();
when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(sink))).thenReturn(true);
......@@ -961,7 +964,7 @@ public class SinkApiV3ResourceTest {
}
@Test
public void testUpdateSinkWithUrl() throws IOException, ClassNotFoundException {
public void testUpdateSinkWithUrl() throws Exception {
Configurator.setRootLevel(Level.DEBUG);
String filePackageUrl = "file://" + JAR_FILE_PATH;
......@@ -982,6 +985,7 @@ public class SinkApiV3ResourceTest {
mockStatic(FunctionCommon.class);
doReturn(String.class).when(FunctionCommon.class);
FunctionCommon.getSinkType(anyString(), any(NarClassLoader.class));
PowerMockito.when(FunctionCommon.class, "extractFileFromPkgURL", any()).thenCallRealMethod();
doReturn(mock(NarClassLoader.class)).when(FunctionCommon.class);
FunctionCommon.extractNarClassLoader(any(Path.class), any(File.class));
......@@ -989,7 +993,6 @@ public class SinkApiV3ResourceTest {
doReturn(ATLEAST_ONCE).when(FunctionCommon.class);
FunctionCommon.convertProcessingGuarantee(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
this.mockedFunctionMetaData = FunctionMetaData.newBuilder().setFunctionDetails(createDefaultFunctionDetails()).build();
when(mockedManager.getFunctionMetaData(any(), any(), any())).thenReturn(mockedFunctionMetaData);
......@@ -1019,6 +1022,7 @@ public class SinkApiV3ResourceTest {
anyString(),
any(File.class),
any(Namespace.class));
PowerMockito.when(WorkerUtils.class, "dumpToTmpFile", any()).thenCallRealMethod();
when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(sink))).thenReturn(true);
......@@ -1044,6 +1048,7 @@ public class SinkApiV3ResourceTest {
anyString(),
any(File.class),
any(Namespace.class));
PowerMockito.when(WorkerUtils.class, "dumpToTmpFile", any()).thenCallRealMethod();
when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(sink))).thenReturn(true);
......
......@@ -263,7 +263,7 @@ public class SourceApiV3ResourceTest {
}
}
@Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Source Package is not provided")
@Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Source package is not provided")
public void testRegisterSourceMissingPackage() {
try {
testRegisterSourceMissingArguments(
......@@ -979,7 +979,7 @@ public class SourceApiV3ResourceTest {
}
@Test
public void testUpdateSourceWithUrl() throws IOException, ClassNotFoundException {
public void testUpdateSourceWithUrl() throws Exception {
Configurator.setRootLevel(Level.DEBUG);
String filePackageUrl = "file://" + JAR_FILE_PATH;
......@@ -1000,6 +1000,7 @@ public class SourceApiV3ResourceTest {
mockStatic(FunctionCommon.class);
doReturn(String.class).when(FunctionCommon.class);
FunctionCommon.getSourceType(anyString(), any(NarClassLoader.class));
PowerMockito.when(FunctionCommon.class, "extractFileFromPkgURL", any()).thenCallRealMethod();
doReturn(mock(NarClassLoader.class)).when(FunctionCommon.class);
FunctionCommon.extractNarClassLoader(any(Path.class), any(File.class));
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册