提交 a4adcbbe 编写于 作者: M Minglei Jin

vnode/async-commit: adapter vnode-commit to use txn commit

上级 2a203472
...@@ -101,8 +101,9 @@ typedef struct STbUidStore STbUidStore; ...@@ -101,8 +101,9 @@ typedef struct STbUidStore STbUidStore;
int metaOpen(SVnode* pVnode, SMeta** ppMeta, int8_t rollback); int metaOpen(SVnode* pVnode, SMeta** ppMeta, int8_t rollback);
int metaClose(SMeta* pMeta); int metaClose(SMeta* pMeta);
int metaBegin(SMeta* pMeta, int8_t fromSys); int metaBegin(SMeta* pMeta, int8_t fromSys);
int metaCommit(SMeta* pMeta); TXN* metaGetTxn(SMeta* pMeta);
int metaFinishCommit(SMeta* pMeta); int metaCommit(SMeta* pMeta, TXN* txn);
int metaFinishCommit(SMeta* pMeta, TXN* txn);
int metaPrepareAsyncCommit(SMeta* pMeta); int metaPrepareAsyncCommit(SMeta* pMeta);
int metaCreateSTable(SMeta* pMeta, int64_t version, SVCreateStbReq* pReq); int metaCreateSTable(SMeta* pMeta, int64_t version, SVCreateStbReq* pReq);
int metaAlterSTable(SMeta* pMeta, int64_t version, SVCreateStbReq* pReq); int metaAlterSTable(SMeta* pMeta, int64_t version, SVCreateStbReq* pReq);
...@@ -411,6 +412,7 @@ struct SSnapDataHdr { ...@@ -411,6 +412,7 @@ struct SSnapDataHdr {
struct SCommitInfo { struct SCommitInfo {
SVnodeInfo info; SVnodeInfo info;
SVnode* pVnode; SVnode* pVnode;
TXN* txn;
}; };
#ifdef __cplusplus #ifdef __cplusplus
......
...@@ -40,9 +40,10 @@ int metaBegin(SMeta *pMeta, int8_t fromSys) { ...@@ -40,9 +40,10 @@ int metaBegin(SMeta *pMeta, int8_t fromSys) {
} }
// commit the meta txn // commit the meta txn
int metaCommit(SMeta *pMeta) { return tdbCommit(pMeta->pEnv, pMeta->txn); } TXN *metaGetTxn(SMeta *pMeta) { return pMeta->txn; }
int metaFinishCommit(SMeta *pMeta) { return tdbPostCommit(pMeta->pEnv, pMeta->txn); } int metaCommit(SMeta *pMeta, TXN *txn) { return tdbCommit(pMeta->pEnv, txn); }
int metaPrepareAsyncCommit(SMeta *pMeta) { return tdbPrepareAsyncCommit(pMeta->pEnv, pMeta->txn); } int metaFinishCommit(SMeta *pMeta, TXN *txn) { return tdbPostCommit(pMeta->pEnv, txn); }
int metaPrepareAsyncCommit(SMeta *pMeta) { return tdbPrepareAsyncCommit(pMeta->pEnv, pMeta->txn); }
// abort the meta txn // abort the meta txn
int metaAbort(SMeta *pMeta) { return tdbAbort(pMeta->pEnv, pMeta->txn); } int metaAbort(SMeta *pMeta) { return tdbAbort(pMeta->pEnv, pMeta->txn); }
...@@ -163,9 +163,9 @@ int32_t metaSnapWriterClose(SMetaSnapWriter** ppWriter, int8_t rollback) { ...@@ -163,9 +163,9 @@ int32_t metaSnapWriterClose(SMetaSnapWriter** ppWriter, int8_t rollback) {
if (rollback) { if (rollback) {
ASSERT(0); ASSERT(0);
} else { } else {
code = metaCommit(pWriter->pMeta); code = metaCommit(pWriter->pMeta, pWriter->pMeta->txn);
if (code) goto _err; if (code) goto _err;
code = metaFinishCommit(pWriter->pMeta); code = metaFinishCommit(pWriter->pMeta, pWriter->pMeta->txn);
if (code) goto _err; if (code) goto _err;
} }
taosMemoryFree(pWriter); taosMemoryFree(pWriter);
......
...@@ -189,10 +189,11 @@ _err: ...@@ -189,10 +189,11 @@ _err:
static void vnodePrepareCommit(SVnode *pVnode) { static void vnodePrepareCommit(SVnode *pVnode) {
tsem_wait(&pVnode->canCommit); tsem_wait(&pVnode->canCommit);
tsdbPrepareCommit(pVnode->pTsdb);
metaPrepareAsyncCommit(pVnode->pMeta);
vnodeBufPoolUnRef(pVnode->inUse); vnodeBufPoolUnRef(pVnode->inUse);
pVnode->inUse = NULL; pVnode->inUse = NULL;
tsdbPrepareCommit(pVnode->pTsdb);
} }
static int32_t vnodeCommitTask(void *arg) { static int32_t vnodeCommitTask(void *arg) {
int32_t code = 0; int32_t code = 0;
...@@ -229,6 +230,7 @@ int vnodeAsyncCommit(SVnode *pVnode) { ...@@ -229,6 +230,7 @@ int vnodeAsyncCommit(SVnode *pVnode) {
pInfo->info.state.commitTerm = pVnode->state.applyTerm; pInfo->info.state.commitTerm = pVnode->state.applyTerm;
pInfo->info.state.commitID = pVnode->state.commitID; pInfo->info.state.commitID = pVnode->state.commitID;
pInfo->pVnode = pVnode; pInfo->pVnode = pVnode;
pInfo->txn = metaGetTxn(pVnode->pMeta);
vnodeScheduleTask(vnodeCommitTask, pInfo); vnodeScheduleTask(vnodeCommitTask, pInfo);
_exit: _exit:
...@@ -282,7 +284,7 @@ static int vnodeCommitImpl(SCommitInfo *pInfo) { ...@@ -282,7 +284,7 @@ static int vnodeCommitImpl(SCommitInfo *pInfo) {
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
// commit each sub-system // commit each sub-system
if (metaCommit(pVnode->pMeta) < 0) { if (metaCommit(pVnode->pMeta, pInfo->txn) < 0) {
code = TSDB_CODE_FAILED; code = TSDB_CODE_FAILED;
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} }
...@@ -314,7 +316,7 @@ static int vnodeCommitImpl(SCommitInfo *pInfo) { ...@@ -314,7 +316,7 @@ static int vnodeCommitImpl(SCommitInfo *pInfo) {
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} }
if (metaFinishCommit(pVnode->pMeta) < 0) { if (metaFinishCommit(pVnode->pMeta, pInfo->txn) < 0) {
code = terrno; code = terrno;
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册