diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index d3f10ad52978acdc87ee39814b1b389c161b9f8f..8cf212cb1d084c20870ccfd13d6d2168f4cc9250 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -206,7 +206,7 @@ int32_t smaBegin(SSma* pSma); int32_t smaSyncPreCommit(SSma* pSma); int32_t smaSyncCommit(SSma* pSma); int32_t smaSyncPostCommit(SSma* pSma); -int32_t smaPreCommit(SSma* pSma); +int32_t smaPrepareAsyncCommit(SSma* pSma); int32_t smaCommit(SSma* pSma, SCommitInfo* pInfo); int32_t smaFinishCommit(SSma* pSma); int32_t smaPostCommit(SSma* pSma); diff --git a/source/dnode/vnode/src/sma/smaCommit.c b/source/dnode/vnode/src/sma/smaCommit.c index f038d96c7b0c2d117b8d8afd10d7c3e640252dac..9748963722cb5dac2f0a83bc89fad9a04987cf28 100644 --- a/source/dnode/vnode/src/sma/smaCommit.c +++ b/source/dnode/vnode/src/sma/smaCommit.c @@ -59,7 +59,7 @@ int32_t smaSyncPostCommit(SSma *pSma) { return tdProcessRSmaSyncPostCommitImpl(p * @param pSma * @return int32_t */ -int32_t smaPreCommit(SSma *pSma) { return tdProcessRSmaAsyncPreCommitImpl(pSma); } +int32_t smaPrepareAsyncCommit(SSma *pSma) { return tdProcessRSmaAsyncPreCommitImpl(pSma); } /** * @brief async commit, only applicable to Rollup SMA @@ -378,6 +378,11 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma) { taosWUnLockLatch(SMA_ENV_LOCK(pEnv)); #endif + // all rsma results are written completely + STsdb *pTsdb = NULL; + if ((pTsdb = VND_RSMA1(pSma->pVnode))) tsdbPrepareCommit(pTsdb); + if ((pTsdb = VND_RSMA2(pSma->pVnode))) tsdbPrepareCommit(pTsdb); + return TSDB_CODE_SUCCESS; } diff --git a/source/dnode/vnode/src/vnd/vnodeCommit.c b/source/dnode/vnode/src/vnd/vnodeCommit.c index 4bb2b62f281a64ac40fdb9c091942db434e65496..f9a598fec6861f66107011ca5a04dc6df5a33726 100644 --- a/source/dnode/vnode/src/vnd/vnodeCommit.c +++ b/source/dnode/vnode/src/vnd/vnodeCommit.c @@ -191,6 +191,8 @@ static void vnodePrepareCommit(SVnode *pVnode) { tsdbPrepareCommit(pVnode->pTsdb); metaPrepareAsyncCommit(pVnode->pMeta); + smaPrepareAsyncCommit(pVnode->pSma); + vnodeBufPoolUnRef(pVnode->inUse); pVnode->inUse = NULL; @@ -280,9 +282,6 @@ static int vnodeCommitImpl(SCommitInfo *pInfo) { // walBeginSnapshot(pVnode->pWal, pVnode->state.applied); syncBeginSnapshot(pVnode->sync, pVnode->state.applied); - code = smaPreCommit(pVnode->pSma); - TSDB_CHECK_CODE(code, lino, _exit); - // commit each sub-system if (metaCommit(pVnode->pMeta, pInfo->txn) < 0) { code = TSDB_CODE_FAILED;