diff --git a/include/util/taoserror.h b/include/util/taoserror.h index b73d39090bfe2be5d8cecce0b6e5761bea7239ca..a0d7b59cf9289851b192acd68c229869a89e7b8c 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -657,6 +657,7 @@ int32_t* taosGetErrno(); #define TSDB_CODE_UDF_LOAD_UDF_FAILURE TAOS_DEF_ERROR_CODE(0, 0x2905) #define TSDB_CODE_UDF_INVALID_STATE TAOS_DEF_ERROR_CODE(0, 0x2906) #define TSDB_CODE_UDF_INVALID_INPUT TAOS_DEF_ERROR_CODE(0, 0x2907) +#define TSDB_CODE_UDF_NO_FUNC_HANDLE TAOS_DEF_ERROR_CODE(0, 0x2908) #define TSDB_CODE_SML_INVALID_PROTOCOL_TYPE TAOS_DEF_ERROR_CODE(0, 0x3000) #define TSDB_CODE_SML_INVALID_PRECISION_TYPE TAOS_DEF_ERROR_CODE(0, 0x3001) diff --git a/source/libs/function/src/tudf.c b/source/libs/function/src/tudf.c index 3a388d1c07d5b6d797fef1b8151a1ef6d4443120..d9e3ff0a5bc2796acfa9b613dae9dc4cdcfc7de3 100644 --- a/source/libs/function/src/tudf.c +++ b/source/libs/function/src/tudf.c @@ -1072,6 +1072,8 @@ int32_t udfcQueueUvTask(SClientUvTaskNode *uvTask) { int32_t udfcStartUvTask(SClientUvTaskNode *uvTask) { fnTrace("event loop start uv task. task: %d, %p", uvTask->type, uvTask); + int32_t code = 0; + switch (uvTask->type) { case UV_TASK_CONNECT: { uv_pipe_t *pipe = taosMemoryMalloc(sizeof(uv_pipe_t)); @@ -1091,22 +1093,34 @@ int32_t udfcStartUvTask(SClientUvTaskNode *uvTask) { uv_connect_t *connReq = taosMemoryMalloc(sizeof(uv_connect_t)); connReq->data = uvTask; uv_pipe_connect(connReq, pipe, uvTask->udfc->udfdPipeName, onUdfcPipeConnect); + code = 0; break; } case UV_TASK_REQ_RSP: { uv_pipe_t *pipe = uvTask->pipe; - uv_write_t *write = taosMemoryMalloc(sizeof(uv_write_t)); - write->data = uvTask; - int err = uv_write(write, (uv_stream_t *)pipe, &uvTask->reqBuf, 1, onUdfcPipetWrite); - if (err != 0) { - fnError("udfc event loop start req/rsp task uv_write failed. code: %s", uv_strerror(err)); + if (pipe == NULL) { + code = TSDB_CODE_UDF_PIPE_NO_PIPE; + } else { + uv_write_t *write = taosMemoryMalloc(sizeof(uv_write_t)); + write->data = uvTask; + int err = uv_write(write, (uv_stream_t *)pipe, &uvTask->reqBuf, 1, onUdfcPipetWrite); + if (err != 0) { + fnError("udfc event loop start req/rsp task uv_write failed. code: %s", uv_strerror(err)); + } + code = err; } break; } case UV_TASK_DISCONNECT: { - SClientUvConn *conn = uvTask->pipe->data; - QUEUE_INSERT_TAIL(&conn->taskQueue, &uvTask->connTaskQueue); - uv_close((uv_handle_t *) uvTask->pipe, onUdfcPipeClose); + uv_pipe_t *pipe = uvTask->pipe; + if (pipe == NULL) { + code = TSDB_CODE_UDF_PIPE_NO_PIPE; + } else { + SClientUvConn *conn = pipe->data; + QUEUE_INSERT_TAIL(&conn->taskQueue, &uvTask->connTaskQueue); + uv_close((uv_handle_t *)uvTask->pipe, onUdfcPipeClose); + code = 0; + } break; } default: { @@ -1115,7 +1129,7 @@ int32_t udfcStartUvTask(SClientUvTaskNode *uvTask) { } } - return 0; + return code; } void udfClientAsyncCb(uv_async_t *async) { @@ -1133,6 +1147,9 @@ void udfClientAsyncCb(uv_async_t *async) { int32_t code = udfcStartUvTask(task); if (code == 0) { QUEUE_INSERT_TAIL(&udfc->uvProcTaskQueue, &task->procTaskQueue); + } else { + task->errCode = code; + uv_sem_post(&task->taskSem); } } @@ -1483,6 +1500,9 @@ int32_t udfAggProcess(struct SqlFunctionCtx *pCtx) { SUdfAggRes* udfRes = (SUdfAggRes *)GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); SClientUdfUvSession *session = udfRes->session; + if (session == NULL) { + return TSDB_CODE_UDF_NO_FUNC_HANDLE; + } udfRes->finalResBuf = (char*)udfRes + sizeof(SUdfAggRes); udfRes->interResBuf = (char*)udfRes + sizeof(SUdfAggRes) + session->outputLen; @@ -1535,6 +1555,9 @@ int32_t udfAggProcess(struct SqlFunctionCtx *pCtx) { int32_t udfAggFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock* pBlock) { SUdfAggRes* udfRes = (SUdfAggRes *)GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); SClientUdfUvSession *session = udfRes->session; + if (session == NULL) { + return TSDB_CODE_UDF_NO_FUNC_HANDLE; + } udfRes->finalResBuf = (char*)udfRes + sizeof(SUdfAggRes); udfRes->interResBuf = (char*)udfRes + sizeof(SUdfAggRes) + session->outputLen; diff --git a/source/libs/scalar/inc/sclInt.h b/source/libs/scalar/inc/sclInt.h index 99e61ad1db7b45725177255087ebc19a667c23e0..9dbfeceb5940d4237ead01ff445529c2d7d447ac 100644 --- a/source/libs/scalar/inc/sclInt.h +++ b/source/libs/scalar/inc/sclInt.h @@ -27,13 +27,11 @@ typedef struct SScalarCtx { SArray *pBlockList; /* element is SSDataBlock* */ SHashObj *pRes; /* element is SScalarParam */ void *param; // additional parameter (meta actually) for acquire value such as tbname/tags values - SHashObj *udf2Handle; } SScalarCtx; #define SCL_DATA_TYPE_DUMMY_HASH 9000 #define SCL_DEFAULT_OP_NUM 10 -#define SCL_DEFAULT_UDF_NUM 8 #define SCL_IS_CONST_NODE(_node) ((NULL == (_node)) || (QUERY_NODE_VALUE == (_node)->type) || (QUERY_NODE_NODE_LIST == (_node)->type)) #define SCL_IS_CONST_CALC(_ctx) (NULL == (_ctx)->pBlockList) diff --git a/source/libs/scalar/src/scalar.c b/source/libs/scalar/src/scalar.c index 8f4a9b96985afe52e6733d7a44683e30da878aa1..6c8326b09fa89b7feb97eb58d952b915f4746ad6 100644 --- a/source/libs/scalar/src/scalar.c +++ b/source/libs/scalar/src/scalar.c @@ -154,18 +154,6 @@ void sclFreeRes(SHashObj *res) { taosHashCleanup(res); } -void sclFreeUdfHandles(SHashObj *udf2handle) { - void *pIter = taosHashIterate(udf2handle, NULL); - while (pIter) { - UdfcFuncHandle *handle = (UdfcFuncHandle *)pIter; - if (handle) { - teardownUdf(*handle); - } - pIter = taosHashIterate(udf2handle, pIter); - } - taosHashCleanup(udf2handle); -} - void sclFreeParam(SScalarParam *param) { if (param->columnData != NULL) { colDataDestroy(param->columnData); @@ -375,28 +363,22 @@ int32_t sclExecFunction(SFunctionNode *node, SScalarCtx *ctx, SScalarParam *outp if (fmIsUserDefinedFunc(node->funcId)) { UdfcFuncHandle udfHandle = NULL; - char* udfName = node->functionName; - if (ctx->udf2Handle) { - UdfcFuncHandle *pHandle = taosHashGet(ctx->udf2Handle, udfName, strlen(udfName)); - if (pHandle) { - udfHandle = *pHandle; - } - } - if (udfHandle == NULL) { - code = setupUdf(udfName, &udfHandle); - if (code != 0) { - sclError("fmExecFunction error. setupUdf. function name: %s, code:%d", udfName, code); - goto _return; - } - if (ctx->udf2Handle) { - taosHashPut(ctx->udf2Handle, udfName, strlen(udfName), &udfHandle, sizeof(UdfcFuncHandle)); - } + + code = setupUdf(node->functionName, &udfHandle); + if (code != 0) { + sclError("fmExecFunction error. setupUdf. function name: %s, code:%d", node->functionName, code); + goto _return; } code = callUdfScalarFunc(udfHandle, params, paramNum, output); if (code != 0) { sclError("fmExecFunction error. callUdfScalarFunc. function name: %s, udf code:%d", node->functionName, code); goto _return; } + code = teardownUdf(udfHandle); + if (code != 0) { + sclError("fmExecFunction error. callUdfScalarFunc. function name: %s, udf code:%d", node->functionName, code); + goto _return; + } } else { SScalarFuncExecFuncs ffpSet = {0}; code = fmGetScalarFuncExecFuncs(node->funcId, &ffpSet); @@ -910,20 +892,15 @@ int32_t scalarCalculateConstants(SNode *pNode, SNode **pRes) { SScalarCtx ctx = {0}; ctx.pRes = taosHashInit(SCL_DEFAULT_OP_NUM, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK); if (NULL == ctx.pRes) { - sclError("taosHashInit result map failed, num:%d", SCL_DEFAULT_OP_NUM); - SCL_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); - } - ctx.udf2Handle = taosHashInit(SCL_DEFAULT_UDF_NUM, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); - if (NULL == ctx.udf2Handle) { - sclError("taosHashInit udf to handle map failed, num:%d", SCL_DEFAULT_OP_NUM); + sclError("taosHashInit failed, num:%d", SCL_DEFAULT_OP_NUM); SCL_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } + nodesRewriteExprPostOrder(&pNode, sclConstantsRewriter, (void *)&ctx); SCL_ERR_JRET(ctx.code); *pRes = pNode; _return: - sclFreeUdfHandles(ctx.udf2Handle); sclFreeRes(ctx.pRes); return code; } @@ -939,14 +916,10 @@ int32_t scalarCalculate(SNode *pNode, SArray *pBlockList, SScalarParam *pDst) { // TODO: OPT performance ctx.pRes = taosHashInit(SCL_DEFAULT_OP_NUM, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK); if (NULL == ctx.pRes) { - sclError("taosHashInit result map failed, num:%d", SCL_DEFAULT_OP_NUM); - SCL_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); - } - ctx.udf2Handle = taosHashInit(SCL_DEFAULT_UDF_NUM, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); - if (NULL == ctx.udf2Handle) { - sclError("taosHashInit udf to handle map failed, num:%d", SCL_DEFAULT_OP_NUM); + sclError("taosHashInit failed, num:%d", SCL_DEFAULT_OP_NUM); SCL_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } + nodesWalkExprPostOrder(pNode, sclCalcWalker, (void *)&ctx); SCL_ERR_JRET(ctx.code); @@ -964,7 +937,6 @@ int32_t scalarCalculate(SNode *pNode, SArray *pBlockList, SScalarParam *pDst) { _return: //nodesDestroyNode(pNode); - sclFreeUdfHandles(ctx.udf2Handle); sclFreeRes(ctx.pRes); return code; } diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 6c0a9b132480944c082c4c7e0e8e4abd9ac4727d..a2b0648a4b78b66aca74f548fafe7ee2162fd30d 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -463,7 +463,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_UDF_PIPE_NO_PIPE, "udf no pipe") TAOS_DEFINE_ERROR(TSDB_CODE_UDF_LOAD_UDF_FAILURE, "udf load failure") TAOS_DEFINE_ERROR(TSDB_CODE_UDF_INVALID_STATE, "udf invalid state") TAOS_DEFINE_ERROR(TSDB_CODE_UDF_INVALID_INPUT, "udf invalid function input") - +TAOS_DEFINE_ERROR(TSDB_CODE_UDF_NO_FUNC_HANDLE, "udf no function handle") //schemaless TAOS_DEFINE_ERROR(TSDB_CODE_SML_INVALID_PROTOCOL_TYPE, "Invalid line protocol type") TAOS_DEFINE_ERROR(TSDB_CODE_SML_INVALID_PRECISION_TYPE, "Invalid timestamp precision type")