From 5b35fcacd6c91033e44f2babeb0c81d17bf84555 Mon Sep 17 00:00:00 2001 From: slzhou Date: Mon, 16 May 2022 07:47:56 +0800 Subject: [PATCH] fix: teardown udf functions handles --- include/libs/function/tudf.h | 2 + source/libs/executor/src/executorMain.c | 8 ++- source/libs/function/src/tudf.c | 75 ++++++++++++++++++------- 3 files changed, 61 insertions(+), 24 deletions(-) diff --git a/include/libs/function/tudf.h b/include/libs/function/tudf.h index b37dcd2b61..ca5079a0a8 100644 --- a/include/libs/function/tudf.h +++ b/include/libs/function/tudf.h @@ -121,6 +121,8 @@ int32_t udfAggProcess(struct SqlFunctionCtx *pCtx); int32_t udfAggFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock* pBlock); int32_t callUdfScalarFunc(char *udfName, SScalarParam *input, int32_t numOfCols, SScalarParam *output); + +int32_t teardownUdfs(); // end API to taosd and qworker //============================================================================================================================= // begin API to UDF writer. diff --git a/source/libs/executor/src/executorMain.c b/source/libs/executor/src/executorMain.c index 3cc75a815d..0c464d9b43 100644 --- a/source/libs/executor/src/executorMain.c +++ b/source/libs/executor/src/executorMain.c @@ -21,13 +21,14 @@ #include "tcache.h" #include "tglobal.h" #include "tmsg.h" +#include "tudf.h" -#include "thash.h" -#include "executorimpl.h" #include "executor.h" +#include "executorimpl.h" +#include "query.h" +#include "thash.h" #include "tlosertree.h" #include "ttypes.h" -#include "query.h" typedef struct STaskMgmt { TdThreadMutex lock; @@ -156,6 +157,7 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t *useconds) { int32_t current = (*pRes != NULL)? (*pRes)->info.rows:0; pTaskInfo->totalRows += current; + teardownUdfs(); qDebug("%s task suspended, %d rows returned, total:%" PRId64 " rows, in sinkNode:%d, elapsed:%.2f ms", GET_TASKID(pTaskInfo), current, pTaskInfo->totalRows, 0, el/1000.0); diff --git a/source/libs/function/src/tudf.c b/source/libs/function/src/tudf.c index 35702eb71f..b5058a2d60 100644 --- a/source/libs/function/src/tudf.c +++ b/source/libs/function/src/tudf.c @@ -313,6 +313,7 @@ int64_t gUdfTaskSeqNum = 0; typedef struct SUdfcFuncStub { char udfName[TSDB_FUNC_NAME_LEN]; UdfcFuncHandle handle; + int32_t refCount; } SUdfcFuncStub; typedef struct SUdfcProxy { @@ -338,7 +339,7 @@ typedef struct SUdfcProxy { SUdfcProxy gUdfdProxy = {0}; -typedef struct SClientUdfUvSession { +typedef struct SUdfcUvSession { SUdfcProxy *udfc; int64_t severHandle; uv_pipe_t *udfUvPipe; @@ -346,7 +347,9 @@ typedef struct SClientUdfUvSession { int8_t outputType; int32_t outputLen; int32_t bufSize; -} SClientUdfUvSession; + + char udfName[TSDB_FUNC_NAME_LEN]; +} SUdfcUvSession; typedef struct SClientUvTaskNode { SUdfcProxy *udfc; @@ -369,7 +372,7 @@ typedef struct SClientUvTaskNode { typedef struct SClientUdfTask { int8_t type; - SClientUdfUvSession *session; + SUdfcUvSession *session; int32_t errCode; @@ -401,7 +404,7 @@ typedef struct SClientUvConn { uv_pipe_t *pipe; QUEUE taskQueue; SClientConnBuf readBuf; - SClientUdfUvSession *session; + SUdfcUvSession *session; } SClientUvConn; enum { @@ -825,11 +828,6 @@ void onUdfcPipeClose(uv_handle_t *handle) { taosMemoryFree(conn->readBuf.buf); taosMemoryFree(conn); taosMemoryFree((uv_pipe_t *) handle); - - //clear the udf handles cache TODO move to other thread - uv_mutex_lock(&gUdfdProxy.udfStubsMutex); - taosArrayClear(gUdfdProxy.udfStubs); - uv_mutex_unlock(&gUdfdProxy.udfStubsMutex); } int32_t udfcGetUdfTaskResultFromUvTask(SClientUdfTask *task, SClientUvTaskNode *uvTask) { @@ -1283,7 +1281,7 @@ int32_t doSetupUdf(char udfName[], UdfcFuncHandle *funcHandle) { } SClientUdfTask *task = taosMemoryCalloc(1,sizeof(SClientUdfTask)); task->errCode = 0; - task->session = taosMemoryCalloc(1, sizeof(SClientUdfUvSession)); + task->session = taosMemoryCalloc(1, sizeof(SUdfcUvSession)); task->session->udfc = &gUdfdProxy; task->type = UDF_TASK_SETUP; @@ -1303,6 +1301,7 @@ int32_t doSetupUdf(char udfName[], UdfcFuncHandle *funcHandle) { task->session->outputType = rsp->outputType; task->session->outputLen = rsp->outputLen; task->session->bufSize = rsp->bufSize; + strcpy(task->session->udfName, udfName); if (task->errCode != 0) { fnError("failed to setup udf. udfname: %s, err: %d", udfName, task->errCode) } else { @@ -1317,14 +1316,14 @@ int32_t doSetupUdf(char udfName[], UdfcFuncHandle *funcHandle) { int32_t callUdf(UdfcFuncHandle handle, int8_t callType, SSDataBlock *input, SUdfInterBuf *state, SUdfInterBuf *state2, SSDataBlock* output, SUdfInterBuf *newState) { fnTrace("udfc call udf. callType: %d, funcHandle: %p", callType, handle); - SClientUdfUvSession *session = (SClientUdfUvSession *) handle; + SUdfcUvSession *session = (SUdfcUvSession *) handle; if (session->udfUvPipe == NULL) { fnError("No pipe to udfd"); return TSDB_CODE_UDF_PIPE_NO_PIPE; } SClientUdfTask *task = taosMemoryCalloc(1, sizeof(SClientUdfTask)); task->errCode = 0; - task->session = (SClientUdfUvSession *) handle; + task->session = (SUdfcUvSession *) handle; task->type = UDF_TASK_CALL; SUdfCallRequest *req = &task->_call.req; @@ -1440,7 +1439,7 @@ int compareUdfcFuncSub(const void* elem1, const void* elem2) { return strcmp(stub1->udfName, stub2->udfName); } -int32_t setupUdf(char* udfName, UdfcFuncHandle* pHandle) { +int32_t accquireUdfFuncHandle(char* udfName, UdfcFuncHandle* pHandle) { int32_t code = 0; uv_mutex_lock(&gUdfdProxy.udfStubsMutex); SUdfcFuncStub key = {0}; @@ -1449,6 +1448,7 @@ int32_t setupUdf(char* udfName, UdfcFuncHandle* pHandle) { if (foundStub != NULL) { uv_mutex_unlock(&gUdfdProxy.udfStubsMutex); *pHandle = foundStub->handle; + ++foundStub->refCount; return 0; } *pHandle = NULL; @@ -1457,6 +1457,7 @@ int32_t setupUdf(char* udfName, UdfcFuncHandle* pHandle) { SUdfcFuncStub stub = {0}; strcpy(stub.udfName, udfName); stub.handle = *pHandle; + ++stub.refCount; taosArrayPush(gUdfdProxy.udfStubs, &stub); taosArraySort(gUdfdProxy.udfStubs, compareUdfcFuncSub); } else { @@ -1467,13 +1468,25 @@ int32_t setupUdf(char* udfName, UdfcFuncHandle* pHandle) { return code; } +void releaseUdfFuncHandle(char* udfName) { + uv_mutex_lock(&gUdfdProxy.udfStubsMutex); + SUdfcFuncStub key = {0}; + strcpy(key.udfName, udfName); + SUdfcFuncStub *foundStub = taosArraySearch(gUdfdProxy.udfStubs, &key, compareUdfcFuncSub, TD_EQ); + ASSERT(foundStub); + --foundStub->refCount; + ASSERT(foundStub->refCount>=0); + uv_mutex_unlock(&gUdfdProxy.udfStubsMutex); +} + int32_t callUdfScalarFunc(char *udfName, SScalarParam *input, int32_t numOfCols, SScalarParam *output) { UdfcFuncHandle handle = NULL; - int32_t code = setupUdf(udfName, &handle); + int32_t code = accquireUdfFuncHandle(udfName, &handle); if (code != 0) { return code; } code = doCallUdfScalarFunc(handle, input, numOfCols, output); + releaseUdfFuncHandle(udfName); return code; } @@ -1481,7 +1494,7 @@ int32_t callUdfScalarFunc(char *udfName, SScalarParam *input, int32_t numOfCols, int32_t doTeardownUdf(UdfcFuncHandle handle) { fnInfo("tear down udf. udf func handle: %p", handle); - SClientUdfUvSession *session = (SClientUdfUvSession *) handle; + SUdfcUvSession *session = (SUdfcUvSession *) handle; if (session->udfUvPipe == NULL) { fnError("pipe to udfd does not exist"); return TSDB_CODE_UDF_PIPE_NO_PIPE; @@ -1511,7 +1524,7 @@ int32_t doTeardownUdf(UdfcFuncHandle handle) { //memory layout |---SUdfAggRes----|-----final result-----|---inter result----| typedef struct SUdfAggRes { - SClientUdfUvSession *session; + SUdfcUvSession *session; int8_t finalResNum; int8_t interResNum; char* finalResBuf; @@ -1532,11 +1545,11 @@ bool udfAggInit(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo* pResult } UdfcFuncHandle handle; int32_t udfCode = 0; - if ((udfCode = setupUdf((char *)pCtx->udfName, &handle)) != 0) { + if ((udfCode = accquireUdfFuncHandle((char *)pCtx->udfName, &handle)) != 0) { fnError("udfAggInit error. step doSetupUdf. udf code: %d", udfCode); return false; } - SClientUdfUvSession *session = (SClientUdfUvSession *)handle; + SUdfcUvSession *session = (SUdfcUvSession *)handle; SUdfAggRes *udfRes = (SUdfAggRes*)GET_ROWCELL_INTERBUF(pResultCellInfo); int32_t envSize = sizeof(SUdfAggRes) + session->outputLen + session->bufSize; memset(udfRes, 0, envSize); @@ -1544,7 +1557,7 @@ bool udfAggInit(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo* pResult udfRes->finalResBuf = (char*)udfRes + sizeof(SUdfAggRes); udfRes->interResBuf = (char*)udfRes + sizeof(SUdfAggRes) + session->outputLen; - udfRes->session = (SClientUdfUvSession *)handle; + udfRes->session = (SUdfcUvSession *)handle; SUdfInterBuf buf = {0}; if ((udfCode = doCallUdfAggInit(handle, &buf)) != 0) { fnError("udfAggInit error. step doCallUdfAggInit. udf code: %d", udfCode); @@ -1560,7 +1573,7 @@ int32_t udfAggProcess(struct SqlFunctionCtx *pCtx) { int32_t numOfCols = pInput->numOfInputCols; SUdfAggRes* udfRes = (SUdfAggRes *)GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); - SClientUdfUvSession *session = udfRes->session; + SUdfcUvSession *session = udfRes->session; if (session == NULL) { return TSDB_CODE_UDF_NO_FUNC_HANDLE; } @@ -1615,7 +1628,7 @@ int32_t udfAggProcess(struct SqlFunctionCtx *pCtx) { int32_t udfAggFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock* pBlock) { SUdfAggRes* udfRes = (SUdfAggRes *)GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); - SClientUdfUvSession *session = udfRes->session; + SUdfcUvSession *session = udfRes->session; if (session == NULL) { return TSDB_CODE_UDF_NO_FUNC_HANDLE; } @@ -1644,5 +1657,25 @@ int32_t udfAggFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock* pBlock) { // } int32_t numOfResults = functionFinalizeWithResultBuf(pCtx, pBlock, udfRes->finalResBuf); + releaseUdfFuncHandle(pCtx->udfName); return udfCallCode == 0 ? numOfResults : udfCallCode; +} + +int32_t teardownUdfs() { + uv_mutex_lock(&gUdfdProxy.udfStubsMutex); + int32_t i = 0; + SArray* udfStubs = taosArrayInit(16, sizeof(SUdfcFuncStub)); + while (i < taosArrayGetSize(gUdfdProxy.udfStubs)) { + SUdfcFuncStub *stub = taosArrayGet(gUdfdProxy.udfStubs, i); + if (stub->refCount == 0) { + doTeardownUdf(stub->handle); + } else { + taosArrayPush(udfStubs, stub); + } + ++i; + } + taosArrayDestroy(gUdfdProxy.udfStubs); + gUdfdProxy.udfStubs = udfStubs; + uv_mutex_unlock(&gUdfdProxy.udfStubsMutex); + return 0; } \ No newline at end of file -- GitLab