From c3d702fe7cca73060590f6344481ad9cda855cef Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sun, 10 Jul 2022 09:40:35 +0800 Subject: [PATCH] refactor: do some internal refactor. --- source/libs/executor/inc/executorimpl.h | 2 +- source/libs/executor/src/scanoperator.c | 2 +- source/libs/executor/src/timewindowoperator.c | 132 ++++++------------ 3 files changed, 45 insertions(+), 91 deletions(-) diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 82387119b6..72eddcfc51 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -902,7 +902,7 @@ int32_t aggDecodeResultRow(SOperatorInfo* pOperator, char* result); int32_t aggEncodeResultRow(SOperatorInfo* pOperator, char** result, int32_t* length); STimeWindow getActiveTimeWindow(SDiskbasedBuf* pBuf, SResultRowInfo* pResultRowInfo, int64_t ts, SInterval* pInterval, - int32_t precision, int32_t order); + int32_t order); int32_t getNumOfRowsInTimeWindow(SDataBlockInfo* pDataBlockInfo, TSKEY* pPrimaryColumn, int32_t startPos, TSKEY ekey, __block_search_fn_t searchFn, STableQueryInfo* item, int32_t order); int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order); diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index cdac46c948..471d510385 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -938,7 +938,7 @@ static bool prepareDataScan(SStreamScanInfo* pInfo, SSDataBlock* pSDB, int32_t t setGroupId(pInfo, pSDB, GROUPID_COLUMN_INDEX, *pRowIndex); (*pRowIndex) += updateSessionWindowInfo(pCurWin, tsCols, NULL, pSDB->info.rows, *pRowIndex, gap, NULL); } else { - win = getActiveTimeWindow(NULL, &dumyInfo, tsCols[*pRowIndex], &pInfo->interval, pInfo->interval.precision, TSDB_ORDER_ASC); + win = getActiveTimeWindow(NULL, &dumyInfo, tsCols[*pRowIndex], &pInfo->interval, TSDB_ORDER_ASC); setGroupId(pInfo, pSDB, GROUPID_COLUMN_INDEX, *pRowIndex); (*pRowIndex) += getNumOfRowsInTimeWindow(&pSDB->info, tsCols, *pRowIndex, win.ekey, binarySearchForKey, NULL, TSDB_ORDER_ASC); diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 991bbe525d..987882706a 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -93,99 +93,55 @@ static STimeWindow getFirstQualifiedTimeWindow(int64_t ts, STimeWindow* pWindow, return save; } +static STimeWindow doCalculateTimeWindow(int64_t ts, SInterval* pInterval) { + STimeWindow w = {0}; + + if (pInterval->intervalUnit == 'n' || pInterval->intervalUnit == 'y') { + w.skey = taosTimeTruncate(ts, pInterval, pInterval->precision); + w.ekey = taosTimeAdd(w.skey, pInterval->interval, pInterval->intervalUnit, pInterval->precision) - 1; + } else { + int64_t st = w.skey; + + if (st > ts) { + st -= ((st - ts + pInterval->sliding - 1) / pInterval->sliding) * pInterval->sliding; + } + + int64_t et = st + pInterval->interval - 1; + if (et < ts) { + st += ((ts - et + pInterval->sliding - 1) / pInterval->sliding) * pInterval->sliding; + } + + w.skey = st; + w.ekey = taosTimeAdd(w.skey, pInterval->interval, pInterval->intervalUnit, pInterval->precision) - 1; + } + + return w; +} + // todo do refactor // get the correct time window according to the handled timestamp STimeWindow getActiveTimeWindow(SDiskbasedBuf* pBuf, SResultRowInfo* pResultRowInfo, int64_t ts, SInterval* pInterval, - int32_t precision, int32_t order) { + int32_t order) { STimeWindow w = {0}; - if (pResultRowInfo->cur.pageId == -1) { // the first window, from the previous stored value - getInitialStartTimeWindow(pInterval, precision, ts, &w, true); - w.ekey = taosTimeAdd(w.skey, pInterval->interval, pInterval->intervalUnit, precision) - 1; + getInitialStartTimeWindow(pInterval, pInterval->precision, ts, &w, (order == TSDB_ORDER_ASC)); + w.ekey = taosTimeAdd(w.skey, pInterval->interval, pInterval->intervalUnit, pInterval->precision) - 1; return w; } w = getResultRowByPos(pBuf, &pResultRowInfo->cur)->win; - if (pInterval->interval == pInterval->sliding) { - if (w.skey > ts || w.ekey < ts) { - if (pInterval->intervalUnit == 'n' || pInterval->intervalUnit == 'y') { - w.skey = taosTimeTruncate(ts, pInterval, precision); - w.ekey = taosTimeAdd(w.skey, pInterval->interval, pInterval->intervalUnit, precision) - 1; - } else { - int64_t st = w.skey; - - if (st > ts) { - st -= ((st - ts + pInterval->sliding - 1) / pInterval->sliding) * pInterval->sliding; - } - - int64_t et = st + pInterval->interval - 1; - if (et < ts) { - st += ((ts - et + pInterval->sliding - 1) / pInterval->sliding) * pInterval->sliding; - } - - w.skey = st; - w.ekey = taosTimeAdd(w.skey, pInterval->interval, pInterval->intervalUnit, precision) - 1; - } - } - } else { // it is an sliding window query, in which sliding value is not equalled to - // interval value, and we need to find the first qualified time window for asc/desc traverse respectively. - if (order == TSDB_ORDER_ASC) { - if (w.skey <= ts && w.ekey >= ts) { - // ts is resident in current time window, but we need to find the first - //qualified time window that cover this timestamp - w = getFirstQualifiedTimeWindow(ts, &w, pInterval, order); - } else { - // todo refactor: - if (pInterval->intervalUnit == 'n' || pInterval->intervalUnit == 'y') { - w.skey = taosTimeTruncate(ts, pInterval, precision); - w.ekey = taosTimeAdd(w.skey, pInterval->interval, pInterval->intervalUnit, precision) - 1; - } else { - int64_t st = w.skey; - - if (st > ts) { - st -= ((st - ts + pInterval->sliding - 1) / pInterval->sliding) * pInterval->sliding; - } - - int64_t et = st + pInterval->interval - 1; - if (et < ts) { - st += ((ts - et + pInterval->sliding - 1) / pInterval->sliding) * pInterval->sliding; - } - - w.skey = st; - w.ekey = taosTimeAdd(w.skey, pInterval->interval, pInterval->intervalUnit, precision) - 1; - - w = getFirstQualifiedTimeWindow(ts, &w, pInterval, order); - } - } - } else { - if (w.skey <= ts && w.ekey >= ts) { - w = getFirstQualifiedTimeWindow(ts, &w, pInterval, order); - } else { - // todo refactor: - if (pInterval->intervalUnit == 'n' || pInterval->intervalUnit == 'y') { - w.skey = taosTimeTruncate(ts, pInterval, precision); - w.ekey = taosTimeAdd(w.skey, pInterval->interval, pInterval->intervalUnit, precision) - 1; - } else { - int64_t st = w.skey; - - if (st > ts) { - st -= ((st - ts + pInterval->sliding - 1) / pInterval->sliding) * pInterval->sliding; - } - - int64_t et = st + pInterval->interval - 1; - if (et < ts) { - st += ((ts - et + pInterval->sliding - 1) / pInterval->sliding) * pInterval->sliding; - } - - w.skey = st; - w.ekey = taosTimeAdd(w.skey, pInterval->interval, pInterval->intervalUnit, precision) - 1; + // in case of typical time window, we can calculate time window directly. + if (w.skey > ts || w.ekey < ts) { + w = doCalculateTimeWindow(ts, pInterval); + } - w = getFirstQualifiedTimeWindow(ts, &w, pInterval, order); - } - } - } + if (pInterval->interval != pInterval->sliding) { + // it is an sliding window query, in which sliding value is not equalled to + // interval value, and we need to find the first qualified time window. + w = getFirstQualifiedTimeWindow(ts, &w, pInterval, order); } + return w; } @@ -930,8 +886,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul TSKEY ts = getStartTsKey(&pBlock->info.window, tsCols); SResultRow* pResult = NULL; - STimeWindow win = getActiveTimeWindow(pInfo->aggSup.pResultBuf, pResultRowInfo, ts, &pInfo->interval, - pInfo->interval.precision, pInfo->order); + STimeWindow win = getActiveTimeWindow(pInfo->aggSup.pResultBuf, pResultRowInfo, ts, &pInfo->interval, pInfo->order); int32_t ret = TSDB_CODE_SUCCESS; if (!pInfo->ignoreExpiredData || !isCloseWindow(&win, &pInfo->twAggSup)) { ret = setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, pSup->pCtx, @@ -1390,7 +1345,7 @@ void doDeleteSpecifyIntervalWindow(SAggSupporter* pAggSup, SSDataBlock* pBlock, for (int32_t i = 0; i < pBlock->info.rows; i++) { SResultRowInfo dumyInfo; dumyInfo.cur.pageId = -1; - STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, tsStarts[i], pInterval, pInterval->precision, TSDB_ORDER_ASC); + STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, tsStarts[i], pInterval, TSDB_ORDER_ASC); doDeleteIntervalWindow(pAggSup, win.skey, groupIds[i]); if (pUpWins) { SWinRes winRes = {.ts = win.skey, .groupId = groupIds[i]}; @@ -1412,7 +1367,7 @@ static void doClearWindows(SAggSupporter* pAggSup, SExprSupp* pSup1, SInterval* for (int32_t i = 0; i < pBlock->info.rows; i += step) { SResultRowInfo dumyInfo; dumyInfo.cur.pageId = -1; - STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, tsCols[i], pInterval, pInterval->precision, TSDB_ORDER_ASC); + STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, tsCols[i], pInterval, TSDB_ORDER_ASC); step = getNumOfRowsInTimeWindow(&pBlock->info, tsCols, i, win.ekey, binarySearchForKey, NULL, TSDB_ORDER_ASC); uint64_t winGpId = pGpDatas ? pGpDatas[i] : pBlock->info.groupId; bool res = doClearWindow(pAggSup, pSup1, (char*)&win.skey, sizeof(TKEY), winGpId, numOfOutput); @@ -1452,7 +1407,7 @@ static int32_t closeIntervalWindow(SHashObj* pHashMap, STimeWindowAggSupp* pSup, TSKEY ts = *(int64_t*)((char*)key + sizeof(uint64_t)); SResultRowInfo dumyInfo; dumyInfo.cur.pageId = -1; - STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, ts, pInterval, pInterval->precision, TSDB_ORDER_ASC); + STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, ts, pInterval, TSDB_ORDER_ASC); SWinRes winRe = { .ts = win.skey, .groupId = groupId, @@ -2547,8 +2502,7 @@ static void doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBloc int32_t startPos = ascScan ? 0 : (pSDataBlock->info.rows - 1); TSKEY ts = getStartTsKey(&pSDataBlock->info.window, tsCols); - STimeWindow nextWin = getActiveTimeWindow(pInfo->aggSup.pResultBuf, pResultRowInfo, ts, &pInfo->interval, - pInfo->interval.precision, pInfo->order); + STimeWindow nextWin = getActiveTimeWindow(pInfo->aggSup.pResultBuf, pResultRowInfo, ts, &pInfo->interval, pInfo->order); while (1) { bool isClosed = isCloseWindow(&nextWin, &pInfo->twAggSup); if (pInfo->ignoreExpiredData && isClosed) { @@ -4751,7 +4705,7 @@ static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo* SResultRow* pResult = NULL; STimeWindow win = getActiveTimeWindow(iaInfo->aggSup.pResultBuf, pResultRowInfo, blockStartTs, &iaInfo->interval, - iaInfo->interval.precision, iaInfo->order); + iaInfo->order); int32_t ret = setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, pExprSup->pCtx, -- GitLab