提交 4940080a 编写于 作者: S Shengliang Guan

refact: adjust sync log prints

上级 f1a41f5a
...@@ -258,12 +258,10 @@ int32_t syncNodeOnLocalCmd(SSyncNode* ths, const SRpcMsg* pMsg); ...@@ -258,12 +258,10 @@ int32_t syncNodeOnLocalCmd(SSyncNode* ths, const SRpcMsg* pMsg);
// timer control -------------- // timer control --------------
int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode); int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode);
int32_t syncNodeStopPingTimer(SSyncNode* pSyncNode); int32_t syncNodeStopPingTimer(SSyncNode* pSyncNode);
int32_t syncNodeStartElectTimer(SSyncNode* pSyncNode, int32_t ms); int32_t syncNodeStartElectTimer(SSyncNode* pSyncNode, int32_t ms);
int32_t syncNodeStopElectTimer(SSyncNode* pSyncNode); int32_t syncNodeStopElectTimer(SSyncNode* pSyncNode);
int32_t syncNodeRestartElectTimer(SSyncNode* pSyncNode, int32_t ms); int32_t syncNodeRestartElectTimer(SSyncNode* pSyncNode, int32_t ms);
int32_t syncNodeResetElectTimer(SSyncNode* pSyncNode); int32_t syncNodeResetElectTimer(SSyncNode* pSyncNode);
int32_t syncNodeStartHeartbeatTimer(SSyncNode* pSyncNode); int32_t syncNodeStartHeartbeatTimer(SSyncNode* pSyncNode);
int32_t syncNodeStopHeartbeatTimer(SSyncNode* pSyncNode); int32_t syncNodeStopHeartbeatTimer(SSyncNode* pSyncNode);
int32_t syncNodeRestartHeartbeatTimer(SSyncNode* pSyncNode); int32_t syncNodeRestartHeartbeatTimer(SSyncNode* pSyncNode);
...@@ -282,7 +280,6 @@ void syncNodeUpdateTermWithoutStepDown(SSyncNode* pSyncNode, SyncTerm term); ...@@ -282,7 +280,6 @@ void syncNodeUpdateTermWithoutStepDown(SSyncNode* pSyncNode, SyncTerm term);
void syncNodeStepDown(SSyncNode* pSyncNode, SyncTerm newTerm); void syncNodeStepDown(SSyncNode* pSyncNode, SyncTerm newTerm);
void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr); void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr);
void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr); void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr);
void syncNodeCandidate2Leader(SSyncNode* pSyncNode); void syncNodeCandidate2Leader(SSyncNode* pSyncNode);
void syncNodeFollower2Candidate(SSyncNode* pSyncNode); void syncNodeFollower2Candidate(SSyncNode* pSyncNode);
void syncNodeLeader2Follower(SSyncNode* pSyncNode); void syncNodeLeader2Follower(SSyncNode* pSyncNode);
...@@ -322,7 +319,6 @@ int32_t syncGetSnapshotMeta(int64_t rid, struct SSnapshotMeta* sMeta); ...@@ -322,7 +319,6 @@ int32_t syncGetSnapshotMeta(int64_t rid, struct SSnapshotMeta* sMeta);
int32_t syncGetSnapshotMetaByIndex(int64_t rid, SyncIndex snapshotIndex, struct SSnapshotMeta* sMeta); int32_t syncGetSnapshotMetaByIndex(int64_t rid, SyncIndex snapshotIndex, struct SSnapshotMeta* sMeta);
bool syncNodeCanChange(SSyncNode* pSyncNode); bool syncNodeCanChange(SSyncNode* pSyncNode);
int32_t syncNodeLeaderTransfer(SSyncNode* pSyncNode); int32_t syncNodeLeaderTransfer(SSyncNode* pSyncNode);
int32_t syncNodeLeaderTransferTo(SSyncNode* pSyncNode, SNodeInfo newLeader); int32_t syncNodeLeaderTransferTo(SSyncNode* pSyncNode, SNodeInfo newLeader);
int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* pEntry); int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* pEntry);
...@@ -332,38 +328,6 @@ int32_t syncNodeDynamicQuorum(const SSyncNode* pSyncNode); ...@@ -332,38 +328,6 @@ int32_t syncNodeDynamicQuorum(const SSyncNode* pSyncNode);
bool syncNodeIsMnode(SSyncNode* pSyncNode); bool syncNodeIsMnode(SSyncNode* pSyncNode);
int32_t syncNodePeerStateInit(SSyncNode* pSyncNode); int32_t syncNodePeerStateInit(SSyncNode* pSyncNode);
// trace log
void syncLogRecvTimer(SSyncNode* pSyncNode, const SyncTimeout* pMsg, const char* s);
void syncLogRecvLocalCmd(SSyncNode* pSyncNode, const SyncLocalCmd* pMsg, const char* s);
void syncLogSendAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntriesReply* pMsg, const char* s);
void syncLogRecvAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntriesReply* pMsg, const char* s);
void syncLogSendHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, const char* s);
void syncLogRecvHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, const char* s);
void syncLogSendHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* pMsg, const char* s);
void syncLogRecvHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* pMsg, const char* s);
void syncLogSendSyncPreSnapshot(SSyncNode* pSyncNode, const SyncPreSnapshot* pMsg, const char* s);
void syncLogRecvSyncPreSnapshot(SSyncNode* pSyncNode, const SyncPreSnapshot* pMsg, const char* s);
void syncLogSendSyncPreSnapshotReply(SSyncNode* pSyncNode, const SyncPreSnapshotReply* pMsg, const char* s);
void syncLogRecvSyncPreSnapshotReply(SSyncNode* pSyncNode, const SyncPreSnapshotReply* pMsg, const char* s);
void syncLogSendSyncSnapshotSend(SSyncNode* pSyncNode, const SyncSnapshotSend* pMsg, const char* s);
void syncLogRecvSyncSnapshotSend(SSyncNode* pSyncNode, const SyncSnapshotSend* pMsg, const char* s);
void syncLogSendSyncSnapshotRsp(SSyncNode* pSyncNode, const SyncSnapshotRsp* pMsg, const char* s);
void syncLogRecvSyncSnapshotRsp(SSyncNode* pSyncNode, const SyncSnapshotRsp* pMsg, const char* s);
// syncUtil.h
void syncPrintNodeLog(const char* flags, ELogLevel level, int32_t dflag, SSyncNode* pNode, const char* format, ...);
void syncPrintSnapshotSenderLog(const char* flags, ELogLevel level, int32_t dflag, SSyncSnapshotSender* pSender,
const char* format, ...);
void syncPrintSnapshotReceiverLog(const char* flags, ELogLevel level, int32_t dflag, SSyncSnapshotReceiver* pReceiver,
const char* format, ...);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -48,6 +48,39 @@ void syncPrintSnapshotSenderLog(const char* flags, ELogLevel level, int32_t dfla ...@@ -48,6 +48,39 @@ void syncPrintSnapshotSenderLog(const char* flags, ELogLevel level, int32_t dfla
void syncPrintSnapshotReceiverLog(const char* flags, ELogLevel level, int32_t dflag, SSyncSnapshotReceiver* pReceiver, void syncPrintSnapshotReceiverLog(const char* flags, ELogLevel level, int32_t dflag, SSyncSnapshotReceiver* pReceiver,
const char* format, ...); const char* format, ...);
void syncLogRecvTimer(SSyncNode* pSyncNode, const SyncTimeout* pMsg, const char* s);
void syncLogRecvLocalCmd(SSyncNode* pSyncNode, const SyncLocalCmd* pMsg, const char* s);
void syncLogSendAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntriesReply* pMsg, const char* s);
void syncLogRecvAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntriesReply* pMsg, const char* s);
void syncLogSendHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, const char* s);
void syncLogRecvHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, const char* s);
void syncLogSendHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* pMsg, const char* s);
void syncLogRecvHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* pMsg, const char* s);
void syncLogSendSyncPreSnapshot(SSyncNode* pSyncNode, const SyncPreSnapshot* pMsg, const char* s);
void syncLogRecvSyncPreSnapshot(SSyncNode* pSyncNode, const SyncPreSnapshot* pMsg, const char* s);
void syncLogSendSyncPreSnapshotReply(SSyncNode* pSyncNode, const SyncPreSnapshotReply* pMsg, const char* s);
void syncLogRecvSyncPreSnapshotReply(SSyncNode* pSyncNode, const SyncPreSnapshotReply* pMsg, const char* s);
void syncLogSendSyncSnapshotSend(SSyncNode* pSyncNode, const SyncSnapshotSend* pMsg, const char* s);
void syncLogRecvSyncSnapshotSend(SSyncNode* pSyncNode, const SyncSnapshotSend* pMsg, const char* s);
void syncLogSendSyncSnapshotRsp(SSyncNode* pSyncNode, const SyncSnapshotRsp* pMsg, const char* s);
void syncLogRecvSyncSnapshotRsp(SSyncNode* pSyncNode, const SyncSnapshotRsp* pMsg, const char* s);
void syncLogRecvAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMsg, const char* s);
void syncLogSendAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMsg, const char* s);
void syncLogRecvRequestVote(SSyncNode* pSyncNode, const SyncRequestVote* pMsg, const char* s);
void syncLogSendRequestVote(SSyncNode* pNode, const SyncRequestVote* pMsg, const char* s);
void syncLogRecvRequestVoteReply(SSyncNode* pSyncNode, const SyncRequestVoteReply* pMsg, const char* s);
void syncLogSendRequestVoteReply(SSyncNode* pSyncNode, const SyncRequestVoteReply* pMsg, const char* s);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -124,18 +124,6 @@ int32_t syncNodeFollowerCommit(SSyncNode* ths, SyncIndex newCommitIndex) { ...@@ -124,18 +124,6 @@ int32_t syncNodeFollowerCommit(SSyncNode* ths, SyncIndex newCommitIndex) {
return 0; return 0;
} }
void syncLogRecvAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMsg, const char* s) {
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
sNTrace(pSyncNode,
"recv sync-append-entries from %s:%d {term:%" PRId64 ", pre-index:%" PRId64 ", pre-term:%" PRId64
", cmt:%" PRId64 ", pterm:%" PRId64 ", datalen:%d}, %s",
host, port, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->commitIndex, pMsg->privateTerm,
pMsg->dataLen, s);
}
int32_t syncNodeOnAppendEntries(SSyncNode* ths, const SRpcMsg* pRpcMsg) { int32_t syncNodeOnAppendEntries(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
SyncAppendEntries* pMsg = pRpcMsg->pCont; SyncAppendEntries* pMsg = pRpcMsg->pCont;
SRpcMsg rpcRsp = {0}; SRpcMsg rpcRsp = {0};
......
...@@ -15,12 +15,13 @@ ...@@ -15,12 +15,13 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "syncAppendEntriesReply.h" #include "syncAppendEntriesReply.h"
#include "syncMessage.h"
#include "syncCommit.h" #include "syncCommit.h"
#include "syncIndexMgr.h" #include "syncIndexMgr.h"
#include "syncMessage.h"
#include "syncRaftStore.h" #include "syncRaftStore.h"
#include "syncReplication.h" #include "syncReplication.h"
#include "syncSnapshot.h" #include "syncSnapshot.h"
#include "syncUtil.h"
// TLA+ Spec // TLA+ Spec
// HandleAppendEntriesResponse(i, j, m) == // HandleAppendEntriesResponse(i, j, m) ==
......
...@@ -33,14 +33,6 @@ ...@@ -33,14 +33,6 @@
// mdest |-> j]) // mdest |-> j])
// /\ UNCHANGED <<serverVars, candidateVars, leaderVars, logVars>> // /\ UNCHANGED <<serverVars, candidateVars, leaderVars, logVars>>
static void syncLogSendRequestVote(SSyncNode* pNode, const SyncRequestVote* pMsg, const char* s) {
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port);
sNTrace(pNode, "send sync-request-vote to %s:%d {term:%" PRId64 ", lindex:%" PRId64 ", lterm:%" PRId64 "}, %s", host,
port, pMsg->term, pMsg->lastLogIndex, pMsg->lastLogTerm, s);
}
static int32_t syncNodeRequestVotePeers(SSyncNode* pNode) { static int32_t syncNodeRequestVotePeers(SSyncNode* pNode) {
if (pNode->state != TAOS_SYNC_STATE_CANDIDATE) { if (pNode->state != TAOS_SYNC_STATE_CANDIDATE) {
sNTrace(pNode, "not candidate, stop elect"); sNTrace(pNode, "not candidate, stop elect");
......
...@@ -2480,149 +2480,3 @@ bool syncNodeCanChange(SSyncNode* pSyncNode) { ...@@ -2480,149 +2480,3 @@ bool syncNodeCanChange(SSyncNode* pSyncNode) {
return true; return true;
} }
void syncLogRecvTimer(SSyncNode* pSyncNode, const SyncTimeout* pMsg, const char* s) {
sNTrace(pSyncNode, "recv sync-timer {type:%s, lc:%" PRId64 ", ms:%d, data:%p}, %s",
syncTimerTypeStr(pMsg->timeoutType), pMsg->logicClock, pMsg->timerMS, pMsg->data, s);
}
void syncLogSendAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntriesReply* pMsg, const char* s) {
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port);
sNTrace(pSyncNode,
"send sync-append-entries-reply to %s:%d, {term:%" PRId64 ", pterm:%" PRId64 ", success:%d, match:%" PRId64
"}, %s",
host, port, pMsg->term, pMsg->privateTerm, pMsg->success, pMsg->matchIndex, s);
}
void syncLogRecvAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntriesReply* pMsg, const char* s) {
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
sNTrace(pSyncNode,
"recv sync-append-entries-reply from %s:%d {term:%" PRId64 ", pterm:%" PRId64 ", success:%d, match:%" PRId64
"}, %s",
host, port, pMsg->term, pMsg->privateTerm, pMsg->success, pMsg->matchIndex, s);
}
void syncLogSendHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, const char* s) {
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port);
sNTrace(pSyncNode,
"send sync-heartbeat to %s:%d {term:%" PRId64 ", cmt:%" PRId64 ", min-match:%" PRId64 ", pterm:%" PRId64
"}, %s",
host, port, pMsg->term, pMsg->commitIndex, pMsg->minMatchIndex, pMsg->privateTerm, s);
}
void syncLogRecvHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, const char* s) {
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
sNTrace(pSyncNode,
"recv sync-heartbeat from %s:%d {term:%" PRId64 ", cmt:%" PRId64 ", min-match:%" PRId64 ", pterm:%" PRId64
"}, %s",
host, port, pMsg->term, pMsg->commitIndex, pMsg->minMatchIndex, pMsg->privateTerm, s);
}
void syncLogSendHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* pMsg, const char* s) {
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port);
sNTrace(pSyncNode, "send sync-heartbeat-reply from %s:%d {term:%" PRId64 ", pterm:%" PRId64 "}, %s", host, port,
pMsg->term, pMsg->privateTerm, s);
}
void syncLogRecvHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* pMsg, const char* s) {
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
sNTrace(pSyncNode, "recv sync-heartbeat-reply from %s:%d {term:%" PRId64 ", pterm:%" PRId64 "}, %s", host, port,
pMsg->term, pMsg->privateTerm, s);
}
void syncLogRecvLocalCmd(SSyncNode* pSyncNode, const SyncLocalCmd* pMsg, const char* s) {
sNTrace(pSyncNode, "recv sync-local-cmd {cmd:%d-%s, sd-new-term:%" PRId64 ", fc-index:%" PRId64 "}, %s", pMsg->cmd,
syncLocalCmdGetStr(pMsg->cmd), pMsg->sdNewTerm, pMsg->fcIndex, s);
}
void syncLogSendSyncPreSnapshot(SSyncNode* pSyncNode, const SyncPreSnapshot* pMsg, const char* s) {
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port);
sNTrace(pSyncNode, "send sync-pre-snapshot to %s:%d {term:%" PRId64 "}, %s", host, port, pMsg->term, s);
}
void syncLogRecvSyncPreSnapshot(SSyncNode* pSyncNode, const SyncPreSnapshot* pMsg, const char* s) {
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
sNTrace(pSyncNode, "recv sync-pre-snapshot from %s:%d {term:%" PRId64 "}, %s", host, port, pMsg->term, s);
}
void syncLogSendSyncPreSnapshotReply(SSyncNode* pSyncNode, const SyncPreSnapshotReply* pMsg, const char* s) {
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port);
sNTrace(pSyncNode, "send sync-pre-snapshot-reply to %s:%d {term:%" PRId64 ", snap-start:%" PRId64 "}, %s", host, port,
pMsg->term, pMsg->snapStart, s);
}
void syncLogRecvSyncPreSnapshotReply(SSyncNode* pSyncNode, const SyncPreSnapshotReply* pMsg, const char* s) {
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
sNTrace(pSyncNode, "recv sync-pre-snapshot-reply from %s:%d {term:%" PRId64 ", snap-start:%" PRId64 "}, %s", host,
port, pMsg->term, pMsg->snapStart, s);
}
void syncLogSendSyncSnapshotSend(SSyncNode* pSyncNode, const SyncSnapshotSend* pMsg, const char* s) {
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port);
sNTrace(pSyncNode,
"send sync-snapshot-send from %s:%d {term:%" PRId64 ", begin:%" PRId64 ", end:%" PRId64 ", lterm:%" PRId64
", stime:%" PRId64 ", seq:%d}, %s",
host, port, pMsg->term, pMsg->beginIndex, pMsg->lastIndex, pMsg->lastTerm, pMsg->startTime, pMsg->seq, s);
}
void syncLogRecvSyncSnapshotSend(SSyncNode* pSyncNode, const SyncSnapshotSend* pMsg, const char* s) {
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
sNTrace(pSyncNode,
"recv sync-snapshot-send from %s:%d {term:%" PRId64 ", begin:%" PRId64 ", lst:%" PRId64 ", lterm:%" PRId64
", stime:%" PRId64 ", seq:%d, len:%u}, %s",
host, port, pMsg->term, pMsg->beginIndex, pMsg->lastIndex, pMsg->lastTerm, pMsg->startTime, pMsg->seq,
pMsg->dataLen, s);
}
void syncLogSendSyncSnapshotRsp(SSyncNode* pSyncNode, const SyncSnapshotRsp* pMsg, const char* s) {
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port);
sNTrace(pSyncNode,
"send sync-snapshot-rsp from %s:%d {term:%" PRId64 ", begin:%" PRId64 ", lst:%" PRId64 ", lterm:%" PRId64
", stime:%" PRId64 ", ack:%d}, %s",
host, port, pMsg->term, pMsg->snapBeginIndex, pMsg->lastIndex, pMsg->lastTerm, pMsg->startTime, pMsg->ack, s);
}
void syncLogRecvSyncSnapshotRsp(SSyncNode* pSyncNode, const SyncSnapshotRsp* pMsg, const char* s) {
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
sNTrace(pSyncNode,
"recv sync-snapshot-rsp from %s:%d {term:%" PRId64 ", begin:%" PRId64 ", lst:%" PRId64 ", lterm:%" PRId64
", stime:%" PRId64 ", ack:%d}, %s",
host, port, pMsg->term, pMsg->snapBeginIndex, pMsg->lastIndex, pMsg->lastTerm, pMsg->startTime, pMsg->ack, s);
}
...@@ -17,6 +17,7 @@ ...@@ -17,6 +17,7 @@
#include "syncRaftLog.h" #include "syncRaftLog.h"
#include "syncRaftCfg.h" #include "syncRaftCfg.h"
#include "syncRaftStore.h" #include "syncRaftStore.h"
#include "syncUtil.h"
// log[m .. n] // log[m .. n]
......
...@@ -137,17 +137,6 @@ int32_t syncNodeReplicate(SSyncNode* pSyncNode) { ...@@ -137,17 +137,6 @@ int32_t syncNodeReplicate(SSyncNode* pSyncNode) {
return 0; return 0;
} }
static void syncLogSendAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMsg, const char* s) {
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port);
sNTrace(pSyncNode,
"send sync-append-entries to %s:%d, {term:%" PRId64 ", pre-index:%" PRId64 ", pre-term:%" PRId64
", pterm:%" PRId64 ", cmt:%" PRId64 ", datalen:%d}, %s",
host, port, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->privateTerm, pMsg->commitIndex,
pMsg->dataLen, s);
}
int32_t syncNodeSendAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftId, SRpcMsg* pRpcMsg) { int32_t syncNodeSendAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftId, SRpcMsg* pRpcMsg) {
int32_t ret = 0; int32_t ret = 0;
SyncAppendEntries* pMsg = pRpcMsg->pCont; SyncAppendEntries* pMsg = pRpcMsg->pCont;
......
...@@ -88,23 +88,6 @@ static bool syncNodeOnRequestVoteLogOK(SSyncNode* pSyncNode, SyncRequestVote* pM ...@@ -88,23 +88,6 @@ static bool syncNodeOnRequestVoteLogOK(SSyncNode* pSyncNode, SyncRequestVote* pM
return false; return false;
} }
static void syncLogRecvRequestVote(SSyncNode* pSyncNode, const SyncRequestVote* pMsg, const char* s) {
char logBuf[256];
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
sNTrace(pSyncNode, "recv sync-request-vote from %s:%d, {term:%" PRId64 ", lindex:%" PRId64 ", lterm:%" PRId64 "}, %s",
host, port, pMsg->term, pMsg->lastLogIndex, pMsg->lastLogTerm, s);
}
static void syncLogSendRequestVoteReply(SSyncNode* pSyncNode, const SyncRequestVoteReply* pMsg, const char* s) {
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port);
sNTrace(pSyncNode, "send sync-request-vote-reply to %s:%d {term:%" PRId64 ", grant:%d}, %s", host, port, pMsg->term,
pMsg->voteGranted, s);
}
int32_t syncNodeOnRequestVote(SSyncNode* ths, const SRpcMsg* pRpcMsg) { int32_t syncNodeOnRequestVote(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
int32_t ret = 0; int32_t ret = 0;
SyncRequestVote* pMsg = pRpcMsg->pCont; SyncRequestVote* pMsg = pRpcMsg->pCont;
......
...@@ -38,14 +38,6 @@ ...@@ -38,14 +38,6 @@
// /\ UNCHANGED <<serverVars, votedFor, leaderVars, logVars>> // /\ UNCHANGED <<serverVars, votedFor, leaderVars, logVars>>
// //
static void syncLogRecvRequestVoteReply(SSyncNode* pSyncNode, const SyncRequestVoteReply* pMsg, const char* s) {
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
sNTrace(pSyncNode, "recv sync-request-vote-reply from %s:%d {term:%" PRId64 ", grant:%d}, %s", host, port, pMsg->term,
pMsg->voteGranted, s);
}
int32_t syncNodeOnRequestVoteReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) { int32_t syncNodeOnRequestVoteReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
int32_t ret = 0; int32_t ret = 0;
SyncRequestVoteReply* pMsg = pRpcMsg->pCont; SyncRequestVoteReply* pMsg = pRpcMsg->pCont;
......
...@@ -17,6 +17,7 @@ ...@@ -17,6 +17,7 @@
#include "syncRespMgr.h" #include "syncRespMgr.h"
#include "syncRaftEntry.h" #include "syncRaftEntry.h"
#include "syncRaftStore.h" #include "syncRaftStore.h"
#include "syncUtil.h"
SSyncRespMgr *syncRespMgrCreate(void *data, int64_t ttl) { SSyncRespMgr *syncRespMgrCreate(void *data, int64_t ttl) {
SSyncRespMgr *pObj = taosMemoryCalloc(1, sizeof(SSyncRespMgr)); SSyncRespMgr *pObj = taosMemoryCalloc(1, sizeof(SSyncRespMgr));
......
...@@ -19,6 +19,7 @@ ...@@ -19,6 +19,7 @@
#include "syncRaftCfg.h" #include "syncRaftCfg.h"
#include "syncRaftLog.h" #include "syncRaftLog.h"
#include "syncReplication.h" #include "syncReplication.h"
#include "syncUtil.h"
static void syncNodeCleanConfigIndex(SSyncNode* ths) { static void syncNodeCleanConfigIndex(SSyncNode* ths) {
int32_t newArrIndex = 0; int32_t newArrIndex = 0;
......
...@@ -15,6 +15,7 @@ ...@@ -15,6 +15,7 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "syncUtil.h" #include "syncUtil.h"
#include "syncMessage.h"
#include "syncRaftCfg.h" #include "syncRaftCfg.h"
#include "syncRaftStore.h" #include "syncRaftStore.h"
#include "syncSnapshot.h" #include "syncSnapshot.h"
...@@ -345,3 +346,205 @@ void syncPrintSnapshotReceiverLog(const char* flags, ELogLevel level, int32_t df ...@@ -345,3 +346,205 @@ void syncPrintSnapshotReceiverLog(const char* flags, ELogLevel level, int32_t df
pNode->replicaNum, pNode->pRaftCfg->lastConfigIndex, pNode->changing, pNode->restoreFinish, quorum, pNode->replicaNum, pNode->pRaftCfg->lastConfigIndex, pNode->changing, pNode->restoreFinish, quorum,
pNode->electTimerLogicClock, pNode->heartbeatTimerLogicClockUser, peerStr, cfgStr); pNode->electTimerLogicClock, pNode->heartbeatTimerLogicClockUser, peerStr, cfgStr);
} }
void syncLogRecvTimer(SSyncNode* pSyncNode, const SyncTimeout* pMsg, const char* s) {
sNTrace(pSyncNode, "recv sync-timer {type:%s, lc:%" PRId64 ", ms:%d, data:%p}, %s",
syncTimerTypeStr(pMsg->timeoutType), pMsg->logicClock, pMsg->timerMS, pMsg->data, s);
}
void syncLogRecvLocalCmd(SSyncNode* pSyncNode, const SyncLocalCmd* pMsg, const char* s) {
sNTrace(pSyncNode, "recv sync-local-cmd {cmd:%d-%s, sd-new-term:%" PRId64 ", fc-index:%" PRId64 "}, %s", pMsg->cmd,
syncLocalCmdGetStr(pMsg->cmd), pMsg->sdNewTerm, pMsg->fcIndex, s);
}
void syncLogSendAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntriesReply* pMsg, const char* s) {
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port);
sNTrace(pSyncNode,
"send sync-append-entries-reply to %s:%d, {term:%" PRId64 ", pterm:%" PRId64 ", success:%d, match:%" PRId64
"}, %s",
host, port, pMsg->term, pMsg->privateTerm, pMsg->success, pMsg->matchIndex, s);
}
void syncLogRecvAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntriesReply* pMsg, const char* s) {
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
sNTrace(pSyncNode,
"recv sync-append-entries-reply from %s:%d {term:%" PRId64 ", pterm:%" PRId64 ", success:%d, match:%" PRId64
"}, %s",
host, port, pMsg->term, pMsg->privateTerm, pMsg->success, pMsg->matchIndex, s);
}
void syncLogSendHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, const char* s) {
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port);
sNTrace(pSyncNode,
"send sync-heartbeat to %s:%d {term:%" PRId64 ", cmt:%" PRId64 ", min-match:%" PRId64 ", pterm:%" PRId64
"}, %s",
host, port, pMsg->term, pMsg->commitIndex, pMsg->minMatchIndex, pMsg->privateTerm, s);
}
void syncLogRecvHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, const char* s) {
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
sNTrace(pSyncNode,
"recv sync-heartbeat from %s:%d {term:%" PRId64 ", cmt:%" PRId64 ", min-match:%" PRId64 ", pterm:%" PRId64
"}, %s",
host, port, pMsg->term, pMsg->commitIndex, pMsg->minMatchIndex, pMsg->privateTerm, s);
}
void syncLogSendHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* pMsg, const char* s) {
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port);
sNTrace(pSyncNode, "send sync-heartbeat-reply from %s:%d {term:%" PRId64 ", pterm:%" PRId64 "}, %s", host, port,
pMsg->term, pMsg->privateTerm, s);
}
void syncLogRecvHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* pMsg, const char* s) {
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
sNTrace(pSyncNode, "recv sync-heartbeat-reply from %s:%d {term:%" PRId64 ", pterm:%" PRId64 "}, %s", host, port,
pMsg->term, pMsg->privateTerm, s);
}
void syncLogSendSyncPreSnapshot(SSyncNode* pSyncNode, const SyncPreSnapshot* pMsg, const char* s) {
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port);
sNTrace(pSyncNode, "send sync-pre-snapshot to %s:%d {term:%" PRId64 "}, %s", host, port, pMsg->term, s);
}
void syncLogRecvSyncPreSnapshot(SSyncNode* pSyncNode, const SyncPreSnapshot* pMsg, const char* s) {
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
sNTrace(pSyncNode, "recv sync-pre-snapshot from %s:%d {term:%" PRId64 "}, %s", host, port, pMsg->term, s);
}
void syncLogSendSyncPreSnapshotReply(SSyncNode* pSyncNode, const SyncPreSnapshotReply* pMsg, const char* s) {
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port);
sNTrace(pSyncNode, "send sync-pre-snapshot-reply to %s:%d {term:%" PRId64 ", snap-start:%" PRId64 "}, %s", host, port,
pMsg->term, pMsg->snapStart, s);
}
void syncLogRecvSyncPreSnapshotReply(SSyncNode* pSyncNode, const SyncPreSnapshotReply* pMsg, const char* s) {
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
sNTrace(pSyncNode, "recv sync-pre-snapshot-reply from %s:%d {term:%" PRId64 ", snap-start:%" PRId64 "}, %s", host,
port, pMsg->term, pMsg->snapStart, s);
}
void syncLogSendSyncSnapshotSend(SSyncNode* pSyncNode, const SyncSnapshotSend* pMsg, const char* s) {
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port);
sNTrace(pSyncNode,
"send sync-snapshot-send from %s:%d {term:%" PRId64 ", begin:%" PRId64 ", end:%" PRId64 ", lterm:%" PRId64
", stime:%" PRId64 ", seq:%d}, %s",
host, port, pMsg->term, pMsg->beginIndex, pMsg->lastIndex, pMsg->lastTerm, pMsg->startTime, pMsg->seq, s);
}
void syncLogRecvSyncSnapshotSend(SSyncNode* pSyncNode, const SyncSnapshotSend* pMsg, const char* s) {
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
sNTrace(pSyncNode,
"recv sync-snapshot-send from %s:%d {term:%" PRId64 ", begin:%" PRId64 ", lst:%" PRId64 ", lterm:%" PRId64
", stime:%" PRId64 ", seq:%d, len:%u}, %s",
host, port, pMsg->term, pMsg->beginIndex, pMsg->lastIndex, pMsg->lastTerm, pMsg->startTime, pMsg->seq,
pMsg->dataLen, s);
}
void syncLogSendSyncSnapshotRsp(SSyncNode* pSyncNode, const SyncSnapshotRsp* pMsg, const char* s) {
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port);
sNTrace(pSyncNode,
"send sync-snapshot-rsp from %s:%d {term:%" PRId64 ", begin:%" PRId64 ", lst:%" PRId64 ", lterm:%" PRId64
", stime:%" PRId64 ", ack:%d}, %s",
host, port, pMsg->term, pMsg->snapBeginIndex, pMsg->lastIndex, pMsg->lastTerm, pMsg->startTime, pMsg->ack, s);
}
void syncLogRecvSyncSnapshotRsp(SSyncNode* pSyncNode, const SyncSnapshotRsp* pMsg, const char* s) {
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
sNTrace(pSyncNode,
"recv sync-snapshot-rsp from %s:%d {term:%" PRId64 ", begin:%" PRId64 ", lst:%" PRId64 ", lterm:%" PRId64
", stime:%" PRId64 ", ack:%d}, %s",
host, port, pMsg->term, pMsg->snapBeginIndex, pMsg->lastIndex, pMsg->lastTerm, pMsg->startTime, pMsg->ack, s);
}
void syncLogRecvAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMsg, const char* s) {
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
sNTrace(pSyncNode,
"recv sync-append-entries from %s:%d {term:%" PRId64 ", pre-index:%" PRId64 ", pre-term:%" PRId64
", cmt:%" PRId64 ", pterm:%" PRId64 ", datalen:%d}, %s",
host, port, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->commitIndex, pMsg->privateTerm,
pMsg->dataLen, s);
}
void syncLogSendAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMsg, const char* s) {
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port);
sNTrace(pSyncNode,
"send sync-append-entries to %s:%d, {term:%" PRId64 ", pre-index:%" PRId64 ", pre-term:%" PRId64
", pterm:%" PRId64 ", cmt:%" PRId64 ", datalen:%d}, %s",
host, port, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->privateTerm, pMsg->commitIndex,
pMsg->dataLen, s);
}
void syncLogRecvRequestVote(SSyncNode* pSyncNode, const SyncRequestVote* pMsg, const char* s) {
char logBuf[256];
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
sNTrace(pSyncNode, "recv sync-request-vote from %s:%d, {term:%" PRId64 ", lindex:%" PRId64 ", lterm:%" PRId64 "}, %s",
host, port, pMsg->term, pMsg->lastLogIndex, pMsg->lastLogTerm, s);
}
void syncLogSendRequestVote(SSyncNode* pNode, const SyncRequestVote* pMsg, const char* s) {
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port);
sNTrace(pNode, "send sync-request-vote to %s:%d {term:%" PRId64 ", lindex:%" PRId64 ", lterm:%" PRId64 "}, %s", host,
port, pMsg->term, pMsg->lastLogIndex, pMsg->lastLogTerm, s);
}
void syncLogRecvRequestVoteReply(SSyncNode* pSyncNode, const SyncRequestVoteReply* pMsg, const char* s) {
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
sNTrace(pSyncNode, "recv sync-request-vote-reply from %s:%d {term:%" PRId64 ", grant:%d}, %s", host, port, pMsg->term,
pMsg->voteGranted, s);
}
void syncLogSendRequestVoteReply(SSyncNode* pSyncNode, const SyncRequestVoteReply* pMsg, const char* s) {
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port);
sNTrace(pSyncNode, "send sync-request-vote-reply to %s:%d {term:%" PRId64 ", grant:%d}, %s", host, port, pMsg->term,
pMsg->voteGranted, s);
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册