提交 9114ddc1 编写于 作者: S Sanjeev Kulkarni 提交者: Sijie Guo

Moved localrun to process based runtime (#261)

上级 dc8f39c7
...@@ -51,7 +51,7 @@ import org.apache.pulsar.functions.api.PulsarFunction; ...@@ -51,7 +51,7 @@ import org.apache.pulsar.functions.api.PulsarFunction;
import org.apache.pulsar.functions.api.utils.DefaultSerDe; import org.apache.pulsar.functions.api.utils.DefaultSerDe;
import org.apache.pulsar.functions.proto.Function.FunctionConfig; import org.apache.pulsar.functions.proto.Function.FunctionConfig;
import org.apache.pulsar.functions.instance.InstanceConfig; import org.apache.pulsar.functions.instance.InstanceConfig;
import org.apache.pulsar.functions.runtime.ThreadRuntimeFactory; import org.apache.pulsar.functions.runtime.ProcessRuntimeFactory;
import org.apache.pulsar.functions.api.SerDe; import org.apache.pulsar.functions.api.SerDe;
import org.apache.pulsar.functions.runtime.RuntimeSpawner; import org.apache.pulsar.functions.runtime.RuntimeSpawner;
import org.apache.pulsar.functions.utils.FunctionConfigUtils; import org.apache.pulsar.functions.utils.FunctionConfigUtils;
...@@ -447,10 +447,8 @@ public class CmdFunctions extends CmdBase { ...@@ -447,10 +447,8 @@ public class CmdFunctions extends CmdBase {
if (brokerServiceUrl != null) { if (brokerServiceUrl != null) {
serviceUrl = brokerServiceUrl; serviceUrl = brokerServiceUrl;
} }
try (ThreadRuntimeFactory containerFactory = new ThreadRuntimeFactory( try (ProcessRuntimeFactory containerFactory = new ProcessRuntimeFactory(
"LocalRunnerThreadGroup", serviceUrl, null, null, null)) {
serviceUrl,
stateStorageServiceUrl)) {
InstanceConfig instanceConfig = new InstanceConfig(); InstanceConfig instanceConfig = new InstanceConfig();
instanceConfig.setFunctionConfig(functionConfig); instanceConfig.setFunctionConfig(functionConfig);
......
...@@ -23,6 +23,8 @@ import com.google.common.annotations.VisibleForTesting; ...@@ -23,6 +23,8 @@ import com.google.common.annotations.VisibleForTesting;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.functions.instance.InstanceConfig; import org.apache.pulsar.functions.instance.InstanceConfig;
import java.nio.file.Paths;
/** /**
* Thread based function container factory implementation. * Thread based function container factory implementation.
*/ */
...@@ -44,6 +46,39 @@ public class ProcessRuntimeFactory implements RuntimeFactory { ...@@ -44,6 +46,39 @@ public class ProcessRuntimeFactory implements RuntimeFactory {
this.javaInstanceJarFile = javaInstanceJarFile; this.javaInstanceJarFile = javaInstanceJarFile;
this.pythonInstanceFile = pythonInstanceFile; this.pythonInstanceFile = pythonInstanceFile;
this.logDirectory = logDirectory; this.logDirectory = logDirectory;
// if things are not specified, try to figure out by env properties
if (this.javaInstanceJarFile == null) {
String envJavaInstanceJarLocation = System.getProperty("pulsar.functions.java.instance.jar");
if (null != envJavaInstanceJarLocation) {
log.info("Java instance jar location is not defined,"
+ " using the location defined in system environment : {}", envJavaInstanceJarLocation);
this.javaInstanceJarFile = envJavaInstanceJarLocation;
} else {
throw new RuntimeException("No JavaInstanceJar specified");
}
}
if (this.pythonInstanceFile == null) {
String envPythonInstanceLocation = System.getProperty("pulsar.functions.python.instance.file");
if (null != envPythonInstanceLocation) {
log.info("Python instance file location is not defined"
+ " using the location defined in system environment : {}", envPythonInstanceLocation);
this.pythonInstanceFile = envPythonInstanceLocation;
} else {
throw new RuntimeException("No PythonInstanceFile specified");
}
}
if (this.logDirectory == null) {
String envProcessContainerLogDirectory = System.getProperty("pulsar.functions.process.container.log.dir");
if (null != envProcessContainerLogDirectory) {
this.logDirectory = envProcessContainerLogDirectory;
} else {
// use a default location
this.logDirectory = Paths.get("logs").toFile().getAbsolutePath();
}
}
} }
@Override @Override
......
...@@ -22,7 +22,6 @@ import static org.apache.commons.lang3.StringUtils.isBlank; ...@@ -22,7 +22,6 @@ import static org.apache.commons.lang3.StringUtils.isBlank;
import com.beust.jcommander.JCommander; import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter; import com.beust.jcommander.Parameter;
import java.nio.file.Paths;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
/** /**
...@@ -61,40 +60,6 @@ public class FunctionWorkerStarter { ...@@ -61,40 +60,6 @@ public class FunctionWorkerStarter {
} else { } else {
workerConfig = WorkerConfig.load(workerArguments.configFile); workerConfig = WorkerConfig.load(workerArguments.configFile);
} }
// searching the java instance location if it is not defined
if (null != workerConfig.getProcessContainerFactory()
&& null == workerConfig.getProcessContainerFactory().getJavaInstanceJarLocation()) {
String envJavaInstanceJarLocation = System.getProperty("pulsar.functions.java.instance.jar");
if (null != envJavaInstanceJarLocation) {
log.info("Java instance jar location is not defined in worker config yml."
+ " Use the location defined in system environment : {}", envJavaInstanceJarLocation);
workerConfig.getProcessContainerFactory().setJavaInstanceJarLocation(envJavaInstanceJarLocation);
}
}
if (null != workerConfig.getProcessContainerFactory()
&& null == workerConfig.getProcessContainerFactory().getPythonInstanceLocation()) {
String envPythonInstanceLocation = System.getProperty("pulsar.functions.python.instance.file");
if (null != envPythonInstanceLocation) {
log.info("Python instance file location is not defined in worker config yml."
+ " Use the location defined in system environment : {}", envPythonInstanceLocation);
workerConfig.getProcessContainerFactory().setPythonInstanceLocation(envPythonInstanceLocation);
}
}
// config the log directory
if (null != workerConfig.getProcessContainerFactory()
&& null == workerConfig.getProcessContainerFactory().getLogDirectory()) {
String envProcessContainerLogDirectory = System.getProperty("pulsar.functions.process.container.log.dir");
if (null != envProcessContainerLogDirectory) {
workerConfig.getProcessContainerFactory().setLogDirectory(envProcessContainerLogDirectory);
} else {
// use a default location
workerConfig.getProcessContainerFactory().setLogDirectory(
Paths.get("logs").toFile().getAbsolutePath());
}
}
final Worker worker = new Worker(workerConfig); final Worker worker = new Worker(workerConfig);
worker.startAsync(); worker.startAsync();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册