From 89271550133c6e0ae50feade6c686ffa200b8033 Mon Sep 17 00:00:00 2001 From: "slzhou@taodata.com" Date: Thu, 12 May 2022 16:49:31 +0800 Subject: [PATCH] feat:add udf dedicated errors --- include/util/taoserror.h | 1 + source/common/src/tglobal.c | 4 +- source/libs/function/src/tudf.c | 2 +- source/libs/function/src/udfd.c | 338 ++++++++++++++++--------------- source/libs/function/test/udf2.c | 6 + source/util/src/terror.c | 1 + 6 files changed, 190 insertions(+), 162 deletions(-) diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 58cdd2cab4..238600160a 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -654,6 +654,7 @@ int32_t* taosGetErrno(); #define TSDB_CODE_UDF_PIPE_NO_PIPE TAOS_DEF_ERROR_CODE(0, 0x2904) #define TSDB_CODE_UDF_LOAD_UDF_FAILURE TAOS_DEF_ERROR_CODE(0, 0x2905) #define TSDB_CODE_UDF_INVALID_STATE TAOS_DEF_ERROR_CODE(0, 0x2906) +#define TSDB_CODE_UDF_INVALID_INPUT TAOS_DEF_ERROR_CODE(0, 0x2907) #define TSDB_CODE_SML_INVALID_PROTOCOL_TYPE TAOS_DEF_ERROR_CODE(0, 0x3000) #define TSDB_CODE_SML_INVALID_PRECISION_TYPE TAOS_DEF_ERROR_CODE(0, 0x3001) diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index f73a982110..42c8e18f3e 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -444,7 +444,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { if (cfgAddInt32(pCfg, "transPullupInterval", tsTransPullupInterval, 1, 10000, 1) != 0) return -1; if (cfgAddInt32(pCfg, "mqRebalanceInterval", tsMqRebalanceInterval, 1, 10000, 1) != 0) return -1; - if (cfgAddBool(pCfg, "startUdfd", tsStartUdfd, 0) != 0) return -1; + if (cfgAddBool(pCfg, "udf", tsStartUdfd, 0) != 0) return -1; return 0; } @@ -585,7 +585,7 @@ static int32_t taosSetServerCfg(SConfig *pCfg) { tsTransPullupInterval = cfgGetItem(pCfg, "transPullupInterval")->i32; tsMqRebalanceInterval = cfgGetItem(pCfg, "mqRebalanceInterval")->i32; - tsStartUdfd = cfgGetItem(pCfg, "startUdfd")->bval; + tsStartUdfd = cfgGetItem(pCfg, "udf")->bval; if (tsQueryBufferSize >= 0) { tsQueryBufferSizeBytes = tsQueryBufferSize * 1048576UL; diff --git a/source/libs/function/src/tudf.c b/source/libs/function/src/tudf.c index dfa7fac15a..8e96a2a063 100644 --- a/source/libs/function/src/tudf.c +++ b/source/libs/function/src/tudf.c @@ -1497,7 +1497,7 @@ int32_t udfAggProcess(struct SqlFunctionCtx *pCtx) { taosArrayDestroy(tempBlock.pDataBlock); taosMemoryFree(newState.buf); - return TSDB_CODE_SUCCESS; + return udfCode; } int32_t udfAggFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock* pBlock) { diff --git a/source/libs/function/src/udfd.c b/source/libs/function/src/udfd.c index f006695f14..34681dc6cd 100644 --- a/source/libs/function/src/udfd.c +++ b/source/libs/function/src/udfd.c @@ -140,6 +140,182 @@ int32_t udfdLoadUdf(char *udfName, SUdf *udf) { return 0; } +void udfdProcessSetupRequest(SUvUdfWork* uvUdf, SUdfRequest* request) { + // TODO: tracable id from client. connect, setup, call, teardown + fnInfo("%" PRId64 " setup request. udf name: %s", request->seqNum, request->setup.udfName); + SUdfSetupRequest *setup = &request->setup; + int32_t code = TSDB_CODE_SUCCESS; + SUdf *udf = NULL; + uv_mutex_lock(&global.udfsMutex); + SUdf **udfInHash = taosHashGet(global.udfsHash, request->setup.udfName, strlen(request->setup.udfName)); + if (udfInHash) { + ++(*udfInHash)->refCount; + udf = *udfInHash; + uv_mutex_unlock(&global.udfsMutex); + } else { + SUdf *udfNew = taosMemoryCalloc(1, sizeof(SUdf)); + udfNew->refCount = 1; + udfNew->state = UDF_STATE_INIT; + + uv_mutex_init(&udfNew->lock); + uv_cond_init(&udfNew->condReady); + udf = udfNew; + taosHashPut(global.udfsHash, request->setup.udfName, strlen(request->setup.udfName), &udfNew, sizeof(&udfNew)); + uv_mutex_unlock(&global.udfsMutex); + } + + uv_mutex_lock(&udf->lock); + if (udf->state == UDF_STATE_INIT) { + udf->state = UDF_STATE_LOADING; + code = udfdLoadUdf(setup->udfName, udf); + if (udf->initFunc) { + udf->initFunc(); + } + udf->state = UDF_STATE_READY; + uv_cond_broadcast(&udf->condReady); + uv_mutex_unlock(&udf->lock); + } else { + while (udf->state != UDF_STATE_READY) { + uv_cond_wait(&udf->condReady, &udf->lock); + } + uv_mutex_unlock(&udf->lock); + } + SUdfcFuncHandle *handle = taosMemoryMalloc(sizeof(SUdfcFuncHandle)); + handle->udf = udf; + + SUdfResponse rsp; + rsp.seqNum = request->seqNum; + rsp.type = request->type; + rsp.code = code; + rsp.setupRsp.udfHandle = (int64_t)(handle); + rsp.setupRsp.outputType = udf->outputType; + rsp.setupRsp.outputLen = udf->outputLen; + rsp.setupRsp.bufSize = udf->bufSize; + + int32_t len = encodeUdfResponse(NULL, &rsp); + rsp.msgLen = len; + void *bufBegin = taosMemoryMalloc(len); + void *buf = bufBegin; + encodeUdfResponse(&buf, &rsp); + + uvUdf->output = uv_buf_init(bufBegin, len); + + taosMemoryFree(uvUdf->input.base); + return; +} + +void udfdProcessCallRequest(SUvUdfWork *uvUdf, SUdfRequest *request) { + SUdfCallRequest *call = &request->call; + fnDebug("%" PRId64 "call request. call type %d, handle: %" PRIx64, request->seqNum, call->callType, + call->udfHandle); + SUdfcFuncHandle *handle = (SUdfcFuncHandle *)(call->udfHandle); + SUdf *udf = handle->udf; + SUdfResponse response = {0}; + SUdfResponse *rsp = &response; + SUdfCallResponse *subRsp = &rsp->callRsp; + + int32_t code = TSDB_CODE_SUCCESS; + switch(call->callType) { + case TSDB_UDF_CALL_SCALA_PROC: { + SUdfColumn output = {0}; + + SUdfDataBlock input = {0}; + convertDataBlockToUdfDataBlock(&call->block, &input); + code = udf->scalarProcFunc(&input, &output); + + convertUdfColumnToDataBlock(&output, &response.callRsp.resultData); + freeUdfColumn(&output); + break; + } + case TSDB_UDF_CALL_AGG_INIT: { + SUdfInterBuf outBuf = {.buf = taosMemoryMalloc(udf->bufSize), + .bufLen= udf->bufSize, + .numOfResult = 0}; + udf->aggStartFunc(&outBuf); + subRsp->resultBuf = outBuf; + break; + } + case TSDB_UDF_CALL_AGG_PROC: { + SUdfDataBlock input = {0}; + convertDataBlockToUdfDataBlock(&call->block, &input); + SUdfInterBuf outBuf = {.buf = taosMemoryMalloc(udf->bufSize), + .bufLen= udf->bufSize, + .numOfResult = 0}; + code = udf->aggProcFunc(&input, &call->interBuf, &outBuf); + subRsp->resultBuf = outBuf; + + break; + } + case TSDB_UDF_CALL_AGG_FIN: { + SUdfInterBuf outBuf = {.buf = taosMemoryMalloc(udf->bufSize), + .bufLen= udf->bufSize, + .numOfResult = 0}; + code = udf->aggFinishFunc(&call->interBuf, &outBuf); + subRsp->resultBuf = outBuf; + break; + } + default: + break; + } + + rsp->seqNum = request->seqNum; + rsp->type = request->type; + rsp->code = code; + subRsp->callType = call->callType; + + int32_t len = encodeUdfResponse(NULL, rsp); + rsp->msgLen = len; + void *bufBegin = taosMemoryMalloc(len); + void *buf = bufBegin; + encodeUdfResponse(&buf, rsp); + uvUdf->output = uv_buf_init(bufBegin, len); + + taosMemoryFree(uvUdf->input.base); + return; +} + +void udfdProcessTeardownRequest(SUvUdfWork* uvUdf, SUdfRequest* request) { + SUdfTeardownRequest *teardown = &request->teardown; + fnInfo("teardown. %" PRId64 "handle:%" PRIx64, request->seqNum, teardown->udfHandle); + SUdfcFuncHandle *handle = (SUdfcFuncHandle *)(teardown->udfHandle); + SUdf *udf = handle->udf; + bool unloadUdf = false; + int32_t code = TSDB_CODE_SUCCESS; + + uv_mutex_lock(&global.udfsMutex); + udf->refCount--; + if (udf->refCount == 0) { + unloadUdf = true; + taosHashRemove(global.udfsHash, udf->name, strlen(udf->name)); + } + uv_mutex_unlock(&global.udfsMutex); + if (unloadUdf) { + uv_cond_destroy(&udf->condReady); + uv_mutex_destroy(&udf->lock); + if (udf->destroyFunc) { + (udf->destroyFunc)(); + } + uv_dlclose(&udf->lib); + taosMemoryFree(udf); + } + taosMemoryFree(handle); + + SUdfResponse response; + SUdfResponse *rsp = &response; + rsp->seqNum = request->seqNum; + rsp->type = request->type; + rsp->code = code; + int32_t len = encodeUdfResponse(NULL, rsp); + rsp->msgLen = len; + void *bufBegin = taosMemoryMalloc(len); + void *buf = bufBegin; + encodeUdfResponse(&buf, rsp); + uvUdf->output = uv_buf_init(bufBegin, len); + + taosMemoryFree(uvUdf->input.base); + return; +} + void udfdProcessRequest(uv_work_t *req) { SUvUdfWork *uvUdf = (SUvUdfWork *)(req->data); SUdfRequest request = {0}; @@ -147,172 +323,16 @@ void udfdProcessRequest(uv_work_t *req) { switch (request.type) { case UDF_TASK_SETUP: { - // TODO: tracable id from client. connect, setup, call, teardown - fnInfo("%" PRId64 " setup request. udf name: %s", request.seqNum, request.setup.udfName); - SUdfSetupRequest *setup = &request.setup; - - SUdf *udf = NULL; - uv_mutex_lock(&global.udfsMutex); - SUdf **udfInHash = taosHashGet(global.udfsHash, request.setup.udfName, strlen(request.setup.udfName)); - if (udfInHash) { - ++(*udfInHash)->refCount; - udf = *udfInHash; - uv_mutex_unlock(&global.udfsMutex); - } else { - SUdf *udfNew = taosMemoryCalloc(1, sizeof(SUdf)); - udfNew->refCount = 1; - udfNew->state = UDF_STATE_INIT; - - uv_mutex_init(&udfNew->lock); - uv_cond_init(&udfNew->condReady); - udf = udfNew; - taosHashPut(global.udfsHash, request.setup.udfName, strlen(request.setup.udfName), &udfNew, sizeof(&udfNew)); - uv_mutex_unlock(&global.udfsMutex); - } - - uv_mutex_lock(&udf->lock); - if (udf->state == UDF_STATE_INIT) { - udf->state = UDF_STATE_LOADING; - udfdLoadUdf(setup->udfName, udf); - if (udf->initFunc) { - udf->initFunc(); - } - udf->state = UDF_STATE_READY; - uv_cond_broadcast(&udf->condReady); - uv_mutex_unlock(&udf->lock); - } else { - while (udf->state != UDF_STATE_READY) { - uv_cond_wait(&udf->condReady, &udf->lock); - } - uv_mutex_unlock(&udf->lock); - } - SUdfcFuncHandle *handle = taosMemoryMalloc(sizeof(SUdfcFuncHandle)); - handle->udf = udf; - SUdfResponse rsp; - rsp.seqNum = request.seqNum; - rsp.type = request.type; - rsp.code = 0; - rsp.setupRsp.udfHandle = (int64_t)(handle); - rsp.setupRsp.outputType = udf->outputType; - rsp.setupRsp.outputLen = udf->outputLen; - rsp.setupRsp.bufSize = udf->bufSize; - int32_t len = encodeUdfResponse(NULL, &rsp); - rsp.msgLen = len; - void *bufBegin = taosMemoryMalloc(len); - void *buf = bufBegin; - encodeUdfResponse(&buf, &rsp); - - uvUdf->output = uv_buf_init(bufBegin, len); - - taosMemoryFree(uvUdf->input.base); + udfdProcessSetupRequest(uvUdf, &request); break; } case UDF_TASK_CALL: { - SUdfCallRequest *call = &request.call; - fnDebug("%" PRId64 "call request. call type %d, handle: %" PRIx64, request.seqNum, call->callType, - call->udfHandle); - SUdfcFuncHandle *handle = (SUdfcFuncHandle *)(call->udfHandle); - SUdf *udf = handle->udf; - SUdfResponse response = {0}; - SUdfResponse *rsp = &response; - SUdfCallResponse *subRsp = &rsp->callRsp; - - switch(call->callType) { - case TSDB_UDF_CALL_SCALA_PROC: { - SUdfColumn output = {0}; - - SUdfDataBlock input = {0}; - convertDataBlockToUdfDataBlock(&call->block, &input); - udf->scalarProcFunc(&input, &output); - - convertUdfColumnToDataBlock(&output, &response.callRsp.resultData); - freeUdfColumn(&output); - break; - } - case TSDB_UDF_CALL_AGG_INIT: { - SUdfInterBuf outBuf = {.buf = taosMemoryMalloc(udf->bufSize), - .bufLen= udf->bufSize, - .numOfResult = 0}; - udf->aggStartFunc(&outBuf); - subRsp->resultBuf = outBuf; - break; - } - case TSDB_UDF_CALL_AGG_PROC: { - SUdfDataBlock input = {0}; - convertDataBlockToUdfDataBlock(&call->block, &input); - SUdfInterBuf outBuf = {.buf = taosMemoryMalloc(udf->bufSize), - .bufLen= udf->bufSize, - .numOfResult = 0}; - udf->aggProcFunc(&input, &call->interBuf, &outBuf); - subRsp->resultBuf = outBuf; - - break; - } - case TSDB_UDF_CALL_AGG_FIN: { - SUdfInterBuf outBuf = {.buf = taosMemoryMalloc(udf->bufSize), - .bufLen= udf->bufSize, - .numOfResult = 0}; - udf->aggFinishFunc(&call->interBuf, &outBuf); - subRsp->resultBuf = outBuf; - break; - } - default: - break; - } - - rsp->seqNum = request.seqNum; - rsp->type = request.type; - rsp->code = 0; - subRsp->callType = call->callType; - - int32_t len = encodeUdfResponse(NULL, rsp); - rsp->msgLen = len; - void *bufBegin = taosMemoryMalloc(len); - void *buf = bufBegin; - encodeUdfResponse(&buf, rsp); - uvUdf->output = uv_buf_init(bufBegin, len); - - taosMemoryFree(uvUdf->input.base); + udfdProcessCallRequest(uvUdf, &request); break; } case UDF_TASK_TEARDOWN: { - SUdfTeardownRequest *teardown = &request.teardown; - fnInfo("teardown. %" PRId64 "handle:%" PRIx64, request.seqNum, teardown->udfHandle) SUdfcFuncHandle *handle = - (SUdfcFuncHandle *)(teardown->udfHandle); - SUdf *udf = handle->udf; - bool unloadUdf = false; - uv_mutex_lock(&global.udfsMutex); - udf->refCount--; - if (udf->refCount == 0) { - unloadUdf = true; - taosHashRemove(global.udfsHash, udf->name, strlen(udf->name)); - } - uv_mutex_unlock(&global.udfsMutex); - if (unloadUdf) { - uv_cond_destroy(&udf->condReady); - uv_mutex_destroy(&udf->lock); - if (udf->destroyFunc) { - (udf->destroyFunc)(); - } - uv_dlclose(&udf->lib); - taosMemoryFree(udf); - } - taosMemoryFree(handle); - - SUdfResponse response; - SUdfResponse *rsp = &response; - rsp->seqNum = request.seqNum; - rsp->type = request.type; - rsp->code = 0; - int32_t len = encodeUdfResponse(NULL, rsp); - rsp->msgLen = len; - void *bufBegin = taosMemoryMalloc(len); - void *buf = bufBegin; - encodeUdfResponse(&buf, rsp); - uvUdf->output = uv_buf_init(bufBegin, len); - - taosMemoryFree(uvUdf->input.base); + udfdProcessTeardownRequest(uvUdf, &request); break; } default: { diff --git a/source/libs/function/test/udf2.c b/source/libs/function/test/udf2.c index be485bc905..b3b60f93a4 100644 --- a/source/libs/function/test/udf2.c +++ b/source/libs/function/test/udf2.c @@ -27,6 +27,12 @@ int32_t udf2_start(SUdfInterBuf *buf) { int32_t udf2(SUdfDataBlock* block, SUdfInterBuf *interBuf, SUdfInterBuf *newInterBuf) { int64_t sumSquares = *(int64_t*)interBuf->buf; int8_t numOutput = 0; + for (int32_t i = 0; i < block->numOfCols; ++i) { + SUdfColumn* col = block->udfCols[i]; + if (col->colMeta.type != TSDB_DATA_TYPE_INT) { + return TSDB_CODE_UDF_INVALID_INPUT; + } + } for (int32_t i = 0; i < block->numOfCols; ++i) { for (int32_t j = 0; j < block->numOfRows; ++j) { SUdfColumn* col = block->udfCols[i]; diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 9676e4e1f9..a1bc37e6cd 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -461,6 +461,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_UDF_PIPE_CONNECT_ERR, "udf pipe connect erro TAOS_DEFINE_ERROR(TSDB_CODE_UDF_PIPE_NO_PIPE, "udf no pipe") TAOS_DEFINE_ERROR(TSDB_CODE_UDF_LOAD_UDF_FAILURE, "udf load failure") TAOS_DEFINE_ERROR(TSDB_CODE_UDF_INVALID_STATE, "udf invalid state") +TAOS_DEFINE_ERROR(TSDB_CODE_UDF_INVALID_INPUT, "udf invalid function input") //schemaless TAOS_DEFINE_ERROR(TSDB_CODE_SML_INVALID_PROTOCOL_TYPE, "Invalid line protocol type") -- GitLab