diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index 29d0d6ccc04c2a86dde861226d20c03570a4a070..19069dc1ce64dc9348e9e28e8d7b11a7f1b8f5ec 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -252,6 +252,7 @@ void syncNodeRelease(SSyncNode* pNode); // raft state change -------------- void syncNodeUpdateTerm(SSyncNode* pSyncNode, SyncTerm term); void syncNodeUpdateTermWithoutStepDown(SSyncNode* pSyncNode, SyncTerm term); +void syncNodeStepDown(SSyncNode* pSyncNode, SyncTerm newTerm); void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr); void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr); @@ -306,7 +307,6 @@ int32_t syncNodeDynamicQuorum(const SSyncNode* pSyncNode); bool syncNodeIsMnode(SSyncNode* pSyncNode); int32_t syncNodePeerStateInit(SSyncNode* pSyncNode); -void syncNodeStepDown(SSyncNode* pSyncNode, SyncTerm newTerm); // trace log void syncLogRecvTimer(SSyncNode* pSyncNode, const SyncTimeout* pMsg, const char* s); diff --git a/source/libs/sync/src/syncAppendEntries.c b/source/libs/sync/src/syncAppendEntries.c index 414a4dd915c29349f8e4129f6f9dd3af8fe04ce3..3853bdd8da0ff66623ea4dd418248c267ae9db14 100644 --- a/source/libs/sync/src/syncAppendEntries.c +++ b/source/libs/sync/src/syncAppendEntries.c @@ -308,24 +308,31 @@ int32_t syncNodeFollowerCommit(SSyncNode* ths, SyncIndex newCommitIndex) { } int32_t syncNodeOnAppendEntries(SSyncNode* ths, SyncAppendEntries* pMsg) { + // if already drop replica, do not process + if (!syncNodeInRaftGroup(ths, &(pMsg->srcId)) && !ths->pRaftCfg->isStandBy) { + syncLogRecvAppendEntries(ths, pMsg, "ignore, maybe replica already dropped"); + goto _IGNORE; + } + // prepare response msg SyncAppendEntriesReply* pReply = syncAppendEntriesReplyBuild(ths->vgId); pReply->srcId = ths->myRaftId; pReply->destId = pMsg->srcId; pReply->term = ths->pRaftStore->currentTerm; pReply->success = false; - pReply->matchIndex = ths->pLogStore->syncLogLastIndex(ths->pLogStore); + // pReply->matchIndex = ths->pLogStore->syncLogLastIndex(ths->pLogStore); + pReply->matchIndex = SYNC_INDEX_INVALID; pReply->lastSendIndex = pMsg->prevLogIndex + 1; pReply->privateTerm = ths->pNewNodeReceiver->privateTerm; pReply->startTime = ths->startTime; if (pMsg->term < ths->pRaftStore->currentTerm) { + syncLogRecvAppendEntries(ths, pMsg, "reject, small term"); goto _SEND_RESPONSE; } if (pMsg->term > ths->pRaftStore->currentTerm) { pReply->term = pMsg->term; - goto _SEND_RESPONSE; } syncNodeStepDown(ths, pMsg->term); @@ -335,6 +342,7 @@ int32_t syncNodeOnAppendEntries(SSyncNode* ths, SyncAppendEntries* pMsg) { SyncIndex lastIndex = ths->pLogStore->syncLogLastIndex(ths->pLogStore); if (pMsg->prevLogIndex > lastIndex) { + syncLogRecvAppendEntries(ths, pMsg, "reject, index not match"); goto _SEND_RESPONSE; } @@ -343,6 +351,7 @@ int32_t syncNodeOnAppendEntries(SSyncNode* ths, SyncAppendEntries* pMsg) { ASSERT(myPreLogTerm != SYNC_TERM_INVALID); if (myPreLogTerm != pMsg->prevLogTerm) { + syncLogRecvAppendEntries(ths, pMsg, "reject, pre-term not match"); goto _SEND_RESPONSE; } } @@ -357,6 +366,71 @@ int32_t syncNodeOnAppendEntries(SSyncNode* ths, SyncAppendEntries* pMsg) { SyncIndex appendIndex = pMsg->prevLogIndex + 1; SSyncRaftEntry* pLocalEntry = NULL; int32_t code = ths->pLogStore->syncLogGetEntry(ths->pLogStore, appendIndex, &pLocalEntry); + if (code == 0) { + if (pLocalEntry->term == pAppendEntry->term) { + // do nothing + + char logBuf[128]; + snprintf(logBuf, sizeof(logBuf), "log match, do nothing, index:%ld", appendIndex); + syncNodeEventLog(ths, logBuf); + + } else { + // truncate + code = ths->pLogStore->syncLogTruncate(ths->pLogStore, appendIndex); + if (code != 0) { + char logBuf[128]; + snprintf(logBuf, sizeof(logBuf), "ignore, truncate error, append-index:%ld", appendIndex); + syncLogRecvAppendEntries(ths, pMsg, logBuf); + + goto _IGNORE; + } + + // append + code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pAppendEntry); + if (code != 0) { + char logBuf[128]; + snprintf(logBuf, sizeof(logBuf), "ignore, append error, append-index:%ld", appendIndex); + syncLogRecvAppendEntries(ths, pMsg, logBuf); + + goto _IGNORE; + } + } + + } else { + if (terrno == TSDB_CODE_WAL_LOG_NOT_EXIST) { + // log not exist + + // truncate + code = ths->pLogStore->syncLogTruncate(ths->pLogStore, appendIndex); + if (code != 0) { + char logBuf[128]; + snprintf(logBuf, sizeof(logBuf), "ignore, log not exist, truncate error, append-index:%ld", appendIndex); + syncLogRecvAppendEntries(ths, pMsg, logBuf); + + goto _IGNORE; + } + + // append + code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pAppendEntry); + if (code != 0) { + char logBuf[128]; + snprintf(logBuf, sizeof(logBuf), "ignore, log not exist, append error, append-index:%ld", appendIndex); + syncLogRecvAppendEntries(ths, pMsg, logBuf); + + goto _IGNORE; + } + + } else { + // error + char logBuf[128]; + snprintf(logBuf, sizeof(logBuf), "ignore, get local entry error, append-index:%ld", appendIndex); + syncLogRecvAppendEntries(ths, pMsg, logBuf); + + goto _IGNORE; + } + } + +#if 0 if (code != 0 && terrno == TSDB_CODE_WAL_LOG_NOT_EXIST) { code = ths->pLogStore->syncLogTruncate(ths->pLogStore, appendIndex); ASSERT(code == 0); @@ -377,17 +451,26 @@ int32_t syncNodeOnAppendEntries(SSyncNode* ths, SyncAppendEntries* pMsg) { ASSERT(code == 0); } } +#endif + + // update match index + pReply->matchIndex = pAppendEntry->index; syncEntryDestory(pLocalEntry); syncEntryDestory(pAppendEntry); - } - // update match index - pReply->matchIndex = ths->pLogStore->syncLogLastIndex(ths->pLogStore); + } else { + // no append entries, do nothing + // maybe has extra entries, no harm + + // update match index + pReply->matchIndex = pMsg->prevLogIndex; + } // maybe update commit index, leader notice me syncNodeFollowerCommit(ths, pMsg->commitIndex); + syncLogRecvAppendEntries(ths, pMsg, "accept"); goto _SEND_RESPONSE; _IGNORE: diff --git a/source/libs/sync/src/syncAppendEntriesReply.c b/source/libs/sync/src/syncAppendEntriesReply.c index eb8777bf37369085ffa73a9908e6d905380dfd5d..d4463b2bc16ca1560c708ea1e2af10e79b30c440 100644 --- a/source/libs/sync/src/syncAppendEntriesReply.c +++ b/source/libs/sync/src/syncAppendEntriesReply.c @@ -133,5 +133,6 @@ int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, SyncAppendEntriesReply* pMs } } + syncLogRecvAppendEntriesReply(ths, pMsg, "process"); return 0; } \ No newline at end of file diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index c8609be23b4fceaf0270745518986aaeec055f57..d65d791147ca20bb586ce2553195b2941ed1f607 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -2154,6 +2154,30 @@ void syncNodeUpdateTermWithoutStepDown(SSyncNode* pSyncNode, SyncTerm term) { } } +void syncNodeStepDown(SSyncNode* pSyncNode, SyncTerm newTerm) { + ASSERT(pSyncNode->pRaftStore->currentTerm <= newTerm); + + do { + char logBuf[128]; + snprintf(logBuf, sizeof(logBuf), "step down, new-term:%lu, current-term:%lu", newTerm, + pSyncNode->pRaftStore->currentTerm); + syncNodeEventLog(pSyncNode, logBuf); + } while (0); + + if (pSyncNode->pRaftStore->currentTerm < newTerm) { + raftStoreSetTerm(pSyncNode->pRaftStore, newTerm); + char tmpBuf[64]; + snprintf(tmpBuf, sizeof(tmpBuf), "step down, update term to %" PRIu64, newTerm); + syncNodeBecomeFollower(pSyncNode, tmpBuf); + raftStoreClearVote(pSyncNode->pRaftStore); + + } else { + if (pSyncNode->state != TAOS_SYNC_STATE_FOLLOWER) { + syncNodeBecomeFollower(pSyncNode, "step down"); + } + } +} + void syncNodeLeaderChangeRsp(SSyncNode* pSyncNode) { syncRespCleanRsp(pSyncNode->pSyncRespMgr); } void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr) { @@ -2243,6 +2267,9 @@ void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) { pSyncNode->pMatchIndex->index[i] = SYNC_INDEX_INVALID; } + // init peer mgr + syncNodePeerStateInit(pSyncNode); + // update sender private term SSyncSnapshotSender* pMySender = syncNodeGetSnapshotSender(pSyncNode, &(pSyncNode->myRaftId)); if (pMySender != NULL) { @@ -2316,23 +2343,6 @@ int32_t syncNodePeerStateInit(SSyncNode* pSyncNode) { return 0; } -void syncNodeStepDown(SSyncNode* pSyncNode, SyncTerm newTerm) { - ASSERT(pSyncNode->pRaftStore->currentTerm <= newTerm); - - if (pSyncNode->pRaftStore->currentTerm < newTerm) { - raftStoreSetTerm(pSyncNode->pRaftStore, newTerm); - char tmpBuf[64]; - snprintf(tmpBuf, sizeof(tmpBuf), "step down, update term to %" PRIu64, newTerm); - syncNodeBecomeFollower(pSyncNode, tmpBuf); - raftStoreClearVote(pSyncNode->pRaftStore); - - } else { - if (pSyncNode->state != TAOS_SYNC_STATE_FOLLOWER) { - syncNodeBecomeFollower(pSyncNode, "step down"); - } - } -} - void syncNodeFollower2Candidate(SSyncNode* pSyncNode) { ASSERT(pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER); pSyncNode->state = TAOS_SYNC_STATE_CANDIDATE; @@ -2831,17 +2841,20 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, SyncHeartbeat* pMsg) { SRpcMsg rpcMsg; syncHeartbeatReply2RpcMsg(pMsgReply, &rpcMsg); +#if 0 if (pMsg->term >= ths->pRaftStore->currentTerm && ths->state != TAOS_SYNC_STATE_FOLLOWER) { - syncNodeBecomeFollower(ths, "become follower by hb"); + syncNodeStepDown(ths, pMsg->term); } +#endif - if (pMsg->term == ths->pRaftStore->currentTerm) { - // sInfo("vgId:%d, heartbeat reset timer", ths->vgId); + if (pMsg->term == ths->pRaftStore->currentTerm && ths->state != TAOS_SYNC_STATE_LEADER) { syncNodeResetElectTimer(ths); +#if 0 if (ths->state == TAOS_SYNC_STATE_FOLLOWER) { syncNodeFollowerCommit(ths, pMsg->commitIndex); } +#endif } /* diff --git a/source/libs/sync/src/syncRaftLog.c b/source/libs/sync/src/syncRaftLog.c index 3f180aef4f43031f6100c5d3c27c492cdccecadf..eac32f3ad36f1d59ed78954f60bbaaf31bcd89e5 100644 --- a/source/libs/sync/src/syncRaftLog.c +++ b/source/libs/sync/src/syncRaftLog.c @@ -263,11 +263,15 @@ static int32_t raftLogGetEntry(struct SSyncLogStore* pLogStore, SyncIndex index, do { char logBuf[128]; - snprintf(logBuf, sizeof(logBuf), "wal read error, index:%" PRId64 ", err:%d %X, msg:%s, syserr:%d, sysmsg:%s", - index, err, err, errStr, sysErr, sysErrStr); if (terrno == TSDB_CODE_WAL_LOG_NOT_EXIST) { - // syncNodeEventLog(pData->pSyncNode, logBuf); + snprintf(logBuf, sizeof(logBuf), + "wal read not exist, index:%" PRId64 ", err:%d %X, msg:%s, syserr:%d, sysmsg:%s", index, err, err, + errStr, sysErr, sysErrStr); + syncNodeEventLog(pData->pSyncNode, logBuf); + } else { + snprintf(logBuf, sizeof(logBuf), "wal read error, index:%" PRId64 ", err:%d %X, msg:%s, syserr:%d, sysmsg:%s", + index, err, err, errStr, sysErr, sysErrStr); syncNodeErrorLog(pData->pSyncNode, logBuf); } } while (0); diff --git a/source/libs/sync/src/syncReplication.c b/source/libs/sync/src/syncReplication.c index 8bf0798f9663d9de7815693bfc878b76f699472a..0ce1f5989a3945bcdab37407ad0ca31350281542 100644 --- a/source/libs/sync/src/syncReplication.c +++ b/source/libs/sync/src/syncReplication.c @@ -94,8 +94,18 @@ int32_t syncNodeReplicateOne(SSyncNode* pSyncNode, SRaftId* pDestId) { ASSERT(pMsg != NULL); } else { - syncNodeLog3("", pSyncNode); - ASSERT(0); + do { + char host[64]; + uint16_t port; + syncUtilU642Addr(pDestId->addr, host, sizeof(host), &port); + + char logBuf[128]; + snprintf(logBuf, sizeof(logBuf), "replicate to %s:%d error, next-index:%ld", host, port, nextIndex); + syncNodeErrorLog(pSyncNode, logBuf); + } while (0); + + syncAppendEntriesDestroy(pMsg); + return -1; } } diff --git a/source/libs/sync/src/syncRequestVote.c b/source/libs/sync/src/syncRequestVote.c index 3122747c9b14b7933a376944ca21236ab13c47b6..1ac4aa992077ea6cf0114b357261711a60043f2c 100644 --- a/source/libs/sync/src/syncRequestVote.c +++ b/source/libs/sync/src/syncRequestVote.c @@ -111,14 +111,8 @@ int32_t syncNodeOnRequestVote(SSyncNode* ths, SyncRequestVote* pMsg) { // maybe update term if (pMsg->term > ths->pRaftStore->currentTerm) { - syncNodeUpdateTerm(ths, pMsg->term); -#if 0 - if (logOK) { - syncNodeUpdateTerm(ths, pMsg->term); - } else { - syncNodeUpdateTermWithoutStepDown(ths, pMsg->term); - } -#endif + syncNodeStepDown(ths, pMsg->term); + // syncNodeUpdateTerm(ths, pMsg->term); } ASSERT(pMsg->term <= ths->pRaftStore->currentTerm); @@ -129,6 +123,9 @@ int32_t syncNodeOnRequestVote(SSyncNode* ths, SyncRequestVote* pMsg) { // vote again, no harm raftStoreVote(ths->pRaftStore, &(pMsg->srcId)); + // candidate ? + syncNodeStepDown(ths, ths->pRaftStore->currentTerm); + // forbid elect for this round syncNodeResetElectTimer(ths); } diff --git a/source/libs/sync/src/syncRequestVoteReply.c b/source/libs/sync/src/syncRequestVoteReply.c index 3166e17cde955dc4adee24d680e98066e196d428..0be5519b06307b8e36de50e68b4de24bc854f5b1 100644 --- a/source/libs/sync/src/syncRequestVoteReply.c +++ b/source/libs/sync/src/syncRequestVoteReply.c @@ -60,6 +60,7 @@ int32_t syncNodeOnRequestVoteReply(SSyncNode* ths, SyncRequestVoteReply* pMsg) { if (pMsg->term > ths->pRaftStore->currentTerm) { syncLogRecvRequestVoteReply(ths, pMsg, "error term"); + syncNodeStepDown(ths, pMsg->term); return -1; } diff --git a/source/libs/sync/src/syncSnapshot.c b/source/libs/sync/src/syncSnapshot.c index 6c4a03e10e81d0cffc9c7f38996945ca73558ed3..a7bafa9f28f2b6bdeeb568477cc524648d67cd05 100644 --- a/source/libs/sync/src/syncSnapshot.c +++ b/source/libs/sync/src/syncSnapshot.c @@ -408,6 +408,8 @@ char *snapshotSender2SimpleStr(SSyncSnapshotSender *pSender, char *event) { int32_t syncNodeStartSnapshot(SSyncNode *pSyncNode, SRaftId *pDestId) { // calculate index + syncNodeEventLog(pSyncNode, "start snapshot ..."); + SSyncSnapshotSender *pSender = syncNodeGetSnapshotSender(pSyncNode, pDestId); if (pSender == NULL) { // create sender