diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index e88c149b3d8a3f48d233179e041acb935f424bbd..82387119b68fa1128c5350ba1e1e9ad5abf1c529 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -455,7 +455,6 @@ typedef struct SIntervalAggOperatorInfo { int32_t primaryTsIndex; // primary time stamp slot id from result of downstream operator. STimeWindow win; // query time range bool timeWindowInterpo; // interpolation needed or not - char** pRow; // previous row/tuple of already processed datablock SArray* pInterpCols; // interpolation columns int32_t order; // current SSDataBlock scan order EOPTR_EXEC_MODEL execModel; // operator execution model [batch model|stream model] @@ -903,7 +902,7 @@ int32_t aggDecodeResultRow(SOperatorInfo* pOperator, char* result); int32_t aggEncodeResultRow(SOperatorInfo* pOperator, char** result, int32_t* length); STimeWindow getActiveTimeWindow(SDiskbasedBuf* pBuf, SResultRowInfo* pResultRowInfo, int64_t ts, SInterval* pInterval, - int32_t precision, STimeWindow* win); + int32_t precision, int32_t order); int32_t getNumOfRowsInTimeWindow(SDataBlockInfo* pDataBlockInfo, TSKEY* pPrimaryColumn, int32_t startPos, TSKEY ekey, __block_search_fn_t searchFn, STableQueryInfo* item, int32_t order); int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order); diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 360a7b2ed04f17324b467b107b143b4dbc4e2a90..cdac46c94872569d3c0f3b6bc14abaf6507d56da 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -938,7 +938,7 @@ static bool prepareDataScan(SStreamScanInfo* pInfo, SSDataBlock* pSDB, int32_t t setGroupId(pInfo, pSDB, GROUPID_COLUMN_INDEX, *pRowIndex); (*pRowIndex) += updateSessionWindowInfo(pCurWin, tsCols, NULL, pSDB->info.rows, *pRowIndex, gap, NULL); } else { - win = getActiveTimeWindow(NULL, &dumyInfo, tsCols[*pRowIndex], &pInfo->interval, pInfo->interval.precision, NULL); + win = getActiveTimeWindow(NULL, &dumyInfo, tsCols[*pRowIndex], &pInfo->interval, pInfo->interval.precision, TSDB_ORDER_ASC); setGroupId(pInfo, pSDB, GROUPID_COLUMN_INDEX, *pRowIndex); (*pRowIndex) += getNumOfRowsInTimeWindow(&pSDB->info, tsCols, *pRowIndex, win.ekey, binarySearchForKey, NULL, TSDB_ORDER_ASC); diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 91b0e242c0939e89ae9541a0b82686212d3246dc..f9810913e362b8fea75699976685eb6911f825df 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -79,36 +79,111 @@ static void getInitialStartTimeWindow(SInterval* pInterval, int32_t precision, T } } +static STimeWindow getFirstQualifiedTimeWindow(int64_t ts, STimeWindow* pWindow, SInterval* pInterval, int32_t order) { + int32_t factor = (order == TSDB_ORDER_ASC)? -1:1; + + STimeWindow win = *pWindow; + STimeWindow save = win; + while(win.skey <= ts && win.ekey >= ts) { + save = win; + win.skey = taosTimeAdd(win.skey, factor * pInterval->sliding, pInterval->slidingUnit, pInterval->precision); + win.ekey = taosTimeAdd(win.ekey, factor * pInterval->sliding, pInterval->slidingUnit, pInterval->precision); + } + + return save; +} + +// todo do refactor // get the correct time window according to the handled timestamp STimeWindow getActiveTimeWindow(SDiskbasedBuf* pBuf, SResultRowInfo* pResultRowInfo, int64_t ts, SInterval* pInterval, - int32_t precision, STimeWindow* win) { + int32_t precision, int32_t order) { STimeWindow w = {0}; if (pResultRowInfo->cur.pageId == -1) { // the first window, from the previous stored value getInitialStartTimeWindow(pInterval, precision, ts, &w, true); w.ekey = taosTimeAdd(w.skey, pInterval->interval, pInterval->intervalUnit, precision) - 1; - } else { - w = getResultRowByPos(pBuf, &pResultRowInfo->cur)->win; + return w; } - if (w.skey > ts || w.ekey < ts) { - if (pInterval->intervalUnit == 'n' || pInterval->intervalUnit == 'y') { - w.skey = taosTimeTruncate(ts, pInterval, precision); - w.ekey = taosTimeAdd(w.skey, pInterval->interval, pInterval->intervalUnit, precision) - 1; - } else { - int64_t st = w.skey; + w = getResultRowByPos(pBuf, &pResultRowInfo->cur)->win; + + if (pInterval->interval == pInterval->sliding) { + if (w.skey > ts || w.ekey < ts) { + if (pInterval->intervalUnit == 'n' || pInterval->intervalUnit == 'y') { + w.skey = taosTimeTruncate(ts, pInterval, precision); + w.ekey = taosTimeAdd(w.skey, pInterval->interval, pInterval->intervalUnit, precision) - 1; + } else { + int64_t st = w.skey; - if (st > ts) { - st -= ((st - ts + pInterval->sliding - 1) / pInterval->sliding) * pInterval->sliding; + if (st > ts) { + st -= ((st - ts + pInterval->sliding - 1) / pInterval->sliding) * pInterval->sliding; + } + + int64_t et = st + pInterval->interval - 1; + if (et < ts) { + st += ((ts - et + pInterval->sliding - 1) / pInterval->sliding) * pInterval->sliding; + } + + w.skey = st; + w.ekey = taosTimeAdd(w.skey, pInterval->interval, pInterval->intervalUnit, precision) - 1; } + } + } else { // it is an sliding window query, in which sliding value is not equalled to + // interval value, and we need to find the first qualified time window for asc/desc traverse respectively. + if (order == TSDB_ORDER_ASC) { + if (w.skey <= ts && w.ekey >= ts) { + // ts is resident in current time window, but we need to find the first + //qualified time window that cover this timestamp + w = getFirstQualifiedTimeWindow(ts, &w, pInterval, order); + } else { + // todo refactor: + if (pInterval->intervalUnit == 'n' || pInterval->intervalUnit == 'y') { + w.skey = taosTimeTruncate(ts, pInterval, precision); + w.ekey = taosTimeAdd(w.skey, pInterval->interval, pInterval->intervalUnit, precision) - 1; + } else { + int64_t st = w.skey; + + if (st > ts) { + st -= ((st - ts + pInterval->sliding - 1) / pInterval->sliding) * pInterval->sliding; + } - int64_t et = st + pInterval->interval - 1; - if (et < ts) { - st += ((ts - et + pInterval->sliding - 1) / pInterval->sliding) * pInterval->sliding; + int64_t et = st + pInterval->interval - 1; + if (et < ts) { + st += ((ts - et + pInterval->sliding - 1) / pInterval->sliding) * pInterval->sliding; + } + + w.skey = st; + w.ekey = taosTimeAdd(w.skey, pInterval->interval, pInterval->intervalUnit, precision) - 1; + + w = getFirstQualifiedTimeWindow(ts, &w, pInterval, order); + } } + } else { + if (w.skey <= ts && w.ekey >= ts) { + w = getFirstQualifiedTimeWindow(ts, &w, pInterval, order); + } else { + // todo refactor: + if (pInterval->intervalUnit == 'n' || pInterval->intervalUnit == 'y') { + w.skey = taosTimeTruncate(ts, pInterval, precision); + w.ekey = taosTimeAdd(w.skey, pInterval->interval, pInterval->intervalUnit, precision) - 1; + } else { + int64_t st = w.skey; + + if (st > ts) { + st -= ((st - ts + pInterval->sliding - 1) / pInterval->sliding) * pInterval->sliding; + } - w.skey = st; - w.ekey = taosTimeAdd(w.skey, pInterval->interval, pInterval->intervalUnit, precision) - 1; + int64_t et = st + pInterval->interval - 1; + if (et < ts) { + st += ((ts - et + pInterval->sliding - 1) / pInterval->sliding) * pInterval->sliding; + } + + w.skey = st; + w.ekey = taosTimeAdd(w.skey, pInterval->interval, pInterval->intervalUnit, precision) - 1; + + w = getFirstQualifiedTimeWindow(ts, &w, pInterval, order); + } + } } } return w; @@ -856,7 +931,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul SResultRow* pResult = NULL; STimeWindow win = getActiveTimeWindow(pInfo->aggSup.pResultBuf, pResultRowInfo, ts, &pInfo->interval, - pInfo->interval.precision, &pInfo->win); + pInfo->interval.precision, pInfo->order); int32_t ret = TSDB_CODE_SUCCESS; if (!pInfo->ignoreExpiredData || !isCloseWindow(&win, &pInfo->twAggSup)) { ret = setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, pSup->pCtx, @@ -1010,21 +1085,6 @@ static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) { // the pDataBlock are always the same one, no need to call this again setInputDataBlock(pOperator, pSup->pCtx, pBlock, pInfo->order, scanFlag, true); hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, scanFlag, NULL); - -#if 0 // test for encode/decode result info - if(pOperator->fpSet.encodeResultRow){ - char *result = NULL; - int32_t length = 0; - SAggSupporter *pSup = &pInfo->aggSup; - pOperator->fpSet.encodeResultRow(pOperator, &result, &length); - taosHashClear(pSup->pResultRowHashTable); - pInfo->binfo.resultRowInfo.size = 0; - pOperator->fpSet.decodeResultRow(pOperator, result); - if(result){ - taosMemoryFree(result); - } - } -#endif } closeAllResultRows(&pInfo->binfo.resultRowInfo); @@ -1330,7 +1390,7 @@ void doDeleteSpecifyIntervalWindow(SAggSupporter* pAggSup, SSDataBlock* pBlock, for (int32_t i = 0; i < pBlock->info.rows; i++) { SResultRowInfo dumyInfo; dumyInfo.cur.pageId = -1; - STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, tsStarts[i], pInterval, pInterval->precision, NULL); + STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, tsStarts[i], pInterval, pInterval->precision, TSDB_ORDER_ASC); doDeleteIntervalWindow(pAggSup, win.skey, groupIds[i]); if (pUpWins) { SWinRes winRes = {.ts = win.skey, .groupId = groupIds[i]}; @@ -1352,7 +1412,7 @@ static void doClearWindows(SAggSupporter* pAggSup, SExprSupp* pSup1, SInterval* for (int32_t i = 0; i < pBlock->info.rows; i += step) { SResultRowInfo dumyInfo; dumyInfo.cur.pageId = -1; - STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, tsCols[i], pInterval, pInterval->precision, NULL); + STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, tsCols[i], pInterval, pInterval->precision, TSDB_ORDER_ASC); step = getNumOfRowsInTimeWindow(&pBlock->info, tsCols, i, win.ekey, binarySearchForKey, NULL, TSDB_ORDER_ASC); uint64_t winGpId = pGpDatas ? pGpDatas[i] : pBlock->info.groupId; bool res = doClearWindow(pAggSup, pSup1, (char*)&win.skey, sizeof(TKEY), winGpId, numOfOutput); @@ -1392,7 +1452,7 @@ static int32_t closeIntervalWindow(SHashObj* pHashMap, STimeWindowAggSupp* pSup, TSKEY ts = *(int64_t*)((char*)key + sizeof(uint64_t)); SResultRowInfo dumyInfo; dumyInfo.cur.pageId = -1; - STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, ts, pInterval, pInterval->precision, NULL); + STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, ts, pInterval, pInterval->precision, TSDB_ORDER_ASC); SWinRes winRe = { .ts = win.skey, .groupId = groupId, @@ -2488,7 +2548,7 @@ static void doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBloc int32_t startPos = ascScan ? 0 : (pSDataBlock->info.rows - 1); TSKEY ts = getStartTsKey(&pSDataBlock->info.window, tsCols); STimeWindow nextWin = getActiveTimeWindow(pInfo->aggSup.pResultBuf, pResultRowInfo, ts, &pInfo->interval, - pInfo->interval.precision, NULL); + pInfo->interval.precision, pInfo->order); while (1) { bool isClosed = isCloseWindow(&nextWin, &pInfo->twAggSup); if (pInfo->ignoreExpiredData && isClosed) { @@ -4454,8 +4514,7 @@ static void doMergeAlignedIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultR currTs = tsCols[currPos]; currWin.skey = currTs; currWin.ekey = taosTimeAdd(currWin.skey, iaInfo->interval.interval, iaInfo->interval.intervalUnit, - iaInfo->interval.precision) - - 1; + iaInfo->interval.precision) - 1; startPos = currPos; ret = setTimeWindowOutputBuf(pResultRowInfo, &currWin, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset, &iaInfo->aggSup, pTaskInfo); @@ -4692,7 +4751,7 @@ static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo* SResultRow* pResult = NULL; STimeWindow win = getActiveTimeWindow(iaInfo->aggSup.pResultBuf, pResultRowInfo, blockStartTs, &iaInfo->interval, - iaInfo->interval.precision, &iaInfo->win); + iaInfo->interval.precision, iaInfo->order); int32_t ret = setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, pExprSup->pCtx,