From b0b9877265eb1a157c6d6fbb46f25977d1c81651 Mon Sep 17 00:00:00 2001 From: Benguang Zhao Date: Tue, 21 Feb 2023 19:00:47 +0800 Subject: [PATCH] fix: check if log buffer is empty in syncNodeOnLocalCmd --- source/libs/sync/inc/syncPipeline.h | 1 + source/libs/sync/src/syncMain.c | 4 ++++ source/libs/sync/src/syncPipeline.c | 11 +++++++++++ 3 files changed, 16 insertions(+) diff --git a/source/libs/sync/inc/syncPipeline.h b/source/libs/sync/inc/syncPipeline.h index 504a9f0bd7..a823cfda0b 100644 --- a/source/libs/sync/inc/syncPipeline.h +++ b/source/libs/sync/inc/syncPipeline.h @@ -99,6 +99,7 @@ int32_t syncLogBufferReInit(SSyncLogBuffer* pBuf, SSyncNode* pNode); // access int64_t syncLogBufferGetEndIndex(SSyncLogBuffer* pBuf); SyncTerm syncLogBufferGetLastMatchTerm(SSyncLogBuffer* pBuf); +bool syncLogBufferIsEmpty(SSyncLogBuffer* pBuf); int32_t syncLogBufferAppend(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEntry* pEntry); int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEntry* pEntry, SyncTerm prevTerm); int64_t syncLogBufferProceed(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncTerm* pMatchTerm); diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 3f0432d998..07ecc63c25 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -2468,6 +2468,10 @@ int32_t syncNodeOnLocalCmd(SSyncNode* ths, const SRpcMsg* pRpcMsg) { syncNodeStepDown(ths, pMsg->currentTerm); } else if (pMsg->cmd == SYNC_LOCAL_CMD_FOLLOWER_CMT) { + if (syncLogBufferIsEmpty(ths->pLogBuf)) { + sError("vgId:%d, sync log buffer is empty.", ths->vgId); + return 0; + } SyncTerm matchTerm = syncLogBufferGetLastMatchTerm(ths->pLogBuf); if (pMsg->currentTerm == matchTerm) { (void)syncNodeUpdateCommitIndex(ths, pMsg->commitIndex); diff --git a/source/libs/sync/src/syncPipeline.c b/source/libs/sync/src/syncPipeline.c index c9ff2d2dcc..e2b039a2e4 100644 --- a/source/libs/sync/src/syncPipeline.c +++ b/source/libs/sync/src/syncPipeline.c @@ -253,6 +253,7 @@ int32_t syncLogBufferInit(SSyncLogBuffer* pBuf, SSyncNode* pNode) { int32_t syncLogBufferReInit(SSyncLogBuffer* pBuf, SSyncNode* pNode) { taosThreadMutexLock(&pBuf->mutex); + syncLogBufferValidate(pBuf); for (SyncIndex index = pBuf->startIndex; index < pBuf->endIndex; index++) { SSyncRaftEntry* pEntry = pBuf->entries[(index + pBuf->size) % pBuf->size].pItem; if (pEntry == NULL) continue; @@ -265,6 +266,7 @@ int32_t syncLogBufferReInit(SSyncLogBuffer* pBuf, SSyncNode* pNode) { if (ret < 0) { sError("vgId:%d, failed to re-initialize sync log buffer since %s.", pNode->vgId, terrstr()); } + syncLogBufferValidate(pBuf); taosThreadMutexUnlock(&pBuf->mutex); return ret; } @@ -283,6 +285,13 @@ SyncTerm syncLogBufferGetLastMatchTerm(SSyncLogBuffer* pBuf) { return term; } +bool syncLogBufferIsEmpty(SSyncLogBuffer* pBuf) { + taosThreadMutexLock(&pBuf->mutex); + bool empty = (pBuf->endIndex <= pBuf->startIndex); + taosThreadMutexUnlock(&pBuf->mutex); + return empty; +} + int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEntry* pEntry, SyncTerm prevTerm) { taosThreadMutexLock(&pBuf->mutex); syncLogBufferValidate(pBuf); @@ -1073,6 +1082,7 @@ int32_t syncLogBufferRollback(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncIndex int32_t syncLogBufferReset(SSyncLogBuffer* pBuf, SSyncNode* pNode) { taosThreadMutexLock(&pBuf->mutex); + syncLogBufferValidate(pBuf); SyncIndex lastVer = pNode->pLogStore->syncLogLastIndex(pNode->pLogStore); ASSERT(lastVer == pBuf->matchIndex); SyncIndex index = pBuf->endIndex - 1; @@ -1089,6 +1099,7 @@ int32_t syncLogBufferReset(SSyncLogBuffer* pBuf, SSyncNode* pNode) { SSyncLogReplMgr* pMgr = pNode->logReplMgrs[i]; syncLogReplMgrReset(pMgr); } + syncLogBufferValidate(pBuf); taosThreadMutexUnlock(&pBuf->mutex); return 0; } -- GitLab