提交 c0c1cd82 编写于 作者: B Benguang Zhao

enh: reset sync log repl mgr if restarting of the peer detected in HeartbeatReply

上级 def4058e
...@@ -166,21 +166,21 @@ int32_t syncLogBufferInit(SSyncLogBuffer* pBuf, SSyncNode* pNode); ...@@ -166,21 +166,21 @@ int32_t syncLogBufferInit(SSyncLogBuffer* pBuf, SSyncNode* pNode);
int64_t syncLogBufferGetEndIndex(SSyncLogBuffer* pBuf); int64_t syncLogBufferGetEndIndex(SSyncLogBuffer* pBuf);
int32_t syncLogBufferAppend(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEntry* pEntry); int32_t syncLogBufferAppend(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEntry* pEntry);
int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEntry* pEntry, SyncTerm prevTerm); int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEntry* pEntry, SyncTerm prevTerm);
int64_t syncLogBufferLoad(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncIndex toIndex);
int64_t syncLogBufferProceed(SSyncLogBuffer* pBuf, SSyncNode* pNode); int64_t syncLogBufferProceed(SSyncLogBuffer* pBuf, SSyncNode* pNode);
int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t commitIndex); int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t commitIndex);
SSyncRaftEntry* syncLogBufferGetOneEntry(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncIndex index, bool* pInBuf);
int64_t syncNodeUpdateCommitIndex(SSyncNode* ths, SyncIndex commtIndex); int64_t syncNodeUpdateCommitIndex(SSyncNode* ths, SyncIndex commtIndex);
SyncAppendEntries* syncLogToAppendEntries(SSyncNode* pNode, SSyncRaftEntry* pEntry, SyncTerm prevLogTerm); SyncAppendEntries* syncLogToAppendEntries(SSyncNode* pNode, SSyncRaftEntry* pEntry, SyncTerm prevLogTerm);
// private // private
SSyncRaftEntry* syncLogBufferGetOneEntry(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncIndex index, bool* pInBuf);
int32_t syncLogBufferValidate(SSyncLogBuffer* pBuf); int32_t syncLogBufferValidate(SSyncLogBuffer* pBuf);
int32_t syncLogBufferRollback(SSyncLogBuffer* pBuf, SyncIndex toIndex); int32_t syncLogBufferRollback(SSyncLogBuffer* pBuf, SyncIndex toIndex);
int32_t syncLogBufferReplicate(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEntry* pEntry, SyncTerm prevLogTerm); int32_t syncLogBufferReplicate(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEntry* pEntry, SyncTerm prevLogTerm);
void syncIndexMgrSetIndex(SSyncIndexMgr* pSyncIndexMgr, const SRaftId* pRaftId, SyncIndex index);
bool syncNodeAgreedUpon(SSyncNode* pNode, SyncIndex index); bool syncNodeAgreedUpon(SSyncNode* pNode, SyncIndex index);
void syncIndexMgrSetIndex(SSyncIndexMgr* pSyncIndexMgr, const SRaftId* pRaftId, SyncIndex index);
typedef struct SSyncNode { typedef struct SSyncNode {
// init by SSyncInfo // init by SSyncInfo
SyncGroupId vgId; SyncGroupId vgId;
......
...@@ -324,11 +324,11 @@ int32_t syncLogBufferInit(SSyncLogBuffer* pBuf, SSyncNode* pNode) { ...@@ -324,11 +324,11 @@ int32_t syncLogBufferInit(SSyncLogBuffer* pBuf, SSyncNode* pNode) {
sError("vgId:%d, failed to get snapshot info since %s", pNode->vgId, terrstr()); sError("vgId:%d, failed to get snapshot info since %s", pNode->vgId, terrstr());
goto _err; goto _err;
} }
SyncIndex lastVer = pNode->pLogStore->syncLogLastIndex(pNode->pLogStore);
SyncIndex commitIndex = snapshot.lastApplyIndex; SyncIndex commitIndex = snapshot.lastApplyIndex;
SyncTerm commitTerm = snapshot.lastApplyTerm; SyncTerm commitTerm = snapshot.lastApplyTerm;
SyncIndex toIndex = TMAX(lastVer, commitIndex);
SyncIndex lastVer = pNode->pLogStore->syncLogLastIndex(pNode->pLogStore);
SyncIndex toIndex = lastVer;
ASSERT(lastVer >= commitIndex); ASSERT(lastVer >= commitIndex);
// update match index // update match index
...@@ -406,93 +406,6 @@ _err: ...@@ -406,93 +406,6 @@ _err:
return -1; return -1;
} }
int64_t syncLogBufferLoadOld(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncIndex toIndex) {
taosThreadMutexLock(&pBuf->mutex);
syncLogBufferValidate(pBuf);
SSyncLogStore* pLogStore = pNode->pLogStore;
ASSERT(pBuf->startIndex <= pBuf->matchIndex);
ASSERT(pBuf->matchIndex + 1 == pBuf->endIndex);
SyncIndex index = pBuf->endIndex;
SSyncRaftEntry* pMatch = pBuf->entries[(index - 1 + pBuf->size) % pBuf->size].pItem;
ASSERT(pMatch != NULL);
while (index - pBuf->startIndex < pBuf->size && index <= toIndex) {
SSyncRaftEntry* pEntry = NULL;
if (pLogStore->syncLogGetEntry(pLogStore, index, &pEntry) < 0) {
sError("vgId:%d, failed to get log entry since %s. index:%" PRId64 "", pNode->vgId, terrstr(), index);
ASSERT(0);
break;
}
ASSERT(pMatch->index + 1 == pEntry->index);
SSyncLogBufEntry tmp = {.pItem = pEntry, .prevLogIndex = pMatch->index, .prevLogTerm = pMatch->term};
pBuf->entries[pBuf->endIndex % pBuf->size] = tmp;
sInfo("vgId:%d, loaded log entry into log buffer. index: %" PRId64 ", term: %" PRId64, pNode->vgId, pEntry->index,
pEntry->term);
pBuf->matchIndex = index;
pBuf->endIndex = index + 1;
pMatch = pEntry;
index++;
}
syncLogBufferValidate(pBuf);
taosThreadMutexUnlock(&pBuf->mutex);
return index;
}
int32_t syncLogBufferInitOld(SSyncLogBuffer* pBuf, SSyncNode* pNode) {
taosThreadMutexLock(&pBuf->mutex);
ASSERT(pNode->pLogStore != NULL && "log store not created");
ASSERT(pNode->pFsm != NULL && "pFsm not registered");
ASSERT(pNode->pFsm->FpGetSnapshotInfo != NULL && "FpGetSnapshotInfo not registered");
SSnapshot snapshot;
if (pNode->pFsm->FpGetSnapshotInfo(pNode->pFsm, &snapshot) < 0) {
sError("vgId:%d, failed to get snapshot info since %s", pNode->vgId, terrstr());
goto _err;
}
SyncIndex commitIndex = snapshot.lastApplyIndex;
SyncTerm commitTerm = snapshot.lastApplyTerm;
// init log buffer indexes
pBuf->startIndex = commitIndex;
pBuf->matchIndex = commitIndex;
pBuf->commitIndex = commitIndex;
pBuf->endIndex = commitIndex + 1;
// put a dummy record at initial commitIndex
SSyncRaftEntry* pDummy = syncEntryBuildDummy(commitTerm, commitIndex, pNode->vgId);
if (pDummy == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
SSyncLogBufEntry tmp = {.pItem = pDummy, .prevLogIndex = commitIndex - 1, .prevLogTerm = commitTerm};
pBuf->entries[(commitIndex + pBuf->size) % pBuf->size] = tmp;
taosThreadMutexUnlock(&pBuf->mutex);
return 0;
_err:
taosThreadMutexUnlock(&pBuf->mutex);
return -1;
}
int32_t syncLogBufferRollbackMatchIndex(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncIndex toIndex) {
if (toIndex <= pBuf->commitIndex) {
sError("vgId:%d, cannot rollback across commit index:%" PRId64 ", to index:%" PRId64 "", pNode->vgId,
pBuf->commitIndex, toIndex);
return -1;
}
pBuf->matchIndex = TMIN(pBuf->matchIndex, toIndex - 1);
// update my match index
syncIndexMgrSetIndex(pNode->pMatchIndex, &pNode->myRaftId, pBuf->matchIndex);
return 0;
}
FORCE_INLINE SyncTerm syncLogBufferGetLastMatchTerm(SSyncLogBuffer* pBuf) { FORCE_INLINE SyncTerm syncLogBufferGetLastMatchTerm(SSyncLogBuffer* pBuf) {
SyncIndex index = pBuf->matchIndex; SyncIndex index = pBuf->matchIndex;
SSyncRaftEntry* pEntry = pBuf->entries[(index + pBuf->size) % pBuf->size].pItem; SSyncRaftEntry* pEntry = pBuf->entries[(index + pBuf->size) % pBuf->size].pItem;
...@@ -509,26 +422,25 @@ int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEnt ...@@ -509,26 +422,25 @@ int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEnt
SyncTerm lastMatchTerm = syncLogBufferGetLastMatchTerm(pBuf); SyncTerm lastMatchTerm = syncLogBufferGetLastMatchTerm(pBuf);
if (index <= pBuf->commitIndex) { if (index <= pBuf->commitIndex) {
sInfo("vgId:%d, raft entry already committed. index: %" PRId64 ", term: %" PRId64 ". log buffer: [%" PRId64 sDebug("vgId:%d, raft entry already committed. index: %" PRId64 ", term: %" PRId64 ". log buffer: [%" PRId64
" %" PRId64 " %" PRId64 ", %" PRId64 ")", " %" PRId64 " %" PRId64 ", %" PRId64 ")",
pNode->vgId, pEntry->index, pEntry->term, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pNode->vgId, pEntry->index, pEntry->term, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex,
pBuf->endIndex); pBuf->endIndex);
ret = 0; ret = 0;
goto _out; goto _out;
} }
if (index - pBuf->startIndex >= pBuf->size) { if (index - pBuf->startIndex >= pBuf->size) {
sInfo("vgId:%d, raft entry out of buffer capacity. index: %" PRId64 ", term: %" PRId64 ". log buffer: [%" PRId64 sDebug("vgId:%d, raft entry out of buffer capacity. index: %" PRId64 ", term: %" PRId64 ". log buffer: [%" PRId64
" %" PRId64 " %" PRId64 ", %" PRId64 ")", " %" PRId64 " %" PRId64 ", %" PRId64 ")",
pNode->vgId, pEntry->index, pEntry->term, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pNode->vgId, pEntry->index, pEntry->term, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex,
pBuf->endIndex); pBuf->endIndex);
goto _out; goto _out;
} }
if (index > pBuf->matchIndex && lastMatchTerm != prevTerm) { if (index > pBuf->matchIndex && lastMatchTerm != prevTerm) {
sInfo("vgId:%d, not ready to accept raft entry (i.e. across barrier). index: %" PRId64 ", term: %" PRId64 sInfo("vgId:%d, not ready to accept raft entry. index: %" PRId64 ", term: %" PRId64 ": prevterm: %" PRId64
": prevterm: %" PRId64 " /= lastmatch: %" PRId64 ". log buffer: [%" PRId64 " %" PRId64 " %" PRId64 " != lastmatch: %" PRId64 ". log buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")",
", %" PRId64 ")",
pNode->vgId, pEntry->index, pEntry->term, prevTerm, lastMatchTerm, pBuf->startIndex, pBuf->commitIndex, pNode->vgId, pEntry->index, pEntry->term, prevTerm, lastMatchTerm, pBuf->startIndex, pBuf->commitIndex,
pBuf->matchIndex, pBuf->endIndex); pBuf->matchIndex, pBuf->endIndex);
goto _out; goto _out;
...@@ -542,10 +454,10 @@ int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEnt ...@@ -542,10 +454,10 @@ int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEnt
if (pEntry->term != pExist->term) { if (pEntry->term != pExist->term) {
(void)syncLogBufferRollback(pBuf, index); (void)syncLogBufferRollback(pBuf, index);
} else { } else {
sInfo("vgId:%d, duplicate raft entry received. index: %" PRId64 ", term: %" PRId64 ". log buffer: [%" PRId64 sDebug("vgId:%d, duplicate raft entry received. index: %" PRId64 ", term: %" PRId64 ". log buffer: [%" PRId64
" %" PRId64 " %" PRId64 ", %" PRId64 ")", " %" PRId64 " %" PRId64 ", %" PRId64 ")",
pNode->vgId, pEntry->index, pEntry->term, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pNode->vgId, pEntry->index, pEntry->term, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex,
pBuf->endIndex); pBuf->endIndex);
SyncTerm existPrevTerm = pBuf->entries[index % pBuf->size].prevLogTerm; SyncTerm existPrevTerm = pBuf->entries[index % pBuf->size].prevLogTerm;
ASSERT(pEntry->term == pExist->term && prevTerm == existPrevTerm); ASSERT(pEntry->term == pExist->term && prevTerm == existPrevTerm);
ret = 0; ret = 0;
...@@ -647,8 +559,8 @@ int64_t syncLogBufferProceed(SSyncLogBuffer* pBuf, SSyncNode* pNode) { ...@@ -647,8 +559,8 @@ int64_t syncLogBufferProceed(SSyncLogBuffer* pBuf, SSyncNode* pNode) {
// increase match index // increase match index
pBuf->matchIndex = index; pBuf->matchIndex = index;
sInfo("vgId:%d, log buffer proceed. start index: %" PRId64 ", match index: %" PRId64 ", end index: %" PRId64, sDebug("vgId:%d, log buffer proceed. start index: %" PRId64 ", match index: %" PRId64 ", end index: %" PRId64,
pNode->vgId, pBuf->startIndex, pBuf->matchIndex, pBuf->endIndex); pNode->vgId, pBuf->startIndex, pBuf->matchIndex, pBuf->endIndex);
// replicate on demand // replicate on demand
(void)syncNodeReplicate(pNode); (void)syncNodeReplicate(pNode);
...@@ -759,8 +671,8 @@ int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t comm ...@@ -759,8 +671,8 @@ int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t comm
} }
pBuf->commitIndex = index; pBuf->commitIndex = index;
sInfo("vgId:%d, committed index: %" PRId64 ", term: %" PRId64 ", role: %d, current term: %" PRId64 "", pNode->vgId, sDebug("vgId:%d, committed index: %" PRId64 ", term: %" PRId64 ", role: %d, current term: %" PRId64 "", pNode->vgId,
pEntry->index, pEntry->term, role, term); pEntry->index, pEntry->term, role, term);
if (!inBuf) { if (!inBuf) {
syncEntryDestroy(pEntry); syncEntryDestroy(pEntry);
...@@ -784,8 +696,8 @@ _out: ...@@ -784,8 +696,8 @@ _out:
if (!pNode->restoreFinish && pBuf->commitIndex >= pNode->commitIndex) { if (!pNode->restoreFinish && pBuf->commitIndex >= pNode->commitIndex) {
pNode->pFsm->FpRestoreFinishCb(pNode->pFsm); pNode->pFsm->FpRestoreFinishCb(pNode->pFsm);
pNode->restoreFinish = true; pNode->restoreFinish = true;
sInfo("vgId:%d, restore finished. commit index:%" PRId64 ", match index:%" PRId64 ", last index:%" PRId64 "", sInfo("vgId:%d, restore finished. pBuf: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")", pNode->vgId,
pNode->vgId, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex - 1); pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex);
} }
if (!inBuf) { if (!inBuf) {
...@@ -799,6 +711,7 @@ _out: ...@@ -799,6 +711,7 @@ _out:
int32_t syncNodeOnAppendEntries(SSyncNode* ths, SyncAppendEntries* pMsg) { int32_t syncNodeOnAppendEntries(SSyncNode* ths, SyncAppendEntries* pMsg) {
SyncAppendEntriesReply* pReply = NULL; SyncAppendEntriesReply* pReply = NULL;
bool accepted = false;
// if already drop replica, do not process // if already drop replica, do not process
if (!syncNodeInRaftGroup(ths, &(pMsg->srcId))) { if (!syncNodeInRaftGroup(ths, &(pMsg->srcId))) {
syncLogRecvAppendEntries(ths, pMsg, "not in my config"); syncLogRecvAppendEntries(ths, pMsg, "not in my config");
...@@ -847,20 +760,21 @@ int32_t syncNodeOnAppendEntries(SSyncNode* ths, SyncAppendEntries* pMsg) { ...@@ -847,20 +760,21 @@ int32_t syncNodeOnAppendEntries(SSyncNode* ths, SyncAppendEntries* pMsg) {
goto _IGNORE; goto _IGNORE;
} }
sInfo("vgId:%d, recv append entries msg. index:%" PRId64 ", term:%" PRId64 ", preLogIndex:%" PRId64 sDebug("vgId:%d, recv append entries msg. index:%" PRId64 ", term:%" PRId64 ", preLogIndex:%" PRId64
", prevLogTerm:%" PRId64 " commitIndex:%" PRId64 "", ", prevLogTerm:%" PRId64 " commitIndex:%" PRId64 "",
pMsg->vgId, pMsg->prevLogIndex + 1, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->commitIndex); pMsg->vgId, pMsg->prevLogIndex + 1, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->commitIndex);
// accept // accept
if (syncLogBufferAccept(ths->pLogBuf, ths, pEntry, pMsg->prevLogTerm) < 0) { if (syncLogBufferAccept(ths->pLogBuf, ths, pEntry, pMsg->prevLogTerm) < 0) {
goto _SEND_RESPONSE; goto _SEND_RESPONSE;
} }
accepted = true;
_SEND_RESPONSE: _SEND_RESPONSE:
pReply->matchIndex = syncLogBufferProceed(ths->pLogBuf, ths); pReply->matchIndex = syncLogBufferProceed(ths->pLogBuf, ths);
bool matched = (pReply->matchIndex >= pReply->lastSendIndex); bool matched = (pReply->matchIndex >= pReply->lastSendIndex);
pReply->success = matched; if (accepted && matched) {
if (matched) { pReply->success = true;
// update commit index only after matching // update commit index only after matching
(void)syncNodeUpdateCommitIndex(ths, pMsg->commitIndex); (void)syncNodeUpdateCommitIndex(ths, pMsg->commitIndex);
} }
......
...@@ -96,8 +96,8 @@ int64_t syncNodeCheckCommitIndex(SSyncNode* ths, SyncIndex indexLikely) { ...@@ -96,8 +96,8 @@ int64_t syncNodeCheckCommitIndex(SSyncNode* ths, SyncIndex indexLikely) {
if (indexLikely > ths->commitIndex && syncNodeAgreedUpon(ths, indexLikely)) { if (indexLikely > ths->commitIndex && syncNodeAgreedUpon(ths, indexLikely)) {
SyncIndex commitIndex = indexLikely; SyncIndex commitIndex = indexLikely;
syncNodeUpdateCommitIndex(ths, commitIndex); syncNodeUpdateCommitIndex(ths, commitIndex);
sInfo("vgId:%d, agreed upon. role:%d, term:%" PRId64 ", index: %" PRId64 "", ths->vgId, ths->state, sDebug("vgId:%d, agreed upon. role:%d, term:%" PRId64 ", index: %" PRId64 "", ths->vgId, ths->state,
ths->pRaftStore->currentTerm, commitIndex); ths->pRaftStore->currentTerm, commitIndex);
} }
return ths->commitIndex; return ths->commitIndex;
} }
...@@ -140,7 +140,7 @@ int32_t syncLogBufferReplicateOneTo(SSyncLogReplMgr* pMgr, SSyncNode* pNode, Syn ...@@ -140,7 +140,7 @@ int32_t syncLogBufferReplicateOneTo(SSyncLogReplMgr* pMgr, SSyncNode* pNode, Syn
SyncTerm prevLogTerm = -1; SyncTerm prevLogTerm = -1;
SSyncLogBuffer* pBuf = pNode->pLogBuf; SSyncLogBuffer* pBuf = pNode->pLogBuf;
sInfo("vgId:%d, replicate one msg index: %" PRId64 " to dest: 0x%016" PRIx64, pNode->vgId, index, pDestId->addr); sDebug("vgId:%d, replicate one msg index: %" PRId64 " to dest: 0x%016" PRIx64, pNode->vgId, index, pDestId->addr);
pEntry = syncLogBufferGetOneEntry(pBuf, pNode, index, &inBuf); pEntry = syncLogBufferGetOneEntry(pBuf, pNode, index, &inBuf);
if (pEntry == NULL) { if (pEntry == NULL) {
...@@ -199,8 +199,8 @@ int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, SyncAppendEntriesReply* pMs ...@@ -199,8 +199,8 @@ int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, SyncAppendEntriesReply* pMs
ASSERT(pMsg->term == ths->pRaftStore->currentTerm); ASSERT(pMsg->term == ths->pRaftStore->currentTerm);
sInfo("vgId:%d received append entries reply. srcId:0x%016" PRIx64 ", term:%" PRId64 ", matchIndex:%" PRId64 "", sDebug("vgId:%d received append entries reply. srcId:0x%016" PRIx64 ", term:%" PRId64 ", matchIndex:%" PRId64 "",
pMsg->vgId, pMsg->srcId.addr, pMsg->term, pMsg->matchIndex); pMsg->vgId, pMsg->srcId.addr, pMsg->term, pMsg->matchIndex);
if (pMsg->success) { if (pMsg->success) {
SyncIndex oldMatchIndex = syncIndexMgrGetIndex(ths->pMatchIndex, &(pMsg->srcId)); SyncIndex oldMatchIndex = syncIndexMgrGetIndex(ths->pMatchIndex, &(pMsg->srcId));
...@@ -216,7 +216,7 @@ int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, SyncAppendEntriesReply* pMs ...@@ -216,7 +216,7 @@ int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, SyncAppendEntriesReply* pMs
// replicate log // replicate log
SSyncLogReplMgr* pMgr = syncNodeGetLogReplMgr(ths, &pMsg->srcId); SSyncLogReplMgr* pMgr = syncNodeGetLogReplMgr(ths, &pMsg->srcId);
ASSERT(pMgr != NULL); // ASSERT(pMgr != NULL);
if (pMgr != NULL) { if (pMgr != NULL) {
(void)syncLogReplMgrProcessReply(pMgr, ths, pMsg); (void)syncLogReplMgrProcessReply(pMgr, ths, pMsg);
} }
......
...@@ -51,93 +51,6 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) { ...@@ -51,93 +51,6 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) {
return; return;
} }
// update commit index
SyncIndex newCommitIndex = pSyncNode->commitIndex;
for (SyncIndex index = syncNodeGetLastIndex(pSyncNode); index > pSyncNode->commitIndex; --index) {
bool agree = syncAgree(pSyncNode, index);
if (agree) {
// term
SSyncRaftEntry* pEntry = NULL;
SLRUCache* pCache = pSyncNode->pLogStore->pCache;
LRUHandle* h = taosLRUCacheLookup(pCache, &index, sizeof(index));
if (h) {
pEntry = (SSyncRaftEntry*)taosLRUCacheValue(pCache, h);
} else {
int32_t code = pSyncNode->pLogStore->syncLogGetEntry(pSyncNode->pLogStore, index, &pEntry);
if (code != 0) {
char logBuf[128];
snprintf(logBuf, sizeof(logBuf), "advance commit index error, read wal index:%" PRId64, index);
syncNodeErrorLog(pSyncNode, logBuf);
return;
}
}
// cannot commit, even if quorum agree. need check term!
if (pEntry->term <= pSyncNode->pRaftStore->currentTerm) {
// update commit index
newCommitIndex = index;
if (h) {
taosLRUCacheRelease(pCache, h, false);
} else {
syncEntryDestroy(pEntry);
}
break;
} else {
do {
char logBuf[128];
snprintf(logBuf, sizeof(logBuf), "can not commit due to term not equal, index:%" PRId64 ", term:%" PRIu64,
pEntry->index, pEntry->term);
syncNodeEventLog(pSyncNode, logBuf);
} while (0);
}
if (h) {
taosLRUCacheRelease(pCache, h, false);
} else {
syncEntryDestroy(pEntry);
}
}
}
// advance commit index as large as possible
SyncIndex walCommitVer = logStoreWalCommitVer(pSyncNode->pLogStore);
if (walCommitVer > newCommitIndex) {
newCommitIndex = walCommitVer;
}
// maybe execute fsm
if (newCommitIndex > pSyncNode->commitIndex) {
SyncIndex beginIndex = pSyncNode->commitIndex + 1;
SyncIndex endIndex = newCommitIndex;
// update commit index
pSyncNode->commitIndex = newCommitIndex;
// call back Wal
pSyncNode->pLogStore->syncLogUpdateCommitIndex(pSyncNode->pLogStore, pSyncNode->commitIndex);
// execute fsm
if (pSyncNode->pFsm != NULL) {
int32_t code = syncNodeDoCommit(pSyncNode, beginIndex, endIndex, pSyncNode->state);
if (code != 0) {
char logBuf[128];
snprintf(logBuf, sizeof(logBuf), "advance commit index error, do commit begin:%" PRId64 ", end:%" PRId64,
beginIndex, endIndex);
syncNodeErrorLog(pSyncNode, logBuf);
return;
}
}
}
}
void syncMaybeAdvanceCommitIndexOld(SSyncNode* pSyncNode) {
if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) {
syncNodeErrorLog(pSyncNode, "not leader, can not advance commit index");
return;
}
// advance commit index to sanpshot first // advance commit index to sanpshot first
SSnapshot snapshot; SSnapshot snapshot;
pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot); pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
......
...@@ -111,4 +111,4 @@ int32_t syncNodeSendRequestVote(SSyncNode* pSyncNode, const SRaftId* destRaftId, ...@@ -111,4 +111,4 @@ int32_t syncNodeSendRequestVote(SSyncNode* pSyncNode, const SRaftId* destRaftId,
syncRequestVote2RpcMsg(pMsg, &rpcMsg); syncRequestVote2RpcMsg(pMsg, &rpcMsg);
syncNodeSendMsgById(destRaftId, pSyncNode, &rpcMsg); syncNodeSendMsgById(destRaftId, pSyncNode, &rpcMsg);
return ret; return ret;
} }
\ No newline at end of file
...@@ -1116,6 +1116,8 @@ int32_t syncLogResetLogReplMgr(SSyncLogReplMgr* pMgr) { ...@@ -1116,6 +1116,8 @@ int32_t syncLogResetLogReplMgr(SSyncLogReplMgr* pMgr) {
return 0; return 0;
} }
_Atomic int64_t tsRetryCnt = 0;
int32_t syncLogReplMgrRetryOnNeed(SSyncLogReplMgr* pMgr, SSyncNode* pNode) { int32_t syncLogReplMgrRetryOnNeed(SSyncLogReplMgr* pMgr, SSyncNode* pNode) {
if (pMgr->endIndex <= pMgr->startIndex) { if (pMgr->endIndex <= pMgr->startIndex) {
return 0; return 0;
...@@ -1147,6 +1149,7 @@ int32_t syncLogReplMgrRetryOnNeed(SSyncLogReplMgr* pMgr, SSyncNode* pNode) { ...@@ -1147,6 +1149,7 @@ int32_t syncLogReplMgrRetryOnNeed(SSyncLogReplMgr* pMgr, SSyncNode* pNode) {
pMgr->states[pos].timeMs = nowMs; pMgr->states[pos].timeMs = nowMs;
pMgr->states[pos].acked = false; pMgr->states[pos].acked = false;
retried = true; retried = true;
tsRetryCnt++;
} }
ret = 0; ret = 0;
...@@ -1185,10 +1188,10 @@ int32_t syncLogReplMgrProcessReplyInRecoveryMode(SSyncLogReplMgr* pMgr, SSyncNod ...@@ -1185,10 +1188,10 @@ int32_t syncLogReplMgrProcessReplyInRecoveryMode(SSyncLogReplMgr* pMgr, SSyncNod
(void)syncLogResetLogReplMgr(pMgr); (void)syncLogResetLogReplMgr(pMgr);
} }
// send match index
SyncIndex index = TMIN(pMsg->matchIndex, pNode->pLogBuf->matchIndex); SyncIndex index = TMIN(pMsg->matchIndex, pNode->pLogBuf->matchIndex);
bool barrier = false; bool barrier = false;
ASSERT(index >= 0); ASSERT(index >= 0);
// send match index
if (syncLogBufferReplicateOneTo(pMgr, pNode, index, &destId, &barrier) < 0) { if (syncLogBufferReplicateOneTo(pMgr, pNode, index, &destId, &barrier) < 0) {
sError("vgId:%d, failed to replicate log entry since %s. index: %" PRId64 ", dest: 0x%016" PRIx64 "", pNode->vgId, sError("vgId:%d, failed to replicate log entry since %s. index: %" PRId64 ", dest: 0x%016" PRIx64 "", pNode->vgId,
terrstr(), index, destId.addr); terrstr(), index, destId.addr);
...@@ -1206,6 +1209,17 @@ int32_t syncLogReplMgrProcessReplyInRecoveryMode(SSyncLogReplMgr* pMgr, SSyncNod ...@@ -1206,6 +1209,17 @@ int32_t syncLogReplMgrProcessReplyInRecoveryMode(SSyncLogReplMgr* pMgr, SSyncNod
return 0; return 0;
} }
int32_t syncLogReplMgrProcessHeartbeatReply(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncHeartbeatReply* pMsg) {
SSyncLogBuffer* pBuf = pNode->pLogBuf;
taosThreadMutexLock(&pBuf->mutex);
if (pMsg->startTime != pMgr->peerStartTime) {
syncLogResetLogReplMgr(pMgr);
pMgr->peerStartTime = pMsg->startTime;
}
taosThreadMutexUnlock(&pBuf->mutex);
return 0;
}
int32_t syncLogReplMgrProcessReply(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg) { int32_t syncLogReplMgrProcessReply(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg) {
SSyncLogBuffer* pBuf = pNode->pLogBuf; SSyncLogBuffer* pBuf = pNode->pLogBuf;
taosThreadMutexLock(&pBuf->mutex); taosThreadMutexLock(&pBuf->mutex);
...@@ -1245,17 +1259,19 @@ int32_t syncLogReplMgrReplicateProbeOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode ...@@ -1245,17 +1259,19 @@ int32_t syncLogReplMgrReplicateProbeOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode
} }
SSyncLogBuffer* pBuf = pNode->pLogBuf; SSyncLogBuffer* pBuf = pNode->pLogBuf;
sInfo("vgId:%d, attempted to probe the %d'th peer. pMgr(restored:%d): [%" PRId64 " %" PRId64 ", %" PRId64 sInfo("vgId:%d, attempted to probe the %d'th peer. pMgr(rs:%d): [%" PRId64 " %" PRId64 ", %" PRId64
"), pBuf: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")", "), pBuf: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")",
pNode->vgId, pMgr->peerId, pMgr->restored, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex, pBuf->startIndex, pNode->vgId, pMgr->peerId, pMgr->restored, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex, pBuf->startIndex,
pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex); pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex);
return 0; return 0;
} }
_Atomic int64_t tsSendCnt = 0;
int32_t syncLogReplMgrReplicateAttemptedOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode) { int32_t syncLogReplMgrReplicateAttemptedOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode) {
ASSERT(pMgr->restored); ASSERT(pMgr->restored);
SRaftId* pDestId = &pNode->replicasId[pMgr->peerId]; SRaftId* pDestId = &pNode->replicasId[pMgr->peerId];
int32_t batchSize = TMAX(1, pMgr->size / 10); int32_t batchSize = TMAX(1, pMgr->size / 20);
int32_t count = 0; int32_t count = 0;
for (SyncIndex index = pMgr->endIndex; index <= pNode->pLogBuf->matchIndex; index++) { for (SyncIndex index = pMgr->endIndex; index <= pNode->pLogBuf->matchIndex; index++) {
...@@ -1278,16 +1294,17 @@ int32_t syncLogReplMgrReplicateAttemptedOnce(SSyncLogReplMgr* pMgr, SSyncNode* p ...@@ -1278,16 +1294,17 @@ int32_t syncLogReplMgrReplicateAttemptedOnce(SSyncLogReplMgr* pMgr, SSyncNode* p
pMgr->states[pos].acked = false; pMgr->states[pos].acked = false;
pMgr->endIndex = index + 1; pMgr->endIndex = index + 1;
tsSendCnt++;
if (barrier) { if (barrier) {
break; break;
} }
} }
SSyncLogBuffer* pBuf = pNode->pLogBuf; SSyncLogBuffer* pBuf = pNode->pLogBuf;
sInfo("vgId:%d, attempted to replicate %d msgs to the %d'th peer. pMgr(restored:%d): [%" PRId64 " %" PRId64 sDebug("vgId:%d, attempted to replicate %d msgs to the %d'th peer. pMgr(rs:%d): [%" PRId64 " %" PRId64 ", %" PRId64
", %" PRId64 "), pBuf: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")", "), pBuf: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")",
pNode->vgId, count, pMgr->peerId, pMgr->restored, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex, pNode->vgId, count, pMgr->peerId, pMgr->restored, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex,
pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex); pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex);
syncLogReplMgrRetryOnNeed(pMgr, pNode); syncLogReplMgrRetryOnNeed(pMgr, pNode);
return 0; return 0;
} }
...@@ -1704,18 +1721,13 @@ int32_t syncNodeRestore(SSyncNode* pSyncNode) { ...@@ -1704,18 +1721,13 @@ int32_t syncNodeRestore(SSyncNode* pSyncNode) {
SyncIndex commitIndex = pSyncNode->pLogStore->syncLogCommitIndex(pSyncNode->pLogStore); SyncIndex commitIndex = pSyncNode->pLogStore->syncLogCommitIndex(pSyncNode->pLogStore);
SyncIndex endIndex = pSyncNode->pLogBuf->endIndex; SyncIndex endIndex = pSyncNode->pLogBuf->endIndex;
ASSERT(endIndex == lastVer + 1);
commitIndex = TMAX(pSyncNode->commitIndex, commitIndex); commitIndex = TMAX(pSyncNode->commitIndex, commitIndex);
if (syncLogBufferCommit(pSyncNode->pLogBuf, pSyncNode, commitIndex) < 0) { if (syncLogBufferCommit(pSyncNode->pLogBuf, pSyncNode, commitIndex) < 0) {
return -1; return -1;
} }
if (endIndex <= lastVer) {
sError("vgId:%d, failed to load log entries into log buffers. commit index:%" PRId64 ", lastVer: %" PRId64 "",
pSyncNode->vgId, commitIndex, lastVer);
return -1;
}
return 0; return 0;
} }
...@@ -2722,9 +2734,8 @@ int32_t syncLogBufferReset(SSyncLogBuffer* pBuf, SSyncNode* pNode) { ...@@ -2722,9 +2734,8 @@ int32_t syncLogBufferReset(SSyncLogBuffer* pBuf, SSyncNode* pNode) {
(void)syncLogBufferRollback(pBuf, pBuf->matchIndex + 1); (void)syncLogBufferRollback(pBuf, pBuf->matchIndex + 1);
sInfo("vgId:%d, reset log buffer. start index: %" PRId64 ", commit index: %" PRId64 ", match Index: %" PRId64 sInfo("vgId:%d, reset log buffer. pBuf: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")", pNode->vgId,
", end index: %" PRId64 "", pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex);
pNode->vgId, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex);
pBuf->endIndex = pBuf->matchIndex + 1; pBuf->endIndex = pBuf->matchIndex + 1;
...@@ -3380,10 +3391,10 @@ int32_t syncNodeAppend(SSyncNode* ths, SSyncRaftEntry* pEntry) { ...@@ -3380,10 +3391,10 @@ int32_t syncNodeAppend(SSyncNode* ths, SSyncRaftEntry* pEntry) {
// proceed match index, with replicating on needed // proceed match index, with replicating on needed
SyncIndex matchIndex = syncLogBufferProceed(ths->pLogBuf, ths); SyncIndex matchIndex = syncLogBufferProceed(ths->pLogBuf, ths);
sInfo("vgId:%d, append raft log index: %" PRId64 ", term: %" PRId64 " log buffer: [%" PRId64 " %" PRId64 " %" PRId64 sDebug("vgId:%d, append raft entry. index: %" PRId64 ", term: %" PRId64 " pBuf: [%" PRId64 " %" PRId64 " %" PRId64
", %" PRId64 ")", ", %" PRId64 ")",
ths->vgId, pEntry->index, pEntry->term, ths->pLogBuf->startIndex, ths->pLogBuf->commitIndex, ths->vgId, pEntry->index, pEntry->term, ths->pLogBuf->startIndex, ths->pLogBuf->commitIndex,
ths->pLogBuf->matchIndex, ths->pLogBuf->endIndex); ths->pLogBuf->matchIndex, ths->pLogBuf->endIndex);
// multi replica // multi replica
if (ths->replicaNum > 1) { if (ths->replicaNum > 1) {
...@@ -3521,6 +3532,15 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, SyncHeartbeat* pMsg) { ...@@ -3521,6 +3532,15 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, SyncHeartbeat* pMsg) {
} }
int32_t syncNodeOnHeartbeatReply(SSyncNode* ths, SyncHeartbeatReply* pMsg) { int32_t syncNodeOnHeartbeatReply(SSyncNode* ths, SyncHeartbeatReply* pMsg) {
SSyncLogReplMgr* pMgr = syncNodeGetLogReplMgr(ths, &pMsg->srcId);
if (pMgr == NULL) {
sError("vgId:%d, failed to get log repl mgr for the peer at addr 0x016%" PRIx64 "", ths->vgId, pMsg->srcId.addr);
return -1;
}
return syncLogReplMgrProcessHeartbeatReply(pMgr, ths, pMsg);
}
int32_t syncNodeOnHeartbeatReplyOld(SSyncNode* ths, SyncHeartbeatReply* pMsg) {
syncLogRecvHeartbeatReply(ths, pMsg, ""); syncLogRecvHeartbeatReply(ths, pMsg, "");
// update last reply time, make decision whether the other node is alive or not // update last reply time, make decision whether the other node is alive or not
......
...@@ -173,8 +173,8 @@ int32_t syncNodeReplicateOld(SSyncNode* pSyncNode) { ...@@ -173,8 +173,8 @@ int32_t syncNodeReplicateOld(SSyncNode* pSyncNode) {
} }
int32_t syncNodeSendAppendEntries(SSyncNode* pSyncNode, SRaftId* destRaftId, SyncAppendEntries* pMsg) { int32_t syncNodeSendAppendEntries(SSyncNode* pSyncNode, SRaftId* destRaftId, SyncAppendEntries* pMsg) {
sInfo("vgId:%d, send append entries msg index: %" PRId64 " to dest: 0x%016" PRId64, pSyncNode->vgId, sTrace("vgId:%d, send append entries msg index: %" PRId64 " to dest: 0x%016" PRId64, pSyncNode->vgId,
pMsg->prevLogIndex + 1, destRaftId->addr); pMsg->prevLogIndex + 1, destRaftId->addr);
int32_t ret = 0; int32_t ret = 0;
pMsg->destId = *destRaftId; pMsg->destId = *destRaftId;
SRpcMsg rpcMsg; SRpcMsg rpcMsg;
......
...@@ -135,4 +135,4 @@ int32_t syncNodeOnTimer(SSyncNode* ths, SyncTimeout* pMsg) { ...@@ -135,4 +135,4 @@ int32_t syncNodeOnTimer(SSyncNode* ths, SyncTimeout* pMsg) {
} }
return ret; return ret;
} }
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册