From 3e29f1978270034973b303ec43bbb6df28000c01 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao@163.com> Date: Fri, 18 Nov 2022 17:23:07 +0800 Subject: [PATCH] fix:stream state abnormal failure --- include/libs/stream/streamState.h | 11 ++- source/libs/stream/src/streamState.c | 142 +++++++++++++++------------ 2 files changed, 87 insertions(+), 66 deletions(-) diff --git a/include/libs/stream/streamState.h b/include/libs/stream/streamState.h index 7234d306a4..59f030a60c 100644 --- a/include/libs/stream/streamState.h +++ b/include/libs/stream/streamState.h @@ -27,8 +27,7 @@ typedef struct SStreamTask SStreamTask; typedef bool (*state_key_cmpr_fn)(void* pKey1, void* pKey2); -// incremental state storage -typedef struct { +typedef struct STdbState { SStreamTask* pOwner; TDB* db; TTB* pStateDb; @@ -37,7 +36,12 @@ typedef struct { TTB* pSessionStateDb; TTB* pParNameDb; TXN txn; - int32_t number; +} STdbState; + +// incremental state storage +typedef struct { + STdbState* pTdbState; + int32_t number; } SStreamState; SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath, int32_t szPage, int32_t pages); @@ -45,6 +49,7 @@ void streamStateClose(SStreamState* pState); int32_t streamStateBegin(SStreamState* pState); int32_t streamStateCommit(SStreamState* pState); int32_t streamStateAbort(SStreamState* pState); +void streamStateDestroy(SStreamState* pState); typedef struct { TBC* pCur; diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c index 63ec0caa95..0374e22a4a 100644 --- a/source/libs/stream/src/streamState.c +++ b/source/libs/stream/src/streamState.c @@ -114,6 +114,12 @@ SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath, int terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; } + pState->pTdbState = taosMemoryCalloc(1, sizeof(STdbState)); + if (pState->pTdbState == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + streamStateDestroy(pState); + return NULL; + } char statePath[1024]; if (!specPath) { @@ -122,31 +128,34 @@ SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath, int memset(statePath, 0, 1024); tstrncpy(statePath, path, 1024); } - if (tdbOpen(statePath, szPage, pages, &pState->db, 0) < 0) { + if (tdbOpen(statePath, szPage, pages, &pState->pTdbState->db, 0) < 0) { goto _err; } // open state storage backend - if (tdbTbOpen("state.db", sizeof(SStateKey), -1, stateKeyCmpr, pState->db, &pState->pStateDb, 0) < 0) { + if (tdbTbOpen("state.db", sizeof(SStateKey), -1, stateKeyCmpr, pState->pTdbState->db, &pState->pTdbState->pStateDb, + 0) < 0) { goto _err; } // todo refactor - if (tdbTbOpen("fill.state.db", sizeof(SWinKey), -1, winKeyCmpr, pState->db, &pState->pFillStateDb, 0) < 0) { + if (tdbTbOpen("fill.state.db", sizeof(SWinKey), -1, winKeyCmpr, pState->pTdbState->db, + &pState->pTdbState->pFillStateDb, 0) < 0) { goto _err; } - if (tdbTbOpen("session.state.db", sizeof(SStateSessionKey), -1, stateSessionKeyCmpr, pState->db, - &pState->pSessionStateDb, 0) < 0) { + if (tdbTbOpen("session.state.db", sizeof(SStateSessionKey), -1, stateSessionKeyCmpr, pState->pTdbState->db, + &pState->pTdbState->pSessionStateDb, 0) < 0) { goto _err; } - if (tdbTbOpen("func.state.db", sizeof(STupleKey), -1, STupleKeyCmpr, pState->db, &pState->pFuncStateDb, 0) < 0) { + if (tdbTbOpen("func.state.db", sizeof(STupleKey), -1, STupleKeyCmpr, pState->pTdbState->db, + &pState->pTdbState->pFuncStateDb, 0) < 0) { goto _err; } - if (tdbTbOpen("parname.state.db", sizeof(int64_t), TSDB_TABLE_NAME_LEN, NULL, pState->db, &pState->pParNameDb, 0) < - 0) { + if (tdbTbOpen("parname.state.db", sizeof(int64_t), TSDB_TABLE_NAME_LEN, NULL, pState->pTdbState->db, + &pState->pTdbState->pParNameDb, 0) < 0) { goto _err; } @@ -154,117 +163,117 @@ SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath, int goto _err; } - pState->pOwner = pTask; + pState->pTdbState->pOwner = pTask; return pState; _err: - tdbTbClose(pState->pStateDb); - tdbTbClose(pState->pFuncStateDb); - tdbTbClose(pState->pFillStateDb); - tdbTbClose(pState->pSessionStateDb); - tdbTbClose(pState->pParNameDb); - tdbClose(pState->db); - taosMemoryFree(pState); + tdbTbClose(pState->pTdbState->pStateDb); + tdbTbClose(pState->pTdbState->pFuncStateDb); + tdbTbClose(pState->pTdbState->pFillStateDb); + tdbTbClose(pState->pTdbState->pSessionStateDb); + tdbTbClose(pState->pTdbState->pParNameDb); + tdbClose(pState->pTdbState->db); + streamStateDestroy(pState); return NULL; } void streamStateClose(SStreamState* pState) { - tdbCommit(pState->db, &pState->txn); - tdbPostCommit(pState->db, &pState->txn); - tdbTbClose(pState->pStateDb); - tdbTbClose(pState->pFuncStateDb); - tdbTbClose(pState->pFillStateDb); - tdbTbClose(pState->pSessionStateDb); - tdbTbClose(pState->pParNameDb); - tdbClose(pState->db); + tdbCommit(pState->pTdbState->db, &pState->pTdbState->txn); + tdbPostCommit(pState->pTdbState->db, &pState->pTdbState->txn); + tdbTbClose(pState->pTdbState->pStateDb); + tdbTbClose(pState->pTdbState->pFuncStateDb); + tdbTbClose(pState->pTdbState->pFillStateDb); + tdbTbClose(pState->pTdbState->pSessionStateDb); + tdbTbClose(pState->pTdbState->pParNameDb); + tdbClose(pState->pTdbState->db); - taosMemoryFree(pState); + streamStateDestroy(pState); } int32_t streamStateBegin(SStreamState* pState) { - if (tdbTxnOpen(&pState->txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < - 0) { + if (tdbTxnOpen(&pState->pTdbState->txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, + TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) { return -1; } - if (tdbBegin(pState->db, &pState->txn) < 0) { - tdbTxnClose(&pState->txn); + if (tdbBegin(pState->pTdbState->db, &pState->pTdbState->txn) < 0) { + tdbTxnClose(&pState->pTdbState->txn); return -1; } return 0; } int32_t streamStateCommit(SStreamState* pState) { - if (tdbCommit(pState->db, &pState->txn) < 0) { + if (tdbCommit(pState->pTdbState->db, &pState->pTdbState->txn) < 0) { return -1; } - if (tdbPostCommit(pState->db, &pState->txn) < 0) { + if (tdbPostCommit(pState->pTdbState->db, &pState->pTdbState->txn) < 0) { return -1; } - memset(&pState->txn, 0, sizeof(TXN)); - if (tdbTxnOpen(&pState->txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < - 0) { + memset(&pState->pTdbState->txn, 0, sizeof(TXN)); + if (tdbTxnOpen(&pState->pTdbState->txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, + TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) { return -1; } - if (tdbBegin(pState->db, &pState->txn) < 0) { + if (tdbBegin(pState->pTdbState->db, &pState->pTdbState->txn) < 0) { return -1; } return 0; } int32_t streamStateAbort(SStreamState* pState) { - if (tdbAbort(pState->db, &pState->txn) < 0) { + if (tdbAbort(pState->pTdbState->db, &pState->pTdbState->txn) < 0) { return -1; } - memset(&pState->txn, 0, sizeof(TXN)); - if (tdbTxnOpen(&pState->txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < - 0) { + memset(&pState->pTdbState->txn, 0, sizeof(TXN)); + if (tdbTxnOpen(&pState->pTdbState->txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, + TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) { return -1; } - if (tdbBegin(pState->db, &pState->txn) < 0) { + if (tdbBegin(pState->pTdbState->db, &pState->pTdbState->txn) < 0) { return -1; } return 0; } int32_t streamStateFuncPut(SStreamState* pState, const STupleKey* key, const void* value, int32_t vLen) { - return tdbTbUpsert(pState->pFuncStateDb, key, sizeof(STupleKey), value, vLen, &pState->txn); + return tdbTbUpsert(pState->pTdbState->pFuncStateDb, key, sizeof(STupleKey), value, vLen, &pState->pTdbState->txn); } int32_t streamStateFuncGet(SStreamState* pState, const STupleKey* key, void** pVal, int32_t* pVLen) { - return tdbTbGet(pState->pFuncStateDb, key, sizeof(STupleKey), pVal, pVLen); + return tdbTbGet(pState->pTdbState->pFuncStateDb, key, sizeof(STupleKey), pVal, pVLen); } int32_t streamStateFuncDel(SStreamState* pState, const STupleKey* key) { - return tdbTbDelete(pState->pFuncStateDb, key, sizeof(STupleKey), &pState->txn); + return tdbTbDelete(pState->pTdbState->pFuncStateDb, key, sizeof(STupleKey), &pState->pTdbState->txn); } // todo refactor int32_t streamStatePut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen) { SStateKey sKey = {.key = *key, .opNum = pState->number}; - return tdbTbUpsert(pState->pStateDb, &sKey, sizeof(SStateKey), value, vLen, &pState->txn); + return tdbTbUpsert(pState->pTdbState->pStateDb, &sKey, sizeof(SStateKey), value, vLen, &pState->pTdbState->txn); } // todo refactor int32_t streamStateFillPut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen) { - return tdbTbUpsert(pState->pFillStateDb, key, sizeof(SWinKey), value, vLen, &pState->txn); + return tdbTbUpsert(pState->pTdbState->pFillStateDb, key, sizeof(SWinKey), value, vLen, &pState->pTdbState->txn); } // todo refactor int32_t streamStateGet(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) { SStateKey sKey = {.key = *key, .opNum = pState->number}; - return tdbTbGet(pState->pStateDb, &sKey, sizeof(SStateKey), pVal, pVLen); + return tdbTbGet(pState->pTdbState->pStateDb, &sKey, sizeof(SStateKey), pVal, pVLen); } // todo refactor int32_t streamStateFillGet(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) { - return tdbTbGet(pState->pFillStateDb, key, sizeof(SWinKey), pVal, pVLen); + return tdbTbGet(pState->pTdbState->pFillStateDb, key, sizeof(SWinKey), pVal, pVLen); } // todo refactor int32_t streamStateDel(SStreamState* pState, const SWinKey* key) { SStateKey sKey = {.key = *key, .opNum = pState->number}; - return tdbTbDelete(pState->pStateDb, &sKey, sizeof(SStateKey), &pState->txn); + return tdbTbDelete(pState->pTdbState->pStateDb, &sKey, sizeof(SStateKey), &pState->pTdbState->txn); } int32_t streamStateClear(SStreamState* pState) { @@ -288,7 +297,7 @@ void streamStateSetNumber(SStreamState* pState, int32_t number) { pState->number // todo refactor int32_t streamStateFillDel(SStreamState* pState, const SWinKey* key) { - return tdbTbDelete(pState->pFillStateDb, key, sizeof(SWinKey), &pState->txn); + return tdbTbDelete(pState->pTdbState->pFillStateDb, key, sizeof(SWinKey), &pState->pTdbState->txn); } int32_t streamStateAddIfNotExist(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) { @@ -314,7 +323,7 @@ int32_t streamStateReleaseBuf(SStreamState* pState, const SWinKey* key, void* pV SStreamStateCur* streamStateGetCur(SStreamState* pState, const SWinKey* key) { SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); if (pCur == NULL) return NULL; - tdbTbcOpen(pState->pStateDb, &pCur->pCur, NULL); + tdbTbcOpen(pState->pTdbState->pStateDb, &pCur->pCur, NULL); int32_t c = 0; SStateKey sKey = {.key = *key, .opNum = pState->number}; @@ -330,7 +339,7 @@ SStreamStateCur* streamStateGetCur(SStreamState* pState, const SWinKey* key) { SStreamStateCur* streamStateFillGetCur(SStreamState* pState, const SWinKey* key) { SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); if (pCur == NULL) return NULL; - tdbTbcOpen(pState->pFillStateDb, &pCur->pCur, NULL); + tdbTbcOpen(pState->pTdbState->pFillStateDb, &pCur->pCur, NULL); int32_t c = 0; tdbTbcMoveTo(pCur->pCur, key, sizeof(SWinKey), &c); @@ -422,7 +431,7 @@ SStreamStateCur* streamStateSeekKeyNext(SStreamState* pState, const SWinKey* key return NULL; } pCur->number = pState->number; - if (tdbTbcOpen(pState->pStateDb, &pCur->pCur, NULL) < 0) { + if (tdbTbcOpen(pState->pTdbState->pStateDb, &pCur->pCur, NULL) < 0) { streamStateFreeCur(pCur); return NULL; } @@ -448,7 +457,7 @@ SStreamStateCur* streamStateFillSeekKeyNext(SStreamState* pState, const SWinKey* if (!pCur) { return NULL; } - if (tdbTbcOpen(pState->pFillStateDb, &pCur->pCur, NULL) < 0) { + if (tdbTbcOpen(pState->pTdbState->pFillStateDb, &pCur->pCur, NULL) < 0) { streamStateFreeCur(pCur); return NULL; } @@ -473,7 +482,7 @@ SStreamStateCur* streamStateFillSeekKeyPrev(SStreamState* pState, const SWinKey* if (pCur == NULL) { return NULL; } - if (tdbTbcOpen(pState->pFillStateDb, &pCur->pCur, NULL) < 0) { + if (tdbTbcOpen(pState->pTdbState->pFillStateDb, &pCur->pCur, NULL) < 0) { streamStateFreeCur(pCur); return NULL; } @@ -520,7 +529,8 @@ void streamFreeVal(void* val) { tdbFree(val); } int32_t streamStateSessionPut(SStreamState* pState, const SSessionKey* key, const void* value, int32_t vLen) { SStateSessionKey sKey = {.key = *key, .opNum = pState->number}; - return tdbTbUpsert(pState->pSessionStateDb, &sKey, sizeof(SStateSessionKey), value, vLen, &pState->txn); + return tdbTbUpsert(pState->pTdbState->pSessionStateDb, &sKey, sizeof(SStateSessionKey), value, vLen, + &pState->pTdbState->txn); } int32_t streamStateSessionGet(SStreamState* pState, SSessionKey* key, void** pVal, int32_t* pVLen) { @@ -543,7 +553,7 @@ int32_t streamStateSessionGet(SStreamState* pState, SSessionKey* key, void** pVa int32_t streamStateSessionDel(SStreamState* pState, const SSessionKey* key) { SStateSessionKey sKey = {.key = *key, .opNum = pState->number}; - return tdbTbDelete(pState->pSessionStateDb, &sKey, sizeof(SStateSessionKey), &pState->txn); + return tdbTbDelete(pState->pTdbState->pSessionStateDb, &sKey, sizeof(SStateSessionKey), &pState->pTdbState->txn); } SStreamStateCur* streamStateSessionSeekKeyCurrentPrev(SStreamState* pState, const SSessionKey* key) { @@ -552,7 +562,7 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentPrev(SStreamState* pState, cons return NULL; } pCur->number = pState->number; - if (tdbTbcOpen(pState->pSessionStateDb, &pCur->pCur, NULL) < 0) { + if (tdbTbcOpen(pState->pTdbState->pSessionStateDb, &pCur->pCur, NULL) < 0) { streamStateFreeCur(pCur); return NULL; } @@ -579,7 +589,7 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentNext(SStreamState* pState, cons return NULL; } pCur->number = pState->number; - if (tdbTbcOpen(pState->pSessionStateDb, &pCur->pCur, NULL) < 0) { + if (tdbTbcOpen(pState->pTdbState->pSessionStateDb, &pCur->pCur, NULL) < 0) { streamStateFreeCur(pCur); return NULL; } @@ -607,7 +617,7 @@ SStreamStateCur* streamStateSessionSeekKeyNext(SStreamState* pState, const SSess return NULL; } pCur->number = pState->number; - if (tdbTbcOpen(pState->pSessionStateDb, &pCur->pCur, NULL) < 0) { + if (tdbTbcOpen(pState->pTdbState->pSessionStateDb, &pCur->pCur, NULL) < 0) { streamStateFreeCur(pCur); return NULL; } @@ -674,7 +684,7 @@ int32_t streamStateSessionGetKeyByRange(SStreamState* pState, const SSessionKey* return -1; } pCur->number = pState->number; - if (tdbTbcOpen(pState->pSessionStateDb, &pCur->pCur, NULL) < 0) { + if (tdbTbcOpen(pState->pTdbState->pSessionStateDb, &pCur->pCur, NULL) < 0) { streamStateFreeCur(pCur); return -1; } @@ -821,13 +831,19 @@ _end: } int32_t streamStatePutParName(SStreamState* pState, int64_t groupId, const char tbname[TSDB_TABLE_NAME_LEN]) { - tdbTbUpsert(pState->pParNameDb, &groupId, sizeof(int64_t), tbname, TSDB_TABLE_NAME_LEN, &pState->txn); + tdbTbUpsert(pState->pTdbState->pParNameDb, &groupId, sizeof(int64_t), tbname, TSDB_TABLE_NAME_LEN, + &pState->pTdbState->txn); return 0; } int32_t streamStateGetParName(SStreamState* pState, int64_t groupId, void** pVal) { int32_t len; - return tdbTbGet(pState->pParNameDb, &groupId, sizeof(int64_t), pVal, &len); + return tdbTbGet(pState->pTdbState->pParNameDb, &groupId, sizeof(int64_t), pVal, &len); +} + +void streamStateDestroy(SStreamState* pState) { + taosMemoryFreeClear(pState->pTdbState); + taosMemoryFreeClear(pState); } #if 0 @@ -837,7 +853,7 @@ char* streamStateSessionDump(SStreamState* pState) { return NULL; } pCur->number = pState->number; - if (tdbTbcOpen(pState->pSessionStateDb, &pCur->pCur, NULL) < 0) { + if (tdbTbcOpen(pState->pTdbState->pSessionStateDb, &pCur->pCur, NULL) < 0) { streamStateFreeCur(pCur); return NULL; } -- GitLab