未验证 提交 28a80be7 编写于 作者: D dapan1121 提交者: GitHub

Merge pull request #20928 from taosdata/szhou/replace-function-2

enhance: udf handle expired after 10s
...@@ -343,7 +343,7 @@ typedef struct SUdfcFuncStub { ...@@ -343,7 +343,7 @@ typedef struct SUdfcFuncStub {
char udfName[TSDB_FUNC_NAME_LEN + 1]; char udfName[TSDB_FUNC_NAME_LEN + 1];
UdfcFuncHandle handle; UdfcFuncHandle handle;
int32_t refCount; int32_t refCount;
int64_t lastRefTime; int64_t createTime;
} SUdfcFuncStub; } SUdfcFuncStub;
typedef struct SUdfcProxy { typedef struct SUdfcProxy {
...@@ -363,6 +363,7 @@ typedef struct SUdfcProxy { ...@@ -363,6 +363,7 @@ typedef struct SUdfcProxy {
uv_mutex_t udfStubsMutex; uv_mutex_t udfStubsMutex;
SArray *udfStubs; // SUdfcFuncStub SArray *udfStubs; // SUdfcFuncStub
SArray *expiredUdfStubs; //SUdfcFuncStub
uv_mutex_t udfcUvMutex; uv_mutex_t udfcUvMutex;
int8_t initialized; int8_t initialized;
...@@ -959,7 +960,7 @@ int32_t udfcOpen(); ...@@ -959,7 +960,7 @@ int32_t udfcOpen();
int32_t udfcClose(); int32_t udfcClose();
int32_t acquireUdfFuncHandle(char *udfName, UdfcFuncHandle *pHandle); int32_t acquireUdfFuncHandle(char *udfName, UdfcFuncHandle *pHandle);
void releaseUdfFuncHandle(char *udfName); void releaseUdfFuncHandle(char *udfName, UdfcFuncHandle handle);
int32_t cleanUpUdfs(); int32_t cleanUpUdfs();
bool udfAggGetEnv(struct SFunctionNode *pFunc, SFuncExecEnv *pEnv); bool udfAggGetEnv(struct SFunctionNode *pFunc, SFuncExecEnv *pEnv);
...@@ -967,6 +968,8 @@ bool udfAggInit(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo *pRes ...@@ -967,6 +968,8 @@ bool udfAggInit(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo *pRes
int32_t udfAggProcess(struct SqlFunctionCtx *pCtx); int32_t udfAggProcess(struct SqlFunctionCtx *pCtx);
int32_t udfAggFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock *pBlock); int32_t udfAggFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock *pBlock);
void cleanupNotExpiredUdfs();
void cleanupExpiredUdfs();
int compareUdfcFuncSub(const void *elem1, const void *elem2) { int compareUdfcFuncSub(const void *elem1, const void *elem2) {
SUdfcFuncStub *stub1 = (SUdfcFuncStub *)elem1; SUdfcFuncStub *stub1 = (SUdfcFuncStub *)elem1;
SUdfcFuncStub *stub2 = (SUdfcFuncStub *)elem2; SUdfcFuncStub *stub2 = (SUdfcFuncStub *)elem2;
...@@ -982,16 +985,24 @@ int32_t acquireUdfFuncHandle(char *udfName, UdfcFuncHandle *pHandle) { ...@@ -982,16 +985,24 @@ int32_t acquireUdfFuncHandle(char *udfName, UdfcFuncHandle *pHandle) {
if (stubIndex != -1) { if (stubIndex != -1) {
SUdfcFuncStub *foundStub = taosArrayGet(gUdfcProxy.udfStubs, stubIndex); SUdfcFuncStub *foundStub = taosArrayGet(gUdfcProxy.udfStubs, stubIndex);
UdfcFuncHandle handle = foundStub->handle; UdfcFuncHandle handle = foundStub->handle;
if (handle != NULL && ((SUdfcUvSession *)handle)->udfUvPipe != NULL) { int64_t currUs = taosGetTimestampUs();
*pHandle = foundStub->handle; bool expired = (currUs - foundStub->createTime) >= 10 * 1000 * 1000;
++foundStub->refCount; if (!expired) {
foundStub->lastRefTime = taosGetTimestampUs(); if (handle != NULL && ((SUdfcUvSession *)handle)->udfUvPipe != NULL) {
uv_mutex_unlock(&gUdfcProxy.udfStubsMutex); *pHandle = foundStub->handle;
return 0; ++foundStub->refCount;
uv_mutex_unlock(&gUdfcProxy.udfStubsMutex);
return 0;
} else {
fnInfo("udf invalid handle for %s, refCount: %d, create time: %" PRId64 ". remove it from cache", udfName,
foundStub->refCount, foundStub->createTime);
taosArrayRemove(gUdfcProxy.udfStubs, stubIndex);
}
} else { } else {
fnInfo("invalid handle for %s, refCount: %d, last ref time: %" PRId64 ". remove it from cache", udfName, fnInfo("udf handle expired for %s, will setup udf. move it to expired list", udfName);
foundStub->refCount, foundStub->lastRefTime);
taosArrayRemove(gUdfcProxy.udfStubs, stubIndex); taosArrayRemove(gUdfcProxy.udfStubs, stubIndex);
taosArrayPush(gUdfcProxy.expiredUdfStubs, foundStub);
taosArraySort(gUdfcProxy.expiredUdfStubs, compareUdfcFuncSub);
} }
} }
*pHandle = NULL; *pHandle = NULL;
...@@ -1001,7 +1012,7 @@ int32_t acquireUdfFuncHandle(char *udfName, UdfcFuncHandle *pHandle) { ...@@ -1001,7 +1012,7 @@ int32_t acquireUdfFuncHandle(char *udfName, UdfcFuncHandle *pHandle) {
strncpy(stub.udfName, udfName, TSDB_FUNC_NAME_LEN); strncpy(stub.udfName, udfName, TSDB_FUNC_NAME_LEN);
stub.handle = *pHandle; stub.handle = *pHandle;
++stub.refCount; ++stub.refCount;
stub.lastRefTime = taosGetTimestampUs(); stub.createTime = taosGetTimestampUs();
taosArrayPush(gUdfcProxy.udfStubs, &stub); taosArrayPush(gUdfcProxy.udfStubs, &stub);
taosArraySort(gUdfcProxy.udfStubs, compareUdfcFuncSub); taosArraySort(gUdfcProxy.udfStubs, compareUdfcFuncSub);
} else { } else {
...@@ -1012,32 +1023,51 @@ int32_t acquireUdfFuncHandle(char *udfName, UdfcFuncHandle *pHandle) { ...@@ -1012,32 +1023,51 @@ int32_t acquireUdfFuncHandle(char *udfName, UdfcFuncHandle *pHandle) {
return code; return code;
} }
void releaseUdfFuncHandle(char *udfName) { void releaseUdfFuncHandle(char *udfName, UdfcFuncHandle handle) {
uv_mutex_lock(&gUdfcProxy.udfStubsMutex); uv_mutex_lock(&gUdfcProxy.udfStubsMutex);
SUdfcFuncStub key = {0}; SUdfcFuncStub key = {0};
strncpy(key.udfName, udfName, TSDB_FUNC_NAME_LEN); strncpy(key.udfName, udfName, TSDB_FUNC_NAME_LEN);
SUdfcFuncStub *foundStub = taosArraySearch(gUdfcProxy.udfStubs, &key, compareUdfcFuncSub, TD_EQ); SUdfcFuncStub *foundStub = taosArraySearch(gUdfcProxy.udfStubs, &key, compareUdfcFuncSub, TD_EQ);
if (!foundStub) { SUdfcFuncStub *expiredStub = taosArraySearch(gUdfcProxy.expiredUdfStubs, &key, compareUdfcFuncSub, TD_EQ);
if (!foundStub && !expiredStub) {
uv_mutex_unlock(&gUdfcProxy.udfStubsMutex); uv_mutex_unlock(&gUdfcProxy.udfStubsMutex);
return; return;
} }
if (foundStub->refCount > 0) { if (foundStub != NULL && foundStub->handle == handle && foundStub->refCount > 0) {
--foundStub->refCount; --foundStub->refCount;
} }
if (expiredStub != NULL && expiredStub->handle == handle && expiredStub->refCount > 0) {
--expiredStub->refCount;
}
uv_mutex_unlock(&gUdfcProxy.udfStubsMutex); uv_mutex_unlock(&gUdfcProxy.udfStubsMutex);
} }
int32_t cleanUpUdfs() { void cleanupExpiredUdfs() {
int8_t initialized = atomic_load_8(&gUdfcProxy.initialized); int32_t i = 0;
if (!initialized) { SArray *expiredUdfStubs = taosArrayInit(16, sizeof(SUdfcFuncStub));
return TSDB_CODE_SUCCESS; while (i < taosArrayGetSize(gUdfcProxy.expiredUdfStubs)) {
SUdfcFuncStub *stub = taosArrayGet(gUdfcProxy.expiredUdfStubs, i);
if (stub->refCount == 0) {
fnInfo("tear down udf. expired. udf name: %s, handle: %p, ref count: %d", stub->udfName, stub->handle, stub->refCount);
doTeardownUdf(stub->handle);
} else {
fnInfo("udf still in use. expired. udf name: %s, ref count: %d, create time: %" PRId64 ", handle: %p", stub->udfName,
stub->refCount, stub->createTime, stub->handle);
UdfcFuncHandle handle = stub->handle;
if (handle != NULL && ((SUdfcUvSession *)handle)->udfUvPipe != NULL) {
taosArrayPush(expiredUdfStubs, stub);
} else {
fnInfo("udf invalid handle for %s, expired. refCount: %d, create time: %" PRId64 ". remove it from cache",
stub->udfName, stub->refCount, stub->createTime);
}
}
++i;
} }
taosArrayDestroy(gUdfcProxy.expiredUdfStubs);
gUdfcProxy.expiredUdfStubs = expiredUdfStubs;
}
uv_mutex_lock(&gUdfcProxy.udfStubsMutex); void cleanupNotExpiredUdfs() {
if (gUdfcProxy.udfStubs == NULL || taosArrayGetSize(gUdfcProxy.udfStubs) == 0) {
uv_mutex_unlock(&gUdfcProxy.udfStubsMutex);
return TSDB_CODE_SUCCESS;
}
SArray *udfStubs = taosArrayInit(16, sizeof(SUdfcFuncStub)); SArray *udfStubs = taosArrayInit(16, sizeof(SUdfcFuncStub));
int32_t i = 0; int32_t i = 0;
while (i < taosArrayGetSize(gUdfcProxy.udfStubs)) { while (i < taosArrayGetSize(gUdfcProxy.udfStubs)) {
...@@ -1046,20 +1076,38 @@ int32_t cleanUpUdfs() { ...@@ -1046,20 +1076,38 @@ int32_t cleanUpUdfs() {
fnInfo("tear down udf. udf name: %s, handle: %p, ref count: %d", stub->udfName, stub->handle, stub->refCount); fnInfo("tear down udf. udf name: %s, handle: %p, ref count: %d", stub->udfName, stub->handle, stub->refCount);
doTeardownUdf(stub->handle); doTeardownUdf(stub->handle);
} else { } else {
fnInfo("udf still in use. udf name: %s, ref count: %d, last ref time: %" PRId64 ", handle: %p", stub->udfName, fnInfo("udf still in use. udf name: %s, ref count: %d, create time: %" PRId64 ", handle: %p", stub->udfName,
stub->refCount, stub->lastRefTime, stub->handle); stub->refCount, stub->createTime, stub->handle);
UdfcFuncHandle handle = stub->handle; UdfcFuncHandle handle = stub->handle;
if (handle != NULL && ((SUdfcUvSession *)handle)->udfUvPipe != NULL) { if (handle != NULL && ((SUdfcUvSession *)handle)->udfUvPipe != NULL) {
taosArrayPush(udfStubs, stub); taosArrayPush(udfStubs, stub);
} else { } else {
fnInfo("udf invalid handle for %s, refCount: %d, last ref time: %" PRId64 ". remove it from cache", fnInfo("udf invalid handle for %s, refCount: %d, create time: %" PRId64 ". remove it from cache",
stub->udfName, stub->refCount, stub->lastRefTime); stub->udfName, stub->refCount, stub->createTime);
} }
} }
++i; ++i;
} }
taosArrayDestroy(gUdfcProxy.udfStubs); taosArrayDestroy(gUdfcProxy.udfStubs);
gUdfcProxy.udfStubs = udfStubs; gUdfcProxy.udfStubs = udfStubs;
}
int32_t cleanUpUdfs() {
int8_t initialized = atomic_load_8(&gUdfcProxy.initialized);
if (!initialized) {
return TSDB_CODE_SUCCESS;
}
uv_mutex_lock(&gUdfcProxy.udfStubsMutex);
if ((gUdfcProxy.udfStubs == NULL || taosArrayGetSize(gUdfcProxy.udfStubs) == 0) &&
(gUdfcProxy.expiredUdfStubs == NULL || taosArrayGetSize(gUdfcProxy.expiredUdfStubs) == 0)) {
uv_mutex_unlock(&gUdfcProxy.udfStubsMutex);
return TSDB_CODE_SUCCESS;
}
cleanupNotExpiredUdfs();
cleanupExpiredUdfs();
uv_mutex_unlock(&gUdfcProxy.udfStubsMutex); uv_mutex_unlock(&gUdfcProxy.udfStubsMutex);
return 0; return 0;
} }
...@@ -1075,7 +1123,7 @@ int32_t callUdfScalarFunc(char *udfName, SScalarParam *input, int32_t numOfCols, ...@@ -1075,7 +1123,7 @@ int32_t callUdfScalarFunc(char *udfName, SScalarParam *input, int32_t numOfCols,
code = doCallUdfScalarFunc(handle, input, numOfCols, output); code = doCallUdfScalarFunc(handle, input, numOfCols, output);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
fnError("udfc scalar function execution failure"); fnError("udfc scalar function execution failure");
releaseUdfFuncHandle(udfName); releaseUdfFuncHandle(udfName, handle);
return code; return code;
} }
...@@ -1089,7 +1137,7 @@ int32_t callUdfScalarFunc(char *udfName, SScalarParam *input, int32_t numOfCols, ...@@ -1089,7 +1137,7 @@ int32_t callUdfScalarFunc(char *udfName, SScalarParam *input, int32_t numOfCols,
code = TSDB_CODE_UDF_INVALID_OUTPUT_TYPE; code = TSDB_CODE_UDF_INVALID_OUTPUT_TYPE;
} }
} }
releaseUdfFuncHandle(udfName); releaseUdfFuncHandle(udfName, handle);
return code; return code;
} }
...@@ -1122,7 +1170,7 @@ bool udfAggInit(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo *pResult ...@@ -1122,7 +1170,7 @@ bool udfAggInit(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo *pResult
SUdfInterBuf buf = {0}; SUdfInterBuf buf = {0};
if ((udfCode = doCallUdfAggInit(handle, &buf)) != 0) { if ((udfCode = doCallUdfAggInit(handle, &buf)) != 0) {
fnError("udfAggInit error. step doCallUdfAggInit. udf code: %d", udfCode); fnError("udfAggInit error. step doCallUdfAggInit. udf code: %d", udfCode);
releaseUdfFuncHandle(pCtx->udfName); releaseUdfFuncHandle(pCtx->udfName, handle);
return false; return false;
} }
if (buf.bufLen <= session->bufSize) { if (buf.bufLen <= session->bufSize) {
...@@ -1131,10 +1179,10 @@ bool udfAggInit(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo *pResult ...@@ -1131,10 +1179,10 @@ bool udfAggInit(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo *pResult
udfRes->interResNum = buf.numOfResult; udfRes->interResNum = buf.numOfResult;
} else { } else {
fnError("udfc inter buf size %d is greater than function bufSize %d", buf.bufLen, session->bufSize); fnError("udfc inter buf size %d is greater than function bufSize %d", buf.bufLen, session->bufSize);
releaseUdfFuncHandle(pCtx->udfName); releaseUdfFuncHandle(pCtx->udfName, handle);
return false; return false;
} }
releaseUdfFuncHandle(pCtx->udfName); releaseUdfFuncHandle(pCtx->udfName, handle);
freeUdfInterBuf(&buf); freeUdfInterBuf(&buf);
return true; return true;
} }
...@@ -1191,7 +1239,7 @@ int32_t udfAggProcess(struct SqlFunctionCtx *pCtx) { ...@@ -1191,7 +1239,7 @@ int32_t udfAggProcess(struct SqlFunctionCtx *pCtx) {
taosArrayDestroy(pTempBlock->pDataBlock); taosArrayDestroy(pTempBlock->pDataBlock);
taosMemoryFree(pTempBlock); taosMemoryFree(pTempBlock);
releaseUdfFuncHandle(pCtx->udfName); releaseUdfFuncHandle(pCtx->udfName, handle);
freeUdfInterBuf(&newState); freeUdfInterBuf(&newState);
return udfCode; return udfCode;
} }
...@@ -1236,7 +1284,7 @@ int32_t udfAggFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock *pBlock) { ...@@ -1236,7 +1284,7 @@ int32_t udfAggFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock *pBlock) {
freeUdfInterBuf(&resultBuf); freeUdfInterBuf(&resultBuf);
int32_t numOfResults = functionFinalizeWithResultBuf(pCtx, pBlock, udfRes->finalResBuf); int32_t numOfResults = functionFinalizeWithResultBuf(pCtx, pBlock, udfRes->finalResBuf);
releaseUdfFuncHandle(pCtx->udfName); releaseUdfFuncHandle(pCtx->udfName, handle);
return udfCallCode == 0 ? numOfResults : udfCallCode; return udfCallCode == 0 ? numOfResults : udfCallCode;
} }
...@@ -1663,6 +1711,7 @@ int32_t udfcOpen() { ...@@ -1663,6 +1711,7 @@ int32_t udfcOpen() {
uv_barrier_wait(&proxy->initBarrier); uv_barrier_wait(&proxy->initBarrier);
uv_mutex_init(&proxy->udfStubsMutex); uv_mutex_init(&proxy->udfStubsMutex);
proxy->udfStubs = taosArrayInit(8, sizeof(SUdfcFuncStub)); proxy->udfStubs = taosArrayInit(8, sizeof(SUdfcFuncStub));
proxy->expiredUdfStubs = taosArrayInit(8, sizeof(SUdfcFuncStub));
uv_mutex_init(&proxy->udfcUvMutex); uv_mutex_init(&proxy->udfcUvMutex);
fnInfo("udfc initialized") return 0; fnInfo("udfc initialized") return 0;
} }
...@@ -1679,6 +1728,7 @@ int32_t udfcClose() { ...@@ -1679,6 +1728,7 @@ int32_t udfcClose() {
uv_thread_join(&udfc->loopThread); uv_thread_join(&udfc->loopThread);
uv_mutex_destroy(&udfc->taskQueueMutex); uv_mutex_destroy(&udfc->taskQueueMutex);
uv_barrier_destroy(&udfc->initBarrier); uv_barrier_destroy(&udfc->initBarrier);
taosArrayDestroy(udfc->expiredUdfStubs);
taosArrayDestroy(udfc->udfStubs); taosArrayDestroy(udfc->udfStubs);
uv_mutex_destroy(&udfc->udfStubsMutex); uv_mutex_destroy(&udfc->udfStubsMutex);
uv_mutex_destroy(&udfc->udfcUvMutex); uv_mutex_destroy(&udfc->udfcUvMutex);
......
...@@ -591,7 +591,7 @@ SUdf *udfdNewUdf(const char *udfName) { ...@@ -591,7 +591,7 @@ SUdf *udfdNewUdf(const char *udfName) {
SUdf *udfdGetOrCreateUdf(const char *udfName) { SUdf *udfdGetOrCreateUdf(const char *udfName) {
uv_mutex_lock(&global.udfsMutex); uv_mutex_lock(&global.udfsMutex);
SUdf **pUdfHash = taosHashGet(global.udfsHash, udfName, strlen(udfName)); SUdf **pUdfHash = taosHashGet(global.udfsHash, udfName, strlen(udfName));
int64_t currTime = taosGetTimestampSec(); int64_t currTime = taosGetTimestampMs();
bool expired = false; bool expired = false;
if (pUdfHash) { if (pUdfHash) {
expired = currTime - (*pUdfHash)->lastFetchTime > 10 * 1000; // 10s expired = currTime - (*pUdfHash)->lastFetchTime > 10 * 1000; // 10s
...@@ -688,6 +688,8 @@ void udfdProcessCallRequest(SUvUdfWork *uvUdf, SUdfRequest *request) { ...@@ -688,6 +688,8 @@ void udfdProcessCallRequest(SUvUdfWork *uvUdf, SUdfRequest *request) {
output.colMeta.type = udf->outputType; output.colMeta.type = udf->outputType;
output.colMeta.precision = 0; output.colMeta.precision = 0;
output.colMeta.scale = 0; output.colMeta.scale = 0;
udfColEnsureCapacity(&output, call->block.info.rows);
SUdfDataBlock input = {0}; SUdfDataBlock input = {0};
convertDataBlockToUdfDataBlock(&call->block, &input); convertDataBlockToUdfDataBlock(&call->block, &input);
code = udf->scriptPlugin->udfScalarProcFunc(&input, &output, udf->scriptUdfCtx); code = udf->scriptPlugin->udfScalarProcFunc(&input, &output, udf->scriptUdfCtx);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册