diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 648a60c29958aca84c3cf632aff214bea4bdbd04..c6ab9f859d735a4599a9fbdd6401b1b8fae0516c 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -89,6 +89,7 @@ typedef struct SStddevRes { double dsum; int64_t isum; }; + int16_t type; } SStddevRes; typedef struct SLeastSQRInfo { @@ -1513,6 +1514,7 @@ int32_t stddevFunction(SqlFunctionCtx* pCtx) { int32_t type = pInput->pData[0]->info.type; SStddevRes* pStddevRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); + pStddevRes->type = type; // computing based on the true data block SColumnInfoData* pCol = pInput->pData[0]; @@ -1629,6 +1631,39 @@ _stddev_over: return TSDB_CODE_SUCCESS; } +static void stddevTransferInfo(SStddevRes* pInput, SStddevRes* pOutput) { + pOutput->type = pInput->type; + if (IS_INTEGER_TYPE(pOutput->type)) { + pOutput->quadraticISum += pInput->quadraticISum; + pOutput->isum += pInput->isum; + } else { + pOutput->quadraticDSum += pInput->quadraticDSum; + pOutput->dsum += pInput->dsum; + } + + pOutput->count += pInput->count; + + return; +} + +int32_t stddevFunctionMerge(SqlFunctionCtx* pCtx) { + SInputColumnInfoData* pInput = &pCtx->input; + SColumnInfoData* pCol = pInput->pData[0]; + ASSERT(pCol->info.type == TSDB_DATA_TYPE_BINARY); + + SStddevRes* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); + + int32_t start = pInput->startRowIndex; + char* data = colDataGetData(pCol, start); + SStddevRes* pInputInfo = (SStddevRes*)varDataVal(data); + + stddevTransferInfo(pInputInfo, pInfo); + + SET_VAL(GET_RES_INFO(pCtx), 1, 1); + + return TSDB_CODE_SUCCESS; +} + #define LIST_STDDEV_SUB_N(sumT, T) \ do { \ T* plist = (T*)pCol->pData; \ @@ -1694,9 +1729,10 @@ int32_t stddevInvertFunction(SqlFunctionCtx* pCtx) { int32_t stddevFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { SInputColumnInfoData* pInput = &pCtx->input; - int32_t type = pInput->pData[0]->info.type; SStddevRes* pStddevRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); + int32_t type = pStddevRes->type; double avg; + if (IS_INTEGER_TYPE(type)) { avg = pStddevRes->isum / ((double)pStddevRes->count); pStddevRes->result = sqrt(pStddevRes->quadraticISum / ((double)pStddevRes->count) - avg * avg); @@ -1708,6 +1744,24 @@ int32_t stddevFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { return functionFinalize(pCtx, pBlock); } +int32_t stddevPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { + SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); + SStddevRes* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); + int32_t resultBytes = getStddevInfoSize(); + char* res = taosMemoryCalloc(resultBytes + VARSTR_HEADER_SIZE, sizeof(char)); + + memcpy(varDataVal(res), pInfo, resultBytes); + varDataSetLen(res, resultBytes); + + int32_t slotId = pCtx->pExpr->base.resSchema.slotId; + SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId); + + colDataAppend(pCol, pBlock->info.rows, res, false); + + taosMemoryFree(res); + return pResInfo->numOfRes; +} + int32_t stddevCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) { SResultRowEntryInfo* pDResInfo = GET_RES_INFO(pDestCtx); SStddevRes* pDBuf = GET_ROWCELL_INTERBUF(pDResInfo);