diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 34f462cb3d480874df3feaa311b3a4a1e4074c72..7fd288cd57df53d06d7fd9c888104e1b23d1a39f 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -3423,6 +3423,7 @@ int32_t buildSessionResultDataBlock(SOperatorInfo* pOperator, SStreamState* pSta ASSERT(code == 0); if (code == -1) { // coverity scan + pGroupResInfo->index += 1; continue; } SResultRow* pRow = (SResultRow*)pVal; diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 4c369e880266017667db5b0563a5129799e57d9f..40742087eaee42037320266f7cc7d246f322611f 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -3580,6 +3580,11 @@ static void removeSessionResult(SSHashObj* pHashMap, SSHashObj* pResMap, SSessio tSimpleHashRemove(pResMap, &key, sizeof(SSessionKey)); } +static void getSessionHashKey(const SSessionKey* pKey, SSessionKey* pHashKey) { + *pHashKey = *pKey; + pHashKey->win.ekey = pKey->win.skey; +} + static void removeSessionResults(SSHashObj* pHashMap, SArray* pWins) { if (tSimpleHashGetSize(pHashMap) == 0) { return; @@ -3588,8 +3593,8 @@ static void removeSessionResults(SSHashObj* pHashMap, SArray* pWins) { for (int32_t i = 0; i < size; i++) { SSessionKey* pWin = taosArrayGet(pWins, i); if (!pWin) continue; - SSessionKey key = *pWin; - key.win.ekey = key.win.skey; + SSessionKey key = {0}; + getSessionHashKey(pWin, &key); tSimpleHashRemove(pHashMap, &key, sizeof(SSessionKey)); } } @@ -3642,7 +3647,9 @@ static int32_t doOneWindowAggImpl(SColumnInfoData* pTimeWindowData, SResultWindo static bool doDeleteSessionWindow(SStreamAggSupporter* pAggSup, SSessionKey* pKey) { streamStateSessionDel(pAggSup->pState, pKey); - tSimpleHashRemove(pAggSup->pResultRows, pKey, sizeof(SSessionKey)); + SSessionKey hashKey = {0}; + getSessionHashKey(pKey, &hashKey); + tSimpleHashRemove(pAggSup->pResultRows, &hashKey, sizeof(SSessionKey)); return true; } @@ -3753,8 +3760,8 @@ static void doStreamSessionAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSData } } if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) { - SSessionKey key = winInfo.sessionWin; - key.win.ekey = key.win.skey; + SSessionKey key = {0}; + getSessionHashKey(&winInfo.sessionWin, &key); tSimpleHashPut(pAggSup->pResultRows, &key, sizeof(SSessionKey), &winInfo, sizeof(SResultWindowInfo)); } @@ -3896,8 +3903,8 @@ static void rebuildSessionWindow(SOperatorInfo* pOperator, SArray* pWinArray, SS SOperatorInfo* pChild = taosArrayGetP(pInfo->pChildren, j); SStreamSessionAggOperatorInfo* pChInfo = pChild->info; SStreamAggSupporter* pChAggSup = &pChInfo->streamAggSup; - SSessionKey chWinKey = *pWinKey; - chWinKey.win.ekey = chWinKey.win.skey; + SSessionKey chWinKey = {0}; + getSessionHashKey(pWinKey, &chWinKey); SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentNext(pChAggSup->pState, &chWinKey); SResultRow* pResult = NULL; SResultRow* pChResult = NULL; @@ -3978,8 +3985,8 @@ static void copyDeleteWindowInfo(SArray* pResWins, SSHashObj* pStDeleted) { for (int32_t i = 0; i < size; i++) { SSessionKey* pWinKey = taosArrayGet(pResWins, i); if (!pWinKey) continue; - SSessionKey winInfo = *pWinKey; - winInfo.win.ekey = winInfo.win.skey; + SSessionKey winInfo = {0}; + getSessionHashKey(pWinKey, &winInfo); tSimpleHashPut(pStDeleted, &winInfo, sizeof(SSessionKey), NULL, 0); } } @@ -4561,8 +4568,8 @@ static void doStreamStateAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl } if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) { - SSessionKey key = curWin.winInfo.sessionWin; - key.win.ekey = key.win.skey; + SSessionKey key = {0}; + getSessionHashKey(&curWin.winInfo.sessionWin, &key); tSimpleHashPut(pAggSup->pResultRows, &key, sizeof(SSessionKey), &curWin.winInfo, sizeof(SResultWindowInfo)); } } @@ -4645,6 +4652,12 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) { initGroupResInfoFromArrayList(&pInfo->groupResInfo, pUpdated); blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity); +#if 0 + char* pBuf = streamStateSessionDump(pInfo->streamAggSup.pState); + qDebug("===stream===final session%s", pBuf); + taosMemoryFree(pBuf); +#endif + doBuildDeleteDataBlock(pOperator, pInfo->pSeDeleted, pInfo->pDelRes, &pInfo->pDelIterator); if (pInfo->pDelRes->info.rows > 0) { printDataBlock(pInfo->pDelRes, "single state delete"); diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c index ccb0dd4a92e12ee51bbdba6ffb4777ae663cae8e..88c39c1157c4ecae57829bd44b1d31ec66d5ab68 100644 --- a/source/libs/stream/src/streamState.c +++ b/source/libs/stream/src/streamState.c @@ -521,9 +521,13 @@ int32_t streamStateSessionGet(SStreamState* pState, SSessionKey* key, void** pVa void* tmp = NULL; int32_t code = streamStateSessionGetKVByCur(pCur, &resKey, &tmp, pVLen); if (code == 0) { - *key = resKey; - *pVal = tdbRealloc(NULL, *pVLen); - memcpy(*pVal, tmp, *pVLen); + if (key->win.skey != resKey.win.skey) { + code = -1; + } else { + *key = resKey; + *pVal = tdbRealloc(NULL, *pVLen); + memcpy(*pVal, tmp, *pVLen); + } } streamStateFreeCur(pCur); return code; diff --git a/tests/script/tsim/stream/state0.sim b/tests/script/tsim/stream/state0.sim index dc7d9bc40746b76ad3b34430981c86c2b75c9010..87d7d4b7fcd49261ed602886057f4291da840219 100644 --- a/tests/script/tsim/stream/state0.sim +++ b/tests/script/tsim/stream/state0.sim @@ -544,4 +544,192 @@ if $rows != 10 then endi +sql drop stream if exists streams4; +sql drop database if exists test4; +sql drop stable if exists streamt4; +sql create database if not exists test4 vgroups 10 precision "ms" ; +sql use test4; +sql create table st (ts timestamp, c1 tinyint, c2 smallint) tags (t1 tinyint) ; +sql create table t1 using st tags (-81) ; +sql create table t2 using st tags (-81) ; +sql create stream if not exists streams4 trigger window_close into streamt4 as select _wstart AS start, min(c1),count(c1) from t1 state_window(c1); + +sql insert into t1 (ts, c1) values (1668073288209, 11); +sql insert into t1 (ts, c1) values (1668073288210, 11); +sql insert into t1 (ts, c1) values (1668073288211, 11); +sql insert into t1 (ts, c1) values (1668073288212, 11); +sql insert into t1 (ts, c1) values (1668073288213, 11); +sql insert into t1 (ts, c1) values (1668073288214, 11); +sql insert into t1 (ts, c1) values (1668073288215, 29); + +$loop_count = 0 +loop7: + +sleep 200 + +sql select * from streamt4 order by start; + +$loop_count = $loop_count + 1 +if $loop_count == 20 then + return -1 +endi + +if $rows != 1 then + print =====rows=$rows + goto loop7 +endi + +if $data01 != 11 then + print =====data01=$data01 + goto loop7 +endi + +if $data02 != 6 then + print =====data02=$data02 + goto loop7 +endi + +sql delete from t1 where ts = cast(1668073288214 as timestamp); +sql insert into t1 (ts, c1) values (1668073288216, 29); +sql delete from t1 where ts = cast(1668073288215 as timestamp); +sql insert into t1 (ts, c1) values (1668073288217, 29); +sql delete from t1 where ts = cast(1668073288216 as timestamp); +sql insert into t1 (ts, c1) values (1668073288218, 29); +sql delete from t1 where ts = cast(1668073288217 as timestamp); +sql insert into t1 (ts, c1) values (1668073288219, 29); +sql delete from t1 where ts = cast(1668073288218 as timestamp); +sql insert into t1 (ts, c1) values (1668073288220, 29); +sql delete from t1 where ts = cast(1668073288219 as timestamp); + +$loop_count = 0 +loop8: + +sleep 200 + +sql select * from streamt4 order by start; + +$loop_count = $loop_count + 1 +if $loop_count == 20 then + return -1 +endi + +if $rows != 1 then + print =====rows=$rows + goto loop8 +endi + +if $data01 != 11 then + print =====data01=$data01 + goto loop8 +endi + +if $data02 != 5 then + print =====data02=$data02 + goto loop8 +endi + +sql insert into t1 (ts, c1) values (1668073288221, 65); +sql insert into t1 (ts, c1) values (1668073288222, 65); +sql insert into t1 (ts, c1) values (1668073288223, 65); +sql insert into t1 (ts, c1) values (1668073288224, 65); +sql insert into t1 (ts, c1) values (1668073288225, 65); +sql insert into t1 (ts, c1) values (1668073288226, 65); + +$loop_count = 0 +loop8: + +sleep 200 + +sql select * from streamt4 order by start; + +$loop_count = $loop_count + 1 +if $loop_count == 20 then + return -1 +endi + +if $rows != 2 then + print =====rows=$rows + goto loop8 +endi + +if $data01 != 11 then + print =====data01=$data01 + goto loop8 +endi + +if $data02 != 5 then + print =====data02=$data02 + goto loop8 +endi + +if $data11 != 29 then + print =====data11=$data11 + goto loop8 +endi + +if $data12 != 1 then + print =====data12=$data12 + goto loop8 +endi + +sql insert into t1 (ts, c1) values (1668073288224, 64); + +$loop_count = 0 +loop9: + +sleep 200 + +sql select * from streamt4 order by start; + +$loop_count = $loop_count + 1 +if $loop_count == 20 then + return -1 +endi + +if $rows != 4 then + print =====rows=$rows + goto loop9 +endi + +if $data01 != 11 then + print =====data01=$data01 + goto loop9 +endi + +if $data02 != 5 then + print =====data02=$data02 + goto loop9 +endi + +if $data11 != 29 then + print =====data11=$data11 + goto loop9 +endi + +if $data12 != 1 then + print =====data12=$data12 + goto loop9 +endi + +if $data21 != 65 then + print =====data21=$data21 + goto loop9 +endi + +if $data22 != 3 then + print =====data22=$data22 + goto loop9 +endi + +if $data31 != 64 then + print =====data31=$data31 + goto loop9 +endi + +if $data32 != 1 then + print =====data32=$data32 + goto loop9 +endi + + system sh/exec.sh -n dnode1 -s stop -x SIGINT