diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 50a877244840112480175a06f57f6933c7e38ae7..f198277cdcb04a1af572f11f6a8ee21dcff32b6b 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -1455,18 +1455,6 @@ STimeWindow getFinalTimeWindow(int64_t ts, SInterval* pInterval) { return w; } -static void closeChildIntervalWindow(SOperatorInfo* pOperator, SArray* pChildren, TSKEY maxTs) { - int32_t size = taosArrayGetSize(pChildren); - for (int32_t i = 0; i < size; i++) { - SOperatorInfo* pChildOp = taosArrayGetP(pChildren, i); - SStreamIntervalOperatorInfo* pChInfo = pChildOp->info; - ASSERTS(pChInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE, "children trigger type should be at once"); - pChInfo->twAggSup.maxTs = TMAX(pChInfo->twAggSup.maxTs, maxTs); - closeStreamIntervalWindow(pChInfo->aggSup.pResultRowHashTable, &pChInfo->twAggSup, &pChInfo->interval, NULL, NULL, - NULL, pOperator); - } -} - static void doBuildDeleteResult(SStreamIntervalOperatorInfo* pInfo, SArray* pWins, int32_t* index, SSDataBlock* pBlock) { blockDataCleanup(pBlock); @@ -1534,6 +1522,10 @@ void destroyStreamFinalIntervalOperatorInfo(void* param) { cleanupBasicInfo(&pInfo->binfo); cleanupAggSup(&pInfo->aggSup); // it should be empty. + void* pIte = NULL; + while ((pIte = taosHashIterate(pInfo->pPullDataMap, pIte)) != NULL) { + taosArrayDestroy(*(void**)pIte); + } taosHashCleanup(pInfo->pPullDataMap); taosArrayDestroy(pInfo->pPullWins); blockDataDestroy(pInfo->pPullDataRes); @@ -4426,23 +4418,6 @@ void destroyMergeIntervalOperatorInfo(void* param) { taosMemoryFreeClear(param); } -static int32_t finalizeWindowResult(SOperatorInfo* pOperatorInfo, uint64_t tableGroupId, STimeWindow* win, - SSDataBlock* pResultBlock) { - SMergeIntervalAggOperatorInfo* miaInfo = pOperatorInfo->info; - SIntervalAggOperatorInfo* iaInfo = &miaInfo->intervalAggOperatorInfo; - SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo; - bool ascScan = (iaInfo->inputOrder == TSDB_ORDER_ASC); - SExprSupp* pExprSup = &pOperatorInfo->exprSupp; - - SET_RES_WINDOW_KEY(iaInfo->aggSup.keyBuf, &win->skey, TSDB_KEYSIZE, tableGroupId); - SResultRowPosition* p1 = (SResultRowPosition*)tSimpleHashGet( - iaInfo->aggSup.pResultRowHashTable, iaInfo->aggSup.keyBuf, GET_RES_WINDOW_KEY_LEN(TSDB_KEYSIZE)); - ASSERT(p1 != NULL); - // finalizeResultRows(iaInfo->aggSup.pResultBuf, p1, pResultBlock, pTaskInfo); - tSimpleHashRemove(iaInfo->aggSup.pResultRowHashTable, iaInfo->aggSup.keyBuf, GET_RES_WINDOW_KEY_LEN(TSDB_KEYSIZE)); - return TSDB_CODE_SUCCESS; -} - static int32_t outputPrevIntervalResult(SOperatorInfo* pOperatorInfo, uint64_t tableGroupId, SSDataBlock* pResultBlock, STimeWindow* newWin) { SMergeIntervalAggOperatorInfo* miaInfo = pOperatorInfo->info; @@ -4463,7 +4438,6 @@ static int32_t outputPrevIntervalResult(SOperatorInfo* pOperatorInfo, uint64_t t STimeWindow* prevWin = &prevGrpWin->window; if ((ascScan && newWin->skey > prevWin->ekey) || ((!ascScan) && newWin->skey < prevWin->ekey)) { - // finalizeWindowResult(pOperatorInfo, tableGroupId, prevWin, pResultBlock); tdListPopNode(miaInfo->groupIntervals, listNode); } } @@ -4623,7 +4597,6 @@ static SSDataBlock* doMergeIntervalAgg(SOperatorInfo* pOperator) { if (listNode != NULL) { SGroupTimeWindow* grpWin = (SGroupTimeWindow*)(listNode->data); - // finalizeWindowResult(pOperator, grpWin->groupId, &grpWin->window, pRes); pRes->info.id.groupId = grpWin->groupId; } } @@ -4768,6 +4741,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { doDeleteWindows(pOperator, &pInfo->interval, pBlock, pInfo->pDelWins, pInfo->pUpdatedMap); continue; } else if (pBlock->info.type == STREAM_GET_ALL) { + qDebug("===stream===single interval recv|block type STREAM_GET_ALL"); getAllIntervalWindow(pInfo->aggSup.pResultRowHashTable, pInfo->pUpdatedMap); continue; } else if (pBlock->info.type == STREAM_CREATE_CHILD_TABLE) { diff --git a/tests/script/tsim/stream/distributeInterval0.sim b/tests/script/tsim/stream/distributeInterval0.sim index 2dd645f3a28b06b3d9ef217b03dcc98a2be38de2..b6ae651d5af4e1ba50a1ba8653c205e8ede692d3 100644 --- a/tests/script/tsim/stream/distributeInterval0.sim +++ b/tests/script/tsim/stream/distributeInterval0.sim @@ -62,7 +62,7 @@ $loop_count = 0 loop0: $loop_count = $loop_count + 1 -if $loop_count == 10 then +if $loop_count == 30 then return -1 endi @@ -93,7 +93,7 @@ $loop_count = 0 loop01: $loop_count = $loop_count + 1 -if $loop_count == 10 then +if $loop_count == 30 then return -1 endi @@ -132,7 +132,7 @@ $loop_count = 0 loop011: $loop_count = $loop_count + 1 -if $loop_count == 10 then +if $loop_count == 30 then return -1 endi @@ -171,7 +171,7 @@ $loop_count = 0 loop02: $loop_count = $loop_count + 1 -if $loop_count == 10 then +if $loop_count == 30 then return -1 endi @@ -211,7 +211,7 @@ $loop_count = 0 loop03: $loop_count = $loop_count + 1 -if $loop_count == 10 then +if $loop_count == 30 then return -1 endi @@ -258,7 +258,7 @@ $loop_count = 0 loop04: $loop_count = $loop_count + 1 -if $loop_count == 10 then +if $loop_count == 30 then return -1 endi @@ -300,7 +300,7 @@ sleep 2000 sql select * from streamtST1; $loop_count = $loop_count + 1 -if $loop_count == 10 then +if $loop_count == 30 then return -1 endi @@ -429,7 +429,7 @@ sql select * from streamtST1; sleep 2000 $loop_count = $loop_count + 1 -if $loop_count == 10 then +if $loop_count == 30 then return -1 endi @@ -468,7 +468,7 @@ sql select * from streamtST3; sleep 2000 $loop_count = $loop_count + 1 -if $loop_count == 10 then +if $loop_count == 30 then return -1 endi diff --git a/tests/script/tsim/stream/fillHistoryBasic1.sim b/tests/script/tsim/stream/fillHistoryBasic1.sim index e7a8da90e23c6b83bbc0b7beb6f736312f292b89..c24ad364918226941e4a235837321d0b1ef120f8 100644 --- a/tests/script/tsim/stream/fillHistoryBasic1.sim +++ b/tests/script/tsim/stream/fillHistoryBasic1.sim @@ -481,64 +481,73 @@ sql insert into t1 values(1648791213004,4,2,3,4.1); sql create stream stream2 trigger at_once fill_history 1 IGNORE EXPIRED 0 into streamt as select _wstart, count(*) c1, count(d) c2 , sum(a) c3 , max(b) c4, min(c) c5 from t1 interval(10s); +$loop_count = 0 +loop0: + sleep 5000 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + sql select `_wstart`, c1, c2 ,c3 ,c4, c5 from streamt; if $rows != 4 then print ======$rows - return -1 + goto loop0 endi # row 0 if $data01 != 2 then print ======$data01 - return -1 + goto loop0 endi if $data02 != 2 then print ======$data02 - return -1 + goto loop0 endi if $data03 != 5 then print ======$data03 - return -1 + goto loop0 endi if $data04 != 2 then print ======$data04 - return -1 + goto loop0 endi if $data05 != 3 then print ======$data05 - return -1 + goto loop0 endi # row 1 if $data11 != 1 then print ======$data11 - return -1 + goto loop0 endi if $data12 != 1 then print ======$data12 - return -1 + goto loop0 endi if $data13 != 2 then print ======$data13 - return -1 + goto loop0 endi if $data14 != 2 then print ======$data14 - return -1 + goto loop0 endi if $data15 != 3 then print ======$data15 - return -1 + goto loop0 endi # row 2