提交 54526adc 编写于 作者: C Cary Xu

fix: commit txn for rsma

上级 272236e3
...@@ -211,6 +211,7 @@ int32_t smaSyncCommit(SSma* pSma); ...@@ -211,6 +211,7 @@ int32_t smaSyncCommit(SSma* pSma);
int32_t smaSyncPostCommit(SSma* pSma); int32_t smaSyncPostCommit(SSma* pSma);
int32_t smaPreCommit(SSma* pSma); int32_t smaPreCommit(SSma* pSma);
int32_t smaCommit(SSma* pSma); int32_t smaCommit(SSma* pSma);
int32_t smaFinishCommit(SSma* pSma);
int32_t smaPostCommit(SSma* pSma); int32_t smaPostCommit(SSma* pSma);
int32_t smaDoRetention(SSma* pSma, int64_t now); int32_t smaDoRetention(SSma* pSma, int64_t now);
......
...@@ -112,6 +112,30 @@ int32_t smaBegin(SSma *pSma) { ...@@ -112,6 +112,30 @@ int32_t smaBegin(SSma *pSma) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t smaFinishCommit(SSma *pSma) {
int32_t code = 0;
SVnode *pVnode = pSma->pVnode;
SSmaEnv *pSmaEnv = SMA_RSMA_ENV(pSma);
if (!pSmaEnv) {
goto _exit;
}
if ((code = tsdbFinishCommit(VND_RSMA0(pVnode))) < 0) {
smaError("vgId:%d, failed to finish commit tsdb rsma0 since %s", TD_VID(pVnode), tstrerror(code));
goto _exit;
}
if ((code = tsdbFinishCommit(VND_RSMA1(pVnode))) < 0) {
smaError("vgId:%d, failed to finish commit tsdb rsma1 since %s", TD_VID(pVnode), tstrerror(code));
goto _exit;
}
if ((code = tsdbFinishCommit(VND_RSMA2(pVnode))) < 0) {
smaError("vgId:%d, failed to finish commit tsdb rsma2 since %s", TD_VID(pVnode), tstrerror(code));
goto _exit;
}
_exit:
terrno = code;
return code;
}
#if 0 #if 0
/** /**
* @brief pre-commit for rollup sma(sync commit). * @brief pre-commit for rollup sma(sync commit).
......
...@@ -239,10 +239,8 @@ int vnodeCommit(SVnode *pVnode) { ...@@ -239,10 +239,8 @@ int vnodeCommit(SVnode *pVnode) {
} }
walBeginSnapshot(pVnode->pWal, pVnode->state.applied); walBeginSnapshot(pVnode->pWal, pVnode->state.applied);
if (smaPreCommit(pVnode->pSma) < 0) { code = smaPreCommit(pVnode->pSma);
vError("vgId:%d, failed to pre-commit sma since %s", TD_VID(pVnode), tstrerror(terrno)); TSDB_CHECK_CODE(code, lino, _exit);
return -1;
}
vnodeBufPoolUnRef(pVnode->inUse); vnodeBufPoolUnRef(pVnode->inUse);
pVnode->inUse = NULL; pVnode->inUse = NULL;
...@@ -254,10 +252,8 @@ int vnodeCommit(SVnode *pVnode) { ...@@ -254,10 +252,8 @@ int vnodeCommit(SVnode *pVnode) {
} }
if (VND_IS_RSMA(pVnode)) { if (VND_IS_RSMA(pVnode)) {
if (smaCommit(pVnode->pSma) < 0) { code = smaCommit(pVnode->pSma);
vError("vgId:%d, failed to commit sma since %s", TD_VID(pVnode), tstrerror(terrno)); TSDB_CHECK_CODE(code, lino, _exit);
return -1;
}
} else { } else {
code = tsdbCommit(pVnode->pTsdb); code = tsdbCommit(pVnode->pTsdb);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
...@@ -274,7 +270,13 @@ int vnodeCommit(SVnode *pVnode) { ...@@ -274,7 +270,13 @@ int vnodeCommit(SVnode *pVnode) {
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} }
tsdbFinishCommit(pVnode->pTsdb); if (VND_IS_RSMA(pVnode)) {
code = smaFinishCommit(pVnode->pSma);
TSDB_CHECK_CODE(code, lino, _exit);
} else {
code = tsdbFinishCommit(pVnode->pTsdb);
TSDB_CHECK_CODE(code, lino, _exit);
}
if (metaFinishCommit(pVnode->pMeta) < 0) { if (metaFinishCommit(pVnode->pMeta) < 0) {
code = terrno; code = terrno;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册