提交 99587db7 编写于 作者: S slzhou

fix: udf handle expired after 10s

上级 fe718f60
......@@ -960,7 +960,7 @@ int32_t udfcOpen();
int32_t udfcClose();
int32_t acquireUdfFuncHandle(char *udfName, UdfcFuncHandle *pHandle);
void releaseUdfFuncHandle(char *udfName);
void releaseUdfFuncHandle(char *udfName, UdfcFuncHandle handle);
int32_t cleanUpUdfs();
bool udfAggGetEnv(struct SFunctionNode *pFunc, SFuncExecEnv *pEnv);
......@@ -992,11 +992,12 @@ int32_t acquireUdfFuncHandle(char *udfName, UdfcFuncHandle *pHandle) {
uv_mutex_unlock(&gUdfcProxy.udfStubsMutex);
return 0;
} else {
fnInfo("invalid handle for %s, refCount: %d, create time: %" PRId64 ". remove it from cache", udfName,
fnInfo("udf invalid handle for %s, refCount: %d, create time: %" PRId64 ". remove it from cache", udfName,
foundStub->refCount, foundStub->createTime);
taosArrayRemove(gUdfcProxy.udfStubs, stubIndex);
}
} else {
fnInfo("udf handle expired for %s, will setup udf. move it to expired list", udfName);
taosArrayRemove(gUdfcProxy.udfStubs, stubIndex);
taosArrayPush(gUdfcProxy.expiredUdfStubs, foundStub);
taosArraySort(gUdfcProxy.expiredUdfStubs, compareUdfcFuncSub);
......@@ -1020,7 +1021,7 @@ int32_t acquireUdfFuncHandle(char *udfName, UdfcFuncHandle *pHandle) {
return code;
}
void releaseUdfFuncHandle(char *udfName) {
void releaseUdfFuncHandle(char *udfName, UdfcFuncHandle handle) {
uv_mutex_lock(&gUdfcProxy.udfStubsMutex);
SUdfcFuncStub key = {0};
strncpy(key.udfName, udfName, TSDB_FUNC_NAME_LEN);
......@@ -1030,10 +1031,10 @@ void releaseUdfFuncHandle(char *udfName) {
uv_mutex_unlock(&gUdfcProxy.udfStubsMutex);
return;
}
if (foundStub != NULL && foundStub->refCount > 0) {
if (foundStub != NULL && foundStub->handle == handle && foundStub->refCount > 0) {
--foundStub->refCount;
}
if (expiredStub != NULL && expiredStub->refCount > 0) {
if (expiredStub != NULL && expiredStub->handle == handle && expiredStub->refCount > 0) {
--expiredStub->refCount;
}
uv_mutex_unlock(&gUdfcProxy.udfStubsMutex);
......@@ -1046,7 +1047,8 @@ int32_t cleanUpUdfs() {
}
uv_mutex_lock(&gUdfcProxy.udfStubsMutex);
if (gUdfcProxy.udfStubs == NULL || taosArrayGetSize(gUdfcProxy.udfStubs) == 0) {
if ((gUdfcProxy.udfStubs == NULL || taosArrayGetSize(gUdfcProxy.udfStubs) == 0) &&
(gUdfcProxy.expiredUdfStubs == NULL || taosArrayGetSize(gUdfcProxy.expiredUdfStubs) == 0)) {
uv_mutex_unlock(&gUdfcProxy.udfStubsMutex);
return TSDB_CODE_SUCCESS;
}
......@@ -1092,7 +1094,7 @@ int32_t cleanUpUdfs() {
}
++i;
}
taosArrayDestroy(gUdfcProxy.udfStubs);
taosArrayDestroy(gUdfcProxy.expiredUdfStubs);
gUdfcProxy.expiredUdfStubs = expiredUdfStubs;
uv_mutex_unlock(&gUdfcProxy.udfStubsMutex);
return 0;
......@@ -1109,7 +1111,7 @@ int32_t callUdfScalarFunc(char *udfName, SScalarParam *input, int32_t numOfCols,
code = doCallUdfScalarFunc(handle, input, numOfCols, output);
if (code != TSDB_CODE_SUCCESS) {
fnError("udfc scalar function execution failure");
releaseUdfFuncHandle(udfName);
releaseUdfFuncHandle(udfName, handle);
return code;
}
......@@ -1123,7 +1125,7 @@ int32_t callUdfScalarFunc(char *udfName, SScalarParam *input, int32_t numOfCols,
code = TSDB_CODE_UDF_INVALID_OUTPUT_TYPE;
}
}
releaseUdfFuncHandle(udfName);
releaseUdfFuncHandle(udfName, handle);
return code;
}
......@@ -1156,7 +1158,7 @@ bool udfAggInit(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo *pResult
SUdfInterBuf buf = {0};
if ((udfCode = doCallUdfAggInit(handle, &buf)) != 0) {
fnError("udfAggInit error. step doCallUdfAggInit. udf code: %d", udfCode);
releaseUdfFuncHandle(pCtx->udfName);
releaseUdfFuncHandle(pCtx->udfName, handle);
return false;
}
if (buf.bufLen <= session->bufSize) {
......@@ -1165,10 +1167,10 @@ bool udfAggInit(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo *pResult
udfRes->interResNum = buf.numOfResult;
} else {
fnError("udfc inter buf size %d is greater than function bufSize %d", buf.bufLen, session->bufSize);
releaseUdfFuncHandle(pCtx->udfName);
releaseUdfFuncHandle(pCtx->udfName, handle);
return false;
}
releaseUdfFuncHandle(pCtx->udfName);
releaseUdfFuncHandle(pCtx->udfName, handle);
freeUdfInterBuf(&buf);
return true;
}
......@@ -1225,7 +1227,7 @@ int32_t udfAggProcess(struct SqlFunctionCtx *pCtx) {
taosArrayDestroy(pTempBlock->pDataBlock);
taosMemoryFree(pTempBlock);
releaseUdfFuncHandle(pCtx->udfName);
releaseUdfFuncHandle(pCtx->udfName, handle);
freeUdfInterBuf(&newState);
return udfCode;
}
......@@ -1270,7 +1272,7 @@ int32_t udfAggFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock *pBlock) {
freeUdfInterBuf(&resultBuf);
int32_t numOfResults = functionFinalizeWithResultBuf(pCtx, pBlock, udfRes->finalResBuf);
releaseUdfFuncHandle(pCtx->udfName);
releaseUdfFuncHandle(pCtx->udfName, handle);
return udfCallCode == 0 ? numOfResults : udfCallCode;
}
......
......@@ -591,7 +591,7 @@ SUdf *udfdNewUdf(const char *udfName) {
SUdf *udfdGetOrCreateUdf(const char *udfName) {
uv_mutex_lock(&global.udfsMutex);
SUdf **pUdfHash = taosHashGet(global.udfsHash, udfName, strlen(udfName));
int64_t currTime = taosGetTimestampSec();
int64_t currTime = taosGetTimestampMs();
bool expired = false;
if (pUdfHash) {
expired = currTime - (*pUdfHash)->lastFetchTime > 10 * 1000; // 10s
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册