提交 c038848f 编写于 作者: S shenglian zhou

feature: output results in merge-interval agg buf when exhausted input stream

上级 433286c0
...@@ -2002,6 +2002,7 @@ int32_t finalizeResultRowIntoResultDataBlock(SDiskbasedBuf* pBuf, SResultRowPosi ...@@ -2002,6 +2002,7 @@ int32_t finalizeResultRowIntoResultDataBlock(SDiskbasedBuf* pBuf, SResultRowPosi
} }
releaseBufPage(pBuf, page); releaseBufPage(pBuf, page);
pBlock->info.rows += pRow->numOfRows;
return 0; return 0;
} }
......
...@@ -3202,6 +3202,7 @@ typedef struct SMergeIntervalAggOperatorInfo { ...@@ -3202,6 +3202,7 @@ typedef struct SMergeIntervalAggOperatorInfo {
bool hasGroupId; bool hasGroupId;
uint64_t groupId; uint64_t groupId;
SSDataBlock* prefetchedBlock; SSDataBlock* prefetchedBlock;
bool inputBlocksFinished;
} SMergeIntervalAggOperatorInfo; } SMergeIntervalAggOperatorInfo;
void destroyMergeIntervalOperatorInfo(void* param, int32_t numOfOutput) { void destroyMergeIntervalOperatorInfo(void* param, int32_t numOfOutput) {
...@@ -3223,7 +3224,7 @@ static int32_t outputPrevIntervalResult(SOperatorInfo* pOperatorInfo, uint64_t t ...@@ -3223,7 +3224,7 @@ static int32_t outputPrevIntervalResult(SOperatorInfo* pOperatorInfo, uint64_t t
return 0; return 0;
} }
if (ascScan && newWin->skey > prevWin->ekey || (!ascScan) && newWin->skey < prevWin->ekey) { if (newWin == NULL || (ascScan && newWin->skey > prevWin->ekey || (!ascScan) && newWin->skey < prevWin->ekey) ) {
SET_RES_WINDOW_KEY(iaInfo->aggSup.keyBuf, &prevWin->skey, TSDB_KEYSIZE, tableGroupId); SET_RES_WINDOW_KEY(iaInfo->aggSup.keyBuf, &prevWin->skey, TSDB_KEYSIZE, tableGroupId);
SResultRowPosition* p1 = (SResultRowPosition*)taosHashGet(iaInfo->aggSup.pResultRowHashTable, iaInfo->aggSup.keyBuf, SResultRowPosition* p1 = (SResultRowPosition*)taosHashGet(iaInfo->aggSup.pResultRowHashTable, iaInfo->aggSup.keyBuf,
GET_RES_WINDOW_KEY_LEN(TSDB_KEYSIZE)); GET_RES_WINDOW_KEY_LEN(TSDB_KEYSIZE));
...@@ -3233,9 +3234,13 @@ static int32_t outputPrevIntervalResult(SOperatorInfo* pOperatorInfo, uint64_t t ...@@ -3233,9 +3234,13 @@ static int32_t outputPrevIntervalResult(SOperatorInfo* pOperatorInfo, uint64_t t
pOperatorInfo->numOfExprs, iaInfo->binfo.rowCellInfoOffset, pResultBlock, pOperatorInfo->numOfExprs, iaInfo->binfo.rowCellInfoOffset, pResultBlock,
pTaskInfo); pTaskInfo);
taosHashRemove(iaInfo->aggSup.pResultRowHashTable, iaInfo->aggSup.keyBuf, GET_RES_WINDOW_KEY_LEN(TSDB_KEYSIZE)); taosHashRemove(iaInfo->aggSup.pResultRowHashTable, iaInfo->aggSup.keyBuf, GET_RES_WINDOW_KEY_LEN(TSDB_KEYSIZE));
if (newWin == NULL) {
taosHashPut(miaInfo->groupIntervalHash, &tableGroupId, sizeof(tableGroupId), newWin, sizeof(STimeWindow)); taosHashRemove(miaInfo->groupIntervalHash, &tableGroupId, sizeof(tableGroupId));
} else {
taosHashPut(miaInfo->groupIntervalHash, &tableGroupId, sizeof(tableGroupId), newWin, sizeof(STimeWindow));
}
} }
return 0; return 0;
} }
...@@ -3343,47 +3348,57 @@ static SSDataBlock* doMergeIntervalAgg(SOperatorInfo* pOperator) { ...@@ -3343,47 +3348,57 @@ static SSDataBlock* doMergeIntervalAgg(SOperatorInfo* pOperator) {
SSDataBlock* pRes = iaInfo->binfo.pRes; SSDataBlock* pRes = iaInfo->binfo.pRes;
blockDataCleanup(pRes); blockDataCleanup(pRes);
blockDataEnsureCapacity(pRes, pOperator->resultInfo.capacity);
SOperatorInfo* downstream = pOperator->pDownstream[0]; if (!miaInfo->inputBlocksFinished) {
int32_t scanFlag = MAIN_SCAN; SOperatorInfo* downstream = pOperator->pDownstream[0];
while (1) { int32_t scanFlag = MAIN_SCAN;
SSDataBlock* pBlock = NULL; while (1) {
if (miaInfo->prefetchedBlock == NULL) { SSDataBlock* pBlock = NULL;
pBlock = downstream->fpSet.getNextFn(downstream); if (miaInfo->prefetchedBlock == NULL) {
} else { pBlock = downstream->fpSet.getNextFn(downstream);
pBlock = miaInfo->prefetchedBlock; } else {
miaInfo->groupId = pBlock->info.groupId; pBlock = miaInfo->prefetchedBlock;
} miaInfo->groupId = pBlock->info.groupId;
}
if (pBlock == NULL) { if (pBlock == NULL) {
break; miaInfo->inputBlocksFinished = true;
} break;
}
if (!miaInfo->hasGroupId) { if (!miaInfo->hasGroupId) {
miaInfo->hasGroupId = true; miaInfo->hasGroupId = true;
miaInfo->groupId = pBlock->info.groupId; miaInfo->groupId = pBlock->info.groupId;
} else if (miaInfo->groupId != pBlock->info.groupId) { } else if (miaInfo->groupId != pBlock->info.groupId) {
miaInfo->prefetchedBlock = pBlock; miaInfo->prefetchedBlock = pBlock;
break; break;
} }
getTableScanInfo(pOperator, &iaInfo->order, &scanFlag); getTableScanInfo(pOperator, &iaInfo->order, &scanFlag);
setInputDataBlock(pOperator, iaInfo->binfo.pCtx, pBlock, iaInfo->order, scanFlag, true); setInputDataBlock(pOperator, iaInfo->binfo.pCtx, pBlock, iaInfo->order, scanFlag, true);
STableQueryInfo* pTableQueryInfo = iaInfo->pCurrent; STableQueryInfo* pTableQueryInfo = iaInfo->pCurrent;
setIntervalQueryRange(pTableQueryInfo, pBlock->info.window.skey, &pTaskInfo->window); setIntervalQueryRange(pTableQueryInfo, pBlock->info.window.skey, &pTaskInfo->window);
doMergeIntervalAggImpl(pOperator, &iaInfo->binfo.resultRowInfo, pBlock, scanFlag, pRes); doMergeIntervalAggImpl(pOperator, &iaInfo->binfo.resultRowInfo, pBlock, scanFlag, pRes);
if (pRes->info.rows >= pOperator->resultInfo.threshold) { if (pRes->info.rows >= pOperator->resultInfo.threshold) {
break; break;
}
}
pRes->info.groupId = miaInfo->groupId;
} else {
void* p = taosHashIterate(miaInfo->groupIntervalHash, NULL);
if (p != NULL) {
size_t len = 0;
uint64_t* pKey = taosHashGetKey(p, &len);
outputPrevIntervalResult(pOperator, *pKey, pRes, NULL);
} }
} }
pRes->info.groupId = miaInfo->groupId;
if (pRes->info.rows == 0) { if (pRes->info.rows == 0) {
doSetOperatorCompleted(pOperator); doSetOperatorCompleted(pOperator);
} else {
blockDataUpdateTsWindow(pRes, 0);
} }
size_t rows = pRes->info.rows; size_t rows = pRes->info.rows;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册