From e74c0ac987432bc7b5786df51bce768b70dcaedb Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Fri, 16 Jun 2023 14:11:19 +0800 Subject: [PATCH] save index instead of ts --- source/libs/executor/src/timesliceoperator.c | 29 ++++++++------------ 1 file changed, 12 insertions(+), 17 deletions(-) diff --git a/source/libs/executor/src/timesliceoperator.c b/source/libs/executor/src/timesliceoperator.c index 25cb94f7e1..415fefe75f 100644 --- a/source/libs/executor/src/timesliceoperator.c +++ b/source/libs/executor/src/timesliceoperator.c @@ -45,7 +45,7 @@ typedef struct STimeSliceOperatorInfo { SGroupKeys* pPrevGroupKey; SSDataBlock* pNextGroupRes; SSDataBlock* pRemainRes; // save block unfinished processing - int64_t remainTs; // the remaining timestamp in the block to be processed + int32_t remainIndex; // the remaining index in the block to be processed } STimeSliceOperatorInfo; static void destroyTimeSliceOperatorInfo(void* param); @@ -669,8 +669,13 @@ static void saveBlockStatus(STimeSliceOperatorInfo* pSliceInfo, SSDataBlock* pBl SColumnInfoData* pTsCol = taosArrayGet(pBlock->pDataBlock, pSliceInfo->tsCol.slotId); if (curIndex < pBlock->info.rows - 1) { pSliceInfo->pRemainRes = pBlock; - pSliceInfo->remainTs = *(int64_t*)colDataGetData(pTsCol, curIndex + 1); + pSliceInfo->remainIndex = curIndex + 1; + return; } + + // all data in remaining block processed + pSliceInfo->pRemainRes = NULL; + } static void doTimesliceImpl(SOperatorInfo* pOperator, STimeSliceOperatorInfo* pSliceInfo, SSDataBlock* pBlock, @@ -679,14 +684,10 @@ static void doTimesliceImpl(SOperatorInfo* pOperator, STimeSliceOperatorInfo* pS SInterval* pInterval = &pSliceInfo->interval; SColumnInfoData* pTsCol = taosArrayGet(pBlock->pDataBlock, pSliceInfo->tsCol.slotId); - for (int32_t i = 0; i < pBlock->info.rows; ++i) { - int64_t ts = *(int64_t*)colDataGetData(pTsCol, i); - // check if need to resume from the position of last unfinished block - if (pSliceInfo->pRemainRes != NULL && ts < pSliceInfo->remainTs && - pSliceInfo->current <= pSliceInfo->remainTs) { - continue; - } + int32_t i = (pSliceInfo->pRemainRes == NULL) ? 0 : pSliceInfo->remainIndex; + for (; i < pBlock->info.rows; ++i) { + int64_t ts = *(int64_t*)colDataGetData(pTsCol, i); // check for duplicate timestamps if (checkDuplicateTimestamps(pSliceInfo, pTsCol, i, pBlock->info.rows)) { @@ -696,13 +697,6 @@ static void doTimesliceImpl(SOperatorInfo* pOperator, STimeSliceOperatorInfo* pS if (checkNullRow(&pOperator->exprSupp, pBlock, i, ignoreNull)) { continue; } - if (checkWindowBoundReached(pSliceInfo)) { - break; - } - if (checkThresholdReached(pSliceInfo, pOperator->resultInfo.threshold)) { - saveBlockStatus(pSliceInfo, pBlock, i); - return; - } if (ts == pSliceInfo->current) { addCurrentRowToResult(pSliceInfo, &pOperator->exprSupp, pResBlock, pBlock, i); @@ -984,7 +978,8 @@ SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode pInfo->pPrevGroupKey = NULL; pInfo->pNextGroupRes = NULL; pInfo->pRemainRes = NULL; - pInfo->remainTs = 0; + pInfo->remainIndex = 0; + pOperator->resultInfo.threshold = 1; if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) { STableScanInfo* pScanInfo = (STableScanInfo*)downstream->info; -- GitLab