From 231b0c6cb56b4cfd98511fc60fcaabfa5f2c1d06 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 3 May 2022 15:33:24 +0800 Subject: [PATCH] fix(query): set the correct primary timestamp column for state window operator. --- source/libs/executor/inc/executorimpl.h | 3 ++- source/libs/executor/src/executorimpl.c | 11 +++++---- source/libs/executor/src/timewindowoperator.c | 23 +++++++++---------- 3 files changed, 19 insertions(+), 18 deletions(-) diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 2f8aefcac0..6305022b27 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -551,6 +551,7 @@ typedef struct SStateWindowOperatorInfo { int32_t colIndex; // start row index bool hasKey; SStateKeys stateKey; + int32_t tsSlotId; // primary timestamp column slot id STimeWindowAggSupp twAggSup; // bool reptScan; } SStateWindowOperatorInfo; @@ -688,7 +689,7 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExp SInterval* pInterval, STimeWindow* pWindow, SSDataBlock* pResBlock, int32_t fillType, SNodeListNode* fillVal, bool multigroupResult, SExecTaskInfo* pTaskInfo); SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols, - SSDataBlock* pResBlock, STimeWindowAggSupp *pTwAggSupp, SExecTaskInfo* pTaskInfo); + SSDataBlock* pResBlock, STimeWindowAggSupp *pTwAggSupp, int32_t tsSlotId, SExecTaskInfo* pTaskInfo); SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SArray* pGroupColList, SExecTaskInfo* pTaskInfo, diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index fc23722687..98804d4f1a 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -4977,13 +4977,12 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo } else if (QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW == type) { SSessionWinodwPhysiNode* pSessionNode = (SSessionWinodwPhysiNode*)pPhyNode; - STimeWindowAggSupp as = {.waterMark = pSessionNode->window.watermark, - .calTrigger = pSessionNode->window.triggerType}; - - int32_t tsSlotId = ((SColumnNode*)pSessionNode->window.pTspk)->slotId; + STimeWindowAggSupp as = {.waterMark = pSessionNode->window.watermark, .calTrigger = pSessionNode->window.triggerType}; SExprInfo* pExprInfo = createExprInfo(pSessionNode->window.pFuncs, NULL, &num); SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc); + int32_t tsSlotId = ((SColumnNode*)pSessionNode->window.pTspk)->slotId; + pOptr = createSessionAggOperatorInfo(ops[0], pExprInfo, num, pResBlock, pSessionNode->gap, tsSlotId, &as, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_PARTITION == type) { SPartitionPhysiNode* pPartNode = (SPartitionPhysiNode*)pPhyNode; @@ -4999,7 +4998,9 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo SExprInfo* pExprInfo = createExprInfo(pStateNode->window.pFuncs, NULL, &num); SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc); - pOptr = createStatewindowOperatorInfo(ops[0], pExprInfo, num, pResBlock, &as, pTaskInfo); + int32_t tsSlotId = ((SColumnNode*)pStateNode->window.pTspk)->slotId; + + pOptr = createStatewindowOperatorInfo(ops[0], pExprInfo, num, pResBlock, &as, tsSlotId, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_JOIN == type) { SJoinPhysiNode* pJoinNode = (SJoinPhysiNode*)pPhyNode; SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc); diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 04ff1515b0..2bd1cb8902 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -809,7 +809,6 @@ static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) { static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorInfo* pInfo, SSDataBlock* pBlock) { SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - SOptrBasicInfo* pBInfo = &pInfo->binfo; SColumnInfoData* pStateColInfoData = taosArrayGet(pBlock->pDataBlock, pInfo->colIndex); int64_t gid = pBlock->info.groupId; @@ -818,9 +817,9 @@ static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorI int32_t numOfOutput = pOperator->numOfOutput; int16_t bytes = pStateColInfoData->info.bytes; - int16_t type = pStateColInfoData->info.type; - SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, 0); + // todo set the correct primary timestamp column + SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pInfo->tsSlotId); TSKEY* tsList = (TSKEY*)pColInfoData->pData; SWindowRowsSup* pRowSup = &pInfo->winSup; @@ -1368,7 +1367,7 @@ SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SExprInfo* } SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols, - SSDataBlock* pResBlock, STimeWindowAggSupp* pTwAggSup, + SSDataBlock* pResBlock, STimeWindowAggSupp* pTwAggSup, int32_t tsSlotId, SExecTaskInfo* pTaskInfo) { SStateWindowOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStateWindowOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); @@ -1383,18 +1382,18 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInf initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExpr, numOfCols, pResBlock, keyBufSize, pTaskInfo->id.str); initResultRowInfo(&pInfo->binfo.resultRowInfo, 8); - pInfo->twAggSup = *pTwAggSup; + pInfo->twAggSup = *pTwAggSup; initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window); - pOperator->name = "StateWindowOperator"; + pInfo->tsSlotId = tsSlotId; + pOperator->name = "StateWindowOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STATE_WINDOW; pOperator->blockingOptr = true; - pOperator->status = OP_NOT_OPENED; - pOperator->pExpr = pExpr; - pOperator->numOfOutput = numOfCols; - - pOperator->pTaskInfo = pTaskInfo; - pOperator->info = pInfo; + pOperator->status = OP_NOT_OPENED; + pOperator->pExpr = pExpr; + pOperator->numOfOutput = numOfCols; + pOperator->pTaskInfo = pTaskInfo; + pOperator->info = pInfo; pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doStateWindowAgg, NULL, NULL, destroyStateWindowOperatorInfo, aggEncodeResultRow, aggDecodeResultRow, NULL); -- GitLab