diff --git a/include/common/tcommon.h b/include/common/tcommon.h index 60bace3f73dafe676e3e5748fa31f523b6c3d48f..73ba0f47b8d3003f6f924db86be3b2830444be4d 100644 --- a/include/common/tcommon.h +++ b/include/common/tcommon.h @@ -45,6 +45,7 @@ typedef enum EStreamType { STREAM_REPROCESS, STREAM_INVALID, STREAM_GET_ALL, + STREAM_DELETE, } EStreamType; typedef struct { diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index ad72fd26af4c163327387323db154035eb10ebbc..7b9b56fb73a6d6132f4c54b8703dc92cc2e889de 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -2578,6 +2578,7 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh pInfo->pStDeleted = taosHashInit(64, hashFn, true, HASH_NO_LOCK); pInfo->pDelIterator = NULL; pInfo->pDelRes = createOneDataBlock(pResBlock, false); + pInfo->pDelRes->info.type = STREAM_DELETE; blockDataEnsureCapacity(pInfo->pDelRes, 64); pInfo->pChildren = NULL; pInfo->isFinal = false; @@ -3650,6 +3651,7 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys pInfo->pSeDeleted = taosHashInit(64, hashFn, true, HASH_NO_LOCK); pInfo->pDelIterator = NULL; pInfo->pDelRes = createOneDataBlock(pResBlock, false); + pInfo->pDelRes->info.type = STREAM_DELETE; blockDataEnsureCapacity(pInfo->pDelRes, 64); pInfo->pChildren = NULL;