diff --git a/src/inc/tsdb.h b/src/inc/tsdb.h index 9d82245c2199b5fa0b62d709a08633e5a976b007..9f80b4e6acb483207d771cbbf1245ca568ec48d9 100644 --- a/src/inc/tsdb.h +++ b/src/inc/tsdb.h @@ -421,6 +421,8 @@ bool tsdbNoProblem(STsdbRepo* pRepo); // unit of walSize: MB int tsdbCheckWal(STsdbRepo *pRepo, uint32_t walSize); +int tsdbGetNumOfWaitCommit(STsdbRepo *pRepo, int *nNum); + #ifdef __cplusplus } #endif diff --git a/src/tsdb/src/tsdbMain.c b/src/tsdb/src/tsdbMain.c index 000b8c69f4e2b72d7acb169a9920dcfeaa9f8f84..10cfacca57a543886c85f52a6c2ea9c4a911de6e 100644 --- a/src/tsdb/src/tsdbMain.c +++ b/src/tsdb/src/tsdbMain.c @@ -199,13 +199,19 @@ int tsdbCheckCommit(STsdbRepo *pRepo) { return 0; } +int tsdbGetNumOfWaitCommit(STsdbRepo *pRepo, int *nNum) { + if (sem_getvalue(&pRepo->readyToCommit, nNum) != 0) { + tsdbError("vgId:%d failed to sem_getvalue of readyToCommit", REPO_ID(pRepo)); + return -1; + } + return 0; +} + int tsdbCheckWal(STsdbRepo *pRepo, uint32_t walSize) { // MB STsdbCfg *pCfg = &(pRepo->config); - if ((walSize > tsdbWalFlushSize) && (walSize > (pCfg->totalBlocks / 2 * pCfg->cacheBlockSize))) { if (tsdbAsyncCommit(pRepo) < 0) return -1; } - return 0; } diff --git a/src/vnode/src/vnodeWrite.c b/src/vnode/src/vnodeWrite.c index c804a862a341700c5828a41986e22e372db73225..0b6064474a925474390c71f4c09ce0052ff33a2f 100644 --- a/src/vnode/src/vnodeWrite.c +++ b/src/vnode/src/vnodeWrite.c @@ -36,6 +36,7 @@ static int32_t vnodeProcessAlterTableMsg(SVnodeObj *pVnode, void *pCont, SRspRet static int32_t vnodeProcessDropStableMsg(SVnodeObj *pVnode, void *pCont, SRspRet *); static int32_t vnodeProcessUpdateTagValMsg(SVnodeObj *pVnode, void *pCont, SRspRet *); static int32_t vnodePerformFlowCtrl(SVWriteMsg *pWrite); +static int32_t vnodeCheckWal(SVnodeObj *pVnode); int32_t vnodeInitWrite(void) { vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_SUBMIT] = vnodeProcessSubmitMsg; @@ -167,6 +168,20 @@ static int32_t vnodeProcessSubmitMsg(SVnodeObj *pVnode, void *pCont, SRspRet *pR return code; } +static int32_t vnodeCheckWal(SVnodeObj *pVnode) { + // no need to check wal size to trigger commit if: + // 1) vnode in committing state; + // 2) other instance wait to commit; + int nWaitCommit = 0; + if (tsdbGetNumOfWaitCommit(pVnode->tsdb, &nWaitCommit) != 0) { + return -1; + } + if ((nWaitCommit > 0) && (pVnode->isCommiting == 0)) { + return tsdbCheckWal(pVnode->tsdb, walGetFSize(pVnode->wal) >> 20); + } + return 0; +} + static int32_t vnodeProcessCreateTableMsg(SVnodeObj *pVnode, void *pCont, SRspRet *pRet) { int code = TSDB_CODE_SUCCESS; @@ -181,9 +196,8 @@ static int32_t vnodeProcessCreateTableMsg(SVnodeObj *pVnode, void *pCont, SRspRe ASSERT(code != 0); } - // no need to set wal size to trigger commit if vnode in committing state - if ((((++pVnode->tblMsgVer) & 32767) == 0) && (pVnode->isCommiting == 0)) { // lazy check - tsdbCheckWal(pVnode->tsdb, walGetFSize(pVnode->wal) >> 20); + if (((++pVnode->tblMsgVer) & 16383) == 0) { // lazy check + vnodeCheckWal(pVnode); } tsdbClearTableCfg(pCfg);