diff --git a/source/libs/function/src/tudf.c b/source/libs/function/src/tudf.c index 73bb56a509198712859483b131528b2f150576e5..cf9dc15aa8311620f3c534e6e870efeb41d7ed23 100644 --- a/source/libs/function/src/tudf.c +++ b/source/libs/function/src/tudf.c @@ -1253,8 +1253,8 @@ bool udfAggInit(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo* pResult if (callUdfAggInit(handle, &buf) != 0) { return false; } - memcpy(udfRes->interResBuf, buf.buf, buf.bufLen); udfRes->interResNum = buf.numOfResult; + memcpy(udfRes->interResBuf, buf.buf, buf.bufLen); return true; } @@ -1263,14 +1263,14 @@ int32_t udfAggProcess(struct SqlFunctionCtx *pCtx) { SInputColumnInfoData* pInput = &pCtx->input; int32_t numOfCols = pInput->numOfInputCols; - char* udfRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); - + SUdfAggRes* udfRes = (SUdfAggRes *)GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); + SUdfUvSession *session = udfRes->session; + udfRes->finalResBuf = (char*)udfRes + sizeof(SUdfAggRes); + udfRes->interResBuf = (char*)udfRes + sizeof(SUdfAggRes) + session->outputLen; int32_t start = pInput->startRowIndex; int32_t numOfRows = pInput->numOfRows; - UdfcFuncHandle handle =(UdfcFuncHandle)(*(int64_t*)(udfRes)); - SUdfUvSession *session = (SUdfUvSession *)handle; SSDataBlock tempBlock = {0}; tempBlock.info.numOfCols = numOfCols; @@ -1290,16 +1290,20 @@ int32_t udfAggProcess(struct SqlFunctionCtx *pCtx) { SSDataBlock *inputBlock = blockDataExtractBlock(&tempBlock, start, numOfRows); - SUdfInterBuf state = {.buf = udfRes + sizeof(int64_t) + session->outputLen, + SUdfInterBuf state = {.buf = udfRes->interResBuf, .bufLen = session->bufSize, - .numOfResult = GET_RES_INFO(pCtx)->numOfRes}; + .numOfResult = udfRes->interResNum}; SUdfInterBuf newState = {0}; - callUdfAggProcess(handle, inputBlock, &state, &newState); + callUdfAggProcess(session, inputBlock, &state, &newState); - memcpy(state.buf, newState.buf, newState.bufLen); + udfRes->interResNum = newState.numOfResult; + memcpy(udfRes->interResBuf, newState.buf, newState.bufLen); + + if (newState.numOfResult == 1 || state.numOfResult == 1) { + GET_RES_INFO(pCtx)->numOfRes = 1; + } - GET_RES_INFO(pCtx)->numOfRes = (newState.numOfResult == 1 ? 1 : 0); blockDataDestroy(inputBlock); taosArrayDestroy(tempBlock.pDataBlock); @@ -1309,22 +1313,26 @@ int32_t udfAggProcess(struct SqlFunctionCtx *pCtx) { } int32_t udfAggFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock* pBlock) { - char* udfRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); + SUdfAggRes* udfRes = (SUdfAggRes *)GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); + SUdfUvSession *session = udfRes->session; + udfRes->finalResBuf = (char*)udfRes + sizeof(SUdfAggRes); + udfRes->interResBuf = (char*)udfRes + sizeof(SUdfAggRes) + session->outputLen; - UdfcFuncHandle handle =(UdfcFuncHandle)(*(int64_t*)(udfRes)); - SUdfUvSession *session = (SUdfUvSession *)handle; - SUdfInterBuf resultBuf = {.buf = udfRes + sizeof(int64_t), - .bufLen = session->outputLen}; - SUdfInterBuf state = {.buf = udfRes + sizeof(int64_t) + session->outputLen, + SUdfInterBuf resultBuf = {.buf = udfRes->finalResBuf, + .bufLen = session->outputLen, + .numOfResult = udfRes->finalResNum}; + SUdfInterBuf state = {.buf = udfRes->interResBuf, .bufLen = session->bufSize, - .numOfResult = GET_RES_INFO(pCtx)->numOfRes}; - callUdfAggFinalize(handle, &state, &resultBuf); + .numOfResult = udfRes->interResNum}; + callUdfAggFinalize(session, &state, &resultBuf); + teardownUdf(session); - GET_RES_INFO(pCtx)->numOfRes = (resultBuf.numOfResult == 1 ? 1 : 0); + if (resultBuf.numOfResult == 1) { + GET_RES_INFO(pCtx)->numOfRes = 1; + } functionFinalize(pCtx, pBlock); - teardownUdf(handle); return 0; } \ No newline at end of file