提交 b0b98772 编写于 作者: B Benguang Zhao

fix: check if log buffer is empty in syncNodeOnLocalCmd

上级 4902e9a6
...@@ -99,6 +99,7 @@ int32_t syncLogBufferReInit(SSyncLogBuffer* pBuf, SSyncNode* pNode); ...@@ -99,6 +99,7 @@ int32_t syncLogBufferReInit(SSyncLogBuffer* pBuf, SSyncNode* pNode);
// access // access
int64_t syncLogBufferGetEndIndex(SSyncLogBuffer* pBuf); int64_t syncLogBufferGetEndIndex(SSyncLogBuffer* pBuf);
SyncTerm syncLogBufferGetLastMatchTerm(SSyncLogBuffer* pBuf); SyncTerm syncLogBufferGetLastMatchTerm(SSyncLogBuffer* pBuf);
bool syncLogBufferIsEmpty(SSyncLogBuffer* pBuf);
int32_t syncLogBufferAppend(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEntry* pEntry); int32_t syncLogBufferAppend(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEntry* pEntry);
int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEntry* pEntry, SyncTerm prevTerm); int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEntry* pEntry, SyncTerm prevTerm);
int64_t syncLogBufferProceed(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncTerm* pMatchTerm); int64_t syncLogBufferProceed(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncTerm* pMatchTerm);
......
...@@ -2468,6 +2468,10 @@ int32_t syncNodeOnLocalCmd(SSyncNode* ths, const SRpcMsg* pRpcMsg) { ...@@ -2468,6 +2468,10 @@ int32_t syncNodeOnLocalCmd(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
syncNodeStepDown(ths, pMsg->currentTerm); syncNodeStepDown(ths, pMsg->currentTerm);
} else if (pMsg->cmd == SYNC_LOCAL_CMD_FOLLOWER_CMT) { } else if (pMsg->cmd == SYNC_LOCAL_CMD_FOLLOWER_CMT) {
if (syncLogBufferIsEmpty(ths->pLogBuf)) {
sError("vgId:%d, sync log buffer is empty.", ths->vgId);
return 0;
}
SyncTerm matchTerm = syncLogBufferGetLastMatchTerm(ths->pLogBuf); SyncTerm matchTerm = syncLogBufferGetLastMatchTerm(ths->pLogBuf);
if (pMsg->currentTerm == matchTerm) { if (pMsg->currentTerm == matchTerm) {
(void)syncNodeUpdateCommitIndex(ths, pMsg->commitIndex); (void)syncNodeUpdateCommitIndex(ths, pMsg->commitIndex);
......
...@@ -253,6 +253,7 @@ int32_t syncLogBufferInit(SSyncLogBuffer* pBuf, SSyncNode* pNode) { ...@@ -253,6 +253,7 @@ int32_t syncLogBufferInit(SSyncLogBuffer* pBuf, SSyncNode* pNode) {
int32_t syncLogBufferReInit(SSyncLogBuffer* pBuf, SSyncNode* pNode) { int32_t syncLogBufferReInit(SSyncLogBuffer* pBuf, SSyncNode* pNode) {
taosThreadMutexLock(&pBuf->mutex); taosThreadMutexLock(&pBuf->mutex);
syncLogBufferValidate(pBuf);
for (SyncIndex index = pBuf->startIndex; index < pBuf->endIndex; index++) { for (SyncIndex index = pBuf->startIndex; index < pBuf->endIndex; index++) {
SSyncRaftEntry* pEntry = pBuf->entries[(index + pBuf->size) % pBuf->size].pItem; SSyncRaftEntry* pEntry = pBuf->entries[(index + pBuf->size) % pBuf->size].pItem;
if (pEntry == NULL) continue; if (pEntry == NULL) continue;
...@@ -265,6 +266,7 @@ int32_t syncLogBufferReInit(SSyncLogBuffer* pBuf, SSyncNode* pNode) { ...@@ -265,6 +266,7 @@ int32_t syncLogBufferReInit(SSyncLogBuffer* pBuf, SSyncNode* pNode) {
if (ret < 0) { if (ret < 0) {
sError("vgId:%d, failed to re-initialize sync log buffer since %s.", pNode->vgId, terrstr()); sError("vgId:%d, failed to re-initialize sync log buffer since %s.", pNode->vgId, terrstr());
} }
syncLogBufferValidate(pBuf);
taosThreadMutexUnlock(&pBuf->mutex); taosThreadMutexUnlock(&pBuf->mutex);
return ret; return ret;
} }
...@@ -283,6 +285,13 @@ SyncTerm syncLogBufferGetLastMatchTerm(SSyncLogBuffer* pBuf) { ...@@ -283,6 +285,13 @@ SyncTerm syncLogBufferGetLastMatchTerm(SSyncLogBuffer* pBuf) {
return term; return term;
} }
bool syncLogBufferIsEmpty(SSyncLogBuffer* pBuf) {
taosThreadMutexLock(&pBuf->mutex);
bool empty = (pBuf->endIndex <= pBuf->startIndex);
taosThreadMutexUnlock(&pBuf->mutex);
return empty;
}
int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEntry* pEntry, SyncTerm prevTerm) { int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEntry* pEntry, SyncTerm prevTerm) {
taosThreadMutexLock(&pBuf->mutex); taosThreadMutexLock(&pBuf->mutex);
syncLogBufferValidate(pBuf); syncLogBufferValidate(pBuf);
...@@ -1073,6 +1082,7 @@ int32_t syncLogBufferRollback(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncIndex ...@@ -1073,6 +1082,7 @@ int32_t syncLogBufferRollback(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncIndex
int32_t syncLogBufferReset(SSyncLogBuffer* pBuf, SSyncNode* pNode) { int32_t syncLogBufferReset(SSyncLogBuffer* pBuf, SSyncNode* pNode) {
taosThreadMutexLock(&pBuf->mutex); taosThreadMutexLock(&pBuf->mutex);
syncLogBufferValidate(pBuf);
SyncIndex lastVer = pNode->pLogStore->syncLogLastIndex(pNode->pLogStore); SyncIndex lastVer = pNode->pLogStore->syncLogLastIndex(pNode->pLogStore);
ASSERT(lastVer == pBuf->matchIndex); ASSERT(lastVer == pBuf->matchIndex);
SyncIndex index = pBuf->endIndex - 1; SyncIndex index = pBuf->endIndex - 1;
...@@ -1089,6 +1099,7 @@ int32_t syncLogBufferReset(SSyncLogBuffer* pBuf, SSyncNode* pNode) { ...@@ -1089,6 +1099,7 @@ int32_t syncLogBufferReset(SSyncLogBuffer* pBuf, SSyncNode* pNode) {
SSyncLogReplMgr* pMgr = pNode->logReplMgrs[i]; SSyncLogReplMgr* pMgr = pNode->logReplMgrs[i];
syncLogReplMgrReset(pMgr); syncLogReplMgrReset(pMgr);
} }
syncLogBufferValidate(pBuf);
taosThreadMutexUnlock(&pBuf->mutex); taosThreadMutexUnlock(&pBuf->mutex);
return 0; return 0;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册