提交 3e29f197 编写于 作者: 5 54liuyao

fix:stream state abnormal failure

上级 7e8873a3
...@@ -27,8 +27,7 @@ typedef struct SStreamTask SStreamTask; ...@@ -27,8 +27,7 @@ typedef struct SStreamTask SStreamTask;
typedef bool (*state_key_cmpr_fn)(void* pKey1, void* pKey2); typedef bool (*state_key_cmpr_fn)(void* pKey1, void* pKey2);
// incremental state storage typedef struct STdbState {
typedef struct {
SStreamTask* pOwner; SStreamTask* pOwner;
TDB* db; TDB* db;
TTB* pStateDb; TTB* pStateDb;
...@@ -37,7 +36,12 @@ typedef struct { ...@@ -37,7 +36,12 @@ typedef struct {
TTB* pSessionStateDb; TTB* pSessionStateDb;
TTB* pParNameDb; TTB* pParNameDb;
TXN txn; TXN txn;
int32_t number; } STdbState;
// incremental state storage
typedef struct {
STdbState* pTdbState;
int32_t number;
} SStreamState; } SStreamState;
SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath, int32_t szPage, int32_t pages); SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath, int32_t szPage, int32_t pages);
...@@ -45,6 +49,7 @@ void streamStateClose(SStreamState* pState); ...@@ -45,6 +49,7 @@ void streamStateClose(SStreamState* pState);
int32_t streamStateBegin(SStreamState* pState); int32_t streamStateBegin(SStreamState* pState);
int32_t streamStateCommit(SStreamState* pState); int32_t streamStateCommit(SStreamState* pState);
int32_t streamStateAbort(SStreamState* pState); int32_t streamStateAbort(SStreamState* pState);
void streamStateDestroy(SStreamState* pState);
typedef struct { typedef struct {
TBC* pCur; TBC* pCur;
......
...@@ -114,6 +114,12 @@ SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath, int ...@@ -114,6 +114,12 @@ SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath, int
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL; return NULL;
} }
pState->pTdbState = taosMemoryCalloc(1, sizeof(STdbState));
if (pState->pTdbState == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
streamStateDestroy(pState);
return NULL;
}
char statePath[1024]; char statePath[1024];
if (!specPath) { if (!specPath) {
...@@ -122,31 +128,34 @@ SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath, int ...@@ -122,31 +128,34 @@ SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath, int
memset(statePath, 0, 1024); memset(statePath, 0, 1024);
tstrncpy(statePath, path, 1024); tstrncpy(statePath, path, 1024);
} }
if (tdbOpen(statePath, szPage, pages, &pState->db, 0) < 0) { if (tdbOpen(statePath, szPage, pages, &pState->pTdbState->db, 0) < 0) {
goto _err; goto _err;
} }
// open state storage backend // open state storage backend
if (tdbTbOpen("state.db", sizeof(SStateKey), -1, stateKeyCmpr, pState->db, &pState->pStateDb, 0) < 0) { if (tdbTbOpen("state.db", sizeof(SStateKey), -1, stateKeyCmpr, pState->pTdbState->db, &pState->pTdbState->pStateDb,
0) < 0) {
goto _err; goto _err;
} }
// todo refactor // todo refactor
if (tdbTbOpen("fill.state.db", sizeof(SWinKey), -1, winKeyCmpr, pState->db, &pState->pFillStateDb, 0) < 0) { if (tdbTbOpen("fill.state.db", sizeof(SWinKey), -1, winKeyCmpr, pState->pTdbState->db,
&pState->pTdbState->pFillStateDb, 0) < 0) {
goto _err; goto _err;
} }
if (tdbTbOpen("session.state.db", sizeof(SStateSessionKey), -1, stateSessionKeyCmpr, pState->db, if (tdbTbOpen("session.state.db", sizeof(SStateSessionKey), -1, stateSessionKeyCmpr, pState->pTdbState->db,
&pState->pSessionStateDb, 0) < 0) { &pState->pTdbState->pSessionStateDb, 0) < 0) {
goto _err; goto _err;
} }
if (tdbTbOpen("func.state.db", sizeof(STupleKey), -1, STupleKeyCmpr, pState->db, &pState->pFuncStateDb, 0) < 0) { if (tdbTbOpen("func.state.db", sizeof(STupleKey), -1, STupleKeyCmpr, pState->pTdbState->db,
&pState->pTdbState->pFuncStateDb, 0) < 0) {
goto _err; goto _err;
} }
if (tdbTbOpen("parname.state.db", sizeof(int64_t), TSDB_TABLE_NAME_LEN, NULL, pState->db, &pState->pParNameDb, 0) < if (tdbTbOpen("parname.state.db", sizeof(int64_t), TSDB_TABLE_NAME_LEN, NULL, pState->pTdbState->db,
0) { &pState->pTdbState->pParNameDb, 0) < 0) {
goto _err; goto _err;
} }
...@@ -154,117 +163,117 @@ SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath, int ...@@ -154,117 +163,117 @@ SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath, int
goto _err; goto _err;
} }
pState->pOwner = pTask; pState->pTdbState->pOwner = pTask;
return pState; return pState;
_err: _err:
tdbTbClose(pState->pStateDb); tdbTbClose(pState->pTdbState->pStateDb);
tdbTbClose(pState->pFuncStateDb); tdbTbClose(pState->pTdbState->pFuncStateDb);
tdbTbClose(pState->pFillStateDb); tdbTbClose(pState->pTdbState->pFillStateDb);
tdbTbClose(pState->pSessionStateDb); tdbTbClose(pState->pTdbState->pSessionStateDb);
tdbTbClose(pState->pParNameDb); tdbTbClose(pState->pTdbState->pParNameDb);
tdbClose(pState->db); tdbClose(pState->pTdbState->db);
taosMemoryFree(pState); streamStateDestroy(pState);
return NULL; return NULL;
} }
void streamStateClose(SStreamState* pState) { void streamStateClose(SStreamState* pState) {
tdbCommit(pState->db, &pState->txn); tdbCommit(pState->pTdbState->db, &pState->pTdbState->txn);
tdbPostCommit(pState->db, &pState->txn); tdbPostCommit(pState->pTdbState->db, &pState->pTdbState->txn);
tdbTbClose(pState->pStateDb); tdbTbClose(pState->pTdbState->pStateDb);
tdbTbClose(pState->pFuncStateDb); tdbTbClose(pState->pTdbState->pFuncStateDb);
tdbTbClose(pState->pFillStateDb); tdbTbClose(pState->pTdbState->pFillStateDb);
tdbTbClose(pState->pSessionStateDb); tdbTbClose(pState->pTdbState->pSessionStateDb);
tdbTbClose(pState->pParNameDb); tdbTbClose(pState->pTdbState->pParNameDb);
tdbClose(pState->db); tdbClose(pState->pTdbState->db);
taosMemoryFree(pState); streamStateDestroy(pState);
} }
int32_t streamStateBegin(SStreamState* pState) { int32_t streamStateBegin(SStreamState* pState) {
if (tdbTxnOpen(&pState->txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < if (tdbTxnOpen(&pState->pTdbState->txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL,
0) { TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) {
return -1; return -1;
} }
if (tdbBegin(pState->db, &pState->txn) < 0) { if (tdbBegin(pState->pTdbState->db, &pState->pTdbState->txn) < 0) {
tdbTxnClose(&pState->txn); tdbTxnClose(&pState->pTdbState->txn);
return -1; return -1;
} }
return 0; return 0;
} }
int32_t streamStateCommit(SStreamState* pState) { int32_t streamStateCommit(SStreamState* pState) {
if (tdbCommit(pState->db, &pState->txn) < 0) { if (tdbCommit(pState->pTdbState->db, &pState->pTdbState->txn) < 0) {
return -1; return -1;
} }
if (tdbPostCommit(pState->db, &pState->txn) < 0) { if (tdbPostCommit(pState->pTdbState->db, &pState->pTdbState->txn) < 0) {
return -1; return -1;
} }
memset(&pState->txn, 0, sizeof(TXN)); memset(&pState->pTdbState->txn, 0, sizeof(TXN));
if (tdbTxnOpen(&pState->txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < if (tdbTxnOpen(&pState->pTdbState->txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL,
0) { TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) {
return -1; return -1;
} }
if (tdbBegin(pState->db, &pState->txn) < 0) { if (tdbBegin(pState->pTdbState->db, &pState->pTdbState->txn) < 0) {
return -1; return -1;
} }
return 0; return 0;
} }
int32_t streamStateAbort(SStreamState* pState) { int32_t streamStateAbort(SStreamState* pState) {
if (tdbAbort(pState->db, &pState->txn) < 0) { if (tdbAbort(pState->pTdbState->db, &pState->pTdbState->txn) < 0) {
return -1; return -1;
} }
memset(&pState->txn, 0, sizeof(TXN)); memset(&pState->pTdbState->txn, 0, sizeof(TXN));
if (tdbTxnOpen(&pState->txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < if (tdbTxnOpen(&pState->pTdbState->txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL,
0) { TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) {
return -1; return -1;
} }
if (tdbBegin(pState->db, &pState->txn) < 0) { if (tdbBegin(pState->pTdbState->db, &pState->pTdbState->txn) < 0) {
return -1; return -1;
} }
return 0; return 0;
} }
int32_t streamStateFuncPut(SStreamState* pState, const STupleKey* key, const void* value, int32_t vLen) { int32_t streamStateFuncPut(SStreamState* pState, const STupleKey* key, const void* value, int32_t vLen) {
return tdbTbUpsert(pState->pFuncStateDb, key, sizeof(STupleKey), value, vLen, &pState->txn); return tdbTbUpsert(pState->pTdbState->pFuncStateDb, key, sizeof(STupleKey), value, vLen, &pState->pTdbState->txn);
} }
int32_t streamStateFuncGet(SStreamState* pState, const STupleKey* key, void** pVal, int32_t* pVLen) { int32_t streamStateFuncGet(SStreamState* pState, const STupleKey* key, void** pVal, int32_t* pVLen) {
return tdbTbGet(pState->pFuncStateDb, key, sizeof(STupleKey), pVal, pVLen); return tdbTbGet(pState->pTdbState->pFuncStateDb, key, sizeof(STupleKey), pVal, pVLen);
} }
int32_t streamStateFuncDel(SStreamState* pState, const STupleKey* key) { int32_t streamStateFuncDel(SStreamState* pState, const STupleKey* key) {
return tdbTbDelete(pState->pFuncStateDb, key, sizeof(STupleKey), &pState->txn); return tdbTbDelete(pState->pTdbState->pFuncStateDb, key, sizeof(STupleKey), &pState->pTdbState->txn);
} }
// todo refactor // todo refactor
int32_t streamStatePut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen) { int32_t streamStatePut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen) {
SStateKey sKey = {.key = *key, .opNum = pState->number}; SStateKey sKey = {.key = *key, .opNum = pState->number};
return tdbTbUpsert(pState->pStateDb, &sKey, sizeof(SStateKey), value, vLen, &pState->txn); return tdbTbUpsert(pState->pTdbState->pStateDb, &sKey, sizeof(SStateKey), value, vLen, &pState->pTdbState->txn);
} }
// todo refactor // todo refactor
int32_t streamStateFillPut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen) { int32_t streamStateFillPut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen) {
return tdbTbUpsert(pState->pFillStateDb, key, sizeof(SWinKey), value, vLen, &pState->txn); return tdbTbUpsert(pState->pTdbState->pFillStateDb, key, sizeof(SWinKey), value, vLen, &pState->pTdbState->txn);
} }
// todo refactor // todo refactor
int32_t streamStateGet(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) { int32_t streamStateGet(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) {
SStateKey sKey = {.key = *key, .opNum = pState->number}; SStateKey sKey = {.key = *key, .opNum = pState->number};
return tdbTbGet(pState->pStateDb, &sKey, sizeof(SStateKey), pVal, pVLen); return tdbTbGet(pState->pTdbState->pStateDb, &sKey, sizeof(SStateKey), pVal, pVLen);
} }
// todo refactor // todo refactor
int32_t streamStateFillGet(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) { int32_t streamStateFillGet(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) {
return tdbTbGet(pState->pFillStateDb, key, sizeof(SWinKey), pVal, pVLen); return tdbTbGet(pState->pTdbState->pFillStateDb, key, sizeof(SWinKey), pVal, pVLen);
} }
// todo refactor // todo refactor
int32_t streamStateDel(SStreamState* pState, const SWinKey* key) { int32_t streamStateDel(SStreamState* pState, const SWinKey* key) {
SStateKey sKey = {.key = *key, .opNum = pState->number}; SStateKey sKey = {.key = *key, .opNum = pState->number};
return tdbTbDelete(pState->pStateDb, &sKey, sizeof(SStateKey), &pState->txn); return tdbTbDelete(pState->pTdbState->pStateDb, &sKey, sizeof(SStateKey), &pState->pTdbState->txn);
} }
int32_t streamStateClear(SStreamState* pState) { int32_t streamStateClear(SStreamState* pState) {
...@@ -288,7 +297,7 @@ void streamStateSetNumber(SStreamState* pState, int32_t number) { pState->number ...@@ -288,7 +297,7 @@ void streamStateSetNumber(SStreamState* pState, int32_t number) { pState->number
// todo refactor // todo refactor
int32_t streamStateFillDel(SStreamState* pState, const SWinKey* key) { int32_t streamStateFillDel(SStreamState* pState, const SWinKey* key) {
return tdbTbDelete(pState->pFillStateDb, key, sizeof(SWinKey), &pState->txn); return tdbTbDelete(pState->pTdbState->pFillStateDb, key, sizeof(SWinKey), &pState->pTdbState->txn);
} }
int32_t streamStateAddIfNotExist(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) { int32_t streamStateAddIfNotExist(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) {
...@@ -314,7 +323,7 @@ int32_t streamStateReleaseBuf(SStreamState* pState, const SWinKey* key, void* pV ...@@ -314,7 +323,7 @@ int32_t streamStateReleaseBuf(SStreamState* pState, const SWinKey* key, void* pV
SStreamStateCur* streamStateGetCur(SStreamState* pState, const SWinKey* key) { SStreamStateCur* streamStateGetCur(SStreamState* pState, const SWinKey* key) {
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
if (pCur == NULL) return NULL; if (pCur == NULL) return NULL;
tdbTbcOpen(pState->pStateDb, &pCur->pCur, NULL); tdbTbcOpen(pState->pTdbState->pStateDb, &pCur->pCur, NULL);
int32_t c = 0; int32_t c = 0;
SStateKey sKey = {.key = *key, .opNum = pState->number}; SStateKey sKey = {.key = *key, .opNum = pState->number};
...@@ -330,7 +339,7 @@ SStreamStateCur* streamStateGetCur(SStreamState* pState, const SWinKey* key) { ...@@ -330,7 +339,7 @@ SStreamStateCur* streamStateGetCur(SStreamState* pState, const SWinKey* key) {
SStreamStateCur* streamStateFillGetCur(SStreamState* pState, const SWinKey* key) { SStreamStateCur* streamStateFillGetCur(SStreamState* pState, const SWinKey* key) {
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
if (pCur == NULL) return NULL; if (pCur == NULL) return NULL;
tdbTbcOpen(pState->pFillStateDb, &pCur->pCur, NULL); tdbTbcOpen(pState->pTdbState->pFillStateDb, &pCur->pCur, NULL);
int32_t c = 0; int32_t c = 0;
tdbTbcMoveTo(pCur->pCur, key, sizeof(SWinKey), &c); tdbTbcMoveTo(pCur->pCur, key, sizeof(SWinKey), &c);
...@@ -422,7 +431,7 @@ SStreamStateCur* streamStateSeekKeyNext(SStreamState* pState, const SWinKey* key ...@@ -422,7 +431,7 @@ SStreamStateCur* streamStateSeekKeyNext(SStreamState* pState, const SWinKey* key
return NULL; return NULL;
} }
pCur->number = pState->number; pCur->number = pState->number;
if (tdbTbcOpen(pState->pStateDb, &pCur->pCur, NULL) < 0) { if (tdbTbcOpen(pState->pTdbState->pStateDb, &pCur->pCur, NULL) < 0) {
streamStateFreeCur(pCur); streamStateFreeCur(pCur);
return NULL; return NULL;
} }
...@@ -448,7 +457,7 @@ SStreamStateCur* streamStateFillSeekKeyNext(SStreamState* pState, const SWinKey* ...@@ -448,7 +457,7 @@ SStreamStateCur* streamStateFillSeekKeyNext(SStreamState* pState, const SWinKey*
if (!pCur) { if (!pCur) {
return NULL; return NULL;
} }
if (tdbTbcOpen(pState->pFillStateDb, &pCur->pCur, NULL) < 0) { if (tdbTbcOpen(pState->pTdbState->pFillStateDb, &pCur->pCur, NULL) < 0) {
streamStateFreeCur(pCur); streamStateFreeCur(pCur);
return NULL; return NULL;
} }
...@@ -473,7 +482,7 @@ SStreamStateCur* streamStateFillSeekKeyPrev(SStreamState* pState, const SWinKey* ...@@ -473,7 +482,7 @@ SStreamStateCur* streamStateFillSeekKeyPrev(SStreamState* pState, const SWinKey*
if (pCur == NULL) { if (pCur == NULL) {
return NULL; return NULL;
} }
if (tdbTbcOpen(pState->pFillStateDb, &pCur->pCur, NULL) < 0) { if (tdbTbcOpen(pState->pTdbState->pFillStateDb, &pCur->pCur, NULL) < 0) {
streamStateFreeCur(pCur); streamStateFreeCur(pCur);
return NULL; return NULL;
} }
...@@ -520,7 +529,8 @@ void streamFreeVal(void* val) { tdbFree(val); } ...@@ -520,7 +529,8 @@ void streamFreeVal(void* val) { tdbFree(val); }
int32_t streamStateSessionPut(SStreamState* pState, const SSessionKey* key, const void* value, int32_t vLen) { int32_t streamStateSessionPut(SStreamState* pState, const SSessionKey* key, const void* value, int32_t vLen) {
SStateSessionKey sKey = {.key = *key, .opNum = pState->number}; SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
return tdbTbUpsert(pState->pSessionStateDb, &sKey, sizeof(SStateSessionKey), value, vLen, &pState->txn); return tdbTbUpsert(pState->pTdbState->pSessionStateDb, &sKey, sizeof(SStateSessionKey), value, vLen,
&pState->pTdbState->txn);
} }
int32_t streamStateSessionGet(SStreamState* pState, SSessionKey* key, void** pVal, int32_t* pVLen) { int32_t streamStateSessionGet(SStreamState* pState, SSessionKey* key, void** pVal, int32_t* pVLen) {
...@@ -543,7 +553,7 @@ int32_t streamStateSessionGet(SStreamState* pState, SSessionKey* key, void** pVa ...@@ -543,7 +553,7 @@ int32_t streamStateSessionGet(SStreamState* pState, SSessionKey* key, void** pVa
int32_t streamStateSessionDel(SStreamState* pState, const SSessionKey* key) { int32_t streamStateSessionDel(SStreamState* pState, const SSessionKey* key) {
SStateSessionKey sKey = {.key = *key, .opNum = pState->number}; SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
return tdbTbDelete(pState->pSessionStateDb, &sKey, sizeof(SStateSessionKey), &pState->txn); return tdbTbDelete(pState->pTdbState->pSessionStateDb, &sKey, sizeof(SStateSessionKey), &pState->pTdbState->txn);
} }
SStreamStateCur* streamStateSessionSeekKeyCurrentPrev(SStreamState* pState, const SSessionKey* key) { SStreamStateCur* streamStateSessionSeekKeyCurrentPrev(SStreamState* pState, const SSessionKey* key) {
...@@ -552,7 +562,7 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentPrev(SStreamState* pState, cons ...@@ -552,7 +562,7 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentPrev(SStreamState* pState, cons
return NULL; return NULL;
} }
pCur->number = pState->number; pCur->number = pState->number;
if (tdbTbcOpen(pState->pSessionStateDb, &pCur->pCur, NULL) < 0) { if (tdbTbcOpen(pState->pTdbState->pSessionStateDb, &pCur->pCur, NULL) < 0) {
streamStateFreeCur(pCur); streamStateFreeCur(pCur);
return NULL; return NULL;
} }
...@@ -579,7 +589,7 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentNext(SStreamState* pState, cons ...@@ -579,7 +589,7 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentNext(SStreamState* pState, cons
return NULL; return NULL;
} }
pCur->number = pState->number; pCur->number = pState->number;
if (tdbTbcOpen(pState->pSessionStateDb, &pCur->pCur, NULL) < 0) { if (tdbTbcOpen(pState->pTdbState->pSessionStateDb, &pCur->pCur, NULL) < 0) {
streamStateFreeCur(pCur); streamStateFreeCur(pCur);
return NULL; return NULL;
} }
...@@ -607,7 +617,7 @@ SStreamStateCur* streamStateSessionSeekKeyNext(SStreamState* pState, const SSess ...@@ -607,7 +617,7 @@ SStreamStateCur* streamStateSessionSeekKeyNext(SStreamState* pState, const SSess
return NULL; return NULL;
} }
pCur->number = pState->number; pCur->number = pState->number;
if (tdbTbcOpen(pState->pSessionStateDb, &pCur->pCur, NULL) < 0) { if (tdbTbcOpen(pState->pTdbState->pSessionStateDb, &pCur->pCur, NULL) < 0) {
streamStateFreeCur(pCur); streamStateFreeCur(pCur);
return NULL; return NULL;
} }
...@@ -674,7 +684,7 @@ int32_t streamStateSessionGetKeyByRange(SStreamState* pState, const SSessionKey* ...@@ -674,7 +684,7 @@ int32_t streamStateSessionGetKeyByRange(SStreamState* pState, const SSessionKey*
return -1; return -1;
} }
pCur->number = pState->number; pCur->number = pState->number;
if (tdbTbcOpen(pState->pSessionStateDb, &pCur->pCur, NULL) < 0) { if (tdbTbcOpen(pState->pTdbState->pSessionStateDb, &pCur->pCur, NULL) < 0) {
streamStateFreeCur(pCur); streamStateFreeCur(pCur);
return -1; return -1;
} }
...@@ -821,13 +831,19 @@ _end: ...@@ -821,13 +831,19 @@ _end:
} }
int32_t streamStatePutParName(SStreamState* pState, int64_t groupId, const char tbname[TSDB_TABLE_NAME_LEN]) { int32_t streamStatePutParName(SStreamState* pState, int64_t groupId, const char tbname[TSDB_TABLE_NAME_LEN]) {
tdbTbUpsert(pState->pParNameDb, &groupId, sizeof(int64_t), tbname, TSDB_TABLE_NAME_LEN, &pState->txn); tdbTbUpsert(pState->pTdbState->pParNameDb, &groupId, sizeof(int64_t), tbname, TSDB_TABLE_NAME_LEN,
&pState->pTdbState->txn);
return 0; return 0;
} }
int32_t streamStateGetParName(SStreamState* pState, int64_t groupId, void** pVal) { int32_t streamStateGetParName(SStreamState* pState, int64_t groupId, void** pVal) {
int32_t len; int32_t len;
return tdbTbGet(pState->pParNameDb, &groupId, sizeof(int64_t), pVal, &len); return tdbTbGet(pState->pTdbState->pParNameDb, &groupId, sizeof(int64_t), pVal, &len);
}
void streamStateDestroy(SStreamState* pState) {
taosMemoryFreeClear(pState->pTdbState);
taosMemoryFreeClear(pState);
} }
#if 0 #if 0
...@@ -837,7 +853,7 @@ char* streamStateSessionDump(SStreamState* pState) { ...@@ -837,7 +853,7 @@ char* streamStateSessionDump(SStreamState* pState) {
return NULL; return NULL;
} }
pCur->number = pState->number; pCur->number = pState->number;
if (tdbTbcOpen(pState->pSessionStateDb, &pCur->pCur, NULL) < 0) { if (tdbTbcOpen(pState->pTdbState->pSessionStateDb, &pCur->pCur, NULL) < 0) {
streamStateFreeCur(pCur); streamStateFreeCur(pCur);
return NULL; return NULL;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册