diff --git a/source/libs/function/src/tudf.c b/source/libs/function/src/tudf.c index f13235f24bac6405b82ee7d4e4a747895055a5a8..8c8b99a6f804b2b7200e0641ca3d5bdb2660292c 100644 --- a/source/libs/function/src/tudf.c +++ b/source/libs/function/src/tudf.c @@ -968,6 +968,8 @@ bool udfAggInit(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo *pRes int32_t udfAggProcess(struct SqlFunctionCtx *pCtx); int32_t udfAggFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock *pBlock); +void cleanupNotExpiredUdfs(); +void cleanupExpiredUdfs(); int compareUdfcFuncSub(const void *elem1, const void *elem2) { SUdfcFuncStub *stub1 = (SUdfcFuncStub *)elem1; SUdfcFuncStub *stub2 = (SUdfcFuncStub *)elem2; @@ -1040,62 +1042,72 @@ void releaseUdfFuncHandle(char *udfName, UdfcFuncHandle handle) { uv_mutex_unlock(&gUdfcProxy.udfStubsMutex); } -int32_t cleanUpUdfs() { - int8_t initialized = atomic_load_8(&gUdfcProxy.initialized); - if (!initialized) { - return TSDB_CODE_SUCCESS; - } - - uv_mutex_lock(&gUdfcProxy.udfStubsMutex); - if ((gUdfcProxy.udfStubs == NULL || taosArrayGetSize(gUdfcProxy.udfStubs) == 0) && - (gUdfcProxy.expiredUdfStubs == NULL || taosArrayGetSize(gUdfcProxy.expiredUdfStubs) == 0)) { - uv_mutex_unlock(&gUdfcProxy.udfStubsMutex); - return TSDB_CODE_SUCCESS; - } - SArray *udfStubs = taosArrayInit(16, sizeof(SUdfcFuncStub)); +void cleanupExpiredUdfs() { int32_t i = 0; - while (i < taosArrayGetSize(gUdfcProxy.udfStubs)) { - SUdfcFuncStub *stub = taosArrayGet(gUdfcProxy.udfStubs, i); + SArray *expiredUdfStubs = taosArrayInit(16, sizeof(SUdfcFuncStub)); + while (i < taosArrayGetSize(gUdfcProxy.expiredUdfStubs)) { + SUdfcFuncStub *stub = taosArrayGet(gUdfcProxy.expiredUdfStubs, i); if (stub->refCount == 0) { - fnInfo("tear down udf. udf name: %s, handle: %p, ref count: %d", stub->udfName, stub->handle, stub->refCount); + fnInfo("tear down udf. expired. udf name: %s, handle: %p, ref count: %d", stub->udfName, stub->handle, stub->refCount); doTeardownUdf(stub->handle); } else { - fnInfo("udf still in use. udf name: %s, ref count: %d, create time: %" PRId64 ", handle: %p", stub->udfName, + fnInfo("udf still in use. expired. udf name: %s, ref count: %d, create time: %" PRId64 ", handle: %p", stub->udfName, stub->refCount, stub->createTime, stub->handle); UdfcFuncHandle handle = stub->handle; if (handle != NULL && ((SUdfcUvSession *)handle)->udfUvPipe != NULL) { - taosArrayPush(udfStubs, stub); + taosArrayPush(expiredUdfStubs, stub); } else { - fnInfo("udf invalid handle for %s, refCount: %d, create time: %" PRId64 ". remove it from cache", + fnInfo("udf invalid handle for %s, expired. refCount: %d, create time: %" PRId64 ". remove it from cache", stub->udfName, stub->refCount, stub->createTime); } } ++i; } - taosArrayDestroy(gUdfcProxy.udfStubs); - gUdfcProxy.udfStubs = udfStubs; + taosArrayDestroy(gUdfcProxy.expiredUdfStubs); + gUdfcProxy.expiredUdfStubs = expiredUdfStubs; +} - SArray *expiredUdfStubs = taosArrayInit(16, sizeof(SUdfcFuncStub)); - while (i < taosArrayGetSize(gUdfcProxy.expiredUdfStubs)) { - SUdfcFuncStub *stub = taosArrayGet(gUdfcProxy.expiredUdfStubs, i); +void cleanupNotExpiredUdfs() { + SArray *udfStubs = taosArrayInit(16, sizeof(SUdfcFuncStub)); + int32_t i = 0; + while (i < taosArrayGetSize(gUdfcProxy.udfStubs)) { + SUdfcFuncStub *stub = taosArrayGet(gUdfcProxy.udfStubs, i); if (stub->refCount == 0) { - fnInfo("tear down udf. expired. udf name: %s, handle: %p, ref count: %d", stub->udfName, stub->handle, stub->refCount); + fnInfo("tear down udf. udf name: %s, handle: %p, ref count: %d", stub->udfName, stub->handle, stub->refCount); doTeardownUdf(stub->handle); } else { - fnInfo("udf still in use. expired. udf name: %s, ref count: %d, create time: %" PRId64 ", handle: %p", stub->udfName, + fnInfo("udf still in use. udf name: %s, ref count: %d, create time: %" PRId64 ", handle: %p", stub->udfName, stub->refCount, stub->createTime, stub->handle); UdfcFuncHandle handle = stub->handle; if (handle != NULL && ((SUdfcUvSession *)handle)->udfUvPipe != NULL) { - taosArrayPush(expiredUdfStubs, stub); + taosArrayPush(udfStubs, stub); } else { - fnInfo("udf invalid handle for %s, expired. refCount: %d, create time: %" PRId64 ". remove it from cache", + fnInfo("udf invalid handle for %s, refCount: %d, create time: %" PRId64 ". remove it from cache", stub->udfName, stub->refCount, stub->createTime); } } ++i; } - taosArrayDestroy(gUdfcProxy.expiredUdfStubs); - gUdfcProxy.expiredUdfStubs = expiredUdfStubs; + taosArrayDestroy(gUdfcProxy.udfStubs); + gUdfcProxy.udfStubs = udfStubs; +} + +int32_t cleanUpUdfs() { + int8_t initialized = atomic_load_8(&gUdfcProxy.initialized); + if (!initialized) { + return TSDB_CODE_SUCCESS; + } + + uv_mutex_lock(&gUdfcProxy.udfStubsMutex); + if ((gUdfcProxy.udfStubs == NULL || taosArrayGetSize(gUdfcProxy.udfStubs) == 0) && + (gUdfcProxy.expiredUdfStubs == NULL || taosArrayGetSize(gUdfcProxy.expiredUdfStubs) == 0)) { + uv_mutex_unlock(&gUdfcProxy.udfStubsMutex); + return TSDB_CODE_SUCCESS; + } + + cleanupNotExpiredUdfs(); + cleanupExpiredUdfs(); + uv_mutex_unlock(&gUdfcProxy.udfStubsMutex); return 0; }