diff --git a/source/libs/executor/src/eventwindowoperator.c b/source/libs/executor/src/eventwindowoperator.c index 0e3c67c5d7c5ce5d998a252a2141de206a9dc4af..49e2d5bc4ac0c85fabd8ffde93aa527bbe3c85e0 100644 --- a/source/libs/executor/src/eventwindowoperator.c +++ b/source/libs/executor/src/eventwindowoperator.c @@ -26,23 +26,19 @@ typedef struct SEventWindowOperatorInfo { SOptrBasicInfo binfo; SAggSupporter aggSup; SExprSupp scalarSup; - SGroupResInfo groupResInfo; SWindowRowsSup winSup; - bool hasKey; - SStateKeys stateKey; int32_t tsSlotId; // primary timestamp column slot id STimeWindowAggSupp twAggSup; - - SFilterInfo* pStartCondInfo; - SFilterInfo* pEndCondInfo; - bool inWindow; - SResultRow* pRow; + uint64_t groupId; // current group id, used to identify the data block from different groups + SFilterInfo* pStartCondInfo; + SFilterInfo* pEndCondInfo; + bool inWindow; + SResultRow* pRow; } SEventWindowOperatorInfo; static SSDataBlock* eventWindowAggregate(SOperatorInfo* pOperator); static void destroyEWindowOperatorInfo(void* param); -static void eventWindowAggImpl(SOperatorInfo* pOperator, SEventWindowOperatorInfo* pInfo, SSDataBlock* pBlock); -static SSDataBlock* doEventWindowAgg(SOperatorInfo* pOperator); +static int32_t eventWindowAggImpl(SOperatorInfo* pOperator, SEventWindowOperatorInfo* pInfo, SSDataBlock* pBlock); // todo : move to util static void doKeepNewWindowStartInfo(SWindowRowsSup* pRowSup, const int64_t* tsList, int32_t rowIndex, @@ -176,7 +172,6 @@ void destroyEWindowOperatorInfo(void* param) { colDataDestroy(&pInfo->twAggSup.timeWindowData); cleanupAggSup(&pInfo->aggSup); - cleanupGroupResInfo(&pInfo->groupResInfo); taosMemoryFreeClear(param); } @@ -254,36 +249,45 @@ static void doEventWindowAggImpl(SEventWindowOperatorInfo* pInfo, SExprSupp* pSu pBlock->info.rows, numOfOutput); } -void eventWindowAggImpl(SOperatorInfo* pOperator, SEventWindowOperatorInfo* pInfo, SSDataBlock* pBlock) { - SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - SExprSupp* pSup = &pOperator->exprSupp; - - SSDataBlock* pRes = pInfo->binfo.pRes; - int64_t gid = pBlock->info.id.groupId; - +int32_t eventWindowAggImpl(SOperatorInfo* pOperator, SEventWindowOperatorInfo* pInfo, SSDataBlock* pBlock) { + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + SExprSupp* pSup = &pOperator->exprSupp; + SSDataBlock* pRes = pInfo->binfo.pRes; + int64_t gid = pBlock->info.id.groupId; SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pInfo->tsSlotId); TSKEY* tsList = (TSKEY*)pColInfoData->pData; - + SWindowRowsSup* pRowSup = &pInfo->winSup; SColumnInfoData *ps = NULL, *pe = NULL; + int32_t rowIndex = 0; - SWindowRowsSup* pRowSup = &pInfo->winSup; pRowSup->numOfRows = 0; + if (pInfo->groupId == 0) { + pInfo->groupId = gid; + } else if (pInfo->groupId != gid) { + // this is a new group, reset the info + pInfo->inWindow = false; + } SFilterColumnParam param1 = {.numOfCols = taosArrayGetSize(pBlock->pDataBlock), .pDataBlock = pBlock->pDataBlock}; - int32_t code = filterSetDataFromSlotId(pInfo->pStartCondInfo, ¶m1); + + int32_t code = filterSetDataFromSlotId(pInfo->pStartCondInfo, ¶m1); + if (code != TSDB_CODE_SUCCESS) { + return code; + } int32_t status1 = 0; - bool keep1 = filterExecute(pInfo->pStartCondInfo, pBlock, &ps, NULL, param1.numOfCols, &status1); + filterExecute(pInfo->pStartCondInfo, pBlock, &ps, NULL, param1.numOfCols, &status1); SFilterColumnParam param2 = {.numOfCols = taosArrayGetSize(pBlock->pDataBlock), .pDataBlock = pBlock->pDataBlock}; code = filterSetDataFromSlotId(pInfo->pEndCondInfo, ¶m2); + if (code != TSDB_CODE_SUCCESS) { + return code; + } int32_t status2 = 0; - bool keep2 = filterExecute(pInfo->pEndCondInfo, pBlock, &pe, NULL, param2.numOfCols, &status2); + filterExecute(pInfo->pEndCondInfo, pBlock, &pe, NULL, param2.numOfCols, &status2); - int32_t rowIndex = 0; int32_t startIndex = pInfo->inWindow ? 0 : -1; - while (rowIndex < pBlock->info.rows) { if (pInfo->inWindow) { // let's find the first end value for (rowIndex = startIndex; rowIndex < pBlock->info.rows; ++rowIndex) { @@ -294,7 +298,6 @@ void eventWindowAggImpl(SOperatorInfo* pOperator, SEventWindowOperatorInfo* pInf if (rowIndex < pBlock->info.rows) { doEventWindowAggImpl(pInfo, pSup, startIndex, rowIndex, pBlock, tsList, pTaskInfo); - doUpdateNumOfRows(pSup->pCtx, pInfo->pRow, pSup->numOfExprs, pSup->rowEntryInfoOffset); // check buffer size @@ -324,9 +327,9 @@ void eventWindowAggImpl(SOperatorInfo* pOperator, SEventWindowOperatorInfo* pInf } if (pInfo->inWindow) { - continue; + continue; // try to find the end position } else { - break; + break; // no valid start position, quit } } } @@ -335,4 +338,6 @@ void eventWindowAggImpl(SOperatorInfo* pOperator, SEventWindowOperatorInfo* pInf taosMemoryFree(ps); colDataDestroy(pe); taosMemoryFree(pe); + + return TSDB_CODE_SUCCESS; }