提交 7ea79002 编写于 作者: 5 54liuyao

fix:avoid duplicate results

上级 170b78ec
...@@ -3423,6 +3423,7 @@ int32_t buildSessionResultDataBlock(SOperatorInfo* pOperator, SStreamState* pSta ...@@ -3423,6 +3423,7 @@ int32_t buildSessionResultDataBlock(SOperatorInfo* pOperator, SStreamState* pSta
ASSERT(code == 0); ASSERT(code == 0);
if (code == -1) { if (code == -1) {
// coverity scan // coverity scan
pGroupResInfo->index += 1;
continue; continue;
} }
SResultRow* pRow = (SResultRow*)pVal; SResultRow* pRow = (SResultRow*)pVal;
......
...@@ -3580,6 +3580,11 @@ static void removeSessionResult(SSHashObj* pHashMap, SSHashObj* pResMap, SSessio ...@@ -3580,6 +3580,11 @@ static void removeSessionResult(SSHashObj* pHashMap, SSHashObj* pResMap, SSessio
tSimpleHashRemove(pResMap, &key, sizeof(SSessionKey)); tSimpleHashRemove(pResMap, &key, sizeof(SSessionKey));
} }
static void getSessionHashKey(const SSessionKey* pKey, SSessionKey* pHashKey) {
*pHashKey = *pKey;
pHashKey->win.ekey = pKey->win.skey;
}
static void removeSessionResults(SSHashObj* pHashMap, SArray* pWins) { static void removeSessionResults(SSHashObj* pHashMap, SArray* pWins) {
if (tSimpleHashGetSize(pHashMap) == 0) { if (tSimpleHashGetSize(pHashMap) == 0) {
return; return;
...@@ -3588,8 +3593,8 @@ static void removeSessionResults(SSHashObj* pHashMap, SArray* pWins) { ...@@ -3588,8 +3593,8 @@ static void removeSessionResults(SSHashObj* pHashMap, SArray* pWins) {
for (int32_t i = 0; i < size; i++) { for (int32_t i = 0; i < size; i++) {
SSessionKey* pWin = taosArrayGet(pWins, i); SSessionKey* pWin = taosArrayGet(pWins, i);
if (!pWin) continue; if (!pWin) continue;
SSessionKey key = *pWin; SSessionKey key = {0};
key.win.ekey = key.win.skey; getSessionHashKey(pWin, &key);
tSimpleHashRemove(pHashMap, &key, sizeof(SSessionKey)); tSimpleHashRemove(pHashMap, &key, sizeof(SSessionKey));
} }
} }
...@@ -3642,7 +3647,9 @@ static int32_t doOneWindowAggImpl(SColumnInfoData* pTimeWindowData, SResultWindo ...@@ -3642,7 +3647,9 @@ static int32_t doOneWindowAggImpl(SColumnInfoData* pTimeWindowData, SResultWindo
static bool doDeleteSessionWindow(SStreamAggSupporter* pAggSup, SSessionKey* pKey) { static bool doDeleteSessionWindow(SStreamAggSupporter* pAggSup, SSessionKey* pKey) {
streamStateSessionDel(pAggSup->pState, pKey); streamStateSessionDel(pAggSup->pState, pKey);
tSimpleHashRemove(pAggSup->pResultRows, pKey, sizeof(SSessionKey)); SSessionKey hashKey = {0};
getSessionHashKey(pKey, &hashKey);
tSimpleHashRemove(pAggSup->pResultRows, &hashKey, sizeof(SSessionKey));
return true; return true;
} }
...@@ -3753,8 +3760,8 @@ static void doStreamSessionAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSData ...@@ -3753,8 +3760,8 @@ static void doStreamSessionAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSData
} }
} }
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) { if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) {
SSessionKey key = winInfo.sessionWin; SSessionKey key = {0};
key.win.ekey = key.win.skey; getSessionHashKey(&winInfo.sessionWin, &key);
tSimpleHashPut(pAggSup->pResultRows, &key, sizeof(SSessionKey), &winInfo, sizeof(SResultWindowInfo)); tSimpleHashPut(pAggSup->pResultRows, &key, sizeof(SSessionKey), &winInfo, sizeof(SResultWindowInfo));
} }
...@@ -3896,8 +3903,8 @@ static void rebuildSessionWindow(SOperatorInfo* pOperator, SArray* pWinArray, SS ...@@ -3896,8 +3903,8 @@ static void rebuildSessionWindow(SOperatorInfo* pOperator, SArray* pWinArray, SS
SOperatorInfo* pChild = taosArrayGetP(pInfo->pChildren, j); SOperatorInfo* pChild = taosArrayGetP(pInfo->pChildren, j);
SStreamSessionAggOperatorInfo* pChInfo = pChild->info; SStreamSessionAggOperatorInfo* pChInfo = pChild->info;
SStreamAggSupporter* pChAggSup = &pChInfo->streamAggSup; SStreamAggSupporter* pChAggSup = &pChInfo->streamAggSup;
SSessionKey chWinKey = *pWinKey; SSessionKey chWinKey = {0};
chWinKey.win.ekey = chWinKey.win.skey; getSessionHashKey(pWinKey, &chWinKey);
SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentNext(pChAggSup->pState, &chWinKey); SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentNext(pChAggSup->pState, &chWinKey);
SResultRow* pResult = NULL; SResultRow* pResult = NULL;
SResultRow* pChResult = NULL; SResultRow* pChResult = NULL;
...@@ -3978,8 +3985,8 @@ static void copyDeleteWindowInfo(SArray* pResWins, SSHashObj* pStDeleted) { ...@@ -3978,8 +3985,8 @@ static void copyDeleteWindowInfo(SArray* pResWins, SSHashObj* pStDeleted) {
for (int32_t i = 0; i < size; i++) { for (int32_t i = 0; i < size; i++) {
SSessionKey* pWinKey = taosArrayGet(pResWins, i); SSessionKey* pWinKey = taosArrayGet(pResWins, i);
if (!pWinKey) continue; if (!pWinKey) continue;
SSessionKey winInfo = *pWinKey; SSessionKey winInfo = {0};
winInfo.win.ekey = winInfo.win.skey; getSessionHashKey(pWinKey, &winInfo);
tSimpleHashPut(pStDeleted, &winInfo, sizeof(SSessionKey), NULL, 0); tSimpleHashPut(pStDeleted, &winInfo, sizeof(SSessionKey), NULL, 0);
} }
} }
...@@ -4561,8 +4568,8 @@ static void doStreamStateAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl ...@@ -4561,8 +4568,8 @@ static void doStreamStateAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl
} }
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) { if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) {
SSessionKey key = curWin.winInfo.sessionWin; SSessionKey key = {0};
key.win.ekey = key.win.skey; getSessionHashKey(&curWin.winInfo.sessionWin, &key);
tSimpleHashPut(pAggSup->pResultRows, &key, sizeof(SSessionKey), &curWin.winInfo, sizeof(SResultWindowInfo)); tSimpleHashPut(pAggSup->pResultRows, &key, sizeof(SSessionKey), &curWin.winInfo, sizeof(SResultWindowInfo));
} }
} }
...@@ -4645,6 +4652,12 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) { ...@@ -4645,6 +4652,12 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
initGroupResInfoFromArrayList(&pInfo->groupResInfo, pUpdated); initGroupResInfoFromArrayList(&pInfo->groupResInfo, pUpdated);
blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity); blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
#if 0
char* pBuf = streamStateSessionDump(pInfo->streamAggSup.pState);
qDebug("===stream===final session%s", pBuf);
taosMemoryFree(pBuf);
#endif
doBuildDeleteDataBlock(pOperator, pInfo->pSeDeleted, pInfo->pDelRes, &pInfo->pDelIterator); doBuildDeleteDataBlock(pOperator, pInfo->pSeDeleted, pInfo->pDelRes, &pInfo->pDelIterator);
if (pInfo->pDelRes->info.rows > 0) { if (pInfo->pDelRes->info.rows > 0) {
printDataBlock(pInfo->pDelRes, "single state delete"); printDataBlock(pInfo->pDelRes, "single state delete");
......
...@@ -521,9 +521,13 @@ int32_t streamStateSessionGet(SStreamState* pState, SSessionKey* key, void** pVa ...@@ -521,9 +521,13 @@ int32_t streamStateSessionGet(SStreamState* pState, SSessionKey* key, void** pVa
void* tmp = NULL; void* tmp = NULL;
int32_t code = streamStateSessionGetKVByCur(pCur, &resKey, &tmp, pVLen); int32_t code = streamStateSessionGetKVByCur(pCur, &resKey, &tmp, pVLen);
if (code == 0) { if (code == 0) {
*key = resKey; if (key->win.skey != resKey.win.skey) {
*pVal = tdbRealloc(NULL, *pVLen); code = -1;
memcpy(*pVal, tmp, *pVLen); } else {
*key = resKey;
*pVal = tdbRealloc(NULL, *pVLen);
memcpy(*pVal, tmp, *pVLen);
}
} }
streamStateFreeCur(pCur); streamStateFreeCur(pCur);
return code; return code;
......
...@@ -544,4 +544,192 @@ if $rows != 10 then ...@@ -544,4 +544,192 @@ if $rows != 10 then
endi endi
sql drop stream if exists streams4;
sql drop database if exists test4;
sql drop stable if exists streamt4;
sql create database if not exists test4 vgroups 10 precision "ms" ;
sql use test4;
sql create table st (ts timestamp, c1 tinyint, c2 smallint) tags (t1 tinyint) ;
sql create table t1 using st tags (-81) ;
sql create table t2 using st tags (-81) ;
sql create stream if not exists streams4 trigger window_close into streamt4 as select _wstart AS start, min(c1),count(c1) from t1 state_window(c1);
sql insert into t1 (ts, c1) values (1668073288209, 11);
sql insert into t1 (ts, c1) values (1668073288210, 11);
sql insert into t1 (ts, c1) values (1668073288211, 11);
sql insert into t1 (ts, c1) values (1668073288212, 11);
sql insert into t1 (ts, c1) values (1668073288213, 11);
sql insert into t1 (ts, c1) values (1668073288214, 11);
sql insert into t1 (ts, c1) values (1668073288215, 29);
$loop_count = 0
loop7:
sleep 200
sql select * from streamt4 order by start;
$loop_count = $loop_count + 1
if $loop_count == 20 then
return -1
endi
if $rows != 1 then
print =====rows=$rows
goto loop7
endi
if $data01 != 11 then
print =====data01=$data01
goto loop7
endi
if $data02 != 6 then
print =====data02=$data02
goto loop7
endi
sql delete from t1 where ts = cast(1668073288214 as timestamp);
sql insert into t1 (ts, c1) values (1668073288216, 29);
sql delete from t1 where ts = cast(1668073288215 as timestamp);
sql insert into t1 (ts, c1) values (1668073288217, 29);
sql delete from t1 where ts = cast(1668073288216 as timestamp);
sql insert into t1 (ts, c1) values (1668073288218, 29);
sql delete from t1 where ts = cast(1668073288217 as timestamp);
sql insert into t1 (ts, c1) values (1668073288219, 29);
sql delete from t1 where ts = cast(1668073288218 as timestamp);
sql insert into t1 (ts, c1) values (1668073288220, 29);
sql delete from t1 where ts = cast(1668073288219 as timestamp);
$loop_count = 0
loop8:
sleep 200
sql select * from streamt4 order by start;
$loop_count = $loop_count + 1
if $loop_count == 20 then
return -1
endi
if $rows != 1 then
print =====rows=$rows
goto loop8
endi
if $data01 != 11 then
print =====data01=$data01
goto loop8
endi
if $data02 != 5 then
print =====data02=$data02
goto loop8
endi
sql insert into t1 (ts, c1) values (1668073288221, 65);
sql insert into t1 (ts, c1) values (1668073288222, 65);
sql insert into t1 (ts, c1) values (1668073288223, 65);
sql insert into t1 (ts, c1) values (1668073288224, 65);
sql insert into t1 (ts, c1) values (1668073288225, 65);
sql insert into t1 (ts, c1) values (1668073288226, 65);
$loop_count = 0
loop8:
sleep 200
sql select * from streamt4 order by start;
$loop_count = $loop_count + 1
if $loop_count == 20 then
return -1
endi
if $rows != 2 then
print =====rows=$rows
goto loop8
endi
if $data01 != 11 then
print =====data01=$data01
goto loop8
endi
if $data02 != 5 then
print =====data02=$data02
goto loop8
endi
if $data11 != 29 then
print =====data11=$data11
goto loop8
endi
if $data12 != 1 then
print =====data12=$data12
goto loop8
endi
sql insert into t1 (ts, c1) values (1668073288224, 64);
$loop_count = 0
loop9:
sleep 200
sql select * from streamt4 order by start;
$loop_count = $loop_count + 1
if $loop_count == 20 then
return -1
endi
if $rows != 4 then
print =====rows=$rows
goto loop9
endi
if $data01 != 11 then
print =====data01=$data01
goto loop9
endi
if $data02 != 5 then
print =====data02=$data02
goto loop9
endi
if $data11 != 29 then
print =====data11=$data11
goto loop9
endi
if $data12 != 1 then
print =====data12=$data12
goto loop9
endi
if $data21 != 65 then
print =====data21=$data21
goto loop9
endi
if $data22 != 3 then
print =====data22=$data22
goto loop9
endi
if $data31 != 64 then
print =====data31=$data31
goto loop9
endi
if $data32 != 1 then
print =====data32=$data32
goto loop9
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT system sh/exec.sh -n dnode1 -s stop -x SIGINT
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册