From 193a84717d5bb449b66dfaf2b7db2031db041530 Mon Sep 17 00:00:00 2001 From: slzhou Date: Fri, 29 Apr 2022 18:20:50 +0800 Subject: [PATCH] udaf pass the first time --- source/libs/function/src/tudf.c | 17 +++++++++-------- source/libs/function/src/udfd.c | 2 +- source/libs/function/test/udf2.c | 4 ++-- source/libs/nodes/src/nodesCloneFuncs.c | 1 + source/libs/nodes/src/nodesCodeFuncs.c | 7 +++++++ 5 files changed, 20 insertions(+), 11 deletions(-) diff --git a/source/libs/function/src/tudf.c b/source/libs/function/src/tudf.c index df46999379..9aac2bfe0c 100644 --- a/source/libs/function/src/tudf.c +++ b/source/libs/function/src/tudf.c @@ -1176,7 +1176,7 @@ int32_t callUdfAggMerge(UdfcFuncHandle handle, SUdfInterBuf *interBuf1, SUdfInte // input: interBuf // output: resultData int32_t callUdfAggFinalize(UdfcFuncHandle handle, SUdfInterBuf *interBuf, SUdfInterBuf *resultData) { - int8_t callType = TSDB_UDF_CALL_AGG_PROC; + int8_t callType = TSDB_UDF_CALL_AGG_FIN; int32_t err = callUdf(handle, callType, NULL, interBuf, NULL, NULL, resultData); return err; } @@ -1243,12 +1243,12 @@ bool udfAggInit(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo* pResult } SUdfUvSession *session = (SUdfUvSession *)handle; SUdfAggRes *udfRes = (SUdfAggRes*)GET_ROWCELL_INTERBUF(pResultCellInfo); - udfRes->finalResBuf = (char*)udfRes + sizeof(SUdfAggRes); - udfRes->interResBuf = (char*)udfRes + sizeof(SUdfAggRes) + session->outputLen; - int32_t envSize = sizeof(SUdfAggRes) + session->outputLen + session->bufSize; memset(udfRes, 0, envSize); + udfRes->finalResBuf = (char*)udfRes + sizeof(SUdfAggRes); + udfRes->interResBuf = (char*)udfRes + sizeof(SUdfAggRes) + session->outputLen; + udfRes->session = (SUdfUvSession *)handle; SUdfInterBuf buf = {0}; if (callUdfAggInit(handle, &buf) != 0) { @@ -1260,7 +1260,6 @@ bool udfAggInit(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo* pResult } int32_t udfAggProcess(struct SqlFunctionCtx *pCtx) { - SInputColumnInfoData* pInput = &pCtx->input; int32_t numOfCols = pInput->numOfInputCols; @@ -1320,13 +1319,15 @@ int32_t udfAggFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock* pBlock) { udfRes->interResBuf = (char*)udfRes + sizeof(SUdfAggRes) + session->outputLen; - SUdfInterBuf resultBuf = {.buf = udfRes->finalResBuf, - .bufLen = session->outputLen, - .numOfResult = udfRes->finalResNum}; + SUdfInterBuf resultBuf = {0}; SUdfInterBuf state = {.buf = udfRes->interResBuf, .bufLen = session->bufSize, .numOfResult = udfRes->interResNum}; callUdfAggFinalize(session, &state, &resultBuf); + + udfRes->finalResBuf = resultBuf.buf; + udfRes->finalResNum = resultBuf.numOfResult; + teardownUdf(session); if (resultBuf.numOfResult == 1) { diff --git a/source/libs/function/src/udfd.c b/source/libs/function/src/udfd.c index ae24a832c8..06fa49e1c2 100644 --- a/source/libs/function/src/udfd.c +++ b/source/libs/function/src/udfd.c @@ -124,7 +124,7 @@ int32_t udfdLoadUdf(char *udfName, SUdf *udf) { char *finishSuffix = "_finish"; strncpy(finishFuncName, processFuncName, strlen(processFuncName)); strncat(finishFuncName, finishSuffix, strlen(finishSuffix)); - uv_dlsym(&udf->lib, startFuncName, (void **)(&udf->aggFinishFunc)); + uv_dlsym(&udf->lib, finishFuncName, (void **)(&udf->aggFinishFunc)); //TODO: merge } return 0; diff --git a/source/libs/function/test/udf2.c b/source/libs/function/test/udf2.c index 83187c5855..69ed515d2b 100644 --- a/source/libs/function/test/udf2.c +++ b/source/libs/function/test/udf2.c @@ -27,7 +27,7 @@ int32_t udf2_start(SUdfInterBuf *buf) { int32_t udf2(SUdfDataBlock* block, SUdfInterBuf *interBuf, SUdfInterBuf *newInterBuf) { int64_t sumSquares = *(int64_t*)interBuf->buf; for (int32_t i = 0; i < block->numOfCols; ++i) { - for (int32_t j = 0; j < block->numOfRows; ++i) { + for (int32_t j = 0; j < block->numOfRows; ++j) { SUdfColumn* col = block->udfCols[i]; //TODO: check the bitmap for null value int32_t* rows = (int32_t*)col->colData.fixLenCol.data; @@ -35,7 +35,7 @@ int32_t udf2(SUdfDataBlock* block, SUdfInterBuf *interBuf, SUdfInterBuf *newInte } } - *(int64_t*)newInterBuf = sumSquares; + *(int64_t*)(newInterBuf->buf) = sumSquares; newInterBuf->bufLen = sizeof(int64_t); //TODO: if all null value, numOfResult = 0; newInterBuf->numOfResult = 1; diff --git a/source/libs/nodes/src/nodesCloneFuncs.c b/source/libs/nodes/src/nodesCloneFuncs.c index f1fe68c55a..92da476319 100644 --- a/source/libs/nodes/src/nodesCloneFuncs.c +++ b/source/libs/nodes/src/nodesCloneFuncs.c @@ -154,6 +154,7 @@ static SNode* logicConditionNodeCopy(const SLogicConditionNode* pSrc, SLogicCond } static SNode* functionNodeCopy(const SFunctionNode* pSrc, SFunctionNode* pDst) { + COPY_ALL_SCALAR_FIELDS; exprNodeCopy((const SExprNode*)pSrc, (SExprNode*)pDst); COPY_CHAR_ARRAY_FIELD(functionName); COPY_SCALAR_FIELD(funcId); diff --git a/source/libs/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c index 0e6ec4f945..828044623d 100644 --- a/source/libs/nodes/src/nodesCodeFuncs.c +++ b/source/libs/nodes/src/nodesCodeFuncs.c @@ -1885,6 +1885,7 @@ static const char* jkFunctionName = "Name"; static const char* jkFunctionId = "Id"; static const char* jkFunctionType = "Type"; static const char* jkFunctionParameter = "Parameters"; +static const char* jkFunctionUdfBufSize = "UdfBufSize"; static int32_t functionNodeToJson(const void* pObj, SJson* pJson) { const SFunctionNode* pNode = (const SFunctionNode*)pObj; @@ -1902,6 +1903,9 @@ static int32_t functionNodeToJson(const void* pObj, SJson* pJson) { if (TSDB_CODE_SUCCESS == code) { code = nodeListToJson(pJson, jkFunctionParameter, pNode->pParameterList); } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkFunctionUdfBufSize, pNode->udfBufSize); + } return code; } @@ -1922,6 +1926,9 @@ static int32_t jsonToFunctionNode(const SJson* pJson, void* pObj) { if (TSDB_CODE_SUCCESS == code) { code = jsonToNodeList(pJson, jkFunctionParameter, &pNode->pParameterList); } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetIntValue(pJson, jkFunctionUdfBufSize, &pNode->udfBufSize); + } return code; } -- GitLab