diff --git a/source/libs/function/src/tudf.c b/source/libs/function/src/tudf.c index 53b2cf4aca1fc7029d530362999ab352fc8fe72e..f13235f24bac6405b82ee7d4e4a747895055a5a8 100644 --- a/source/libs/function/src/tudf.c +++ b/source/libs/function/src/tudf.c @@ -960,7 +960,7 @@ int32_t udfcOpen(); int32_t udfcClose(); int32_t acquireUdfFuncHandle(char *udfName, UdfcFuncHandle *pHandle); -void releaseUdfFuncHandle(char *udfName); +void releaseUdfFuncHandle(char *udfName, UdfcFuncHandle handle); int32_t cleanUpUdfs(); bool udfAggGetEnv(struct SFunctionNode *pFunc, SFuncExecEnv *pEnv); @@ -992,11 +992,12 @@ int32_t acquireUdfFuncHandle(char *udfName, UdfcFuncHandle *pHandle) { uv_mutex_unlock(&gUdfcProxy.udfStubsMutex); return 0; } else { - fnInfo("invalid handle for %s, refCount: %d, create time: %" PRId64 ". remove it from cache", udfName, + fnInfo("udf invalid handle for %s, refCount: %d, create time: %" PRId64 ". remove it from cache", udfName, foundStub->refCount, foundStub->createTime); taosArrayRemove(gUdfcProxy.udfStubs, stubIndex); } } else { + fnInfo("udf handle expired for %s, will setup udf. move it to expired list", udfName); taosArrayRemove(gUdfcProxy.udfStubs, stubIndex); taosArrayPush(gUdfcProxy.expiredUdfStubs, foundStub); taosArraySort(gUdfcProxy.expiredUdfStubs, compareUdfcFuncSub); @@ -1020,7 +1021,7 @@ int32_t acquireUdfFuncHandle(char *udfName, UdfcFuncHandle *pHandle) { return code; } -void releaseUdfFuncHandle(char *udfName) { +void releaseUdfFuncHandle(char *udfName, UdfcFuncHandle handle) { uv_mutex_lock(&gUdfcProxy.udfStubsMutex); SUdfcFuncStub key = {0}; strncpy(key.udfName, udfName, TSDB_FUNC_NAME_LEN); @@ -1030,10 +1031,10 @@ void releaseUdfFuncHandle(char *udfName) { uv_mutex_unlock(&gUdfcProxy.udfStubsMutex); return; } - if (foundStub != NULL && foundStub->refCount > 0) { + if (foundStub != NULL && foundStub->handle == handle && foundStub->refCount > 0) { --foundStub->refCount; } - if (expiredStub != NULL && expiredStub->refCount > 0) { + if (expiredStub != NULL && expiredStub->handle == handle && expiredStub->refCount > 0) { --expiredStub->refCount; } uv_mutex_unlock(&gUdfcProxy.udfStubsMutex); @@ -1046,7 +1047,8 @@ int32_t cleanUpUdfs() { } uv_mutex_lock(&gUdfcProxy.udfStubsMutex); - if (gUdfcProxy.udfStubs == NULL || taosArrayGetSize(gUdfcProxy.udfStubs) == 0) { + if ((gUdfcProxy.udfStubs == NULL || taosArrayGetSize(gUdfcProxy.udfStubs) == 0) && + (gUdfcProxy.expiredUdfStubs == NULL || taosArrayGetSize(gUdfcProxy.expiredUdfStubs) == 0)) { uv_mutex_unlock(&gUdfcProxy.udfStubsMutex); return TSDB_CODE_SUCCESS; } @@ -1092,7 +1094,7 @@ int32_t cleanUpUdfs() { } ++i; } - taosArrayDestroy(gUdfcProxy.udfStubs); + taosArrayDestroy(gUdfcProxy.expiredUdfStubs); gUdfcProxy.expiredUdfStubs = expiredUdfStubs; uv_mutex_unlock(&gUdfcProxy.udfStubsMutex); return 0; @@ -1109,7 +1111,7 @@ int32_t callUdfScalarFunc(char *udfName, SScalarParam *input, int32_t numOfCols, code = doCallUdfScalarFunc(handle, input, numOfCols, output); if (code != TSDB_CODE_SUCCESS) { fnError("udfc scalar function execution failure"); - releaseUdfFuncHandle(udfName); + releaseUdfFuncHandle(udfName, handle); return code; } @@ -1123,7 +1125,7 @@ int32_t callUdfScalarFunc(char *udfName, SScalarParam *input, int32_t numOfCols, code = TSDB_CODE_UDF_INVALID_OUTPUT_TYPE; } } - releaseUdfFuncHandle(udfName); + releaseUdfFuncHandle(udfName, handle); return code; } @@ -1156,7 +1158,7 @@ bool udfAggInit(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo *pResult SUdfInterBuf buf = {0}; if ((udfCode = doCallUdfAggInit(handle, &buf)) != 0) { fnError("udfAggInit error. step doCallUdfAggInit. udf code: %d", udfCode); - releaseUdfFuncHandle(pCtx->udfName); + releaseUdfFuncHandle(pCtx->udfName, handle); return false; } if (buf.bufLen <= session->bufSize) { @@ -1165,10 +1167,10 @@ bool udfAggInit(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo *pResult udfRes->interResNum = buf.numOfResult; } else { fnError("udfc inter buf size %d is greater than function bufSize %d", buf.bufLen, session->bufSize); - releaseUdfFuncHandle(pCtx->udfName); + releaseUdfFuncHandle(pCtx->udfName, handle); return false; } - releaseUdfFuncHandle(pCtx->udfName); + releaseUdfFuncHandle(pCtx->udfName, handle); freeUdfInterBuf(&buf); return true; } @@ -1225,7 +1227,7 @@ int32_t udfAggProcess(struct SqlFunctionCtx *pCtx) { taosArrayDestroy(pTempBlock->pDataBlock); taosMemoryFree(pTempBlock); - releaseUdfFuncHandle(pCtx->udfName); + releaseUdfFuncHandle(pCtx->udfName, handle); freeUdfInterBuf(&newState); return udfCode; } @@ -1270,7 +1272,7 @@ int32_t udfAggFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock *pBlock) { freeUdfInterBuf(&resultBuf); int32_t numOfResults = functionFinalizeWithResultBuf(pCtx, pBlock, udfRes->finalResBuf); - releaseUdfFuncHandle(pCtx->udfName); + releaseUdfFuncHandle(pCtx->udfName, handle); return udfCallCode == 0 ? numOfResults : udfCallCode; } diff --git a/source/libs/function/src/udfd.c b/source/libs/function/src/udfd.c index aa72309c62f7008c1191b2640444af5f51f0cd7c..61c6f92e2f8daff3735168e3ed92c3fd600a28c5 100644 --- a/source/libs/function/src/udfd.c +++ b/source/libs/function/src/udfd.c @@ -591,7 +591,7 @@ SUdf *udfdNewUdf(const char *udfName) { SUdf *udfdGetOrCreateUdf(const char *udfName) { uv_mutex_lock(&global.udfsMutex); SUdf **pUdfHash = taosHashGet(global.udfsHash, udfName, strlen(udfName)); - int64_t currTime = taosGetTimestampSec(); + int64_t currTime = taosGetTimestampMs(); bool expired = false; if (pUdfHash) { expired = currTime - (*pUdfHash)->lastFetchTime > 10 * 1000; // 10s