提交 1d7bf59f 编写于 作者: S slzhou

fix: fix iteration bugs of last window result

上级 2cc9c691
......@@ -3902,6 +3902,7 @@ typedef struct SMergeIntervalAggOperatorInfo {
SIntervalAggOperatorInfo intervalAggOperatorInfo;
SHashObj* groupIntervalHash;
void* groupIntervalIter;
bool hasGroupId;
uint64_t groupId;
SSDataBlock* prefetchedBlock;
......@@ -3914,6 +3915,23 @@ void destroyMergeIntervalOperatorInfo(void* param, int32_t numOfOutput) {
destroyIntervalOperatorInfo(&miaInfo->intervalAggOperatorInfo, numOfOutput);
}
static int32_t finalizeWindowResult(SOperatorInfo* pOperatorInfo, uint64_t tableGroupId, STimeWindow* win, SSDataBlock* pResultBlock) {
SMergeIntervalAggOperatorInfo* miaInfo = pOperatorInfo->info;
SIntervalAggOperatorInfo* iaInfo = &miaInfo->intervalAggOperatorInfo;
SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo;
bool ascScan = (iaInfo->order == TSDB_ORDER_ASC);
SExprSupp* pExprSup = &pOperatorInfo->exprSupp;
SET_RES_WINDOW_KEY(iaInfo->aggSup.keyBuf, &win->skey, TSDB_KEYSIZE, tableGroupId);
SResultRowPosition* p1 = (SResultRowPosition*)taosHashGet(iaInfo->aggSup.pResultRowHashTable, iaInfo->aggSup.keyBuf,
GET_RES_WINDOW_KEY_LEN(TSDB_KEYSIZE));
ASSERT(p1 != NULL);
finalizeResultRowIntoResultDataBlock(iaInfo->aggSup.pResultBuf, p1, pExprSup->pCtx, pExprSup->pExprInfo,
pExprSup->numOfExprs, pExprSup->rowEntryInfoOffset, pResultBlock, pTaskInfo);
taosHashRemove(iaInfo->aggSup.pResultRowHashTable, iaInfo->aggSup.keyBuf, GET_RES_WINDOW_KEY_LEN(TSDB_KEYSIZE));
return TSDB_CODE_SUCCESS;
}
static int32_t outputPrevIntervalResult(SOperatorInfo* pOperatorInfo, uint64_t tableGroupId, SSDataBlock* pResultBlock,
STimeWindow* newWin) {
SMergeIntervalAggOperatorInfo* miaInfo = pOperatorInfo->info;
......@@ -3928,21 +3946,10 @@ static int32_t outputPrevIntervalResult(SOperatorInfo* pOperatorInfo, uint64_t t
return 0;
}
if (newWin == NULL || (ascScan && newWin->skey > prevWin->skey || (!ascScan) && newWin->skey < prevWin->skey)) {
SET_RES_WINDOW_KEY(iaInfo->aggSup.keyBuf, &prevWin->skey, TSDB_KEYSIZE, tableGroupId);
SResultRowPosition* p1 = (SResultRowPosition*)taosHashGet(iaInfo->aggSup.pResultRowHashTable, iaInfo->aggSup.keyBuf,
GET_RES_WINDOW_KEY_LEN(TSDB_KEYSIZE));
ASSERT(p1 != NULL);
finalizeResultRowIntoResultDataBlock(iaInfo->aggSup.pResultBuf, p1, pExprSup->pCtx, pExprSup->pExprInfo,
pExprSup->numOfExprs, pExprSup->rowEntryInfoOffset, pResultBlock, pTaskInfo);
taosHashRemove(iaInfo->aggSup.pResultRowHashTable, iaInfo->aggSup.keyBuf, GET_RES_WINDOW_KEY_LEN(TSDB_KEYSIZE));
if (newWin == NULL) {
taosHashRemove(miaInfo->groupIntervalHash, &tableGroupId, sizeof(tableGroupId));
} else {
if ((ascScan && newWin->skey > prevWin->skey || (!ascScan) && newWin->skey < prevWin->skey)) {
finalizeWindowResult(pOperatorInfo, tableGroupId, prevWin, pResultBlock);
taosHashPut(miaInfo->groupIntervalHash, &tableGroupId, sizeof(tableGroupId), newWin, sizeof(STimeWindow));
}
}
return 0;
}
......@@ -4090,12 +4097,17 @@ static SSDataBlock* doMergeIntervalAgg(SOperatorInfo* pOperator) {
}
pRes->info.groupId = miaInfo->groupId;
} else {
void* p = taosHashIterate(miaInfo->groupIntervalHash, NULL);
if (p != NULL) {
}
if (miaInfo->inputBlocksFinished) {
void* win = taosHashIterate(miaInfo->groupIntervalHash, miaInfo->groupIntervalIter);
if (win != NULL) {
miaInfo->groupIntervalIter = win;
size_t len = 0;
uint64_t* pKey = taosHashGetKey(p, &len);
outputPrevIntervalResult(pOperator, *pKey, pRes, NULL);
uint64_t* pTableGroupId = taosHashGetKey(win, &len);
finalizeWindowResult(pOperator, *pTableGroupId, win, pRes);
pRes->info.groupId = *pTableGroupId;
}
}
......@@ -4118,6 +4130,7 @@ SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SExprI
}
miaInfo->groupIntervalHash = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), true, HASH_NO_LOCK);
miaInfo->groupIntervalIter = NULL;
SIntervalAggOperatorInfo* iaInfo = &miaInfo->intervalAggOperatorInfo;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册