diff --git a/include/libs/function/function.h b/include/libs/function/function.h index 8d0b93dde2f79398f8526747131377a68fd71fab..616aec8c02d2f6f7f6b45bf11a1cd096e2408f25 100644 --- a/include/libs/function/function.h +++ b/include/libs/function/function.h @@ -309,7 +309,7 @@ void qAddUdfInfo(uint64_t id, struct SUdfInfo* pUdfInfo); void qRemoveUdfInfo(uint64_t id, struct SUdfInfo* pUdfInfo); /** - * create udfd proxy, called once in process that call setupUdf/callUdfxxx/teardownUdf + * create udfd proxy, called once in process that call doSetupUdf/callUdfxxx/doTeardownUdf * @return error code */ int32_t udfcOpen(); diff --git a/include/libs/function/tudf.h b/include/libs/function/tudf.h index b5c38e14f49f9ed84555d24ba3d5cd42c8ac6f5e..b37dcd2b6188c2a2c08eaf553d3d9ebbd91d5592 100644 --- a/include/libs/function/tudf.h +++ b/include/libs/function/tudf.h @@ -39,16 +39,6 @@ extern "C" { //====================================================================================== //begin API to taosd and qworker -typedef void *UdfcFuncHandle; - -/** - * setup udf - * @param udf, in - * @param handle, out - * @return error code - */ -int32_t setupUdf(char udfName[], UdfcFuncHandle *handle); - typedef struct SUdfColumnMeta { int16_t type; int32_t bytes; @@ -95,32 +85,42 @@ typedef struct SUdfInterBuf { char* buf; int8_t numOfResult; //zero or one } SUdfInterBuf; +typedef void *UdfcFuncHandle; +/** + * setup udf + * @param udf, in + * @param funcHandle, out + * @return error code + */ +int32_t doSetupUdf(char udfName[], UdfcFuncHandle *funcHandle); // output: interBuf -int32_t callUdfAggInit(UdfcFuncHandle handle, SUdfInterBuf *interBuf); +int32_t doCallUdfAggInit(UdfcFuncHandle handle, SUdfInterBuf *interBuf); // input: block, state // output: newState -int32_t callUdfAggProcess(UdfcFuncHandle handle, SSDataBlock *block, SUdfInterBuf *state, SUdfInterBuf *newState); +int32_t doCallUdfAggProcess(UdfcFuncHandle handle, SSDataBlock *block, SUdfInterBuf *state, SUdfInterBuf *newState); // input: interBuf // output: resultData -int32_t callUdfAggFinalize(UdfcFuncHandle handle, SUdfInterBuf *interBuf, SUdfInterBuf *resultData); +int32_t doCallUdfAggFinalize(UdfcFuncHandle handle, SUdfInterBuf *interBuf, SUdfInterBuf *resultData); // input: interbuf1, interbuf2 // output: resultBuf -int32_t callUdfAggMerge(UdfcFuncHandle handle, SUdfInterBuf *interBuf1, SUdfInterBuf *interBuf2, SUdfInterBuf *resultBuf); +int32_t doCallUdfAggMerge(UdfcFuncHandle handle, SUdfInterBuf *interBuf1, SUdfInterBuf *interBuf2, SUdfInterBuf *resultBuf); // input: block // output: resultData -int32_t callUdfScalarFunc(UdfcFuncHandle handle, SScalarParam *input, int32_t numOfCols, SScalarParam *output); +int32_t doCallUdfScalarFunc(UdfcFuncHandle handle, SScalarParam *input, int32_t numOfCols, SScalarParam *output); /** * tearn down udf * @param handle * @return */ -int32_t teardownUdf(UdfcFuncHandle handle); +int32_t doTeardownUdf(UdfcFuncHandle handle); bool udfAggGetEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); bool udfAggInit(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo* pResultCellInfo); int32_t udfAggProcess(struct SqlFunctionCtx *pCtx); int32_t udfAggFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock* pBlock); + +int32_t callUdfScalarFunc(char *udfName, SScalarParam *input, int32_t numOfCols, SScalarParam *output); // end API to taosd and qworker //============================================================================================================================= // begin API to UDF writer. diff --git a/source/libs/function/inc/udfc.h b/source/libs/function/inc/udfc.h index a693e476e83b05659509dc4718d5f27581ce2b52..f414c2b29e85089404e74f03fa4dcb875650d41f 100644 --- a/source/libs/function/inc/udfc.h +++ b/source/libs/function/inc/udfc.h @@ -43,7 +43,7 @@ int32_t setupUdf(SUdfInfo* udf, UdfcFuncHandle* handle); int32_t callUdf(UdfcFuncHandle handle, int8_t step, char *state, int32_t stateSize, SSDataBlock input, char **newstate, int32_t *newStateSize, SSDataBlock *output); -int32_t teardownUdf(UdfcFuncHandle handle); +int32_t doTeardownUdf(UdfcFuncHandle handle); typedef struct SUdfSetupRequest { char udfName[16]; // diff --git a/source/libs/function/src/tudf.c b/source/libs/function/src/tudf.c index d9e3ff0a5bc2796acfa9b613dae9dc4cdcfc7de3..f2a0b4ec2c11be47c76d9560816ff079da059ad5 100644 --- a/source/libs/function/src/tudf.c +++ b/source/libs/function/src/tudf.c @@ -310,6 +310,11 @@ enum { }; int64_t gUdfTaskSeqNum = 0; +typedef struct SUdfcFuncStub { + char udfName[TSDB_FUNC_NAME_LEN]; + UdfcFuncHandle handle; +} SUdfcFuncStub; + typedef struct SUdfcProxy { char udfdPipeName[PATH_MAX + UDF_LISTEN_PIPE_NAME_LEN + 2]; uv_barrier_t initBarrier; @@ -325,6 +330,9 @@ typedef struct SUdfcProxy { QUEUE taskQueue; QUEUE uvProcTaskQueue; + uv_mutex_t udfStubsMutex; + SArray* udfStubs; // SUdfcFuncStub + int8_t initialized; } SUdfcProxy; @@ -1222,6 +1230,8 @@ int32_t udfcOpen() { atomic_store_8(&proxy->udfcState, UDFC_STATE_READY); proxy->udfcState = UDFC_STATE_READY; uv_barrier_wait(&proxy->initBarrier); + uv_mutex_init(&proxy->udfStubsMutex); + proxy->udfStubs = taosArrayInit(8, sizeof(SUdfcFuncStub)); fnInfo("udfc initialized") return 0; } @@ -1238,6 +1248,8 @@ int32_t udfcClose() { uv_thread_join(&udfc->loopThread); uv_mutex_destroy(&udfc->taskQueueMutex); uv_barrier_destroy(&udfc->initBarrier); + taosArrayDestroy(udfc->udfStubs); + uv_mutex_destroy(&udfc->udfStubsMutex); udfc->udfcState = UDFC_STATE_INITAL; fnInfo("udfc cleaned up"); return 0; @@ -1259,7 +1271,7 @@ int32_t udfcRunUdfUvTask(SClientUdfTask *task, int8_t uvTaskType) { return task->errCode; } -int32_t setupUdf(char udfName[], UdfcFuncHandle *funcHandle) { +int32_t doSetupUdf(char udfName[], UdfcFuncHandle *funcHandle) { fnInfo("udfc setup udf. udfName: %s", udfName); if (gUdfdProxy.udfcState != UDFC_STATE_READY) { return TSDB_CODE_UDF_INVALID_STATE; @@ -1287,7 +1299,7 @@ int32_t setupUdf(char udfName[], UdfcFuncHandle *funcHandle) { task->session->outputLen = rsp->outputLen; task->session->bufSize = rsp->bufSize; if (task->errCode != 0) { - fnError("failed to setup udf. err: %d", task->errCode) + fnError("failed to setup udf. udfname: %s, err: %d", udfName, task->errCode) } else { fnInfo("sucessfully setup udf func handle. handle: %p", task->session); *funcHandle = task->session; @@ -1373,7 +1385,7 @@ int32_t callUdf(UdfcFuncHandle handle, int8_t callType, SSDataBlock *input, SUdf return err; } -int32_t callUdfAggInit(UdfcFuncHandle handle, SUdfInterBuf *interBuf) { +int32_t doCallUdfAggInit(UdfcFuncHandle handle, SUdfInterBuf *interBuf) { int8_t callType = TSDB_UDF_CALL_AGG_INIT; int32_t err = callUdf(handle, callType, NULL, NULL, NULL, NULL, interBuf); @@ -1383,7 +1395,7 @@ int32_t callUdfAggInit(UdfcFuncHandle handle, SUdfInterBuf *interBuf) { // input: block, state // output: interbuf, -int32_t callUdfAggProcess(UdfcFuncHandle handle, SSDataBlock *block, SUdfInterBuf *state, SUdfInterBuf *newState) { +int32_t doCallUdfAggProcess(UdfcFuncHandle handle, SSDataBlock *block, SUdfInterBuf *state, SUdfInterBuf *newState) { int8_t callType = TSDB_UDF_CALL_AGG_PROC; int32_t err = callUdf(handle, callType, block, state, NULL, NULL, newState); return err; @@ -1391,7 +1403,7 @@ int32_t callUdfAggProcess(UdfcFuncHandle handle, SSDataBlock *block, SUdfInterBu // input: interbuf1, interbuf2 // output: resultBuf -int32_t callUdfAggMerge(UdfcFuncHandle handle, SUdfInterBuf *interBuf1, SUdfInterBuf *interBuf2, SUdfInterBuf *resultBuf) { +int32_t doCallUdfAggMerge(UdfcFuncHandle handle, SUdfInterBuf *interBuf1, SUdfInterBuf *interBuf2, SUdfInterBuf *resultBuf) { int8_t callType = TSDB_UDF_CALL_AGG_MERGE; int32_t err = callUdf(handle, callType, NULL, interBuf1, interBuf2, NULL, resultBuf); return err; @@ -1399,13 +1411,13 @@ int32_t callUdfAggMerge(UdfcFuncHandle handle, SUdfInterBuf *interBuf1, SUdfInte // input: interBuf // output: resultData -int32_t callUdfAggFinalize(UdfcFuncHandle handle, SUdfInterBuf *interBuf, SUdfInterBuf *resultData) { +int32_t doCallUdfAggFinalize(UdfcFuncHandle handle, SUdfInterBuf *interBuf, SUdfInterBuf *resultData) { int8_t callType = TSDB_UDF_CALL_AGG_FIN; int32_t err = callUdf(handle, callType, NULL, interBuf, NULL, NULL, resultData); return err; } -int32_t callUdfScalarFunc(UdfcFuncHandle handle, SScalarParam *input, int32_t numOfCols, SScalarParam* output) { +int32_t doCallUdfScalarFunc(UdfcFuncHandle handle, SScalarParam *input, int32_t numOfCols, SScalarParam* output) { int8_t callType = TSDB_UDF_CALL_SCALA_PROC; SSDataBlock inputBlock = {0}; convertScalarParamToDataBlock(input, numOfCols, &inputBlock); @@ -1417,7 +1429,50 @@ int32_t callUdfScalarFunc(UdfcFuncHandle handle, SScalarParam *input, int32_t nu return err; } -int32_t teardownUdf(UdfcFuncHandle handle) { +int compareUdfcFuncSub(const void* elem1, const void* elem2) { + SUdfcFuncStub *stub1 = (SUdfcFuncStub *)elem1; + SUdfcFuncStub *stub2 = (SUdfcFuncStub *)elem2; + return strcmp(stub1->udfName, stub2->udfName); +} + +int32_t setupUdf(char* udfName, UdfcFuncHandle* pHandle) { + int32_t code = 0; + uv_mutex_lock(&gUdfdProxy.udfStubsMutex); + SUdfcFuncStub key = {0}; + strcpy(key.udfName, udfName); + SUdfcFuncStub *foundStub = taosArraySearch(gUdfdProxy.udfStubs, &key, compareUdfcFuncSub, TD_EQ); + if (foundStub != NULL) { + uv_mutex_unlock(&gUdfdProxy.udfStubsMutex); + *pHandle = foundStub->handle; + return 0; + } + *pHandle = NULL; + code = doSetupUdf(udfName, pHandle); + if (code == TSDB_CODE_SUCCESS) { + SUdfcFuncStub stub = {0}; + strcpy(stub.udfName, udfName); + stub.handle = *pHandle; + taosArrayPush(gUdfdProxy.udfStubs, &stub); + taosArraySort(gUdfdProxy.udfStubs, compareUdfcFuncSub); + } else { + *pHandle = NULL; + } + + uv_mutex_unlock(&gUdfdProxy.udfStubsMutex); + return code; +} + +int32_t callUdfScalarFunc(char *udfName, SScalarParam *input, int32_t numOfCols, SScalarParam *output) { + UdfcFuncHandle handle = NULL; + int32_t code = setupUdf(udfName, &handle); + if (code != 0) { + return code; + } + code = doCallUdfScalarFunc(handle, input, numOfCols, output); + return code; +} + +int32_t doTeardownUdf(UdfcFuncHandle handle) { fnInfo("tear down udf. udf func handle: %p", handle); SClientUdfUvSession *session = (SClientUdfUvSession *) handle; @@ -1471,8 +1526,8 @@ bool udfAggInit(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo* pResult } UdfcFuncHandle handle; int32_t udfCode = 0; - if ((udfCode = setupUdf((char*)pCtx->udfName, &handle)) != 0) { - fnError("udfAggInit error. step setupUdf. udf code: %d", udfCode); + if ((udfCode = setupUdf((char *)pCtx->udfName, &handle)) != 0) { + fnError("udfAggInit error. step doSetupUdf. udf code: %d", udfCode); return false; } SClientUdfUvSession *session = (SClientUdfUvSession *)handle; @@ -1485,8 +1540,8 @@ bool udfAggInit(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo* pResult udfRes->session = (SClientUdfUvSession *)handle; SUdfInterBuf buf = {0}; - if ((udfCode = callUdfAggInit(handle, &buf)) != 0) { - fnError("udfAggInit error. step callUdfAggInit. udf code: %d", udfCode); + if ((udfCode = doCallUdfAggInit(handle, &buf)) != 0) { + fnError("udfAggInit error. step doCallUdfAggInit. udf code: %d", udfCode); return false; } udfRes->interResNum = buf.numOfResult; @@ -1533,7 +1588,7 @@ int32_t udfAggProcess(struct SqlFunctionCtx *pCtx) { .numOfResult = udfRes->interResNum}; SUdfInterBuf newState = {0}; - int32_t udfCode = callUdfAggProcess(session, inputBlock, &state, &newState); + int32_t udfCode = doCallUdfAggProcess(session, inputBlock, &state, &newState); if (udfCode != 0) { fnError("udfAggProcess error. code: %d", udfCode); newState.numOfResult = 0; @@ -1567,9 +1622,9 @@ int32_t udfAggFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock* pBlock) { .bufLen = session->bufSize, .numOfResult = udfRes->interResNum}; int32_t udfCallCode= 0; - udfCallCode= callUdfAggFinalize(session, &state, &resultBuf); - if (udfCallCode!= 0) { - fnError("udfAggFinalize error. callUdfAggFinalize step. udf code:%d", udfCallCode); + udfCallCode= doCallUdfAggFinalize(session, &state, &resultBuf); + if (udfCallCode != 0) { + fnError("udfAggFinalize error. doCallUdfAggFinalize step. udf code:%d", udfCallCode); GET_RES_INFO(pCtx)->numOfRes = 0; } else { memcpy(udfRes->finalResBuf, resultBuf.buf, session->outputLen); @@ -1577,11 +1632,11 @@ int32_t udfAggFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock* pBlock) { GET_RES_INFO(pCtx)->numOfRes = udfRes->finalResNum; } - int32_t code = teardownUdf(session); - if (code != 0) { - fnError("udfAggFinalize error. teardownUdf step. udf code: %d", code); - } - - return functionFinalizeWithResultBuf(pCtx, pBlock, udfRes->finalResBuf); +// int32_t code = doTeardownUdf(session); +// if (code != 0) { +// fnError("udfAggFinalize error. doTeardownUdf step. udf code: %d", code); +// } + int32_t numOfResults = functionFinalizeWithResultBuf(pCtx, pBlock, udfRes->finalResBuf); + return udfCallCode == 0 ? numOfResults : udfCallCode; } \ No newline at end of file diff --git a/source/libs/function/test/runUdf.c b/source/libs/function/test/runUdf.c index a8d6fbd7152a76f13d06fea6ef42e13acc57e25c..d7c539e5c2ce857e105ee86554bfea3aa671c256 100644 --- a/source/libs/function/test/runUdf.c +++ b/source/libs/function/test/runUdf.c @@ -47,7 +47,7 @@ int main(int argc, char *argv[]) { UdfcFuncHandle handle; - setupUdf("udf1", &handle); + doSetupUdf("udf1", &handle); SSDataBlock block = {0}; SSDataBlock *pBlock = █ @@ -73,12 +73,12 @@ int main(int argc, char *argv[]) { input.numOfRows = pBlock->info.rows; input.columnData = taosArrayGet(pBlock->pDataBlock, 0); SScalarParam output = {0}; - callUdfScalarFunc(handle, &input, 1, &output); + doCallUdfScalarFunc(handle, &input, 1, &output); SColumnInfoData *col = output.columnData; for (int32_t i = 0; i < output.numOfRows; ++i) { fprintf(stderr, "%d\t%d\n", i, *(int32_t *)(col->pData + i * sizeof(int32_t))); } - teardownUdf(handle); + doTeardownUdf(handle); udfcClose(); } diff --git a/source/libs/scalar/src/scalar.c b/source/libs/scalar/src/scalar.c index 6c8326b09fa89b7feb97eb58d952b915f4746ad6..7e3dbaf7d02595b04ea6a744660579e223ba29fc 100644 --- a/source/libs/scalar/src/scalar.c +++ b/source/libs/scalar/src/scalar.c @@ -362,19 +362,7 @@ int32_t sclExecFunction(SFunctionNode *node, SScalarCtx *ctx, SScalarParam *outp SCL_ERR_RET(sclInitParamList(¶ms, node->pParameterList, ctx, ¶mNum, &rowNum)); if (fmIsUserDefinedFunc(node->funcId)) { - UdfcFuncHandle udfHandle = NULL; - - code = setupUdf(node->functionName, &udfHandle); - if (code != 0) { - sclError("fmExecFunction error. setupUdf. function name: %s, code:%d", node->functionName, code); - goto _return; - } - code = callUdfScalarFunc(udfHandle, params, paramNum, output); - if (code != 0) { - sclError("fmExecFunction error. callUdfScalarFunc. function name: %s, udf code:%d", node->functionName, code); - goto _return; - } - code = teardownUdf(udfHandle); + code = callUdfScalarFunc(node->functionName, params, paramNum, output); if (code != 0) { sclError("fmExecFunction error. callUdfScalarFunc. function name: %s, udf code:%d", node->functionName, code); goto _return;