diff --git a/source/libs/sync/inc/syncPipeline.h b/source/libs/sync/inc/syncPipeline.h index 504a9f0bd77158a2e73ed1b3d1da317cbf1ddcd6..a823cfda0b76cda1a60650a8208c5cfa4a39a160 100644 --- a/source/libs/sync/inc/syncPipeline.h +++ b/source/libs/sync/inc/syncPipeline.h @@ -99,6 +99,7 @@ int32_t syncLogBufferReInit(SSyncLogBuffer* pBuf, SSyncNode* pNode); // access int64_t syncLogBufferGetEndIndex(SSyncLogBuffer* pBuf); SyncTerm syncLogBufferGetLastMatchTerm(SSyncLogBuffer* pBuf); +bool syncLogBufferIsEmpty(SSyncLogBuffer* pBuf); int32_t syncLogBufferAppend(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEntry* pEntry); int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEntry* pEntry, SyncTerm prevTerm); int64_t syncLogBufferProceed(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncTerm* pMatchTerm); diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 3f0432d9980bf1f774606f77cc1b48cec8078b74..07ecc63c256aafe7e9e066ed1af04414f0dfa76e 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -2468,6 +2468,10 @@ int32_t syncNodeOnLocalCmd(SSyncNode* ths, const SRpcMsg* pRpcMsg) { syncNodeStepDown(ths, pMsg->currentTerm); } else if (pMsg->cmd == SYNC_LOCAL_CMD_FOLLOWER_CMT) { + if (syncLogBufferIsEmpty(ths->pLogBuf)) { + sError("vgId:%d, sync log buffer is empty.", ths->vgId); + return 0; + } SyncTerm matchTerm = syncLogBufferGetLastMatchTerm(ths->pLogBuf); if (pMsg->currentTerm == matchTerm) { (void)syncNodeUpdateCommitIndex(ths, pMsg->commitIndex); diff --git a/source/libs/sync/src/syncPipeline.c b/source/libs/sync/src/syncPipeline.c index c9ff2d2dcc341569787d17b42fb9a167aa6eebfd..e2b039a2e489e1dc9d3c6887c11c188523391640 100644 --- a/source/libs/sync/src/syncPipeline.c +++ b/source/libs/sync/src/syncPipeline.c @@ -253,6 +253,7 @@ int32_t syncLogBufferInit(SSyncLogBuffer* pBuf, SSyncNode* pNode) { int32_t syncLogBufferReInit(SSyncLogBuffer* pBuf, SSyncNode* pNode) { taosThreadMutexLock(&pBuf->mutex); + syncLogBufferValidate(pBuf); for (SyncIndex index = pBuf->startIndex; index < pBuf->endIndex; index++) { SSyncRaftEntry* pEntry = pBuf->entries[(index + pBuf->size) % pBuf->size].pItem; if (pEntry == NULL) continue; @@ -265,6 +266,7 @@ int32_t syncLogBufferReInit(SSyncLogBuffer* pBuf, SSyncNode* pNode) { if (ret < 0) { sError("vgId:%d, failed to re-initialize sync log buffer since %s.", pNode->vgId, terrstr()); } + syncLogBufferValidate(pBuf); taosThreadMutexUnlock(&pBuf->mutex); return ret; } @@ -283,6 +285,13 @@ SyncTerm syncLogBufferGetLastMatchTerm(SSyncLogBuffer* pBuf) { return term; } +bool syncLogBufferIsEmpty(SSyncLogBuffer* pBuf) { + taosThreadMutexLock(&pBuf->mutex); + bool empty = (pBuf->endIndex <= pBuf->startIndex); + taosThreadMutexUnlock(&pBuf->mutex); + return empty; +} + int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEntry* pEntry, SyncTerm prevTerm) { taosThreadMutexLock(&pBuf->mutex); syncLogBufferValidate(pBuf); @@ -1073,6 +1082,7 @@ int32_t syncLogBufferRollback(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncIndex int32_t syncLogBufferReset(SSyncLogBuffer* pBuf, SSyncNode* pNode) { taosThreadMutexLock(&pBuf->mutex); + syncLogBufferValidate(pBuf); SyncIndex lastVer = pNode->pLogStore->syncLogLastIndex(pNode->pLogStore); ASSERT(lastVer == pBuf->matchIndex); SyncIndex index = pBuf->endIndex - 1; @@ -1089,6 +1099,7 @@ int32_t syncLogBufferReset(SSyncLogBuffer* pBuf, SSyncNode* pNode) { SSyncLogReplMgr* pMgr = pNode->logReplMgrs[i]; syncLogReplMgrReset(pMgr); } + syncLogBufferValidate(pBuf); taosThreadMutexUnlock(&pBuf->mutex); return 0; }