diff --git a/include/libs/stream/streamState.h b/include/libs/stream/streamState.h index 225466a0157cd0b529871af5b89d0a32e8bb6d8f..9443df5e14e32ca54d76e4bb3c21219509b90cf2 100644 --- a/include/libs/stream/streamState.h +++ b/include/libs/stream/streamState.h @@ -60,19 +60,19 @@ int32_t streamStateDel(SStreamState* pState, const SWinKey* key); int32_t streamStateClear(SStreamState* pState); void streamStateSetNumber(SStreamState* pState, int32_t number); -int32_t streamStateSessionAddIfNotExist(SStreamState* pState, SSessionKey* key, void** pVal, int32_t* pVLen); +int32_t streamStateSessionAddIfNotExist(SStreamState* pState, SSessionKey* key, TSKEY gap, void** pVal, int32_t* pVLen); int32_t streamStateSessionPut(SStreamState* pState, const SSessionKey* key, const void* value, int32_t vLen); int32_t streamStateSessionGet(SStreamState* pState, SSessionKey* key, void** pVal, int32_t* pVLen); int32_t streamStateSessionDel(SStreamState* pState, const SSessionKey* key); int32_t streamStateSessionClear(SStreamState* pState); -int32_t streamStateSessionGetKVByCur(SStreamStateCur* pCur, SSessionKey* pKey, const void** pVal, int32_t* pVLen); +int32_t streamStateSessionGetKVByCur(SStreamStateCur* pCur, SSessionKey* pKey, void** pVal, int32_t* pVLen); int32_t streamStateStateAddIfNotExist(SStreamState* pState, SSessionKey* key, char* pKeyData, int32_t keyDataLen, state_key_cmpr_fn fn, void** pVal, int32_t* pVLen); -int32_t streamStateSessionGetKey(SStreamState* pState, const SSessionKey* key, SSessionKey* curKey); +int32_t streamStateSessionGetKeyByRange(SStreamState* pState, const SSessionKey* range, SSessionKey* curKey); SStreamStateCur* streamStateSessionSeekKeyNext(SStreamState* pState, const SSessionKey* key); SStreamStateCur* streamStateSessionSeekKeyCurrentPrev(SStreamState* pState, const SSessionKey* key); -SStreamStateCur* streamStateSessionGetCur(SStreamState* pState, const SSessionKey* key); +SStreamStateCur* streamStateSessionSeekKeyCurrentNext(SStreamState* pState, const SSessionKey* key); int32_t streamStateFillPut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen); int32_t streamStateFillGet(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen); @@ -99,7 +99,9 @@ int32_t streamStateSeekLast(SStreamState* pState, SStreamStateCur* pCur); int32_t streamStateCurNext(SStreamState* pState, SStreamStateCur* pCur); int32_t streamStateCurPrev(SStreamState* pState, SStreamStateCur* pCur); -// char* streamStateSessionDump(SStreamState* pState); +#if 0 +char* streamStateSessionDump(SStreamState* pState); +#endif #ifdef __cplusplus } diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 6d130481233966c5bea5127f687d11dd57f13911..7faf75d5cd1031a793b430ee34ad1168aab544c9 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1438,7 +1438,7 @@ static int32_t generateSessionScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSr uint64_t groupId = getGroupIdByData(pInfo, uidCol[i], startData[i], version); // gap must be 0. SSessionKey startWin = {0}; - getCurSessionWindow(pInfo->windowSup.pStreamAggSup, startData[i], endData[i], groupId, &startWin); + getCurSessionWindow(pInfo->windowSup.pStreamAggSup, startData[i], startData[i], groupId, &startWin); if (IS_INVALID_SESSION_WIN_KEY(startWin)) { // window has been closed. continue; diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 7b6ed5b67dd1e8b92c93640ade09a6fd8cb441e2..e5b49acf0b0ae86d5f8c8923de34dd4c015428f9 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -3550,7 +3550,7 @@ void getCurSessionWindow(SStreamAggSupporter* pAggSup, TSKEY startTs, TSKEY endT pKey->win.skey = startTs; pKey->win.ekey = endTs; pKey->groupId = groupId; - int32_t code = streamStateSessionGetKey(pAggSup->pState, pKey, pKey); + int32_t code = streamStateSessionGetKeyByRange(pAggSup->pState, pKey, pKey); if (code != TSDB_CODE_SUCCESS) { SET_SESSION_WIN_KEY_INVALID(pKey); } @@ -3561,10 +3561,11 @@ bool isInvalidSessionWin(SResultWindowInfo* pWinInfo) { return pWinInfo->session void setSessionOutputBuf(SStreamAggSupporter* pAggSup, TSKEY startTs, TSKEY endTs, uint64_t groupId, SResultWindowInfo* pCurWin) { pCurWin->sessionWin.groupId = groupId; - pCurWin->sessionWin.win.skey = startTs - pAggSup->gap; - pCurWin->sessionWin.win.ekey = endTs + pAggSup->gap; + pCurWin->sessionWin.win.skey = startTs; + pCurWin->sessionWin.win.ekey = endTs; int32_t size = pAggSup->resultRowSize; - int32_t code = streamStateSessionAddIfNotExist(pAggSup->pState, &pCurWin->sessionWin, &pCurWin->pOutputBuf, &size); + int32_t code = + streamStateSessionAddIfNotExist(pAggSup->pState, &pCurWin->sessionWin, pAggSup->gap, &pCurWin->pOutputBuf, &size); if (code == TSDB_CODE_SUCCESS) { pCurWin->isOutput = true; } else { @@ -3575,7 +3576,7 @@ void setSessionOutputBuf(SStreamAggSupporter* pAggSup, TSKEY startTs, TSKEY endT int32_t getSessionWinBuf(SStreamAggSupporter* pAggSup, SStreamStateCur* pCur, SResultWindowInfo* pWinInfo) { int32_t size = 0; - int32_t code = streamStateSessionGetKVByCur(pCur, &pWinInfo->sessionWin, (const void**)&pWinInfo->pOutputBuf, &size); + int32_t code = streamStateSessionGetKVByCur(pCur, &pWinInfo->sessionWin, &pWinInfo->pOutputBuf, &size); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -3680,7 +3681,7 @@ SStreamStateCur* getNextSessionWinInfo(SStreamAggSupporter* pAggSup, SSHashObj* setSessionWinOutputInfo(pStUpdated, pNextWin); int32_t size = 0; pNextWin->sessionWin = pCurWin->sessionWin; - int32_t code = streamStateSessionGetKVByCur(pCur, &pNextWin->sessionWin, (const void**)&pNextWin->pOutputBuf, &size); + int32_t code = streamStateSessionGetKVByCur(pCur, &pNextWin->sessionWin, &pNextWin->pOutputBuf, &size); if (code != TSDB_CODE_SUCCESS) { SET_SESSION_WIN_INVALID(*pNextWin); } @@ -3894,9 +3895,11 @@ static void rebuildSessionWindow(SOperatorInfo* pOperator, SArray* pWinArray, SS SOperatorInfo* pChild = taosArrayGetP(pInfo->pChildren, j); SStreamSessionAggOperatorInfo* pChInfo = pChild->info; SStreamAggSupporter* pChAggSup = &pChInfo->streamAggSup; - SStreamStateCur* pCur = streamStateSessionGetCur(pChAggSup->pState, pWinKey); - SResultRow* pResult = NULL; - SResultRow* pChResult = NULL; + SSessionKey chWinKey = *pWinKey; + chWinKey.win.ekey = chWinKey.win.skey; + SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentNext(pChAggSup->pState, &chWinKey); + SResultRow* pResult = NULL; + SResultRow* pChResult = NULL; while (1) { SResultWindowInfo childWin = {0}; childWin.sessionWin = *pWinKey; @@ -4112,6 +4115,12 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) { initGroupResInfoFromArrayList(&pInfo->groupResInfo, pUpdated); blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity); +#if 0 + char* pBuf = streamStateSessionDump(pAggSup->pState); + qDebug("===stream===final session%s", pBuf); + taosMemoryFree(pBuf); +#endif + doBuildDeleteDataBlock(pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator); if (pInfo->pDelRes->info.rows > 0) { printDataBlock(pInfo->pDelRes, IS_FINAL_OP(pInfo) ? "final session" : "single session"); @@ -4306,6 +4315,12 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) { initGroupResInfoFromArrayList(&pInfo->groupResInfo, pUpdated); blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity); +#if 0 + char* pBuf = streamStateSessionDump(pAggSup->pState); + qDebug("===stream===semi session%s", pBuf); + taosMemoryFree(pBuf); +#endif + doBuildSessionResult(pOperator, pAggSup->pState, &pInfo->groupResInfo, pBInfo->pRes); if (pBInfo->pRes->info.rows > 0) { printDataBlock(pBInfo->pRes, "semi session"); diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c index cf2ead364b5743ef289351b12894a082f41e0492..ccb0dd4a92e12ee51bbdba6ffb4777ae663cae8e 100644 --- a/source/libs/stream/src/streamState.c +++ b/source/libs/stream/src/streamState.c @@ -29,7 +29,7 @@ typedef struct SStateSessionKey { int64_t opNum; } SStateSessionKey; -static inline int sessionKeyCmpr(const SSessionKey* pWin1, const SSessionKey* pWin2) { +static inline int sessionRangeKeyCmpr(const SSessionKey* pWin1, const SSessionKey* pWin2) { if (pWin1->groupId > pWin2->groupId) { return 1; } else if (pWin1->groupId < pWin2->groupId) { @@ -45,6 +45,28 @@ static inline int sessionKeyCmpr(const SSessionKey* pWin1, const SSessionKey* pW return 0; } +static inline int sessionWinKeyCmpr(const SSessionKey* pWin1, const SSessionKey* pWin2) { + if (pWin1->groupId > pWin2->groupId) { + return 1; + } else if (pWin1->groupId < pWin2->groupId) { + return -1; + } + + if (pWin1->win.skey > pWin2->win.skey) { + return 1; + } else if (pWin1->win.skey < pWin2->win.skey) { + return -1; + } + + if (pWin1->win.ekey > pWin2->win.ekey) { + return 1; + } else if (pWin1->win.ekey < pWin2->win.ekey) { + return -1; + } + + return 0; +} + static inline int stateSessionKeyCmpr(const void* pKey1, int kLen1, const void* pKey2, int kLen2) { SStateSessionKey* pWin1 = (SStateSessionKey*)pKey1; SStateSessionKey* pWin2 = (SStateSessionKey*)pKey2; @@ -55,7 +77,7 @@ static inline int stateSessionKeyCmpr(const void* pKey1, int kLen1, const void* return -1; } - return sessionKeyCmpr(&pWin1->key, &pWin2->key); + return sessionWinKeyCmpr(&pWin1->key, &pWin2->key); } static inline int stateKeyCmpr(const void* pKey1, int kLen1, const void* pKey2, int kLen2) { @@ -400,7 +422,6 @@ SStreamStateCur* streamStateSeekKeyNext(SStreamState* pState, const SWinKey* key SStateKey sKey = {.key = *key, .opNum = pState->number}; int32_t c = 0; if (tdbTbcMoveTo(pCur->pCur, &sKey, sizeof(SStateKey), &c) < 0) { - tdbTbcClose(pCur->pCur); streamStateFreeCur(pCur); return NULL; } @@ -426,7 +447,6 @@ SStreamStateCur* streamStateFillSeekKeyNext(SStreamState* pState, const SWinKey* int32_t c = 0; if (tdbTbcMoveTo(pCur->pCur, key, sizeof(SWinKey), &c) < 0) { - tdbTbcClose(pCur->pCur); streamStateFreeCur(pCur); return NULL; } @@ -452,7 +472,6 @@ SStreamStateCur* streamStateFillSeekKeyPrev(SStreamState* pState, const SWinKey* int32_t c = 0; if (tdbTbcMoveTo(pCur->pCur, key, sizeof(SWinKey), &c) < 0) { - tdbTbcClose(pCur->pCur); streamStateFreeCur(pCur); return NULL; } @@ -496,33 +515,18 @@ int32_t streamStateSessionPut(SStreamState* pState, const SSessionKey* key, cons return tdbTbUpsert(pState->pSessionStateDb, &sKey, sizeof(SStateSessionKey), value, vLen, &pState->txn); } -SStreamStateCur* streamStateSessionGetRanomCur(SStreamState* pState, const SSessionKey* key) { - SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); - if (pCur == NULL) return NULL; - tdbTbcOpen(pState->pSessionStateDb, &pCur->pCur, NULL); - - int32_t c = -2; - SStateSessionKey sKey = {.key = *key, .opNum = pState->number}; - tdbTbcMoveTo(pCur->pCur, &sKey, sizeof(SStateSessionKey), &c); - if (c != 0) { - streamStateFreeCur(pCur); - return NULL; - } - pCur->number = pState->number; - return pCur; -} - int32_t streamStateSessionGet(SStreamState* pState, SSessionKey* key, void** pVal, int32_t* pVLen) { - SStreamStateCur* pCur = streamStateSessionGetRanomCur(pState, key); + SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentNext(pState, key); + SSessionKey resKey = *key; void* tmp = NULL; - if (streamStateSessionGetKVByCur(pCur, key, (const void**)&tmp, pVLen) == 0) { + int32_t code = streamStateSessionGetKVByCur(pCur, &resKey, &tmp, pVLen); + if (code == 0) { + *key = resKey; *pVal = tdbRealloc(NULL, *pVLen); memcpy(*pVal, tmp, *pVLen); - streamStateFreeCur(pCur); - return 0; } streamStateFreeCur(pCur); - return -1; + return code; } int32_t streamStateSessionDel(SStreamState* pState, const SSessionKey* key) { @@ -544,7 +548,6 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentPrev(SStreamState* pState, cons SStateSessionKey sKey = {.key = *key, .opNum = pState->number}; int32_t c = 0; if (tdbTbcMoveTo(pCur->pCur, &sKey, sizeof(SStateSessionKey), &c) < 0) { - tdbTbcClose(pCur->pCur); streamStateFreeCur(pCur); return NULL; } @@ -558,6 +561,34 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentPrev(SStreamState* pState, cons return pCur; } +SStreamStateCur* streamStateSessionSeekKeyCurrentNext(SStreamState* pState, const SSessionKey* key) { + SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); + if (pCur == NULL) { + return NULL; + } + pCur->number = pState->number; + if (tdbTbcOpen(pState->pSessionStateDb, &pCur->pCur, NULL) < 0) { + streamStateFreeCur(pCur); + return NULL; + } + + SStateSessionKey sKey = {.key = *key, .opNum = pState->number}; + int32_t c = 0; + if (tdbTbcMoveTo(pCur->pCur, &sKey, sizeof(SStateSessionKey), &c) < 0) { + streamStateFreeCur(pCur); + return NULL; + } + + if (c <= 0) return pCur; + + if (tdbTbcMoveToNext(pCur->pCur) < 0) { + streamStateFreeCur(pCur); + return NULL; + } + + return pCur; +} + SStreamStateCur* streamStateSessionSeekKeyNext(SStreamState* pState, const SSessionKey* key) { SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); if (pCur == NULL) { @@ -572,7 +603,6 @@ SStreamStateCur* streamStateSessionSeekKeyNext(SStreamState* pState, const SSess SStateSessionKey sKey = {.key = *key, .opNum = pState->number}; int32_t c = 0; if (tdbTbcMoveTo(pCur->pCur, &sKey, sizeof(SStateSessionKey), &c) < 0) { - tdbTbcClose(pCur->pCur); streamStateFreeCur(pCur); return NULL; } @@ -586,13 +616,13 @@ SStreamStateCur* streamStateSessionSeekKeyNext(SStreamState* pState, const SSess return pCur; } -int32_t streamStateSessionGetKVByCur(SStreamStateCur* pCur, SSessionKey* pKey, const void** pVal, int32_t* pVLen) { +int32_t streamStateSessionGetKVByCur(SStreamStateCur* pCur, SSessionKey* pKey, void** pVal, int32_t* pVLen) { if (!pCur) { return -1; } - const SStateSessionKey* pKTmp = NULL; - int32_t kLen; - if (tdbTbcGet(pCur->pCur, (const void**)&pKTmp, &kLen, pVal, pVLen) < 0) { + SStateSessionKey* pKTmp = NULL; + int32_t kLen; + if (tdbTbcGet(pCur->pCur, (const void**)&pKTmp, &kLen, (const void**)pVal, pVLen) < 0) { return -1; } if (pKTmp->opNum != pCur->number) { @@ -606,15 +636,15 @@ int32_t streamStateSessionGetKVByCur(SStreamStateCur* pCur, SSessionKey* pKey, c } int32_t streamStateSessionClear(SStreamState* pState) { - SSessionKey key = {.win.skey = 0, .win.ekey = 0, .groupId = 0}; - streamStateSessionPut(pState, &key, NULL, 0); - SStreamStateCur* pCur = streamStateSessionSeekKeyNext(pState, &key); + SSessionKey key = {.win.skey = 0, .win.ekey = 0, .groupId = 0}; + SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentNext(pState, &key); while (1) { SSessionKey delKey = {0}; void* buf = NULL; int32_t size = 0; - int32_t code = streamStateSessionGetKVByCur(pCur, &delKey, buf, &size); + int32_t code = streamStateSessionGetKVByCur(pCur, &delKey, &buf, &size); if (code == 0) { + ASSERT(size > 0); memset(buf, 0, size); streamStateSessionPut(pState, &delKey, buf, size); } else { @@ -623,61 +653,104 @@ int32_t streamStateSessionClear(SStreamState* pState) { streamStateCurNext(pState, pCur); } streamStateFreeCur(pCur); - streamStateSessionDel(pState, &key); return 0; } -SStreamStateCur* streamStateSessionGetCur(SStreamState* pState, const SSessionKey* key) { - SStreamStateCur* pCur = streamStateSessionGetRanomCur(pState, key); - SSessionKey resKey = *key; - while (1) { - streamStateCurPrev(pState, pCur); - SSessionKey tmpKey = *key; - int32_t code = streamStateSessionGetKVByCur(pCur, &tmpKey, NULL, 0); - if (code == 0 && sessionKeyCmpr(key, &tmpKey) == 0) { - resKey = tmpKey; - } else { - break; - } +int32_t streamStateSessionGetKeyByRange(SStreamState* pState, const SSessionKey* key, SSessionKey* curKey) { + SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); + if (pCur == NULL) { + return -1; + } + pCur->number = pState->number; + if (tdbTbcOpen(pState->pSessionStateDb, &pCur->pCur, NULL) < 0) { + streamStateFreeCur(pCur); + return -1; } - streamStateFreeCur(pCur); - return streamStateSessionGetRanomCur(pState, &resKey); -} -int32_t streamStateSessionGetKey(SStreamState* pState, const SSessionKey* key, SSessionKey* curKey) { - SStreamStateCur* pCur = streamStateSessionGetRanomCur(pState, key); - SSessionKey resKey = *key; - int32_t res = -1; - while (1) { - SSessionKey tmpKey = *key; - int32_t code = streamStateSessionGetKVByCur(pCur, &tmpKey, NULL, 0); - if (code == 0 && sessionKeyCmpr(key, &tmpKey) == 0) { - res = 0; - resKey = tmpKey; - streamStateCurPrev(pState, pCur); - } else { - break; + SStateSessionKey sKey = {.key = *key, .opNum = pState->number}; + int32_t c = 0; + if (tdbTbcMoveTo(pCur->pCur, &sKey, sizeof(SStateSessionKey), &c) < 0) { + streamStateFreeCur(pCur); + return -1; + } + + SSessionKey resKey = *key; + int32_t code = streamStateSessionGetKVByCur(pCur, &resKey, NULL, 0); + if (code == 0 && sessionRangeKeyCmpr(key, &resKey) == 0) { + *curKey = resKey; + streamStateFreeCur(pCur); + return code; + } + + if (c > 0) { + streamStateCurNext(pState, pCur); + code = streamStateSessionGetKVByCur(pCur, &resKey, NULL, 0); + if (code == 0 && sessionRangeKeyCmpr(key, &resKey) == 0) { + *curKey = resKey; + streamStateFreeCur(pCur); + return code; + } + } else if (c < 0) { + streamStateCurPrev(pState, pCur); + code = streamStateSessionGetKVByCur(pCur, &resKey, NULL, 0); + if (code == 0 && sessionRangeKeyCmpr(key, &resKey) == 0) { + *curKey = resKey; + streamStateFreeCur(pCur); + return code; } } - *curKey = resKey; + streamStateFreeCur(pCur); - return res; + return -1; } -int32_t streamStateSessionAddIfNotExist(SStreamState* pState, SSessionKey* key, void** pVal, int32_t* pVLen) { +int32_t streamStateSessionAddIfNotExist(SStreamState* pState, SSessionKey* key, TSKEY gap, void** pVal, + int32_t* pVLen) { // todo refactor - SStreamStateCur* pCur = streamStateSessionGetRanomCur(pState, key); - int32_t size = *pVLen; - void* tmp = NULL; - *pVal = tdbRealloc(NULL, size); - memset(*pVal, 0, size); - if (streamStateSessionGetKVByCur(pCur, key, (const void**)&tmp, pVLen) == 0) { - memcpy(*pVal, tmp, *pVLen); + int32_t res = 0; + SSessionKey originKey = *key; + SSessionKey searchKey = *key; + searchKey.win.skey = key->win.skey - gap; + searchKey.win.ekey = key->win.ekey + gap; + int32_t valSize = *pVLen; + void* tmp = tdbRealloc(NULL, valSize); + if (!tmp) { + return -1; + } + + SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentPrev(pState, key); + int32_t code = streamStateSessionGetKVByCur(pCur, key, pVal, pVLen); + if (code == 0) { + if (sessionRangeKeyCmpr(&searchKey, key) == 0) { + memcpy(tmp, *pVal, valSize); + streamStateSessionDel(pState, key); + goto _end; + } + streamStateCurNext(pState, pCur); + } else { + *key = originKey; streamStateFreeCur(pCur); - return 0; + pCur = streamStateSessionSeekKeyNext(pState, key); } + + code = streamStateSessionGetKVByCur(pCur, key, pVal, pVLen); + if (code == 0) { + if (sessionRangeKeyCmpr(&searchKey, key) == 0) { + memcpy(tmp, *pVal, valSize); + streamStateSessionDel(pState, key); + goto _end; + } + } + + *key = originKey; + res = 1; + memset(tmp, 0, valSize); + +_end: + + *pVal = tmp; streamStateFreeCur(pCur); - return 1; + return res; } int32_t streamStateStateAddIfNotExist(SStreamState* pState, SSessionKey* key, char* pKeyData, int32_t keyDataLen, @@ -692,16 +765,18 @@ int32_t streamStateStateAddIfNotExist(SStreamState* pState, SSessionKey* key, ch } SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentPrev(pState, key); - int32_t code = streamStateSessionGetKVByCur(pCur, key, (const void**)pVal, pVLen); + int32_t code = streamStateSessionGetKVByCur(pCur, key, pVal, pVLen); if (code == 0) { if (key->win.skey <= tmpKey.win.skey && tmpKey.win.ekey <= key->win.ekey) { memcpy(tmp, *pVal, valSize); + streamStateSessionDel(pState, key); goto _end; } void* stateKey = (char*)(*pVal) + (valSize - keyDataLen); if (fn(pKeyData, stateKey) == true) { memcpy(tmp, *pVal, valSize); + streamStateSessionDel(pState, key); goto _end; } @@ -712,11 +787,12 @@ int32_t streamStateStateAddIfNotExist(SStreamState* pState, SSessionKey* key, ch pCur = streamStateSessionSeekKeyNext(pState, key); } - code = streamStateSessionGetKVByCur(pCur, key, (const void**)pVal, pVLen); + code = streamStateSessionGetKVByCur(pCur, key, pVal, pVLen); if (code == 0) { void* stateKey = (char*)(*pVal) + (valSize - keyDataLen); if (fn(pKeyData, stateKey) == true) { memcpy(tmp, *pVal, valSize); + streamStateSessionDel(pState, key); goto _end; } } @@ -746,8 +822,11 @@ char* streamStateSessionDump(SStreamState* pState) { tdbTbcMoveToFirst(pCur->pCur); SSessionKey key = {0}; - int32_t code = streamStateSessionGetKVByCur(pCur, &key, NULL, 0); + void* buf = NULL; + int32_t bufSize = 0; + int32_t code = streamStateSessionGetKVByCur(pCur, &key, &buf, &bufSize); if (code != 0) { + streamStateFreeCur(pCur); return NULL; } @@ -762,12 +841,14 @@ char* streamStateSessionDump(SStreamState* pState) { key = (SSessionKey){0}; code = streamStateSessionGetKVByCur(pCur, &key, NULL, 0); if (code != 0) { + streamStateFreeCur(pCur); return dumpBuf; } len += snprintf(dumpBuf + len, size - len, "||s:%15" PRId64 ",", key.win.skey); len += snprintf(dumpBuf + len, size - len, "e:%15" PRId64 ",", key.win.ekey); len += snprintf(dumpBuf + len, size - len, "g:%15" PRId64 "||", key.groupId); } + streamStateFreeCur(pCur); return dumpBuf; } #endif diff --git a/tests/script/tsim/stream/session0.sim b/tests/script/tsim/stream/session0.sim index afae8ef5da64d6d54b28b8898fbe9a5a973620fc..5e95428e0a2d34adaf830fc74019e320513d2586 100644 --- a/tests/script/tsim/stream/session0.sim +++ b/tests/script/tsim/stream/session0.sim @@ -201,7 +201,7 @@ if $loop_count == 10 then endi if $rows != 1 then - print ======$rows + print =====rows=$rows goto loop2 endi