From c038848f72ed74e905fe2665499e9957c1f710e8 Mon Sep 17 00:00:00 2001 From: shenglian zhou Date: Fri, 10 Jun 2022 18:33:24 +0800 Subject: [PATCH] feature: output results in merge-interval agg buf when exhausted input stream --- source/libs/executor/src/executorimpl.c | 1 + source/libs/executor/src/timewindowoperator.c | 81 +++++++++++-------- 2 files changed, 49 insertions(+), 33 deletions(-) diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 4634cc9f47..726be6d0a2 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -2002,6 +2002,7 @@ int32_t finalizeResultRowIntoResultDataBlock(SDiskbasedBuf* pBuf, SResultRowPosi } releaseBufPage(pBuf, page); + pBlock->info.rows += pRow->numOfRows; return 0; } diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index a3c01629d6..c1c504a400 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -3202,6 +3202,7 @@ typedef struct SMergeIntervalAggOperatorInfo { bool hasGroupId; uint64_t groupId; SSDataBlock* prefetchedBlock; + bool inputBlocksFinished; } SMergeIntervalAggOperatorInfo; void destroyMergeIntervalOperatorInfo(void* param, int32_t numOfOutput) { @@ -3223,7 +3224,7 @@ static int32_t outputPrevIntervalResult(SOperatorInfo* pOperatorInfo, uint64_t t return 0; } - if (ascScan && newWin->skey > prevWin->ekey || (!ascScan) && newWin->skey < prevWin->ekey) { + if (newWin == NULL || (ascScan && newWin->skey > prevWin->ekey || (!ascScan) && newWin->skey < prevWin->ekey) ) { SET_RES_WINDOW_KEY(iaInfo->aggSup.keyBuf, &prevWin->skey, TSDB_KEYSIZE, tableGroupId); SResultRowPosition* p1 = (SResultRowPosition*)taosHashGet(iaInfo->aggSup.pResultRowHashTable, iaInfo->aggSup.keyBuf, GET_RES_WINDOW_KEY_LEN(TSDB_KEYSIZE)); @@ -3233,9 +3234,13 @@ static int32_t outputPrevIntervalResult(SOperatorInfo* pOperatorInfo, uint64_t t pOperatorInfo->numOfExprs, iaInfo->binfo.rowCellInfoOffset, pResultBlock, pTaskInfo); taosHashRemove(iaInfo->aggSup.pResultRowHashTable, iaInfo->aggSup.keyBuf, GET_RES_WINDOW_KEY_LEN(TSDB_KEYSIZE)); - - taosHashPut(miaInfo->groupIntervalHash, &tableGroupId, sizeof(tableGroupId), newWin, sizeof(STimeWindow)); + if (newWin == NULL) { + taosHashRemove(miaInfo->groupIntervalHash, &tableGroupId, sizeof(tableGroupId)); + } else { + taosHashPut(miaInfo->groupIntervalHash, &tableGroupId, sizeof(tableGroupId), newWin, sizeof(STimeWindow)); + } } + return 0; } @@ -3343,47 +3348,57 @@ static SSDataBlock* doMergeIntervalAgg(SOperatorInfo* pOperator) { SSDataBlock* pRes = iaInfo->binfo.pRes; blockDataCleanup(pRes); + blockDataEnsureCapacity(pRes, pOperator->resultInfo.capacity); - SOperatorInfo* downstream = pOperator->pDownstream[0]; - int32_t scanFlag = MAIN_SCAN; - while (1) { - SSDataBlock* pBlock = NULL; - if (miaInfo->prefetchedBlock == NULL) { - pBlock = downstream->fpSet.getNextFn(downstream); - } else { - pBlock = miaInfo->prefetchedBlock; - miaInfo->groupId = pBlock->info.groupId; - } + if (!miaInfo->inputBlocksFinished) { + SOperatorInfo* downstream = pOperator->pDownstream[0]; + int32_t scanFlag = MAIN_SCAN; + while (1) { + SSDataBlock* pBlock = NULL; + if (miaInfo->prefetchedBlock == NULL) { + pBlock = downstream->fpSet.getNextFn(downstream); + } else { + pBlock = miaInfo->prefetchedBlock; + miaInfo->groupId = pBlock->info.groupId; + } - if (pBlock == NULL) { - break; - } + if (pBlock == NULL) { + miaInfo->inputBlocksFinished = true; + break; + } - if (!miaInfo->hasGroupId) { - miaInfo->hasGroupId = true; - miaInfo->groupId = pBlock->info.groupId; - } else if (miaInfo->groupId != pBlock->info.groupId) { - miaInfo->prefetchedBlock = pBlock; - break; - } + if (!miaInfo->hasGroupId) { + miaInfo->hasGroupId = true; + miaInfo->groupId = pBlock->info.groupId; + } else if (miaInfo->groupId != pBlock->info.groupId) { + miaInfo->prefetchedBlock = pBlock; + break; + } - getTableScanInfo(pOperator, &iaInfo->order, &scanFlag); - setInputDataBlock(pOperator, iaInfo->binfo.pCtx, pBlock, iaInfo->order, scanFlag, true); - STableQueryInfo* pTableQueryInfo = iaInfo->pCurrent; + getTableScanInfo(pOperator, &iaInfo->order, &scanFlag); + setInputDataBlock(pOperator, iaInfo->binfo.pCtx, pBlock, iaInfo->order, scanFlag, true); + STableQueryInfo* pTableQueryInfo = iaInfo->pCurrent; - setIntervalQueryRange(pTableQueryInfo, pBlock->info.window.skey, &pTaskInfo->window); - doMergeIntervalAggImpl(pOperator, &iaInfo->binfo.resultRowInfo, pBlock, scanFlag, pRes); + setIntervalQueryRange(pTableQueryInfo, pBlock->info.window.skey, &pTaskInfo->window); + doMergeIntervalAggImpl(pOperator, &iaInfo->binfo.resultRowInfo, pBlock, scanFlag, pRes); - if (pRes->info.rows >= pOperator->resultInfo.threshold) { - break; + if (pRes->info.rows >= pOperator->resultInfo.threshold) { + break; + } + } + + pRes->info.groupId = miaInfo->groupId; + } else { + void* p = taosHashIterate(miaInfo->groupIntervalHash, NULL); + if (p != NULL) { + size_t len = 0; + uint64_t* pKey = taosHashGetKey(p, &len); + outputPrevIntervalResult(pOperator, *pKey, pRes, NULL); } } - pRes->info.groupId = miaInfo->groupId; if (pRes->info.rows == 0) { doSetOperatorCompleted(pOperator); - } else { - blockDataUpdateTsWindow(pRes, 0); } size_t rows = pRes->info.rows; -- GitLab