From 7d8a282258e955e2d04e68644fab6ef8ff680aec Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao@163.com> Date: Thu, 27 Oct 2022 15:51:04 +0800 Subject: [PATCH] fix(stream):tqBuildDeleteReq error --- include/libs/stream/streamState.h | 2 + source/dnode/vnode/src/tq/tqSink.c | 2 +- source/libs/executor/src/scanoperator.c | 3 + source/libs/function/src/builtinsimpl.c | 7 ++- source/libs/stream/src/streamState.c | 42 +++++++++++++- .../tsim/stream/partitionbyColumnInterval.sim | 58 +++++++++++++++++-- 6 files changed, 105 insertions(+), 9 deletions(-) diff --git a/include/libs/stream/streamState.h b/include/libs/stream/streamState.h index 3d59a69dea..225466a015 100644 --- a/include/libs/stream/streamState.h +++ b/include/libs/stream/streamState.h @@ -99,6 +99,8 @@ int32_t streamStateSeekLast(SStreamState* pState, SStreamStateCur* pCur); int32_t streamStateCurNext(SStreamState* pState, SStreamStateCur* pCur); int32_t streamStateCurPrev(SStreamState* pState, SStreamStateCur* pCur); +// char* streamStateSessionDump(SStreamState* pState); + #ifdef __cplusplus } #endif diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index 8a81151273..65e8d69994 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -42,7 +42,7 @@ int32_t tqBuildDeleteReq(SVnode* pVnode, const char* stbFullName, const SSDataBl if (metaGetTableEntryByName(&mr, name) < 0) { metaReaderClear(&mr); taosMemoryFree(name); - return -1; + continue; } int64_t uid = mr.me.uid; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 76035a65ae..9518e97d00 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1299,6 +1299,9 @@ static int32_t generateSessionScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSr SSessionKey startWin = {0}; getCurSessionWindow(pInfo->windowSup.pStreamAggSup, startData[i], endData[i], groupId, &startWin); if (IS_INVALID_SESSION_WIN_KEY(startWin)) { + // char* tmp = streamStateSessionDump(pInfo->windowSup.pStreamAggSup->pState); + // qInfo("%s", tmp); + // taosMemoryFree(tmp); // window has been closed. continue; } diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 40e3e7c35d..5dcef7cd17 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -380,7 +380,7 @@ typedef struct SGroupKeyInfo { numOfElem += 1; \ pStddevRes->count -= 1; \ sumT -= plist[i]; \ - pStddevRes->quadraticISum -= plist[i] * plist[i]; \ + pStddevRes->quadraticISum -= (int64_t)(plist[i] * plist[i]); \ } \ } while (0) @@ -2526,8 +2526,9 @@ int32_t apercentileFunction(SqlFunctionCtx* pCtx) { // might be a race condition here that pHisto can be overwritten or setup function // has not been called, need to relink the buffer pHisto points to. buildHistogramInfo(pInfo); - qDebug("%s before add %d elements into histogram, total:%" PRId64 ", numOfEntry:%d, pHisto:%p, elems: %p", __FUNCTION__, - numOfElems, pInfo->pHisto->numOfElems, pInfo->pHisto->numOfEntries, pInfo->pHisto, pInfo->pHisto->elems); + qDebug("%s before add %d elements into histogram, total:%" PRId64 ", numOfEntry:%d, pHisto:%p, elems: %p", + __FUNCTION__, numOfElems, pInfo->pHisto->numOfElems, pInfo->pHisto->numOfEntries, pInfo->pHisto, + pInfo->pHisto->elems); for (int32_t i = start; i < pInput->numOfRows + start; ++i) { if (colDataIsNull_f(pCol->nullbitmap, i)) { continue; diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c index ea429a76b0..9829955baf 100644 --- a/source/libs/stream/src/streamState.c +++ b/source/libs/stream/src/streamState.c @@ -497,7 +497,7 @@ SStreamStateCur* streamStateSessionGetRanomCur(SStreamState* pState, const SSess if (pCur == NULL) return NULL; tdbTbcOpen(pState->pSessionStateDb, &pCur->pCur, NULL); - int32_t c = 0; + int32_t c = -2; SStateSessionKey sKey = {.key = *key, .opNum = pState->number}; tdbTbcMoveTo(pCur->pCur, &sKey, sizeof(SStateSessionKey), &c); if (c != 0) { @@ -727,3 +727,43 @@ _end: streamStateFreeCur(pCur); return res; } + +#if 0 +char* streamStateSessionDump(SStreamState* pState) { + SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); + if (pCur == NULL) { + return NULL; + } + pCur->number = pState->number; + if (tdbTbcOpen(pState->pSessionStateDb, &pCur->pCur, NULL) < 0) { + streamStateFreeCur(pCur); + return NULL; + } + tdbTbcMoveToFirst(pCur->pCur); + + SSessionKey key = {0}; + int32_t code = streamStateSessionGetKVByCur(pCur, &key, NULL, 0); + if (code != 0) { + return NULL; + } + + int32_t size = 2048; + char* dumpBuf = taosMemoryCalloc(size, 1); + int64_t len = 0; + len += snprintf(dumpBuf + len, size - len, "||s:%15" PRId64 ",", key.win.skey); + len += snprintf(dumpBuf + len, size - len, "e:%15" PRId64 ",", key.win.ekey); + len += snprintf(dumpBuf + len, size - len, "g:%15" PRId64 "||", key.groupId); + while (1) { + tdbTbcMoveToNext(pCur->pCur); + key = (SSessionKey){0}; + code = streamStateSessionGetKVByCur(pCur, &key, NULL, 0); + if (code != 0) { + return dumpBuf; + } + len += snprintf(dumpBuf + len, size - len, "||s:%15" PRId64 ",", key.win.skey); + len += snprintf(dumpBuf + len, size - len, "e:%15" PRId64 ",", key.win.ekey); + len += snprintf(dumpBuf + len, size - len, "g:%15" PRId64 "||", key.groupId); + } + return dumpBuf; +} +#endif diff --git a/tests/script/tsim/stream/partitionbyColumnInterval.sim b/tests/script/tsim/stream/partitionbyColumnInterval.sim index 8375df5064..2e57e8d699 100644 --- a/tests/script/tsim/stream/partitionbyColumnInterval.sim +++ b/tests/script/tsim/stream/partitionbyColumnInterval.sim @@ -599,12 +599,62 @@ if $loop_count == 20 then endi if $rows != 4 then - print =====rows=$rows - print =====rows=$rows - print =====rows=$rows - #goto loop15 + print ======rows=$rows + goto loop15 +endi + +sql insert into t1 values(1648791223001,11,2,3,1.0); +sql insert into t2 values(1648791223001,21,2,3,1.0); +sql insert into t3 values(1648791223001,31,2,3,1.0); +sql insert into t4 values(1648791223001,41,2,3,1.0); + +sleep 300 + +sql delete from st where ts = 1648791223001; + +$loop_count = 0 + +loop16: +sleep 50 +sql select * from test.streamt5 order by c1, c2, c3; + +$loop_count = $loop_count + 1 +if $loop_count == 20 then + print ======rows=$rows + return -1 +endi + +if $rows != 4 then + goto loop16 +endi + +sql insert into t1 values(1648791223001,12,2,3,1.0); +sql insert into t2 values(1648791223001,22,2,3,1.0); +sql insert into t3 values(1648791223001,32,2,3,1.0); +sql insert into t4 values(1648791223001,42,2,3,1.0); + +sleep 300 + +sql delete from st where ts = 1648791223001; + +$loop_count = 0 + +loop17: +sleep 50 +sql select * from test.streamt5 order by c1, c2, c3; + +$loop_count = $loop_count + 1 +if $loop_count == 20 then + return -1 endi +if $rows != 4 then + print ======rows=$rows + print ======rows=$rows + print ======rows=$rows + return 1 + #goto loop17 +endi $loop_all = $loop_all + 1 print ============loop_all=$loop_all -- GitLab