提交 6dae414e 编写于 作者: S slzhou

enhance: delay loading of python plugin

上级 5ae109cc
......@@ -30,8 +30,9 @@
#include "tmisce.h"
// clang-format on
#define MAX_NUM_SCRIPT_PLUGINS 64
#define MAX_NUM_PLUGIN_FUNCS 9
#define UDFD_MAX_SCRIPT_PLUGINS 64
#define UDFD_MAX_SCRIPT_TYPE 1
#define UDFD_MAX_PLUGIN_FUNCS 9
typedef struct SUdfCPluginCtx {
uv_lib_t lib;
......@@ -173,7 +174,6 @@ int32_t udfdCPluginUdfAggFinish(SUdfInterBuf *buf, SUdfInterBuf *resultData, voi
// for others, dlopen/dlsym to find function pointers
typedef struct SUdfScriptPlugin {
int8_t scriptType;
const char* scriptSuffix;
char libPath[PATH_MAX];
bool libLoaded;
......@@ -206,7 +206,7 @@ typedef struct SUdfdContext {
SHashObj *udfsHash;
uv_mutex_t scriptPluginsMutex;
SUdfScriptPlugin *scriptPlugins[MAX_NUM_SCRIPT_PLUGINS];
SUdfScriptPlugin *scriptPlugins[UDFD_MAX_SCRIPT_PLUGINS];
SArray *residentFuncs;
......@@ -314,7 +314,6 @@ static void udfdConnectMnodeThreadFunc(void *args);
void udfdInitializeCPlugin(SUdfScriptPlugin *plugin) {
plugin->scriptType = TSDB_FUNC_SCRIPT_BIN_LIB;
plugin->scriptSuffix = "so";
plugin->openFunc = udfdCPluginOpen;
plugin->closeFunc = udfdCPluginClose;
plugin->udfInitFunc = udfdCPluginUdfInit;
......@@ -348,18 +347,17 @@ int32_t udfdLoadSharedLib(char *libPath, uv_lib_t *pLib, const char *funcName[],
void udfdInitializePythonPlugin(SUdfScriptPlugin *plugin) {
plugin->scriptType = TSDB_FUNC_SCRIPT_PYTHON;
plugin->scriptSuffix = "py";
//todo: windows support
// todo: windows support
sprintf(plugin->libPath, "%s", "libtaospyudf.so");
plugin->libLoaded = false;
const char *funcName[MAX_NUM_PLUGIN_FUNCS] = {"pyOpen", "pyClose", "pyUdfInit",
const char *funcName[UDFD_MAX_PLUGIN_FUNCS] = {"pyOpen", "pyClose", "pyUdfInit",
"pyUdfDestroy", "pyUdfScalarProc", "pyUdfAggStart",
"pyUdfAggFinish", "pyUdfAggProc", "pyUdfAggMerge"};
void **funcs[MAX_NUM_PLUGIN_FUNCS] = {
void **funcs[UDFD_MAX_PLUGIN_FUNCS] = {
(void **)&plugin->openFunc, (void **)&plugin->closeFunc, (void **)&plugin->udfInitFunc,
(void **)&plugin->udfDestroyFunc, (void **)&plugin->udfScalarProcFunc, (void **)&plugin->udfAggStartFunc,
(void **)&plugin->udfAggFinishFunc, (void **)&plugin->udfAggProcFunc, (void **)&plugin->udfAggMergeFunc};
int32_t err = udfdLoadSharedLib(plugin->libPath, &plugin->lib, funcName, funcs, MAX_NUM_PLUGIN_FUNCS);
int32_t err = udfdLoadSharedLib(plugin->libPath, &plugin->lib, funcName, funcs, UDFD_MAX_PLUGIN_FUNCS);
if (err != 0) {
fnError("can not load python plugin. lib path %s", plugin->libPath);
return;
......@@ -415,30 +413,39 @@ void udfdDeinitPythonPlugin(SUdfScriptPlugin *plugin) {
plugin->udfAggFinishFunc = NULL;
}
void udfdInitScriptPlugins() {
SUdfScriptPlugin *plugin = NULL;
int32_t udfdInitScriptPlugin(int8_t scriptType) {
SUdfScriptPlugin *plugin = taosMemoryCalloc(1, sizeof(SUdfScriptPlugin));
// Initialize c plugin
plugin = taosMemoryCalloc(1, sizeof(SUdfScriptPlugin));
switch (scriptType) {
case TSDB_FUNC_SCRIPT_BIN_LIB:
udfdInitializeCPlugin(plugin);
global.scriptPlugins[TSDB_FUNC_SCRIPT_BIN_LIB] = plugin;
// Initialize python plugin
plugin = taosMemoryCalloc(1, sizeof(SUdfScriptPlugin));
break;
case TSDB_FUNC_SCRIPT_PYTHON:
udfdInitializePythonPlugin(plugin);
global.scriptPlugins[TSDB_FUNC_SCRIPT_PYTHON] = plugin;
return;
break;
default:
fnError("udf script type %d not supported", scriptType);
taosMemoryFree(plugin);
return TSDB_CODE_UDF_SCRIPT_NOT_SUPPORTED;
}
global.scriptPlugins[scriptType] = plugin;
return TSDB_CODE_SUCCESS;
}
void udfdDeinitScriptPlugins() {
SUdfScriptPlugin *plugin = NULL;
plugin = global.scriptPlugins[TSDB_FUNC_SCRIPT_PYTHON];
if (plugin != NULL) {
udfdDeinitPythonPlugin(plugin);
taosMemoryFree(plugin);
}
plugin = global.scriptPlugins[TSDB_FUNC_SCRIPT_BIN_LIB];
if (plugin != NULL) {
udfdDeinitCPlugin(plugin);
taosMemoryFree(plugin);
}
return;
}
......@@ -481,6 +488,22 @@ void convertUdf2UdfInfo(SUdf *udf, SScriptUdfInfo *udfInfo) {
udfInfo->scriptType = udf->scriptType;
}
int32_t udfdRenameUdfFile(SUdf *udf) {
char newPath[PATH_MAX];
if (udf->scriptType == TSDB_FUNC_SCRIPT_BIN_LIB) {
snprintf(newPath, PATH_MAX, "%s/lib%s.so", tsTempDir, udf->name);
taosRenameFile(udf->path, newPath);
sprintf(udf->path, "%s", newPath);
} else if (udf->scriptType == TSDB_FUNC_SCRIPT_PYTHON) {
snprintf(newPath, PATH_MAX, "%s/%s.py", tsTempDir, udf->name);
taosRenameFile(udf->path, newPath);
sprintf(udf->path, "%s", newPath);
} else {
return TSDB_CODE_UDF_SCRIPT_NOT_SUPPORTED;
}
return 0;
}
int32_t udfdInitUdf(char *udfName, SUdf *udf) {
int32_t err = 0;
err = udfdFillUdfInfoFromMNode(global.clientRpc, udfName, udf);
......@@ -488,16 +511,25 @@ int32_t udfdInitUdf(char *udfName, SUdf *udf) {
fnError("can not retrieve udf from mnode. udf name %s", udfName);
return TSDB_CODE_UDF_LOAD_UDF_FAILURE;
}
// TODO: remove script plugins mutex
if (udf->scriptType > UDFD_MAX_SCRIPT_TYPE) {
fnError("udf name %s script type %d not supported", udfName, udf->scriptType);
return TSDB_CODE_UDF_SCRIPT_NOT_SUPPORTED;
}
uv_mutex_lock(&global.scriptPluginsMutex);
SUdfScriptPlugin *scriptPlugin = global.scriptPlugins[udf->scriptType];
if (scriptPlugin == NULL) {
fnError("udf name %s script type %d not supported", udfName, udf->scriptType);
err = udfdInitScriptPlugin(udf->scriptType);
if (err != 0) {
uv_mutex_unlock(&global.scriptPluginsMutex);
return TSDB_CODE_UDF_SCRIPT_NOT_SUPPORTED;
return err;
}
}
uv_mutex_unlock(&global.scriptPluginsMutex);
udf->scriptPlugin = scriptPlugin;
udf->scriptPlugin = global.scriptPlugins[udf->scriptType];
udfdRenameUdfFile(udf);
SScriptUdfInfo info = {0};
convertUdf2UdfInfo(udf, &info);
udf->scriptPlugin->udfInitFunc(&info, &udf->scriptUdfCtx);
......@@ -779,7 +811,7 @@ void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) {
udf->outputType = pFuncInfo->outputType;
udf->outputLen = pFuncInfo->outputLen;
udf->bufSize = pFuncInfo->bufSize;
const char* suffix = global.scriptPlugins[udf->scriptType]->scriptSuffix;
if (!osTempSpaceAvailable()) {
terrno = TSDB_CODE_NO_AVAIL_DISK;
msgInfo->code = terrno;
......@@ -789,9 +821,9 @@ void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) {
char path[PATH_MAX] = {0};
#ifdef WINDOWS
snprintf(path, sizeof(path), "%s%s.%s", tsTempDir, pFuncInfo->name, suffix);
snprintf(path, sizeof(path), "%s%s", tsTempDir, pFuncInfo->name);
#else
snprintf(path, sizeof(path), "%s/%s.%s", tsTempDir, pFuncInfo->name, suffix);
snprintf(path, sizeof(path), "%s/%s", tsTempDir, pFuncInfo->name);
#endif
TdFilePtr file = taosOpenFile(path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_READ | TD_FILE_TRUNC);
if (file == NULL) {
......@@ -1353,8 +1385,6 @@ int main(int argc, char *argv[]) {
return -5;
}
udfdInitScriptPlugins();
udfdInitResidentFuncs();
uv_thread_t mnodeConnectThread;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册