diff --git a/include/libs/function/function.h b/include/libs/function/function.h index 68fc3be617d9292f5a2f618af6eb4e52b2ec7aba..004d834287295f82a1817b3d445b1091d2e0565c 100644 --- a/include/libs/function/function.h +++ b/include/libs/function/function.h @@ -37,7 +37,7 @@ typedef struct SFuncExecEnv { typedef bool (*FExecGetEnv)(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); typedef bool (*FExecInit)(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo* pResultCellInfo); typedef int32_t (*FExecProcess)(struct SqlFunctionCtx *pCtx); -typedef void (*FExecFinalize)(struct SqlFunctionCtx *pCtx); +typedef int32_t (*FExecFinalize)(struct SqlFunctionCtx *pCtx, SSDataBlock* pBlock, int32_t slotId); typedef int32_t (*FScalarExecProcess)(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput); typedef struct SScalarFuncExecFuncs { @@ -113,7 +113,7 @@ typedef struct SResultRowEntryInfo { bool initialized:1; // output buffer has been initialized bool complete:1; // query has completed uint8_t isNullRes:6; // the result is null - uint8_t numOfRes; // num of output result in current buffer + uint8_t numOfRes; // num of output result in current buffer } SResultRowEntryInfo; // determine the real data need to calculated the result diff --git a/source/libs/executor/inc/executil.h b/source/libs/executor/inc/executil.h index 7c931c860c05fb46a8388322da6b20675a9412e8..834d37927a932aeabccaacf3edfc1fffe8594ad3 100644 --- a/source/libs/executor/inc/executil.h +++ b/source/libs/executor/inc/executil.h @@ -60,7 +60,7 @@ typedef struct SResultRow { uint32_t numOfRows; // number of rows of current time window struct SResultRowEntryInfo* pEntryInfo; // For each result column, there is a resultInfo STimeWindow win; - char *key; // start key of current result row + char *key; // start key of current result row } SResultRow; typedef struct SResultRowPosition { diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 5adfb7caca4a3f633ac8b3858581a4e9760fe1bf..c6d209e706744ae66a5be15d67b6c0cbafa1af78 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -608,8 +608,8 @@ int32_t appendDownstream(SOperatorInfo* p, SOperatorInfo** pDownstream, int32_t int32_t initAggInfo(SOptrBasicInfo* pBasicInfo, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, size_t keyBufSize, const char* pkey); void initResultSizeInfo(SOperatorInfo* pOperator, int32_t numOfRows); -void doBuildResultDatablock(SSDataBlock* pBlock, int32_t rowCapacity, SGroupResInfo* pGroupResInfo, SExprInfo* pExprInfo, - SDiskbasedBuf* pBuf, int32_t* rowCellOffset); +void doBuildResultDatablock(SSDataBlock* pBlock, SGroupResInfo* pGroupResInfo, SExprInfo* pExprInfo, + SDiskbasedBuf* pBuf, int32_t* rowCellOffset, SqlFunctionCtx* pCtx); void finalizeMultiTupleQueryResult(SqlFunctionCtx* pCtx, int32_t numOfOutput, SDiskbasedBuf* pBuf, SResultRowInfo* pResultRowInfo, int32_t* rowCellInfoOffset); void doApplyFunctions(SqlFunctionCtx* pCtx, STimeWindow* pWin, SColumnInfoData* pTimeWindowData, int32_t offset, diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 7a7cb68424ff4b74a5596b2dfbdc83cd00bf4493..4a0206f55e3c2729695d3331b8fd12f96b612f1e 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -230,10 +230,10 @@ int32_t operatorDummyOpenFn(SOperatorInfo* pOperator) { void operatorDummyCloseFn(void* param, int32_t numOfCols) {} -static int32_t doCopyToSDataBlock(SSDataBlock* pBlock, int32_t rowCapacity, SExprInfo* pExprInfo, SDiskbasedBuf* pBuf, SGroupResInfo* pGroupResInfo, - int32_t orderType, int32_t* rowCellOffset); -static void initCtxOutputBuffer(SqlFunctionCtx* pCtx, int32_t size); +static int32_t doCopyToSDataBlock(SSDataBlock* pBlock, SExprInfo* pExprInfo, SDiskbasedBuf* pBuf, SGroupResInfo* pGroupResInfo, + int32_t orderType, int32_t* rowCellOffset, SqlFunctionCtx* pCtx); +static void initCtxOutputBuffer(SqlFunctionCtx* pCtx, int32_t size); static void setResultBufSize(STaskAttr* pQueryAttr, SResultInfo* pResultInfo); static void setCtxTagForJoin(STaskRuntimeEnv* pRuntimeEnv, SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, void* pTable); static void doSetTableGroupOutputBuf(SAggOperatorInfo* pAggInfo, int32_t numOfOutput, uint64_t groupId, SExecTaskInfo* pTaskInfo); @@ -2809,16 +2809,6 @@ void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status) { } } -void finalizeQueryResult(SqlFunctionCtx* pCtx, int32_t numOfOutput) { - for (int32_t j = 0; j < numOfOutput; ++j) { - if (pCtx[j].functionId == -1) { - continue; - } - - pCtx[j].fpSet.finalize(&pCtx[j]); - } -} - void finalizeMultiTupleQueryResult(SqlFunctionCtx* pCtx, int32_t numOfOutput, SDiskbasedBuf* pBuf, SResultRowInfo* pResultRowInfo, int32_t* rowCellInfoOffset) { for (int32_t i = 0; i < pResultRowInfo->size; ++i) { @@ -2841,7 +2831,7 @@ void finalizeMultiTupleQueryResult(SqlFunctionCtx* pCtx, int32_t numOfOutput, SD } if (pCtx[j].fpSet.process) { // TODO set the dummy function, to avoid the check for null ptr. - pCtx[j].fpSet.finalize(&pCtx[j]); +// pCtx[j].fpSet.finalize(&pCtx[j]); } if (pRow->numOfRows < pResInfo->numOfRes) { @@ -2872,7 +2862,7 @@ void finalizeUpdatedResult(SqlFunctionCtx* pCtx, int32_t numOfOutput, SDiskbased } if (pCtx[j].fpSet.process) { // TODO set the dummy function. - pCtx[j].fpSet.finalize(&pCtx[j]); +// pCtx[j].fpSet.finalize(&pCtx[j]); pResInfo->initialized = true; } @@ -3107,7 +3097,8 @@ void setIntervalQueryRange(STableQueryInfo* pTableQueryInfo, TSKEY key, STimeWin * @param pQInfo * @param result */ -int32_t doCopyToSDataBlock(SSDataBlock* pBlock, int32_t rowCapacity, SExprInfo* pExprInfo, SDiskbasedBuf* pBuf, SGroupResInfo* pGroupResInfo, int32_t orderType, int32_t* rowCellOffset) { +int32_t doCopyToSDataBlock(SSDataBlock* pBlock, SExprInfo* pExprInfo, SDiskbasedBuf* pBuf, SGroupResInfo* pGroupResInfo, + int32_t orderType, int32_t* rowCellOffset, SqlFunctionCtx* pCtx) { int32_t numOfRows = getNumOfTotalRes(pGroupResInfo); int32_t numOfResult = pBlock->info.rows; // there are already exists result rows @@ -3125,8 +3116,6 @@ int32_t doCopyToSDataBlock(SSDataBlock* pBlock, int32_t rowCapacity, SExprInfo* step = -1; } - int32_t nrows = pBlock->info.rows; - for (int32_t i = start; (i < numOfRows) && (i >= 0); i += step) { SResultRowPosition* pPos = taosArrayGet(pGroupResInfo->pRows, i); SFilePage* page = getBufPage(pBuf, pPos->pageId); @@ -3139,7 +3128,7 @@ int32_t doCopyToSDataBlock(SSDataBlock* pBlock, int32_t rowCapacity, SExprInfo* // TODO copy multiple rows? int32_t numOfRowsToCopy = pRow->numOfRows; - if (numOfResult + numOfRowsToCopy >= rowCapacity) { + if (numOfResult + numOfRowsToCopy >= pBlock->info.capacity) { break; } @@ -3148,31 +3137,32 @@ int32_t doCopyToSDataBlock(SSDataBlock* pBlock, int32_t rowCapacity, SExprInfo* for (int32_t j = 0; j < pBlock->info.numOfCols; ++j) { int32_t slotId = pExprInfo[j].base.resSchema.slotId; - SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, slotId); - SResultRowEntryInfo* pEntryInfo = getResultCell(pRow, j, rowCellOffset); + pCtx[j].resultInfo = getResultCell(pRow, j, rowCellOffset); + if (pCtx[j].fpSet.process) { + pCtx[j].fpSet.finalize(&pCtx[j], pBlock, slotId); + } else { + SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, slotId); - char* in = GET_ROWCELL_INTERBUF(pEntryInfo); - colDataAppend(pColInfoData, nrows, in, pEntryInfo->isNullRes); + char* in = GET_ROWCELL_INTERBUF(pCtx[j].resultInfo); + colDataAppend(pColInfoData, pBlock->info.rows, in, pCtx[j].resultInfo->isNullRes); + } } releaseBufPage(pBuf, page); - nrows += 1; - numOfResult += numOfRowsToCopy; - if (numOfResult == rowCapacity) { // output buffer is full + pBlock->info.rows += pRow->numOfRows; + if (pBlock->info.rows >= pBlock->info.capacity) { // output buffer is full break; } } // qDebug("QInfo:0x%"PRIx64" copy data to query buf completed", GET_TASKID(pRuntimeEnv)); - pBlock->info.rows = numOfResult; blockDataUpdateTsWindow(pBlock); - return 0; } -void doBuildResultDatablock(SSDataBlock* pBlock, int32_t rowCapacity, SGroupResInfo* pGroupResInfo, SExprInfo* pExprInfo, SDiskbasedBuf* pBuf, - int32_t* rowCellOffset) { +void doBuildResultDatablock(SSDataBlock* pBlock, SGroupResInfo* pGroupResInfo, SExprInfo* pExprInfo, SDiskbasedBuf* pBuf, + int32_t* rowCellOffset, SqlFunctionCtx* pCtx) { assert(pGroupResInfo->currentGroup <= pGroupResInfo->totalGroup); blockDataCleanup(pBlock); @@ -3181,7 +3171,7 @@ void doBuildResultDatablock(SSDataBlock* pBlock, int32_t rowCapacity, SGroupResI } int32_t orderType = TSDB_ORDER_ASC; - doCopyToSDataBlock(pBlock, rowCapacity, pExprInfo, pBuf, pGroupResInfo, orderType, rowCellOffset); + doCopyToSDataBlock(pBlock, pExprInfo, pBuf, pGroupResInfo, orderType, rowCellOffset, pCtx); // add condition (pBlock->info.rows >= 1) just to runtime happy blockDataUpdateTsWindow(pBlock); @@ -4405,7 +4395,7 @@ static void doFinalizeResultImpl(SqlFunctionCtx* pCtx, int32_t numOfExpr) { // SUdfInfo* pUdfInfo = taosArrayGet(pInfo->udfInfo, -1 * functionId - 1); // doInvokeUdf(pUdfInfo, &pCtx[j], 0, TSDB_UDF_FUNC_FINALIZE); // } else { - pCtx[j].fpSet.finalize(&pCtx[j]); +// pCtx[j].fpSet.finalize(&pCtx[j]); } } @@ -4805,7 +4795,7 @@ static SSDataBlock* getAggregateResult(SOperatorInfo* pOperator, bool* newgroup) } blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity); - doBuildResultDatablock(pInfo->pRes, pOperator->resultInfo.capacity, &pAggInfo->groupResInfo, pOperator->pExpr, pAggInfo->aggSup.pResultBuf, pInfo->rowCellInfoOffset); + doBuildResultDatablock(pInfo->pRes, &pAggInfo->groupResInfo, pOperator->pExpr, pAggInfo->aggSup.pResultBuf, pInfo->rowCellInfoOffset, pInfo->pCtx); if (pInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pAggInfo->groupResInfo)) { doSetOperatorCompleted(pOperator); } @@ -5129,8 +5119,7 @@ static SSDataBlock* doBuildIntervalResult(SOperatorInfo* pOperator, bool* newgro } blockDataEnsureCapacity(pBlock, pOperator->resultInfo.capacity); - doBuildResultDatablock(pBlock, pOperator->resultInfo.capacity, &pInfo->groupResInfo, pOperator->pExpr, - pInfo->aggSup.pResultBuf, pInfo->binfo.rowCellInfoOffset); + doBuildResultDatablock(pBlock, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pInfo->binfo.rowCellInfoOffset, pInfo->binfo.pCtx); if (pBlock->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) { doSetOperatorCompleted(pOperator); @@ -5149,7 +5138,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo *pOperator, bool* newgroup } if (pOperator->status == OP_RES_TO_RETURN) { - doBuildResultDatablock(pInfo->binfo.pRes, pOperator->resultInfo.capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pInfo->binfo.rowCellInfoOffset); + doBuildResultDatablock(pInfo->binfo.pRes, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pInfo->binfo.rowCellInfoOffset, pInfo->binfo.pCtx); if (pInfo->binfo.pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) { pOperator->status = OP_EXEC_DONE; } @@ -5184,7 +5173,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo *pOperator, bool* newgroup initMultiResInfoFromArrayList(&pInfo->groupResInfo, pUpdated); blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity); - doBuildResultDatablock(pInfo->binfo.pRes, pOperator->resultInfo.capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pInfo->binfo.rowCellInfoOffset); + doBuildResultDatablock(pInfo->binfo.pRes, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pInfo->binfo.rowCellInfoOffset, pInfo->binfo.pCtx); ASSERT(pInfo->binfo.pRes->info.rows > 0); pOperator->status = OP_RES_TO_RETURN; @@ -5229,7 +5218,7 @@ static SSDataBlock* doAllIntervalAgg(SOperatorInfo *pOperator, bool* newgroup) { pOperator->status = OP_RES_TO_RETURN; closeAllResultRows(&pSliceInfo->binfo.resultRowInfo); setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED); - finalizeQueryResult(pSliceInfo->binfo.pCtx, pOperator->numOfOutput); +// finalizeQueryResult(pSliceInfo->binfo.pCtx, pOperator->numOfOutput); initGroupResInfo(&pSliceInfo->groupResInfo, &pSliceInfo->binfo.resultRowInfo); // doBuildResultDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pSliceInfo->pRes); @@ -5286,8 +5275,8 @@ static SSDataBlock* doSTableIntervalAgg(SOperatorInfo* pOperator, bool* newgroup OPTR_SET_OPENED(pOperator); blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity); - doBuildResultDatablock(pInfo->binfo.pRes, pOperator->resultInfo.capacity, &pInfo->groupResInfo, pOperator->pExpr, - pInfo->aggSup.pResultBuf, pInfo->binfo.rowCellInfoOffset); + doBuildResultDatablock(pInfo->binfo.pRes, &pInfo->groupResInfo, pOperator->pExpr, + pInfo->aggSup.pResultBuf, pInfo->binfo.rowCellInfoOffset, pInfo->binfo.pCtx); if (pInfo->binfo.pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) { doSetOperatorCompleted(pOperator); @@ -5377,7 +5366,7 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator, bool* newgroup) { SOptrBasicInfo* pBInfo = &pInfo->binfo; if (pOperator->status == OP_RES_TO_RETURN) { - doBuildResultDatablock(pBInfo->pRes, pOperator->resultInfo.capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pBInfo->rowCellInfoOffset); + doBuildResultDatablock(pBInfo->pRes, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pBInfo->rowCellInfoOffset, pInfo->binfo.pCtx); if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) { doSetOperatorCompleted(pOperator); return NULL; @@ -5409,7 +5398,7 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator, bool* newgroup) { initGroupResInfo(&pInfo->groupResInfo, &pBInfo->resultRowInfo); blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity); - doBuildResultDatablock(pBInfo->pRes, pOperator->resultInfo.capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pBInfo->rowCellInfoOffset); + doBuildResultDatablock(pBInfo->pRes, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pBInfo->rowCellInfoOffset, pInfo->binfo.pCtx); if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) { doSetOperatorCompleted(pOperator); } @@ -5426,7 +5415,7 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator, bool* newgroup) SOptrBasicInfo* pBInfo = &pInfo->binfo; if (pOperator->status == OP_RES_TO_RETURN) { - doBuildResultDatablock(pBInfo->pRes, pOperator->resultInfo.capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pBInfo->rowCellInfoOffset); + doBuildResultDatablock(pBInfo->pRes, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pBInfo->rowCellInfoOffset, pInfo->binfo.pCtx); if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) { doSetOperatorCompleted(pOperator); return NULL; @@ -5458,7 +5447,7 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator, bool* newgroup) initGroupResInfo(&pInfo->groupResInfo, &pBInfo->resultRowInfo); blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity); - doBuildResultDatablock(pBInfo->pRes, pOperator->resultInfo.capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pBInfo->rowCellInfoOffset); + doBuildResultDatablock(pBInfo->pRes, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pBInfo->rowCellInfoOffset, pInfo->binfo.pCtx); if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) { doSetOperatorCompleted(pOperator); } diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index 9f75f97632fe965a8324eb1778f2b7961a1b12a9..8a0da52a8ea17ab00dee6cb652910657169754cb 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -265,7 +265,7 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator, bool* newgrou SSDataBlock* pRes = pInfo->binfo.pRes; if (pOperator->status == OP_RES_TO_RETURN) { - doBuildResultDatablock(pRes, pOperator->resultInfo.capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pInfo->binfo.rowCellInfoOffset); + doBuildResultDatablock(pRes, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pInfo->binfo.rowCellInfoOffset, pInfo->binfo.pCtx); if (pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) { pOperator->status = OP_EXEC_DONE; } @@ -311,7 +311,7 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator, bool* newgrou initGroupResInfo(&pInfo->groupResInfo, &pInfo->binfo.resultRowInfo); while(1) { - doBuildResultDatablock(pRes, pOperator->resultInfo.capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pInfo->binfo.rowCellInfoOffset); + doBuildResultDatablock(pRes, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pInfo->binfo.rowCellInfoOffset, pInfo->binfo.pCtx); doFilter(pInfo->pCondition, pRes); bool hasRemain = hasRemainDataInCurrentGroup(&pInfo->groupResInfo); diff --git a/source/libs/function/inc/builtinsimpl.h b/source/libs/function/inc/builtinsimpl.h index 8473138e4af4fd5f075a2bb7a7ff75087b75d90b..db92740fff260f07dbc438166b7d04682bbea136 100644 --- a/source/libs/function/inc/builtinsimpl.h +++ b/source/libs/function/inc/builtinsimpl.h @@ -24,7 +24,7 @@ extern "C" { #include "functionMgt.h" bool functionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo); -void functionFinalize(SqlFunctionCtx *pCtx); +int32_t functionFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t slotId); EFuncDataRequired countDataRequired(SFunctionNode* pFunc, STimeWindow* pTimeWindow); bool getCountFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); @@ -43,12 +43,12 @@ int32_t maxFunction(SqlFunctionCtx *pCtx); bool getStddevFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); bool stddevFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo); int32_t stddevFunction(SqlFunctionCtx* pCtx); -void stddevFinalize(SqlFunctionCtx* pCtx); +int32_t stddevFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t slotId); bool getPercentileFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); bool percentileFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo); int32_t percentileFunction(SqlFunctionCtx *pCtx); -void percentileFinalize(SqlFunctionCtx* pCtx); +int32_t percentileFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t slotId); bool getDiffFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); bool diffFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResInfo); @@ -60,6 +60,7 @@ int32_t lastFunction(SqlFunctionCtx *pCtx); bool getTopBotFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv); int32_t topFunction(SqlFunctionCtx *pCtx); +int32_t topBotFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t slotId); #ifdef __cplusplus } diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index efd8de84c40b233f8a3a18f144b5112ec5993a2b..4f5dc7d6a1228cb2fa7696dcc6771650f77eae91 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -507,7 +507,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .getEnvFunc = getTopBotFuncEnv, .initFunc = functionSetup, .processFunc = topFunction, - .finalizeFunc = functionFinalize + .finalizeFunc = topBotFinalize, }, { .name = "bottom", diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 09427265889f921b7c023738dde788b9669495a4..3dc2e62e92e4b8e28d4178032b4aaed81d050a8a 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -49,11 +49,17 @@ bool functionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo) { return true; } -void functionFinalize(SqlFunctionCtx *pCtx) { - SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); +int32_t functionFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t slotId) { + SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId); - cleanupResultRowEntry(pResInfo); + SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); pResInfo->isNullRes = (pResInfo->numOfRes == 0)? 1:0; + cleanupResultRowEntry(pResInfo); + + char* in = GET_ROWCELL_INTERBUF(pResInfo); + colDataAppend(pCol, pBlock->info.rows, in, pResInfo->isNullRes); + + return pResInfo->numOfRes; } EFuncDataRequired countDataRequired(SFunctionNode* pFunc, STimeWindow* pTimeWindow) { @@ -612,12 +618,11 @@ int32_t stddevFunction(SqlFunctionCtx* pCtx) { return TSDB_CODE_SUCCESS; } -void stddevFinalize(SqlFunctionCtx* pCtx) { - functionFinalize(pCtx); - +int32_t stddevFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t slotId) { SStddevRes* pStddevRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); double avg = pStddevRes->isum / ((double) pStddevRes->count); pStddevRes->result = sqrt(pStddevRes->quadraticISum/((double)pStddevRes->count) - avg*avg); + return functionFinalize(pCtx, pBlock, slotId); } typedef struct SPercentileInfo { @@ -739,7 +744,7 @@ int32_t percentileFunction(SqlFunctionCtx *pCtx) { return TSDB_CODE_SUCCESS; } -void percentileFinalize(SqlFunctionCtx* pCtx) { +int32_t percentileFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t slotId) { SVariant* pVal = &pCtx->param[1].param; double v = pVal->nType == TSDB_DATA_TYPE_INT ? pVal->i : pVal->d; @@ -752,7 +757,7 @@ void percentileFinalize(SqlFunctionCtx* pCtx) { } tMemBucketDestroy(pMemBucket); - functionFinalize(pCtx); + return functionFinalize(pCtx, pBlock, slotId); } bool getFirstLastFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) { @@ -1173,16 +1178,14 @@ typedef struct STopBotResItem { } STopBotResItem; typedef struct STopBotRes { - int32_t num; + int32_t pageId; +// int32_t num; STopBotResItem *pItems; } STopBotRes; bool getTopBotFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) { - SColumnNode* pColNode = (SColumnNode*) nodesListGetNode(pFunc->pParameterList, 0); - int32_t bytes = pColNode->node.resType.bytes; SValueNode* pkNode = (SValueNode*) nodesListGetNode(pFunc->pParameterList, 1); - - pEnv->calcMemSize = sizeof(STopBotRes) + pkNode->datum.i * bytes; + pEnv->calcMemSize = sizeof(STopBotRes) + pkNode->datum.i * sizeof(STopBotResItem); return true; } @@ -1194,13 +1197,14 @@ static STopBotRes *getTopBotOutputInfo(SqlFunctionCtx *pCtx) { return pRes; } -static void doAddIntoResult(STopBotRes *pRes, int32_t maxSize, void *pData, uint16_t type, uint64_t uid); +static void doAddIntoResult(STopBotRes* pRes, int32_t maxSize, void* pData, int32_t rowIndex, SSDataBlock* pSrcBlock, + uint16_t type, uint64_t uid, SResultRowEntryInfo* pEntryInfo); int32_t topFunction(SqlFunctionCtx *pCtx) { int32_t numOfElems = 0; + SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx); STopBotRes *pRes = getTopBotOutputInfo(pCtx); - assert(pRes->num >= 0); // if ((void *)pRes->res[0] != (void *)((char *)pRes + sizeof(STopBotRes) + POINTER_BYTES * pCtx->param[0].i)) { // buildTopBotStruct(pRes, pCtx); @@ -1221,11 +1225,9 @@ int32_t topFunction(SqlFunctionCtx *pCtx) { numOfElems++; char* data = colDataGetData(pCol, i); - doAddIntoResult(pRes, pCtx->param[1].param.i, data, type, pInput->uid); + doAddIntoResult(pRes, pCtx->param[1].param.i, data, i, NULL, type, pInput->uid, pResInfo); } - // treat the result as only one result - SET_VAL(GET_RES_INFO(pCtx), numOfElems, 1); return TSDB_CODE_SUCCESS; } @@ -1256,7 +1258,9 @@ static int32_t topBotResComparFn(const void *p1, const void *p2, const void *par return (val1->v.d > val2->v.d) ? 1 : -1; } -void doAddIntoResult(STopBotRes *pRes, int32_t maxSize, void *pData, uint16_t type, uint64_t uid) { + +void doAddIntoResult(STopBotRes *pRes, int32_t maxSize, void *pData, int32_t rowIndex, SSDataBlock* pSrcBlock, uint16_t type, + uint64_t uid, SResultRowEntryInfo* pEntryInfo) { SVariant val = {0}; taosVariantCreateFromBinary(&val, pData, tDataTypes[type].bytes, type); @@ -1264,29 +1268,71 @@ void doAddIntoResult(STopBotRes *pRes, int32_t maxSize, void *pData, uint16_t ty assert(pItems != NULL); // not full yet - if (pRes->num < maxSize) { - STopBotResItem* pItem = &pItems[pRes->num]; + if (pEntryInfo->numOfRes < maxSize) { + STopBotResItem* pItem = &pItems[pEntryInfo->numOfRes]; pItem->v = val; pItem->uid = uid; pItem->tuplePos.pageId = -1; // todo set the corresponding tuple data in the disk-based buffer - pRes->num++; - taosheapsort((void *) pItem, sizeof(STopBotResItem), pRes->num, (const void *) &type, topBotResComparFn, false); + if (pRes->pageId == -1) { + SFilePage* pPage = getNewBufPage(NULL, 0, &pRes->pageId); + pPage->num = sizeof(SFilePage); + + // keep the current row data + for(int32_t i = 0; i < pSrcBlock->info.numOfCols; ++i) { + SColumnInfoData* pCol = taosArrayGet(pSrcBlock->pDataBlock, i); + bool isNull = colDataIsNull_s(pCol, rowIndex); + + + colDataGetData(pCol, rowIndex); + } + + } + + // allocate the buffer and keep the data of this row into the new allocated buffer + pEntryInfo->numOfRes++; + taosheapsort((void *) pItems, sizeof(STopBotResItem), pEntryInfo->numOfRes, (const void *) &type, topBotResComparFn, false); } else { // replace the minimum value in the result if ((IS_SIGNED_NUMERIC_TYPE(type) && val.i > pItems[0].v.i) || (IS_UNSIGNED_NUMERIC_TYPE(type) && val.u > pItems[0].v.u) || (IS_FLOAT_TYPE(type) && val.d > pItems[0].v.d)) { - STopBotResItem* pItem = &pItems[pRes->num]; + STopBotResItem* pItem = &pItems[0]; pItem->v = val; pItem->uid = uid; pItem->tuplePos.pageId = -1; // todo set the corresponding tuple data in the disk-based buffer - taosheapadjust((void *) pItem, sizeof(STopBotResItem), 0, pRes->num - 1, (const void *) &type, topBotResComparFn, NULL, false); + taosheapadjust((void *) pItems, sizeof(STopBotResItem), 0, pEntryInfo->numOfRes - 1, (const void *) &type, topBotResComparFn, NULL, false); } } } -void topBotFinalize(SqlFunctionCtx* pCtx) { - functionFinalize(pCtx); +int32_t topBotFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t slotId) { + SResultRowEntryInfo *pEntryInfo = GET_RES_INFO(pCtx); + STopBotRes* pRes = GET_ROWCELL_INTERBUF(pEntryInfo); + pEntryInfo->complete = true; + + int32_t type = pCtx->input.pData[0]->info.type; + SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId); + + // todo assign the tag value and the corresponding row data + int32_t currentRow = pBlock->info.rows; + switch(type) { + case TSDB_DATA_TYPE_INT: { + for (int32_t i = 0; i < pEntryInfo->numOfRes; ++i) { + STopBotResItem* pItem = &pRes->pItems[i]; + colDataAppendInt32(pCol, currentRow++, (int32_t*)&pItem->v.i); + + int32_t pageId = pItem->tuplePos.pageId; + int32_t offset = pItem->tuplePos.offset; + if (pageId != -1) { + // todo + } + } + break; + } + } + + return pEntryInfo->numOfRes; +// return functionFinalize(pCtx, pBlock, slotId); } \ No newline at end of file