diff --git a/src/inc/tsdb.h b/src/inc/tsdb.h index 15e6d5920fa26ce7267bee45f58c997bbc8398d8..ad7eaef8cbda7a52f9e2340969d7ab95791d127d 100644 --- a/src/inc/tsdb.h +++ b/src/inc/tsdb.h @@ -421,7 +421,8 @@ bool tsdbNoProblem(STsdbRepo* pRepo); // unit of walSize: MB int tsdbCheckWal(STsdbRepo *pRepo, uint32_t walSize); -int tsdbGetValOfWaitCommit(STsdbRepo *pRepo, int *nVal); +// not commit if other instances in committing state or waiting to commit +bool tsdbIsNeedCommit(STsdbRepo *pRepo); #ifdef __cplusplus } diff --git a/src/tsdb/src/tsdbMain.c b/src/tsdb/src/tsdbMain.c index da68cb63919cb9767c97009e662346b1c8f44cac..675ce7b144741b67c0ff76fdd36c6865d25a7661 100644 --- a/src/tsdb/src/tsdbMain.c +++ b/src/tsdb/src/tsdbMain.c @@ -185,6 +185,22 @@ int tsdbUnlockRepo(STsdbRepo *pRepo) { return 0; } +bool tsdbIsNeedCommit(STsdbRepo *pRepo) { + int nVal = 0; + if (sem_getvalue(&pRepo->readyToCommit, &nVal) != 0) { + tsdbError("vgId:%d failed to sem_getvalue of readyToCommit", REPO_ID(pRepo)); + } + return nVal > 0; +} + +int tsdbCheckWal(STsdbRepo *pRepo, uint32_t walSize) { // MB + STsdbCfg *pCfg = &(pRepo->config); + if ((walSize > tsdbWalFlushSize) && (walSize > (pCfg->totalBlocks / 2 * pCfg->cacheBlockSize))) { + if (tsdbIsNeedCommit(pRepo) && (tsdbAsyncCommit(pRepo) < 0)) return -1; + } + return 0; +} + int tsdbCheckCommit(STsdbRepo *pRepo) { ASSERT(pRepo->mem != NULL); STsdbCfg *pCfg = &(pRepo->config); @@ -194,23 +210,7 @@ int tsdbCheckCommit(STsdbRepo *pRepo) { if ((pRepo->mem->extraBuffList != NULL) || ((listNEles(pRepo->mem->bufBlockList) >= pCfg->totalBlocks / 3) && (pBufBlock->remain < TSDB_BUFFER_RESERVE))) { // trigger commit - if (tsdbAsyncCommit(pRepo) < 0) return -1; - } - return 0; -} - -int tsdbGetValOfWaitCommit(STsdbRepo *pRepo, int *nVal) { - if (sem_getvalue(&pRepo->readyToCommit, nVal) != 0) { - tsdbError("vgId:%d failed to sem_getvalue of readyToCommit", REPO_ID(pRepo)); - return -1; - } - return 0; -} - -int tsdbCheckWal(STsdbRepo *pRepo, uint32_t walSize) { // MB - STsdbCfg *pCfg = &(pRepo->config); - if ((walSize > tsdbWalFlushSize) && (walSize > (pCfg->totalBlocks / 2 * pCfg->cacheBlockSize))) { - if (tsdbAsyncCommit(pRepo) < 0) return -1; + if (tsdbIsNeedCommit(pRepo) && (tsdbAsyncCommit(pRepo) < 0)) return -1; } return 0; } diff --git a/src/vnode/src/vnodeMain.c b/src/vnode/src/vnodeMain.c index 92ad4c0f9b12ab424faccf8bc6aaebe01be0a0b0..c823880ae2028c4bcfe26dbfc5cd60af62443722 100644 --- a/src/vnode/src/vnodeMain.c +++ b/src/vnode/src/vnodeMain.c @@ -550,14 +550,13 @@ static int32_t vnodeProcessTsdbStatus(void *arg, int32_t status, int32_t eno) { } if (status == TSDB_STATUS_COMMIT_OVER) { + pVnode->isCommiting = 0; pVnode->isFull = 0; pVnode->fversion = pVnode->cversion; vInfo("vgId:%d, commit over, fver:%" PRIu64 " vver:%" PRIu64, pVnode->vgId, pVnode->fversion, pVnode->version); if (!vnodeInInitStatus(pVnode)) { walRemoveOneOldFile(pVnode->wal); } - // vnodeGetVersion() and calling tsdbCheckWal() would reply on the vnode isCommiting state - pVnode->isCommiting = 0; return vnodeSaveVersion(pVnode); } diff --git a/src/vnode/src/vnodeWrite.c b/src/vnode/src/vnodeWrite.c index efbdfed80ec276a77647eefefe12a1a38e16df70..7f7d37a255ff42ab47be94507a33647fce853e8e 100644 --- a/src/vnode/src/vnodeWrite.c +++ b/src/vnode/src/vnodeWrite.c @@ -169,14 +169,7 @@ static int32_t vnodeProcessSubmitMsg(SVnodeObj *pVnode, void *pCont, SRspRet *pR } static int32_t vnodeCheckWal(SVnodeObj *pVnode) { - int nVal = 0; - if (tsdbGetValOfWaitCommit(pVnode->tsdb, &nVal) != 0) { - return -1; - } - // no need to check wal size to trigger commit if: - // 1) have instances waiting to commit; - // or 2) vnode in committing state; - if ((nVal > 0) && (pVnode->isCommiting == 0)) { + if (tsdbIsNeedCommit(pVnode->tsdb)) { return tsdbCheckWal(pVnode->tsdb, walGetFSize(pVnode->wal) >> 20); } return 0;