提交 7808fdfc 编写于 作者: M Minghao Li

refactor(sync): add trace log

上级 c2b348be
...@@ -257,6 +257,22 @@ int32_t syncNodeLeaderTransfer(SSyncNode* pSyncNode); ...@@ -257,6 +257,22 @@ int32_t syncNodeLeaderTransfer(SSyncNode* pSyncNode);
int32_t syncNodeLeaderTransferTo(SSyncNode* pSyncNode, SNodeInfo newLeader); int32_t syncNodeLeaderTransferTo(SSyncNode* pSyncNode, SNodeInfo newLeader);
int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* pEntry); int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* pEntry);
// trace log
void syncLogSendRequestVote(SSyncNode* pSyncNode, const SyncRequestVote* pMsg, const char* s);
void syncLogRecvRequestVote(SSyncNode* pSyncNode, const SyncRequestVote* pMsg, const char* s);
void syncLogSendRequestVoteReply(SSyncNode* pSyncNode, const SyncRequestVoteReply* pMsg, const char* s);
void syncLogRecvRequestVoteReply(SSyncNode* pSyncNode, const SyncRequestVoteReply* pMsg, const char* s);
void syncLogSendAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMsg, const char* s);
void syncLogRecvAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMsg, const char* s);
void syncLogSendAppendEntriesBatch(SSyncNode* pSyncNode, const SyncAppendEntriesBatch* pMsg, const char* s);
void syncLogRecvAppendEntriesBatch(SSyncNode* pSyncNode, const SyncAppendEntriesBatch* pMsg, const char* s);
void syncLogSendAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntriesReply* pMsg, const char* s);
void syncLogRecvAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntriesReply* pMsg, const char* s);
// for debug -------------- // for debug --------------
void syncNodePrint(SSyncNode* pObj); void syncNodePrint(SSyncNode* pObj);
void syncNodePrint2(char* s, SSyncNode* pObj); void syncNodePrint2(char* s, SSyncNode* pObj);
......
...@@ -94,21 +94,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { ...@@ -94,21 +94,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
// if already drop replica, do not process // if already drop replica, do not process
if (!syncNodeInRaftGroup(ths, &(pMsg->srcId)) && !ths->pRaftCfg->isStandBy) { if (!syncNodeInRaftGroup(ths, &(pMsg->srcId)) && !ths->pRaftCfg->isStandBy) {
do { syncLogRecvAppendEntries(ths, pMsg, "maybe replica already dropped");
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
char logBuf[256];
snprintf(logBuf, sizeof(logBuf),
"recv sync-append-entries from %s:%d {term:" PRIu64 ", pre-index:" PRId64 ", pre-term:" PRIu64
", commit:" PRId64 ", pterm:" PRIu64
", "
"datalen:%d}, maybe replica already dropped",
host, port, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->commitIndex, pMsg->privateTerm,
pMsg->dataLen);
syncNodeErrorLog(ths, logBuf);
} while (0);
return -1; return -1;
} }
...@@ -125,30 +111,12 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { ...@@ -125,30 +111,12 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
} }
ASSERT(pMsg->dataLen >= 0); ASSERT(pMsg->dataLen >= 0);
do { // return to follower state
// return to follower state if (pMsg->term == ths->pRaftStore->currentTerm && ths->state == TAOS_SYNC_STATE_CANDIDATE) {
if (pMsg->term == ths->pRaftStore->currentTerm && ths->state == TAOS_SYNC_STATE_CANDIDATE) { syncLogRecvAppendEntries(ths, pMsg, "candidate to follower");
do { syncNodeBecomeFollower(ths, "from candidate by append entries");
char host[64]; return -1; // ret or reply?
uint16_t port; }
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
char logBuf[256];
snprintf(logBuf, sizeof(logBuf),
"recv sync-append-entries from %s:%d {term:" PRIu64 ", pre-index:" PRId64 ", pre-term:" PRIu64
", commit:" PRId64 ", pterm:" PRIu64
", "
"datalen:%d}, candidate to follower",
host, port, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->commitIndex, pMsg->privateTerm,
pMsg->dataLen);
syncNodeEventLog(ths, logBuf);
} while (0);
syncNodeBecomeFollower(ths, "from candidate by append entries");
// ret or reply?
return -1;
}
} while (0);
SyncTerm localPreLogTerm = 0; SyncTerm localPreLogTerm = 0;
if (pMsg->prevLogIndex >= SYNC_INDEX_BEGIN && pMsg->prevLogIndex <= ths->pLogStore->getLastIndex(ths->pLogStore)) { if (pMsg->prevLogIndex >= SYNC_INDEX_BEGIN && pMsg->prevLogIndex <= ths->pLogStore->getLastIndex(ths->pLogStore)) {
...@@ -172,20 +140,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { ...@@ -172,20 +140,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
// reject request // reject request
if ((pMsg->term < ths->pRaftStore->currentTerm) || if ((pMsg->term < ths->pRaftStore->currentTerm) ||
((pMsg->term == ths->pRaftStore->currentTerm) && (ths->state == TAOS_SYNC_STATE_FOLLOWER) && !logOK)) { ((pMsg->term == ths->pRaftStore->currentTerm) && (ths->state == TAOS_SYNC_STATE_FOLLOWER) && !logOK)) {
do { syncLogRecvAppendEntries(ths, pMsg, "reject");
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
char logBuf[256];
snprintf(logBuf, sizeof(logBuf),
"recv sync-append-entries from %s:%d {term:" PRIu64 ", pre-index:" PRId64 ", pre-term:" PRIu64
", commit:" PRId64 ", pterm:" PRIu64
", "
"datalen:%d}, reject",
host, port, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->commitIndex, pMsg->privateTerm,
pMsg->dataLen);
syncNodeEventLog(ths, logBuf);
} while (0);
SyncAppendEntriesReply* pReply = syncAppendEntriesReplyBuild(ths->vgId); SyncAppendEntriesReply* pReply = syncAppendEntriesReplyBuild(ths->vgId);
pReply->srcId = ths->myRaftId; pReply->srcId = ths->myRaftId;
...@@ -195,17 +150,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { ...@@ -195,17 +150,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
pReply->matchIndex = SYNC_INDEX_INVALID; pReply->matchIndex = SYNC_INDEX_INVALID;
// msg event log // msg event log
do { syncLogSendAppendEntriesReply(ths, pReply, "");
char host[64];
uint16_t port;
syncUtilU642Addr(pReply->destId.addr, host, sizeof(host), &port);
char logBuf[256];
snprintf(logBuf, sizeof(logBuf),
"send sync-append-entries-reply to %s:%d, {term:%" PRIu64 ", pterm:%" PRIu64
", success:%d, match-index:%" PRId64 "}",
host, port, pReply->term, pReply->privateTerm, pReply->success, pReply->matchIndex);
syncNodeEventLog(ths, logBuf);
} while (0);
SRpcMsg rpcMsg; SRpcMsg rpcMsg;
syncAppendEntriesReply2RpcMsg(pReply, &rpcMsg); syncAppendEntriesReply2RpcMsg(pReply, &rpcMsg);
...@@ -226,20 +171,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { ...@@ -226,20 +171,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
// has entries in SyncAppendEntries msg // has entries in SyncAppendEntries msg
bool hasAppendEntries = pMsg->dataLen > 0; bool hasAppendEntries = pMsg->dataLen > 0;
do { syncLogRecvAppendEntries(ths, pMsg, "accept");
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
char logBuf[256];
snprintf(logBuf, sizeof(logBuf),
"recv sync-append-entries from %s:%d {term:" PRIu64 ", pre-index:" PRId64 ", pre-term:" PRIu64
", commit:" PRId64 ", pterm:" PRIu64
", "
"datalen:%d}, accept",
host, port, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->commitIndex, pMsg->privateTerm,
pMsg->dataLen);
syncNodeEventLog(ths, logBuf);
} while (0);
if (hasExtraEntries && hasAppendEntries) { if (hasExtraEntries && hasAppendEntries) {
// not conflict by default // not conflict by default
...@@ -389,17 +321,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { ...@@ -389,17 +321,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
} }
// msg event log // msg event log
do { syncLogSendAppendEntriesReply(ths, pReply, "");
char host[64];
uint16_t port;
syncUtilU642Addr(pReply->destId.addr, host, sizeof(host), &port);
char logBuf[256];
snprintf(logBuf, sizeof(logBuf),
"send sync-append-entries-reply to %s:%d, {term:%" PRIu64 ", pterm:%" PRIu64
", success:%d, match-index:%" PRId64 "}",
host, port, pReply->term, pReply->privateTerm, pReply->success, pReply->matchIndex);
syncNodeEventLog(ths, logBuf);
} while (0);
SRpcMsg rpcMsg; SRpcMsg rpcMsg;
syncAppendEntriesReply2RpcMsg(pReply, &rpcMsg); syncAppendEntriesReply2RpcMsg(pReply, &rpcMsg);
...@@ -602,22 +524,8 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc ...@@ -602,22 +524,8 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc
// if already drop replica, do not process // if already drop replica, do not process
if (!syncNodeInRaftGroup(ths, &(pMsg->srcId)) && !ths->pRaftCfg->isStandBy) { if (!syncNodeInRaftGroup(ths, &(pMsg->srcId)) && !ths->pRaftCfg->isStandBy) {
do { syncLogRecvAppendEntriesBatch(ths, pMsg, "maybe replica already dropped");
char host[64]; return -1;
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
char logBuf[256];
snprintf(logBuf, sizeof(logBuf),
"recv sync-append-entries-batch from %s:%d {term:" PRIu64 ", pre-index:" PRId64 ", pre-term:" PRIu64
", commit:" PRId64 ", pterm:" PRIu64
", "
"count:%d}, maybe replica already dropped",
host, port, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->commitIndex, pMsg->privateTerm,
pMsg->dataCount);
syncNodeErrorLog(ths, logBuf);
} while (0);
return ret;
} }
// maybe update term // maybe update term
...@@ -640,28 +548,13 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc ...@@ -640,28 +548,13 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc
do { do {
bool condition = pMsg->term == ths->pRaftStore->currentTerm && ths->state == TAOS_SYNC_STATE_CANDIDATE; bool condition = pMsg->term == ths->pRaftStore->currentTerm && ths->state == TAOS_SYNC_STATE_CANDIDATE;
if (condition) { if (condition) {
do { syncLogRecvAppendEntriesBatch(ths, pMsg, "candidate to follower");
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
char logBuf[256];
snprintf(logBuf, sizeof(logBuf),
"recv sync-append-entries-batch from %s:%d {term:" PRIu64 ", pre-index:" PRId64 ", pre-term:" PRIu64
", commit:" PRId64 ", pterm:" PRIu64
", "
"count:%d}, candidate to follower",
host, port, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->commitIndex, pMsg->privateTerm,
pMsg->dataCount);
syncNodeEventLog(ths, logBuf);
} while (0);
syncNodeBecomeFollower(ths, "from candidate by append entries"); syncNodeBecomeFollower(ths, "from candidate by append entries");
// do not reply? return 0; // do not reply?
return ret;
} }
} while (0); } while (0);
// fake match2 // fake match
// //
// condition1: // condition1:
// preIndex <= my commit index // preIndex <= my commit index
...@@ -673,20 +566,7 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc ...@@ -673,20 +566,7 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc
bool condition = (pMsg->term == ths->pRaftStore->currentTerm) && (ths->state == TAOS_SYNC_STATE_FOLLOWER) && bool condition = (pMsg->term == ths->pRaftStore->currentTerm) && (ths->state == TAOS_SYNC_STATE_FOLLOWER) &&
(pMsg->prevLogIndex <= ths->commitIndex); (pMsg->prevLogIndex <= ths->commitIndex);
if (condition) { if (condition) {
do { syncLogRecvAppendEntriesBatch(ths, pMsg, "fake match");
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
char logBuf[256];
snprintf(logBuf, sizeof(logBuf),
"recv sync-append-entries-batch from %s:%d {term:" PRIu64 ", pre-index:" PRId64 ", pre-term:" PRIu64
", commit:" PRId64 ", pterm:" PRIu64
", "
"count:%d}, fake match2",
host, port, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->commitIndex, pMsg->privateTerm,
pMsg->dataCount);
syncNodeEventLog(ths, logBuf);
} while (0);
SyncIndex matchIndex = ths->commitIndex; SyncIndex matchIndex = ths->commitIndex;
bool hasAppendEntries = pMsg->dataLen > 0; bool hasAppendEntries = pMsg->dataLen > 0;
...@@ -739,17 +619,7 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc ...@@ -739,17 +619,7 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc
pReply->matchIndex = matchIndex; pReply->matchIndex = matchIndex;
// msg event log // msg event log
do { syncLogSendAppendEntriesReply(ths, pReply, "");
char host[64];
uint16_t port;
syncUtilU642Addr(pReply->destId.addr, host, sizeof(host), &port);
char logBuf[256];
snprintf(logBuf, sizeof(logBuf),
"send sync-append-entries-reply to %s:%d, {term:%" PRIu64 ", pterm:%" PRIu64
", success:%d, match-index:%" PRId64 "}",
host, port, pReply->term, pReply->privateTerm, pReply->success, pReply->matchIndex);
syncNodeEventLog(ths, logBuf);
} while (0);
// send response // send response
SRpcMsg rpcMsg; SRpcMsg rpcMsg;
...@@ -782,20 +652,7 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc ...@@ -782,20 +652,7 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc
bool condition = condition1 || condition2; bool condition = condition1 || condition2;
if (condition) { if (condition) {
do { syncLogRecvAppendEntriesBatch(ths, pMsg, "not match");
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
char logBuf[256];
snprintf(logBuf, sizeof(logBuf),
"recv sync-append-entries-batch from %s:%d {term:" PRIu64 ", pre-index:" PRId64 ", pre-term:" PRIu64
", commit:" PRId64 ", pterm:" PRIu64
", "
"count:%d}, not match",
host, port, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->commitIndex, pMsg->privateTerm,
pMsg->dataCount);
syncNodeEventLog(ths, logBuf);
} while (0);
// maybe update commit index by snapshot // maybe update commit index by snapshot
syncNodeMaybeUpdateCommitBySnapshot(ths); syncNodeMaybeUpdateCommitBySnapshot(ths);
...@@ -810,17 +667,7 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc ...@@ -810,17 +667,7 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc
pReply->matchIndex = ths->commitIndex; pReply->matchIndex = ths->commitIndex;
// msg event log // msg event log
do { syncLogSendAppendEntriesReply(ths, pReply, "");
char host[64];
uint16_t port;
syncUtilU642Addr(pReply->destId.addr, host, sizeof(host), &port);
char logBuf[256];
snprintf(logBuf, sizeof(logBuf),
"send sync-append-entries-reply to %s:%d, {term:%" PRIu64 ", pterm:%" PRIu64
", success:%d, match-index:%" PRId64 "}",
host, port, pReply->term, pReply->privateTerm, pReply->success, pReply->matchIndex);
syncNodeEventLog(ths, logBuf);
} while (0);
// send response // send response
SRpcMsg rpcMsg; SRpcMsg rpcMsg;
...@@ -851,20 +698,7 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc ...@@ -851,20 +698,7 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc
bool hasAppendEntries = pMsg->dataLen > 0; bool hasAppendEntries = pMsg->dataLen > 0;
SOffsetAndContLen* metaTableArr = syncAppendEntriesBatchMetaTableArray(pMsg); SOffsetAndContLen* metaTableArr = syncAppendEntriesBatchMetaTableArray(pMsg);
do { syncLogRecvAppendEntriesBatch(ths, pMsg, "really match");
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
char logBuf[256];
snprintf(logBuf, sizeof(logBuf),
"recv sync-append-entries-batch from %s:%d {term:" PRIu64 ", pre-index:" PRId64 ", pre-term:" PRIu64
", commit:" PRId64 ", pterm:" PRIu64
", "
"count:%d}, match",
host, port, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->commitIndex, pMsg->privateTerm,
pMsg->dataCount);
syncNodeEventLog(ths, logBuf);
} while (0);
if (hasExtraEntries) { if (hasExtraEntries) {
// make log same, rollback deleted entries // make log same, rollback deleted entries
...@@ -903,17 +737,7 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc ...@@ -903,17 +737,7 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc
pReply->matchIndex = hasAppendEntries ? pMsg->prevLogIndex + pMsg->dataCount : pMsg->prevLogIndex; pReply->matchIndex = hasAppendEntries ? pMsg->prevLogIndex + pMsg->dataCount : pMsg->prevLogIndex;
// msg event log // msg event log
do { syncLogSendAppendEntriesReply(ths, pReply, "");
char host[64];
uint16_t port;
syncUtilU642Addr(pReply->destId.addr, host, sizeof(host), &port);
char logBuf[256];
snprintf(logBuf, sizeof(logBuf),
"send sync-append-entries-reply to %s:%d, {term:%" PRIu64 ", pterm:%" PRIu64
", success:%d, match-index:%" PRId64 "}",
host, port, pReply->term, pReply->privateTerm, pReply->success, pReply->matchIndex);
syncNodeEventLog(ths, logBuf);
} while (0);
// send response // send response
SRpcMsg rpcMsg; SRpcMsg rpcMsg;
...@@ -966,22 +790,8 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs ...@@ -966,22 +790,8 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs
// if already drop replica, do not process // if already drop replica, do not process
if (!syncNodeInRaftGroup(ths, &(pMsg->srcId)) && !ths->pRaftCfg->isStandBy) { if (!syncNodeInRaftGroup(ths, &(pMsg->srcId)) && !ths->pRaftCfg->isStandBy) {
do { syncLogRecvAppendEntries(ths, pMsg, "maybe replica already dropped");
char host[64]; return -1;
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
char logBuf[256];
snprintf(logBuf, sizeof(logBuf),
"recv sync-append-entries from %s:%d {term:" PRIu64 ", pre-index:" PRId64 ", pre-term:" PRIu64
", commit:" PRId64 ", pterm:" PRIu64
", "
"datalen:%d}, maybe replica dropped",
host, port, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->commitIndex, pMsg->privateTerm,
pMsg->dataLen);
syncNodeErrorLog(ths, logBuf);
} while (0);
return ret;
} }
// maybe update term // maybe update term
...@@ -1004,24 +814,9 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs ...@@ -1004,24 +814,9 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs
do { do {
bool condition = pMsg->term == ths->pRaftStore->currentTerm && ths->state == TAOS_SYNC_STATE_CANDIDATE; bool condition = pMsg->term == ths->pRaftStore->currentTerm && ths->state == TAOS_SYNC_STATE_CANDIDATE;
if (condition) { if (condition) {
do { syncLogRecvAppendEntries(ths, pMsg, "candidate to follower");
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
char logBuf[256];
snprintf(logBuf, sizeof(logBuf),
"recv sync-append-entries from %s:%d {term:" PRIu64 ", pre-index:" PRId64 ", pre-term:" PRIu64
", commit:" PRId64 ", pterm:" PRIu64
", "
"datalen:%d}, candidate to follower",
host, port, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->commitIndex, pMsg->privateTerm,
pMsg->dataLen);
syncNodeEventLog(ths, logBuf);
} while (0);
syncNodeBecomeFollower(ths, "from candidate by append entries"); syncNodeBecomeFollower(ths, "from candidate by append entries");
// do not reply? return 0; // do not reply?
return ret;
} }
} while (0); } while (0);
...@@ -1084,7 +879,7 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs ...@@ -1084,7 +879,7 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs
} while (0); } while (0);
#endif #endif
// fake match2 // fake match
// //
// condition1: // condition1:
// preIndex <= my commit index // preIndex <= my commit index
...@@ -1097,20 +892,7 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs ...@@ -1097,20 +892,7 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs
bool condition = (pMsg->term == ths->pRaftStore->currentTerm) && (ths->state == TAOS_SYNC_STATE_FOLLOWER) && bool condition = (pMsg->term == ths->pRaftStore->currentTerm) && (ths->state == TAOS_SYNC_STATE_FOLLOWER) &&
(pMsg->prevLogIndex <= ths->commitIndex); (pMsg->prevLogIndex <= ths->commitIndex);
if (condition) { if (condition) {
do { syncLogRecvAppendEntries(ths, pMsg, "fake match");
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
char logBuf[256];
snprintf(logBuf, sizeof(logBuf),
"recv sync-append-entries from %s:%d {term:" PRIu64 ", pre-index:" PRId64 ", pre-term:" PRIu64
", commit:" PRId64 ", pterm:" PRIu64
", "
"datalen:%d}, fake match2",
host, port, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->commitIndex, pMsg->privateTerm,
pMsg->dataLen);
syncNodeEventLog(ths, logBuf);
} while (0);
SyncIndex matchIndex = ths->commitIndex; SyncIndex matchIndex = ths->commitIndex;
bool hasAppendEntries = pMsg->dataLen > 0; bool hasAppendEntries = pMsg->dataLen > 0;
...@@ -1156,17 +938,7 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs ...@@ -1156,17 +938,7 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs
pReply->matchIndex = matchIndex; pReply->matchIndex = matchIndex;
// msg event log // msg event log
do { syncLogSendAppendEntriesReply(ths, pReply, "");
char host[64];
uint16_t port;
syncUtilU642Addr(pReply->destId.addr, host, sizeof(host), &port);
char logBuf[256];
snprintf(logBuf, sizeof(logBuf),
"send sync-append-entries-reply to %s:%d, {term:%" PRIu64 ", pterm:%" PRIu64
", success:%d, match-index:%" PRId64 "}",
host, port, pReply->term, pReply->privateTerm, pReply->success, pReply->matchIndex);
syncNodeEventLog(ths, logBuf);
} while (0);
// send response // send response
SRpcMsg rpcMsg; SRpcMsg rpcMsg;
...@@ -1199,20 +971,7 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs ...@@ -1199,20 +971,7 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs
bool condition = condition1 || condition2; bool condition = condition1 || condition2;
if (condition) { if (condition) {
do { syncLogRecvAppendEntries(ths, pMsg, "not match");
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
char logBuf[256];
snprintf(logBuf, sizeof(logBuf),
"recv sync-append-entries from %s:%d {term:" PRIu64 ", pre-index:" PRId64 ", pre-term:" PRIu64
", commit:" PRId64 ", pterm:" PRIu64
", "
"datalen:%d}, not match",
host, port, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->commitIndex, pMsg->privateTerm,
pMsg->dataLen);
syncNodeEventLog(ths, logBuf);
} while (0);
// prepare response msg // prepare response msg
SyncAppendEntriesReply* pReply = syncAppendEntriesReplyBuild(ths->vgId); SyncAppendEntriesReply* pReply = syncAppendEntriesReplyBuild(ths->vgId);
...@@ -1224,17 +983,7 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs ...@@ -1224,17 +983,7 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs
pReply->matchIndex = SYNC_INDEX_INVALID; pReply->matchIndex = SYNC_INDEX_INVALID;
// msg event log // msg event log
do { syncLogSendAppendEntriesReply(ths, pReply, "");
char host[64];
uint16_t port;
syncUtilU642Addr(pReply->destId.addr, host, sizeof(host), &port);
char logBuf[256];
snprintf(logBuf, sizeof(logBuf),
"send sync-append-entries-reply to %s:%d, {term:%" PRIu64 ", pterm:%" PRIu64
", success:%d, match-index:%" PRId64 "}",
host, port, pReply->term, pReply->privateTerm, pReply->success, pReply->matchIndex);
syncNodeEventLog(ths, logBuf);
} while (0);
// send response // send response
SRpcMsg rpcMsg; SRpcMsg rpcMsg;
...@@ -1264,20 +1013,7 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs ...@@ -1264,20 +1013,7 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs
// has entries in SyncAppendEntries msg // has entries in SyncAppendEntries msg
bool hasAppendEntries = pMsg->dataLen > 0; bool hasAppendEntries = pMsg->dataLen > 0;
do { syncLogRecvAppendEntries(ths, pMsg, "really match");
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
char logBuf[256];
snprintf(logBuf, sizeof(logBuf),
"recv sync-append-entries from %s:%d {term:" PRIu64 ", pre-index:" PRId64 ", pre-term:" PRIu64
", commit:" PRId64 ", pterm:" PRIu64
", "
"datalen:%d}, match",
host, port, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->commitIndex, pMsg->privateTerm,
pMsg->dataLen);
syncNodeEventLog(ths, logBuf);
} while (0);
if (hasExtraEntries) { if (hasExtraEntries) {
// make log same, rollback deleted entries // make log same, rollback deleted entries
...@@ -1312,17 +1048,7 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs ...@@ -1312,17 +1048,7 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs
pReply->matchIndex = hasAppendEntries ? pMsg->prevLogIndex + 1 : pMsg->prevLogIndex; pReply->matchIndex = hasAppendEntries ? pMsg->prevLogIndex + 1 : pMsg->prevLogIndex;
// msg event log // msg event log
do { syncLogSendAppendEntriesReply(ths, pReply, "");
char host[64];
uint16_t port;
syncUtilU642Addr(pReply->destId.addr, host, sizeof(host), &port);
char logBuf[256];
snprintf(logBuf, sizeof(logBuf),
"send sync-append-entries-reply to %s:%d, {term:%" PRIu64 ", pterm:%" PRIu64
", success:%d, match-index:%" PRId64 "}",
host, port, pReply->term, pReply->privateTerm, pReply->success, pReply->matchIndex);
syncNodeEventLog(ths, logBuf);
} while (0);
// send response // send response
SRpcMsg rpcMsg; SRpcMsg rpcMsg;
......
...@@ -42,36 +42,13 @@ int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* p ...@@ -42,36 +42,13 @@ int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* p
// if already drop replica, do not process // if already drop replica, do not process
if (!syncNodeInRaftGroup(ths, &(pMsg->srcId)) && !ths->pRaftCfg->isStandBy) { if (!syncNodeInRaftGroup(ths, &(pMsg->srcId)) && !ths->pRaftCfg->isStandBy) {
do { syncLogRecvAppendEntriesReply(ths, pMsg, "maybe replica already dropped");
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
char logBuf[256];
snprintf(logBuf, sizeof(logBuf),
"recv sync-append-entries-reply from %s:%d {term:" PRIu64 ", pterm:" PRIu64 ", success:%d, match:" PRId64
"}, maybe replica "
"already dropped",
host, port, pMsg->term, pMsg->privateTerm, pMsg->success, pMsg->matchIndex);
syncNodeErrorLog(ths, logBuf);
} while (0);
return -1; return -1;
} }
// drop stale response // drop stale response
if (pMsg->term < ths->pRaftStore->currentTerm) { if (pMsg->term < ths->pRaftStore->currentTerm) {
do { syncLogRecvAppendEntriesReply(ths, pMsg, "drop stale response");
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
char logBuf[256];
snprintf(logBuf, sizeof(logBuf),
"recv sync-append-entries-reply from %s:%d {term:" PRIu64 ", pterm:" PRIu64 ", success:%d, match:" PRId64
"}, drop stale response",
host, port, pMsg->term, pMsg->privateTerm, pMsg->success, pMsg->matchIndex);
syncNodeEventLog(ths, logBuf);
} while (0);
return 0; return 0;
} }
...@@ -81,23 +58,15 @@ int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* p ...@@ -81,23 +58,15 @@ int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* p
// } // }
if (pMsg->term > ths->pRaftStore->currentTerm) { if (pMsg->term > ths->pRaftStore->currentTerm) {
do { syncLogRecvAppendEntriesReply(ths, pMsg, "error term");
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
char logBuf[256];
snprintf(logBuf, sizeof(logBuf),
"recv sync-append-entries-reply from %s:%d {term:" PRIu64 ", pterm:" PRIu64 ", success:%d, match:" PRId64
"}, error term",
host, port, pMsg->term, pMsg->privateTerm, pMsg->success, pMsg->matchIndex);
syncNodeErrorLog(ths, logBuf);
} while (0);
return -1; return -1;
} }
ASSERT(pMsg->term == ths->pRaftStore->currentTerm); ASSERT(pMsg->term == ths->pRaftStore->currentTerm);
SyncIndex beforeNextIndex = syncIndexMgrGetIndex(ths->pNextIndex, &(pMsg->srcId));
SyncIndex beforeMatchIndex = syncIndexMgrGetIndex(ths->pMatchIndex, &(pMsg->srcId));
if (pMsg->success) { if (pMsg->success) {
// nextIndex' = [nextIndex EXCEPT ![i][j] = m.mmatchIndex + 1] // nextIndex' = [nextIndex EXCEPT ![i][j] = m.mmatchIndex + 1]
syncIndexMgrSetIndex(ths->pNextIndex, &(pMsg->srcId), pMsg->matchIndex + 1); syncIndexMgrSetIndex(ths->pNextIndex, &(pMsg->srcId), pMsg->matchIndex + 1);
...@@ -120,20 +89,13 @@ int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* p ...@@ -120,20 +89,13 @@ int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* p
syncIndexMgrSetIndex(ths->pNextIndex, &(pMsg->srcId), nextIndex); syncIndexMgrSetIndex(ths->pNextIndex, &(pMsg->srcId), nextIndex);
} }
SyncIndex afterNextIndex = syncIndexMgrGetIndex(ths->pNextIndex, &(pMsg->srcId));
SyncIndex afterMatchIndex = syncIndexMgrGetIndex(ths->pMatchIndex, &(pMsg->srcId));
do { do {
char host[64]; char logBuf[256];
uint16_t port; snprintf(logBuf, sizeof(logBuf), "before next:%ld, match:%ld, after next:%ld, match:%ld", beforeNextIndex,
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); beforeMatchIndex, afterNextIndex, afterMatchIndex);
char logBuf[256]; syncLogRecvAppendEntriesReply(ths, pMsg, logBuf);
SyncIndex nextIndex = syncIndexMgrGetIndex(ths->pNextIndex, &(pMsg->srcId));
SyncIndex matchIndex = syncIndexMgrGetIndex(ths->pMatchIndex, &(pMsg->srcId));
snprintf(logBuf, sizeof(logBuf),
"recv sync-append-entries-reply from %s:%d {term:" PRIu64 ", pterm:" PRIu64 ", success:%d, match:" PRId64
"}, after next:" PRId64
", "
"match:" PRId64 "",
host, port, pMsg->term, pMsg->privateTerm, pMsg->success, pMsg->matchIndex, nextIndex, matchIndex);
syncNodeEventLog(ths, logBuf);
} while (0); } while (0);
return 0; return 0;
...@@ -179,58 +141,27 @@ int32_t syncNodeOnAppendEntriesReplySnapshot2Cb(SSyncNode* ths, SyncAppendEntrie ...@@ -179,58 +141,27 @@ int32_t syncNodeOnAppendEntriesReplySnapshot2Cb(SSyncNode* ths, SyncAppendEntrie
// if already drop replica, do not process // if already drop replica, do not process
if (!syncNodeInRaftGroup(ths, &(pMsg->srcId)) && !ths->pRaftCfg->isStandBy) { if (!syncNodeInRaftGroup(ths, &(pMsg->srcId)) && !ths->pRaftCfg->isStandBy) {
do { syncLogRecvAppendEntriesReply(ths, pMsg, "maybe replica already dropped");
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
char logBuf[256];
snprintf(logBuf, sizeof(logBuf),
"recv sync-append-entries-reply from %s:%d {term:" PRIu64 ", pterm:" PRIu64 ", success:%d, match:" PRId64
"}, maybe replica "
"already dropped",
host, port, pMsg->term, pMsg->privateTerm, pMsg->success, pMsg->matchIndex);
syncNodeErrorLog(ths, logBuf);
} while (0);
return -1; return -1;
} }
// drop stale response // drop stale response
if (pMsg->term < ths->pRaftStore->currentTerm) { if (pMsg->term < ths->pRaftStore->currentTerm) {
do { syncLogRecvAppendEntriesReply(ths, pMsg, "drop stale response");
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
char logBuf[256];
snprintf(logBuf, sizeof(logBuf),
"recv sync-append-entries-reply from %s:%d {term:" PRIu64 ", pterm:" PRIu64 ", success:%d, match:" PRId64
"}, drop stale response",
host, port, pMsg->term, pMsg->privateTerm, pMsg->success, pMsg->matchIndex);
syncNodeEventLog(ths, logBuf);
} while (0);
return 0; return 0;
} }
// error term // error term
if (pMsg->term > ths->pRaftStore->currentTerm) { if (pMsg->term > ths->pRaftStore->currentTerm) {
do { syncLogRecvAppendEntriesReply(ths, pMsg, "error term");
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
char logBuf[256];
snprintf(logBuf, sizeof(logBuf),
"recv sync-append-entries-reply from %s:%d {term:" PRIu64 ", pterm:" PRIu64 ", success:%d, match:" PRId64
"}, error term",
host, port, pMsg->term, pMsg->privateTerm, pMsg->success, pMsg->matchIndex);
syncNodeErrorLog(ths, logBuf);
} while (0);
return -1; return -1;
} }
ASSERT(pMsg->term == ths->pRaftStore->currentTerm); ASSERT(pMsg->term == ths->pRaftStore->currentTerm);
SyncIndex beforeNextIndex = syncIndexMgrGetIndex(ths->pNextIndex, &(pMsg->srcId));
SyncIndex beforeMatchIndex = syncIndexMgrGetIndex(ths->pMatchIndex, &(pMsg->srcId));
if (pMsg->success) { if (pMsg->success) {
SyncIndex newNextIndex = pMsg->matchIndex + 1; SyncIndex newNextIndex = pMsg->matchIndex + 1;
SyncIndex newMatchIndex = pMsg->matchIndex; SyncIndex newMatchIndex = pMsg->matchIndex;
...@@ -343,20 +274,13 @@ int32_t syncNodeOnAppendEntriesReplySnapshot2Cb(SSyncNode* ths, SyncAppendEntrie ...@@ -343,20 +274,13 @@ int32_t syncNodeOnAppendEntriesReplySnapshot2Cb(SSyncNode* ths, SyncAppendEntrie
} while (0); } while (0);
} }
SyncIndex afterNextIndex = syncIndexMgrGetIndex(ths->pNextIndex, &(pMsg->srcId));
SyncIndex afterMatchIndex = syncIndexMgrGetIndex(ths->pMatchIndex, &(pMsg->srcId));
do { do {
char host[64]; char logBuf[256];
uint16_t port; snprintf(logBuf, sizeof(logBuf), "before next:%ld, match:%ld, after next:%ld, match:%ld", beforeNextIndex,
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); beforeMatchIndex, afterNextIndex, afterMatchIndex);
char logBuf[256]; syncLogRecvAppendEntriesReply(ths, pMsg, logBuf);
SyncIndex nextIndex = syncIndexMgrGetIndex(ths->pNextIndex, &(pMsg->srcId));
SyncIndex matchIndex = syncIndexMgrGetIndex(ths->pMatchIndex, &(pMsg->srcId));
snprintf(logBuf, sizeof(logBuf),
"recv sync-append-entries-reply from %s:%d {term:" PRIu64 ", pterm:" PRIu64 ", success:%d, match:" PRId64
"}, after next:" PRId64
", "
"match:" PRId64 "",
host, port, pMsg->term, pMsg->privateTerm, pMsg->success, pMsg->matchIndex, nextIndex, matchIndex);
syncNodeEventLog(ths, logBuf);
} while (0); } while (0);
return 0; return 0;
...@@ -367,36 +291,13 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries ...@@ -367,36 +291,13 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries
// if already drop replica, do not process // if already drop replica, do not process
if (!syncNodeInRaftGroup(ths, &(pMsg->srcId)) && !ths->pRaftCfg->isStandBy) { if (!syncNodeInRaftGroup(ths, &(pMsg->srcId)) && !ths->pRaftCfg->isStandBy) {
do { syncLogRecvAppendEntriesReply(ths, pMsg, "maybe replica already dropped");
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
char logBuf[256];
snprintf(logBuf, sizeof(logBuf),
"recv sync-append-entries-reply from %s:%d {term:" PRIu64 ", pterm:" PRIu64 ", success:%d, match:" PRId64
"}, maybe replica "
"already dropped",
host, port, pMsg->term, pMsg->privateTerm, pMsg->success, pMsg->matchIndex);
syncNodeErrorLog(ths, logBuf);
} while (0);
return -1; return -1;
} }
// drop stale response // drop stale response
if (pMsg->term < ths->pRaftStore->currentTerm) { if (pMsg->term < ths->pRaftStore->currentTerm) {
do { syncLogRecvAppendEntriesReply(ths, pMsg, "drop stale response");
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
char logBuf[256];
snprintf(logBuf, sizeof(logBuf),
"recv sync-append-entries-reply from %s:%d {term:" PRIu64 ", pterm:" PRIu64 ", success:%d, match:" PRId64
"}, drop stale response",
host, port, pMsg->term, pMsg->privateTerm, pMsg->success, pMsg->matchIndex);
syncNodeEventLog(ths, logBuf);
} while (0);
return 0; return 0;
} }
...@@ -406,23 +307,15 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries ...@@ -406,23 +307,15 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries
// } // }
if (pMsg->term > ths->pRaftStore->currentTerm) { if (pMsg->term > ths->pRaftStore->currentTerm) {
do { syncLogRecvAppendEntriesReply(ths, pMsg, "error term");
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
char logBuf[256];
snprintf(logBuf, sizeof(logBuf),
"recv sync-append-entries-reply from %s:%d {term:" PRIu64 ", pterm:" PRIu64 ", success:%d, match:" PRId64
"}, error term",
host, port, pMsg->term, pMsg->privateTerm, pMsg->success, pMsg->matchIndex);
syncNodeErrorLog(ths, logBuf);
} while (0);
return -1; return -1;
} }
ASSERT(pMsg->term == ths->pRaftStore->currentTerm); ASSERT(pMsg->term == ths->pRaftStore->currentTerm);
SyncIndex beforeNextIndex = syncIndexMgrGetIndex(ths->pNextIndex, &(pMsg->srcId));
SyncIndex beforeMatchIndex = syncIndexMgrGetIndex(ths->pMatchIndex, &(pMsg->srcId));
if (pMsg->success) { if (pMsg->success) {
// nextIndex' = [nextIndex EXCEPT ![i][j] = m.mmatchIndex + 1] // nextIndex' = [nextIndex EXCEPT ![i][j] = m.mmatchIndex + 1]
syncIndexMgrSetIndex(ths->pNextIndex, &(pMsg->srcId), pMsg->matchIndex + 1); syncIndexMgrSetIndex(ths->pNextIndex, &(pMsg->srcId), pMsg->matchIndex + 1);
...@@ -490,20 +383,13 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries ...@@ -490,20 +383,13 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries
} }
} }
SyncIndex afterNextIndex = syncIndexMgrGetIndex(ths->pNextIndex, &(pMsg->srcId));
SyncIndex afterMatchIndex = syncIndexMgrGetIndex(ths->pMatchIndex, &(pMsg->srcId));
do { do {
char host[64]; char logBuf[256];
uint16_t port; snprintf(logBuf, sizeof(logBuf), "before next:%ld, match:%ld, after next:%ld, match:%ld", beforeNextIndex,
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); beforeMatchIndex, afterNextIndex, afterMatchIndex);
char logBuf[256]; syncLogRecvAppendEntriesReply(ths, pMsg, logBuf);
SyncIndex nextIndex = syncIndexMgrGetIndex(ths->pNextIndex, &(pMsg->srcId));
SyncIndex matchIndex = syncIndexMgrGetIndex(ths->pMatchIndex, &(pMsg->srcId));
snprintf(logBuf, sizeof(logBuf),
"recv sync-append-entries-reply from %s:%d {term:" PRIu64 ", pterm:" PRIu64 ", success:%d, match:" PRId64
"}, after next:" PRId64
", "
"match:" PRId64 "",
host, port, pMsg->term, pMsg->privateTerm, pMsg->success, pMsg->matchIndex, nextIndex, matchIndex);
syncNodeEventLog(ths, logBuf);
} while (0); } while (0);
return 0; return 0;
......
...@@ -120,18 +120,7 @@ int32_t syncNodeElect(SSyncNode* pSyncNode) { ...@@ -120,18 +120,7 @@ int32_t syncNodeElect(SSyncNode* pSyncNode) {
int32_t syncNodeRequestVote(SSyncNode* pSyncNode, const SRaftId* destRaftId, const SyncRequestVote* pMsg) { int32_t syncNodeRequestVote(SSyncNode* pSyncNode, const SRaftId* destRaftId, const SyncRequestVote* pMsg) {
int32_t ret = 0; int32_t ret = 0;
syncLogSendRequestVote(pSyncNode, pMsg, "");
do {
char host[64];
uint16_t port;
syncUtilU642Addr(destRaftId->addr, host, sizeof(host), &port);
char logBuf[256];
snprintf(logBuf, sizeof(logBuf),
"send sync-request-vote to %s:%d {term:%" PRIu64 ", lindex:%" PRId64 ", lterm:%" PRIu64 "", host, port,
pMsg->term, pMsg->lastLogIndex, pMsg->lastLogTerm);
syncNodeEventLog(pSyncNode, logBuf);
} while (0);
SRpcMsg rpcMsg; SRpcMsg rpcMsg;
syncRequestVote2RpcMsg(pMsg, &rpcMsg); syncRequestVote2RpcMsg(pMsg, &rpcMsg);
......
...@@ -2925,4 +2925,126 @@ bool syncNodeCanChange(SSyncNode* pSyncNode) { ...@@ -2925,4 +2925,126 @@ bool syncNodeCanChange(SSyncNode* pSyncNode) {
} }
return true; return true;
} }
\ No newline at end of file
void syncLogSendRequestVote(SSyncNode* pSyncNode, const SyncRequestVote* pMsg, const char* s) {
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port);
char logBuf[256];
snprintf(logBuf, sizeof(logBuf),
"send sync-request-vote to %s:%d {term:%" PRIu64 ", lindex:%" PRId64 ", lterm:%" PRIu64 "}, %s", host, port,
pMsg->term, pMsg->lastLogIndex, pMsg->lastLogTerm, s);
syncNodeEventLog(pSyncNode, logBuf);
}
void syncLogRecvRequestVote(SSyncNode* pSyncNode, const SyncRequestVote* pMsg, const char* s) {
char logBuf[256];
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
snprintf(logBuf, sizeof(logBuf),
"recv sync-request-vote from %s:%d, {term:%" PRIu64 ", lindex:%" PRId64 ", lterm:%" PRIu64 "}, %s", host,
port, pMsg->term, pMsg->lastLogIndex, pMsg->lastLogTerm, s);
syncNodeEventLog(pSyncNode, logBuf);
}
void syncLogSendRequestVoteReply(SSyncNode* pSyncNode, const SyncRequestVoteReply* pMsg, const char* s) {
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port);
char logBuf[256];
snprintf(logBuf, sizeof(logBuf), "send sync-request-vote-reply to %s:%d {term:%" PRIu64 ", grant:%d}, %s", host, port,
pMsg->term, pMsg->voteGranted, s);
syncNodeEventLog(pSyncNode, logBuf);
}
void syncLogRecvRequestVoteReply(SSyncNode* pSyncNode, const SyncRequestVoteReply* pMsg, const char* s) {
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
char logBuf[256];
snprintf(logBuf, sizeof(logBuf), "recv sync-request-vote-reply from %s:%d {term:%" PRIu64 ", grant:%d}, %s", host,
port, pMsg->term, pMsg->voteGranted, s);
syncNodeEventLog(pSyncNode, logBuf);
}
void syncLogSendAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMsg, const char* s) {
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port);
char logBuf[256];
snprintf(logBuf, sizeof(logBuf),
"send sync-append-entries to %s:%d, {term:%" PRIu64 ", pre-index:%" PRId64 ", pre-term:%" PRIu64
", pterm:%" PRIu64 ", commit:%" PRId64
", "
"datalen:%d}, %s",
host, port, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->privateTerm, pMsg->commitIndex,
pMsg->dataLen, s);
syncNodeEventLog(pSyncNode, logBuf);
}
void syncLogRecvAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMsg, const char* s) {
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
char logBuf[256];
snprintf(logBuf, sizeof(logBuf),
"recv sync-append-entries from %s:%d {term:" PRIu64 ", pre-index:" PRId64 ", pre-term:" PRIu64
", commit:" PRId64 ", pterm:" PRIu64
", "
"datalen:%d}, %s",
host, port, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->commitIndex, pMsg->privateTerm,
pMsg->dataLen, s);
syncNodeErrorLog(pSyncNode, logBuf);
}
void syncLogSendAppendEntriesBatch(SSyncNode* pSyncNode, const SyncAppendEntriesBatch* pMsg, const char* s) {
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port);
char logBuf[256];
snprintf(logBuf, sizeof(logBuf),
"send sync-append-entries-batch to %s:%d, {term:%" PRIu64 ", pre-index:%" PRId64 ", pre-term:%" PRIu64
", pterm:%" PRIu64 ", commit:%" PRId64 ", datalen:%d, count:%d}, %s",
pSyncNode->vgId, host, port, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->privateTerm,
pMsg->commitIndex, pMsg->dataLen, pMsg->dataCount, s);
syncNodeEventLog(pSyncNode, logBuf);
}
void syncLogRecvAppendEntriesBatch(SSyncNode* pSyncNode, const SyncAppendEntriesBatch* pMsg, const char* s) {
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
char logBuf[256];
snprintf(logBuf, sizeof(logBuf),
"recv sync-append-entries-batch from %s:%d, {term:%" PRIu64 ", pre-index:%" PRId64 ", pre-term:%" PRIu64
", pterm:%" PRIu64 ", commit:%" PRId64 ", datalen:%d, count:%d}, %s",
pSyncNode->vgId, host, port, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->privateTerm,
pMsg->commitIndex, pMsg->dataLen, pMsg->dataCount, s);
syncNodeErrorLog(pSyncNode, logBuf);
}
void syncLogSendAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntriesReply* pMsg, const char* s) {
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port);
char logBuf[256];
snprintf(logBuf, sizeof(logBuf),
"send sync-append-entries-reply to %s:%d, {term:%" PRIu64 ", pterm:%" PRIu64 ", success:%d, match:%" PRId64
"}, %s",
host, port, pMsg->term, pMsg->privateTerm, pMsg->success, pMsg->matchIndex, s);
syncNodeEventLog(pSyncNode, logBuf);
}
void syncLogRecvAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntriesReply* pMsg, const char* s) {
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
char logBuf[256];
snprintf(logBuf, sizeof(logBuf),
"recv sync-append-entries-reply from %s:%d {term:" PRIu64 ", pterm:" PRIu64 ", success:%d, match:" PRId64
"}, %s",
host, port, pMsg->term, pMsg->privateTerm, pMsg->success, pMsg->matchIndex, s);
syncNodeErrorLog(pSyncNode, logBuf);
}
...@@ -313,21 +313,7 @@ int32_t syncNodeReplicate(SSyncNode* pSyncNode) { ...@@ -313,21 +313,7 @@ int32_t syncNodeReplicate(SSyncNode* pSyncNode) {
int32_t syncNodeAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftId, const SyncAppendEntries* pMsg) { int32_t syncNodeAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftId, const SyncAppendEntries* pMsg) {
int32_t ret = 0; int32_t ret = 0;
syncLogSendAppendEntries(pSyncNode, pMsg, "");
do {
char host[64];
uint16_t port;
syncUtilU642Addr(destRaftId->addr, host, sizeof(host), &port);
char logBuf[256];
snprintf(logBuf, sizeof(logBuf),
"send sync-append-entries to %s:%d, {term:%" PRIu64 ", pre-index:%" PRId64 ", pre-term:%" PRIu64
", pterm:%" PRIu64 ", commit:%" PRId64
", "
"datalen:%d}",
host, port, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->privateTerm, pMsg->commitIndex,
pMsg->dataLen);
syncNodeEventLog(pSyncNode, logBuf);
} while (0);
SRpcMsg rpcMsg; SRpcMsg rpcMsg;
syncAppendEntries2RpcMsg(pMsg, &rpcMsg); syncAppendEntries2RpcMsg(pMsg, &rpcMsg);
...@@ -337,18 +323,7 @@ int32_t syncNodeAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftId, c ...@@ -337,18 +323,7 @@ int32_t syncNodeAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftId, c
int32_t syncNodeAppendEntriesBatch(SSyncNode* pSyncNode, const SRaftId* destRaftId, int32_t syncNodeAppendEntriesBatch(SSyncNode* pSyncNode, const SRaftId* destRaftId,
const SyncAppendEntriesBatch* pMsg) { const SyncAppendEntriesBatch* pMsg) {
do { syncLogSendAppendEntriesBatch(pSyncNode, pMsg, "");
char host[64];
uint16_t port;
syncUtilU642Addr(destRaftId->addr, host, sizeof(host), &port);
char logBuf[256];
snprintf(logBuf, sizeof(logBuf),
"send sync-append-entries-batch to %s:%d, {term:%" PRIu64 ", pre-index:%" PRId64 ", pre-term:%" PRIu64
", pterm:%" PRIu64 ", commit:%" PRId64 ", datalen:%d, datacount:%d}",
pSyncNode->vgId, host, port, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->privateTerm,
pMsg->commitIndex, pMsg->dataLen, pMsg->dataCount);
syncNodeEventLog(pSyncNode, logBuf);
} while (0);
SRpcMsg rpcMsg; SRpcMsg rpcMsg;
syncAppendEntriesBatch2RpcMsg(pMsg, &rpcMsg); syncAppendEntriesBatch2RpcMsg(pMsg, &rpcMsg);
......
...@@ -47,18 +47,7 @@ int32_t syncNodeOnRequestVoteCb(SSyncNode* ths, SyncRequestVote* pMsg) { ...@@ -47,18 +47,7 @@ int32_t syncNodeOnRequestVoteCb(SSyncNode* ths, SyncRequestVote* pMsg) {
// if already drop replica, do not process // if already drop replica, do not process
if (!syncNodeInRaftGroup(ths, &(pMsg->srcId)) && !ths->pRaftCfg->isStandBy) { if (!syncNodeInRaftGroup(ths, &(pMsg->srcId)) && !ths->pRaftCfg->isStandBy) {
do { syncLogRecvRequestVote(ths, pMsg, "maybe replica already dropped");
char logBuf[256];
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
snprintf(logBuf, sizeof(logBuf),
"recv sync-request-vote from %s:%d, {term:%" PRIu64 ", lindex:%" PRId64 ", lterm:%" PRIu64
"}, maybe replica already dropped",
host, port, pMsg->term, pMsg->lastLogIndex, pMsg->lastLogTerm);
syncNodeEventLog(ths, logBuf);
} while (0);
return -1; return -1;
} }
...@@ -91,15 +80,10 @@ int32_t syncNodeOnRequestVoteCb(SSyncNode* ths, SyncRequestVote* pMsg) { ...@@ -91,15 +80,10 @@ int32_t syncNodeOnRequestVoteCb(SSyncNode* ths, SyncRequestVote* pMsg) {
// trace log // trace log
do { do {
char logBuf[256]; char logBuf[32];
char host[64]; snprintf(logBuf, sizeof(logBuf), "grant:%d", pReply->voteGranted);
uint16_t port; syncLogRecvRequestVote(ths, pMsg, logBuf);
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); syncLogSendRequestVoteReply(ths, pReply, "");
snprintf(logBuf, sizeof(logBuf),
"recv sync-request-vote from %s:%d, {term:%" PRIu64 ", lindex:%" PRId64 ", lterm:%" PRIu64
"}, reply-grant:%d",
host, port, pMsg->term, pMsg->lastLogIndex, pMsg->lastLogTerm, pReply->voteGranted);
syncNodeEventLog(ths, logBuf);
} while (0); } while (0);
SRpcMsg rpcMsg; SRpcMsg rpcMsg;
...@@ -212,18 +196,7 @@ int32_t syncNodeOnRequestVoteSnapshotCb(SSyncNode* ths, SyncRequestVote* pMsg) { ...@@ -212,18 +196,7 @@ int32_t syncNodeOnRequestVoteSnapshotCb(SSyncNode* ths, SyncRequestVote* pMsg) {
// if already drop replica, do not process // if already drop replica, do not process
if (!syncNodeInRaftGroup(ths, &(pMsg->srcId)) && !ths->pRaftCfg->isStandBy) { if (!syncNodeInRaftGroup(ths, &(pMsg->srcId)) && !ths->pRaftCfg->isStandBy) {
do { syncLogRecvRequestVote(ths, pMsg, "maybe replica already dropped");
char logBuf[256];
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
snprintf(logBuf, sizeof(logBuf),
"recv sync-request-vote from %s:%d, {term:%" PRIu64 ", lindex:%" PRId64 ", lterm:%" PRIu64
"}, maybe replica already dropped",
host, port, pMsg->term, pMsg->lastLogIndex, pMsg->lastLogTerm);
syncNodeEventLog(ths, logBuf);
} while (0);
return -1; return -1;
} }
...@@ -254,15 +227,10 @@ int32_t syncNodeOnRequestVoteSnapshotCb(SSyncNode* ths, SyncRequestVote* pMsg) { ...@@ -254,15 +227,10 @@ int32_t syncNodeOnRequestVoteSnapshotCb(SSyncNode* ths, SyncRequestVote* pMsg) {
// trace log // trace log
do { do {
char logBuf[256]; char logBuf[32];
char host[64]; snprintf(logBuf, sizeof(logBuf), "grant:%d", pReply->voteGranted);
uint16_t port; syncLogRecvRequestVote(ths, pMsg, logBuf);
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); syncLogSendRequestVoteReply(ths, pReply, "");
snprintf(logBuf, sizeof(logBuf),
"recv sync-request-vote from %s:%d, {term:%" PRIu64 ", lindex:%" PRId64 ", lterm:%" PRIu64
"}, reply-grant:%d",
host, port, pMsg->term, pMsg->lastLogIndex, pMsg->lastLogTerm, pReply->voteGranted);
syncNodeEventLog(ths, logBuf);
} while (0); } while (0);
SRpcMsg rpcMsg; SRpcMsg rpcMsg;
......
...@@ -40,40 +40,15 @@ ...@@ -40,40 +40,15 @@
int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg) { int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg) {
int32_t ret = 0; int32_t ret = 0;
// trace log
do {
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
char logBuf[256];
snprintf(logBuf, sizeof(logBuf), "recv sync-request-vote-reply from %s:%d {term:%" PRIu64 ", grant:%d} ", host,
port, pMsg->term, pMsg->voteGranted);
syncNodeEventLog(ths, logBuf);
} while (0);
// if already drop replica, do not process // if already drop replica, do not process
if (!syncNodeInRaftGroup(ths, &(pMsg->srcId)) && !ths->pRaftCfg->isStandBy) { if (!syncNodeInRaftGroup(ths, &(pMsg->srcId)) && !ths->pRaftCfg->isStandBy) {
char host[64]; syncLogRecvRequestVoteReply(ths, pMsg, "maybe replica already dropped");
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
char logBuf[256];
snprintf(logBuf, sizeof(logBuf),
"recv sync-request-vote-reply from %s:%d {term:%" PRIu64 ", grant:%d}, maybe replica dropped", host, port,
pMsg->term, pMsg->voteGranted);
syncNodeErrorLog(ths, logBuf);
return -1; return -1;
} }
// drop stale response // drop stale response
if (pMsg->term < ths->pRaftStore->currentTerm) { if (pMsg->term < ths->pRaftStore->currentTerm) {
char host[64]; syncLogRecvRequestVoteReply(ths, pMsg, "drop stale response");
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
char logBuf[256];
snprintf(logBuf, sizeof(logBuf),
"recv sync-request-vote-reply from %s:%d {term:%" PRIu64 ", grant:%d}, drop stale response", host, port,
pMsg->term, pMsg->voteGranted);
syncNodeErrorLog(ths, logBuf);
return -1; return -1;
} }
...@@ -84,16 +59,11 @@ int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg) ...@@ -84,16 +59,11 @@ int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg)
// } // }
if (pMsg->term > ths->pRaftStore->currentTerm) { if (pMsg->term > ths->pRaftStore->currentTerm) {
char host[64]; syncLogRecvRequestVoteReply(ths, pMsg, "error term");
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
char logBuf[256];
snprintf(logBuf, sizeof(logBuf), "recv sync-request-vote-reply from %s:%d {term:%" PRIu64 ", grant:%d}, error term",
host, port, pMsg->term, pMsg->voteGranted);
syncNodeErrorLog(ths, logBuf);
return -1; return -1;
} }
syncLogRecvRequestVoteReply(ths, pMsg, "");
ASSERT(pMsg->term == ths->pRaftStore->currentTerm); ASSERT(pMsg->term == ths->pRaftStore->currentTerm);
// This tallies votes even when the current state is not Candidate, // This tallies votes even when the current state is not Candidate,
...@@ -185,40 +155,15 @@ int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg) ...@@ -185,40 +155,15 @@ int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg)
int32_t syncNodeOnRequestVoteReplySnapshotCb(SSyncNode* ths, SyncRequestVoteReply* pMsg) { int32_t syncNodeOnRequestVoteReplySnapshotCb(SSyncNode* ths, SyncRequestVoteReply* pMsg) {
int32_t ret = 0; int32_t ret = 0;
// trace log
do {
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
char logBuf[256];
snprintf(logBuf, sizeof(logBuf), "recv sync-request-vote-reply from %s:%d {term:%" PRIu64 ", grant:%d} ", host,
port, pMsg->term, pMsg->voteGranted);
syncNodeEventLog(ths, logBuf);
} while (0);
// if already drop replica, do not process // if already drop replica, do not process
if (!syncNodeInRaftGroup(ths, &(pMsg->srcId)) && !ths->pRaftCfg->isStandBy) { if (!syncNodeInRaftGroup(ths, &(pMsg->srcId)) && !ths->pRaftCfg->isStandBy) {
char host[64]; syncLogRecvRequestVoteReply(ths, pMsg, "maybe replica already dropped");
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
char logBuf[256];
snprintf(logBuf, sizeof(logBuf),
"recv sync-request-vote-reply from %s:%d {term:%" PRIu64 ", grant:%d}, maybe replica dropped", host, port,
pMsg->term, pMsg->voteGranted);
syncNodeErrorLog(ths, logBuf);
return -1; return -1;
} }
// drop stale response // drop stale response
if (pMsg->term < ths->pRaftStore->currentTerm) { if (pMsg->term < ths->pRaftStore->currentTerm) {
char host[64]; syncLogRecvRequestVoteReply(ths, pMsg, "drop stale response");
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
char logBuf[256];
snprintf(logBuf, sizeof(logBuf),
"recv sync-request-vote-reply from %s:%d {term:%" PRIu64 ", grant:%d}, drop stale response", host, port,
pMsg->term, pMsg->voteGranted);
syncNodeErrorLog(ths, logBuf);
return -1; return -1;
} }
...@@ -229,16 +174,11 @@ int32_t syncNodeOnRequestVoteReplySnapshotCb(SSyncNode* ths, SyncRequestVoteRepl ...@@ -229,16 +174,11 @@ int32_t syncNodeOnRequestVoteReplySnapshotCb(SSyncNode* ths, SyncRequestVoteRepl
// } // }
if (pMsg->term > ths->pRaftStore->currentTerm) { if (pMsg->term > ths->pRaftStore->currentTerm) {
char host[64]; syncLogRecvRequestVoteReply(ths, pMsg, "error term");
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
char logBuf[256];
snprintf(logBuf, sizeof(logBuf), "recv sync-request-vote-reply from %s:%d {term:%" PRIu64 ", grant:%d}, error term",
host, port, pMsg->term, pMsg->voteGranted);
syncNodeErrorLog(ths, logBuf);
return -1; return -1;
} }
syncLogRecvRequestVoteReply(ths, pMsg, "");
ASSERT(pMsg->term == ths->pRaftStore->currentTerm); ASSERT(pMsg->term == ths->pRaftStore->currentTerm);
// This tallies votes even when the current state is not Candidate, // This tallies votes even when the current state is not Candidate,
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册