From 009777a727564a98bbc1e3db736508e0186ac2fe Mon Sep 17 00:00:00 2001 From: slzhou Date: Thu, 11 Aug 2022 16:23:53 +0800 Subject: [PATCH] fix: add multi-timeline support for session and state --- source/libs/executor/inc/executorimpl.h | 1 + source/libs/executor/src/timewindowoperator.c | 35 ++++++++++--------- 2 files changed, 20 insertions(+), 16 deletions(-) diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index d460644633..9ea1d534cc 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -682,6 +682,7 @@ typedef struct SWindowRowsSup { TSKEY prevTs; int32_t startRowIndex; int32_t numOfRows; + uint64_t groupId; } SWindowRowsSup; typedef struct SSessionAggOperatorInfo { diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 610303dc50..029bbf3d8f 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -90,16 +90,18 @@ static void updateTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pWin, b ts[4] = pWin->ekey + delta; // window end key } -static void doKeepTuple(SWindowRowsSup* pRowSup, int64_t ts) { +static void doKeepTuple(SWindowRowsSup* pRowSup, int64_t ts, uint64_t groupId) { pRowSup->win.ekey = ts; pRowSup->prevTs = ts; pRowSup->numOfRows += 1; + pRowSup->groupId = groupId; } -static void doKeepNewWindowStartInfo(SWindowRowsSup* pRowSup, const int64_t* tsList, int32_t rowIndex) { +static void doKeepNewWindowStartInfo(SWindowRowsSup* pRowSup, const int64_t* tsList, int32_t rowIndex, uint64_t groupId) { pRowSup->startRowIndex = rowIndex; pRowSup->numOfRows = 0; pRowSup->win.skey = tsList[rowIndex]; + pRowSup->groupId = groupId; } static FORCE_INLINE int32_t getForwardStepsInBlock(int32_t numOfRows, __block_search_fn_t searchFn, TSKEY ekey, @@ -1142,7 +1144,7 @@ static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorI char* val = colDataGetData(pStateColInfoData, j); - if (!pInfo->hasKey) { + if (gid != pRowSup->groupId || !pInfo->hasKey) { // todo extract method if (IS_VAR_DATA_TYPE(pInfo->stateKey.type)) { varDataCopy(pInfo->stateKey.pData, val); @@ -1152,10 +1154,10 @@ static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorI pInfo->hasKey = true; - doKeepNewWindowStartInfo(pRowSup, tsList, j); - doKeepTuple(pRowSup, tsList[j]); + doKeepNewWindowStartInfo(pRowSup, tsList, j, gid); + doKeepTuple(pRowSup, tsList[j], gid); } else if (compareVal(val, &pInfo->stateKey)) { - doKeepTuple(pRowSup, tsList[j]); + doKeepTuple(pRowSup, tsList[j], gid); if (j == 0 && pRowSup->startRowIndex != 0) { pRowSup->startRowIndex = 0; } @@ -1177,8 +1179,8 @@ static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorI pRowSup->numOfRows, NULL, pBlock->info.rows, numOfOutput, TSDB_ORDER_ASC); // here we start a new session window - doKeepNewWindowStartInfo(pRowSup, tsList, j); - doKeepTuple(pRowSup, tsList[j]); + doKeepNewWindowStartInfo(pRowSup, tsList, j, gid); + doKeepTuple(pRowSup, tsList[j], gid); // todo extract method if (IS_VAR_DATA_TYPE(pInfo->stateKey.type)) { @@ -1911,7 +1913,7 @@ _error: return NULL; } -// todo handle multiple tables cases. +// todo handle multiple timeline cases. assume no timeline interweaving static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperatorInfo* pInfo, SSDataBlock* pBlock) { SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SExprSupp* pSup = &pOperator->exprSupp; @@ -1935,12 +1937,13 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperator // In case of ascending or descending order scan data, only one time window needs to be kepted for each table. TSKEY* tsList = (TSKEY*)pColInfoData->pData; for (int32_t j = 0; j < pBlock->info.rows; ++j) { - if (pInfo->winSup.prevTs == INT64_MIN) { - doKeepNewWindowStartInfo(pRowSup, tsList, j); - doKeepTuple(pRowSup, tsList[j]); - } else if (tsList[j] - pRowSup->prevTs <= gap && (tsList[j] - pRowSup->prevTs) >= 0) { + if (gid != pRowSup->groupId || pInfo->winSup.prevTs == INT64_MIN) { + doKeepNewWindowStartInfo(pRowSup, tsList, j, gid); + doKeepTuple(pRowSup, tsList[j], gid); + } else if ((tsList[j] - pRowSup->prevTs >= 0) && tsList[j] - pRowSup->prevTs <= gap || + (pRowSup->prevTs - tsList[j] >= 0 ) && (pRowSup->prevTs - tsList[j] <= gap)) { // The gap is less than the threshold, so it belongs to current session window that has been opened already. - doKeepTuple(pRowSup, tsList[j]); + doKeepTuple(pRowSup, tsList[j], gid); if (j == 0 && pRowSup->startRowIndex != 0) { pRowSup->startRowIndex = 0; } @@ -1963,8 +1966,8 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperator pRowSup->numOfRows, NULL, pBlock->info.rows, numOfOutput, TSDB_ORDER_ASC); // here we start a new session window - doKeepNewWindowStartInfo(pRowSup, tsList, j); - doKeepTuple(pRowSup, tsList[j]); + doKeepNewWindowStartInfo(pRowSup, tsList, j, gid); + doKeepTuple(pRowSup, tsList[j], gid); } } -- GitLab