diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index f17d7f6468e25ca7265b405506e4e0521aed3c74..7100be58e36baf851bc16b03f195e255a7fdc5e0 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -536,6 +536,7 @@ typedef struct SStreamIntervalOperatorInfo { SArray* pChildren; SStreamState* pState; SWinKey delKey; + uint64_t numOfDatapack; } SStreamIntervalOperatorInfo; typedef struct SDataGroupInfo { diff --git a/source/libs/executor/src/filloperator.c b/source/libs/executor/src/filloperator.c index e26bfe98898d217f57bc238a53912e8444e15076..5ed26f627a26736d21e478ea79fec0543b45df27 100644 --- a/source/libs/executor/src/filloperator.c +++ b/source/libs/executor/src/filloperator.c @@ -1146,7 +1146,7 @@ static void doDeleteFillResult(SOperatorInfo* pOperator) { if (delTs > nextKey.ts) { break; } - endTs = delTs; + SWinKey delKey = {.groupId = delGroupId, .ts = delTs}; if (delTs == nextKey.ts) { code = streamStateCurNext(pOperator->pTaskInfo->streamInfo.pState, pCur); @@ -1159,7 +1159,7 @@ static void doDeleteFillResult(SOperatorInfo* pOperator) { streamStateFreeCur(pCur); pCur = streamStateGetAndCheckCur(pOperator->pTaskInfo->streamInfo.pState, &nextKey); } - endTs = TMAX(ts, nextKey.ts - 1); + endTs = TMAX(delTs, nextKey.ts - 1); if (code != TSDB_CODE_SUCCESS) { break; } diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index b6353061fb484a5b4910a278e4aa0f38dc60f999..7f0dec1959031dba37a078f519d5d7f31b62cece 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1113,7 +1113,7 @@ static STimeWindow getSlidingWindow(TSKEY* startTsCol, TSKEY* endTsCol, uint64_t if (hasGroup) { (*pRowIndex) += 1; } else { - while ((groupId == gpIdCol[(*pRowIndex)] && startTsCol[*pRowIndex] < endWin.ekey)) { + while ((groupId == gpIdCol[(*pRowIndex)] && startTsCol[*pRowIndex] <= endWin.ekey)) { (*pRowIndex) += 1; if ((*pRowIndex) == pDataBlockInfo->rows) { break; diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index f2c2e24a41ede56491d45691f82149c191cc0492..5cf0d3111732c668c367b073c9a0cbff40e35644 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -2516,9 +2516,11 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); if (pBlock == NULL) { pOperator->status = OP_RES_TO_RETURN; - qDebug("%s return data", IS_FINAL_OP(pInfo) ? "interval final" : "interval semi"); + qDebug("===stream===return data:%s. recv datablock num:%" PRIu64 , IS_FINAL_OP(pInfo) ? "interval final" : "interval semi", pInfo->numOfDatapack); + pInfo->numOfDatapack = 0; break; } + pInfo->numOfDatapack++; printDataBlock(pBlock, IS_FINAL_OP(pInfo) ? "interval final recv" : "interval semi recv"); ASSERT(pBlock->info.type != STREAM_INVERT); @@ -2734,6 +2736,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, pInfo->pDelWins = taosArrayInit(4, sizeof(SWinKey)); pInfo->delKey.ts = INT64_MAX; pInfo->delKey.groupId = 0; + pInfo->numOfDatapack = 0; pOperator->operatorType = pPhyNode->type; pOperator->blocking = true; @@ -4700,8 +4703,11 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { while (1) { SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); if (pBlock == NULL) { + qDebug("===stream===return data:single interval. recv datablock num:%" PRIu64, pInfo->numOfDatapack); + pInfo->numOfDatapack = 0; break; } + pInfo->numOfDatapack++; printDataBlock(pBlock, "single interval recv"); if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT || @@ -4851,6 +4857,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys pInfo->pChildren = NULL; pInfo->delKey.ts = INT64_MAX; pInfo->delKey.groupId = 0; + pInfo->numOfDatapack = 0; setOperatorInfo(pOperator, "StreamIntervalOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL, true, OP_NOT_OPENED, pInfo, pTaskInfo);