From bf634a840d05f65d7d2001601668939e876c4e55 Mon Sep 17 00:00:00 2001 From: Benguang Zhao Date: Tue, 15 Nov 2022 17:13:31 +0800 Subject: [PATCH] enh: check lastVer and commit version during syncNodeOpen --- source/libs/sync/inc/syncInt.h | 9 ++- source/libs/sync/src/syncAppendEntries.c | 13 +++- source/libs/sync/src/syncAppendEntriesReply.c | 11 ++-- source/libs/sync/src/syncMain.c | 62 +++++++++++-------- 4 files changed, 56 insertions(+), 39 deletions(-) diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index ae1c4d9e50..7f6061cf00 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -120,16 +120,15 @@ static FORCE_INLINE int32_t syncLogGetNextRetryBackoff(SSyncLogReplMgr* pMgr) { } static FORCE_INLINE int32_t syncLogReplMgrUpdateTerm(SSyncLogReplMgr* pMgr, SyncIndex index, SyncTerm term) { - if (index < pMgr->startIndex || index >= pMgr->endIndex) { - return -1; - } + if (pMgr->endIndex == 0) return -1; + ASSERT(pMgr->startIndex <= index && index < pMgr->endIndex); pMgr->states[(index + pMgr->size) % pMgr->size].term = term; return 0; } SyncTerm syncLogReplMgrGetPrevLogTerm(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index); -int32_t syncLogBufferReplicateOneTo(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index, SRaftId* pDestId, - bool* pBarrier); +int32_t syncLogBufferReplicateOneTo(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index, SyncTerm* pTerm, + SRaftId* pDestId, bool* pBarrier); int32_t syncLogReplMgrProcessReply(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg); int32_t syncLogBufferReplicateOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode); int32_t syncLogReplMgrReplicateAttemptedOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode); diff --git a/source/libs/sync/src/syncAppendEntries.c b/source/libs/sync/src/syncAppendEntries.c index 1111547a9a..f94e7c041a 100644 --- a/source/libs/sync/src/syncAppendEntries.c +++ b/source/libs/sync/src/syncAppendEntries.c @@ -328,9 +328,16 @@ int32_t syncLogBufferInit(SSyncLogBuffer* pBuf, SSyncNode* pNode) { SyncTerm commitTerm = snapshot.lastApplyTerm; SyncIndex lastVer = pNode->pLogStore->syncLogLastIndex(pNode->pLogStore); - SyncIndex toIndex = lastVer; - ASSERT(lastVer >= commitIndex); + if (lastVer < commitIndex) { + sError("vgId:%d, lastVer of WAL log less than tsdb commit version. lastVer: %" PRId64 + ", tsdb commit version: %" PRId64 "", + pNode->vgId, lastVer, commitIndex); + // TODO: terrno = TSDB_CODE_WAL_LOG_INCOMPLETE; + goto _err; + } + ASSERT(lastVer >= commitIndex); + SyncIndex toIndex = lastVer; // update match index pBuf->commitIndex = commitIndex; pBuf->matchIndex = toIndex; @@ -547,7 +554,7 @@ int64_t syncLogBufferProceed(SSyncLogBuffer* pBuf, SSyncNode* pNode) { ASSERT(prevLogIndex == pMatch->index); if (pMatch->term != prevLogTerm) { - sError( + sInfo( "vgId:%d, mismatching raft log entries encountered. " "{ index:%" PRId64 ", term:%" PRId64 " } " diff --git a/source/libs/sync/src/syncAppendEntriesReply.c b/source/libs/sync/src/syncAppendEntriesReply.c index ef20153fa7..eddd0d51c9 100644 --- a/source/libs/sync/src/syncAppendEntriesReply.c +++ b/source/libs/sync/src/syncAppendEntriesReply.c @@ -131,8 +131,8 @@ static FORCE_INLINE bool syncLogIsReplicationBarrier(SSyncRaftEntry* pEntry) { return pEntry->originalRpcType == TDMT_SYNC_NOOP; } -int32_t syncLogBufferReplicateOneTo(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index, SRaftId* pDestId, - bool* pBarrier) { +int32_t syncLogBufferReplicateOneTo(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index, SyncTerm* pTerm, + SRaftId* pDestId, bool* pBarrier) { SSyncRaftEntry* pEntry = NULL; SyncAppendEntries* pMsgOut = NULL; bool inBuf = false; @@ -140,8 +140,6 @@ int32_t syncLogBufferReplicateOneTo(SSyncLogReplMgr* pMgr, SSyncNode* pNode, Syn SyncTerm prevLogTerm = -1; SSyncLogBuffer* pBuf = pNode->pLogBuf; - sDebug("vgId:%d, replicate one msg index: %" PRId64 " to dest: 0x%016" PRIx64, pNode->vgId, index, pDestId->addr); - pEntry = syncLogBufferGetOneEntry(pBuf, pNode, index, &inBuf); if (pEntry == NULL) { sError("vgId:%d, failed to get raft entry for index: %" PRId64 "", pNode->vgId, index); @@ -154,7 +152,7 @@ int32_t syncLogBufferReplicateOneTo(SSyncLogReplMgr* pMgr, SSyncNode* pNode, Syn sError("vgId:%d, failed to get prev log term since %s. index: %" PRId64 "", pNode->vgId, terrstr(), index); goto _out; } - (void)syncLogReplMgrUpdateTerm(pMgr, pEntry->index, pEntry->term); + if (pTerm) *pTerm = pEntry->term; pMsgOut = syncLogToAppendEntries(pNode, pEntry, prevLogTerm); if (pMsgOut == NULL) { @@ -165,6 +163,9 @@ int32_t syncLogBufferReplicateOneTo(SSyncLogReplMgr* pMgr, SSyncNode* pNode, Syn (void)syncNodeSendAppendEntries(pNode, pDestId, pMsgOut); ret = 0; + sInfo("vgId:%d, replicate one msg index: %" PRId64 " term: %" PRId64 " prevterm: %" PRId64 " to dest: 0x%016" PRIx64, + pNode->vgId, pEntry->index, pEntry->term, prevLogTerm, pDestId->addr); + _out: syncAppendEntriesDestroy(pMsgOut); pMsgOut = NULL; diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index aaed6a10d0..ccaf5b92a3 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -1140,13 +1140,15 @@ int32_t syncLogReplMgrRetryOnNeed(SSyncLogReplMgr* pMgr, SSyncNode* pNode) { SRaftId* pDestId = &pNode->replicasId[pMgr->peerId]; bool barrier = false; - if (syncLogBufferReplicateOneTo(pMgr, pNode, index, pDestId, &barrier) < 0) { + SyncTerm term = -1; + if (syncLogBufferReplicateOneTo(pMgr, pNode, index, &term, pDestId, &barrier) < 0) { sError("vgId:%d, failed to replicate log entry since %s. index: %" PRId64 ", dest: 0x%016" PRIx64 "", pNode->vgId, terrstr(), index, pDestId->addr); goto _out; } ASSERT(barrier == pMgr->states[pos].barrier); pMgr->states[pos].timeMs = nowMs; + pMgr->states[pos].term = term; pMgr->states[pos].acked = false; retried = true; tsRetryCnt++; @@ -1162,6 +1164,7 @@ _out: int32_t syncLogReplMgrProcessReplyInRecoveryMode(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg) { + SSyncLogBuffer* pBuf = pNode->pLogBuf; SRaftId destId = pMsg->srcId; ASSERT(pMgr->restored == false); @@ -1170,6 +1173,10 @@ int32_t syncLogReplMgrProcessReplyInRecoveryMode(SSyncLogReplMgr* pMgr, SSyncNod ASSERT(pMgr->matchIndex == 0); if (pMsg->matchIndex < 0) { pMgr->restored = true; + sInfo("vgId:%d, sync log repl mgr of the %d'th peer restored. pMgr(rs:%d): [%" PRId64 " %" PRId64 ", %" PRId64 + "), pBuf: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")", + pNode->vgId, pMgr->peerId, pMgr->restored, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex, + pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex); return 0; } } else { @@ -1182,6 +1189,10 @@ int32_t syncLogReplMgrProcessReplyInRecoveryMode(SSyncLogReplMgr* pMgr, SSyncNod if (pMsg->matchIndex == pMsg->lastSendIndex) { pMgr->restored = true; + sInfo("vgId:%d, sync log repl mgr of the %d'th peer restored. pMgr(rs:%d): [%" PRId64 " %" PRId64 ", %" PRId64 + "), pBuf: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")", + pNode->vgId, pMgr->peerId, pMgr->restored, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex, + pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex); return 0; } @@ -1191,8 +1202,9 @@ int32_t syncLogReplMgrProcessReplyInRecoveryMode(SSyncLogReplMgr* pMgr, SSyncNod // send match index SyncIndex index = TMIN(pMsg->matchIndex, pNode->pLogBuf->matchIndex); bool barrier = false; + SyncTerm term = -1; ASSERT(index >= 0); - if (syncLogBufferReplicateOneTo(pMgr, pNode, index, &destId, &barrier) < 0) { + if (syncLogBufferReplicateOneTo(pMgr, pNode, index, &term, &destId, &barrier) < 0) { sError("vgId:%d, failed to replicate log entry since %s. index: %" PRId64 ", dest: 0x%016" PRIx64 "", pNode->vgId, terrstr(), index, destId.addr); return -1; @@ -1201,6 +1213,7 @@ int32_t syncLogReplMgrProcessReplyInRecoveryMode(SSyncLogReplMgr* pMgr, SSyncNod int64_t nowMs = taosGetMonoTimestampMs(); pMgr->states[index % pMgr->size].barrier = barrier; pMgr->states[index % pMgr->size].timeMs = nowMs; + pMgr->states[index % pMgr->size].term = term; pMgr->states[index % pMgr->size].acked = false; pMgr->matchIndex = index; @@ -1212,7 +1225,9 @@ int32_t syncLogReplMgrProcessReplyInRecoveryMode(SSyncLogReplMgr* pMgr, SSyncNod int32_t syncLogReplMgrProcessHeartbeatReply(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncHeartbeatReply* pMsg) { SSyncLogBuffer* pBuf = pNode->pLogBuf; taosThreadMutexLock(&pBuf->mutex); - if (pMsg->startTime != pMgr->peerStartTime) { + if (pMsg->startTime != 0 && pMsg->startTime != pMgr->peerStartTime) { + sInfo("vgId:%d, reset sync log repl mgr in heartbeat. start time:%" PRId64 ", old start time:%" PRId64 "", + pNode->vgId, pMsg->startTime, pMgr->peerStartTime); syncLogResetLogReplMgr(pMgr); pMgr->peerStartTime = pMsg->startTime; } @@ -1224,6 +1239,9 @@ int32_t syncLogReplMgrProcessReply(SSyncLogReplMgr* pMgr, SSyncNode* pNode, Sync SSyncLogBuffer* pBuf = pNode->pLogBuf; taosThreadMutexLock(&pBuf->mutex); if (pMsg->startTime != pMgr->peerStartTime) { + sInfo("vgId:%d, reset sync log repl mgr in append entries reply. start time:%" PRId64 ", old start time:%" PRId64 + "", + pNode->vgId, pMsg->startTime, pMgr->peerStartTime); syncLogResetLogReplMgr(pMgr); pMgr->peerStartTime = pMsg->startTime; } @@ -1252,17 +1270,19 @@ int32_t syncLogReplMgrReplicateProbeOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode SyncIndex index = pNode->pLogBuf->matchIndex; SRaftId* pDestId = &pNode->replicasId[pMgr->peerId]; bool barrier = false; - if (syncLogBufferReplicateOneTo(pMgr, pNode, index, pDestId, &barrier) < 0) { + SyncTerm term = -1; + if (syncLogBufferReplicateOneTo(pMgr, pNode, index, &term, pDestId, &barrier) < 0) { sError("vgId:%d, failed to replicate log entry since %s. index: %" PRId64 ", dest: 0x%016" PRIx64 "", pNode->vgId, terrstr(), index, pDestId->addr); return -1; } SSyncLogBuffer* pBuf = pNode->pLogBuf; - sInfo("vgId:%d, attempted to probe the %d'th peer. pMgr(rs:%d): [%" PRId64 " %" PRId64 ", %" PRId64 - "), pBuf: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")", - pNode->vgId, pMgr->peerId, pMgr->restored, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex, pBuf->startIndex, - pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex); + sInfo("vgId:%d, attempted to probe the %d'th peer with msg of index:%" PRId64 " term: %" PRId64 + ". pMgr(rs:%d): [%" PRId64 " %" PRId64 ", %" PRId64 "), pBuf: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 + ")", + pNode->vgId, pMgr->peerId, index, term, pMgr->restored, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex, + pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex); return 0; } @@ -1273,6 +1293,7 @@ int32_t syncLogReplMgrReplicateAttemptedOnce(SSyncLogReplMgr* pMgr, SSyncNode* p SRaftId* pDestId = &pNode->replicasId[pMgr->peerId]; int32_t batchSize = TMAX(1, pMgr->size / 20); int32_t count = 0; + int64_t nowMs = taosGetMonoTimestampMs(); for (SyncIndex index = pMgr->endIndex; index <= pNode->pLogBuf->matchIndex; index++) { if (batchSize < count++ || pMgr->startIndex + pMgr->size <= index) { @@ -1284,13 +1305,15 @@ int32_t syncLogReplMgrReplicateAttemptedOnce(SSyncLogReplMgr* pMgr, SSyncNode* p int64_t pos = index % pMgr->size; SRaftId* pDestId = &pNode->replicasId[pMgr->peerId]; bool barrier = false; - if (syncLogBufferReplicateOneTo(pMgr, pNode, index, pDestId, &barrier) < 0) { + SyncTerm term = -1; + if (syncLogBufferReplicateOneTo(pMgr, pNode, index, &term, pDestId, &barrier) < 0) { sError("vgId:%d, failed to replicate log entry since %s. index: %" PRId64 ", dest: 0x%016" PRIx64 "", pNode->vgId, terrstr(), index, pDestId->addr); return -1; } pMgr->states[pos].barrier = barrier; - pMgr->states[pos].timeMs = taosGetMonoTimestampMs(); + pMgr->states[pos].timeMs = nowMs; + pMgr->states[pos].term = term; pMgr->states[pos].acked = false; pMgr->endIndex = index + 1; @@ -1685,7 +1708,7 @@ SSyncNode* syncNodeOpen(SSyncInfo* pOldSyncInfo) { // init log buffer if (syncLogBufferInit(pSyncNode->pLogBuf, pSyncNode) < 0) { sError("vgId:%d, failed to init raft log buffer since %s", pSyncNode->vgId, terrstr()); - ASSERT(false); + goto _error; } syncNodeEventLog(pSyncNode, "sync open"); @@ -3497,6 +3520,7 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, SyncHeartbeat* pMsg) { pMsgReply->srcId = ths->myRaftId; pMsgReply->term = ths->pRaftStore->currentTerm; pMsgReply->privateTerm = 8864; // magic number + pMsgReply->startTime = ths->startTime; SRpcMsg rpcMsg; syncHeartbeatReply2RpcMsg(pMsgReply, &rpcMsg); @@ -3618,6 +3642,7 @@ SyncTerm syncLogReplMgrGetPrevLogTerm(SSyncLogReplMgr* pMgr, SSyncNode* pNode, S int64_t timeMs = pMgr->states[(prevIndex + pMgr->size) % pMgr->size].timeMs; ASSERT(timeMs != 0 && "no log entry found"); prevLogTerm = pMgr->states[(prevIndex + pMgr->size) % pMgr->size].term; + ASSERT(prevIndex == 0 || prevLogTerm != 0); return prevLogTerm; } @@ -3669,21 +3694,6 @@ void syncLogReplicateAppendEntries(SSyncNode* pNode, SyncAppendEntries* pMsg) { } } -int32_t syncLogBufferReplicate(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEntry* pEntry, SyncTerm prevLogTerm) { - SyncAppendEntries* pMsgOut = syncLogToAppendEntries(pNode, pEntry, prevLogTerm); - if (pMsgOut == NULL) { - sError("vgId:%d, failed to get append entries for index:%" PRId64 "", pNode->vgId, pEntry->index); - goto _err; - } - - // replicate pMsgOut - (void)syncLogReplicateAppendEntries(pNode, pMsgOut); - -_err: - syncAppendEntriesDestroy(pMsgOut); - return 0; -} - // TLA+ Spec // ClientRequest(i, v) == // /\ state[i] = Leader -- GitLab