提交 d1eb14dc 编写于 作者: S slzhou

fix: interval operator _wstart error and interval operator close window for...

fix: interval operator _wstart error and interval operator close window for interpolation on different group id
上级 f3890f74
...@@ -33,11 +33,16 @@ typedef struct SPullWindowInfo { ...@@ -33,11 +33,16 @@ typedef struct SPullWindowInfo {
uint64_t groupId; uint64_t groupId;
} SPullWindowInfo; } SPullWindowInfo;
typedef struct SOpenWindowInfo {
SResultRowPosition pos;
uint64_t groupId;
} SOpenWindowInfo;
static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator); static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator);
static int64_t* extractTsCol(SSDataBlock* pBlock, const SIntervalAggOperatorInfo* pInfo); static int64_t* extractTsCol(SSDataBlock* pBlock, const SIntervalAggOperatorInfo* pInfo);
static SResultRowPosition addToOpenWindowList(SResultRowInfo* pResultRowInfo, const SResultRow* pResult); static SResultRowPosition addToOpenWindowList(SResultRowInfo* pResultRowInfo, const SResultRow* pResult, uint64_t groupId);
static void doCloseWindow(SResultRowInfo* pResultRowInfo, const SIntervalAggOperatorInfo* pInfo, SResultRow* pResult); static void doCloseWindow(SResultRowInfo* pResultRowInfo, const SIntervalAggOperatorInfo* pInfo, SResultRow* pResult);
///* ///*
...@@ -598,14 +603,14 @@ static void doInterpUnclosedTimeWindow(SOperatorInfo* pOperatorInfo, int32_t num ...@@ -598,14 +603,14 @@ static void doInterpUnclosedTimeWindow(SOperatorInfo* pOperatorInfo, int32_t num
int32_t startPos = 0; int32_t startPos = 0;
int32_t numOfOutput = pSup->numOfExprs; int32_t numOfOutput = pSup->numOfExprs;
uint64_t groupId = pBlock->info.groupId;
SResultRow* pResult = NULL; SResultRow* pResult = NULL;
while (1) { while (1) {
SListNode* pn = tdListGetHead(pResultRowInfo->openWindow); SListNode* pn = tdListGetHead(pResultRowInfo->openWindow);
SOpenWindowInfo* pOpenWin = (SOpenWindowInfo *)pn->data;
SResultRowPosition* p1 = (SResultRowPosition*)pn->data; uint64_t groupId = pOpenWin->groupId;
SResultRowPosition* p1 = &pOpenWin->pos;
if (p->pageId == p1->pageId && p->offset == p1->offset) { if (p->pageId == p1->pageId && p->offset == p1->offset) {
break; break;
} }
...@@ -631,12 +636,15 @@ static void doInterpUnclosedTimeWindow(SOperatorInfo* pOperatorInfo, int32_t num ...@@ -631,12 +636,15 @@ static void doInterpUnclosedTimeWindow(SOperatorInfo* pOperatorInfo, int32_t num
SGroupKeys* pTsKey = taosArrayGet(pInfo->pPrevValues, 0); SGroupKeys* pTsKey = taosArrayGet(pInfo->pPrevValues, 0);
int64_t prevTs = *(int64_t*)pTsKey->pData; int64_t prevTs = *(int64_t*)pTsKey->pData;
doTimeWindowInterpolation(pInfo->pPrevValues, pBlock->pDataBlock, prevTs, -1, tsCols[startPos], startPos, w.ekey, if (groupId == pBlock->info.groupId) {
RESULT_ROW_END_INTERP, pSup); doTimeWindowInterpolation(pInfo->pPrevValues, pBlock->pDataBlock, prevTs, -1, tsCols[startPos], startPos, w.ekey,
RESULT_ROW_END_INTERP, pSup);
}
setResultRowInterpo(pResult, RESULT_ROW_END_INTERP); setResultRowInterpo(pResult, RESULT_ROW_END_INTERP);
setNotInterpoWindowKey(pSup->pCtx, numOfExprs, RESULT_ROW_START_INTERP); setNotInterpoWindowKey(pSup->pCtx, numOfExprs, RESULT_ROW_START_INTERP);
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &w, true);
doApplyFunctions(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, 0, pBlock->info.rows, doApplyFunctions(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, 0, pBlock->info.rows,
numOfExprs); numOfExprs);
...@@ -965,7 +973,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul ...@@ -965,7 +973,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
// prev time window not interpolation yet. // prev time window not interpolation yet.
if (pInfo->timeWindowInterpo) { if (pInfo->timeWindowInterpo) {
SResultRowPosition pos = addToOpenWindowList(pResultRowInfo, pResult); SResultRowPosition pos = addToOpenWindowList(pResultRowInfo, pResult, tableGroupId);
doInterpUnclosedTimeWindow(pOperatorInfo, numOfOutput, pResultRowInfo, pBlock, scanFlag, tsCols, &pos); doInterpUnclosedTimeWindow(pOperatorInfo, numOfOutput, pResultRowInfo, pBlock, scanFlag, tsCols, &pos);
// restore current time window // restore current time window
...@@ -1017,9 +1025,13 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul ...@@ -1017,9 +1025,13 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
ekey = ascScan ? nextWin.ekey : nextWin.skey; ekey = ascScan ? nextWin.ekey : nextWin.skey;
forwardRows = forwardRows =
getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL, pInfo->inputOrder); getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL, pInfo->inputOrder);
if ((ascScan && ekey <= pBlock->info.window.ekey) ||
// window start(end) key interpolation (!ascScan && ekey >= pBlock->info.window.skey)) {
doWindowBorderInterpolation(pInfo, pBlock, pResult, &nextWin, startPos, forwardRows, pSup); // window start(end) key interpolation
doWindowBorderInterpolation(pInfo, pBlock, pResult, &nextWin, startPos, forwardRows, pSup);
} else {
addToOpenWindowList(pResultRowInfo, pResult, tableGroupId);
}
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &nextWin, true); updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &nextWin, true);
doApplyFunctions(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, forwardRows, pBlock->info.rows, doApplyFunctions(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, forwardRows, pBlock->info.rows,
...@@ -1040,20 +1052,23 @@ void doCloseWindow(SResultRowInfo* pResultRowInfo, const SIntervalAggOperatorInf ...@@ -1040,20 +1052,23 @@ void doCloseWindow(SResultRowInfo* pResultRowInfo, const SIntervalAggOperatorInf
} }
} }
SResultRowPosition addToOpenWindowList(SResultRowInfo* pResultRowInfo, const SResultRow* pResult) { SResultRowPosition addToOpenWindowList(SResultRowInfo* pResultRowInfo, const SResultRow* pResult, uint64_t groupId) {
SResultRowPosition pos = (SResultRowPosition){.pageId = pResult->pageId, .offset = pResult->offset}; SOpenWindowInfo openWin = {0};
openWin.pos.pageId = pResult->pageId;
openWin.pos.offset = pResult->offset;
openWin.groupId = groupId;
SListNode* pn = tdListGetTail(pResultRowInfo->openWindow); SListNode* pn = tdListGetTail(pResultRowInfo->openWindow);
if (pn == NULL) { if (pn == NULL) {
tdListAppend(pResultRowInfo->openWindow, &pos); tdListAppend(pResultRowInfo->openWindow, &openWin);
return pos; return openWin.pos;
} }
SResultRowPosition* px = (SResultRowPosition*)pn->data; SOpenWindowInfo * px = (SOpenWindowInfo *)pn->data;
if (px->pageId != pos.pageId || px->offset != pos.offset) { if (px->pos.pageId != openWin.pos.pageId || px->pos.offset != openWin.pos.offset || px->groupId != openWin.groupId) {
tdListAppend(pResultRowInfo->openWindow, &pos); tdListAppend(pResultRowInfo->openWindow, &openWin);
} }
return pos; return openWin.pos;
} }
int64_t* extractTsCol(SSDataBlock* pBlock, const SIntervalAggOperatorInfo* pInfo) { int64_t* extractTsCol(SSDataBlock* pBlock, const SIntervalAggOperatorInfo* pInfo) {
...@@ -1884,7 +1899,7 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* ...@@ -1884,7 +1899,7 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo*
pInfo->timeWindowInterpo = timeWindowinterpNeeded(pSup->pCtx, numOfCols, pInfo); pInfo->timeWindowInterpo = timeWindowinterpNeeded(pSup->pCtx, numOfCols, pInfo);
if (pInfo->timeWindowInterpo) { if (pInfo->timeWindowInterpo) {
pInfo->binfo.resultRowInfo.openWindow = tdListNew(sizeof(SResultRowPosition)); pInfo->binfo.resultRowInfo.openWindow = tdListNew(sizeof(SOpenWindowInfo));
if (pInfo->binfo.resultRowInfo.openWindow == NULL) { if (pInfo->binfo.resultRowInfo.openWindow == NULL) {
goto _error; goto _error;
} }
...@@ -5157,7 +5172,7 @@ SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, ...@@ -5157,7 +5172,7 @@ SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream,
iaInfo->timeWindowInterpo = timeWindowinterpNeeded(pSup->pCtx, numOfCols, iaInfo); iaInfo->timeWindowInterpo = timeWindowinterpNeeded(pSup->pCtx, numOfCols, iaInfo);
if (iaInfo->timeWindowInterpo) { if (iaInfo->timeWindowInterpo) {
iaInfo->binfo.resultRowInfo.openWindow = tdListNew(sizeof(SResultRowPosition)); iaInfo->binfo.resultRowInfo.openWindow = tdListNew(sizeof(SOpenWindowInfo));
} }
initResultRowInfo(&iaInfo->binfo.resultRowInfo); initResultRowInfo(&iaInfo->binfo.resultRowInfo);
...@@ -5294,7 +5309,7 @@ static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo* ...@@ -5294,7 +5309,7 @@ static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo*
// prev time window not interpolation yet. // prev time window not interpolation yet.
if (iaInfo->timeWindowInterpo) { if (iaInfo->timeWindowInterpo) {
SResultRowPosition pos = addToOpenWindowList(pResultRowInfo, pResult); SResultRowPosition pos = addToOpenWindowList(pResultRowInfo, pResult, tableGroupId);
doInterpUnclosedTimeWindow(pOperatorInfo, numOfOutput, pResultRowInfo, pBlock, scanFlag, tsCols, &pos); doInterpUnclosedTimeWindow(pOperatorInfo, numOfOutput, pResultRowInfo, pBlock, scanFlag, tsCols, &pos);
// restore current time window // restore current time window
...@@ -5462,7 +5477,7 @@ SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SExprI ...@@ -5462,7 +5477,7 @@ SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SExprI
iaInfo->timeWindowInterpo = timeWindowinterpNeeded(pExprSupp->pCtx, numOfCols, iaInfo); iaInfo->timeWindowInterpo = timeWindowinterpNeeded(pExprSupp->pCtx, numOfCols, iaInfo);
if (iaInfo->timeWindowInterpo) { if (iaInfo->timeWindowInterpo) {
iaInfo->binfo.resultRowInfo.openWindow = tdListNew(sizeof(SResultRowPosition)); iaInfo->binfo.resultRowInfo.openWindow = tdListNew(sizeof(SOpenWindowInfo));
if (iaInfo->binfo.resultRowInfo.openWindow == NULL) { if (iaInfo->binfo.resultRowInfo.openWindow == NULL) {
goto _error; goto _error;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册