diff --git a/source/libs/function/src/udfd.c b/source/libs/function/src/udfd.c index ec8cde58e1762ff75752b66a9bec4ce70bd55767..9401398838063ac8791628283ba55c057661045f 100644 --- a/source/libs/function/src/udfd.c +++ b/source/libs/function/src/udfd.c @@ -30,8 +30,9 @@ #include "tmisce.h" // clang-format on -#define MAX_NUM_SCRIPT_PLUGINS 64 -#define MAX_NUM_PLUGIN_FUNCS 9 +#define UDFD_MAX_SCRIPT_PLUGINS 64 +#define UDFD_MAX_SCRIPT_TYPE 1 +#define UDFD_MAX_PLUGIN_FUNCS 9 typedef struct SUdfCPluginCtx { uv_lib_t lib; @@ -173,7 +174,6 @@ int32_t udfdCPluginUdfAggFinish(SUdfInterBuf *buf, SUdfInterBuf *resultData, voi // for others, dlopen/dlsym to find function pointers typedef struct SUdfScriptPlugin { int8_t scriptType; - const char* scriptSuffix; char libPath[PATH_MAX]; bool libLoaded; @@ -206,7 +206,7 @@ typedef struct SUdfdContext { SHashObj *udfsHash; uv_mutex_t scriptPluginsMutex; - SUdfScriptPlugin *scriptPlugins[MAX_NUM_SCRIPT_PLUGINS]; + SUdfScriptPlugin *scriptPlugins[UDFD_MAX_SCRIPT_PLUGINS]; SArray *residentFuncs; @@ -314,7 +314,6 @@ static void udfdConnectMnodeThreadFunc(void *args); void udfdInitializeCPlugin(SUdfScriptPlugin *plugin) { plugin->scriptType = TSDB_FUNC_SCRIPT_BIN_LIB; - plugin->scriptSuffix = "so"; plugin->openFunc = udfdCPluginOpen; plugin->closeFunc = udfdCPluginClose; plugin->udfInitFunc = udfdCPluginUdfInit; @@ -348,18 +347,17 @@ int32_t udfdLoadSharedLib(char *libPath, uv_lib_t *pLib, const char *funcName[], void udfdInitializePythonPlugin(SUdfScriptPlugin *plugin) { plugin->scriptType = TSDB_FUNC_SCRIPT_PYTHON; - plugin->scriptSuffix = "py"; - //todo: windows support + // todo: windows support sprintf(plugin->libPath, "%s", "libtaospyudf.so"); plugin->libLoaded = false; - const char *funcName[MAX_NUM_PLUGIN_FUNCS] = {"pyOpen", "pyClose", "pyUdfInit", - "pyUdfDestroy", "pyUdfScalarProc", "pyUdfAggStart", - "pyUdfAggFinish", "pyUdfAggProc", "pyUdfAggMerge"}; - void **funcs[MAX_NUM_PLUGIN_FUNCS] = { + const char *funcName[UDFD_MAX_PLUGIN_FUNCS] = {"pyOpen", "pyClose", "pyUdfInit", + "pyUdfDestroy", "pyUdfScalarProc", "pyUdfAggStart", + "pyUdfAggFinish", "pyUdfAggProc", "pyUdfAggMerge"}; + void **funcs[UDFD_MAX_PLUGIN_FUNCS] = { (void **)&plugin->openFunc, (void **)&plugin->closeFunc, (void **)&plugin->udfInitFunc, (void **)&plugin->udfDestroyFunc, (void **)&plugin->udfScalarProcFunc, (void **)&plugin->udfAggStartFunc, (void **)&plugin->udfAggFinishFunc, (void **)&plugin->udfAggProcFunc, (void **)&plugin->udfAggMergeFunc}; - int32_t err = udfdLoadSharedLib(plugin->libPath, &plugin->lib, funcName, funcs, MAX_NUM_PLUGIN_FUNCS); + int32_t err = udfdLoadSharedLib(plugin->libPath, &plugin->lib, funcName, funcs, UDFD_MAX_PLUGIN_FUNCS); if (err != 0) { fnError("can not load python plugin. lib path %s", plugin->libPath); return; @@ -415,30 +413,39 @@ void udfdDeinitPythonPlugin(SUdfScriptPlugin *plugin) { plugin->udfAggFinishFunc = NULL; } -void udfdInitScriptPlugins() { - SUdfScriptPlugin *plugin = NULL; +int32_t udfdInitScriptPlugin(int8_t scriptType) { + SUdfScriptPlugin *plugin = taosMemoryCalloc(1, sizeof(SUdfScriptPlugin)); - // Initialize c plugin - plugin = taosMemoryCalloc(1, sizeof(SUdfScriptPlugin)); - udfdInitializeCPlugin(plugin); - global.scriptPlugins[TSDB_FUNC_SCRIPT_BIN_LIB] = plugin; + switch (scriptType) { + case TSDB_FUNC_SCRIPT_BIN_LIB: + udfdInitializeCPlugin(plugin); + break; + case TSDB_FUNC_SCRIPT_PYTHON: + udfdInitializePythonPlugin(plugin); + break; + default: + fnError("udf script type %d not supported", scriptType); + taosMemoryFree(plugin); + return TSDB_CODE_UDF_SCRIPT_NOT_SUPPORTED; + } - // Initialize python plugin - plugin = taosMemoryCalloc(1, sizeof(SUdfScriptPlugin)); - udfdInitializePythonPlugin(plugin); - global.scriptPlugins[TSDB_FUNC_SCRIPT_PYTHON] = plugin; - return; + global.scriptPlugins[scriptType] = plugin; + return TSDB_CODE_SUCCESS; } void udfdDeinitScriptPlugins() { SUdfScriptPlugin *plugin = NULL; plugin = global.scriptPlugins[TSDB_FUNC_SCRIPT_PYTHON]; - udfdDeinitPythonPlugin(plugin); - taosMemoryFree(plugin); + if (plugin != NULL) { + udfdDeinitPythonPlugin(plugin); + taosMemoryFree(plugin); + } plugin = global.scriptPlugins[TSDB_FUNC_SCRIPT_BIN_LIB]; - udfdDeinitCPlugin(plugin); - taosMemoryFree(plugin); + if (plugin != NULL) { + udfdDeinitCPlugin(plugin); + taosMemoryFree(plugin); + } return; } @@ -481,6 +488,22 @@ void convertUdf2UdfInfo(SUdf *udf, SScriptUdfInfo *udfInfo) { udfInfo->scriptType = udf->scriptType; } +int32_t udfdRenameUdfFile(SUdf *udf) { + char newPath[PATH_MAX]; + if (udf->scriptType == TSDB_FUNC_SCRIPT_BIN_LIB) { + snprintf(newPath, PATH_MAX, "%s/lib%s.so", tsTempDir, udf->name); + taosRenameFile(udf->path, newPath); + sprintf(udf->path, "%s", newPath); + } else if (udf->scriptType == TSDB_FUNC_SCRIPT_PYTHON) { + snprintf(newPath, PATH_MAX, "%s/%s.py", tsTempDir, udf->name); + taosRenameFile(udf->path, newPath); + sprintf(udf->path, "%s", newPath); + } else { + return TSDB_CODE_UDF_SCRIPT_NOT_SUPPORTED; + } + return 0; +} + int32_t udfdInitUdf(char *udfName, SUdf *udf) { int32_t err = 0; err = udfdFillUdfInfoFromMNode(global.clientRpc, udfName, udf); @@ -488,16 +511,25 @@ int32_t udfdInitUdf(char *udfName, SUdf *udf) { fnError("can not retrieve udf from mnode. udf name %s", udfName); return TSDB_CODE_UDF_LOAD_UDF_FAILURE; } - // TODO: remove script plugins mutex + if (udf->scriptType > UDFD_MAX_SCRIPT_TYPE) { + fnError("udf name %s script type %d not supported", udfName, udf->scriptType); + return TSDB_CODE_UDF_SCRIPT_NOT_SUPPORTED; + } + uv_mutex_lock(&global.scriptPluginsMutex); SUdfScriptPlugin *scriptPlugin = global.scriptPlugins[udf->scriptType]; if (scriptPlugin == NULL) { - fnError("udf name %s script type %d not supported", udfName, udf->scriptType); - uv_mutex_unlock(&global.scriptPluginsMutex); - return TSDB_CODE_UDF_SCRIPT_NOT_SUPPORTED; + err = udfdInitScriptPlugin(udf->scriptType); + if (err != 0) { + uv_mutex_unlock(&global.scriptPluginsMutex); + return err; + } } uv_mutex_unlock(&global.scriptPluginsMutex); - udf->scriptPlugin = scriptPlugin; + udf->scriptPlugin = global.scriptPlugins[udf->scriptType]; + + udfdRenameUdfFile(udf); + SScriptUdfInfo info = {0}; convertUdf2UdfInfo(udf, &info); udf->scriptPlugin->udfInitFunc(&info, &udf->scriptUdfCtx); @@ -779,7 +811,7 @@ void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) { udf->outputType = pFuncInfo->outputType; udf->outputLen = pFuncInfo->outputLen; udf->bufSize = pFuncInfo->bufSize; - const char* suffix = global.scriptPlugins[udf->scriptType]->scriptSuffix; + if (!osTempSpaceAvailable()) { terrno = TSDB_CODE_NO_AVAIL_DISK; msgInfo->code = terrno; @@ -789,9 +821,9 @@ void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) { char path[PATH_MAX] = {0}; #ifdef WINDOWS - snprintf(path, sizeof(path), "%s%s.%s", tsTempDir, pFuncInfo->name, suffix); + snprintf(path, sizeof(path), "%s%s", tsTempDir, pFuncInfo->name); #else - snprintf(path, sizeof(path), "%s/%s.%s", tsTempDir, pFuncInfo->name, suffix); + snprintf(path, sizeof(path), "%s/%s", tsTempDir, pFuncInfo->name); #endif TdFilePtr file = taosOpenFile(path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_READ | TD_FILE_TRUNC); if (file == NULL) { @@ -1353,8 +1385,6 @@ int main(int argc, char *argv[]) { return -5; } - udfdInitScriptPlugins(); - udfdInitResidentFuncs(); uv_thread_t mnodeConnectThread;