提交 1a0c9f31 编写于 作者: S shenglian zhou

enhance: refactor cleanup udf function

上级 430457e5
...@@ -968,6 +968,8 @@ bool udfAggInit(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo *pRes ...@@ -968,6 +968,8 @@ bool udfAggInit(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo *pRes
int32_t udfAggProcess(struct SqlFunctionCtx *pCtx); int32_t udfAggProcess(struct SqlFunctionCtx *pCtx);
int32_t udfAggFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock *pBlock); int32_t udfAggFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock *pBlock);
void cleanupNotExpiredUdfs();
void cleanupExpiredUdfs();
int compareUdfcFuncSub(const void *elem1, const void *elem2) { int compareUdfcFuncSub(const void *elem1, const void *elem2) {
SUdfcFuncStub *stub1 = (SUdfcFuncStub *)elem1; SUdfcFuncStub *stub1 = (SUdfcFuncStub *)elem1;
SUdfcFuncStub *stub2 = (SUdfcFuncStub *)elem2; SUdfcFuncStub *stub2 = (SUdfcFuncStub *)elem2;
...@@ -1040,62 +1042,72 @@ void releaseUdfFuncHandle(char *udfName, UdfcFuncHandle handle) { ...@@ -1040,62 +1042,72 @@ void releaseUdfFuncHandle(char *udfName, UdfcFuncHandle handle) {
uv_mutex_unlock(&gUdfcProxy.udfStubsMutex); uv_mutex_unlock(&gUdfcProxy.udfStubsMutex);
} }
int32_t cleanUpUdfs() { void cleanupExpiredUdfs() {
int8_t initialized = atomic_load_8(&gUdfcProxy.initialized);
if (!initialized) {
return TSDB_CODE_SUCCESS;
}
uv_mutex_lock(&gUdfcProxy.udfStubsMutex);
if ((gUdfcProxy.udfStubs == NULL || taosArrayGetSize(gUdfcProxy.udfStubs) == 0) &&
(gUdfcProxy.expiredUdfStubs == NULL || taosArrayGetSize(gUdfcProxy.expiredUdfStubs) == 0)) {
uv_mutex_unlock(&gUdfcProxy.udfStubsMutex);
return TSDB_CODE_SUCCESS;
}
SArray *udfStubs = taosArrayInit(16, sizeof(SUdfcFuncStub));
int32_t i = 0; int32_t i = 0;
while (i < taosArrayGetSize(gUdfcProxy.udfStubs)) { SArray *expiredUdfStubs = taosArrayInit(16, sizeof(SUdfcFuncStub));
SUdfcFuncStub *stub = taosArrayGet(gUdfcProxy.udfStubs, i); while (i < taosArrayGetSize(gUdfcProxy.expiredUdfStubs)) {
SUdfcFuncStub *stub = taosArrayGet(gUdfcProxy.expiredUdfStubs, i);
if (stub->refCount == 0) { if (stub->refCount == 0) {
fnInfo("tear down udf. udf name: %s, handle: %p, ref count: %d", stub->udfName, stub->handle, stub->refCount); fnInfo("tear down udf. expired. udf name: %s, handle: %p, ref count: %d", stub->udfName, stub->handle, stub->refCount);
doTeardownUdf(stub->handle); doTeardownUdf(stub->handle);
} else { } else {
fnInfo("udf still in use. udf name: %s, ref count: %d, create time: %" PRId64 ", handle: %p", stub->udfName, fnInfo("udf still in use. expired. udf name: %s, ref count: %d, create time: %" PRId64 ", handle: %p", stub->udfName,
stub->refCount, stub->createTime, stub->handle); stub->refCount, stub->createTime, stub->handle);
UdfcFuncHandle handle = stub->handle; UdfcFuncHandle handle = stub->handle;
if (handle != NULL && ((SUdfcUvSession *)handle)->udfUvPipe != NULL) { if (handle != NULL && ((SUdfcUvSession *)handle)->udfUvPipe != NULL) {
taosArrayPush(udfStubs, stub); taosArrayPush(expiredUdfStubs, stub);
} else { } else {
fnInfo("udf invalid handle for %s, refCount: %d, create time: %" PRId64 ". remove it from cache", fnInfo("udf invalid handle for %s, expired. refCount: %d, create time: %" PRId64 ". remove it from cache",
stub->udfName, stub->refCount, stub->createTime); stub->udfName, stub->refCount, stub->createTime);
} }
} }
++i; ++i;
} }
taosArrayDestroy(gUdfcProxy.udfStubs); taosArrayDestroy(gUdfcProxy.expiredUdfStubs);
gUdfcProxy.udfStubs = udfStubs; gUdfcProxy.expiredUdfStubs = expiredUdfStubs;
}
SArray *expiredUdfStubs = taosArrayInit(16, sizeof(SUdfcFuncStub)); void cleanupNotExpiredUdfs() {
while (i < taosArrayGetSize(gUdfcProxy.expiredUdfStubs)) { SArray *udfStubs = taosArrayInit(16, sizeof(SUdfcFuncStub));
SUdfcFuncStub *stub = taosArrayGet(gUdfcProxy.expiredUdfStubs, i); int32_t i = 0;
while (i < taosArrayGetSize(gUdfcProxy.udfStubs)) {
SUdfcFuncStub *stub = taosArrayGet(gUdfcProxy.udfStubs, i);
if (stub->refCount == 0) { if (stub->refCount == 0) {
fnInfo("tear down udf. expired. udf name: %s, handle: %p, ref count: %d", stub->udfName, stub->handle, stub->refCount); fnInfo("tear down udf. udf name: %s, handle: %p, ref count: %d", stub->udfName, stub->handle, stub->refCount);
doTeardownUdf(stub->handle); doTeardownUdf(stub->handle);
} else { } else {
fnInfo("udf still in use. expired. udf name: %s, ref count: %d, create time: %" PRId64 ", handle: %p", stub->udfName, fnInfo("udf still in use. udf name: %s, ref count: %d, create time: %" PRId64 ", handle: %p", stub->udfName,
stub->refCount, stub->createTime, stub->handle); stub->refCount, stub->createTime, stub->handle);
UdfcFuncHandle handle = stub->handle; UdfcFuncHandle handle = stub->handle;
if (handle != NULL && ((SUdfcUvSession *)handle)->udfUvPipe != NULL) { if (handle != NULL && ((SUdfcUvSession *)handle)->udfUvPipe != NULL) {
taosArrayPush(expiredUdfStubs, stub); taosArrayPush(udfStubs, stub);
} else { } else {
fnInfo("udf invalid handle for %s, expired. refCount: %d, create time: %" PRId64 ". remove it from cache", fnInfo("udf invalid handle for %s, refCount: %d, create time: %" PRId64 ". remove it from cache",
stub->udfName, stub->refCount, stub->createTime); stub->udfName, stub->refCount, stub->createTime);
} }
} }
++i; ++i;
} }
taosArrayDestroy(gUdfcProxy.expiredUdfStubs); taosArrayDestroy(gUdfcProxy.udfStubs);
gUdfcProxy.expiredUdfStubs = expiredUdfStubs; gUdfcProxy.udfStubs = udfStubs;
}
int32_t cleanUpUdfs() {
int8_t initialized = atomic_load_8(&gUdfcProxy.initialized);
if (!initialized) {
return TSDB_CODE_SUCCESS;
}
uv_mutex_lock(&gUdfcProxy.udfStubsMutex);
if ((gUdfcProxy.udfStubs == NULL || taosArrayGetSize(gUdfcProxy.udfStubs) == 0) &&
(gUdfcProxy.expiredUdfStubs == NULL || taosArrayGetSize(gUdfcProxy.expiredUdfStubs) == 0)) {
uv_mutex_unlock(&gUdfcProxy.udfStubsMutex);
return TSDB_CODE_SUCCESS;
}
cleanupNotExpiredUdfs();
cleanupExpiredUdfs();
uv_mutex_unlock(&gUdfcProxy.udfStubsMutex); uv_mutex_unlock(&gUdfcProxy.udfStubsMutex);
return 0; return 0;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册