diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index e3f08e912ab3bf955f5e1905439a317c4464e214..c8f1efa1ab4f6e398c3252f1fa1376747d1371cd 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -61,7 +61,7 @@ int32_t tsHeartbeatInterval = 1000; int32_t tsHeartbeatTimeout = 20 * 1000; // vnode -int64_t tsVndCommitMaxIntervalMs = 60 * 1000; +int64_t tsVndCommitMaxIntervalMs = 600 * 1000; // monitor bool tsEnableMonitor = true; diff --git a/source/dnode/vnode/src/vnd/vnodeBufPool.c b/source/dnode/vnode/src/vnd/vnodeBufPool.c index 83a414dae08bd48e41e12682922daf7055e1f5a0..88abc1b3f0e25537707ba44b20860b77c55b35df 100644 --- a/source/dnode/vnode/src/vnd/vnodeBufPool.c +++ b/source/dnode/vnode/src/vnd/vnodeBufPool.c @@ -94,6 +94,7 @@ int vnodeOpenBufPool(SVnode *pVnode) { int vnodeCloseBufPool(SVnode *pVnode) { SVBufPool *pPool; + taosThreadMutexLock(&pVnode->mutex); for (pPool = pVnode->pPool; pPool; pPool = pVnode->pPool) { pVnode->pPool = pPool->next; vnodeBufPoolDestroy(pPool); @@ -103,8 +104,9 @@ int vnodeCloseBufPool(SVnode *pVnode) { vnodeBufPoolDestroy(pVnode->inUse); pVnode->inUse = NULL; } - vDebug("vgId:%d, vnode buffer pool is closed", TD_VID(pVnode)); + taosThreadMutexUnlock(&pVnode->mutex); + vDebug("vgId:%d, vnode buffer pool is closed", TD_VID(pVnode)); return 0; } @@ -244,6 +246,9 @@ void vnodeBufPoolUnRef(SVBufPool *pPool) { pVnode->pPool = pPool; taosThreadCondSignal(&pVnode->poolNotEmpty); + if (pVnode->inUse == pPool) { + pVnode->inUse = NULL; + } taosThreadMutexUnlock(&pVnode->mutex); } } diff --git a/source/dnode/vnode/src/vnd/vnodeCommit.c b/source/dnode/vnode/src/vnd/vnodeCommit.c index cefa9e675562bf0b96d2e107b73f2beac4ebb8eb..e8280ea7518012e887d80004b547b9c59ca01f04 100644 --- a/source/dnode/vnode/src/vnd/vnodeCommit.c +++ b/source/dnode/vnode/src/vnd/vnodeCommit.c @@ -87,22 +87,21 @@ void vnodeUpdCommitSched(SVnode *pVnode) { } int vnodeShouldCommit(SVnode *pVnode) { - if (!pVnode->inUse || !osDataSpaceAvailable()) { - return false; - } - SVCommitSched *pSched = &pVnode->commitSched; int64_t nowMs = taosGetMonoTimestampMs(); + bool diskAvail = osDataSpaceAvailable(); + bool needCommit = false; - return (((pVnode->inUse->size > pVnode->inUse->node.size) && (pSched->commitMs + SYNC_VND_COMMIT_MIN_MS < nowMs)) || - (pVnode->inUse->size > 0 && pSched->commitMs + pSched->maxWaitMs < nowMs)); -} - -int vnodeShouldCommitOld(SVnode *pVnode) { - if (pVnode->inUse) { - return osDataSpaceAvailable() && (pVnode->inUse->size > pVnode->inUse->node.size); + taosThreadMutexLock(&pVnode->mutex); + if (!pVnode->inUse || !diskAvail) { + goto _out; } - return false; + needCommit = + (((pVnode->inUse->size > pVnode->inUse->node.size) && (pSched->commitMs + SYNC_VND_COMMIT_MIN_MS < nowMs)) || + (pVnode->inUse->size > 0 && pSched->commitMs + pSched->maxWaitMs < nowMs)); +_out: + taosThreadMutexUnlock(&pVnode->mutex); + return needCommit; } int vnodeSaveInfo(const char *dir, const SVnodeInfo *pInfo) { @@ -259,7 +258,6 @@ static int32_t vnodePrepareCommit(SVnode *pVnode, SCommitInfo *pInfo) { metaPrepareAsyncCommit(pVnode->pMeta); vnodeBufPoolUnRef(pVnode->inUse); - pVnode->inUse = NULL; _exit: if (code) {