diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index fc8f9420156f554477e25f36ec47a4a13d38456a..6516d768305a1aa9164b4be3386137a54a8d83a1 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -459,10 +459,11 @@ typedef struct SPartitionDataInfo { SArray* rowIds; } SPartitionDataInfo; -typedef struct STimeWindowSupp { +typedef struct STimeWindowAggSupp { int8_t calTrigger; int64_t waterMark; TSKEY maxTs; + TSKEY minTs; SColumnInfoData timeWindowData; // query time window info for scalar function execution. } STimeWindowAggSupp; diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index a19b6f817936b8a360298b5b7de627b1de24f5f8..fa559f0692dd2408c8c88ea6fddeb405e2b51059 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -1355,7 +1355,7 @@ static void setInverFunction(SqlFunctionCtx* pCtx, int32_t num, EStreamType type } } -void doClearWindowImpl(SResultRowPosition* p1, SDiskbasedBuf* pResultBuf, SExprSupp* pSup, int32_t numOfOutput) { +static void doClearWindowImpl(SResultRowPosition* p1, SDiskbasedBuf* pResultBuf, SExprSupp* pSup, int32_t numOfOutput) { SResultRow* pResult = getResultRowByPos(pResultBuf, p1, false); SqlFunctionCtx* pCtx = pSup->pCtx; for (int32_t i = 0; i < numOfOutput; ++i) { @@ -1374,8 +1374,8 @@ void doClearWindowImpl(SResultRowPosition* p1, SDiskbasedBuf* pResultBuf, SExprS releaseBufPage(pResultBuf, bufPage); } -bool doClearWindow(SAggSupporter* pAggSup, SExprSupp* pSup, char* pData, int16_t bytes, uint64_t groupId, - int32_t numOfOutput) { +static bool doClearWindow(SAggSupporter* pAggSup, SExprSupp* pSup, char* pData, int16_t bytes, uint64_t groupId, + int32_t numOfOutput) { SET_RES_WINDOW_KEY(pAggSup->keyBuf, pData, bytes, groupId); SResultRowPosition* p1 = (SResultRowPosition*)tSimpleHashGet(pAggSup->pResultRowHashTable, pAggSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes)); @@ -1402,18 +1402,21 @@ bool doDeleteIntervalWindow(SAggSupporter* pAggSup, TSKEY ts, uint64_t groupId) return true; } -void doDeleteSpecifyIntervalWindow(SAggSupporter* pAggSup, SSDataBlock* pBlock, SArray* pDelWins, SInterval* pInterval, - SHashObj* pUpdatedMap) { +static void doDeleteSpecifyIntervalWindow(SAggSupporter* pAggSup, STimeWindowAggSupp* pTwSup, SSDataBlock* pBlock, + SArray* pDelWins, SInterval* pInterval, SHashObj* pUpdatedMap) { SColumnInfoData* pStartCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX); TSKEY* tsStarts = (TSKEY*)pStartCol->pData; SColumnInfoData* pEndCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX); TSKEY* tsEnds = (TSKEY*)pEndCol->pData; SColumnInfoData* pGroupCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX); uint64_t* groupIds = (uint64_t*)pGroupCol->pData; + int64_t numOfWin = tSimpleHashGetSize(pAggSup->pResultRowHashTable); for (int32_t i = 0; i < pBlock->info.rows; i++) { + TSKEY startTs = TMAX(tsStarts[i], pTwSup->minTs); + TSKEY endTs = TMIN(tsEnds[i], pTwSup->maxTs); SResultRowInfo dumyInfo; dumyInfo.cur.pageId = -1; - STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, tsStarts[i], pInterval, TSDB_ORDER_ASC); + STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, startTs, pInterval, TSDB_ORDER_ASC); do { doDeleteIntervalWindow(pAggSup, win.skey, groupIds[i]); SWinKey winRes = {.ts = win.skey, .groupId = groupIds[i]}; @@ -1424,7 +1427,7 @@ void doDeleteSpecifyIntervalWindow(SAggSupporter* pAggSup, SSDataBlock* pBlock, taosHashRemove(pUpdatedMap, &winRes, sizeof(SWinKey)); } getNextTimeWindow(pInterval, pInterval->precision, TSDB_ORDER_ASC, &win); - } while (win.skey <= tsEnds[i]); + } while (win.skey <= endTs); } } @@ -3030,6 +3033,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); SHashObj* pUpdatedMap = taosHashInit(1024, hashFn, false, HASH_NO_LOCK); TSKEY maxTs = INT64_MIN; + TSKEY minTs = INT64_MAX; SExprSupp* pSup = &pOperator->exprSupp; @@ -3101,8 +3105,6 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { break; } printDataBlock(pBlock, IS_FINAL_OP(pInfo) ? "interval final recv" : "interval semi recv"); - maxTs = TMAX(maxTs, pBlock->info.window.ekey); - maxTs = TMAX(maxTs, pBlock->info.watermark); ASSERT(pBlock->info.type != STREAM_INVERT); if (pBlock->info.type == STREAM_NORMAL || pBlock->info.type == STREAM_PULL_DATA) { @@ -3129,13 +3131,13 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { break; } else if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT) { SArray* delWins = taosArrayInit(8, sizeof(SWinKey)); - doDeleteSpecifyIntervalWindow(&pInfo->aggSup, pBlock, delWins, &pInfo->interval, pUpdatedMap); + doDeleteSpecifyIntervalWindow(&pInfo->aggSup, &pInfo->twAggSup, pBlock, delWins, &pInfo->interval, pUpdatedMap); if (IS_FINAL_OP(pInfo)) { int32_t childIndex = getChildIndex(pBlock); SOperatorInfo* pChildOp = taosArrayGetP(pInfo->pChildren, childIndex); SStreamFinalIntervalOperatorInfo* pChildInfo = pChildOp->info; SExprSupp* pChildSup = &pChildOp->exprSupp; - doDeleteSpecifyIntervalWindow(&pChildInfo->aggSup, pBlock, NULL, &pChildInfo->interval, NULL); + doDeleteSpecifyIntervalWindow(&pChildInfo->aggSup, &pInfo->twAggSup, pBlock, NULL, &pChildInfo->interval, NULL); rebuildIntervalWindow(pInfo, pSup, delWins, pInfo->binfo.pRes->info.groupId, pOperator->exprSupp.numOfExprs, pOperator->pTaskInfo, pUpdatedMap); addRetriveWindow(delWins, pInfo); @@ -3189,9 +3191,13 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { setInputDataBlock(pChildOp, pChildOp->exprSupp.pCtx, pBlock, pChInfo->order, MAIN_SCAN, true); doHashIntervalAgg(pChildOp, pBlock, pBlock->info.groupId, NULL); } + maxTs = TMAX(maxTs, pBlock->info.window.ekey); + maxTs = TMAX(maxTs, pBlock->info.watermark); + minTs = TMIN(minTs, pBlock->info.window.skey); } pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, maxTs); + pInfo->twAggSup.minTs = TMIN(pInfo->twAggSup.minTs, minTs); if (IS_FINAL_OP(pInfo)) { closeIntervalWindow(pInfo->aggSup.pResultRowHashTable, &pInfo->twAggSup, &pInfo->interval, pInfo->pPullDataMap, pUpdatedMap, pInfo->pRecycledPages, pInfo->aggSup.pResultBuf); @@ -3264,6 +3270,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, .waterMark = pIntervalPhyNode->window.watermark, .calTrigger = pIntervalPhyNode->window.triggerType, .maxTs = INT64_MIN, + .minTs = INT64_MAX, }; ASSERT(pInfo->twAggSup.calTrigger != STREAM_TRIGGER_MAX_DELAY); pInfo->primaryTsIndex = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId; @@ -3507,7 +3514,11 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh initDummyFunction(pInfo->pDummyCtx, pSup->pCtx, numOfCols); pInfo->twAggSup = (STimeWindowAggSupp){ - .waterMark = pSessionNode->window.watermark, .calTrigger = pSessionNode->window.triggerType, .maxTs = INT64_MIN}; + .waterMark = pSessionNode->window.watermark, + .calTrigger = pSessionNode->window.triggerType, + .maxTs = INT64_MIN, + .minTs = INT64_MAX, + }; initResultRowInfo(&pInfo->binfo.resultRowInfo); initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window); @@ -4832,6 +4843,7 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys .waterMark = pStateNode->window.watermark, .calTrigger = pStateNode->window.triggerType, .maxTs = INT64_MIN, + .minTs = INT64_MAX, }; initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window); @@ -5632,6 +5644,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { SStreamIntervalOperatorInfo* pInfo = pOperator->info; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; int64_t maxTs = INT64_MIN; + int64_t minTs = INT64_MAX; SExprSupp* pSup = &pOperator->exprSupp; if (pOperator->status == OP_EXEC_DONE) { @@ -5676,7 +5689,8 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { qDebug("%s clear existed time window results for updates checked", GET_TASKID(pTaskInfo)); continue; } else if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT) { - doDeleteSpecifyIntervalWindow(&pInfo->aggSup, pBlock, pInfo->pDelWins, &pInfo->interval, pUpdatedMap); + doDeleteSpecifyIntervalWindow(&pInfo->aggSup, &pInfo->twAggSup, pBlock, pInfo->pDelWins, &pInfo->interval, + pUpdatedMap); continue; } else if (pBlock->info.type == STREAM_GET_ALL) { getAllIntervalWindow(pInfo->aggSup.pResultRowHashTable, pUpdatedMap); @@ -5702,11 +5716,13 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { } maxTs = TMAX(maxTs, pBlock->info.window.ekey); + minTs = TMIN(minTs, pBlock->info.window.skey); doStreamIntervalAggImpl(pOperator, &pInfo->binfo.resultRowInfo, pBlock, MAIN_SCAN, pUpdatedMap); // new disc buf /*doStreamIntervalAggImpl2(pOperator, pBlock, pBlock->info.groupId, pUpdatedMap);*/ } pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, maxTs); + pInfo->twAggSup.minTs = TMIN(pInfo->twAggSup.minTs, minTs); #if 0 if (pState) { @@ -5805,6 +5821,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys .waterMark = pIntervalPhyNode->window.watermark, .calTrigger = pIntervalPhyNode->window.triggerType, .maxTs = INT64_MIN, + .minTs = INT64_MAX, }; ASSERT(twAggSupp.calTrigger != STREAM_TRIGGER_MAX_DELAY); pOperator->pTaskInfo = pTaskInfo; diff --git a/source/libs/executor/src/tsimplehash.c b/source/libs/executor/src/tsimplehash.c index 84b615af7a93aef9fbf86190a2544474b7b2c87b..16fd11f97d330fa3bc2622e2f7671fa532dce61f 100644 --- a/source/libs/executor/src/tsimplehash.c +++ b/source/libs/executor/src/tsimplehash.c @@ -247,8 +247,9 @@ void *tSimpleHashGet(SSHashObj *pHashObj, const void *key, size_t keyLen) { } int32_t tSimpleHashRemove(SSHashObj *pHashObj, const void *key, size_t keyLen) { + int32_t code = TSDB_CODE_FAILED; if (!pHashObj || !key) { - return TSDB_CODE_FAILED; + return code; } uint32_t hashVal = (*pHashObj->hashFp)(key, (uint32_t)keyLen); @@ -266,13 +267,14 @@ int32_t tSimpleHashRemove(SSHashObj *pHashObj, const void *key, size_t keyLen) { } FREE_HASH_NODE(pNode); atomic_sub_fetch_64(&pHashObj->size, 1); + code = TSDB_CODE_SUCCESS; break; } pPrev = pNode; pNode = pNode->next; } - return TSDB_CODE_SUCCESS; + return code; } int32_t tSimpleHashIterateRemove(SSHashObj *pHashObj, const void *key, size_t keyLen, void **pIter, int32_t *iter) { diff --git a/tests/script/tsim/stream/deleteInterval.sim b/tests/script/tsim/stream/deleteInterval.sim index dfd0ddc9d3a203e617bfcdffa3b12e9414c2feb6..00d10afad99a258d3c931add7e7ceedc653e85df 100644 --- a/tests/script/tsim/stream/deleteInterval.sim +++ b/tests/script/tsim/stream/deleteInterval.sim @@ -186,7 +186,9 @@ endi sql drop stream if exists streams2; sql drop database if exists test2; +sql drop database if exists test; sql create database test2 vgroups 4; +sql create database test vgroups 1; sql use test2; sql create stable st(ts timestamp, a int, b int, c int, d double) tags(ta int,tb int,tc int); sql create table t1 using st tags(1,1,1); @@ -411,6 +413,80 @@ if $data12 != 3 then goto loop14 endi +return 1 + +sql drop stream if exists streams3; +sql drop database if exists test3; +sql drop database if exists test; +sql create database test3 vgroups 4; +sql create database test vgroups 1; +sql use test3; +sql create stable st(ts timestamp, a int, b int, c int, d double) tags(ta int,tb int,tc int); +sql create table t1 using st tags(1,1,1); +sql create table t2 using st tags(2,2,2); +sql create stream streams3 trigger at_once into test.streamt3 as select _wstart c1, count(*) c2, max(a) c3 from st interval(10s); + +sql insert into t1 values(1648791213000,NULL,NULL,NULL,NULL); +sql insert into t2 values(1648791213000,NULL,NULL,NULL,NULL); + +$loop_count = 0 + +sql delete from t1; + +loop15: +sleep 200 +sql select * from test.streamt2 order by c1, c2, c3; + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +if $rows != 1 then + print =====rows=$rows + goto loop15 +endi + +$loop_count = 0 + +sql delete from t1 where ts > 100; + +loop16: +sleep 200 +sql select * from test.streamt2 order by c1, c2, c3; + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +if $rows != 1 then + print =====rows=$rows + goto loop16 +endi + +$loop_count = 0 + +sql delete from st; + +loop17: +sleep 200 +sql select * from test.streamt2 order by c1, c2, c3; + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +if $rows != 0 then + print =====rows=$rows + goto loop17 +endi + + + + + $loop_all = $loop_all + 1 print ============loop_all=$loop_all