未验证 提交 7af6a0fe 编写于 作者: D dapan1121 提交者: GitHub

Merge pull request #17963 from taosdata/feature/stream_ly

fix(stream): state window, error result after deleting data
...@@ -60,19 +60,19 @@ int32_t streamStateDel(SStreamState* pState, const SWinKey* key); ...@@ -60,19 +60,19 @@ int32_t streamStateDel(SStreamState* pState, const SWinKey* key);
int32_t streamStateClear(SStreamState* pState); int32_t streamStateClear(SStreamState* pState);
void streamStateSetNumber(SStreamState* pState, int32_t number); void streamStateSetNumber(SStreamState* pState, int32_t number);
int32_t streamStateSessionAddIfNotExist(SStreamState* pState, SSessionKey* key, void** pVal, int32_t* pVLen); int32_t streamStateSessionAddIfNotExist(SStreamState* pState, SSessionKey* key, TSKEY gap, void** pVal, int32_t* pVLen);
int32_t streamStateSessionPut(SStreamState* pState, const SSessionKey* key, const void* value, int32_t vLen); int32_t streamStateSessionPut(SStreamState* pState, const SSessionKey* key, const void* value, int32_t vLen);
int32_t streamStateSessionGet(SStreamState* pState, SSessionKey* key, void** pVal, int32_t* pVLen); int32_t streamStateSessionGet(SStreamState* pState, SSessionKey* key, void** pVal, int32_t* pVLen);
int32_t streamStateSessionDel(SStreamState* pState, const SSessionKey* key); int32_t streamStateSessionDel(SStreamState* pState, const SSessionKey* key);
int32_t streamStateSessionClear(SStreamState* pState); int32_t streamStateSessionClear(SStreamState* pState);
int32_t streamStateSessionGetKVByCur(SStreamStateCur* pCur, SSessionKey* pKey, const void** pVal, int32_t* pVLen); int32_t streamStateSessionGetKVByCur(SStreamStateCur* pCur, SSessionKey* pKey, void** pVal, int32_t* pVLen);
int32_t streamStateStateAddIfNotExist(SStreamState* pState, SSessionKey* key, char* pKeyData, int32_t keyDataLen, int32_t streamStateStateAddIfNotExist(SStreamState* pState, SSessionKey* key, char* pKeyData, int32_t keyDataLen,
state_key_cmpr_fn fn, void** pVal, int32_t* pVLen); state_key_cmpr_fn fn, void** pVal, int32_t* pVLen);
int32_t streamStateSessionGetKey(SStreamState* pState, const SSessionKey* key, SSessionKey* curKey); int32_t streamStateSessionGetKeyByRange(SStreamState* pState, const SSessionKey* range, SSessionKey* curKey);
SStreamStateCur* streamStateSessionSeekKeyNext(SStreamState* pState, const SSessionKey* key); SStreamStateCur* streamStateSessionSeekKeyNext(SStreamState* pState, const SSessionKey* key);
SStreamStateCur* streamStateSessionSeekKeyCurrentPrev(SStreamState* pState, const SSessionKey* key); SStreamStateCur* streamStateSessionSeekKeyCurrentPrev(SStreamState* pState, const SSessionKey* key);
SStreamStateCur* streamStateSessionGetCur(SStreamState* pState, const SSessionKey* key); SStreamStateCur* streamStateSessionSeekKeyCurrentNext(SStreamState* pState, const SSessionKey* key);
int32_t streamStateFillPut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen); int32_t streamStateFillPut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen);
int32_t streamStateFillGet(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen); int32_t streamStateFillGet(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen);
...@@ -99,7 +99,9 @@ int32_t streamStateSeekLast(SStreamState* pState, SStreamStateCur* pCur); ...@@ -99,7 +99,9 @@ int32_t streamStateSeekLast(SStreamState* pState, SStreamStateCur* pCur);
int32_t streamStateCurNext(SStreamState* pState, SStreamStateCur* pCur); int32_t streamStateCurNext(SStreamState* pState, SStreamStateCur* pCur);
int32_t streamStateCurPrev(SStreamState* pState, SStreamStateCur* pCur); int32_t streamStateCurPrev(SStreamState* pState, SStreamStateCur* pCur);
// char* streamStateSessionDump(SStreamState* pState); #if 0
char* streamStateSessionDump(SStreamState* pState);
#endif
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -1438,7 +1438,7 @@ static int32_t generateSessionScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSr ...@@ -1438,7 +1438,7 @@ static int32_t generateSessionScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSr
uint64_t groupId = getGroupIdByData(pInfo, uidCol[i], startData[i], version); uint64_t groupId = getGroupIdByData(pInfo, uidCol[i], startData[i], version);
// gap must be 0. // gap must be 0.
SSessionKey startWin = {0}; SSessionKey startWin = {0};
getCurSessionWindow(pInfo->windowSup.pStreamAggSup, startData[i], endData[i], groupId, &startWin); getCurSessionWindow(pInfo->windowSup.pStreamAggSup, startData[i], startData[i], groupId, &startWin);
if (IS_INVALID_SESSION_WIN_KEY(startWin)) { if (IS_INVALID_SESSION_WIN_KEY(startWin)) {
// window has been closed. // window has been closed.
continue; continue;
......
...@@ -3550,7 +3550,7 @@ void getCurSessionWindow(SStreamAggSupporter* pAggSup, TSKEY startTs, TSKEY endT ...@@ -3550,7 +3550,7 @@ void getCurSessionWindow(SStreamAggSupporter* pAggSup, TSKEY startTs, TSKEY endT
pKey->win.skey = startTs; pKey->win.skey = startTs;
pKey->win.ekey = endTs; pKey->win.ekey = endTs;
pKey->groupId = groupId; pKey->groupId = groupId;
int32_t code = streamStateSessionGetKey(pAggSup->pState, pKey, pKey); int32_t code = streamStateSessionGetKeyByRange(pAggSup->pState, pKey, pKey);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
SET_SESSION_WIN_KEY_INVALID(pKey); SET_SESSION_WIN_KEY_INVALID(pKey);
} }
...@@ -3561,10 +3561,11 @@ bool isInvalidSessionWin(SResultWindowInfo* pWinInfo) { return pWinInfo->session ...@@ -3561,10 +3561,11 @@ bool isInvalidSessionWin(SResultWindowInfo* pWinInfo) { return pWinInfo->session
void setSessionOutputBuf(SStreamAggSupporter* pAggSup, TSKEY startTs, TSKEY endTs, uint64_t groupId, void setSessionOutputBuf(SStreamAggSupporter* pAggSup, TSKEY startTs, TSKEY endTs, uint64_t groupId,
SResultWindowInfo* pCurWin) { SResultWindowInfo* pCurWin) {
pCurWin->sessionWin.groupId = groupId; pCurWin->sessionWin.groupId = groupId;
pCurWin->sessionWin.win.skey = startTs - pAggSup->gap; pCurWin->sessionWin.win.skey = startTs;
pCurWin->sessionWin.win.ekey = endTs + pAggSup->gap; pCurWin->sessionWin.win.ekey = endTs;
int32_t size = pAggSup->resultRowSize; int32_t size = pAggSup->resultRowSize;
int32_t code = streamStateSessionAddIfNotExist(pAggSup->pState, &pCurWin->sessionWin, &pCurWin->pOutputBuf, &size); int32_t code =
streamStateSessionAddIfNotExist(pAggSup->pState, &pCurWin->sessionWin, pAggSup->gap, &pCurWin->pOutputBuf, &size);
if (code == TSDB_CODE_SUCCESS) { if (code == TSDB_CODE_SUCCESS) {
pCurWin->isOutput = true; pCurWin->isOutput = true;
} else { } else {
...@@ -3575,7 +3576,7 @@ void setSessionOutputBuf(SStreamAggSupporter* pAggSup, TSKEY startTs, TSKEY endT ...@@ -3575,7 +3576,7 @@ void setSessionOutputBuf(SStreamAggSupporter* pAggSup, TSKEY startTs, TSKEY endT
int32_t getSessionWinBuf(SStreamAggSupporter* pAggSup, SStreamStateCur* pCur, SResultWindowInfo* pWinInfo) { int32_t getSessionWinBuf(SStreamAggSupporter* pAggSup, SStreamStateCur* pCur, SResultWindowInfo* pWinInfo) {
int32_t size = 0; int32_t size = 0;
int32_t code = streamStateSessionGetKVByCur(pCur, &pWinInfo->sessionWin, (const void**)&pWinInfo->pOutputBuf, &size); int32_t code = streamStateSessionGetKVByCur(pCur, &pWinInfo->sessionWin, &pWinInfo->pOutputBuf, &size);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
...@@ -3680,7 +3681,7 @@ SStreamStateCur* getNextSessionWinInfo(SStreamAggSupporter* pAggSup, SSHashObj* ...@@ -3680,7 +3681,7 @@ SStreamStateCur* getNextSessionWinInfo(SStreamAggSupporter* pAggSup, SSHashObj*
setSessionWinOutputInfo(pStUpdated, pNextWin); setSessionWinOutputInfo(pStUpdated, pNextWin);
int32_t size = 0; int32_t size = 0;
pNextWin->sessionWin = pCurWin->sessionWin; pNextWin->sessionWin = pCurWin->sessionWin;
int32_t code = streamStateSessionGetKVByCur(pCur, &pNextWin->sessionWin, (const void**)&pNextWin->pOutputBuf, &size); int32_t code = streamStateSessionGetKVByCur(pCur, &pNextWin->sessionWin, &pNextWin->pOutputBuf, &size);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
SET_SESSION_WIN_INVALID(*pNextWin); SET_SESSION_WIN_INVALID(*pNextWin);
} }
...@@ -3894,9 +3895,11 @@ static void rebuildSessionWindow(SOperatorInfo* pOperator, SArray* pWinArray, SS ...@@ -3894,9 +3895,11 @@ static void rebuildSessionWindow(SOperatorInfo* pOperator, SArray* pWinArray, SS
SOperatorInfo* pChild = taosArrayGetP(pInfo->pChildren, j); SOperatorInfo* pChild = taosArrayGetP(pInfo->pChildren, j);
SStreamSessionAggOperatorInfo* pChInfo = pChild->info; SStreamSessionAggOperatorInfo* pChInfo = pChild->info;
SStreamAggSupporter* pChAggSup = &pChInfo->streamAggSup; SStreamAggSupporter* pChAggSup = &pChInfo->streamAggSup;
SStreamStateCur* pCur = streamStateSessionGetCur(pChAggSup->pState, pWinKey); SSessionKey chWinKey = *pWinKey;
SResultRow* pResult = NULL; chWinKey.win.ekey = chWinKey.win.skey;
SResultRow* pChResult = NULL; SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentNext(pChAggSup->pState, &chWinKey);
SResultRow* pResult = NULL;
SResultRow* pChResult = NULL;
while (1) { while (1) {
SResultWindowInfo childWin = {0}; SResultWindowInfo childWin = {0};
childWin.sessionWin = *pWinKey; childWin.sessionWin = *pWinKey;
...@@ -4112,6 +4115,12 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) { ...@@ -4112,6 +4115,12 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
initGroupResInfoFromArrayList(&pInfo->groupResInfo, pUpdated); initGroupResInfoFromArrayList(&pInfo->groupResInfo, pUpdated);
blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity); blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
#if 0
char* pBuf = streamStateSessionDump(pAggSup->pState);
qDebug("===stream===final session%s", pBuf);
taosMemoryFree(pBuf);
#endif
doBuildDeleteDataBlock(pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator); doBuildDeleteDataBlock(pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator);
if (pInfo->pDelRes->info.rows > 0) { if (pInfo->pDelRes->info.rows > 0) {
printDataBlock(pInfo->pDelRes, IS_FINAL_OP(pInfo) ? "final session" : "single session"); printDataBlock(pInfo->pDelRes, IS_FINAL_OP(pInfo) ? "final session" : "single session");
...@@ -4306,6 +4315,12 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) { ...@@ -4306,6 +4315,12 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) {
initGroupResInfoFromArrayList(&pInfo->groupResInfo, pUpdated); initGroupResInfoFromArrayList(&pInfo->groupResInfo, pUpdated);
blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity); blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity);
#if 0
char* pBuf = streamStateSessionDump(pAggSup->pState);
qDebug("===stream===semi session%s", pBuf);
taosMemoryFree(pBuf);
#endif
doBuildSessionResult(pOperator, pAggSup->pState, &pInfo->groupResInfo, pBInfo->pRes); doBuildSessionResult(pOperator, pAggSup->pState, &pInfo->groupResInfo, pBInfo->pRes);
if (pBInfo->pRes->info.rows > 0) { if (pBInfo->pRes->info.rows > 0) {
printDataBlock(pBInfo->pRes, "semi session"); printDataBlock(pBInfo->pRes, "semi session");
......
...@@ -29,7 +29,7 @@ typedef struct SStateSessionKey { ...@@ -29,7 +29,7 @@ typedef struct SStateSessionKey {
int64_t opNum; int64_t opNum;
} SStateSessionKey; } SStateSessionKey;
static inline int sessionKeyCmpr(const SSessionKey* pWin1, const SSessionKey* pWin2) { static inline int sessionRangeKeyCmpr(const SSessionKey* pWin1, const SSessionKey* pWin2) {
if (pWin1->groupId > pWin2->groupId) { if (pWin1->groupId > pWin2->groupId) {
return 1; return 1;
} else if (pWin1->groupId < pWin2->groupId) { } else if (pWin1->groupId < pWin2->groupId) {
...@@ -45,6 +45,28 @@ static inline int sessionKeyCmpr(const SSessionKey* pWin1, const SSessionKey* pW ...@@ -45,6 +45,28 @@ static inline int sessionKeyCmpr(const SSessionKey* pWin1, const SSessionKey* pW
return 0; return 0;
} }
static inline int sessionWinKeyCmpr(const SSessionKey* pWin1, const SSessionKey* pWin2) {
if (pWin1->groupId > pWin2->groupId) {
return 1;
} else if (pWin1->groupId < pWin2->groupId) {
return -1;
}
if (pWin1->win.skey > pWin2->win.skey) {
return 1;
} else if (pWin1->win.skey < pWin2->win.skey) {
return -1;
}
if (pWin1->win.ekey > pWin2->win.ekey) {
return 1;
} else if (pWin1->win.ekey < pWin2->win.ekey) {
return -1;
}
return 0;
}
static inline int stateSessionKeyCmpr(const void* pKey1, int kLen1, const void* pKey2, int kLen2) { static inline int stateSessionKeyCmpr(const void* pKey1, int kLen1, const void* pKey2, int kLen2) {
SStateSessionKey* pWin1 = (SStateSessionKey*)pKey1; SStateSessionKey* pWin1 = (SStateSessionKey*)pKey1;
SStateSessionKey* pWin2 = (SStateSessionKey*)pKey2; SStateSessionKey* pWin2 = (SStateSessionKey*)pKey2;
...@@ -55,7 +77,7 @@ static inline int stateSessionKeyCmpr(const void* pKey1, int kLen1, const void* ...@@ -55,7 +77,7 @@ static inline int stateSessionKeyCmpr(const void* pKey1, int kLen1, const void*
return -1; return -1;
} }
return sessionKeyCmpr(&pWin1->key, &pWin2->key); return sessionWinKeyCmpr(&pWin1->key, &pWin2->key);
} }
static inline int stateKeyCmpr(const void* pKey1, int kLen1, const void* pKey2, int kLen2) { static inline int stateKeyCmpr(const void* pKey1, int kLen1, const void* pKey2, int kLen2) {
...@@ -400,7 +422,6 @@ SStreamStateCur* streamStateSeekKeyNext(SStreamState* pState, const SWinKey* key ...@@ -400,7 +422,6 @@ SStreamStateCur* streamStateSeekKeyNext(SStreamState* pState, const SWinKey* key
SStateKey sKey = {.key = *key, .opNum = pState->number}; SStateKey sKey = {.key = *key, .opNum = pState->number};
int32_t c = 0; int32_t c = 0;
if (tdbTbcMoveTo(pCur->pCur, &sKey, sizeof(SStateKey), &c) < 0) { if (tdbTbcMoveTo(pCur->pCur, &sKey, sizeof(SStateKey), &c) < 0) {
tdbTbcClose(pCur->pCur);
streamStateFreeCur(pCur); streamStateFreeCur(pCur);
return NULL; return NULL;
} }
...@@ -426,7 +447,6 @@ SStreamStateCur* streamStateFillSeekKeyNext(SStreamState* pState, const SWinKey* ...@@ -426,7 +447,6 @@ SStreamStateCur* streamStateFillSeekKeyNext(SStreamState* pState, const SWinKey*
int32_t c = 0; int32_t c = 0;
if (tdbTbcMoveTo(pCur->pCur, key, sizeof(SWinKey), &c) < 0) { if (tdbTbcMoveTo(pCur->pCur, key, sizeof(SWinKey), &c) < 0) {
tdbTbcClose(pCur->pCur);
streamStateFreeCur(pCur); streamStateFreeCur(pCur);
return NULL; return NULL;
} }
...@@ -452,7 +472,6 @@ SStreamStateCur* streamStateFillSeekKeyPrev(SStreamState* pState, const SWinKey* ...@@ -452,7 +472,6 @@ SStreamStateCur* streamStateFillSeekKeyPrev(SStreamState* pState, const SWinKey*
int32_t c = 0; int32_t c = 0;
if (tdbTbcMoveTo(pCur->pCur, key, sizeof(SWinKey), &c) < 0) { if (tdbTbcMoveTo(pCur->pCur, key, sizeof(SWinKey), &c) < 0) {
tdbTbcClose(pCur->pCur);
streamStateFreeCur(pCur); streamStateFreeCur(pCur);
return NULL; return NULL;
} }
...@@ -496,33 +515,18 @@ int32_t streamStateSessionPut(SStreamState* pState, const SSessionKey* key, cons ...@@ -496,33 +515,18 @@ int32_t streamStateSessionPut(SStreamState* pState, const SSessionKey* key, cons
return tdbTbUpsert(pState->pSessionStateDb, &sKey, sizeof(SStateSessionKey), value, vLen, &pState->txn); return tdbTbUpsert(pState->pSessionStateDb, &sKey, sizeof(SStateSessionKey), value, vLen, &pState->txn);
} }
SStreamStateCur* streamStateSessionGetRanomCur(SStreamState* pState, const SSessionKey* key) {
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
if (pCur == NULL) return NULL;
tdbTbcOpen(pState->pSessionStateDb, &pCur->pCur, NULL);
int32_t c = -2;
SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
tdbTbcMoveTo(pCur->pCur, &sKey, sizeof(SStateSessionKey), &c);
if (c != 0) {
streamStateFreeCur(pCur);
return NULL;
}
pCur->number = pState->number;
return pCur;
}
int32_t streamStateSessionGet(SStreamState* pState, SSessionKey* key, void** pVal, int32_t* pVLen) { int32_t streamStateSessionGet(SStreamState* pState, SSessionKey* key, void** pVal, int32_t* pVLen) {
SStreamStateCur* pCur = streamStateSessionGetRanomCur(pState, key); SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentNext(pState, key);
SSessionKey resKey = *key;
void* tmp = NULL; void* tmp = NULL;
if (streamStateSessionGetKVByCur(pCur, key, (const void**)&tmp, pVLen) == 0) { int32_t code = streamStateSessionGetKVByCur(pCur, &resKey, &tmp, pVLen);
if (code == 0) {
*key = resKey;
*pVal = tdbRealloc(NULL, *pVLen); *pVal = tdbRealloc(NULL, *pVLen);
memcpy(*pVal, tmp, *pVLen); memcpy(*pVal, tmp, *pVLen);
streamStateFreeCur(pCur);
return 0;
} }
streamStateFreeCur(pCur); streamStateFreeCur(pCur);
return -1; return code;
} }
int32_t streamStateSessionDel(SStreamState* pState, const SSessionKey* key) { int32_t streamStateSessionDel(SStreamState* pState, const SSessionKey* key) {
...@@ -544,7 +548,6 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentPrev(SStreamState* pState, cons ...@@ -544,7 +548,6 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentPrev(SStreamState* pState, cons
SStateSessionKey sKey = {.key = *key, .opNum = pState->number}; SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
int32_t c = 0; int32_t c = 0;
if (tdbTbcMoveTo(pCur->pCur, &sKey, sizeof(SStateSessionKey), &c) < 0) { if (tdbTbcMoveTo(pCur->pCur, &sKey, sizeof(SStateSessionKey), &c) < 0) {
tdbTbcClose(pCur->pCur);
streamStateFreeCur(pCur); streamStateFreeCur(pCur);
return NULL; return NULL;
} }
...@@ -558,6 +561,34 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentPrev(SStreamState* pState, cons ...@@ -558,6 +561,34 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentPrev(SStreamState* pState, cons
return pCur; return pCur;
} }
SStreamStateCur* streamStateSessionSeekKeyCurrentNext(SStreamState* pState, const SSessionKey* key) {
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
if (pCur == NULL) {
return NULL;
}
pCur->number = pState->number;
if (tdbTbcOpen(pState->pSessionStateDb, &pCur->pCur, NULL) < 0) {
streamStateFreeCur(pCur);
return NULL;
}
SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
int32_t c = 0;
if (tdbTbcMoveTo(pCur->pCur, &sKey, sizeof(SStateSessionKey), &c) < 0) {
streamStateFreeCur(pCur);
return NULL;
}
if (c <= 0) return pCur;
if (tdbTbcMoveToNext(pCur->pCur) < 0) {
streamStateFreeCur(pCur);
return NULL;
}
return pCur;
}
SStreamStateCur* streamStateSessionSeekKeyNext(SStreamState* pState, const SSessionKey* key) { SStreamStateCur* streamStateSessionSeekKeyNext(SStreamState* pState, const SSessionKey* key) {
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
if (pCur == NULL) { if (pCur == NULL) {
...@@ -572,7 +603,6 @@ SStreamStateCur* streamStateSessionSeekKeyNext(SStreamState* pState, const SSess ...@@ -572,7 +603,6 @@ SStreamStateCur* streamStateSessionSeekKeyNext(SStreamState* pState, const SSess
SStateSessionKey sKey = {.key = *key, .opNum = pState->number}; SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
int32_t c = 0; int32_t c = 0;
if (tdbTbcMoveTo(pCur->pCur, &sKey, sizeof(SStateSessionKey), &c) < 0) { if (tdbTbcMoveTo(pCur->pCur, &sKey, sizeof(SStateSessionKey), &c) < 0) {
tdbTbcClose(pCur->pCur);
streamStateFreeCur(pCur); streamStateFreeCur(pCur);
return NULL; return NULL;
} }
...@@ -586,13 +616,13 @@ SStreamStateCur* streamStateSessionSeekKeyNext(SStreamState* pState, const SSess ...@@ -586,13 +616,13 @@ SStreamStateCur* streamStateSessionSeekKeyNext(SStreamState* pState, const SSess
return pCur; return pCur;
} }
int32_t streamStateSessionGetKVByCur(SStreamStateCur* pCur, SSessionKey* pKey, const void** pVal, int32_t* pVLen) { int32_t streamStateSessionGetKVByCur(SStreamStateCur* pCur, SSessionKey* pKey, void** pVal, int32_t* pVLen) {
if (!pCur) { if (!pCur) {
return -1; return -1;
} }
const SStateSessionKey* pKTmp = NULL; SStateSessionKey* pKTmp = NULL;
int32_t kLen; int32_t kLen;
if (tdbTbcGet(pCur->pCur, (const void**)&pKTmp, &kLen, pVal, pVLen) < 0) { if (tdbTbcGet(pCur->pCur, (const void**)&pKTmp, &kLen, (const void**)pVal, pVLen) < 0) {
return -1; return -1;
} }
if (pKTmp->opNum != pCur->number) { if (pKTmp->opNum != pCur->number) {
...@@ -606,15 +636,15 @@ int32_t streamStateSessionGetKVByCur(SStreamStateCur* pCur, SSessionKey* pKey, c ...@@ -606,15 +636,15 @@ int32_t streamStateSessionGetKVByCur(SStreamStateCur* pCur, SSessionKey* pKey, c
} }
int32_t streamStateSessionClear(SStreamState* pState) { int32_t streamStateSessionClear(SStreamState* pState) {
SSessionKey key = {.win.skey = 0, .win.ekey = 0, .groupId = 0}; SSessionKey key = {.win.skey = 0, .win.ekey = 0, .groupId = 0};
streamStateSessionPut(pState, &key, NULL, 0); SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentNext(pState, &key);
SStreamStateCur* pCur = streamStateSessionSeekKeyNext(pState, &key);
while (1) { while (1) {
SSessionKey delKey = {0}; SSessionKey delKey = {0};
void* buf = NULL; void* buf = NULL;
int32_t size = 0; int32_t size = 0;
int32_t code = streamStateSessionGetKVByCur(pCur, &delKey, buf, &size); int32_t code = streamStateSessionGetKVByCur(pCur, &delKey, &buf, &size);
if (code == 0) { if (code == 0) {
ASSERT(size > 0);
memset(buf, 0, size); memset(buf, 0, size);
streamStateSessionPut(pState, &delKey, buf, size); streamStateSessionPut(pState, &delKey, buf, size);
} else { } else {
...@@ -623,61 +653,104 @@ int32_t streamStateSessionClear(SStreamState* pState) { ...@@ -623,61 +653,104 @@ int32_t streamStateSessionClear(SStreamState* pState) {
streamStateCurNext(pState, pCur); streamStateCurNext(pState, pCur);
} }
streamStateFreeCur(pCur); streamStateFreeCur(pCur);
streamStateSessionDel(pState, &key);
return 0; return 0;
} }
SStreamStateCur* streamStateSessionGetCur(SStreamState* pState, const SSessionKey* key) { int32_t streamStateSessionGetKeyByRange(SStreamState* pState, const SSessionKey* key, SSessionKey* curKey) {
SStreamStateCur* pCur = streamStateSessionGetRanomCur(pState, key); SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
SSessionKey resKey = *key; if (pCur == NULL) {
while (1) { return -1;
streamStateCurPrev(pState, pCur); }
SSessionKey tmpKey = *key; pCur->number = pState->number;
int32_t code = streamStateSessionGetKVByCur(pCur, &tmpKey, NULL, 0); if (tdbTbcOpen(pState->pSessionStateDb, &pCur->pCur, NULL) < 0) {
if (code == 0 && sessionKeyCmpr(key, &tmpKey) == 0) { streamStateFreeCur(pCur);
resKey = tmpKey; return -1;
} else {
break;
}
} }
streamStateFreeCur(pCur);
return streamStateSessionGetRanomCur(pState, &resKey);
}
int32_t streamStateSessionGetKey(SStreamState* pState, const SSessionKey* key, SSessionKey* curKey) { SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
SStreamStateCur* pCur = streamStateSessionGetRanomCur(pState, key); int32_t c = 0;
SSessionKey resKey = *key; if (tdbTbcMoveTo(pCur->pCur, &sKey, sizeof(SStateSessionKey), &c) < 0) {
int32_t res = -1; streamStateFreeCur(pCur);
while (1) { return -1;
SSessionKey tmpKey = *key; }
int32_t code = streamStateSessionGetKVByCur(pCur, &tmpKey, NULL, 0);
if (code == 0 && sessionKeyCmpr(key, &tmpKey) == 0) { SSessionKey resKey = *key;
res = 0; int32_t code = streamStateSessionGetKVByCur(pCur, &resKey, NULL, 0);
resKey = tmpKey; if (code == 0 && sessionRangeKeyCmpr(key, &resKey) == 0) {
streamStateCurPrev(pState, pCur); *curKey = resKey;
} else { streamStateFreeCur(pCur);
break; return code;
}
if (c > 0) {
streamStateCurNext(pState, pCur);
code = streamStateSessionGetKVByCur(pCur, &resKey, NULL, 0);
if (code == 0 && sessionRangeKeyCmpr(key, &resKey) == 0) {
*curKey = resKey;
streamStateFreeCur(pCur);
return code;
}
} else if (c < 0) {
streamStateCurPrev(pState, pCur);
code = streamStateSessionGetKVByCur(pCur, &resKey, NULL, 0);
if (code == 0 && sessionRangeKeyCmpr(key, &resKey) == 0) {
*curKey = resKey;
streamStateFreeCur(pCur);
return code;
} }
} }
*curKey = resKey;
streamStateFreeCur(pCur); streamStateFreeCur(pCur);
return res; return -1;
} }
int32_t streamStateSessionAddIfNotExist(SStreamState* pState, SSessionKey* key, void** pVal, int32_t* pVLen) { int32_t streamStateSessionAddIfNotExist(SStreamState* pState, SSessionKey* key, TSKEY gap, void** pVal,
int32_t* pVLen) {
// todo refactor // todo refactor
SStreamStateCur* pCur = streamStateSessionGetRanomCur(pState, key); int32_t res = 0;
int32_t size = *pVLen; SSessionKey originKey = *key;
void* tmp = NULL; SSessionKey searchKey = *key;
*pVal = tdbRealloc(NULL, size); searchKey.win.skey = key->win.skey - gap;
memset(*pVal, 0, size); searchKey.win.ekey = key->win.ekey + gap;
if (streamStateSessionGetKVByCur(pCur, key, (const void**)&tmp, pVLen) == 0) { int32_t valSize = *pVLen;
memcpy(*pVal, tmp, *pVLen); void* tmp = tdbRealloc(NULL, valSize);
if (!tmp) {
return -1;
}
SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentPrev(pState, key);
int32_t code = streamStateSessionGetKVByCur(pCur, key, pVal, pVLen);
if (code == 0) {
if (sessionRangeKeyCmpr(&searchKey, key) == 0) {
memcpy(tmp, *pVal, valSize);
streamStateSessionDel(pState, key);
goto _end;
}
streamStateCurNext(pState, pCur);
} else {
*key = originKey;
streamStateFreeCur(pCur); streamStateFreeCur(pCur);
return 0; pCur = streamStateSessionSeekKeyNext(pState, key);
} }
code = streamStateSessionGetKVByCur(pCur, key, pVal, pVLen);
if (code == 0) {
if (sessionRangeKeyCmpr(&searchKey, key) == 0) {
memcpy(tmp, *pVal, valSize);
streamStateSessionDel(pState, key);
goto _end;
}
}
*key = originKey;
res = 1;
memset(tmp, 0, valSize);
_end:
*pVal = tmp;
streamStateFreeCur(pCur); streamStateFreeCur(pCur);
return 1; return res;
} }
int32_t streamStateStateAddIfNotExist(SStreamState* pState, SSessionKey* key, char* pKeyData, int32_t keyDataLen, int32_t streamStateStateAddIfNotExist(SStreamState* pState, SSessionKey* key, char* pKeyData, int32_t keyDataLen,
...@@ -692,16 +765,18 @@ int32_t streamStateStateAddIfNotExist(SStreamState* pState, SSessionKey* key, ch ...@@ -692,16 +765,18 @@ int32_t streamStateStateAddIfNotExist(SStreamState* pState, SSessionKey* key, ch
} }
SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentPrev(pState, key); SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentPrev(pState, key);
int32_t code = streamStateSessionGetKVByCur(pCur, key, (const void**)pVal, pVLen); int32_t code = streamStateSessionGetKVByCur(pCur, key, pVal, pVLen);
if (code == 0) { if (code == 0) {
if (key->win.skey <= tmpKey.win.skey && tmpKey.win.ekey <= key->win.ekey) { if (key->win.skey <= tmpKey.win.skey && tmpKey.win.ekey <= key->win.ekey) {
memcpy(tmp, *pVal, valSize); memcpy(tmp, *pVal, valSize);
streamStateSessionDel(pState, key);
goto _end; goto _end;
} }
void* stateKey = (char*)(*pVal) + (valSize - keyDataLen); void* stateKey = (char*)(*pVal) + (valSize - keyDataLen);
if (fn(pKeyData, stateKey) == true) { if (fn(pKeyData, stateKey) == true) {
memcpy(tmp, *pVal, valSize); memcpy(tmp, *pVal, valSize);
streamStateSessionDel(pState, key);
goto _end; goto _end;
} }
...@@ -712,11 +787,12 @@ int32_t streamStateStateAddIfNotExist(SStreamState* pState, SSessionKey* key, ch ...@@ -712,11 +787,12 @@ int32_t streamStateStateAddIfNotExist(SStreamState* pState, SSessionKey* key, ch
pCur = streamStateSessionSeekKeyNext(pState, key); pCur = streamStateSessionSeekKeyNext(pState, key);
} }
code = streamStateSessionGetKVByCur(pCur, key, (const void**)pVal, pVLen); code = streamStateSessionGetKVByCur(pCur, key, pVal, pVLen);
if (code == 0) { if (code == 0) {
void* stateKey = (char*)(*pVal) + (valSize - keyDataLen); void* stateKey = (char*)(*pVal) + (valSize - keyDataLen);
if (fn(pKeyData, stateKey) == true) { if (fn(pKeyData, stateKey) == true) {
memcpy(tmp, *pVal, valSize); memcpy(tmp, *pVal, valSize);
streamStateSessionDel(pState, key);
goto _end; goto _end;
} }
} }
...@@ -746,8 +822,11 @@ char* streamStateSessionDump(SStreamState* pState) { ...@@ -746,8 +822,11 @@ char* streamStateSessionDump(SStreamState* pState) {
tdbTbcMoveToFirst(pCur->pCur); tdbTbcMoveToFirst(pCur->pCur);
SSessionKey key = {0}; SSessionKey key = {0};
int32_t code = streamStateSessionGetKVByCur(pCur, &key, NULL, 0); void* buf = NULL;
int32_t bufSize = 0;
int32_t code = streamStateSessionGetKVByCur(pCur, &key, &buf, &bufSize);
if (code != 0) { if (code != 0) {
streamStateFreeCur(pCur);
return NULL; return NULL;
} }
...@@ -762,12 +841,14 @@ char* streamStateSessionDump(SStreamState* pState) { ...@@ -762,12 +841,14 @@ char* streamStateSessionDump(SStreamState* pState) {
key = (SSessionKey){0}; key = (SSessionKey){0};
code = streamStateSessionGetKVByCur(pCur, &key, NULL, 0); code = streamStateSessionGetKVByCur(pCur, &key, NULL, 0);
if (code != 0) { if (code != 0) {
streamStateFreeCur(pCur);
return dumpBuf; return dumpBuf;
} }
len += snprintf(dumpBuf + len, size - len, "||s:%15" PRId64 ",", key.win.skey); len += snprintf(dumpBuf + len, size - len, "||s:%15" PRId64 ",", key.win.skey);
len += snprintf(dumpBuf + len, size - len, "e:%15" PRId64 ",", key.win.ekey); len += snprintf(dumpBuf + len, size - len, "e:%15" PRId64 ",", key.win.ekey);
len += snprintf(dumpBuf + len, size - len, "g:%15" PRId64 "||", key.groupId); len += snprintf(dumpBuf + len, size - len, "g:%15" PRId64 "||", key.groupId);
} }
streamStateFreeCur(pCur);
return dumpBuf; return dumpBuf;
} }
#endif #endif
...@@ -201,7 +201,7 @@ if $loop_count == 10 then ...@@ -201,7 +201,7 @@ if $loop_count == 10 then
endi endi
if $rows != 1 then if $rows != 1 then
print ======$rows print =====rows=$rows
goto loop2 goto loop2
endi endi
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册