diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 0adabed626a090c17dd09c6d0ec963b596205edd..f503f9370b1cdfb72b66a751ee29e0b1c95378ee 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -483,9 +483,10 @@ typedef struct SStreamScanInfo { int32_t blockRecoverTotCnt; SSDataBlock* pRecoverRes; - SSDataBlock* pCreateTbRes; - int8_t igCheckUpdate; - int8_t igExpired; + SSDataBlock* pCreateTbRes; + int8_t igCheckUpdate; + int8_t igExpired; + SStreamState* pState; } SStreamScanInfo; typedef struct { diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 4b4cae1a55fcae3c770833c032577d219abe3b2a..fbc835beeaad4177dfdb868819673a0f53c49f0f 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1447,39 +1447,8 @@ static int32_t generateScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock, return code; } -#if 0 -void calBlockTag(SStreamScanInfo* pInfo, SSDataBlock* pBlock) { - SExprSupp* pTagCalSup = &pInfo->tagCalSup; - SStreamState* pState = pInfo->pStreamScanOp->pTaskInfo->streamInfo.pState; - if (pTagCalSup == NULL || pTagCalSup->numOfExprs == 0) return; - if (pBlock == NULL || pBlock->info.rows == 0) return; - - void* tag = NULL; - int32_t tagLen = 0; - if (streamStateGetParTag(pState, pBlock->info.id.groupId, &tag, &tagLen) == 0) { - pBlock->info.tagLen = tagLen; - void* pTag = taosMemoryRealloc(pBlock->info.pTag, tagLen); - if (pTag == NULL) { - tdbFree(tag); - taosMemoryFree(pBlock->info.pTag); - pBlock->info.pTag = NULL; - pBlock->info.tagLen = 0; - return; - } - pBlock->info.pTag = pTag; - memcpy(pBlock->info.pTag, tag, tagLen); - tdbFree(tag); - return; - } else { - pBlock->info.pTag = NULL; - } - tdbFree(tag); -} -#endif - static void calBlockTbName(SStreamScanInfo* pInfo, SSDataBlock* pBlock) { SExprSupp* pTbNameCalSup = &pInfo->tbnameCalSup; - SStreamState* pState = pInfo->pStreamScanOp->pTaskInfo->streamInfo.pState; blockDataCleanup(pInfo->pCreateTbRes); if (pInfo->tbnameCalSup.numOfExprs == 0 && pInfo->tagCalSup.numOfExprs == 0) { pBlock->info.parTbName[0] = 0; @@ -1535,7 +1504,7 @@ static void checkUpdateData(SStreamScanInfo* pInfo, bool invertible, SSDataBlock bool update = updateInfoIsUpdated(pInfo->pUpdateInfo, pBlock->info.id.uid, tsCol[rowId]); bool closedWin = isClosed && isSignleIntervalWindow(pInfo) && isDeletedStreamWindow(&win, pBlock->info.id.groupId, - pInfo->pTableScanOp->pTaskInfo->streamInfo.pState, &pInfo->twAggSup); + pInfo->pState, &pInfo->twAggSup); if ((update || closedWin) && out) { qDebug("stream update check not pass, update %d, closedWin %d", update, closedWin); uint64_t gpId = 0; @@ -2534,6 +2503,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys pInfo->igCheckUpdate = pTableScanNode->igCheckUpdate; pInfo->igExpired = pTableScanNode->igExpired; pInfo->twAggSup.maxTs = INT64_MIN; + pInfo->pState = NULL; // todo(liuyao) get buff from rocks db; void* buff = NULL; diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 89bce58cceef06a35119bfafe84cce7c67b57c12..8034b8f87103a0b9b3e7176d1a048a19741dc290 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -25,7 +25,9 @@ #include "ttime.h" #define IS_FINAL_OP(op) ((op)->isFinal) -#define DEAULT_DELETE_MARK (1000LL * 60LL * 60LL * 24LL * 365LL * 10LL); +// #define DEAULT_DELETE_MARK (1000LL * 60LL * 60LL * 24LL * 365LL * 10LL); + +#define DEAULT_DELETE_MARK (1000LL * 60LL * 60LL); typedef struct SSessionAggOperatorInfo { SOptrBasicInfo binfo; @@ -1612,20 +1614,20 @@ static bool timeWindowinterpNeeded(SqlFunctionCtx* pCtx, int32_t numOfCols, SInt return needed; } -void initIntervalDownStream(SOperatorInfo* downstream, uint16_t type, SAggSupporter* pSup, SInterval* pInterval, - STimeWindowAggSupp* pTwSup) { +void initIntervalDownStream(SOperatorInfo* downstream, uint16_t type, SStreamIntervalOperatorInfo* pInfo) { if (downstream->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { - initIntervalDownStream(downstream->pDownstream[0], type, pSup, pInterval, pTwSup); + initIntervalDownStream(downstream->pDownstream[0], type, pInfo); return; } SStreamScanInfo* pScanInfo = downstream->info; pScanInfo->windowSup.parentType = type; - pScanInfo->windowSup.pIntervalAggSup = pSup; + pScanInfo->windowSup.pIntervalAggSup = &pInfo->aggSup; if (!pScanInfo->igCheckUpdate && !pScanInfo->pUpdateInfo) { - pScanInfo->pUpdateInfo = updateInfoInitP(pInterval, pTwSup->waterMark); + pScanInfo->pUpdateInfo = updateInfoInitP(&pInfo->interval, pInfo->twAggSup.waterMark); } - pScanInfo->interval = *pInterval; - pScanInfo->twAggSup = *pTwSup; + pScanInfo->interval = pInfo->interval; + pScanInfo->twAggSup = pInfo->twAggSup; + pScanInfo->pState = pInfo->pState; } void initStreamFunciton(SqlFunctionCtx* pCtx, int32_t numOfExpr) { @@ -2761,7 +2763,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, pOperator->fpSet = createOperatorFpSet(NULL, doStreamFinalIntervalAgg, NULL, destroyStreamFinalIntervalOperatorInfo, optrDefaultBufFn, NULL); if (pPhyNode->type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL) { - initIntervalDownStream(downstream, pPhyNode->type, &pInfo->aggSup, &pInfo->interval, &pInfo->twAggSup); + initIntervalDownStream(downstream, pPhyNode->type, pInfo); } code = appendDownstream(pOperator, &downstream, 1); if (code != TSDB_CODE_SUCCESS) { @@ -4930,7 +4932,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamIntervalAgg, NULL, destroyStreamFinalIntervalOperatorInfo, optrDefaultBufFn, NULL); - initIntervalDownStream(downstream, pPhyNode->type, &pInfo->aggSup, &pInfo->interval, &pInfo->twAggSup); + initIntervalDownStream(downstream, pPhyNode->type, pInfo); code = appendDownstream(pOperator, &downstream, 1); if (code != TSDB_CODE_SUCCESS) { goto _error;