From ab2b1bbf471bf174a816a50377e9e15ca1e4eddc Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao@163.com> Date: Tue, 12 Jul 2022 15:36:48 +0800 Subject: [PATCH] feat(stream): sliding window --- include/common/tcommon.h | 1 + source/libs/executor/inc/executorimpl.h | 1 + source/libs/executor/src/executil.c | 1 + source/libs/executor/src/scanoperator.c | 36 +++++++++++++-- source/libs/executor/src/timewindowoperator.c | 45 ++++++++++++------- 5 files changed, 65 insertions(+), 19 deletions(-) diff --git a/include/common/tcommon.h b/include/common/tcommon.h index 3d15e8b087..d8264ac5b5 100644 --- a/include/common/tcommon.h +++ b/include/common/tcommon.h @@ -108,6 +108,7 @@ typedef struct SDataBlockInfo { // TODO: optimize and remove following int32_t childId; // used for stream, do not serialize EStreamType type; // used for stream, do not serialize + STimeWindow calWin; // used for stream, do not serialize } SDataBlockInfo; typedef struct SSDataBlock { diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 69ba88916a..aab2f51421 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -389,6 +389,7 @@ typedef struct SStreamScanInfo { SSDataBlock* pPullDataRes; // pull data SSDataBlock SSDataBlock* pDeleteDataRes; // delete data SSDataBlock int32_t deleteDataIndex; + STimeWindow updateWin; // status for tmq // SSchemaWrapper schema; diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index 2da8811e5e..2469062d09 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -191,6 +191,7 @@ SSDataBlock* createResDataBlock(SDataBlockDescNode* pNode) { pBlock->info.blockId = pNode->dataBlockId; pBlock->info.type = STREAM_INVALID; + pBlock->info.calWin = (STimeWindow){.skey = INT64_MIN, .ekey = INT64_MAX}; for (int32_t i = 0; i < numOfCols; ++i) { SSlotDescNode* pDescNode = (SSlotDescNode*)nodesListGetNode(pNode->pSlots, i); diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index f3c240b6f2..c15f044932 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -884,6 +884,28 @@ static bool prepareRangeScan(SStreamScanInfo* pInfo, SSDataBlock* pBlock, int32_ return true; } +static STimeWindow getSlidingWindow(TSKEY* tsCol, SInterval* pInterval, SDataBlockInfo* pDataBlockInfo, int32_t* pRowIndex) { + SResultRowInfo dumyInfo; + dumyInfo.cur.pageId = -1; + STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, tsCol[*pRowIndex], pInterval, + TSDB_ORDER_ASC); + STimeWindow endWin = win; + STimeWindow preWin = win; + while (1) { + (*pRowIndex) += getNumOfRowsInTimeWindow(pDataBlockInfo, tsCol, *pRowIndex, endWin.ekey, + binarySearchForKey, NULL, TSDB_ORDER_ASC); + do { + preWin = endWin; + getNextTimeWindow(pInterval, &endWin, TSDB_ORDER_ASC); + } while (tsCol[(*pRowIndex) - 1] >= endWin.skey); + endWin = preWin; + if (win.ekey == endWin.ekey || (*pRowIndex) == pDataBlockInfo->rows ) { + win.ekey = endWin.ekey; + return win; + } + win.ekey = endWin.ekey; + } +} static bool prepareDataScan(SStreamScanInfo* pInfo, SSDataBlock* pSDB, int32_t tsColIndex, int32_t* pRowIndex) { STimeWindow win = { .skey = INT64_MIN, @@ -905,10 +927,13 @@ static bool prepareDataScan(SStreamScanInfo* pInfo, SSDataBlock* pSDB, int32_t t setGroupId(pInfo, pSDB, GROUPID_COLUMN_INDEX, *pRowIndex); (*pRowIndex) += updateSessionWindowInfo(pCurWin, tsCols, NULL, pSDB->info.rows, *pRowIndex, gap, NULL); } else { - win = getActiveTimeWindow(NULL, &dumyInfo, tsCols[*pRowIndex], &pInfo->interval, TSDB_ORDER_ASC); setGroupId(pInfo, pSDB, GROUPID_COLUMN_INDEX, *pRowIndex); - (*pRowIndex) += - getNumOfRowsInTimeWindow(&pSDB->info, tsCols, *pRowIndex, win.ekey, binarySearchForKey, NULL, TSDB_ORDER_ASC); + pInfo->updateWin.skey = tsCols[*pRowIndex]; + win = getSlidingWindow(tsCols, &pInfo->interval, &pSDB->info, pRowIndex); + pInfo->updateWin.ekey = tsCols[*pRowIndex - 1]; + // win = getActiveTimeWindow(NULL, &dumyInfo, tsCols[*pRowIndex], &pInfo->interval, TSDB_ORDER_ASC); + // (*pRowIndex) += + // getNumOfRowsInTimeWindow(&pSDB->info, tsCols, *pRowIndex, win.ekey, binarySearchForKey, NULL, TSDB_ORDER_ASC); } needRead = true; } else if (isStateWindow(pInfo)) { @@ -974,10 +999,12 @@ static SSDataBlock* doDataScan(SStreamScanInfo* pInfo, SSDataBlock* pSDB, int32_ } } if (!pResult) { + pInfo->updateWin = (STimeWindow){.skey = INT64_MIN, .ekey = INT64_MAX}; return NULL; } if (pResult->info.groupId == pInfo->groupId) { + pResult->info.calWin = pInfo->updateWin; return pResult; } } @@ -1256,6 +1283,8 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { int32_t current = pInfo->validBlockIndex++; SSDataBlock* pBlock = taosArrayGetP(pInfo->pBlockLists, current); // TODO move into scan + pBlock->info.calWin.skey = INT64_MIN; + pBlock->info.calWin.ekey = INT64_MAX; blockDataUpdateTsWindow(pBlock, 0); switch (pBlock->info.type) { case STREAM_RETRIEVE: { @@ -1533,6 +1562,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys pInfo->pStreamScanOp = pOperator; pInfo->deleteDataIndex = 0; pInfo->pDeleteDataRes = createPullDataBlock(); + pInfo->updateWin = (STimeWindow){.skey = INT64_MAX, .ekey = INT64_MAX}; pOperator->name = "StreamScanOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN; diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 78775073a4..943763dadb 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -419,6 +419,14 @@ static bool setTimeWindowInterpolationEndTs(SIntervalAggOperatorInfo* pInfo, SEx return true; } +bool inSlidingWindow(SInterval* pInterval, STimeWindow* pWin, SDataBlockInfo* pBlockInfo) { + if (pInterval->interval != pInterval->sliding && (pWin->ekey < pBlockInfo->calWin.skey || + pWin->skey > pBlockInfo->calWin.ekey) ) { + return false; + } + return true; +} + static int32_t getNextQualifiedWindow(SInterval* pInterval, STimeWindow* pNext, SDataBlockInfo* pDataBlockInfo, TSKEY* primaryKeys, int32_t prevPosition, int32_t order) { bool ascQuery = (order == TSDB_ORDER_ASC); @@ -432,6 +440,10 @@ static int32_t getNextQualifiedWindow(SInterval* pInterval, STimeWindow* pNext, return -1; } + if (!inSlidingWindow(pInterval, pNext, pDataBlockInfo) && order == TSDB_ORDER_ASC) { + return -1; + } + TSKEY skey = ascQuery ? pNext->skey : pNext->ekey; int32_t startPos = 0; @@ -801,7 +813,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul STimeWindow win = getActiveTimeWindow(pInfo->aggSup.pResultBuf, pResultRowInfo, ts, &pInfo->interval, pInfo->order); int32_t ret = TSDB_CODE_SUCCESS; - if (!pInfo->ignoreExpiredData || !isCloseWindow(&win, &pInfo->twAggSup)) { + if ((!pInfo->ignoreExpiredData || !isCloseWindow(&win, &pInfo->twAggSup)) && inSlidingWindow(&pInfo->interval, &win, &pBlock->info)) { ret = setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo); if (ret != TSDB_CODE_SUCCESS || pResult == NULL) { @@ -834,7 +846,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul doWindowBorderInterpolation(pInfo, pBlock, pResult, &win, startPos, forwardRows, pSup); } - if (!pInfo->ignoreExpiredData || !isCloseWindow(&win, &pInfo->twAggSup)) { + if ((!pInfo->ignoreExpiredData || !isCloseWindow(&win, &pInfo->twAggSup)) && inSlidingWindow(&pInfo->interval, &win, &pBlock->info)) { updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &win, true); doApplyFunctions(pTaskInfo, pSup->pCtx, &win, &pInfo->twAggSup.timeWindowData, startPos, forwardRows, tsCols, pBlock->info.rows, numOfOutput, pInfo->order); @@ -916,7 +928,7 @@ int64_t* extractTsCol(SSDataBlock* pBlock, const SIntervalAggOperatorInfo* pInfo tsCols = (int64_t*)pColDataInfo->pData; if (tsCols != NULL) { - blockDataUpdateTsWindow(pBlock, pInfo->primaryTsIndex); + blockDataUpdateTsWindow(pBlock, pInfo->primaryTsIndex); } } @@ -1279,17 +1291,23 @@ static void doClearWindows(SAggSupporter* pAggSup, SExprSupp* pSup1, SInterval* pGpDatas = (uint64_t*)pGpCol->pData; } int32_t step = 0; - for (int32_t i = 0; i < pBlock->info.rows; i += step) { - SResultRowInfo dumyInfo; - dumyInfo.cur.pageId = -1; - STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, tsCols[i], pInterval, TSDB_ORDER_ASC); - step = getNumOfRowsInTimeWindow(&pBlock->info, tsCols, i, win.ekey, binarySearchForKey, NULL, TSDB_ORDER_ASC); - uint64_t winGpId = pGpDatas ? pGpDatas[i] : pBlock->info.groupId; + int32_t startPos = 0; + SResultRowInfo dumyInfo; + dumyInfo.cur.pageId = -1; + STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, tsCols[0], pInterval, TSDB_ORDER_ASC); + while (1) { + step = getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, win.ekey, binarySearchForKey, NULL, TSDB_ORDER_ASC); + uint64_t winGpId = pGpDatas ? pGpDatas[startPos] : pBlock->info.groupId; bool res = doClearWindow(pAggSup, pSup1, (char*)&win.skey, sizeof(TKEY), winGpId, numOfOutput); if (pUpWins && res) { SWinRes winRes = {.ts = win.skey, .groupId = winGpId}; taosArrayPush(pUpWins, &winRes); } + int32_t prevEndPos = step - 1 + startPos; + startPos = getNextQualifiedWindow(pInterval, &win, &pBlock->info, tsCols, prevEndPos, TSDB_ORDER_ASC); + if (startPos < 0) { + break; + } } } @@ -2434,7 +2452,7 @@ static void doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBloc } while (1) { bool isClosed = isCloseWindow(&nextWin, &pInfo->twAggSup); - if (pInfo->ignoreExpiredData && isClosed) { + if ((pInfo->ignoreExpiredData && isClosed) || !inSlidingWindow(&pInfo->interval, &nextWin, &pSDataBlock->info)) { startPos = getNexWindowPos(&pInfo->interval, &pSDataBlock->info, tsCols, startPos, nextWin.ekey, &nextWin); if (startPos < 0) { break; @@ -3101,12 +3119,7 @@ int64_t getSessionWindowEndkey(void* data, int32_t index) { } bool isInTimeWindow(STimeWindow* pWin, TSKEY ts, int64_t gap) { - int64_t sGap = ts - pWin->skey + gap; - int64_t eGap = pWin->ekey - ts + gap; - // if ((sGap < 0 && sGap >= -gap) || (eGap < 0 && eGap >= -gap) || (sGap >= 0 && eGap >= 0)) { - // return true; - // } - if (sGap >= 0 && eGap >= 0) { + if (ts + gap >= pWin->skey && ts - gap <= pWin->ekey) { return true; } return false; -- GitLab