From b725c9e192bb8451f128ef985589fb4483b69431 Mon Sep 17 00:00:00 2001 From: slzhou Date: Sun, 9 Apr 2023 20:31:25 +0800 Subject: [PATCH] enhance: change udf func body dir to tsDataDir/.udf --- source/libs/function/src/udfd.c | 58 +++++++++++++++++++++++++-------- 1 file changed, 45 insertions(+), 13 deletions(-) diff --git a/source/libs/function/src/udfd.c b/source/libs/function/src/udfd.c index 70631888ca..aa72309c62 100644 --- a/source/libs/function/src/udfd.c +++ b/source/libs/function/src/udfd.c @@ -229,6 +229,7 @@ typedef struct SUdfdContext { SArray *residentFuncs; + char udfDataDir[PATH_MAX]; bool printVersion; } SUdfdContext; @@ -390,12 +391,13 @@ int32_t udfdInitializePythonPlugin(SUdfScriptPlugin *plugin) { } if (plugin->openFunc) { - int16_t lenPythonPath = strlen(tsUdfdLdLibPath) + strlen(tsDataDir) + 1 + 1; // tsDataDir:tsUdfdLdLibPath - char *pythonPath = taosMemoryMalloc(lenPythonPath); + int16_t lenPythonPath = + strlen(tsUdfdLdLibPath) + strlen(global.udfDataDir) + 1 + 1; // global.udfDataDir:tsUdfdLdLibPath + char *pythonPath = taosMemoryMalloc(lenPythonPath); #ifdef WINDOWS - snprintf(pythonPath, lenPythonPath, "%s;%s", tsDataDir, tsUdfdLdLibPath); + snprintf(pythonPath, lenPythonPath, "%s;%s", global.udfDataDir, tsUdfdLdLibPath); #else - snprintf(pythonPath, lenPythonPath, "%s:%s", tsDataDir, tsUdfdLdLibPath); + snprintf(pythonPath, lenPythonPath, "%s:%s", global.udfDataDir, tsUdfdLdLibPath); #endif SScriptUdfEnvItem items[] = {{"PYTHONPATH", pythonPath}, {"LOGDIR", tsLogDir}}; err = plugin->openFunc(items, 2); @@ -567,7 +569,7 @@ int32_t udfdInitUdf(char *udfName, SUdf *udf) { SUdf *udfdNewUdf(const char *udfName) { SUdf *udfNew = taosMemoryCalloc(1, sizeof(SUdf)); udfNew->refCount = 1; - udfNew->lastFetchTime = taosGetTimestampUs(); + udfNew->lastFetchTime = taosGetTimestampMs(); strncpy(udfNew->name, udfName, TSDB_FUNC_NAME_LEN); udfNew->state = UDF_STATE_INIT; @@ -592,15 +594,19 @@ SUdf *udfdGetOrCreateUdf(const char *udfName) { int64_t currTime = taosGetTimestampSec(); bool expired = false; if (pUdfHash) { - expired = currTime - (*pUdfHash)->lastFetchTime > 10 * 1000 * 1000; // 10s + expired = currTime - (*pUdfHash)->lastFetchTime > 10 * 1000; // 10s if (!expired) { ++(*pUdfHash)->refCount; SUdf *udf = *pUdfHash; uv_mutex_unlock(&global.udfsMutex); + fnInfo("udfd reuse existing udf. udf %s udf version %d, udf created time %" PRIx64, udf->name, udf->version, + udf->createdTime); return udf; } else { (*pUdfHash)->expired = true; taosHashRemove(global.udfsHash, udfName, strlen(udfName)); + fnInfo("udfd expired, check for new version. existing udf %s udf version %d, udf created time %" PRIx64, + (*pUdfHash)->name, (*pUdfHash)->version, (*pUdfHash)->createdTime); } } @@ -814,21 +820,22 @@ void udfdProcessTeardownRequest(SUvUdfWork *uvUdf, SUdfRequest *request) { void udfdGetFuncBodyPath(const SUdf *udf, char *path) { if (udf->scriptType == TSDB_FUNC_SCRIPT_BIN_LIB) { #ifdef WINDOWS - snprintf(path, PATH_MAX, "%s%s_%d_%" PRIx64 ".dll", tsDataDir, udf->name, udf->version, udf->createdTime); + snprintf(path, PATH_MAX, "%s%s_%d_%" PRIx64 ".dll", global.udfDataDir, udf->name, udf->version, udf->createdTime); #else - snprintf(path, PATH_MAX, "%s/lib%s_%d_%" PRIx64 ".so", tsDataDir, udf->name, udf->version, udf->createdTime); + snprintf(path, PATH_MAX, "%s/lib%s_%d_%" PRIx64 ".so", global.udfDataDir, udf->name, udf->version, + udf->createdTime); #endif } else if (udf->scriptType == TSDB_FUNC_SCRIPT_PYTHON) { #ifdef WINDOWS - snprintf(path, PATH_MAX, "%s%s_%d_%" PRIx64 ".py", tsDataDir, udf->name, udf->version, udf->createdTime); + snprintf(path, PATH_MAX, "%s%s_%d_%" PRIx64 ".py", global.udfDataDir, udf->name, udf->version, udf->createdTime); #else - snprintf(path, PATH_MAX, "%s/%s_%d_%" PRIx64 ".py", tsDataDir, udf->name, udf->version, udf->createdTime); + snprintf(path, PATH_MAX, "%s/%s_%d_%" PRIx64 ".py", global.udfDataDir, udf->name, udf->version, udf->createdTime); #endif } else { #ifdef WINDOWS - snprintf(path, PATH_MAX, "%s%s_%d_%" PRIx64, tsDataDir, udf->name, udf->version, udf->createdTime); + snprintf(path, PATH_MAX, "%s%s_%d_%" PRIx64, global.udfDataDir, udf->name, udf->version, udf->createdTime); #else - snprintf(path, PATH_MAX, "%s/lib%s_%d_%" PRIx64, tsDataDir, udf->name, udf->version, udf->createdTime); + snprintf(path, PATH_MAX, "%s/lib%s_%d_%" PRIx64, global.udfDataDir, udf->name, udf->version, udf->createdTime); #endif } } @@ -845,6 +852,7 @@ int32_t udfdSaveFuncBodyToFile(SFuncInfo *pFuncInfo, SUdf *udf) { bool fileExist = !(taosStatFile(path, NULL, NULL) < 0); if (fileExist) { strncpy(udf->path, path, PATH_MAX); + fnInfo("udfd func body file. reuse existing file %s", path); return TSDB_CODE_SUCCESS; } @@ -1429,6 +1437,24 @@ int32_t udfdCleanup() { return 0; } +int32_t udfdCreateUdfSourceDir() { + snprintf(global.udfDataDir, PATH_MAX, "%s/.udf", tsDataDir); + int32_t code = taosMkDir(global.udfDataDir); + if (code != TSDB_CODE_SUCCESS) { + snprintf(global.udfDataDir, PATH_MAX, "%s/.udf", tsTempDir); + code = taosMkDir(global.udfDataDir); + } + fnInfo("udfd create udf source directory %s. result: %s", global.udfDataDir, tstrerror(code)); + + return code; +} + +int32_t udfdDestroyUdfSourceDir() { + fnInfo("destory udf source directory %s", global.udfDataDir); + taosRemoveDir(global.udfDataDir); + return 0; +} + int main(int argc, char *argv[]) { if (!taosCheckSystemIsLittleEnd()) { printf("failed to start since on non-little-end machines\n"); @@ -1457,10 +1483,15 @@ int main(int argc, char *argv[]) { initEpSetFromCfg(tsFirst, tsSecond, &global.mgmtEp); if (udfdOpenClientRpc() != 0) { - fnError("open rpc connection to mnode failure"); + fnError("open rpc connection to mnode failed"); return -3; } + if (udfdCreateUdfSourceDir() != 0) { + fnError("create udf source directory failed"); + return -4; + } + if (udfdUvInit() != 0) { fnError("uv init failure"); return -5; @@ -1474,6 +1505,7 @@ int main(int argc, char *argv[]) { udfdRun(); removeListeningPipe(); + udfdDestroyUdfSourceDir(); udfdCloseClientRpc(); udfdDeinitResidentFuncs(); -- GitLab