提交 cf0995f7 编写于 作者: K kailixu

feat: async commit for rsma

上级 a4adcbbe
...@@ -206,7 +206,7 @@ int32_t smaBegin(SSma* pSma); ...@@ -206,7 +206,7 @@ int32_t smaBegin(SSma* pSma);
int32_t smaSyncPreCommit(SSma* pSma); int32_t smaSyncPreCommit(SSma* pSma);
int32_t smaSyncCommit(SSma* pSma); int32_t smaSyncCommit(SSma* pSma);
int32_t smaSyncPostCommit(SSma* pSma); int32_t smaSyncPostCommit(SSma* pSma);
int32_t smaPreCommit(SSma* pSma); int32_t smaPrepareAsyncCommit(SSma* pSma);
int32_t smaCommit(SSma* pSma, SCommitInfo* pInfo); int32_t smaCommit(SSma* pSma, SCommitInfo* pInfo);
int32_t smaFinishCommit(SSma* pSma); int32_t smaFinishCommit(SSma* pSma);
int32_t smaPostCommit(SSma* pSma); int32_t smaPostCommit(SSma* pSma);
......
...@@ -59,7 +59,7 @@ int32_t smaSyncPostCommit(SSma *pSma) { return tdProcessRSmaSyncPostCommitImpl(p ...@@ -59,7 +59,7 @@ int32_t smaSyncPostCommit(SSma *pSma) { return tdProcessRSmaSyncPostCommitImpl(p
* @param pSma * @param pSma
* @return int32_t * @return int32_t
*/ */
int32_t smaPreCommit(SSma *pSma) { return tdProcessRSmaAsyncPreCommitImpl(pSma); } int32_t smaPrepareAsyncCommit(SSma *pSma) { return tdProcessRSmaAsyncPreCommitImpl(pSma); }
/** /**
* @brief async commit, only applicable to Rollup SMA * @brief async commit, only applicable to Rollup SMA
...@@ -378,6 +378,11 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma) { ...@@ -378,6 +378,11 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma) {
taosWUnLockLatch(SMA_ENV_LOCK(pEnv)); taosWUnLockLatch(SMA_ENV_LOCK(pEnv));
#endif #endif
// all rsma results are written completely
STsdb *pTsdb = NULL;
if ((pTsdb = VND_RSMA1(pSma->pVnode))) tsdbPrepareCommit(pTsdb);
if ((pTsdb = VND_RSMA2(pSma->pVnode))) tsdbPrepareCommit(pTsdb);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
......
...@@ -191,6 +191,8 @@ static void vnodePrepareCommit(SVnode *pVnode) { ...@@ -191,6 +191,8 @@ static void vnodePrepareCommit(SVnode *pVnode) {
tsdbPrepareCommit(pVnode->pTsdb); tsdbPrepareCommit(pVnode->pTsdb);
metaPrepareAsyncCommit(pVnode->pMeta); metaPrepareAsyncCommit(pVnode->pMeta);
smaPrepareAsyncCommit(pVnode->pSma);
vnodeBufPoolUnRef(pVnode->inUse); vnodeBufPoolUnRef(pVnode->inUse);
pVnode->inUse = NULL; pVnode->inUse = NULL;
...@@ -280,9 +282,6 @@ static int vnodeCommitImpl(SCommitInfo *pInfo) { ...@@ -280,9 +282,6 @@ static int vnodeCommitImpl(SCommitInfo *pInfo) {
// walBeginSnapshot(pVnode->pWal, pVnode->state.applied); // walBeginSnapshot(pVnode->pWal, pVnode->state.applied);
syncBeginSnapshot(pVnode->sync, pVnode->state.applied); syncBeginSnapshot(pVnode->sync, pVnode->state.applied);
code = smaPreCommit(pVnode->pSma);
TSDB_CHECK_CODE(code, lino, _exit);
// commit each sub-system // commit each sub-system
if (metaCommit(pVnode->pMeta, pInfo->txn) < 0) { if (metaCommit(pVnode->pMeta, pInfo->txn) < 0) {
code = TSDB_CODE_FAILED; code = TSDB_CODE_FAILED;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册