From 32523d72f9c6a7037c19802387942779096261e5 Mon Sep 17 00:00:00 2001 From: shenglian zhou Date: Sat, 8 Apr 2023 10:11:57 +0800 Subject: [PATCH] fix: add created time to file name and keep file reuse --- include/common/tmsg.h | 7 +++- include/libs/function/taosudf.h | 1 + source/common/src/tmsg.c | 24 ++++++------ source/dnode/mnode/impl/src/mndFunc.c | 10 +++-- source/libs/catalog/test/catalogTests.cpp | 6 +-- source/libs/function/src/udfd.c | 45 ++++++++++++++++------- source/libs/qcom/src/querymsg.c | 2 +- 7 files changed, 61 insertions(+), 34 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 0ef6347a44..eefb8fc99e 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1093,10 +1093,15 @@ typedef struct { char* pCode; } SFuncInfo; +typedef struct { + int32_t funcVersion; + int64_t funcCreatedTime; +} SFuncExtraInfo; + typedef struct { int32_t numOfFuncs; SArray* pFuncInfos; - SArray* pFuncVersions; + SArray* pFuncExtraInfos; } SRetrieveFuncRsp; int32_t tSerializeSRetrieveFuncRsp(void* buf, int32_t bufLen, SRetrieveFuncRsp* pRsp); diff --git a/include/libs/function/taosudf.h b/include/libs/function/taosudf.h index 2eccb6225c..5703df87fa 100644 --- a/include/libs/function/taosudf.h +++ b/include/libs/function/taosudf.h @@ -277,6 +277,7 @@ typedef enum EUdfFuncType { UDF_FUNC_TYPE_SCALAR = 1, UDF_FUNC_TYPE_AGG = 2 } EU typedef struct SScriptUdfInfo { const char *name; int32_t version; + int64_t createdTime; EUdfFuncType funcType; int8_t scriptType; diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index f103b33138..ac3d4e6a10 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -1865,10 +1865,11 @@ int32_t tSerializeSRetrieveFuncRsp(void *buf, int32_t bufLen, SRetrieveFuncRsp * } } - if (pRsp->numOfFuncs != (int32_t)taosArrayGetSize(pRsp->pFuncVersions)) return -1; + if (pRsp->numOfFuncs != (int32_t)taosArrayGetSize(pRsp->pFuncExtraInfos)) return -1; for (int32_t i = 0; i < pRsp->numOfFuncs; ++i) { - int32_t version = *(int32_t*)taosArrayGet(pRsp->pFuncVersions, i); - if (tEncodeI32(&encoder, version) < 0) return -1; + SFuncExtraInfo *extraInfo = taosArrayGet(pRsp->pFuncExtraInfos, i); + if (tEncodeI32(&encoder, extraInfo->funcVersion) < 0) return -1; + if (tEncodeI64(&encoder, extraInfo->funcCreatedTime) < 0) return -1; } tEndEncode(&encoder); @@ -1919,18 +1920,19 @@ int32_t tDeserializeSRetrieveFuncRsp(void *buf, int32_t bufLen, SRetrieveFuncRsp taosArrayPush(pRsp->pFuncInfos, &fInfo); } - pRsp->pFuncVersions = taosArrayInit(pRsp->numOfFuncs, sizeof(int32_t)); - if (pRsp->pFuncVersions == NULL) return -1; + pRsp->pFuncExtraInfos = taosArrayInit(pRsp->numOfFuncs, sizeof(SFuncExtraInfo)); + if (pRsp->pFuncExtraInfos == NULL) return -1; if (tDecodeIsEnd(&decoder)) { for (int32_t i = 0; i < pRsp->numOfFuncs; ++i) { - int32_t version = 0; - taosArrayPush(pRsp->pFuncVersions, &version); + SFuncExtraInfo extraInfo = { 0 }; + taosArrayPush(pRsp->pFuncExtraInfos, &extraInfo); } } else { for (int32_t i = 0; i < pRsp->numOfFuncs; ++i) { - int32_t version = 0; - if (tDecodeI32(&decoder, &version) < 0) return -1; - taosArrayPush(pRsp->pFuncVersions, &version); + SFuncExtraInfo extraInfo = { 0 }; + if (tDecodeI32(&decoder, &extraInfo.funcVersion) < 0) return -1; + if (tDecodeI64(&decoder, &extraInfo.funcCreatedTime) < 0) return -1; + taosArrayPush(pRsp->pFuncExtraInfos, &extraInfo); } } tEndDecode(&decoder); @@ -1955,7 +1957,7 @@ void tFreeSRetrieveFuncRsp(SRetrieveFuncRsp *pRsp) { tFreeSFuncInfo(pInfo); } taosArrayDestroy(pRsp->pFuncInfos); - taosArrayDestroy(pRsp->pFuncVersions); + taosArrayDestroy(pRsp->pFuncExtraInfos); } int32_t tSerializeSTableCfgReq(void *buf, int32_t bufLen, STableCfgReq *pReq) { diff --git a/source/dnode/mnode/impl/src/mndFunc.c b/source/dnode/mnode/impl/src/mndFunc.c index 08336ef70a..a4ccc0ad73 100644 --- a/source/dnode/mnode/impl/src/mndFunc.c +++ b/source/dnode/mnode/impl/src/mndFunc.c @@ -428,8 +428,8 @@ static int32_t mndProcessRetrieveFuncReq(SRpcMsg *pReq) { goto RETRIEVE_FUNC_OVER; } - retrieveRsp.pFuncVersions = taosArrayInit(retrieveReq.numOfFuncs, sizeof(int32_t)); - if (retrieveRsp.pFuncVersions == NULL) { + retrieveRsp.pFuncExtraInfos = taosArrayInit(retrieveReq.numOfFuncs, sizeof(SFuncExtraInfo)); + if (retrieveRsp.pFuncExtraInfos == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; goto RETRIEVE_FUNC_OVER; } @@ -472,8 +472,10 @@ static int32_t mndProcessRetrieveFuncReq(SRpcMsg *pReq) { } } taosArrayPush(retrieveRsp.pFuncInfos, &funcInfo); - - taosArrayPush(retrieveRsp.pFuncVersions, &pFunc->funcVersion); + SFuncExtraInfo extraInfo = {0}; + extraInfo.funcVersion = pFunc->funcVersion; + extraInfo.funcCreatedTime = pFunc->createdTime; + taosArrayPush(retrieveRsp.pFuncExtraInfos, &pFunc->funcVersion); mndReleaseFunc(pMnode, pFunc); } diff --git a/source/libs/catalog/test/catalogTests.cpp b/source/libs/catalog/test/catalogTests.cpp index f3f7d62acf..c4bd1df2d6 100644 --- a/source/libs/catalog/test/catalogTests.cpp +++ b/source/libs/catalog/test/catalogTests.cpp @@ -672,14 +672,14 @@ void ctgTestRspUdfInfo(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pR SRetrieveFuncRsp funcRsp = {0}; funcRsp.numOfFuncs = 1; funcRsp.pFuncInfos = taosArrayInit(1, sizeof(SFuncInfo)); - funcRsp.pFuncVersions = taosArrayInit(1, sizeof(int32_t)); + funcRsp.pFuncExtraInfos = taosArrayInit(1, sizeof(SFuncExtraInfo)); SFuncInfo funcInfo = {0}; strcpy(funcInfo.name, "func1"); funcInfo.funcType = ctgTestFuncType; (void)taosArrayPush(funcRsp.pFuncInfos, &funcInfo); - int32_t version = 0; - (void)taosArrayPush(funcRsp.pFuncVersions, &version); + SFuncExtraInfo extraInfo = {.funcVersion = 1, .funcCreatedTime = taosGetTimestampMs()}; + (void)taosArrayPush(funcRsp.pFuncExtraInfos, &extraInfo); int32_t rspLen = tSerializeSRetrieveFuncRsp(NULL, 0, &funcRsp); void *pReq = rpcMallocCont(rspLen); diff --git a/source/libs/function/src/udfd.c b/source/libs/function/src/udfd.c index 836d2d893c..4ef7a0e7cc 100644 --- a/source/libs/function/src/udfd.c +++ b/source/libs/function/src/udfd.c @@ -53,9 +53,9 @@ int32_t udfdCPluginOpen(SScriptUdfEnvItem *items, int numItems) { return 0; } int32_t udfdCPluginClose() { return 0; } -const char *udfdCPluginUdfInitLoadInitDestoryFuncs(SUdfCPluginCtx *udfCtx, const char* udfName) { - char initFuncName[TSDB_FUNC_NAME_LEN + 5] = {0}; - char *initSuffix = "_init"; +const char *udfdCPluginUdfInitLoadInitDestoryFuncs(SUdfCPluginCtx *udfCtx, const char *udfName) { + char initFuncName[TSDB_FUNC_NAME_LEN + 5] = {0}; + char *initSuffix = "_init"; strcpy(initFuncName, udfName); strncat(initFuncName, initSuffix, strlen(initSuffix)); uv_dlsym(&udfCtx->lib, initFuncName, (void **)(&udfCtx->initFunc)); @@ -100,7 +100,7 @@ int32_t udfdCPluginUdfInit(SScriptUdfInfo *udf, void **pUdfCtx) { fnError("can not load library %s. error: %s", udf->path, uv_strerror(err)); return TSDB_CODE_UDF_LOAD_UDF_FAILURE; } - const char* udfName = udf->name; + const char *udfName = udf->name; udfdCPluginUdfInitLoadInitDestoryFuncs(udfCtx, udfName); @@ -260,6 +260,7 @@ typedef enum { UDF_STATE_INIT = 0, UDF_STATE_LOADING, UDF_STATE_READY } EUdfStat typedef struct SUdf { char name[TSDB_FUNC_NAME_LEN + 1]; int32_t version; + int64_t createdTime; int8_t funcType; int8_t scriptType; @@ -518,6 +519,7 @@ void convertUdf2UdfInfo(SUdf *udf, SScriptUdfInfo *udfInfo) { } udfInfo->name = udf->name; udfInfo->version = udf->version; + udfInfo->createdTime = udf->createdTime; udfInfo->outputLen = udf->outputLen; udfInfo->outputType = udf->outputType; udfInfo->path = udf->path; @@ -527,9 +529,9 @@ void convertUdf2UdfInfo(SUdf *udf, SScriptUdfInfo *udfInfo) { int32_t udfdRenameUdfFile(SUdf *udf) { char newPath[PATH_MAX]; if (udf->scriptType == TSDB_FUNC_SCRIPT_BIN_LIB) { - snprintf(newPath, PATH_MAX, "%s/lib%s_%d_%" PRIx64 ".so", tsDataDir, udf->name, udf->version, udf->lastFetchTime); + snprintf(newPath, PATH_MAX, "%s/lib%s_%d_%" PRIx64 ".so", tsDataDir, udf->name, udf->version, udf->createdTime); } else if (udf->scriptType == TSDB_FUNC_SCRIPT_PYTHON) { - snprintf(newPath, PATH_MAX, "%s/%s_%d_%" PRIx64 ".py", tsDataDir, udf->name, udf->version, udf->lastFetchTime); + snprintf(newPath, PATH_MAX, "%s/%s_%d_%" PRIx64 ".py", tsDataDir, udf->name, udf->version, udf->createdTime); } else { return TSDB_CODE_UDF_SCRIPT_NOT_SUPPORTED; } @@ -606,7 +608,7 @@ SUdf *udfdGetOrCreateUdf(const char *udfName) { int64_t currTime = taosGetTimestampSec(); bool expired = false; if (pUdfHash) { - expired = currTime - (*pUdfHash)->lastFetchTime > 10 * 1000 * 1000; // 10s + expired = currTime - (*pUdfHash)->lastFetchTime > 10 * 1000 * 1000; // 10s if (!expired) { ++(*pUdfHash)->refCount; SUdf *udf = *pUdfHash; @@ -618,7 +620,7 @@ SUdf *udfdGetOrCreateUdf(const char *udfName) { } } - SUdf *udf = udfdNewUdf(udfName); + SUdf *udf = udfdNewUdf(udfName); SUdf **pUdf = &udf; taosHashPut(global.udfsHash, udfName, strlen(udfName), pUdf, POINTER_BYTES); @@ -805,7 +807,6 @@ void udfdProcessTeardownRequest(SUvUdfWork *uvUdf, SUdfRequest *request) { uv_mutex_destroy(&udf->lock); code = udf->scriptPlugin->udfDestroyFunc(udf->scriptUdfCtx); fnDebug("udfd destroy function returns %d", code); - taosRemoveFile(udf->path); taosMemoryFree(udf); } taosMemoryFree(handle); @@ -835,22 +836,36 @@ int32_t udfdSaveFuncBodyToFile(SFuncInfo *pFuncInfo, SUdf *udf) { char path[PATH_MAX] = {0}; #ifdef WINDOWS - snprintf(path, sizeof(path), "%s%s_%d_%" PRIx64, tsDataDir, pFuncInfo->name, udf->version, udf->lastFetchTime); + snprintf(path, sizeof(path), "%s%s_%d_%" PRIx64, tsDataDir, pFuncInfo->name, udf->version, udf->createdTime); #else - snprintf(path, sizeof(path), "%s/%s_%d_%" PRIx64, tsDataDir, pFuncInfo->name, udf->version, udf->lastFetchTime); + snprintf(path, sizeof(path), "%s/%s_%d_%" PRIx64, tsDataDir, pFuncInfo->name, udf->version, udf->createdTime); #endif + + bool fileExist = !(taosStatFile(path, NULL, NULL) < 0); + if (fileExist) { + // TODO: error processing + TdFilePtr file = taosOpenFile(path, TD_FILE_READ); + int64_t size = 0; + taosFStatFile(file, &size, NULL); + taosCloseFile(file); + if (size == pFuncInfo->codeSize) { + strncpy(udf->path, path, PATH_MAX); + return TSDB_CODE_SUCCESS; + } + } + TdFilePtr file = taosOpenFile(path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_READ | TD_FILE_TRUNC); if (file == NULL) { fnError("udfd write udf shared library: %s failed, error: %d %s", path, errno, strerror(errno)); return TSDB_CODE_FILE_CORRUPTED; } - int64_t count = taosWriteFile(file, pFuncInfo->pCode, pFuncInfo->codeSize); if (count != pFuncInfo->codeSize) { fnError("udfd write udf shared library failed"); return TSDB_CODE_FILE_CORRUPTED; } taosCloseFile(&file); + strncpy(udf->path, path, PATH_MAX); return TSDB_CODE_SUCCESS; } @@ -901,15 +916,17 @@ void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) { udf->outputType = pFuncInfo->outputType; udf->outputLen = pFuncInfo->outputLen; udf->bufSize = pFuncInfo->bufSize; - udf->version = *(int32_t *)taosArrayGet(retrieveRsp.pFuncVersions, 0); + SFuncExtraInfo *pFuncExtraInfo = (SFuncExtraInfo *)taosArrayGet(retrieveRsp.pFuncExtraInfos, 0); + udf->version = pFuncExtraInfo->funcVersion; + udf->createdTime = pFuncExtraInfo->funcCreatedTime; msgInfo->code = udfdSaveFuncBodyToFile(pFuncInfo, udf); if (msgInfo->code != 0) { udf->lastFetchTime = 0; } tFreeSFuncInfo(pFuncInfo); taosArrayDestroy(retrieveRsp.pFuncInfos); - taosArrayDestroy(retrieveRsp.pFuncVersions); + taosArrayDestroy(retrieveRsp.pFuncExtraInfos); } _return: diff --git a/source/libs/qcom/src/querymsg.c b/source/libs/qcom/src/querymsg.c index 4b41c24371..b62a3e4932 100644 --- a/source/libs/qcom/src/querymsg.c +++ b/source/libs/qcom/src/querymsg.c @@ -587,7 +587,7 @@ int32_t queryProcessRetrieveFuncRsp(void *output, char *msg, int32_t msgSize) { memcpy(output, funcInfo, sizeof(*funcInfo)); taosArrayDestroy(out.pFuncInfos); - taosArrayDestroy(out.pFuncVersions); + taosArrayDestroy(out.pFuncExtraInfos); return TSDB_CODE_SUCCESS; } -- GitLab