diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 577f9772be1223ba29bc087872f6f5e2bea9f57f..e89459e555d1290c04996a8b3368ecdc076e7f66 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -519,6 +519,7 @@ typedef struct SBlockDistInfo { typedef struct SOptrBasicInfo { SResultRowInfo resultRowInfo; SSDataBlock* pRes; + bool mergeResultBlock; } SOptrBasicInfo; typedef struct SIntervalAggOperatorInfo { @@ -593,7 +594,6 @@ typedef struct SAggOperatorInfo { uint64_t groupId; SGroupResInfo groupResInfo; SExprSupp scalarExprSup; - SNode *pCondition; } SAggOperatorInfo; @@ -891,7 +891,7 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysi SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScanPhysiNode *pScanPhyNode, const char* pUser, SExecTaskInfo* pTaskInfo); SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SNode* pCondition, SExprInfo* pScalarExprInfo, - int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo); + int32_t numOfScalarExpr, bool mergeResult, SExecTaskInfo* pTaskInfo); SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhysiNode *pNode, SExecTaskInfo* pTaskInfo); SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhysiNode* pProjPhyNode, SExecTaskInfo* pTaskInfo); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index e52cbf40a9470bf9a8ee6e725a94240e9ba63493..a7b843d9c293d342e5de6c0ce1e9f2820b889d85 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -136,9 +136,8 @@ SOperatorFpSet createOperatorFpSet(__optr_open_fn_t openFn, __optr_fn_t nextFn, void operatorDummyCloseFn(void* param, int32_t numOfCols) {} -static int32_t doCopyToSDataBlock(SExecTaskInfo* taskInfo, SSDataBlock* pBlock, SExprInfo* pExprInfo, - SDiskbasedBuf* pBuf, SGroupResInfo* pGroupResInfo, const int32_t* rowCellOffset, - SqlFunctionCtx* pCtx, int32_t numOfExprs); +static int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprSupp* pSup, SDiskbasedBuf* pBuf, + SGroupResInfo* pGroupResInfo); static void initCtxOutputBuffer(SqlFunctionCtx* pCtx, int32_t size); static void doSetTableGroupOutputBuf(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId); @@ -1501,19 +1500,24 @@ int32_t finalizeResultRowIntoResultDataBlock(SDiskbasedBuf* pBuf, SResultRowPosi return 0; } -int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprInfo* pExprInfo, SDiskbasedBuf* pBuf, - SGroupResInfo* pGroupResInfo, const int32_t* rowCellOffset, SqlFunctionCtx* pCtx, - int32_t numOfExprs) { +int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprSupp* pSup, SDiskbasedBuf* pBuf, + SGroupResInfo* pGroupResInfo) { + SExprInfo* pExprInfo = pSup->pExprInfo; + int32_t numOfExprs = pSup->numOfExprs; + int32_t* rowEntryOffset = pSup->rowEntryInfoOffset; + SqlFunctionCtx* pCtx = pSup->pCtx; + int32_t numOfRows = getNumOfTotalRes(pGroupResInfo); - int32_t start = pGroupResInfo->index; - for (int32_t i = start; i < numOfRows; i += 1) { + for (int32_t i = pGroupResInfo->index; i < numOfRows; i += 1) { SResKeyPos* pPos = taosArrayGetP(pGroupResInfo->pRows, i); SFilePage* page = getBufPage(pBuf, pPos->pos.pageId); SResultRow* pRow = (SResultRow*)((char*)page + pPos->pos.offset); - doUpdateNumOfRows(pRow, numOfExprs, rowCellOffset); + doUpdateNumOfRows(pRow, numOfExprs, rowEntryOffset); + + // no results, continue to check the next one if (pRow->numOfRows == 0) { pGroupResInfo->index += 1; releaseBufPage(pBuf, page); @@ -1531,6 +1535,7 @@ int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprI } if (pBlock->info.rows + pRow->numOfRows > pBlock->info.capacity) { + ASSERT(pBlock->info.rows > 0); releaseBufPage(pBuf, page); break; } @@ -1540,7 +1545,7 @@ int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprI for (int32_t j = 0; j < numOfExprs; ++j) { int32_t slotId = pExprInfo[j].base.resSchema.slotId; - pCtx[j].resultInfo = getResultEntryInfo(pRow, j, rowCellOffset); + pCtx[j].resultInfo = getResultEntryInfo(pRow, j, rowEntryOffset); if (pCtx[j].fpSet.finalize) { #ifdef BUF_PAGE_DEBUG qDebug("\npage_finalize %d", numOfExprs); @@ -1573,26 +1578,19 @@ int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprI releaseBufPage(pBuf, page); pBlock->info.rows += pRow->numOfRows; - // if (pBlock->info.rows >= pBlock->info.capacity) { // output buffer is full - // break; - // } } qDebug("%s result generated, rows:%d, groupId:%" PRIu64, GET_TASKID(pTaskInfo), pBlock->info.rows, pBlock->info.groupId); + blockDataUpdateTsWindow(pBlock, 0); return 0; } void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo, SDiskbasedBuf* pBuf) { - SExprInfo* pExprInfo = pOperator->exprSupp.pExprInfo; - int32_t numOfExprs = pOperator->exprSupp.numOfExprs; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - - int32_t* rowCellOffset = pOperator->exprSupp.rowEntryInfoOffset; - SSDataBlock* pBlock = pbInfo->pRes; - SqlFunctionCtx* pCtx = pOperator->exprSupp.pCtx; + SSDataBlock* pBlock = pbInfo->pRes; // set output datablock version pBlock->info.version = pTaskInfo->version; @@ -1604,30 +1602,22 @@ void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SG // clear the existed group id pBlock->info.groupId = 0; - doCopyToSDataBlock(pTaskInfo, pBlock, pExprInfo, pBuf, pGroupResInfo, rowCellOffset, pCtx, numOfExprs); -} - -static void updateNumOfRowsInResultRows(SqlFunctionCtx* pCtx, int32_t numOfOutput, SResultRowInfo* pResultRowInfo, - int32_t* rowEntryInfoOffset) { - // update the number of result for each, only update the number of rows for the corresponding window result. - // if (QUERY_IS_INTERVAL_QUERY(pQueryAttr)) { - // return; - // } -#if 0 - for (int32_t i = 0; i < pResultRowInfo->size; ++i) { - SResultRow* pResult = pResultRowInfo->pResult[i]; - - for (int32_t j = 0; j < numOfOutput; ++j) { - int32_t functionId = pCtx[j].functionId; - if (functionId == FUNCTION_TS || functionId == FUNCTION_TAG || functionId == FUNCTION_TAGPRJ) { - continue; + if (!pbInfo->mergeResultBlock) { + doCopyToSDataBlock(pTaskInfo, pBlock, &pOperator->exprSupp, pBuf, pGroupResInfo); + } else { + while(hasRemainResults(pGroupResInfo)) { + doCopyToSDataBlock(pTaskInfo, pBlock, &pOperator->exprSupp, pBuf, pGroupResInfo); + if (pBlock->info.rows >= pOperator->resultInfo.threshold) { + break; } - SResultRowEntryInfo* pCell = getResultEntryInfo(pResult, j, rowEntryInfoOffset); - pResult->numOfRows = (uint16_t)(TMAX(pResult->numOfRows, pCell->numOfRes)); + // clearing group id to continue to merge data that belong to different groups + pBlock->info.groupId = 0; } + + // clear the group id info in SSDataBlock, since the client does not need it + pBlock->info.groupId = 0; } -#endif } static int32_t compressQueryColData(SColumnInfoData* pColRes, int32_t numOfRows, char* data, int8_t compressed) { @@ -2979,6 +2969,7 @@ static SSDataBlock* getAggregateResult(SOperatorInfo* pOperator) { break; } } + size_t rows = blockDataGetNumOfRows(pInfo->pRes); pOperator->resultInfo.totalRows += rows; @@ -3483,7 +3474,7 @@ void cleanupExprSupp(SExprSupp* pSupp) { SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SNode* pCondition, SExprInfo* pScalarExprInfo, - int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo) { + int32_t numOfScalarExpr, bool mergeResult, SExecTaskInfo* pTaskInfo) { SAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SAggOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { @@ -3505,6 +3496,7 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* goto _error; } + pInfo->binfo.mergeResultBlock = mergeResult; pInfo->groupId = UINT64_MAX; pInfo->pCondition = pCondition; pOperator->name = "TableAggregate"; @@ -4098,7 +4090,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo pScalarExprInfo, numOfScalarExpr, pTaskInfo); } else { pOptr = createAggregateOperatorInfo(ops[0], pExprInfo, num, pResBlock, pAggNode->node.pConditions, - pScalarExprInfo, numOfScalarExpr, pTaskInfo); + pScalarExprInfo, numOfScalarExpr, pAggNode->mergeDataBlock, pTaskInfo); } } else if (QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL == type || QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL == type) { SIntervalPhysiNode* pIntervalPhyNode = (SIntervalPhysiNode*)pPhyNode; diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 802e1f2306776165ccd1a45fd389adc6b5307ead..c447d6b43b02b967e29241cf6dcc1644a156921b 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -1798,6 +1798,7 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pInfo->twAggSup = *pTwAggSupp; pInfo->ignoreExpiredData = pPhyNode->window.igExpired; pInfo->pCondition = pPhyNode->window.node.pConditions; + pInfo->binfo.mergeResultBlock = pPhyNode->window.mergeDataBlock; if (pPhyNode->window.pExprs != NULL) { int32_t numOfScalar = 0;