提交 1a4cf696 编写于 作者: M Minglei Jin

fix: add post commit to tq & stream

上级 c17f8244
...@@ -125,6 +125,10 @@ int32_t tqMetaSaveCheckInfo(STQ* pTq, const char* key, const void* value, int32_ ...@@ -125,6 +125,10 @@ int32_t tqMetaSaveCheckInfo(STQ* pTq, const char* key, const void* value, int32_
return -1; return -1;
} }
if (tdbPostCommit(pTq->pMetaDB, &txn) < 0) {
return -1;
}
return 0; return 0;
} }
...@@ -147,6 +151,10 @@ int32_t tqMetaDeleteCheckInfo(STQ* pTq, const char* key) { ...@@ -147,6 +151,10 @@ int32_t tqMetaDeleteCheckInfo(STQ* pTq, const char* key) {
ASSERT(0); ASSERT(0);
} }
if (tdbPostCommit(pTq->pMetaDB, &txn) < 0) {
ASSERT(0);
}
return 0; return 0;
} }
...@@ -226,6 +234,10 @@ int32_t tqMetaSaveHandle(STQ* pTq, const char* key, const STqHandle* pHandle) { ...@@ -226,6 +234,10 @@ int32_t tqMetaSaveHandle(STQ* pTq, const char* key, const STqHandle* pHandle) {
ASSERT(0); ASSERT(0);
} }
if (tdbPostCommit(pTq->pMetaDB, &txn) < 0) {
ASSERT(0);
}
tEncoderClear(&encoder); tEncoderClear(&encoder);
taosMemoryFree(buf); taosMemoryFree(buf);
return 0; return 0;
...@@ -250,6 +262,10 @@ int32_t tqMetaDeleteHandle(STQ* pTq, const char* key) { ...@@ -250,6 +262,10 @@ int32_t tqMetaDeleteHandle(STQ* pTq, const char* key) {
ASSERT(0); ASSERT(0);
} }
if (tdbPostCommit(pTq->pMetaDB, &txn) < 0) {
ASSERT(0);
}
return 0; return 0;
} }
......
...@@ -169,6 +169,8 @@ int32_t tqSnapWriterClose(STqSnapWriter** ppWriter, int8_t rollback) { ...@@ -169,6 +169,8 @@ int32_t tqSnapWriterClose(STqSnapWriter** ppWriter, int8_t rollback) {
} else { } else {
code = tdbCommit(pWriter->pTq->pMetaDB, &pWriter->txn); code = tdbCommit(pWriter->pTq->pMetaDB, &pWriter->txn);
if (code) goto _err; if (code) goto _err;
code = tdbPostCommit(pWriter->pTq->pMetaDB, &pWriter->txn);
if (code) goto _err;
} }
taosMemoryFree(pWriter); taosMemoryFree(pWriter);
......
...@@ -169,6 +169,8 @@ int32_t tqSnapWriterClose(STqSnapWriter** ppWriter, int8_t rollback) { ...@@ -169,6 +169,8 @@ int32_t tqSnapWriterClose(STqSnapWriter** ppWriter, int8_t rollback) {
} else { } else {
code = tdbCommit(pWriter->pTq->pMetaDB, &pWriter->txn); code = tdbCommit(pWriter->pTq->pMetaDB, &pWriter->txn);
if (code) goto _err; if (code) goto _err;
code = tdbPostCommit(pWriter->pTq->pMetaDB, &pWriter->txn);
if (code) goto _err;
} }
taosMemoryFree(pWriter); taosMemoryFree(pWriter);
......
...@@ -169,6 +169,8 @@ int32_t tqSnapWriterClose(STqSnapWriter** ppWriter, int8_t rollback) { ...@@ -169,6 +169,8 @@ int32_t tqSnapWriterClose(STqSnapWriter** ppWriter, int8_t rollback) {
} else { } else {
code = tdbCommit(pWriter->pTq->pMetaStore, &pWriter->txn); code = tdbCommit(pWriter->pTq->pMetaStore, &pWriter->txn);
if (code) goto _err; if (code) goto _err;
code = tdbPostCommit(pWriter->pTq->pMetaStore, &pWriter->txn);
if (code) goto _err;
} }
taosMemoryFree(pWriter); taosMemoryFree(pWriter);
......
...@@ -70,6 +70,7 @@ _err: ...@@ -70,6 +70,7 @@ _err:
void streamMetaClose(SStreamMeta* pMeta) { void streamMetaClose(SStreamMeta* pMeta) {
tdbCommit(pMeta->db, &pMeta->txn); tdbCommit(pMeta->db, &pMeta->txn);
tdbPostCommit(pMeta->db, &pMeta->txn);
tdbTbClose(pMeta->pTaskDb); tdbTbClose(pMeta->pTaskDb);
tdbTbClose(pMeta->pCheckpointDb); tdbTbClose(pMeta->pCheckpointDb);
tdbClose(pMeta->db); tdbClose(pMeta->db);
......
...@@ -142,6 +142,7 @@ _err: ...@@ -142,6 +142,7 @@ _err:
void streamStateClose(SStreamState* pState) { void streamStateClose(SStreamState* pState) {
tdbCommit(pState->db, &pState->txn); tdbCommit(pState->db, &pState->txn);
tdbPostCommit(pState->db, &pState->txn);
tdbTbClose(pState->pStateDb); tdbTbClose(pState->pStateDb);
tdbTbClose(pState->pFuncStateDb); tdbTbClose(pState->pFuncStateDb);
tdbTbClose(pState->pFillStateDb); tdbTbClose(pState->pFillStateDb);
...@@ -168,6 +169,9 @@ int32_t streamStateCommit(SStreamState* pState) { ...@@ -168,6 +169,9 @@ int32_t streamStateCommit(SStreamState* pState) {
if (tdbCommit(pState->db, &pState->txn) < 0) { if (tdbCommit(pState->db, &pState->txn) < 0) {
return -1; return -1;
} }
if (tdbPostCommit(pState->db, &pState->txn) < 0) {
return -1;
}
memset(&pState->txn, 0, sizeof(TXN)); memset(&pState->txn, 0, sizeof(TXN));
if (tdbTxnOpen(&pState->txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < if (tdbTxnOpen(&pState->txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) <
0) { 0) {
......
...@@ -140,6 +140,7 @@ int tdbBtreeOpen(int keyLen, int valLen, SPager *pPager, char const *tbname, SPg ...@@ -140,6 +140,7 @@ int tdbBtreeOpen(int keyLen, int valLen, SPager *pPager, char const *tbname, SPg
// tdbUnrefPage(pPage); // tdbUnrefPage(pPage);
tdbPCacheRelease(pPager->pCache, pPage, &txn); tdbPCacheRelease(pPager->pCache, pPage, &txn);
tdbCommit(pPager->pEnv, &txn); tdbCommit(pPager->pEnv, &txn);
tdbPostCommit(pPager->pEnv, &txn);
tdbTxnClose(&txn); tdbTxnClose(&txn);
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册