提交 32523d72 编写于 作者: S shenglian zhou

fix: add created time to file name and keep file reuse

上级 26f1e91d
...@@ -1093,10 +1093,15 @@ typedef struct { ...@@ -1093,10 +1093,15 @@ typedef struct {
char* pCode; char* pCode;
} SFuncInfo; } SFuncInfo;
typedef struct {
int32_t funcVersion;
int64_t funcCreatedTime;
} SFuncExtraInfo;
typedef struct { typedef struct {
int32_t numOfFuncs; int32_t numOfFuncs;
SArray* pFuncInfos; SArray* pFuncInfos;
SArray* pFuncVersions; SArray* pFuncExtraInfos;
} SRetrieveFuncRsp; } SRetrieveFuncRsp;
int32_t tSerializeSRetrieveFuncRsp(void* buf, int32_t bufLen, SRetrieveFuncRsp* pRsp); int32_t tSerializeSRetrieveFuncRsp(void* buf, int32_t bufLen, SRetrieveFuncRsp* pRsp);
......
...@@ -277,6 +277,7 @@ typedef enum EUdfFuncType { UDF_FUNC_TYPE_SCALAR = 1, UDF_FUNC_TYPE_AGG = 2 } EU ...@@ -277,6 +277,7 @@ typedef enum EUdfFuncType { UDF_FUNC_TYPE_SCALAR = 1, UDF_FUNC_TYPE_AGG = 2 } EU
typedef struct SScriptUdfInfo { typedef struct SScriptUdfInfo {
const char *name; const char *name;
int32_t version; int32_t version;
int64_t createdTime;
EUdfFuncType funcType; EUdfFuncType funcType;
int8_t scriptType; int8_t scriptType;
......
...@@ -1865,10 +1865,11 @@ int32_t tSerializeSRetrieveFuncRsp(void *buf, int32_t bufLen, SRetrieveFuncRsp * ...@@ -1865,10 +1865,11 @@ int32_t tSerializeSRetrieveFuncRsp(void *buf, int32_t bufLen, SRetrieveFuncRsp *
} }
} }
if (pRsp->numOfFuncs != (int32_t)taosArrayGetSize(pRsp->pFuncVersions)) return -1; if (pRsp->numOfFuncs != (int32_t)taosArrayGetSize(pRsp->pFuncExtraInfos)) return -1;
for (int32_t i = 0; i < pRsp->numOfFuncs; ++i) { for (int32_t i = 0; i < pRsp->numOfFuncs; ++i) {
int32_t version = *(int32_t*)taosArrayGet(pRsp->pFuncVersions, i); SFuncExtraInfo *extraInfo = taosArrayGet(pRsp->pFuncExtraInfos, i);
if (tEncodeI32(&encoder, version) < 0) return -1; if (tEncodeI32(&encoder, extraInfo->funcVersion) < 0) return -1;
if (tEncodeI64(&encoder, extraInfo->funcCreatedTime) < 0) return -1;
} }
tEndEncode(&encoder); tEndEncode(&encoder);
...@@ -1919,18 +1920,19 @@ int32_t tDeserializeSRetrieveFuncRsp(void *buf, int32_t bufLen, SRetrieveFuncRsp ...@@ -1919,18 +1920,19 @@ int32_t tDeserializeSRetrieveFuncRsp(void *buf, int32_t bufLen, SRetrieveFuncRsp
taosArrayPush(pRsp->pFuncInfos, &fInfo); taosArrayPush(pRsp->pFuncInfos, &fInfo);
} }
pRsp->pFuncVersions = taosArrayInit(pRsp->numOfFuncs, sizeof(int32_t)); pRsp->pFuncExtraInfos = taosArrayInit(pRsp->numOfFuncs, sizeof(SFuncExtraInfo));
if (pRsp->pFuncVersions == NULL) return -1; if (pRsp->pFuncExtraInfos == NULL) return -1;
if (tDecodeIsEnd(&decoder)) { if (tDecodeIsEnd(&decoder)) {
for (int32_t i = 0; i < pRsp->numOfFuncs; ++i) { for (int32_t i = 0; i < pRsp->numOfFuncs; ++i) {
int32_t version = 0; SFuncExtraInfo extraInfo = { 0 };
taosArrayPush(pRsp->pFuncVersions, &version); taosArrayPush(pRsp->pFuncExtraInfos, &extraInfo);
} }
} else { } else {
for (int32_t i = 0; i < pRsp->numOfFuncs; ++i) { for (int32_t i = 0; i < pRsp->numOfFuncs; ++i) {
int32_t version = 0; SFuncExtraInfo extraInfo = { 0 };
if (tDecodeI32(&decoder, &version) < 0) return -1; if (tDecodeI32(&decoder, &extraInfo.funcVersion) < 0) return -1;
taosArrayPush(pRsp->pFuncVersions, &version); if (tDecodeI64(&decoder, &extraInfo.funcCreatedTime) < 0) return -1;
taosArrayPush(pRsp->pFuncExtraInfos, &extraInfo);
} }
} }
tEndDecode(&decoder); tEndDecode(&decoder);
...@@ -1955,7 +1957,7 @@ void tFreeSRetrieveFuncRsp(SRetrieveFuncRsp *pRsp) { ...@@ -1955,7 +1957,7 @@ void tFreeSRetrieveFuncRsp(SRetrieveFuncRsp *pRsp) {
tFreeSFuncInfo(pInfo); tFreeSFuncInfo(pInfo);
} }
taosArrayDestroy(pRsp->pFuncInfos); taosArrayDestroy(pRsp->pFuncInfos);
taosArrayDestroy(pRsp->pFuncVersions); taosArrayDestroy(pRsp->pFuncExtraInfos);
} }
int32_t tSerializeSTableCfgReq(void *buf, int32_t bufLen, STableCfgReq *pReq) { int32_t tSerializeSTableCfgReq(void *buf, int32_t bufLen, STableCfgReq *pReq) {
......
...@@ -428,8 +428,8 @@ static int32_t mndProcessRetrieveFuncReq(SRpcMsg *pReq) { ...@@ -428,8 +428,8 @@ static int32_t mndProcessRetrieveFuncReq(SRpcMsg *pReq) {
goto RETRIEVE_FUNC_OVER; goto RETRIEVE_FUNC_OVER;
} }
retrieveRsp.pFuncVersions = taosArrayInit(retrieveReq.numOfFuncs, sizeof(int32_t)); retrieveRsp.pFuncExtraInfos = taosArrayInit(retrieveReq.numOfFuncs, sizeof(SFuncExtraInfo));
if (retrieveRsp.pFuncVersions == NULL) { if (retrieveRsp.pFuncExtraInfos == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
goto RETRIEVE_FUNC_OVER; goto RETRIEVE_FUNC_OVER;
} }
...@@ -472,8 +472,10 @@ static int32_t mndProcessRetrieveFuncReq(SRpcMsg *pReq) { ...@@ -472,8 +472,10 @@ static int32_t mndProcessRetrieveFuncReq(SRpcMsg *pReq) {
} }
} }
taosArrayPush(retrieveRsp.pFuncInfos, &funcInfo); taosArrayPush(retrieveRsp.pFuncInfos, &funcInfo);
SFuncExtraInfo extraInfo = {0};
taosArrayPush(retrieveRsp.pFuncVersions, &pFunc->funcVersion); extraInfo.funcVersion = pFunc->funcVersion;
extraInfo.funcCreatedTime = pFunc->createdTime;
taosArrayPush(retrieveRsp.pFuncExtraInfos, &pFunc->funcVersion);
mndReleaseFunc(pMnode, pFunc); mndReleaseFunc(pMnode, pFunc);
} }
......
...@@ -672,14 +672,14 @@ void ctgTestRspUdfInfo(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pR ...@@ -672,14 +672,14 @@ void ctgTestRspUdfInfo(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pR
SRetrieveFuncRsp funcRsp = {0}; SRetrieveFuncRsp funcRsp = {0};
funcRsp.numOfFuncs = 1; funcRsp.numOfFuncs = 1;
funcRsp.pFuncInfos = taosArrayInit(1, sizeof(SFuncInfo)); funcRsp.pFuncInfos = taosArrayInit(1, sizeof(SFuncInfo));
funcRsp.pFuncVersions = taosArrayInit(1, sizeof(int32_t)); funcRsp.pFuncExtraInfos = taosArrayInit(1, sizeof(SFuncExtraInfo));
SFuncInfo funcInfo = {0}; SFuncInfo funcInfo = {0};
strcpy(funcInfo.name, "func1"); strcpy(funcInfo.name, "func1");
funcInfo.funcType = ctgTestFuncType; funcInfo.funcType = ctgTestFuncType;
(void)taosArrayPush(funcRsp.pFuncInfos, &funcInfo); (void)taosArrayPush(funcRsp.pFuncInfos, &funcInfo);
int32_t version = 0; SFuncExtraInfo extraInfo = {.funcVersion = 1, .funcCreatedTime = taosGetTimestampMs()};
(void)taosArrayPush(funcRsp.pFuncVersions, &version); (void)taosArrayPush(funcRsp.pFuncExtraInfos, &extraInfo);
int32_t rspLen = tSerializeSRetrieveFuncRsp(NULL, 0, &funcRsp); int32_t rspLen = tSerializeSRetrieveFuncRsp(NULL, 0, &funcRsp);
void *pReq = rpcMallocCont(rspLen); void *pReq = rpcMallocCont(rspLen);
......
...@@ -53,7 +53,7 @@ int32_t udfdCPluginOpen(SScriptUdfEnvItem *items, int numItems) { return 0; } ...@@ -53,7 +53,7 @@ int32_t udfdCPluginOpen(SScriptUdfEnvItem *items, int numItems) { return 0; }
int32_t udfdCPluginClose() { return 0; } int32_t udfdCPluginClose() { return 0; }
const char *udfdCPluginUdfInitLoadInitDestoryFuncs(SUdfCPluginCtx *udfCtx, const char* udfName) { const char *udfdCPluginUdfInitLoadInitDestoryFuncs(SUdfCPluginCtx *udfCtx, const char *udfName) {
char initFuncName[TSDB_FUNC_NAME_LEN + 5] = {0}; char initFuncName[TSDB_FUNC_NAME_LEN + 5] = {0};
char *initSuffix = "_init"; char *initSuffix = "_init";
strcpy(initFuncName, udfName); strcpy(initFuncName, udfName);
...@@ -100,7 +100,7 @@ int32_t udfdCPluginUdfInit(SScriptUdfInfo *udf, void **pUdfCtx) { ...@@ -100,7 +100,7 @@ int32_t udfdCPluginUdfInit(SScriptUdfInfo *udf, void **pUdfCtx) {
fnError("can not load library %s. error: %s", udf->path, uv_strerror(err)); fnError("can not load library %s. error: %s", udf->path, uv_strerror(err));
return TSDB_CODE_UDF_LOAD_UDF_FAILURE; return TSDB_CODE_UDF_LOAD_UDF_FAILURE;
} }
const char* udfName = udf->name; const char *udfName = udf->name;
udfdCPluginUdfInitLoadInitDestoryFuncs(udfCtx, udfName); udfdCPluginUdfInitLoadInitDestoryFuncs(udfCtx, udfName);
...@@ -260,6 +260,7 @@ typedef enum { UDF_STATE_INIT = 0, UDF_STATE_LOADING, UDF_STATE_READY } EUdfStat ...@@ -260,6 +260,7 @@ typedef enum { UDF_STATE_INIT = 0, UDF_STATE_LOADING, UDF_STATE_READY } EUdfStat
typedef struct SUdf { typedef struct SUdf {
char name[TSDB_FUNC_NAME_LEN + 1]; char name[TSDB_FUNC_NAME_LEN + 1];
int32_t version; int32_t version;
int64_t createdTime;
int8_t funcType; int8_t funcType;
int8_t scriptType; int8_t scriptType;
...@@ -518,6 +519,7 @@ void convertUdf2UdfInfo(SUdf *udf, SScriptUdfInfo *udfInfo) { ...@@ -518,6 +519,7 @@ void convertUdf2UdfInfo(SUdf *udf, SScriptUdfInfo *udfInfo) {
} }
udfInfo->name = udf->name; udfInfo->name = udf->name;
udfInfo->version = udf->version; udfInfo->version = udf->version;
udfInfo->createdTime = udf->createdTime;
udfInfo->outputLen = udf->outputLen; udfInfo->outputLen = udf->outputLen;
udfInfo->outputType = udf->outputType; udfInfo->outputType = udf->outputType;
udfInfo->path = udf->path; udfInfo->path = udf->path;
...@@ -527,9 +529,9 @@ void convertUdf2UdfInfo(SUdf *udf, SScriptUdfInfo *udfInfo) { ...@@ -527,9 +529,9 @@ void convertUdf2UdfInfo(SUdf *udf, SScriptUdfInfo *udfInfo) {
int32_t udfdRenameUdfFile(SUdf *udf) { int32_t udfdRenameUdfFile(SUdf *udf) {
char newPath[PATH_MAX]; char newPath[PATH_MAX];
if (udf->scriptType == TSDB_FUNC_SCRIPT_BIN_LIB) { if (udf->scriptType == TSDB_FUNC_SCRIPT_BIN_LIB) {
snprintf(newPath, PATH_MAX, "%s/lib%s_%d_%" PRIx64 ".so", tsDataDir, udf->name, udf->version, udf->lastFetchTime); snprintf(newPath, PATH_MAX, "%s/lib%s_%d_%" PRIx64 ".so", tsDataDir, udf->name, udf->version, udf->createdTime);
} else if (udf->scriptType == TSDB_FUNC_SCRIPT_PYTHON) { } else if (udf->scriptType == TSDB_FUNC_SCRIPT_PYTHON) {
snprintf(newPath, PATH_MAX, "%s/%s_%d_%" PRIx64 ".py", tsDataDir, udf->name, udf->version, udf->lastFetchTime); snprintf(newPath, PATH_MAX, "%s/%s_%d_%" PRIx64 ".py", tsDataDir, udf->name, udf->version, udf->createdTime);
} else { } else {
return TSDB_CODE_UDF_SCRIPT_NOT_SUPPORTED; return TSDB_CODE_UDF_SCRIPT_NOT_SUPPORTED;
} }
...@@ -805,7 +807,6 @@ void udfdProcessTeardownRequest(SUvUdfWork *uvUdf, SUdfRequest *request) { ...@@ -805,7 +807,6 @@ void udfdProcessTeardownRequest(SUvUdfWork *uvUdf, SUdfRequest *request) {
uv_mutex_destroy(&udf->lock); uv_mutex_destroy(&udf->lock);
code = udf->scriptPlugin->udfDestroyFunc(udf->scriptUdfCtx); code = udf->scriptPlugin->udfDestroyFunc(udf->scriptUdfCtx);
fnDebug("udfd destroy function returns %d", code); fnDebug("udfd destroy function returns %d", code);
taosRemoveFile(udf->path);
taosMemoryFree(udf); taosMemoryFree(udf);
} }
taosMemoryFree(handle); taosMemoryFree(handle);
...@@ -835,22 +836,36 @@ int32_t udfdSaveFuncBodyToFile(SFuncInfo *pFuncInfo, SUdf *udf) { ...@@ -835,22 +836,36 @@ int32_t udfdSaveFuncBodyToFile(SFuncInfo *pFuncInfo, SUdf *udf) {
char path[PATH_MAX] = {0}; char path[PATH_MAX] = {0};
#ifdef WINDOWS #ifdef WINDOWS
snprintf(path, sizeof(path), "%s%s_%d_%" PRIx64, tsDataDir, pFuncInfo->name, udf->version, udf->lastFetchTime); snprintf(path, sizeof(path), "%s%s_%d_%" PRIx64, tsDataDir, pFuncInfo->name, udf->version, udf->createdTime);
#else #else
snprintf(path, sizeof(path), "%s/%s_%d_%" PRIx64, tsDataDir, pFuncInfo->name, udf->version, udf->lastFetchTime); snprintf(path, sizeof(path), "%s/%s_%d_%" PRIx64, tsDataDir, pFuncInfo->name, udf->version, udf->createdTime);
#endif #endif
bool fileExist = !(taosStatFile(path, NULL, NULL) < 0);
if (fileExist) {
// TODO: error processing
TdFilePtr file = taosOpenFile(path, TD_FILE_READ);
int64_t size = 0;
taosFStatFile(file, &size, NULL);
taosCloseFile(file);
if (size == pFuncInfo->codeSize) {
strncpy(udf->path, path, PATH_MAX);
return TSDB_CODE_SUCCESS;
}
}
TdFilePtr file = taosOpenFile(path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_READ | TD_FILE_TRUNC); TdFilePtr file = taosOpenFile(path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_READ | TD_FILE_TRUNC);
if (file == NULL) { if (file == NULL) {
fnError("udfd write udf shared library: %s failed, error: %d %s", path, errno, strerror(errno)); fnError("udfd write udf shared library: %s failed, error: %d %s", path, errno, strerror(errno));
return TSDB_CODE_FILE_CORRUPTED; return TSDB_CODE_FILE_CORRUPTED;
} }
int64_t count = taosWriteFile(file, pFuncInfo->pCode, pFuncInfo->codeSize); int64_t count = taosWriteFile(file, pFuncInfo->pCode, pFuncInfo->codeSize);
if (count != pFuncInfo->codeSize) { if (count != pFuncInfo->codeSize) {
fnError("udfd write udf shared library failed"); fnError("udfd write udf shared library failed");
return TSDB_CODE_FILE_CORRUPTED; return TSDB_CODE_FILE_CORRUPTED;
} }
taosCloseFile(&file); taosCloseFile(&file);
strncpy(udf->path, path, PATH_MAX); strncpy(udf->path, path, PATH_MAX);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -901,15 +916,17 @@ void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) { ...@@ -901,15 +916,17 @@ void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) {
udf->outputType = pFuncInfo->outputType; udf->outputType = pFuncInfo->outputType;
udf->outputLen = pFuncInfo->outputLen; udf->outputLen = pFuncInfo->outputLen;
udf->bufSize = pFuncInfo->bufSize; udf->bufSize = pFuncInfo->bufSize;
udf->version = *(int32_t *)taosArrayGet(retrieveRsp.pFuncVersions, 0); SFuncExtraInfo *pFuncExtraInfo = (SFuncExtraInfo *)taosArrayGet(retrieveRsp.pFuncExtraInfos, 0);
udf->version = pFuncExtraInfo->funcVersion;
udf->createdTime = pFuncExtraInfo->funcCreatedTime;
msgInfo->code = udfdSaveFuncBodyToFile(pFuncInfo, udf); msgInfo->code = udfdSaveFuncBodyToFile(pFuncInfo, udf);
if (msgInfo->code != 0) { if (msgInfo->code != 0) {
udf->lastFetchTime = 0; udf->lastFetchTime = 0;
} }
tFreeSFuncInfo(pFuncInfo); tFreeSFuncInfo(pFuncInfo);
taosArrayDestroy(retrieveRsp.pFuncInfos); taosArrayDestroy(retrieveRsp.pFuncInfos);
taosArrayDestroy(retrieveRsp.pFuncVersions); taosArrayDestroy(retrieveRsp.pFuncExtraInfos);
} }
_return: _return:
......
...@@ -587,7 +587,7 @@ int32_t queryProcessRetrieveFuncRsp(void *output, char *msg, int32_t msgSize) { ...@@ -587,7 +587,7 @@ int32_t queryProcessRetrieveFuncRsp(void *output, char *msg, int32_t msgSize) {
memcpy(output, funcInfo, sizeof(*funcInfo)); memcpy(output, funcInfo, sizeof(*funcInfo));
taosArrayDestroy(out.pFuncInfos); taosArrayDestroy(out.pFuncInfos);
taosArrayDestroy(out.pFuncVersions); taosArrayDestroy(out.pFuncExtraInfos);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册