diff --git a/include/common/tcommon.h b/include/common/tcommon.h index c7ba618b255f1d43ef81568157b0fe2ceee58207..4a06d81c7b8edcafadb91c04186e9cc21af29052 100644 --- a/include/common/tcommon.h +++ b/include/common/tcommon.h @@ -56,7 +56,8 @@ typedef enum EStreamType { STREAM_CLEAR, STREAM_INVALID, STREAM_GET_ALL, - STREAM_DELETE, + STREAM_DELETE_RESULT, + STREAM_DELETE_DATA, STREAM_RETRIEVE, STREAM_PULL_DATA, STREAM_PULL_OVER, diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 4881dd88aa507feb9182ee15fdb1738484d2e639..b72fd961f5940fca7e3eef02fa6ccdb1ce42c9a3 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -1259,6 +1259,7 @@ int32_t copyDataBlock(SSDataBlock* dst, const SSDataBlock* src) { dst->info.rows = src->info.rows; dst->info.window = src->info.window; + dst->info.type = src->info.type; return TSDB_CODE_SUCCESS; } diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index a485d265b9c68982d085763f8265871a621a1001..ebfc70ceae9321dbebe3458665b75c1af70d53cc 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -3253,6 +3253,10 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) { doSetOperatorCompleted(pOperator); break; } + if (pBlock->info.type == STREAM_RETRIEVE) { + // for stream interval + return pBlock; + } // the pDataBlock are always the same one, no need to call this again int32_t code = getTableScanInfo(pOperator->pDownstream[0], &order, &scanFlag); diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 5f66b6e2be80da7f4ac6f353aa4aa17373e284ef..7b027a0570741241c894c2ee94ce6f2a4319b426 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -807,6 +807,23 @@ static bool isStateWindow(SStreamBlockScanInfo* pInfo) { return pInfo->sessionSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE; } +static void setGroupId(SStreamBlockScanInfo* pInfo, SSDataBlock* pBlock, int32_t groupColIndex, int32_t rowIndex) { + ASSERT(rowIndex < pBlock->info.rows); + switch (pBlock->info.type) + { + case STREAM_RETRIEVE: { + SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, groupColIndex); + uint64_t* groupCol = (uint64_t*)pColInfo->pData; + pInfo->groupId = groupCol[rowIndex]; + } + break; + case STREAM_DELETE_DATA: + break; + default: + break; + } +} + static bool prepareDataScan(SStreamBlockScanInfo* pInfo, SSDataBlock* pSDB, int32_t tsColIndex, int32_t* pRowIndex) { STimeWindow win = { .skey = INT64_MIN, @@ -829,6 +846,7 @@ static bool prepareDataScan(SStreamBlockScanInfo* pInfo, SSDataBlock* pSDB, int3 } else { win = getActiveTimeWindow(NULL, &dumyInfo, tsCols[(*pRowIndex)], &pInfo->interval, pInfo->interval.precision, NULL); + setGroupId(pInfo, pSDB, 2, *pRowIndex); (*pRowIndex) += getNumOfRowsInTimeWindow(&pSDB->info, tsCols, (*pRowIndex), win.ekey, binarySearchForKey, NULL, TSDB_ORDER_ASC); } diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 58685e4bee34051d580cc29048d7b3c929dd8a76..03c939cc954a689af99e0e21c18fd035265f6286 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -2852,7 +2852,7 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh pInfo->pStDeleted = taosHashInit(64, hashFn, true, HASH_NO_LOCK); pInfo->pDelIterator = NULL; pInfo->pDelRes = createOneDataBlock(pResBlock, false); - pInfo->pDelRes->info.type = STREAM_DELETE; + pInfo->pDelRes->info.type = STREAM_DELETE_RESULT; blockDataEnsureCapacity(pInfo->pDelRes, 64); pInfo->pChildren = NULL; pInfo->isFinal = false; @@ -3980,7 +3980,7 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys pInfo->pSeDeleted = taosHashInit(64, hashFn, true, HASH_NO_LOCK); pInfo->pDelIterator = NULL; pInfo->pDelRes = createOneDataBlock(pResBlock, false); - pInfo->pDelRes->info.type = STREAM_DELETE; + pInfo->pDelRes->info.type = STREAM_DELETE_RESULT; blockDataEnsureCapacity(pInfo->pDelRes, 64); pInfo->pChildren = NULL; pInfo->ignoreExpiredData = pStateNode->window.igExpired; diff --git a/tests/script/jenkins/basic.txt b/tests/script/jenkins/basic.txt index 74c1c582b40f4b51dc60a3b912c934b04a922bb0..9738fad24dbc8d5075a1c19e9b6e14f05e634a1b 100644 --- a/tests/script/jenkins/basic.txt +++ b/tests/script/jenkins/basic.txt @@ -106,6 +106,7 @@ ./test.sh -f tsim/stream/partitionby1.sim ./test.sh -f tsim/stream/schedSnode.sim ./test.sh -f tsim/stream/windowClose.sim +./test.sh -f tsim/stream/ignoreExpiredData.sim # ---- transaction ./test.sh -f tsim/trans/lossdata1.sim