From 4940080a1c7f698723ece7ada3973ed96da904ba Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Sun, 13 Nov 2022 18:00:11 +0800 Subject: [PATCH] refact: adjust sync log prints --- source/libs/sync/inc/syncInt.h | 38 +--- source/libs/sync/inc/syncUtil.h | 33 +++ source/libs/sync/src/syncAppendEntries.c | 12 -- source/libs/sync/src/syncAppendEntriesReply.c | 3 +- source/libs/sync/src/syncElection.c | 8 - source/libs/sync/src/syncMain.c | 146 ------------- source/libs/sync/src/syncRaftLog.c | 1 + source/libs/sync/src/syncReplication.c | 11 - source/libs/sync/src/syncRequestVote.c | 17 -- source/libs/sync/src/syncRequestVoteReply.c | 8 - source/libs/sync/src/syncRespMgr.c | 1 + source/libs/sync/src/syncTimeout.c | 3 +- source/libs/sync/src/syncUtil.c | 203 ++++++++++++++++++ 13 files changed, 243 insertions(+), 241 deletions(-) diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index 940cc4f391..304d1bb695 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -258,12 +258,10 @@ int32_t syncNodeOnLocalCmd(SSyncNode* ths, const SRpcMsg* pMsg); // timer control -------------- int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode); int32_t syncNodeStopPingTimer(SSyncNode* pSyncNode); - int32_t syncNodeStartElectTimer(SSyncNode* pSyncNode, int32_t ms); int32_t syncNodeStopElectTimer(SSyncNode* pSyncNode); int32_t syncNodeRestartElectTimer(SSyncNode* pSyncNode, int32_t ms); int32_t syncNodeResetElectTimer(SSyncNode* pSyncNode); - int32_t syncNodeStartHeartbeatTimer(SSyncNode* pSyncNode); int32_t syncNodeStopHeartbeatTimer(SSyncNode* pSyncNode); int32_t syncNodeRestartHeartbeatTimer(SSyncNode* pSyncNode); @@ -282,7 +280,6 @@ void syncNodeUpdateTermWithoutStepDown(SSyncNode* pSyncNode, SyncTerm term); void syncNodeStepDown(SSyncNode* pSyncNode, SyncTerm newTerm); void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr); void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr); - void syncNodeCandidate2Leader(SSyncNode* pSyncNode); void syncNodeFollower2Candidate(SSyncNode* pSyncNode); void syncNodeLeader2Follower(SSyncNode* pSyncNode); @@ -321,8 +318,7 @@ bool syncNodeNeedSendAppendEntries(SSyncNode* ths, const SRaftId* pDestId, const int32_t syncGetSnapshotMeta(int64_t rid, struct SSnapshotMeta* sMeta); int32_t syncGetSnapshotMetaByIndex(int64_t rid, SyncIndex snapshotIndex, struct SSnapshotMeta* sMeta); -bool syncNodeCanChange(SSyncNode* pSyncNode); - +bool syncNodeCanChange(SSyncNode* pSyncNode); int32_t syncNodeLeaderTransfer(SSyncNode* pSyncNode); int32_t syncNodeLeaderTransferTo(SSyncNode* pSyncNode, SNodeInfo newLeader); int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* pEntry); @@ -332,38 +328,6 @@ int32_t syncNodeDynamicQuorum(const SSyncNode* pSyncNode); bool syncNodeIsMnode(SSyncNode* pSyncNode); int32_t syncNodePeerStateInit(SSyncNode* pSyncNode); -// trace log -void syncLogRecvTimer(SSyncNode* pSyncNode, const SyncTimeout* pMsg, const char* s); -void syncLogRecvLocalCmd(SSyncNode* pSyncNode, const SyncLocalCmd* pMsg, const char* s); - -void syncLogSendAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntriesReply* pMsg, const char* s); -void syncLogRecvAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntriesReply* pMsg, const char* s); - -void syncLogSendHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, const char* s); -void syncLogRecvHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, const char* s); - -void syncLogSendHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* pMsg, const char* s); -void syncLogRecvHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* pMsg, const char* s); - -void syncLogSendSyncPreSnapshot(SSyncNode* pSyncNode, const SyncPreSnapshot* pMsg, const char* s); -void syncLogRecvSyncPreSnapshot(SSyncNode* pSyncNode, const SyncPreSnapshot* pMsg, const char* s); - -void syncLogSendSyncPreSnapshotReply(SSyncNode* pSyncNode, const SyncPreSnapshotReply* pMsg, const char* s); -void syncLogRecvSyncPreSnapshotReply(SSyncNode* pSyncNode, const SyncPreSnapshotReply* pMsg, const char* s); - -void syncLogSendSyncSnapshotSend(SSyncNode* pSyncNode, const SyncSnapshotSend* pMsg, const char* s); -void syncLogRecvSyncSnapshotSend(SSyncNode* pSyncNode, const SyncSnapshotSend* pMsg, const char* s); - -void syncLogSendSyncSnapshotRsp(SSyncNode* pSyncNode, const SyncSnapshotRsp* pMsg, const char* s); -void syncLogRecvSyncSnapshotRsp(SSyncNode* pSyncNode, const SyncSnapshotRsp* pMsg, const char* s); - -// syncUtil.h -void syncPrintNodeLog(const char* flags, ELogLevel level, int32_t dflag, SSyncNode* pNode, const char* format, ...); -void syncPrintSnapshotSenderLog(const char* flags, ELogLevel level, int32_t dflag, SSyncSnapshotSender* pSender, - const char* format, ...); -void syncPrintSnapshotReceiverLog(const char* flags, ELogLevel level, int32_t dflag, SSyncSnapshotReceiver* pReceiver, - const char* format, ...); - #ifdef __cplusplus } #endif diff --git a/source/libs/sync/inc/syncUtil.h b/source/libs/sync/inc/syncUtil.h index 076101ef43..d920e98ab0 100644 --- a/source/libs/sync/inc/syncUtil.h +++ b/source/libs/sync/inc/syncUtil.h @@ -48,6 +48,39 @@ void syncPrintSnapshotSenderLog(const char* flags, ELogLevel level, int32_t dfla void syncPrintSnapshotReceiverLog(const char* flags, ELogLevel level, int32_t dflag, SSyncSnapshotReceiver* pReceiver, const char* format, ...); +void syncLogRecvTimer(SSyncNode* pSyncNode, const SyncTimeout* pMsg, const char* s); +void syncLogRecvLocalCmd(SSyncNode* pSyncNode, const SyncLocalCmd* pMsg, const char* s); + +void syncLogSendAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntriesReply* pMsg, const char* s); +void syncLogRecvAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntriesReply* pMsg, const char* s); + +void syncLogSendHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, const char* s); +void syncLogRecvHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, const char* s); + +void syncLogSendHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* pMsg, const char* s); +void syncLogRecvHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* pMsg, const char* s); + +void syncLogSendSyncPreSnapshot(SSyncNode* pSyncNode, const SyncPreSnapshot* pMsg, const char* s); +void syncLogRecvSyncPreSnapshot(SSyncNode* pSyncNode, const SyncPreSnapshot* pMsg, const char* s); + +void syncLogSendSyncPreSnapshotReply(SSyncNode* pSyncNode, const SyncPreSnapshotReply* pMsg, const char* s); +void syncLogRecvSyncPreSnapshotReply(SSyncNode* pSyncNode, const SyncPreSnapshotReply* pMsg, const char* s); + +void syncLogSendSyncSnapshotSend(SSyncNode* pSyncNode, const SyncSnapshotSend* pMsg, const char* s); +void syncLogRecvSyncSnapshotSend(SSyncNode* pSyncNode, const SyncSnapshotSend* pMsg, const char* s); + +void syncLogSendSyncSnapshotRsp(SSyncNode* pSyncNode, const SyncSnapshotRsp* pMsg, const char* s); +void syncLogRecvSyncSnapshotRsp(SSyncNode* pSyncNode, const SyncSnapshotRsp* pMsg, const char* s); + +void syncLogRecvAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMsg, const char* s); +void syncLogSendAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMsg, const char* s); + +void syncLogRecvRequestVote(SSyncNode* pSyncNode, const SyncRequestVote* pMsg, const char* s); +void syncLogSendRequestVote(SSyncNode* pNode, const SyncRequestVote* pMsg, const char* s); + +void syncLogRecvRequestVoteReply(SSyncNode* pSyncNode, const SyncRequestVoteReply* pMsg, const char* s); +void syncLogSendRequestVoteReply(SSyncNode* pSyncNode, const SyncRequestVoteReply* pMsg, const char* s); + #ifdef __cplusplus } #endif diff --git a/source/libs/sync/src/syncAppendEntries.c b/source/libs/sync/src/syncAppendEntries.c index 0b6119457a..4551702b34 100644 --- a/source/libs/sync/src/syncAppendEntries.c +++ b/source/libs/sync/src/syncAppendEntries.c @@ -124,18 +124,6 @@ int32_t syncNodeFollowerCommit(SSyncNode* ths, SyncIndex newCommitIndex) { return 0; } -void syncLogRecvAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMsg, const char* s) { - char host[64]; - uint16_t port; - syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); - - sNTrace(pSyncNode, - "recv sync-append-entries from %s:%d {term:%" PRId64 ", pre-index:%" PRId64 ", pre-term:%" PRId64 - ", cmt:%" PRId64 ", pterm:%" PRId64 ", datalen:%d}, %s", - host, port, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->commitIndex, pMsg->privateTerm, - pMsg->dataLen, s); -} - int32_t syncNodeOnAppendEntries(SSyncNode* ths, const SRpcMsg* pRpcMsg) { SyncAppendEntries* pMsg = pRpcMsg->pCont; SRpcMsg rpcRsp = {0}; diff --git a/source/libs/sync/src/syncAppendEntriesReply.c b/source/libs/sync/src/syncAppendEntriesReply.c index 6b2bf250ad..94c4de3c5e 100644 --- a/source/libs/sync/src/syncAppendEntriesReply.c +++ b/source/libs/sync/src/syncAppendEntriesReply.c @@ -15,12 +15,13 @@ #define _DEFAULT_SOURCE #include "syncAppendEntriesReply.h" -#include "syncMessage.h" #include "syncCommit.h" #include "syncIndexMgr.h" +#include "syncMessage.h" #include "syncRaftStore.h" #include "syncReplication.h" #include "syncSnapshot.h" +#include "syncUtil.h" // TLA+ Spec // HandleAppendEntriesResponse(i, j, m) == diff --git a/source/libs/sync/src/syncElection.c b/source/libs/sync/src/syncElection.c index 9f0839fc02..5c41e43fd4 100644 --- a/source/libs/sync/src/syncElection.c +++ b/source/libs/sync/src/syncElection.c @@ -33,14 +33,6 @@ // mdest |-> j]) // /\ UNCHANGED <> -static void syncLogSendRequestVote(SSyncNode* pNode, const SyncRequestVote* pMsg, const char* s) { - char host[64]; - uint16_t port; - syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port); - sNTrace(pNode, "send sync-request-vote to %s:%d {term:%" PRId64 ", lindex:%" PRId64 ", lterm:%" PRId64 "}, %s", host, - port, pMsg->term, pMsg->lastLogIndex, pMsg->lastLogTerm, s); -} - static int32_t syncNodeRequestVotePeers(SSyncNode* pNode) { if (pNode->state != TAOS_SYNC_STATE_CANDIDATE) { sNTrace(pNode, "not candidate, stop elect"); diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 33813af28a..f4cc8627f3 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -2480,149 +2480,3 @@ bool syncNodeCanChange(SSyncNode* pSyncNode) { return true; } - -void syncLogRecvTimer(SSyncNode* pSyncNode, const SyncTimeout* pMsg, const char* s) { - sNTrace(pSyncNode, "recv sync-timer {type:%s, lc:%" PRId64 ", ms:%d, data:%p}, %s", - syncTimerTypeStr(pMsg->timeoutType), pMsg->logicClock, pMsg->timerMS, pMsg->data, s); -} - -void syncLogSendAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntriesReply* pMsg, const char* s) { - char host[64]; - uint16_t port; - syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port); - - sNTrace(pSyncNode, - "send sync-append-entries-reply to %s:%d, {term:%" PRId64 ", pterm:%" PRId64 ", success:%d, match:%" PRId64 - "}, %s", - host, port, pMsg->term, pMsg->privateTerm, pMsg->success, pMsg->matchIndex, s); -} - -void syncLogRecvAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntriesReply* pMsg, const char* s) { - char host[64]; - uint16_t port; - syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); - - sNTrace(pSyncNode, - "recv sync-append-entries-reply from %s:%d {term:%" PRId64 ", pterm:%" PRId64 ", success:%d, match:%" PRId64 - "}, %s", - host, port, pMsg->term, pMsg->privateTerm, pMsg->success, pMsg->matchIndex, s); -} - -void syncLogSendHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, const char* s) { - char host[64]; - uint16_t port; - syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port); - - sNTrace(pSyncNode, - "send sync-heartbeat to %s:%d {term:%" PRId64 ", cmt:%" PRId64 ", min-match:%" PRId64 ", pterm:%" PRId64 - "}, %s", - host, port, pMsg->term, pMsg->commitIndex, pMsg->minMatchIndex, pMsg->privateTerm, s); -} - -void syncLogRecvHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, const char* s) { - char host[64]; - uint16_t port; - syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); - - sNTrace(pSyncNode, - "recv sync-heartbeat from %s:%d {term:%" PRId64 ", cmt:%" PRId64 ", min-match:%" PRId64 ", pterm:%" PRId64 - "}, %s", - host, port, pMsg->term, pMsg->commitIndex, pMsg->minMatchIndex, pMsg->privateTerm, s); -} - -void syncLogSendHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* pMsg, const char* s) { - char host[64]; - uint16_t port; - syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port); - - sNTrace(pSyncNode, "send sync-heartbeat-reply from %s:%d {term:%" PRId64 ", pterm:%" PRId64 "}, %s", host, port, - pMsg->term, pMsg->privateTerm, s); -} - -void syncLogRecvHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* pMsg, const char* s) { - char host[64]; - uint16_t port; - syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); - sNTrace(pSyncNode, "recv sync-heartbeat-reply from %s:%d {term:%" PRId64 ", pterm:%" PRId64 "}, %s", host, port, - pMsg->term, pMsg->privateTerm, s); -} - -void syncLogRecvLocalCmd(SSyncNode* pSyncNode, const SyncLocalCmd* pMsg, const char* s) { - sNTrace(pSyncNode, "recv sync-local-cmd {cmd:%d-%s, sd-new-term:%" PRId64 ", fc-index:%" PRId64 "}, %s", pMsg->cmd, - syncLocalCmdGetStr(pMsg->cmd), pMsg->sdNewTerm, pMsg->fcIndex, s); -} - -void syncLogSendSyncPreSnapshot(SSyncNode* pSyncNode, const SyncPreSnapshot* pMsg, const char* s) { - char host[64]; - uint16_t port; - syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port); - sNTrace(pSyncNode, "send sync-pre-snapshot to %s:%d {term:%" PRId64 "}, %s", host, port, pMsg->term, s); -} - -void syncLogRecvSyncPreSnapshot(SSyncNode* pSyncNode, const SyncPreSnapshot* pMsg, const char* s) { - char host[64]; - uint16_t port; - syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); - sNTrace(pSyncNode, "recv sync-pre-snapshot from %s:%d {term:%" PRId64 "}, %s", host, port, pMsg->term, s); -} - -void syncLogSendSyncPreSnapshotReply(SSyncNode* pSyncNode, const SyncPreSnapshotReply* pMsg, const char* s) { - char host[64]; - uint16_t port; - syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port); - sNTrace(pSyncNode, "send sync-pre-snapshot-reply to %s:%d {term:%" PRId64 ", snap-start:%" PRId64 "}, %s", host, port, - pMsg->term, pMsg->snapStart, s); -} - -void syncLogRecvSyncPreSnapshotReply(SSyncNode* pSyncNode, const SyncPreSnapshotReply* pMsg, const char* s) { - char host[64]; - uint16_t port; - syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); - sNTrace(pSyncNode, "recv sync-pre-snapshot-reply from %s:%d {term:%" PRId64 ", snap-start:%" PRId64 "}, %s", host, - port, pMsg->term, pMsg->snapStart, s); -} - -void syncLogSendSyncSnapshotSend(SSyncNode* pSyncNode, const SyncSnapshotSend* pMsg, const char* s) { - char host[64]; - uint16_t port; - syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port); - - sNTrace(pSyncNode, - "send sync-snapshot-send from %s:%d {term:%" PRId64 ", begin:%" PRId64 ", end:%" PRId64 ", lterm:%" PRId64 - ", stime:%" PRId64 ", seq:%d}, %s", - host, port, pMsg->term, pMsg->beginIndex, pMsg->lastIndex, pMsg->lastTerm, pMsg->startTime, pMsg->seq, s); -} - -void syncLogRecvSyncSnapshotSend(SSyncNode* pSyncNode, const SyncSnapshotSend* pMsg, const char* s) { - char host[64]; - uint16_t port; - syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); - - sNTrace(pSyncNode, - "recv sync-snapshot-send from %s:%d {term:%" PRId64 ", begin:%" PRId64 ", lst:%" PRId64 ", lterm:%" PRId64 - ", stime:%" PRId64 ", seq:%d, len:%u}, %s", - host, port, pMsg->term, pMsg->beginIndex, pMsg->lastIndex, pMsg->lastTerm, pMsg->startTime, pMsg->seq, - pMsg->dataLen, s); -} - -void syncLogSendSyncSnapshotRsp(SSyncNode* pSyncNode, const SyncSnapshotRsp* pMsg, const char* s) { - char host[64]; - uint16_t port; - syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port); - - sNTrace(pSyncNode, - "send sync-snapshot-rsp from %s:%d {term:%" PRId64 ", begin:%" PRId64 ", lst:%" PRId64 ", lterm:%" PRId64 - ", stime:%" PRId64 ", ack:%d}, %s", - host, port, pMsg->term, pMsg->snapBeginIndex, pMsg->lastIndex, pMsg->lastTerm, pMsg->startTime, pMsg->ack, s); -} - -void syncLogRecvSyncSnapshotRsp(SSyncNode* pSyncNode, const SyncSnapshotRsp* pMsg, const char* s) { - char host[64]; - uint16_t port; - syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); - - sNTrace(pSyncNode, - "recv sync-snapshot-rsp from %s:%d {term:%" PRId64 ", begin:%" PRId64 ", lst:%" PRId64 ", lterm:%" PRId64 - ", stime:%" PRId64 ", ack:%d}, %s", - host, port, pMsg->term, pMsg->snapBeginIndex, pMsg->lastIndex, pMsg->lastTerm, pMsg->startTime, pMsg->ack, s); -} diff --git a/source/libs/sync/src/syncRaftLog.c b/source/libs/sync/src/syncRaftLog.c index 2cdb18a109..7e4b18ab88 100644 --- a/source/libs/sync/src/syncRaftLog.c +++ b/source/libs/sync/src/syncRaftLog.c @@ -17,6 +17,7 @@ #include "syncRaftLog.h" #include "syncRaftCfg.h" #include "syncRaftStore.h" +#include "syncUtil.h" // log[m .. n] diff --git a/source/libs/sync/src/syncReplication.c b/source/libs/sync/src/syncReplication.c index 0f2057210c..f09bac2139 100644 --- a/source/libs/sync/src/syncReplication.c +++ b/source/libs/sync/src/syncReplication.c @@ -137,17 +137,6 @@ int32_t syncNodeReplicate(SSyncNode* pSyncNode) { return 0; } -static void syncLogSendAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMsg, const char* s) { - char host[64]; - uint16_t port; - syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port); - sNTrace(pSyncNode, - "send sync-append-entries to %s:%d, {term:%" PRId64 ", pre-index:%" PRId64 ", pre-term:%" PRId64 - ", pterm:%" PRId64 ", cmt:%" PRId64 ", datalen:%d}, %s", - host, port, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->privateTerm, pMsg->commitIndex, - pMsg->dataLen, s); -} - int32_t syncNodeSendAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftId, SRpcMsg* pRpcMsg) { int32_t ret = 0; SyncAppendEntries* pMsg = pRpcMsg->pCont; diff --git a/source/libs/sync/src/syncRequestVote.c b/source/libs/sync/src/syncRequestVote.c index e2cce28671..8ffc22ee25 100644 --- a/source/libs/sync/src/syncRequestVote.c +++ b/source/libs/sync/src/syncRequestVote.c @@ -88,23 +88,6 @@ static bool syncNodeOnRequestVoteLogOK(SSyncNode* pSyncNode, SyncRequestVote* pM return false; } -static void syncLogRecvRequestVote(SSyncNode* pSyncNode, const SyncRequestVote* pMsg, const char* s) { - char logBuf[256]; - char host[64]; - uint16_t port; - syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); - sNTrace(pSyncNode, "recv sync-request-vote from %s:%d, {term:%" PRId64 ", lindex:%" PRId64 ", lterm:%" PRId64 "}, %s", - host, port, pMsg->term, pMsg->lastLogIndex, pMsg->lastLogTerm, s); -} - -static void syncLogSendRequestVoteReply(SSyncNode* pSyncNode, const SyncRequestVoteReply* pMsg, const char* s) { - char host[64]; - uint16_t port; - syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port); - sNTrace(pSyncNode, "send sync-request-vote-reply to %s:%d {term:%" PRId64 ", grant:%d}, %s", host, port, pMsg->term, - pMsg->voteGranted, s); -} - int32_t syncNodeOnRequestVote(SSyncNode* ths, const SRpcMsg* pRpcMsg) { int32_t ret = 0; SyncRequestVote* pMsg = pRpcMsg->pCont; diff --git a/source/libs/sync/src/syncRequestVoteReply.c b/source/libs/sync/src/syncRequestVoteReply.c index a9d363630e..563f475070 100644 --- a/source/libs/sync/src/syncRequestVoteReply.c +++ b/source/libs/sync/src/syncRequestVoteReply.c @@ -38,14 +38,6 @@ // /\ UNCHANGED <> // -static void syncLogRecvRequestVoteReply(SSyncNode* pSyncNode, const SyncRequestVoteReply* pMsg, const char* s) { - char host[64]; - uint16_t port; - syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); - sNTrace(pSyncNode, "recv sync-request-vote-reply from %s:%d {term:%" PRId64 ", grant:%d}, %s", host, port, pMsg->term, - pMsg->voteGranted, s); -} - int32_t syncNodeOnRequestVoteReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) { int32_t ret = 0; SyncRequestVoteReply* pMsg = pRpcMsg->pCont; diff --git a/source/libs/sync/src/syncRespMgr.c b/source/libs/sync/src/syncRespMgr.c index de8f1927ae..049b02d73e 100644 --- a/source/libs/sync/src/syncRespMgr.c +++ b/source/libs/sync/src/syncRespMgr.c @@ -17,6 +17,7 @@ #include "syncRespMgr.h" #include "syncRaftEntry.h" #include "syncRaftStore.h" +#include "syncUtil.h" SSyncRespMgr *syncRespMgrCreate(void *data, int64_t ttl) { SSyncRespMgr *pObj = taosMemoryCalloc(1, sizeof(SSyncRespMgr)); diff --git a/source/libs/sync/src/syncTimeout.c b/source/libs/sync/src/syncTimeout.c index 7793ca4104..3d4583aadb 100644 --- a/source/libs/sync/src/syncTimeout.c +++ b/source/libs/sync/src/syncTimeout.c @@ -19,12 +19,13 @@ #include "syncRaftCfg.h" #include "syncRaftLog.h" #include "syncReplication.h" +#include "syncUtil.h" static void syncNodeCleanConfigIndex(SSyncNode* ths) { int32_t newArrIndex = 0; SyncIndex newConfigIndexArr[MAX_CONFIG_INDEX_COUNT] = {0}; SSnapshot snapshot = {0}; - + ths->pFsm->FpGetSnapshotInfo(ths->pFsm, &snapshot); if (snapshot.lastApplyIndex != SYNC_INDEX_INVALID) { for (int32_t i = 0; i < ths->pRaftCfg->configIndexCount; ++i) { diff --git a/source/libs/sync/src/syncUtil.c b/source/libs/sync/src/syncUtil.c index 894aa05aad..a575de7e56 100644 --- a/source/libs/sync/src/syncUtil.c +++ b/source/libs/sync/src/syncUtil.c @@ -15,6 +15,7 @@ #define _DEFAULT_SOURCE #include "syncUtil.h" +#include "syncMessage.h" #include "syncRaftCfg.h" #include "syncRaftStore.h" #include "syncSnapshot.h" @@ -345,3 +346,205 @@ void syncPrintSnapshotReceiverLog(const char* flags, ELogLevel level, int32_t df pNode->replicaNum, pNode->pRaftCfg->lastConfigIndex, pNode->changing, pNode->restoreFinish, quorum, pNode->electTimerLogicClock, pNode->heartbeatTimerLogicClockUser, peerStr, cfgStr); } + +void syncLogRecvTimer(SSyncNode* pSyncNode, const SyncTimeout* pMsg, const char* s) { + sNTrace(pSyncNode, "recv sync-timer {type:%s, lc:%" PRId64 ", ms:%d, data:%p}, %s", + syncTimerTypeStr(pMsg->timeoutType), pMsg->logicClock, pMsg->timerMS, pMsg->data, s); +} + +void syncLogRecvLocalCmd(SSyncNode* pSyncNode, const SyncLocalCmd* pMsg, const char* s) { + sNTrace(pSyncNode, "recv sync-local-cmd {cmd:%d-%s, sd-new-term:%" PRId64 ", fc-index:%" PRId64 "}, %s", pMsg->cmd, + syncLocalCmdGetStr(pMsg->cmd), pMsg->sdNewTerm, pMsg->fcIndex, s); +} + +void syncLogSendAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntriesReply* pMsg, const char* s) { + char host[64]; + uint16_t port; + syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port); + + sNTrace(pSyncNode, + "send sync-append-entries-reply to %s:%d, {term:%" PRId64 ", pterm:%" PRId64 ", success:%d, match:%" PRId64 + "}, %s", + host, port, pMsg->term, pMsg->privateTerm, pMsg->success, pMsg->matchIndex, s); +} + +void syncLogRecvAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntriesReply* pMsg, const char* s) { + char host[64]; + uint16_t port; + syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); + + sNTrace(pSyncNode, + "recv sync-append-entries-reply from %s:%d {term:%" PRId64 ", pterm:%" PRId64 ", success:%d, match:%" PRId64 + "}, %s", + host, port, pMsg->term, pMsg->privateTerm, pMsg->success, pMsg->matchIndex, s); +} + +void syncLogSendHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, const char* s) { + char host[64]; + uint16_t port; + syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port); + + sNTrace(pSyncNode, + "send sync-heartbeat to %s:%d {term:%" PRId64 ", cmt:%" PRId64 ", min-match:%" PRId64 ", pterm:%" PRId64 + "}, %s", + host, port, pMsg->term, pMsg->commitIndex, pMsg->minMatchIndex, pMsg->privateTerm, s); +} + +void syncLogRecvHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, const char* s) { + char host[64]; + uint16_t port; + syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); + + sNTrace(pSyncNode, + "recv sync-heartbeat from %s:%d {term:%" PRId64 ", cmt:%" PRId64 ", min-match:%" PRId64 ", pterm:%" PRId64 + "}, %s", + host, port, pMsg->term, pMsg->commitIndex, pMsg->minMatchIndex, pMsg->privateTerm, s); +} + +void syncLogSendHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* pMsg, const char* s) { + char host[64]; + uint16_t port; + syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port); + + sNTrace(pSyncNode, "send sync-heartbeat-reply from %s:%d {term:%" PRId64 ", pterm:%" PRId64 "}, %s", host, port, + pMsg->term, pMsg->privateTerm, s); +} + +void syncLogRecvHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* pMsg, const char* s) { + char host[64]; + uint16_t port; + syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); + sNTrace(pSyncNode, "recv sync-heartbeat-reply from %s:%d {term:%" PRId64 ", pterm:%" PRId64 "}, %s", host, port, + pMsg->term, pMsg->privateTerm, s); +} + +void syncLogSendSyncPreSnapshot(SSyncNode* pSyncNode, const SyncPreSnapshot* pMsg, const char* s) { + char host[64]; + uint16_t port; + syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port); + sNTrace(pSyncNode, "send sync-pre-snapshot to %s:%d {term:%" PRId64 "}, %s", host, port, pMsg->term, s); +} + +void syncLogRecvSyncPreSnapshot(SSyncNode* pSyncNode, const SyncPreSnapshot* pMsg, const char* s) { + char host[64]; + uint16_t port; + syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); + sNTrace(pSyncNode, "recv sync-pre-snapshot from %s:%d {term:%" PRId64 "}, %s", host, port, pMsg->term, s); +} + +void syncLogSendSyncPreSnapshotReply(SSyncNode* pSyncNode, const SyncPreSnapshotReply* pMsg, const char* s) { + char host[64]; + uint16_t port; + syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port); + sNTrace(pSyncNode, "send sync-pre-snapshot-reply to %s:%d {term:%" PRId64 ", snap-start:%" PRId64 "}, %s", host, port, + pMsg->term, pMsg->snapStart, s); +} + +void syncLogRecvSyncPreSnapshotReply(SSyncNode* pSyncNode, const SyncPreSnapshotReply* pMsg, const char* s) { + char host[64]; + uint16_t port; + syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); + sNTrace(pSyncNode, "recv sync-pre-snapshot-reply from %s:%d {term:%" PRId64 ", snap-start:%" PRId64 "}, %s", host, + port, pMsg->term, pMsg->snapStart, s); +} + +void syncLogSendSyncSnapshotSend(SSyncNode* pSyncNode, const SyncSnapshotSend* pMsg, const char* s) { + char host[64]; + uint16_t port; + syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port); + + sNTrace(pSyncNode, + "send sync-snapshot-send from %s:%d {term:%" PRId64 ", begin:%" PRId64 ", end:%" PRId64 ", lterm:%" PRId64 + ", stime:%" PRId64 ", seq:%d}, %s", + host, port, pMsg->term, pMsg->beginIndex, pMsg->lastIndex, pMsg->lastTerm, pMsg->startTime, pMsg->seq, s); +} + +void syncLogRecvSyncSnapshotSend(SSyncNode* pSyncNode, const SyncSnapshotSend* pMsg, const char* s) { + char host[64]; + uint16_t port; + syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); + + sNTrace(pSyncNode, + "recv sync-snapshot-send from %s:%d {term:%" PRId64 ", begin:%" PRId64 ", lst:%" PRId64 ", lterm:%" PRId64 + ", stime:%" PRId64 ", seq:%d, len:%u}, %s", + host, port, pMsg->term, pMsg->beginIndex, pMsg->lastIndex, pMsg->lastTerm, pMsg->startTime, pMsg->seq, + pMsg->dataLen, s); +} + +void syncLogSendSyncSnapshotRsp(SSyncNode* pSyncNode, const SyncSnapshotRsp* pMsg, const char* s) { + char host[64]; + uint16_t port; + syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port); + + sNTrace(pSyncNode, + "send sync-snapshot-rsp from %s:%d {term:%" PRId64 ", begin:%" PRId64 ", lst:%" PRId64 ", lterm:%" PRId64 + ", stime:%" PRId64 ", ack:%d}, %s", + host, port, pMsg->term, pMsg->snapBeginIndex, pMsg->lastIndex, pMsg->lastTerm, pMsg->startTime, pMsg->ack, s); +} + +void syncLogRecvSyncSnapshotRsp(SSyncNode* pSyncNode, const SyncSnapshotRsp* pMsg, const char* s) { + char host[64]; + uint16_t port; + syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); + + sNTrace(pSyncNode, + "recv sync-snapshot-rsp from %s:%d {term:%" PRId64 ", begin:%" PRId64 ", lst:%" PRId64 ", lterm:%" PRId64 + ", stime:%" PRId64 ", ack:%d}, %s", + host, port, pMsg->term, pMsg->snapBeginIndex, pMsg->lastIndex, pMsg->lastTerm, pMsg->startTime, pMsg->ack, s); +} + +void syncLogRecvAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMsg, const char* s) { + char host[64]; + uint16_t port; + syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); + + sNTrace(pSyncNode, + "recv sync-append-entries from %s:%d {term:%" PRId64 ", pre-index:%" PRId64 ", pre-term:%" PRId64 + ", cmt:%" PRId64 ", pterm:%" PRId64 ", datalen:%d}, %s", + host, port, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->commitIndex, pMsg->privateTerm, + pMsg->dataLen, s); +} + +void syncLogSendAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMsg, const char* s) { + char host[64]; + uint16_t port; + syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port); + sNTrace(pSyncNode, + "send sync-append-entries to %s:%d, {term:%" PRId64 ", pre-index:%" PRId64 ", pre-term:%" PRId64 + ", pterm:%" PRId64 ", cmt:%" PRId64 ", datalen:%d}, %s", + host, port, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->privateTerm, pMsg->commitIndex, + pMsg->dataLen, s); +} + +void syncLogRecvRequestVote(SSyncNode* pSyncNode, const SyncRequestVote* pMsg, const char* s) { + char logBuf[256]; + char host[64]; + uint16_t port; + syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); + sNTrace(pSyncNode, "recv sync-request-vote from %s:%d, {term:%" PRId64 ", lindex:%" PRId64 ", lterm:%" PRId64 "}, %s", + host, port, pMsg->term, pMsg->lastLogIndex, pMsg->lastLogTerm, s); +} + +void syncLogSendRequestVote(SSyncNode* pNode, const SyncRequestVote* pMsg, const char* s) { + char host[64]; + uint16_t port; + syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port); + sNTrace(pNode, "send sync-request-vote to %s:%d {term:%" PRId64 ", lindex:%" PRId64 ", lterm:%" PRId64 "}, %s", host, + port, pMsg->term, pMsg->lastLogIndex, pMsg->lastLogTerm, s); +} + +void syncLogRecvRequestVoteReply(SSyncNode* pSyncNode, const SyncRequestVoteReply* pMsg, const char* s) { + char host[64]; + uint16_t port; + syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); + sNTrace(pSyncNode, "recv sync-request-vote-reply from %s:%d {term:%" PRId64 ", grant:%d}, %s", host, port, pMsg->term, + pMsg->voteGranted, s); +} + +void syncLogSendRequestVoteReply(SSyncNode* pSyncNode, const SyncRequestVoteReply* pMsg, const char* s) { + char host[64]; + uint16_t port; + syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port); + sNTrace(pSyncNode, "send sync-request-vote-reply to %s:%d {term:%" PRId64 ", grant:%d}, %s", host, port, pMsg->term, + pMsg->voteGranted, s); +} \ No newline at end of file -- GitLab