提交 c17d9eca 编写于 作者: S shenglian zhou

fix: add fetchtime and version to udf init

上级 a71caffe
......@@ -276,6 +276,7 @@ typedef enum EUdfFuncType { UDF_FUNC_TYPE_SCALAR = 1, UDF_FUNC_TYPE_AGG = 2 } EU
typedef struct SScriptUdfInfo {
const char *name;
int32_t version;
EUdfFuncType funcType;
int8_t scriptType;
......
......@@ -100,7 +100,6 @@ int32_t udfdCPluginUdfInit(SScriptUdfInfo *udf, void **pUdfCtx) {
}
int32_t code = 0;
if (udfCtx->initFunc) {
// TODO: handle init call return error
code = (udfCtx->initFunc)();
if (code != 0) {
uv_dlclose(&udfCtx->lib);
......@@ -245,7 +244,7 @@ typedef struct SUvUdfWork {
typedef enum { UDF_STATE_INIT = 0, UDF_STATE_LOADING, UDF_STATE_READY } EUdfState;
typedef struct SUdf {
char name[TSDB_FUNC_NAME_LEN + 1];
char name[TSDB_FUNC_NAME_LEN + 1];
int32_t version;
int8_t funcType;
......@@ -264,9 +263,11 @@ typedef struct SUdf {
SUdfScriptPlugin *scriptPlugin;
void *scriptUdfCtx;
int64_t lastFetchTime; // last fetch time in milliseconds
bool expired;
} SUdf;
// TODO: add private udf structure.
typedef struct SUdfcFuncHandle {
SUdf *udf;
} SUdfcFuncHandle;
......@@ -319,7 +320,8 @@ static void udfdCloseWalkCb(uv_handle_t *handle, void *arg);
static int32_t udfdRun();
static void udfdConnectMnodeThreadFunc(void *args);
void udfdInitializeCPlugin(SUdfScriptPlugin *plugin) {
SUdf *udfdNewUdf(const char *udfName);
void udfdInitializeCPlugin(SUdfScriptPlugin *plugin) {
plugin->scriptType = TSDB_FUNC_SCRIPT_BIN_LIB;
plugin->openFunc = udfdCPluginOpen;
plugin->closeFunc = udfdCPluginClose;
......@@ -501,6 +503,7 @@ void convertUdf2UdfInfo(SUdf *udf, SScriptUdfInfo *udfInfo) {
udfInfo->funcType = UDF_FUNC_TYPE_SCALAR;
}
udfInfo->name = udf->name;
udfInfo->version = udf->version;
udfInfo->outputLen = udf->outputLen;
udfInfo->outputType = udf->outputType;
udfInfo->path = udf->path;
......@@ -510,9 +513,9 @@ void convertUdf2UdfInfo(SUdf *udf, SScriptUdfInfo *udfInfo) {
int32_t udfdRenameUdfFile(SUdf *udf) {
char newPath[PATH_MAX];
if (udf->scriptType == TSDB_FUNC_SCRIPT_BIN_LIB) {
snprintf(newPath, PATH_MAX, "%s/lib%s.so", tsTempDir, udf->name);
snprintf(newPath, PATH_MAX, "%s/lib%s_%d_%"PRId64".so", tsTempDir, udf->name, udf->version, udf->lastFetchTime);
} else if (udf->scriptType == TSDB_FUNC_SCRIPT_PYTHON) {
snprintf(newPath, PATH_MAX, "%s/%s.py", tsTempDir, udf->name);
snprintf(newPath, PATH_MAX, "%s/%s_%d_%"PRId64".py", tsTempDir, udf->name, udf->version, udf->lastFetchTime);
} else {
return TSDB_CODE_UDF_SCRIPT_NOT_SUPPORTED;
}
......@@ -557,40 +560,53 @@ int32_t udfdInitUdf(char *udfName, SUdf *udf) {
return err;
}
fnInfo("udf init succeeded. name %s type %d context %p", udf->name, udf->scriptType, (void*)udf->scriptUdfCtx);
fnInfo("udf init succeeded. name %s type %d context %p", udf->name, udf->scriptType, (void *)udf->scriptUdfCtx);
return 0;
}
SUdf *udfdNewUdf(const char *udfName) {
SUdf *udfNew = taosMemoryCalloc(1, sizeof(SUdf));
udfNew->refCount = 1;
strncpy(udfNew->name, udfName, TSDB_FUNC_NAME_LEN);
udfNew->state = UDF_STATE_INIT;
uv_mutex_init(&udfNew->lock);
uv_cond_init(&udfNew->condReady);
udfNew->resident = false;
udfNew->expired = false;
for (int32_t i = 0; i < taosArrayGetSize(global.residentFuncs); ++i) {
char *funcName = taosArrayGet(global.residentFuncs, i);
if (strcmp(udfName, funcName) == 0) {
udfNew->resident = true;
break;
}
}
return udfNew;
}
SUdf *udfdGetOrCreateUdf(const char *udfName) {
SUdf *udf = NULL;
uv_mutex_lock(&global.udfsMutex);
SUdf **udfInHash = taosHashGet(global.udfsHash, udfName, strlen(udfName));
if (udfInHash) {
++(*udfInHash)->refCount;
udf = *udfInHash;
uv_mutex_unlock(&global.udfsMutex);
} else {
SUdf *udfNew = taosMemoryCalloc(1, sizeof(SUdf));
udfNew->refCount = 1;
strncpy(udfNew->name, udfName, TSDB_FUNC_NAME_LEN);
udfNew->state = UDF_STATE_INIT;
uv_mutex_init(&udfNew->lock);
uv_cond_init(&udfNew->condReady);
udf = udfNew;
udf->resident = false;
for (int32_t i = 0; i < taosArrayGetSize(global.residentFuncs); ++i) {
char *funcName = taosArrayGet(global.residentFuncs, i);
if (strcmp(udfName, funcName) == 0) {
udf->resident = true;
break;
}
}
SUdf **pUdf = &udf;
taosHashPut(global.udfsHash, udfName, strlen(udfName), pUdf, POINTER_BYTES);
SUdf **pUdfHash = taosHashGet(global.udfsHash, udfName, strlen(udfName));
int64_t currTime = taosGetTimestampSec();
bool expired = currTime - (*pUdfHash)->lastFetchTime > 10 * 1000;
if (pUdfHash && !expired) {
++(*pUdfHash)->refCount;
SUdf *udf = *pUdfHash;
uv_mutex_unlock(&global.udfsMutex);
return udf;
}
if (pUdfHash && expired) {
(*pUdfHash)->expired = true;
taosHashRemove(global.udfsHash, udfName, strlen(udfName));
}
SUdf *udf = udfdNewUdf(udfName);
SUdf **pUdf = &udf;
taosHashPut(global.udfsHash, udfName, strlen(udfName), pUdf, POINTER_BYTES);
uv_mutex_unlock(&global.udfsMutex);
return udf;
}
......@@ -761,17 +777,19 @@ void udfdProcessTeardownRequest(SUvUdfWork *uvUdf, SUdfRequest *request) {
uv_mutex_lock(&global.udfsMutex);
udf->refCount--;
if (udf->refCount == 0 && !udf->resident) {
if ((udf->refCount == 0 && !udf->resident) ||
(udf->resident && udf->expired)) {
unloadUdf = true;
taosHashRemove(global.udfsHash, udf->name, strlen(udf->name));
}
uv_mutex_unlock(&global.udfsMutex);
if (unloadUdf) {
fnInfo("udf teardown. udf name: %s type %d: context %p", udf->name, udf->scriptType, (void*)(udf->scriptUdfCtx));
fnInfo("udf teardown. udf name: %s type %d: context %p", udf->name, udf->scriptType, (void *)(udf->scriptUdfCtx));
uv_cond_destroy(&udf->condReady);
uv_mutex_destroy(&udf->lock);
code = udf->scriptPlugin->udfDestroyFunc(udf->scriptUdfCtx);
fnDebug("udfd destroy function returns %d", code);
taosRemoveFile(udf->path);
taosMemoryFree(udf);
}
taosMemoryFree(handle);
......@@ -792,6 +810,35 @@ void udfdProcessTeardownRequest(SUvUdfWork *uvUdf, SUdfRequest *request) {
return;
}
int32_t udfdSaveFuncBodyToFile(SFuncInfo* pFuncInfo, SUdf* udf) {
if (!osTempSpaceAvailable()) {
terrno = TSDB_CODE_NO_AVAIL_DISK;
fnError("udfd create shared library failed since %s", terrstr(terrno));
return terrno;
}
char path[PATH_MAX] = {0};
#ifdef WINDOWS
snprintf(path, sizeof(path), "%s%s_%d_%" PRId64, tsTempDir, pFuncInfo->name, udf->version, udf->lastFetchTime);
#else
snprintf(path, sizeof(path), "%s/%s_%d_%" PRId64, tsTempDir, pFuncInfo->name, udf->version, udf->lastFetchTime);
#endif
TdFilePtr file = taosOpenFile(path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_READ | TD_FILE_TRUNC);
if (file == NULL) {
fnError("udfd write udf shared library: %s failed, error: %d %s", path, errno, strerror(errno));
return TSDB_CODE_FILE_CORRUPTED;
}
int64_t count = taosWriteFile(file, pFuncInfo->pCode, pFuncInfo->codeSize);
if (count != pFuncInfo->codeSize) {
fnError("udfd write udf shared library failed");
return TSDB_CODE_FILE_CORRUPTED;
}
taosCloseFile(&file);
strncpy(udf->path, path, PATH_MAX);
return TSDB_CODE_SUCCESS;
}
void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) {
SUdfdRpcSendRecvInfo *msgInfo = (SUdfdRpcSendRecvInfo *)pMsg->info.ahandle;
......@@ -830,49 +877,23 @@ void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) {
} else if (msgInfo->rpcType == UDFD_RPC_RETRIVE_FUNC) {
SRetrieveFuncRsp retrieveRsp = {0};
tDeserializeSRetrieveFuncRsp(pMsg->pCont, pMsg->contLen, &retrieveRsp);
if (retrieveRsp.pFuncInfos == NULL) {
goto _return;
}
SFuncInfo *pFuncInfo = (SFuncInfo *)taosArrayGet(retrieveRsp.pFuncInfos, 0);
SUdf *udf = msgInfo->param;
SUdf *udf = msgInfo->param;
udf->funcType = pFuncInfo->funcType;
udf->scriptType = pFuncInfo->scriptType;
udf->outputType = pFuncInfo->outputType;
udf->outputLen = pFuncInfo->outputLen;
udf->bufSize = pFuncInfo->bufSize;
udf->version = *(int32_t*)taosArrayGet(retrieveRsp.pFuncVersions,0);
udf->version = *(int32_t *)taosArrayGet(retrieveRsp.pFuncVersions, 0);
if (!osTempSpaceAvailable()) {
terrno = TSDB_CODE_NO_AVAIL_DISK;
msgInfo->code = terrno;
fnError("udfd create shared library failed since %s", terrstr(terrno));
goto _return;
msgInfo->code = udfdSaveFuncBodyToFile(pFuncInfo, udf);
if (msgInfo->code == 0) {
udf->lastFetchTime = taosGetTimestampMs();
}
char path[PATH_MAX] = {0};
#ifdef WINDOWS
snprintf(path, sizeof(path), "%s%s%d", tsTempDir, pFuncInfo->name, udf->version);
#else
snprintf(path, sizeof(path), "%s/%s%d", tsTempDir, pFuncInfo->name, udf->version);
#endif
TdFilePtr file = taosOpenFile(path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_READ | TD_FILE_TRUNC);
if (file == NULL) {
fnError("udfd write udf shared library: %s failed, error: %d %s", path, errno, strerror(errno));
msgInfo->code = TSDB_CODE_FILE_CORRUPTED;
goto _return;
}
int64_t count = taosWriteFile(file, pFuncInfo->pCode, pFuncInfo->codeSize);
if (count != pFuncInfo->codeSize) {
fnError("udfd write udf shared library failed");
msgInfo->code = TSDB_CODE_FILE_CORRUPTED;
goto _return;
}
taosCloseFile(&file);
strncpy(udf->path, path, PATH_MAX);
tFreeSFuncInfo(pFuncInfo);
taosArrayDestroy(retrieveRsp.pFuncInfos);
msgInfo->code = 0;
taosArrayDestroy(retrieveRsp.pFuncVersions);
}
_return:
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册