提交 6adc19d4 编写于 作者: S slzhou

fix: overlapping intervals problem

上级 1d7bf59f
...@@ -3900,18 +3900,22 @@ _error: ...@@ -3900,18 +3900,22 @@ _error:
// merge interval operator // merge interval operator
typedef struct SMergeIntervalAggOperatorInfo { typedef struct SMergeIntervalAggOperatorInfo {
SIntervalAggOperatorInfo intervalAggOperatorInfo; SIntervalAggOperatorInfo intervalAggOperatorInfo;
SList* groupIntervals;
SHashObj* groupIntervalHash; SListIter groupIntervalsIter;
void* groupIntervalIter;
bool hasGroupId; bool hasGroupId;
uint64_t groupId; uint64_t groupId;
SSDataBlock* prefetchedBlock; SSDataBlock* prefetchedBlock;
bool inputBlocksFinished; bool inputBlocksFinished;
} SMergeIntervalAggOperatorInfo; } SMergeIntervalAggOperatorInfo;
typedef struct SGroupTimeWindow {
uint64_t groupId;
STimeWindow window;
} SGroupTimeWindow;
void destroyMergeIntervalOperatorInfo(void* param, int32_t numOfOutput) { void destroyMergeIntervalOperatorInfo(void* param, int32_t numOfOutput) {
SMergeIntervalAggOperatorInfo* miaInfo = (SMergeIntervalAggOperatorInfo*)param; SMergeIntervalAggOperatorInfo* miaInfo = (SMergeIntervalAggOperatorInfo*)param;
taosHashCleanup(miaInfo->groupIntervalHash); tdListFree(miaInfo->groupIntervals);
destroyIntervalOperatorInfo(&miaInfo->intervalAggOperatorInfo, numOfOutput); destroyIntervalOperatorInfo(&miaInfo->intervalAggOperatorInfo, numOfOutput);
} }
...@@ -3940,15 +3944,22 @@ static int32_t outputPrevIntervalResult(SOperatorInfo* pOperatorInfo, uint64_t t ...@@ -3940,15 +3944,22 @@ static int32_t outputPrevIntervalResult(SOperatorInfo* pOperatorInfo, uint64_t t
bool ascScan = (iaInfo->order == TSDB_ORDER_ASC); bool ascScan = (iaInfo->order == TSDB_ORDER_ASC);
SExprSupp* pExprSup = &pOperatorInfo->exprSupp; SExprSupp* pExprSup = &pOperatorInfo->exprSupp;
STimeWindow* prevWin = taosHashGet(miaInfo->groupIntervalHash, &tableGroupId, sizeof(tableGroupId)); SGroupTimeWindow groupTimeWindow = {.groupId = tableGroupId, .window = *newWin};
if (prevWin == NULL) { tdListAppend(miaInfo->groupIntervals, &groupTimeWindow);
taosHashPut(miaInfo->groupIntervalHash, &tableGroupId, sizeof(tableGroupId), newWin, sizeof(STimeWindow));
return 0;
}
if ((ascScan && newWin->skey > prevWin->skey || (!ascScan) && newWin->skey < prevWin->skey)) { SListIter iter = {0};
finalizeWindowResult(pOperatorInfo, tableGroupId, prevWin, pResultBlock); tdListInitIter(miaInfo->groupIntervals, &iter, TD_LIST_FORWARD);
taosHashPut(miaInfo->groupIntervalHash, &tableGroupId, sizeof(tableGroupId), newWin, sizeof(STimeWindow)); SListNode* listNode = NULL;
while ((listNode = tdListNext(&iter)) != NULL) {
SGroupTimeWindow* prevGrpWin = (SGroupTimeWindow*)listNode->data;
if (prevGrpWin->groupId != tableGroupId ) {
continue;
}
STimeWindow* prevWin = &prevGrpWin->window;
if ((ascScan && newWin->skey > prevWin->ekey || (!ascScan) && newWin->skey < prevWin->ekey)) {
finalizeWindowResult(pOperatorInfo, tableGroupId, prevWin, pResultBlock);
tdListPopNode(miaInfo->groupIntervals, listNode);
}
} }
return 0; return 0;
...@@ -4075,6 +4086,7 @@ static SSDataBlock* doMergeIntervalAgg(SOperatorInfo* pOperator) { ...@@ -4075,6 +4086,7 @@ static SSDataBlock* doMergeIntervalAgg(SOperatorInfo* pOperator) {
} }
if (pBlock == NULL) { if (pBlock == NULL) {
tdListInitIter(miaInfo->groupIntervals, &miaInfo->groupIntervalsIter, TD_LIST_FORWARD);
miaInfo->inputBlocksFinished = true; miaInfo->inputBlocksFinished = true;
break; break;
} }
...@@ -4100,14 +4112,12 @@ static SSDataBlock* doMergeIntervalAgg(SOperatorInfo* pOperator) { ...@@ -4100,14 +4112,12 @@ static SSDataBlock* doMergeIntervalAgg(SOperatorInfo* pOperator) {
} }
if (miaInfo->inputBlocksFinished) { if (miaInfo->inputBlocksFinished) {
void* win = taosHashIterate(miaInfo->groupIntervalHash, miaInfo->groupIntervalIter); SListNode* listNode = tdListNext(&miaInfo->groupIntervalsIter);
if (win != NULL) {
miaInfo->groupIntervalIter = win;
size_t len = 0; if (listNode != NULL) {
uint64_t* pTableGroupId = taosHashGetKey(win, &len); SGroupTimeWindow* grpWin = (SGroupTimeWindow*)(listNode->data);
finalizeWindowResult(pOperator, *pTableGroupId, win, pRes); finalizeWindowResult(pOperator, grpWin->groupId, &grpWin->window, pRes);
pRes->info.groupId = *pTableGroupId; pRes->info.groupId = grpWin->groupId;
} }
} }
...@@ -4129,8 +4139,7 @@ SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SExprI ...@@ -4129,8 +4139,7 @@ SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SExprI
goto _error; goto _error;
} }
miaInfo->groupIntervalHash = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), true, HASH_NO_LOCK); miaInfo->groupIntervals = tdListNew(sizeof(SGroupTimeWindow));
miaInfo->groupIntervalIter = NULL;
SIntervalAggOperatorInfo* iaInfo = &miaInfo->intervalAggOperatorInfo; SIntervalAggOperatorInfo* iaInfo = &miaInfo->intervalAggOperatorInfo;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册